
I want to stream data from a Kafka topic with Spark on Hortonworks. I have started ZooKeeper and the Kafka server, and I have successfully built a Maven project in Eclipse with all the streaming dependencies. The pom file is given below.

POM.XML

<?xml version="1.0" encoding="UTF-8"?> 
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>test</groupId>
<artifactId>spark-kafka</artifactId>
<version>1.0-SNAPSHOT</version>
<repositories>
    <repository>
        <id>hortonworks</id>
        <name>hortonworks repo</name>
        <url>http://repo.hortonworks.com/content/repositories/releases/</url>
    </repository>
</repositories>
<dependencies>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<dependency>

<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.6.0</version>
<scope>provided</scope>
</dependency>

</dependencies>
<build>
    <defaultGoal>package</defaultGoal>
    <resources>
        <resource>
            <directory>src/main/resources</directory>
            <filtering>true</filtering>
        </resource>
        <resource>
            <directory>src/test/resources</directory>
            <filtering>true</filtering>
        </resource>
    </resources>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-resources-plugin</artifactId>
            <configuration>
                <encoding>UTF-8</encoding>
            </configuration>
            <executions>
                <execution>
                    <goals>
                        <goal>copy-resources</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <configuration>
                <recompileMode>incremental</recompileMode>
                <args>
                    <arg>-target:jvm-1.7</arg>
                </args>
                <javacArgs>
                    <javacArg>-source</javacArg>
                    <javacArg>1.7</javacArg>
                    <javacArg>-target</javacArg>
                    <javacArg>1.7</javacArg>
                </javacArgs>
            </configuration>
            <executions>
                <execution>
                    <id>scala-compile</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.7</source>
                <target>1.7</target>
            </configuration>

            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>


        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <filters>
                    <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
                <finalName>uber-${project.artifactId}-${project.version}</finalName>
            </configuration>
        </plugin>

    </plugins>

</build>
</project>

STREAMING.SCALA

package com.jesperdj.example

import org.apache.spark._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils

object HelloScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver")
    val ssc = new StreamingContext(conf, Seconds(10))

    val kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "spark-group", Map("streaming" -> 5))
    // Need to change the topic name and the port number accordingly

    kafkaStream.print() // prints the stream of data received

    ssc.start()
    ssc.awaitTermination()
  }
}
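The comment in the code notes that the topic name and port have to be changed by hand. Below is a minimal sketch of one way to avoid hard-coding them, assuming they are passed on the command line in the order `<zkQuorum> <groupId> <topics> <numThreads>` (the order used by Spark's bundled `KafkaWordCount` example). The object `ReceiverArgs` and its methods are hypothetical names, and the sketch only does the parsing, so it runs without Spark on the classpath:

```scala
// Hypothetical helper: turns command-line strings into the arguments that
// KafkaUtils.createStream(ssc, zkQuorum, groupId, topics) takes.
object ReceiverArgs {
  // "streaming,logs" with 5 threads -> Map("streaming" -> 5, "logs" -> 5)
  def topicMap(topics: String, numThreads: Int): Map[String, Int] =
    topics.split(",").map(_.trim).filter(_.nonEmpty).map(topic => topic -> numThreads).toMap

  // Expected argument order: <zkQuorum> <groupId> <topics> <numThreads>
  def parse(args: Array[String]): (String, String, Map[String, Int]) = {
    require(args.length == 4, "usage: <zkQuorum> <groupId> <topics> <numThreads>")
    (args(0), args(1), topicMap(args(2), args(3).toInt))
  }

  def main(args: Array[String]): Unit = {
    // Falls back to the values hard-coded in the question when no arguments are given
    val input =
      if (args.nonEmpty) args
      else Array("localhost:2181", "spark-group", "streaming", "5")
    val (zkQuorum, groupId, topics) = parse(input)
    println(s"zkQuorum=$zkQuorum groupId=$groupId topics=$topics")
  }
}
```

Inside `main`, the hard-coded call would then become `val (zkQuorum, groupId, topics) = ReceiverArgs.parse(args)` followed by `KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)`. The integer in the topic map is the number of consumer threads for that topic, not a partition number.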

When I run the Scala program in Eclipse, it shows the following error:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/05/03 12:15:18 INFO SparkContext: Running Spark version 1.6.0
18/05/03 12:15:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/05/03 12:15:19 INFO SecurityManager: Changing view acls to: u60888
18/05/03 12:15:19 INFO SecurityManager: Changing modify acls to: u60888
18/05/03 12:15:19 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(u60888); users with modify permissions: Set(u60888)
18/05/03 12:15:20 INFO Utils: Successfully started service 'sparkDriver' on port 59787.
18/05/03 12:15:21 INFO Slf4jLogger: Slf4jLogger started
18/05/03 12:15:21 INFO Remoting: Starting remoting
18/05/03 12:15:22 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@10.100.170.138:59800]
18/05/03 12:15:22 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 59800.
18/05/03 12:15:22 INFO SparkEnv: Registering MapOutputTracker
18/05/03 12:15:22 INFO SparkEnv: Registering BlockManagerMaster
18/05/03 12:15:22 INFO DiskBlockManager: Created local directory at C:\Users\u60888\AppData\Local\Temp\19\blockmgr-eda85d0d-70f0-48c8-8910-9a883bacdd38
18/05/03 12:15:22 INFO MemoryStore: MemoryStore started with capacity 4.4 GB
18/05/03 12:15:22 INFO SparkEnv: Registering OutputCommitCoordinator
18/05/03 12:15:23 INFO Utils: Successfully started service 'SparkUI' on port 4040.
18/05/03 12:15:23 INFO SparkUI: Started SparkUI at http://10.100.170.138:4040
18/05/03 12:15:23 INFO Executor: Starting executor ID driver on host localhost
18/05/03 12:15:23 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 59829.
18/05/03 12:15:23 INFO NettyBlockTransferService: Server created on 59829
18/05/03 12:15:23 INFO BlockManagerMaster: Trying to register BlockManager
18/05/03 12:15:23 INFO BlockManagerMasterEndpoint: Registering block manager localhost:59829 with 4.4 GB RAM, BlockManagerId(driver, localhost, 59829)
18/05/03 12:15:23 INFO BlockManagerMaster: Registered BlockManager
18/05/03 12:15:25 INFO ReceiverTracker: Starting 1 receivers
18/05/03 12:15:25 INFO ReceiverTracker: ReceiverTracker started
18/05/03 12:15:25 INFO ForEachDStream: metadataCleanupDelay = -1
18/05/03 12:15:25 INFO KafkaInputDStream: metadataCleanupDelay = -1
18/05/03 12:15:25 INFO KafkaInputDStream: Slide time = 10000 ms
18/05/03 12:15:25 INFO KafkaInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
18/05/03 12:15:25 INFO KafkaInputDStream: Checkpoint interval = null
18/05/03 12:15:25 INFO KafkaInputDStream: Remember duration = 10000 ms
18/05/03 12:15:25 INFO KafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka.KafkaInputDStream@269e1b99
18/05/03 12:15:25 INFO ForEachDStream: Slide time = 10000 ms
18/05/03 12:15:25 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
18/05/03 12:15:25 INFO ForEachDStream: Checkpoint interval = null
18/05/03 12:15:25 INFO ForEachDStream: Remember duration = 10000 ms
18/05/03 12:15:25 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@7f09a1b9
18/05/03 12:15:25 INFO RecurringTimer: Started timer for JobGenerator at time 1525329930000
18/05/03 12:15:25 INFO JobGenerator: Started JobGenerator at 1525329930000 ms
18/05/03 12:15:25 INFO JobScheduler: Started JobScheduler
18/05/03 12:15:25 INFO StreamingContext: StreamingContext started
18/05/03 12:15:26 INFO ReceiverTracker: Receiver 0 started
18/05/03 12:15:26 INFO DAGScheduler: Got job 0 (start at HelloScala.scala:20) with 1 output partitions
18/05/03 12:15:26 INFO DAGScheduler: Final stage: ResultStage 0 (start at HelloScala.scala:20)
18/05/03 12:15:26 INFO DAGScheduler: Parents of final stage: List()
18/05/03 12:15:26 INFO DAGScheduler: Missing parents: List()
18/05/03 12:15:26 INFO DAGScheduler: Submitting ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:588), which has no missing parents
18/05/03 12:15:26 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 34.1 KB, free 34.1 KB)
18/05/03 12:15:26 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 11.0 KB, free 45.1 KB)
18/05/03 12:15:26 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:59829 (size: 11.0 KB, free: 4.4 GB)
18/05/03 12:15:26 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
18/05/03 12:15:26 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:588)
18/05/03 12:15:26 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
18/05/03 12:15:26 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2988 bytes)
18/05/03 12:15:26 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
18/05/03 12:15:26 INFO RecurringTimer: Started timer for BlockGenerator at time 1525329926800
18/05/03 12:15:26 INFO BlockGenerator: Started BlockGenerator
18/05/03 12:15:26 INFO BlockGenerator: Started block pushing thread
18/05/03 12:15:26 INFO ReceiverTracker: Registered receiver for stream 0 from 10.100.170.138:59787
18/05/03 12:15:26 INFO ReceiverSupervisorImpl: Starting receiver
18/05/03 12:15:26 INFO KafkaReceiver: Starting Kafka Consumer Stream with group: spark-group
18/05/03 12:15:26 INFO KafkaReceiver: Connecting to Zookeeper: localhost:2181
18/05/03 12:15:26 INFO VerifiableProperties: Verifying properties
18/05/03 12:15:26 INFO VerifiableProperties: Property group.id is overridden to spark-group
18/05/03 12:15:26 INFO VerifiableProperties: Property zookeeper.connect is overridden to localhost:2181
18/05/03 12:15:26 INFO VerifiableProperties: Property zookeeper.connection.timeout.ms is overridden to 10000
18/05/03 12:15:26 INFO ZookeeperConsumerConnector: [spark-group_trvhsdapv036-1525329926880-ccdd31c7], Connecting to zookeeper instance at localhost:2181
18/05/03 12:15:26 INFO ZkEventThread: Starting ZkClient event thread.
18/05/03 12:15:26 INFO ZooKeeper: Client environment:zookeeper.version=3.4.5-1392090, built on 09/30/2012 17:52 GMT
18/05/03 12:15:26 INFO ZooKeeper: Client environment:host.name=trvhsdapv036.ustr.com
18/05/03 12:15:26 INFO ZooKeeper: Client environment:java.version=1.8.0
18/05/03 12:15:26 INFO ZooKeeper: Client environment:java.vendor=Oracle Corporation
18/05/03 12:15:26 INFO ZooKeeper: Client environment:java.home=C:\Program Files\Java\jdk1.8.0\jre
18/05/03 12:15:26 INFO ZooKeeper: Client environment:java.class.path=...\target\classes;...\target\test-classes;U:\maven\org\apache\spark\spark-streaming-kafka_2.10\1.3.0\spark-streaming-kafka_2.10-1.3.0.jar;U:\maven\org\apache\kafka\kafka_2.10\0.8.1.1\kafka_2.10-0.8.1.1.jar;...;U:\maven\com\101tec\zkclient\0.3\zkclient-0.3.jar;...;U:\maven\org\apache\spark\spark-streaming_2.10\1.6.0\spark-streaming_2.10-1.6.0.jar;U:\maven\org\apache\spark\spark-core_2.10\1.6.0\spark-core_2.10-1.6.0.jar;...;U:\maven\org\apache\zookeeper\zookeeper\3.4.5\zookeeper-3.4.5.jar;... (remaining Maven dependency JARs omitted)
18/05/03 12:15:26 INFO ZooKeeper: Client environment:java.library.path=C:\Program Files\Java\jdk1.8.0\bin;C:\Windows\Sun\Java\bin;C:\Windows\system32;C:\Windows;C:\ProgramData\Oracle\Java\javapath;C:\Windows\system32;C:\Windows;C:\Windows\System32\Wbem;C:\Windows\System32\WindowsPowerShell\v1.0;C:\Program Files\Citrix\Virtual Desktop Agent;C:\Program Files (x86)\Common Files\Citrix\System32;C:\Program Files\nodejs;C:\Program Files (x86)\Citrix\system32;C:\Program Files (x86)\Google\Chrome\Application;C:\ProgramData\Oracle\Java\javapath;C:\Windows\system32;C:\Windows;C:\Windows\System32\Wbem;C:\Windows\System32\WindowsPowerShell\v1.0;C:\Program Files (x86)\Citrix\system32;C:\Program Files\Citrix\Virtual Desktop Agent;C:\Program Files (x86)\Common Files\Citrix\System32;U:\AppData\Roaming\Programs\Git\cmd;C:\Program Files\Java\jre1.8.0_31;C:\Program Files\Java\jdk1.8.0_31;.
18/05/03 12:15:26 INFO ZooKeeper: Client environment:java.io.tmpdir=C:\Users\u60888\AppData\Local\Temp\19
18/05/03 12:15:26 INFO ZooKeeper: Client environment:java.compiler=<NA>
18/05/03 12:15:26 INFO ZooKeeper: Client environment:os.name=Windows Server 2008 R2
18/05/03 12:15:26 INFO ZooKeeper: Client environment:os.arch=amd64
18/05/03 12:15:26 INFO ZooKeeper: Client environment:os.version=6.1
18/05/03 12:15:26 INFO ZooKeeper: Client environment:user.name=u60888
18/05/03 12:15:26 INFO ZooKeeper: Client environment:user.home=C:\Users\u60888
18/05/03 12:15:26 INFO ZooKeeper: Client environment:user.dir=U:\KafkaWorkspace2\scala-maven-example-master\scala-maven-example-master
18/05/03 12:15:26 INFO ZooKeeper: Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@396c8257
18/05/03 12:15:26 INFO ClientCnxn: Opening socket connection to server 0:0:0:0:0:0:0:1/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error)
18/05/03 12:15:27 WARN ClientCnxn: Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:712)
    at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:350)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1068)
18/05/03 12:15:28 INFO ClientCnxn: Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
18/05/03 12:15:29 WARN ClientCnxn: Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:712)
    at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:350)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1068)
18/05/03 12:15:30 INFO JobScheduler: Added jobs for time 1525329930000 ms
18/05/03 12:15:30 INFO JobScheduler: Starting job streaming job 1525329930000 ms.0 from job set of time 1525329930000 ms
-------------------------------------------
Time: 1525329930000 ms
-------------------------------------------
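The key lines in this log are the `java.net.ConnectException: Connection refused` warnings from `ClientCnxn`: the receiver tried `localhost:2181` over both IPv6 (`0:0:0:0:0:0:0:1`) and IPv4 (`127.0.0.1`), and nothing accepted the connection, which is consistent with the empty output under `Time: 1525329930000 ms`. If ZooKeeper runs on the Hortonworks sandbox or cluster rather than on this Windows host, `localhost` will not reach it. A quick way to check this independently of Spark is a plain TCP connect from the machine that runs the driver. The sketch below uses only `java.net`; `PortCheck` is a hypothetical name, and the default host and port are assumptions to replace with wherever ZooKeeper actually runs:

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

// Minimal TCP reachability probe: it only checks that something accepts
// connections on host:port, not that the listener is actually ZooKeeper.
object PortCheck {
  def isReachable(host: String, port: Int, timeoutMs: Int): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // ConnectException, SocketTimeoutException and UnknownHostException are all IOExceptions
      case _: IOException => false
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Assumed ZooKeeper address; pass the real host and port as arguments
    val host = if (args.nonEmpty) args(0) else "localhost"
    val port = if (args.length > 1) args(1).toInt else 2181
    println(s"$host:$port reachable: ${isReachable(host, port, 2000)}")
  }
}
```

If this prints `false` for the address passed to `KafkaUtils.createStream`, the receiver will keep retrying exactly as in the log above, whatever the Spark or Kafka versions are.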


Answers


  1. Can you change the version of the Spark Kafka integration artifact so that it matches your Spark version (1.6.0)?

    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.0</version>
    </dependency>
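This suggestion lines the Kafka integration up with the rest of Spark: the question's pom declares `spark-streaming-kafka_2.10` 1.3.0 next to `spark-streaming_2.10` 1.6.0, and the classpath in the log shows both loaded together. Spark's modules are released under a common version number, so mixing release lines is worth ruling out. A tiny plain-Scala sketch of that comparison follows; `VersionLine` is a hypothetical helper, and it only compares the major.minor part of two version strings:

```scala
// Hypothetical helper: compares only the major.minor part of two version strings,
// e.g. the spark-streaming-kafka version against the spark-streaming version.
object VersionLine {
  def releaseLine(version: String): (Int, Int) = {
    val parts = version.split("[.-]")
    require(parts.length >= 2, s"not a major.minor version: $version")
    (parts(0).toInt, parts(1).toInt)
  }

  def sameReleaseLine(a: String, b: String): Boolean =
    releaseLine(a) == releaseLine(b)

  def main(args: Array[String]): Unit = {
    println(sameReleaseLine("1.3.0", "1.6.0")) // question's pom: false
    println(sameReleaseLine("1.6.0", "1.6.0")) // after the suggested change: true
  }
}
```

Both answers go further and use the exact same version for every Spark artifact, which is the stricter and safer choice; the major.minor check is only a quick first filter.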
    
  2. Add these dependencies to the pom file:

    <dependencies>
      <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.10.4</version>
      </dependency>

      <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka -->
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.6.2</version>
      </dependency>

      <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.2.2</version>
      </dependency>

      <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.8.2.2</version>
      </dependency>

      <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.6.2</version>
      </dependency>

      <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.2</version>
      </dependency>
    </dependencies>
    

    Then try this code:

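    import kafka.serializer.{DefaultDecoder, StringDecoder}
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    // The object name is illustrative
    object KafkaStreamingExample {
      def main(args: Array[String]): Unit = {
        // Spark configuration
        val sparkConf = new SparkConf().setAppName("Kafka").setMaster("local[*]")
        val ssc = new StreamingContext(sparkConf, Seconds(10))
        val kafkaConf = Map(
          // Not needed by the receiver-based consumer, which discovers brokers through ZooKeeper
          "metadata.broker.list" -> "localhost:9092",
          // ZooKeeper address as in the question; 2181 is ZooKeeper's default client port
          "zookeeper.connect" -> "localhost:2181",
          "group.id" -> "kafka-streaming-example",
          "zookeeper.connection.timeout.ms" -> "200000"
        )
        // Receiver-based stream of (key, value) pairs: keys as raw bytes, values decoded as strings
        val lines = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
          ssc,
          kafkaConf,
          Map("sample-topic" -> 1), // topic name -> number of consumer threads (not a partition)
          StorageLevel.MEMORY_ONLY
        )
        println("printing" + lines.toString())
        val words = lines.flatMap { case (_, value) => value.split(" ") }
        words.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }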
    

    Hope this works!
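In the code above, `createStream` with `DefaultDecoder` and `StringDecoder` yields `(key, value)` pairs, where the key is a raw `Array[Byte]` and the value a `String`; the `flatMap` discards the key and splits the value into words. The same transformation on an ordinary Scala collection, using made-up sample records in place of a Kafka batch (the object name `FlatMapShape` is hypothetical), shows the shape of the data without needing Spark or a broker:

```scala
// Plain-collection stand-in for the DStream of (key, value) pairs
object FlatMapShape {
  // Made-up sample records: keys as raw bytes (DefaultDecoder), values as text (StringDecoder)
  val batch: Seq[(Array[Byte], String)] = Seq(
    ("k1".getBytes("UTF-8"), "hello kafka"),
    ("k2".getBytes("UTF-8"), "hello spark streaming")
  )

  // Same shape as lines.flatMap { case (_, value) => value.split(" ") } in the answer
  def words(records: Seq[(Array[Byte], String)]): Seq[String] =
    records.flatMap { case (_, value) => value.split(" ").toSeq }

  def main(args: Array[String]): Unit =
    words(batch).foreach(println)
}
```

On the real stream, `words.print()` shows the first elements of each 10-second batch in the same flattened form.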
