
I have a Spark DataFrame in which every row contains two items: a file name (with an extension, for instance .jpg) and the content of that file as bytes.
I would like to write a process that takes each row of the DataFrame, converts the bytes back into a .jpg image, and stores it in an ADLS container.

Everything needs to run inside a Databricks cluster, so I use PySpark to create the DataFrame, and I would like to use it to write those files to the destination as well.

However, I run into trouble when I use the azure-storage library to write those files inside a map function, where the function consume_row uses the library to create each file and write its content, like this:

results_rdd = rdd.map(lambda row: consume_row(row, ...))
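
For reference, consume_row does roughly the following (a simplified sketch; the BlobServiceClient setup stands in for the real azure-storage calls, and <account_name> / <container_name> are placeholders):

from azure.storage.blob import BlobServiceClient

# Created once on the driver and passed into consume_row through the map
# closure; the client keeps thread-local state internally, which is what
# pickle chokes on when Spark ships the closure to the workers.
service = BlobServiceClient(account_url="https://<account_name>.blob.core.windows.net",
                            credential="<sas_token_or_account_key>")

def consume_row(row, service, container):
    blob = service.get_blob_client(container=container, blob=row["filename"])
    blob.upload_blob(row["content"], overwrite=True)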

It returns the following error:

PicklingError: Could not serialize object: TypeError: cannot pickle '_thread._local' object

Has anyone tried to do anything similar to this?

2 Answers


  1. Chosen as BEST ANSWER

    The problem was inside the function consume_row. We were storing the API token in an object that, underneath, used a thread-local (_thread._local) Python object to hold the token, and that cannot be serialized by pickle to be sent to the workers. Passing the raw token to the function instead fixed it, and everything now works perfectly.
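
    A minimal sketch of the resulting pattern (account_url, container and token are placeholders, assuming the token is something azure-storage-blob accepts as a credential, e.g. a SAS token):

    from azure.storage.blob import BlobServiceClient

    def consume_row(row, account_url, container, token):
        # The client is built per task on the worker, so nothing holding
        # a _thread._local ever has to be pickled into the closure.
        service = BlobServiceClient(account_url=account_url, credential=token)
        blob = service.get_blob_client(container=container, blob=row["filename"])
        blob.upload_blob(row["content"], overwrite=True)

    # Only plain strings are captured by the closure, so it serializes fine.
    rdd.foreach(lambda row: consume_row(row, account_url, container, token))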


  2. PySpark uses Python’s pickle module for serialization, and certain objects (like _thread._local objects) cannot be pickled.
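
    You can reproduce that limitation outside Spark (a quick sketch):

    import pickle
    import threading

    # Thread-local state cannot be pickled; this is the same TypeError that
    # PySpark wraps in its PicklingError when such an object sits in a closure.
    pickle.dumps(threading.local())
    # TypeError: cannot pickle '_thread._local' object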

    To achieve your goal, you can mount your ADLS Gen2 account and save the files through the mount.

    Follow the steps below.

    Here, I am mounting using the account key. You can also mount using a SAS token or OAuth.

    dbutils.fs.mount(
      source = "wasbs://<container_name>@<account_name>.blob.core.windows.net",
      mount_point = "/mnt/jadls2",
      extra_configs = {"fs.azure.account.key.<account_name>.blob.core.windows.net":"<account_key>"})
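
    If you prefer OAuth with a service principal against the ADLS Gen2 (abfss) endpoint instead of the account key, the equivalent mount looks roughly like this (a sketch; <application_id>, <client_secret> and <tenant_id> are placeholders for your service principal):

    configs = {
      "fs.azure.account.auth.type": "OAuth",
      "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
      "fs.azure.account.oauth2.client.id": "<application_id>",
      "fs.azure.account.oauth2.client.secret": "<client_secret>",
      "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenant_id>/oauth2/token"}

    dbutils.fs.mount(
      source = "abfss://<container_name>@<account_name>.dfs.core.windows.net/",
      mount_point = "/mnt/jadls2",
      extra_configs = configs)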
    

    Next, use the code below.

    from io import BytesIO
    from PIL import Image
    
    # image1_content / image2_content are the raw bytes of each file.
    sample_data = [
        ("image1.jpg", image1_content),
        ("image2.jpg", image2_content)
    ]
    
    columns = ["filename", "content"]
    sample_df = spark.createDataFrame(sample_data, columns)
    
    def consume_row(row):
        filename, content = row
        # Decode the bytes and save them as JPEG through the /dbfs FUSE
        # path that points at the ADLS mount.
        image = Image.open(BytesIO(content))
        image.save(f"/dbfs/mnt/jadls2/databricks/images/{filename}", format="JPEG")
    
    # foreach runs consume_row on the workers purely for its side effect.
    sample_df.rdd.foreach(consume_row)
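
    If the bytes are already valid JPEG data, you can also skip the Pillow round-trip and write them straight through the mount (a sketch reusing the same hypothetical mount path):

    def consume_row_raw(row):
        # Write the original bytes as-is; no decoding or re-encoding.
        with open(f"/dbfs/mnt/jadls2/databricks/images/{row['filename']}", "wb") as f:
            f.write(row["content"])

    sample_df.rdd.foreach(consume_row_raw)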
    

    Output: (screenshot omitted)
