
I would like to process the data 24/7 using Databricks Structured Streaming.

But before I process all the data I need to apply some transformations to it.

Therefore I am using foreach. Below is my function:

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, from_json, regexp_replace
from delta.tables import DeltaTable
from datetime import datetime

def flatten_nested_df_2(df_nested, columns_to_flatten, epochId ):
    """
    Cleans and flattens a nested DataFrame by extracting and parsing JSON from a specified column,
    then iteratively creating new columns out of original columns which are structs. 
    The new column names get the original parent column name as prefix.

    :param df_nested: The input DataFrame with nested JSON.
    :param columns_to_flatten: List of column names to flatten.
    :param epochId: The id of the streaming micro-batch being processed.
    :return: None; the flattened batch is merged into the silver Delta table.
    """
    #----------------------------------------- Part1: Cleaning and Flattening --------------------------------------------------------------

    # Step 1: Clean the JSON string in the specified column
    json_column_name="data"
    clean_col = col(json_column_name)
    for pattern, replacement in [
        ('`', ''),
        ("'", ''),
        # ('-', '_'),
        (' ', '_'),
        (r'\.', '_'),
        ('á', 'a'),
        (r'\$', ''),
        ('\n', '_')]:
        clean_col = regexp_replace(clean_col, pattern, replacement)

    filtered_df = df_nested.filter(col("type").like("%YardOrder%"))
    filtered_df = filtered_df.select("data")
    
    df_cleaned = filtered_df.withColumn(json_column_name, clean_col)
    
    # Step 2: Parse the JSON string into a DataFrame
    schema = spark.read.json(df_cleaned.rdd.map(lambda row: row[json_column_name])).schema
    df_with_json = df_cleaned.withColumn(json_column_name, from_json(json_column_name, schema=schema))
    
    # Step 3: Flatten the nested DataFrame
    stack = [((), df_with_json)]
    column_names = []

    while len(stack) > 0:
        parents, df = stack.pop()

        flat_column_names = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if (c[1][:6] != "struct") and c[0] in columns_to_flatten
        ]

        nested_column_names = [c[0] for c in df.dtypes if c[1][:6] == "struct" and c[0] in columns_to_flatten]
        column_names.extend(flat_column_names)

        for nested_column_name in nested_column_names:
            projected_df = df.select(nested_column_name + ".*")
            stack.append((parents + (nested_column_name,), projected_df))
    
    flattened_df = df_with_json.select(column_names)
    # Rename the column 'data_id' to 'bk_data_id'
    flattened_df = flattened_df.withColumnRenamed("data_id", "bk_data_id")


    #------------------------------------------------------------ Part 2: Upsert to silver-------------------------------------------------

    deltaTable = DeltaTable.forPath(spark,f"abfss://[email protected]/shippingUnit")


    
    list_of_columns = flattened_df.columns
    list_of_BK_columns = ['bk_data_id']

    # Build the merge condition on the business key column(s)
    string = ' and '.join(f'table.{column} = newData.{column}' for column in list_of_BK_columns)

    # Map every target column to its value from the incoming batch
    dictionary = {key: f'newData.{key}' for key in list_of_columns}
    # Executing the merge function itself

    print(f"batch {epochId} starting merge now at {datetime.now()}")

    deltaTable = DeltaTable.forPath(spark, f"abfss://[email protected]/shippingUnit")
    deltaTable.alias('table') \
        .merge(flattened_df.alias("newData"), string) \
        .whenMatchedUpdate(set=dictionary) \
        .whenNotMatchedInsert(values=dictionary) \
        .execute()
    
    print(f"batch {epochId} done at {datetime.now()}")

This is how I'm writing the stream:

# print(f"Merge initiated at {datetime.now()}")
df.writeStream.foreach(lambda df, epochId: flatten_nested_df_2(df, columns_to_flatten, epochId)).option("checkpointLocation", checkpoint_directory).start()
print(f"Merge done at {datetime.now()}")

But I get the error message:

[CONTEXT_ONLY_VALID_ON_DRIVER] It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

I think this problem is caused by Step 2 (parsing the JSON string into a DataFrame): schema = spark.read.json(df_cleaned.rdd.map(lambda row: row[json_column_name])).schema, which is not possible to run in a continuous fashion.

I'm using a single user cluster, runtime 15.3 Beta.

Any help is much appreciated.

3 Answers


  1. How you solve this depends on your data: whether the JSON column has the same fields in every row or not.

    If the data in json_column_name is constant and the fields do not change between rows, then you can use the schema_of_json function.

    Use the code below:

    json_string = df.rdd.map(lambda row: row['json_column_name']).collect()[0]
    df_with_json = df.withColumn('json_column_name', from_json('json_column_name', schema=schema_of_json(json_string)))
    df_with_json.display()
    

    Output:

    id  name  json_column_name
    1   jaya  [{"key":"value"}]
    2   alex  [{"key":null}]
    3   sam   [{"key":null}]

    If the fields in your data keep changing between rows, then you need to define a standard schema and pass that schema to the batch function:

    df.writeStream.foreach(lambda df, epochId: flatten_nested_df_2(df, columns_to_flatten,standard_json_schema ,epochId)).option("checkpointLocation", checkpoint_directory).start()
    

    and use it in the function to parse the data:

    df_with_json = df.withColumn('json_column_name', from_json('json_column_name', schema=standard_json_schema))
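
    A minimal sketch of such a standard schema, assuming hypothetical field names (replace them with the fields your JSON actually contains):

    from pyspark.sql.types import StructType, StructField, StringType

    # hypothetical fields -- adjust names and types to match the real payload
    standard_json_schema = StructType([
        StructField("id", StringType(), True),
        StructField("status", StringType(), True),
        StructField("yard", StructType([
            StructField("location", StringType(), True)
        ]), True)
    ])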
    
  2. The lambda expression's use of a class variable is the source of this issue. PySpark internally pickles every item it encounters, including the object, which has a reference to the SparkContext.
    You have to take the class reference out of the lambda in order to fix that.
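
    For example, a rough sketch of the pattern with illustrative names (process_batch is not from the question's code):

    # instead of a lambda/method that captures an object holding a SparkSession,
    # use a plain module-level function that only touches what it is given
    def process_batch(batch_df, epochId):
        # work only with batch_df and plain Python objects here
        ...

    df.writeStream \
        .foreach(lambda batch_df, epochId: process_batch(batch_df, epochId)) \
        .option("checkpointLocation", checkpoint_directory) \
        .start()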

  3. As the error message says, "SparkContext can only be used on the driver, not in code that it run on workers".

    When pyspark.sql.DataFrame.foreach() is executed on the driver, it just delegates that work to the workers, to perform in parallel. To do that, it passes the code of the callable/function (lambda df, epochId: flatten_nested_df_2(df, columns_to_flatten, epochId) in your case) to the workers. Each worker (which has its own OS/executable/process/memory) compiles and runs this code in its own runtime environment.

    There is no RPC callback from a worker to the driver to execute the function on the driver. So code running on a worker cannot use any state that is only available on the driver. The variable spark exists only on the driver, not on the workers.

    Perhaps use Python's json module or one of the JSON functions from pyspark.sql.functions, such as schema_of_json(), from_json() or get_json_object(), instead of spark.read.json().
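
    For example, a small sketch with get_json_object(), which needs no driver-side schema inference (the $.id path is only an assumption about your payload):

    from pyspark.sql.functions import col, get_json_object

    # hypothetical JSON path -- point it at a key that really exists in the "data" column
    df_with_id = df_cleaned.withColumn("data_id", get_json_object(col("data"), "$.id"))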

