I am composing these services as separate Docker containers, all on the same confluent network:

  broker:
    image: confluentinc/cp-server:7.4.0
    hostname: broker
    container_name: broker
    depends_on:
      zookeeper:
        condition: service_healthy
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'false'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    networks:
      - confluent
    healthcheck:
      test: [ "CMD", "bash", "-c", 'nc -z localhost 9092' ]
      interval: 10s
      timeout: 5s
      retries: 5

  spark-master:
    image: bitnami/spark:latest
    volumes: 
      - ./spark_stream.py:/opt/bitnami/spark/spark_stream.py
    command: bin/spark-class org.apache.spark.deploy.master.Master
    ports:
      - "9090:8080"
      - "7077:7077"
    networks:
      - confluent

  spark-worker:
    image: bitnami/spark:latest
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
    depends_on:
      - spark-master
    environment:
      SPARK_MODE: worker
      SPARK_WORKER_CORES: 2
      SPARK_WORKER_MEMORY: 1g
      SPARK_MASTER_URL: spark://spark-master:7077
    networks:
      - confluent
    volumes: 
      - ./spark_stream.py:/opt/bitnami/spark/spark_stream.py

  cassandra_db:
    image: cassandra:latest
    container_name: cassandra
    hostname: cassandra
    ports:
      - "9042:9042"
    environment:
      - MAX_HEAP_SIZE=512M
      - HEAP_NEWSIZE=100M
      - CASSANDRA_USERNAME=cassandra
      - CASSANDRA_PASSWORD=cassandra
    networks:
      - confluent


# Define the networks
networks:
  confluent:

I create a spark connection like this:

from pyspark.sql import SparkSession

s_conn = SparkSession.builder \
            .appName('SparkDataStreaming') \
            .config('spark.cassandra.connection.host', 'cassandra') \
            .config("spark.cassandra.connection.port", "9042") \
            .config("spark.cassandra.auth.username", "cassandra") \
            .config("spark.cassandra.auth.password", "cassandra") \
            .getOrCreate()

And cassandra connection like this:

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')
cluster = Cluster(['localhost'], port=9042, auth_provider=auth_provider, connect_timeout=60)
cas_session = cluster.connect()

The spark connection reads from a kafka stream (also on same network) into a dataframe like this:

spark_df = s_conn.readStream \
            .format('kafka') \
            .option('kafka.bootstrap.servers', 'broker:29092') \
            .option('subscribe', 'users_created') \
            .option('startingOffsets', 'earliest') \
            .load()

I write the dataframe to cassandra using this:

streaming_query = (spark_df.writeStream.format("org.apache.spark.sql.cassandra")
                               .option('checkpointLocation', '/tmp/checkpoint')
                               .option('keyspace', 'spark_streams')
                               .option('table', 'created_users')
                               .start())

When I try writing the dataframe to cassandra I get this error:

ERROR CassandraConnectorConf: Unknown host 'cassandra'
java.net.UnknownHostException: cassandra: nodename nor servname provided, or not known

And this is how I call my spark job (I believe with the right packages attached):

spark-submit --master spark://localhost:7077 --packages "com.datastax.spark:spark-cassandra-connector_2.12:3.5.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3" --conf spark.cassandra.connection.host=cassandra --conf spark.cassandra.connection.port=9042 spark_stream.py

I thought the Spark session would connect to Cassandra, but it seems it can't resolve the host / contact point?

I tried changing spark.cassandra.connection.host to localhost or 127.0.0.1. For some reason localhost gives me a different error message:

ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
24/11/14 01:59:40 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 0, writer: CassandraBulkWrite(org.apache.spark.sql.SparkSession@105fe638,com.datastax.spark.connector.cql.CassandraConnector@4d392247,TableDef(spark_streams,created_users,ArrayBuffer(ColumnDef(id,PartitionKeyColumn,VarCharType)),ArrayBuffer(),Stream(ColumnDef(address,RegularColumn,VarCharType), ColumnDef(email,RegularColumn,VarCharType), ColumnDef(first_name,RegularColumn,VarCharType), ColumnDef(gender,RegularColumn,VarCharType), ColumnDef(last_name,RegularColumn,VarCharType), ColumnDef(phone,RegularColumn,VarCharType), ColumnDef(picture,RegularColumn,VarCharType), ColumnDef(post_code,RegularColumn,VarCharType), ColumnDef(registered_date,RegularColumn,VarCharType), ColumnDef(username,RegularColumn,VarCharType)),Stream(),false,false,Map()),WriteConf(RowsInBatch(5),1000,Partition,LOCAL_QUORUM,false,false,5,None,TTLOption(DefaultValue),TimestampOption(DefaultValue),true,None),StructType(StructField(id,StringType,true),StructField(first_name,StringType,true),StructField(last_name,StringType,true),StructField(gender,StringType,true),StructField(address,StringType,true),StructField(post_code,StringType,true),StructField(email,StringType,true),StructField(username,StringType,true),StructField(registered_date,StringType,true),StructField(phone,StringType,true),StructField(picture,StringType,true)),org.apache.spark.SparkConf@5ac2dede)] is aborting.

I don't believe it's a column-mismatch error, but I could be wrong. I can see the data appear in the Kafka topic through the Confluent Control Center I set up, so I know the data is making it that far.

Any help would be appreciated.

2 Answers


  1. Have you tried integrating Kafka with Cassandra by leveraging Kafka Streams and the DataStax Java Driver for Cassandra? That combination lets you efficiently stream data out of Kafka and store it in Cassandra.
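
    Roughly, the consume-then-insert pattern described above looks like the sketch below. It is only a sketch in Python (Kafka Streams itself is Java-only, so a plain kafka-python consumer and the DataStax Python driver stand in for Kafka Streams and the Java driver); the topic, keyspace, table and column names are taken from the question, everything else is an assumption:

    import json

    from kafka import KafkaConsumer                    # kafka-python
    from cassandra.cluster import Cluster              # DataStax Python driver
    from cassandra.auth import PlainTextAuthProvider

    # Plain consumer on the same topic the Spark job subscribes to.
    consumer = KafkaConsumer(
        'users_created',
        bootstrap_servers='broker:29092',
        auto_offset_reset='earliest',
        value_deserializer=lambda v: json.loads(v.decode('utf-8')),
    )

    auth = PlainTextAuthProvider(username='cassandra', password='cassandra')
    cluster = Cluster(['cassandra'], port=9042, auth_provider=auth)
    session = cluster.connect('spark_streams')

    # Only a few of the table's columns are shown here for brevity.
    insert = session.prepare(
        "INSERT INTO created_users (id, first_name, last_name, email) VALUES (?, ?, ?, ?)"
    )

    for message in consumer:
        user = message.value
        session.execute(insert, (user['id'], user['first_name'], user['last_name'], user['email']))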

  2. Verify the Cassandra hostname: check your configuration and review the spark.cassandra.connection.host property in the Spark configuration. It should be set to the actual hostname or IP address of a Cassandra node that is reachable from where the Spark driver and executors run.

    If you have a cluster with multiple Cassandra nodes, specify them as a comma-separated list in the spark.cassandra.connection.host property.
    For example: spark.cassandra.connection.host=cassandra-node1,cassandra-node2
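
    As a minimal sketch, the contact points can be set directly on the session builder. The node names cassandra-node1 and cassandra-node2 are placeholders; in the single-node setup from the question the contact point would simply be the Compose hostname cassandra:

    from pyspark.sql import SparkSession

    # Contact points must resolve from wherever the Spark driver and executors run,
    # e.g. the Compose service hostnames when Spark is on the same Docker network.
    spark = (SparkSession.builder
             .appName('SparkDataStreaming')
             .config('spark.cassandra.connection.host', 'cassandra-node1,cassandra-node2')
             .config('spark.cassandra.connection.port', '9042')
             .config('spark.cassandra.auth.username', 'cassandra')
             .config('spark.cassandra.auth.password', 'cassandra')
             .getOrCreate())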
