I am trying to write data to a Cassandra table (Cosmos DB Cassandra API) from an Azure Databricks (DBR) job using Spark Structured Streaming. I am getting the exception below:
Query [id = , runId = ] terminated with exception: Failed to open native connection to Cassandra at {<name>.cassandra.cosmosdb.azure.com:10350} :: Method com/microsoft/azure/cosmosdb/cassandra/CosmosDbConnectionFactory$.createSession(Lcom/datastax/spark/connector/cql/CassandraConnectorConf;)Lcom/datastax/oss/driver/api/core/CqlSession; is abstract
Caused by: IOException: Failed to open native connection to Cassandra at {<name>.cassandra.cosmosdb.azure.com:10350} :: Method com/microsoft/azure/cosmosdb/cassandra/CosmosDbConnectionFactory$.createSession(Lcom/datastax/spark/connector/cql/CassandraConnectorConf;)Lcom/datastax/oss/driver/api/core/CqlSession; is abstract
Caused by: AbstractMethodError: Method com/microsoft/azure/cosmosdb/cassandra/CosmosDbConnectionFactory$.createSession(Lcom/datastax/spark/connector/cql/CassandraConnectorConf;)Lcom/datastax/oss/driver/api/core/CqlSession; is abstract
What I did to get here:
- created a Cosmos DB account
- created a Cassandra keyspace
- created a Cassandra table
- created a DBR job
- added com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 to the job cluster
- added com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0 to the job cluster
What I tried:
different versions of the connector and of the Azure Cosmos DB helper library, but every combination fails with one ClassNotFoundException or NoSuchMethodError/AbstractMethodError or another
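The method signature in the error suggests why every combination fails the same way: connector 3.2.0 calls CassandraConnectionFactory.createSession and expects a Java driver 4.x session (com.datastax.oss.driver.api.core.CqlSession), while the Spark 2-era Cosmos helper's CosmosDbConnectionFactory implements only the old driver 3.x method, so the JVM sees the new one as abstract. A hypothetical way to confirm this from a notebook cell, using only the class and method names that appear verbatim in the error above:

import java.lang.reflect.Modifier

// List every createSession overload visible on the Cosmos helper's connection
// factory (the Scala object, hence the trailing $) and whether it is abstract.
val cls = Class.forName("com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory$")
cls.getMethods
  .filter(_.getName == "createSession")
  .foreach(m => println(s"returns ${m.getReturnType.getName}, abstract = ${Modifier.isAbstract(m.getModifiers)}"))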
Code Snippet:
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.log4j.Logger
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.cassandra._
import java.time.LocalDateTime
def writeDelta(spark: SparkSession, dataFrame: DataFrame, sourceName: String, checkpointLocation: String, dataPath: String, loadType: String, log: Logger): Unit = {
spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
spark.conf.set("spark.cassandra.connection.remoteConnectionsPerExecutor", "10")
spark.conf.set("spark.cassandra.connection.localConnectionsPerExecutor", "10")
spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
spark.conf.set("spark.cassandra.concurrent.reads", "512")
spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
spark.conf.set("spark.cassandra.connection.keepAliveMS", "60000000") //Increase this number as needed
spark.conf.set("spark.cassandra.output.ignoreNulls","true")
spark.conf.set("spark.cassandra.connection.host", "*******.cassandra.cosmosdb.azure.com")
spark.conf.set("spark.cassandra.connection.port", "10350")
spark.conf.set("spark.cassandra.connection.ssl.enabled", "true")
// spark.cassandra.auth.username and password are set in cluster conf
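// Note: once foreachBatch is set it becomes the sink, so the stream-level
// format/options below are effectively unused; the Cassandra write happens in upsertToDelta.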
val write = dataFrame.writeStream
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "****", "keyspace" -> "****"))
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .option("mergeSchema", "true")
  .option("mode", "PERMISSIVE")
  .option("checkpointLocation", checkpointLocation)
  .start()
write.awaitTermination()
}
def upsertToDelta(newBatch: DataFrame, batchId: Long): Unit = {
try {
val spark = SparkSession.active
println(LocalDateTime.now())
println("BATCH ID = "+batchId+" REC COUNT = "+newBatch.count())
newBatch.persist()
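// keyColumn and timestampCol are assumed to be defined elsewhere in the job (not shown here)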
val userWindow = Window.partitionBy(keyColumn).orderBy(col(timestampCol).desc)
val deDup = newBatch.withColumn("rank", row_number().over(userWindow)).where(col("rank") === 1).drop("rank")
deDup.write
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "****", "keyspace" -> "****"))
.mode("append")
.save()
newBatch.unpersist()
} catch {
case e: Exception =>
throw e
}
}
############################
After implementing the solution suggested by @theo-van-kraay, I am getting the following error in the executor logs (the job keeps running even after this error):
23/02/13 07:28:55 INFO CassandraConnector: Connected to Cassandra cluster.
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 9 (task 26, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO DataWritingSparkTask: Committed partition 9 (task 26, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO Executor: Finished task 9.0 in stage 6.0 (TID 26). 1511 bytes result sent to driver
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 7 (task 24, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 1 (task 18, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 3 (task 20, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 5 (task 22, attempt 0, stage 6.0)
23/02/13 07:28:56 ERROR Utils: Aborting task
java.lang.IllegalArgumentException: Unable to get Token Metadata
at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.$anonfun$tokenMap$1(LocalNodeFirstLoadBalancingPolicy.scala:86)
at scala.Option.orElse(Option.scala:447)
at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.tokenMap(LocalNodeFirstLoadBalancingPolicy.scala:86)
at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.replicasForRoutingKey$1(LocalNodeFirstLoadBalancingPolicy.scala:103)
at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.$anonfun$getReplicas$8(LocalNodeFirstLoadBalancingPolicy.scala:107)
at scala.Option.flatMap(Option.scala:271)
at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.$anonfun$getReplicas$7(LocalNodeFirstLoadBalancingPolicy.scala:107)
at scala.Option.orElse(Option.scala:447)
at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.$anonfun$getReplicas$3(LocalNodeFirstLoadBalancingPolicy.scala:107)
at scala.Option.flatMap(Option.scala:271)
...
...
23/02/13 07:28:56 ERROR Utils: Aborting task
Answers:
You can remove com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0: it is not required with the Spark 3 Cassandra Connector and was created for Spark 2 only. Also remove references to it in the code.
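For reference, a minimal sketch of what remains once the helper is gone, assuming only com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 on the cluster; host, credentials, keyspace, and table names are placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.active

// Spark 3 connector settings only; no Cosmos helper classes anywhere.
spark.conf.set("spark.cassandra.connection.host", "<account>.cassandra.cosmosdb.azure.com")
spark.conf.set("spark.cassandra.connection.port", "10350")
spark.conf.set("spark.cassandra.connection.ssl.enabled", "true")
spark.conf.set("spark.cassandra.auth.username", "<account>")     // Cosmos DB account name
spark.conf.set("spark.cassandra.auth.password", "<account-key>") // primary or secondary key

// Toy DataFrame just to make the sketch self-contained; in a real job the
// DataFrame schema must match the target table.
val df = spark.range(3).toDF("id")

df.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "<keyspace>", "table" -> "<table>"))
  .mode("append")
  .save()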
The "Unable to get Token Metadata" error is a known issue that affects Spark 3 (Java 4 driver) and Cosmos DB API for Apache Cassandra in certain scenarios. It has been fixed recently but is still in the process of being rolled out across the service. If resolution is urgent, you can raise a support case in Azure and we can expedite by enabling the fix explicitly on your account until it has been fully deployed. Feel free to mention this Stack Overflow question when raising the support case so that the engineer who handles it will have context.