I am composing these services in separate docker containers all on the same confluent network:
broker:
  image: confluentinc/cp-server:7.4.0
  hostname: broker
  container_name: broker
  depends_on:
    zookeeper:
      condition: service_healthy
  ports:
    - "9092:9092"
    - "9101:9101"
  environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
    KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
    KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    KAFKA_JMX_PORT: 9101
    KAFKA_JMX_HOSTNAME: localhost
    KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
    CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
    CONFLUENT_METRICS_ENABLE: 'false'
    CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
  networks:
    - confluent
  healthcheck:
    test: [ "CMD", "bash", "-c", 'nc -z localhost 9092' ]
    interval: 10s
    timeout: 5s
    retries: 5

spark-master:
  image: bitnami/spark:latest
  volumes:
    - ./spark_stream.py:/opt/bitnami/spark/spark_stream.py
  command: bin/spark-class org.apache.spark.deploy.master.Master
  ports:
    - "9090:8080"
    - "7077:7077"
  networks:
    - confluent

spark-worker:
  image: bitnami/spark:latest
  command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
  depends_on:
    - spark-master
  environment:
    SPARK_MODE: worker
    SPARK_WORKER_CORES: 2
    SPARK_WORKER_MEMORY: 1g
    SPARK_MASTER_URL: spark://spark-master:7077
  networks:
    - confluent
  volumes:
    - ./spark_stream.py:/opt/bitnami/spark/spark_stream.py

cassandra_db:
  image: cassandra:latest
  container_name: cassandra
  hostname: cassandra
  ports:
    - "9042:9042"
  environment:
    - MAX_HEAP_SIZE=512M
    - HEAP_NEWSIZE=100M
    - CASSANDRA_USERNAME=cassandra
    - CASSANDRA_PASSWORD=cassandra
  networks:
    - confluent

# Define the networks
networks:
  confluent:
I create a spark connection like this:
s_conn = SparkSession.builder \
    .appName('SparkDataStreaming') \
    .config('spark.cassandra.connection.host', 'cassandra') \
    .config("spark.cassandra.connection.port", "9042") \
    .config("spark.cassandra.auth.username", "cassandra") \
    .config("spark.cassandra.auth.password", "cassandra") \
    .getOrCreate()
And the Cassandra connection like this:
auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')
cluster = Cluster(['localhost'], port=9042, auth_provider=auth_provider, connect_timeout=60)
cas_session = cluster.connect()
The spark connection reads from a kafka stream (also on the same network) into a dataframe like this:
spark_df = spark_conn.readStream \
    .format('kafka') \
    .option('kafka.bootstrap.servers', 'broker:29092') \
    .option('subscribe', 'users_created') \
    .option('startingOffsets', 'earliest') \
    .load()
I write the dataframe to cassandra using this:
streaming_query = (spark_df.writeStream.format("org.apache.spark.sql.cassandra")
                   .option('checkpointLocation', '/tmp/checkpoint')
                   .option('keyspace', 'spark_streams')
                   .option('table', 'created_users')
                   .start())
When I try writing the dataframe to cassandra I get this error:
ERROR CassandraConnectorConf: Unknown host 'cassandra'
java.net.UnknownHostException: cassandra: nodename nor servname provided, or not known
And this is how I call my spark job (I believe with the right packages attached):
spark-submit --master spark://localhost:7077 --packages "com.datastax.spark:spark-cassandra-connector_2.12:3.5.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3" --conf spark.cassandra.connection.host=cassandra --conf spark.cassandra.connection.port=9042 spark_stream.py
I thought the spark connection would connect to cassandra, but it seems it can’t find the host or contact point?
I tried changing spark.cassandra.connection.host to localhost or 127.0.0.1. For some reason localhost gives me a different error message:
ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
24/11/14 01:59:40 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 0, writer: CassandraBulkWrite(org.apache.spark.sql.SparkSession@105fe638,com.datastax.spark.connector.cql.CassandraConnector@4d392247,TableDef(spark_streams,created_users,ArrayBuffer(ColumnDef(id,PartitionKeyColumn,VarCharType)),ArrayBuffer(),Stream(ColumnDef(address,RegularColumn,VarCharType), ColumnDef(email,RegularColumn,VarCharType), ColumnDef(first_name,RegularColumn,VarCharType), ColumnDef(gender,RegularColumn,VarCharType), ColumnDef(last_name,RegularColumn,VarCharType), ColumnDef(phone,RegularColumn,VarCharType), ColumnDef(picture,RegularColumn,VarCharType), ColumnDef(post_code,RegularColumn,VarCharType), ColumnDef(registered_date,RegularColumn,VarCharType), ColumnDef(username,RegularColumn,VarCharType)),Stream(),false,false,Map()),WriteConf(RowsInBatch(5),1000,Partition,LOCAL_QUORUM,false,false,5,None,TTLOption(DefaultValue),TimestampOption(DefaultValue),true,None),StructType(StructField(id,StringType,true),StructField(first_name,StringType,true),StructField(last_name,StringType,true),StructField(gender,StringType,true),StructField(address,StringType,true),StructField(post_code,StringType,true),StructField(email,StringType,true),StructField(username,StringType,true),StructField(registered_date,StringType,true),StructField(phone,StringType,true),StructField(picture,StringType,true)),org.apache.spark.SparkConf@5ac2dede)] is aborting.
I don’t believe it’s an error with column mismatches, but I could be wrong. I can see the data appear in the kafka topic through a confluent control-center I set up, so I know the data is making it that far.
Any help would be appreciated.
2 Answers
Have you tried integrating Kafka with Cassandra by leveraging Kafka Streams and the DataStax Java Driver for Cassandra? This combination lets you efficiently stream data from Kafka and store it in Cassandra.
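If it helps, here is a minimal sketch of that pattern in Python rather than Java, using the confluent-kafka consumer and the cassandra-driver already used in your script instead of Kafka Streams and the Java driver. The topic, broker address, keyspace, and table names are taken from your question; the consumer group id and the inserted columns are assumptions:

import json

from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from confluent_kafka import Consumer

# Connect to Cassandra by its container hostname on the shared Docker network,
# not localhost, since this would run inside another container on that network.
auth = PlainTextAuthProvider(username='cassandra', password='cassandra')
cluster = Cluster(['cassandra'], port=9042, auth_provider=auth)
session = cluster.connect('spark_streams')

# Consume from the same topic the Spark job subscribes to,
# via the broker's internal listener from the compose file.
consumer = Consumer({
    'bootstrap.servers': 'broker:29092',
    'group.id': 'users-to-cassandra',   # hypothetical group id
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['users_created'])

# Only a few of the table's columns are shown here for illustration.
insert = session.prepare(
    "INSERT INTO created_users (id, first_name, last_name) VALUES (?, ?, ?)"
)

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    record = json.loads(msg.value())
    session.execute(insert, (record['id'], record['first_name'], record['last_name']))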
Also verify the Cassandra hostname by checking your configuration and reviewing the spark.cassandra.connection.host property in the Spark configuration. It should be set to the actual hostname or IP address of the Cassandra node. If you have a cluster with multiple Cassandra nodes, specify them as a comma-separated list in the spark.cassandra.connection.host property.
For example:
spark.cassandra.connection.host=cassandra-node1,cassandra-node2
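In the docker-compose setup from the question the Cassandra service's hostname is cassandra, so, assuming the Spark driver and workers run as containers on the same confluent network, a minimal sketch of the builder configuration would look like this (the property name and value must match what the worker containers can resolve):

from pyspark.sql import SparkSession

# Sketch: the host must be resolvable from the Spark worker containers.
# A multi-node cluster would use a comma-separated list,
# e.g. 'cassandra-node1,cassandra-node2'.
spark = (SparkSession.builder
         .appName('SparkDataStreaming')
         .config('spark.cassandra.connection.host', 'cassandra')
         .config('spark.cassandra.connection.port', '9042')
         .config('spark.cassandra.auth.username', 'cassandra')
         .config('spark.cassandra.auth.password', 'cassandra')
         .getOrCreate())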