
I have the following inputs:
1. Endpoint
2. Database
3. Collections
Based on these inputs, I need to establish a connection to Azure Cosmos DB from Databricks.

I tried this approach:
I installed and imported the com.azure.cosmos.spark Maven package on a Databricks cluster running DBR 11.3 LTS (Spark 3.3.0).


readCfg = {
    "Endpoint": "<endpointurl>",
    "Database": "<dbname>",
    "Collection": "<collectionname>"
}

 

readCfg1 = {
  "spark.cosmos.accountEndpoint": "<endpointurl>",
  "spark.cosmos.database": "<dbname>",
  "spark.cosmos.container": "<collectionname>"
}

 

# Read data from Cosmos DB
df = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**readCfg).load()
df.show()


Error message:
I get a class-not-found / assertion error.

2 Answers


  1. This code should work. You can modify the query as required.

    import uuid
    from pyspark.sql.functions import *
    cosmosEndpoint = "https://xxxxxxxxx.documents.azure.com:443/"
    cosmosMasterKey = "mxxxxxxxxxxxxxxxxxx=="
    
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.views.repositoryPath", "/viewDefinitions" + str(uuid.uuid4()))
    
    database1 = "<your_databasename>"
    sourceContainer = "<your_container_name>"
    
    
    query = """select * from cosmosCatalog.{}.{} 
    """.format(database1, sourceContainer)
     
    df = spark.sql(query)
    df.show()
    
  2. Alternatively, try the code below, which uses the azure-cosmos Python SDK instead of the Spark connector:

    from azure.cosmos import CosmosClient
    
    cosmosEndpoint = "xxxx"
    cosmosPrimaryKey = "yyyy"
    cosmosDatabaseName = "zzzz"
    cosmosCollectionName = "xyz"
    
    client = CosmosClient(cosmosEndpoint, credential=cosmosPrimaryKey)
    database = client.get_database_client(cosmosDatabaseName)
    collection = database.get_container_client(cosmosCollectionName)
    
    # Enumerate the returned items
    import json
    for item in collection.query_items(
            query='SELECT * FROM {0} r'.format(cosmosCollectionName),
            enable_cross_partition_query=True):
        print(json.dumps(item, indent=True))
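
For the `spark.read` approach from the question, a likely cause of the class-not-found error is the data source name: `com.microsoft.azure.cosmosdb.spark` and the `Endpoint`/`Database`/`Collection` keys belong to the older connector, while the `com.azure.cosmos.spark` package registers the format `cosmos.oltp` and reads `spark.cosmos.*` options. The connector also needs credentials, which the question does not list; the sketch below assumes an account key. The helper names `build_cosmos_read_config` and `read_cosmos_container` are hypothetical, and all values are placeholders.

```python
def build_cosmos_read_config(endpoint, account_key, database, container):
    """Return reader options for the Cosmos DB Spark 3 OLTP connector."""
    return {
        "spark.cosmos.accountEndpoint": endpoint,
        "spark.cosmos.accountKey": account_key,  # assumption: key-based auth
        "spark.cosmos.database": database,
        "spark.cosmos.container": container,
    }


def read_cosmos_container(spark, config):
    """Load a container as a DataFrame.

    Needs a SparkSession on a cluster where the connector is installed.
    """
    return spark.read.format("cosmos.oltp").options(**config).load()


# Placeholder values; replace with the real account details.
read_cfg = build_cosmos_read_config(
    endpoint="https://<account>.documents.azure.com:443/",
    account_key="<account-key>",
    database="<dbname>",
    container="<collectionname>",
)

# In a Databricks notebook, where `spark` is predefined:
# df = read_cosmos_container(spark, read_cfg)
# df.show()
```

Keeping the options in a small builder makes it easy to check the keys before the job runs, since a misspelled option name otherwise only surfaces as a runtime error on the cluster.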
    