I am new to Databricks and would appreciate any help 🙂
I am using Databricks for an ELT project: read from Blob storage, transform the data into DataFrames, and write the data to another Blob storage (in essence a Lakehouse architecture).
Question:
I get an error when I try to process a large file (1 GB) in Databricks. How can I do it more efficiently? I've tried parallelize and writing it to Parquet first, but I still hit a resource limit.
My environment is:
Azure Databricks
Cluster compute size: DBR 14.3, Spark 3.5.0, 4-8 cores
I get the error when I try to process a file of size 1 GB.
My code is this:
import json
from azure.storage.blob import BlobServiceClient

# Define your storage account details
storage_account_name = "{SomeblobName}"
storage_account_key = "{SomePassword}"

# Build the connection string
connection_string = f"DefaultEndpointsProtocol=https;AccountName={storage_account_name};AccountKey={storage_account_key};EndpointSuffix=core.windows.net"

# Create a BlobServiceClient object
blob_service_client = BlobServiceClient.from_connection_string(connection_string)

# Specify the container and blob (file) you want to access
container_name = "incomingSampleData"
file_name = "{SomeName}.json"

# Get a BlobClient object to interact with the blob
blob_client = blob_service_client.get_blob_client(container=container_name, blob=file_name)

# Download the full content of the blob into driver memory
blob_data = blob_client.download_blob().readall()

# Since the file is JSON, load it into a Python object
json_data = json.loads(blob_data)

# Display the data or perform further operations
print(json_data)
This part works.
The second part is where I try to load and process the file:
# COMMAND ----------

from pyspark.sql import SparkSession

# Initialize the Spark session (if needed; Databricks notebooks already provide `spark`)
spark = SparkSession.builder.appName("JsonProcessing").getOrCreate()

# Assuming `json_data` is a dictionary or list, parallelize it and read it as JSON
# Convert `json_data` into an RDD and load it as a DataFrame
df_root_spark = spark.read.json(spark.sparkContext.parallelize([json_data]))

# Display the DataFrame
display(df_root_spark.limit(10))
I get this error:
java.lang.Exception: Results too large
at com.databricks.backend.daemon.driver.OutputAggregator$.maybeApplyOutputAggregation(OutputAggregator.scala:514)
at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:335)
at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:101)
at com.databricks.backend.daemon.driver.PythonDriverLocalBase.generateTableResult(PythonDriverLocalBase.scala:861)
at com.databricks.backend.daemon.driver.JupyterDriverLocal.computeListResultsItem(JupyterDriverLocal.scala:1575)
at com.databricks.backend.daemon.driver.JupyterDriverLocal$JupyterEntryPoint.addCustomDisplayData(JupyterDriverLocal.scala:290)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
at py4j.Gateway.invoke(Gateway.java:306)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199)
at py4j.ClientServerConnection.run(ClientServerConnection.java:119)
at java.lang.Thread.run(Thread.java:750)
2 Answers
I am a newbie too and am commenting purely to be able to see the answers to your question. Maybe, if the results really are too large (provided this is not a red-herring error), you could try splitting your list of dictionaries into smaller chunks and iterating over them? Hoping for some decent answers for you 🙂
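A minimal sketch of that chunking idea, assuming json_data is the list of dictionaries from the question and reusing the notebook's spark session; the chunk_size value and the output path are placeholders, not values from the post:

import json

chunk_size = 50_000  # hypothetical value; tune to what the driver and cluster can handle

for start in range(0, len(json_data), chunk_size):
    chunk = json_data[start:start + chunk_size]
    # Serialize each record so spark.read.json can parse it from an RDD of JSON strings
    rdd = spark.sparkContext.parallelize([json.dumps(record) for record in chunk])
    df_chunk = spark.read.json(rdd)
    # Append each chunk to storage instead of collecting results back to the driver
    df_chunk.write.mode("append").parquet("/mnt/output/sample_data_parquet")  # hypothetical path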
Here are some tips for loading the file more efficiently.
Load the file from Blob storage directly into a Spark df using the code below; refer to this documentation for more about this.
Set the spark.sql.files.maxPartitionBytes configuration to a suitable value to process the data faster and more efficiently, and repartition the DataFrame, for example df = df.repartition(4 * 100).
I have 2,000,000 records, and below is the time taken for different partitions.
It all depends on how many records you have; based on that, you set the configuration.
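A hedged sketch of that approach end to end, reusing the placeholder account name and key from the question; the wasbs:// paths, the 128 MB partition size, and the output container are assumptions to adapt, not values from the answer:

# Give Spark direct access to the storage account (account-key auth; a secret scope is safer)
spark.conf.set(
    "fs.azure.account.key.{SomeblobName}.blob.core.windows.net",
    storage_account_key,
)

# Cap input partition size so the 1 GB file is split across the cluster
spark.conf.set("spark.sql.files.maxPartitionBytes", 128 * 1024 * 1024)

# Read the JSON straight from Blob storage instead of downloading it to the driver;
# multiLine handles a single large JSON document rather than JSON Lines
df = (
    spark.read
    .option("multiLine", "true")
    .json("wasbs://incomingSampleData@{SomeblobName}.blob.core.windows.net/{SomeName}.json")
)

# Repartition as suggested, then write out without collecting results to the driver
df = df.repartition(4 * 100)
df.write.mode("overwrite").parquet(
    "wasbs://output@{SomeblobName}.blob.core.windows.net/sample_data_parquet"  # hypothetical target
)

# Inspect only a small sample in the notebook to avoid the "Results too large" error
display(df.limit(10))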