
I am trying to write data to a Cassandra table (Azure Cosmos DB Cassandra API) from an Azure Databricks (DBR) job using Spark Structured Streaming, and I am getting the exception below:

Query [id = , runId = ] terminated with exception: Failed to open native connection to Cassandra at {<name>.cassandra.cosmosdb.azure.com:10350} :: Method com/microsoft/azure/cosmosdb/cassandra/CosmosDbConnectionFactory$.createSession(Lcom/datastax/spark/connector/cql/CassandraConnectorConf;)Lcom/datastax/oss/driver/api/core/CqlSession; is abstract

Caused by: IOException: Failed to open native connection to Cassandra at {<name>.cassandra.cosmosdb.azure.com:10350} :: Method com/microsoft/azure/cosmosdb/cassandra/CosmosDbConnectionFactory$.createSession(Lcom/datastax/spark/connector/cql/CassandraConnectorConf;)Lcom/datastax/oss/driver/api/core/CqlSession; is abstract
Caused by: AbstractMethodError: Method com/microsoft/azure/cosmosdb/cassandra/CosmosDbConnectionFactory$.createSession(Lcom/datastax/spark/connector/cql/CassandraConnectorConf;)Lcom/datastax/oss/driver/api/core/CqlSession; is abstract

What I did to get here:

  • created a Cosmos DB account
  • created a Cassandra keyspace
  • created a Cassandra table
  • created a DBR job
  • added com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 to the job cluster
  • added com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0 to the job cluster

What I tried:

different versions of the connector and of the Azure Cosmos DB helper library, but every combination failed with some ClassNotFoundException or NoSuchMethodError/AbstractMethodError.
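
As a sanity check outside the streaming job, a plain connector session can be opened first; with a driver mismatch on the classpath this fails immediately with the same error. A minimal sketch, assuming the host/port/SSL/auth settings from the snippet below are already in the cluster conf:

import com.datastax.spark.connector.cql.CassandraConnector

// Sanity check: open a session through the connector and run a trivial query.
// Assumes the Cassandra connection settings are present in the cluster's Spark conf.
val connector = CassandraConnector(spark.sparkContext.getConf)
val version = connector.withSessionDo { session =>
  session.execute("SELECT release_version FROM system.local").one().getString("release_version")
}
println(s"Connected, server release_version = $version")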

Code Snippet:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.log4j.Logger
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.cassandra._
import java.time.LocalDateTime

def writeDelta(spark: SparkSession, dataFrame: DataFrame, sourceName: String, checkpointLocation: String, dataPath: String, loadType: String, log: Logger): Boolean = {
    // Connector tuning for the Cosmos DB Cassandra API: small batches, bounded concurrency
    spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
    spark.conf.set("spark.cassandra.connection.remoteConnectionsPerExecutor", "10")
    spark.conf.set("spark.cassandra.connection.localConnectionsPerExecutor", "10")
    spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
    spark.conf.set("spark.cassandra.concurrent.reads", "512")
    spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
    spark.conf.set("spark.cassandra.connection.keepAliveMS", "60000000") //Increase this number as needed
    spark.conf.set("spark.cassandra.output.ignoreNulls","true")
    spark.conf.set("spark.cassandra.connection.host", "*******.cassandra.cosmosdb.azure.com")
    spark.conf.set("spark.cassandra.connection.port", "10350")
    spark.conf.set("spark.cassandra.connection.ssl.enabled", "true")
    // spark.cassandra.auth.username and password are set in cluster conf
    
    // NOTE: with foreachBatch, the batch function is the sink, so the
    // format/options set on the stream writer below are effectively ignored;
    // the actual Cassandra write happens inside upsertToDelta.
    val write = dataFrame.writeStream.
              format("org.apache.spark.sql.cassandra").
              options(Map( "table" -> "****", "keyspace" -> "****")).
              foreachBatch(upsertToDelta _).
              outputMode("update").
              option("mergeSchema", "true").
              option("mode","PERMISSIVE").
              option("checkpointLocation", checkpointLocation).
              start()
    write.awaitTermination()
    true // writeDelta declares a Boolean return type
}

  def upsertToDelta(newBatch: DataFrame, batchId: Long): Unit = {

    try {
      val spark = SparkSession.active
      println(LocalDateTime.now())
      println("BATCH ID = "+batchId+" REC COUNT = "+newBatch.count())
      newBatch.persist()
      // keyColumn and timestampCol are defined elsewhere in the job;
      // keep only the newest record per key within the micro-batch
      val userWindow = Window.partitionBy(keyColumn).orderBy(col(timestampCol).desc)
      val deDup = newBatch.withColumn("rank", row_number().over(userWindow)).where(col("rank") === 1).drop("rank")
    
      deDup.write
        .format("org.apache.spark.sql.cassandra")
        .options(Map( "table" -> "****", "keyspace" -> "****"))
        .mode("append")
        .save()

      newBatch.unpersist()
    } catch {
      case e: Exception =>
        throw e
    }
  }

UPDATE: After implementing the solution suggested by @theo-van-kraay, I am getting the error below in the executor logs (the job keeps running even after this error):

23/02/13 07:28:55 INFO CassandraConnector: Connected to Cassandra cluster.
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 9 (task 26, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO DataWritingSparkTask: Committed partition 9 (task 26, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO Executor: Finished task 9.0 in stage 6.0 (TID 26). 1511 bytes result sent to driver
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 7 (task 24, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 1 (task 18, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 3 (task 20, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 5 (task 22, attempt 0, stage 6.0)
23/02/13 07:28:56 ERROR Utils: Aborting task
java.lang.IllegalArgumentException: Unable to get Token Metadata
    at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.$anonfun$tokenMap$1(LocalNodeFirstLoadBalancingPolicy.scala:86)
    at scala.Option.orElse(Option.scala:447)
    at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.tokenMap(LocalNodeFirstLoadBalancingPolicy.scala:86)
    at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.replicasForRoutingKey$1(LocalNodeFirstLoadBalancingPolicy.scala:103)
    at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.$anonfun$getReplicas$8(LocalNodeFirstLoadBalancingPolicy.scala:107)
    at scala.Option.flatMap(Option.scala:271)
    at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.$anonfun$getReplicas$7(LocalNodeFirstLoadBalancingPolicy.scala:107)
    at scala.Option.orElse(Option.scala:447)
    at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.$anonfun$getReplicas$3(LocalNodeFirstLoadBalancingPolicy.scala:107)
    at scala.Option.flatMap(Option.scala:271)
    ...
    ...

23/02/13 07:28:56 ERROR Utils: Aborting task

2 Answers


  1. You can remove:

    com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0

    It is not required with the Spark 3 Cassandra Connector; it was created for Spark 2 only. Also remove any references to it in the code.
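
    With the helper removed, the Spark 3 connector talks to Cosmos DB directly. A minimal sketch of the settings involved, assuming the account host and credentials from the question; the spark.cassandra.auth.* keys are standard connector settings, and the placeholder values are hypothetical:

        // Spark 3 Cassandra Connector against the Cosmos DB Cassandra API,
        // with no Cosmos-specific helper library on the classpath.
        spark.conf.set("spark.cassandra.connection.host", "<account>.cassandra.cosmosdb.azure.com")
        spark.conf.set("spark.cassandra.connection.port", "10350")
        spark.conf.set("spark.cassandra.connection.ssl.enabled", "true")
        spark.conf.set("spark.cassandra.auth.username", "<account>")     // Cosmos DB account name
        spark.conf.set("spark.cassandra.auth.password", "<account-key>") // Cosmos DB account key

        // Leave spark.cassandra.connection.factory unset: the helper library
        // worked by overriding it with CosmosDbConnectionFactory, which is the
        // class named in the AbstractMethodError above.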

  2. The "Unable to get Token Metadata" error is a known issue that affects Spark 3 (Java 4 driver) and Cosmos DB API for Apache Cassandra in certain scenarios. It has been fixed recently but is still in the process of being rolled out across the service. If resolution is urgent, you can raise a support case in Azure and we can expedite by enabling the fix explicitly on your account until it has been fully deployed. Feel free to mention this Stack Overflow question when raising the support case so that the engineer who handles it will have context.
