
I’m trying to submit a Flink job, written in Scala 2.11 and using the Twitter streaming API, to a local Flink cluster by running the following on the command line:

flink run -c org.myClass C:\path\to\jarFile.jar

I get the following error:

2019-06-09 23:40:47,758 WARN  org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.
2019-06-09 23:40:47,762 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The program caused an error: 
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)
    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
    at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/twitter/TwitterSource$EndpointInitializer
    at msciss.TwitterHashtagCounter.main(TwitterHashtagCounter.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
    ... 7 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.twitter.TwitterSource$EndpointInitializer
    at java.net.URLClassLoader.findClass(Unknown Source)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    ... 15 more

However, I have added the Twitter connector library (which provides TwitterSource) to build.sbt, as shown below:

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-connector-twitter" % flinkVersion,
  "commons-logging" % "commons-logging" % "1.2",
  "org.apache.logging.log4j" % "log4j-core" % "2.11.2",
  "org.apache.commons" % "commons-text" % "1.6")

The app runs without issues in IntelliJ, and sbt build / package complete without any errors. How can I resolve this issue?


Answers


  1. You need to use the sbt-assembly plugin, or any other plugin that can create a fat jar (uber jar). Currently your package does not contain external libraries, and Flink connectors count as external libraries because they are not included in the standard Flink binary distribution. So the package you are building does not contain the Twitter connector, and neither does Flink itself; that is why you are getting the ClassNotFoundException.

  2. I have a fat (uber) jar. When I explode it, I can see the connector dependencies as well. Yet when I submit the jar as a Flink job, I get a ClassNotFoundException.

    What could be the reason?

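Following the first answer, a minimal sbt-assembly setup could look like the sketch below. It is not a verified build for this exact project: it assumes sbt-assembly 0.14.x (the version number shown is an assumption, so pick the release that matches your sbt version) and reuses the flinkDependencies from the question, where the Flink core artifacts are "provided" (sbt-assembly leaves provided dependencies out of the fat jar) while flink-connector-twitter is bundled. The main class name is taken from the stack trace. The fragment spans two files, as the comments indicate.

```scala
// ---- project/plugins.sbt ----
// Adds the sbt-assembly plugin. 0.14.9 is an assumed version from the
// sbt 1.x era of this question; use whichever release fits your sbt.
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")

// ---- build.sbt (in addition to the existing flinkDependencies) ----
// Record the entry point in the fat jar's manifest. The class name comes
// from the stack trace (msciss.TwitterHashtagCounter).
mainClass in assembly := Some("msciss.TwitterHashtagCounter")

// The Flink distribution already ships the Scala library, so keep it out
// of the fat jar.
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
```

Running sbt assembly then writes the fat jar under target/scala-2.11/, with -assembly- in its file name by default; that jar, not the one produced by sbt package, is the one to pass to flink run. If the build fails with duplicate-file errors from transitive dependencies, an assemblyMergeStrategy setting may also be needed. Note that the JVM class loader does not look inside jars nested within another jar, so the connector classes must sit at the top level of the fat jar (for example org/apache/flink/streaming/connectors/twitter/TwitterSource$EndpointInitializer.class) to be found.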