
I am new to Databricks and would appreciate any help 🙂

I am using Databricks for an ELT project to read from Blob Storage,
transform the data into DataFrames,
and write the data to another Blob Storage (in essence a Lakehouse architecture).

Question:
I get an error when I try to process a large file (1 GB) in Databricks.
How can I do it more efficiently?
I've tried parallelize and writing to Parquet first, but I still run into a resource limit.

My environment is:
Azure Databricks
Cluster compute size: DBR 14.3, Spark 3.5.0, 4-8 cores

The error occurs when I try to process a file of about 1 GB.

My code is this:

# Imports used below
from azure.storage.blob import BlobServiceClient
import json

# Define your storage account details
storage_account_name = "{SomeblobName}"
storage_account_key = "{SomePassword}"

# Create a connection string
connection_string = f"DefaultEndpointsProtocol=https;AccountName={storage_account_name};AccountKey={storage_account_key};EndpointSuffix=core.windows.net"

# Create a BlobServiceClient object
blob_service_client = BlobServiceClient.from_connection_string(connection_string)

# Specify the container and blob (file) you want to access
container_name = "incomingSampleData"
file_name = "{SomeName}.json"

# Get a BlobClient object to interact with the blob
blob_client = blob_service_client.get_blob_client(container=container_name, blob=file_name)

# Download the content of the blob
blob_data = blob_client.download_blob().readall()

# If the file is JSON, load it into a Python dictionary
json_data = json.loads(blob_data)

# Display the data or perform further operations
print(json_data)

This part works.

The second part is where I try to load and process the file:

# COMMAND ----------

from pyspark.sql import SparkSession

# Initialize Spark session (on Databricks, a `spark` session already exists)
spark = SparkSession.builder.appName("JsonProcessing").getOrCreate()

# Assuming `json_data` is a dictionary or list, we first parallelize it and then read it as JSON
# Convert `json_data` into an RDD and load it as a DataFrame
df_root_spark = spark.read.json(spark.sparkContext.parallelize([json_data]))

# Display the DataFrame
display(df_root_spark.limit(10))

I get this error:

java.lang.Exception: Results too large
    at com.databricks.backend.daemon.driver.OutputAggregator$.maybeApplyOutputAggregation(OutputAggregator.scala:514)
    at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:335)
    at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:101)
    at com.databricks.backend.daemon.driver.PythonDriverLocalBase.generateTableResult(PythonDriverLocalBase.scala:861)
    at com.databricks.backend.daemon.driver.JupyterDriverLocal.computeListResultsItem(JupyterDriverLocal.scala:1575)
    at com.databricks.backend.daemon.driver.JupyterDriverLocal$JupyterEntryPoint.addCustomDisplayData(JupyterDriverLocal.scala:290)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
    at py4j.Gateway.invoke(Gateway.java:306)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:119)
    at java.lang.Thread.run(Thread.java:750)

2 Answers


  1. I am a newbie too and am commenting purely to be able to see the answers to your question. Maybe, if the results really are too large (provided this is not a red-herring error), you should try splitting your list of dictionaries into smaller chunks and iterating over them, as in the sketch below? Hoping for some decent answers for you 🙂
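
    A minimal sketch of that chunking idea, assuming `json_data` is a list of dictionaries and reusing the `spark` session from the question (the chunk size and the union step are illustrative choices, not tested against your data):

    # Hypothetical: build the DataFrame from smaller chunks instead of one huge object
    from functools import reduce

    chunk_size = 50_000  # tune to your record size and cluster memory
    chunks = [json_data[i:i + chunk_size] for i in range(0, len(json_data), chunk_size)]

    # Create one small DataFrame per chunk, then union them into a single DataFrame
    df_parts = [spark.createDataFrame(chunk) for chunk in chunks]
    df_root_spark = reduce(lambda a, b: a.unionByName(b), df_parts)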

  2. Here are some tips for loading the file efficiently.

    1. Read your JSON file from Blob Storage directly into a Spark DataFrame using the code below.
    storage_account_name = "{SomeblobName}"
    storage_account_key = "{SomePassword}"
    
    spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net", storage_account_key)
    file_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{file_name}"
    
    df = spark.read.json(file_path)
    display(df.limit(10))
    

    Refer to this documentation for more about this.

    2. According to this article, you can set the spark.sql.files.maxPartitionBytes configuration to an appropriate value to process the data faster and more efficiently.
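    For example (a minimal sketch; 128 MB is Spark's default for this setting, so tune the value to your file and cluster):

    # Controls how many bytes of a file go into each input partition when Spark reads from storage
    spark.conf.set("spark.sql.files.maxPartitionBytes", "128MB")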
    3. Providing the schema directly also reduces the loading time.
    df = spark.read.json(spark.sparkContext.parallelize(data_1d), schema=schema)
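
    The `schema` variable above is not defined in the answer; a minimal sketch of building one with StructType, using hypothetical field names (replace them with the actual fields of your JSON), would be:

    from pyspark.sql.types import StructType, StructField, StringType, LongType

    # Hypothetical schema: adjust field names and types to match your JSON records
    schema = StructType([
        StructField("id", LongType(), True),
        StructField("name", StringType(), True),
        StructField("payload", StringType(), True),
    ])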
    
    4. It also depends on the partitions: the default will be 4 partitions since you have 4-8 cores; increase the partition count according to the size and number of records you have.

    df = df.repartition(4 * 100)

    I have 2,000,000 records, and below is the time taken with different numbers of partitions.
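
    The @get_time decorator used below is a timing helper that is not shown in the answer; a minimal sketch of what it might look like (an assumption, not the author's code), with the decorated function called afterwards, e.g. s():

    import time
    from functools import wraps

    def get_time(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            result = func(*args, **kwargs)
            print(f"Elapsed: {time.perf_counter() - start:.2f} s")
            return result
        return wrapper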

    @get_time
    def s():
        df = spark.read.json(spark.sparkContext.parallelize(data_1d[:2000000]),schema=schema)
        df = df.repartition(200)
        print(f"Number of Partition -> {df.rdd.getNumPartitions()}")
        df.write.format("noop").mode("overwrite").save()
    

    [screenshot: execution time with 200 partitions]

    and

    @get_time
    def s():
        df = spark.read.json(spark.sparkContext.parallelize(data_1d[:2000000]),schema=schema)
        df = df.repartition(100)
        print(f"Number of Partition -> {df.rdd.getNumPartitions()}")
        df.write.format("noop").mode("overwrite").save()
    

    [screenshot: execution time with 100 partitions]

    It all depends on how many records you have; set the configuration based on that.
