I’m trying to submit a Flink job, written in Scala 2.11 and using the Twitter streaming API, to a local Flink cluster by running the following on the command line:
flink run -c org.myClass C:\path\to\jarFile.jar
And get the following error:
2019-06-09 23:40:47,758 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.
2019-06-09 23:40:47,762 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The program caused an error:
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/twitter/TwitterSource$EndpointInitializer
at msciss.TwitterHashtagCounter.main(TwitterHashtagCounter.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
... 7 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.twitter.TwitterSource$EndpointInitializer
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
... 15 more
However, in the program I’ve set the TwitterSource library in build.sbt as below:
val flinkDependencies = Seq(
"org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-connector-twitter" % flinkVersion,
"commons-logging" % "commons-logging" % "1.2",
"org.apache.logging.log4j" % "log4j-core" % "2.11.2",
"org.apache.commons" % "commons-text" % "1.6")
The app also runs without issues in IntelliJ, and sbt build / package do not produce any problems. How can I resolve this issue?
2 Answers
You need to use the sbt-assembly plugin, or any other plugin that can build a fat JAR (uber JAR). Currently your package does not contain external libraries, and Flink connectors count as external libraries because they are not included in the standard binary build. So the package you are creating does not contain the Twitter connector, and neither does Flink itself, which is why you are getting the ClassNotFoundException.

I have a fat (uber) JAR. When I explode it, I can see the connector dependencies as well. Yet when I submit the JAR as a Flink job, I get a ClassNotFoundException.
What could be the reason?
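
To illustrate the first answer's suggestion, here is a minimal sketch of an sbt-assembly setup for the build.sbt from the question. The plugin version (0.14.10), the use of sbt 1.1+ slash syntax, and the main class name (taken from the stack trace, msciss.TwitterHashtagCounter) are assumptions, so adjust them to your project.

```scala
// project/plugins.sbt
// Assumption: sbt-assembly 0.14.10; pick the release that matches your sbt version.
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")

// build.sbt
// Dependencies scoped "provided" (flink-scala, flink-streaming-scala) are left out
// of the fat JAR because the cluster already ships them. flink-connector-twitter
// has no such scope, so sbt-assembly bundles it together with its transitive
// dependencies.
// Assumption: main class name copied from the stack trace in the question.
assembly / mainClass := Some("msciss.TwitterHashtagCounter")
```

With this in place, run sbt assembly instead of sbt package and pass the resulting JAR (written under target/scala-2.11/ with an -assembly suffix by default) to flink run. The plain sbt package output contains only the project's own classes, which is consistent with the NoClassDefFoundError for TwitterSource$EndpointInitializer shown above.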