I would like to process the data 24/7 using Databricks Structured Streaming. But before I process the data I need to apply some transformations on it, therefore I am using foreach. Below is my function:
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, from_json, regexp_replace
from delta.tables import DeltaTable
from datetime import datetime

def flatten_nested_df_2(df_nested, columns_to_flatten, epochId):
    """
    Cleans and flattens a nested DataFrame by extracting and parsing JSON from a specified column,
    then iteratively creating new columns out of original columns which are structs.
    The new column names get the original parent column name as prefix.

    :param df_nested: The input DataFrame with nested JSON.
    :param columns_to_flatten: List of column names to flatten.
    :param epochId: The id of the current micro-batch.
    :return: None. The flattened result is merged (upserted) into the silver Delta table.
    """
    # ------------------------------- Part 1: Cleaning and Flattening -------------------------------
    # Step 1: Clean the JSON string in the specified column
    json_column_name = "data"
    clean_col = col(json_column_name)
    for pattern, replacement in [
            ('`', ''),
            ("'", ''),
            # ('-', '_'),
            (' ', '_'),
            (r'\.', '_'),
            ('á', 'a'),
            (r'\$', ''),
            ('\n', '_')]:
        clean_col = regexp_replace(clean_col, pattern, replacement)

    filtered_df = df_nested.filter(col("type").like("%YardOrder%"))
    filtered_df = filtered_df.select("data")
    df_cleaned = filtered_df.withColumn(json_column_name, clean_col)

    # Step 2: Parse the JSON string into a DataFrame
    schema = spark.read.json(df_cleaned.rdd.map(lambda row: row[json_column_name])).schema
    df_with_json = df_cleaned.withColumn(json_column_name, from_json(json_column_name, schema=schema))

    # Step 3: Flatten the nested DataFrame
    stack = [((), df_with_json)]
    column_names = []
    while len(stack) > 0:
        parents, df = stack.pop()
        flat_column_names = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if (c[1][:6] != "struct") and c[0] in columns_to_flatten
        ]
        nested_column_names = [c[0] for c in df.dtypes if c[1][:6] == "struct" and c[0] in columns_to_flatten]
        column_names.extend(flat_column_names)
        for nested_column_name in nested_column_names:
            projected_df = df.select(nested_column_name + ".*")
            stack.append((parents + (nested_column_name,), projected_df))
    flattened_df = df_with_json.select(column_names)

    # Rename the column 'data_id' to 'bk_data_id'
    flattened_df = flattened_df.withColumnRenamed("data_id", "bk_data_id")

    # ------------------------------- Part 2: Upsert to silver -------------------------------
    deltaTable = DeltaTable.forPath(spark, f"abfss://[email protected]/shippingUnit")

    list_of_columns = flattened_df.columns
    list_of_BK_columns = ['bk_data_id']

    # Build the merge condition from the business-key columns, e.g. "table.bk_data_id = newData.bk_data_id"
    merge_condition = ' and '.join(f'table.{column} = newData.{column}' for column in list_of_BK_columns)

    # Map every column of the incoming data onto the target table
    dictionary = {key: f'newData.{key}' for key in list_of_columns}

    # Executing the merge itself
    print(f"batch {epochId} starting merge now at {datetime.now()}")
    (deltaTable.alias('table')
        .merge(flattened_df.alias("newData"), merge_condition)
        .whenMatchedUpdate(set=dictionary)
        .whenNotMatchedInsert(values=dictionary)
        .execute())
    print(f"batch {epochId} done at {datetime.now()}")
This is how I'm writing the stream:
# print(f"Merge initiated at {datetime.now()}")
df.writeStream.foreach(lambda df, epochId: flatten_nested_df_2(df, columns_to_flatten, epochId)).option("checkpointLocation", checkpoint_directory).start()
print(f"Merge done at {datetime.now()}")
But I get the error message:
[CONTEXT_ONLY_VALID_ON_DRIVER] It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
I think this problem is caused by Step 2 (parsing the JSON string into a DataFrame): schema = spark.read.json(df_cleaned.rdd.map(lambda row: row[json_column_name])).schema, which is not possible to run in a continuous fashion.
I'm using a single user cluster, runtime 15.3 Beta.
Any help is much appreciated.
3 Answers
You can solve this based on your data, depending on whether the JSON column has the same fields in every row or not.
If the data in json_column_name is constant and the fields do not change between rows, then you can use the schema_of_json function.
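A minimal sketch of that approach, assuming the handler receives the whole micro-batch as a DataFrame with the "data" column from your job (parse_with_inferred_schema and the sample-row lookup are illustrative, not taken from your code):

from pyspark.sql.functions import col, from_json, lit, schema_of_json

def parse_with_inferred_schema(batch_df):
    # Infer the schema from one sample row with schema_of_json instead of
    # spark.read.json, so no SparkSession/SparkContext is needed on the workers.
    sample_json = batch_df.select("data").first()["data"]  # guard against empty batches in real code
    json_schema = schema_of_json(lit(sample_json))
    return batch_df.withColumn("data", from_json(col("data"), json_schema))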
If the fields in your data keep changing between rows, then you need to define a standard schema for it, pass the schema to the batch function, and use it in the function to parse the data.
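For that case, a sketch with a placeholder schema (the field names here are just examples, not your real schema):

from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col, from_json

# Placeholder schema: replace these fields with the ones your "data" column really contains.
data_schema = StructType([
    StructField("id", StringType(), True),
    StructField("type", StringType(), True),
])

def parse_with_fixed_schema(batch_df, schema=data_schema):
    # The schema is an ordinary Python object built on the driver; shipping it to the
    # workers does not drag a SparkContext along with it.
    return batch_df.withColumn("data", from_json(col("data"), schema))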
The lambda expression's use of a class variable is the source of this issue. PySpark internally pickles every item it encounters, including the object, which has a reference to the Spark context.
You have to take the class reference out of the lambda in order to fix that:
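Roughly like this (a generic sketch; handle_row and self.handle are illustrative names, not from your code):

# Problematic: the lambda closes over `self`, so the whole object (including its
# reference to the SparkSession/SparkContext) gets pickled and sent to the workers.
# query = df.writeStream.foreach(lambda row: self.handle(row)).start()

# Fixed: a plain module-level function that references neither the class instance nor spark.
def handle_row(row):
    ...  # per-row processing that does not touch spark / SparkContext

query = df.writeStream.foreach(handle_row).start()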
As the error message says, "SparkContext can only be used on the driver, not in code that it run on workers".
When pyspark.sql.DataFrame.foreach() is executed on the driver, it just delegates that work to the workers to perform in parallel. To do that, it passes the code of the callable/function (lambda df, epochId: flatten_nested_df_2(df, columns_to_flatten, epochId) in your case) to the workers, and each worker (which has its own OS/executable/process/memory) compiles and runs this code in its own runtime environment. There is no RPC callback from worker to driver to execute the function on the driver, so the code running on a worker cannot use any state that lives on the driver. The variable spark exists only on the driver, not on the workers.
Perhaps use Python's json module, or one of the JSON functions from pyspark.sql.functions like schema_of_json(), from_json() or get_json_object(), instead of spark.read.json().
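For example, get_json_object can pull individual fields straight out of the JSON string without any schema inference at all (the JSON paths and aliases below are illustrative):

from pyspark.sql.functions import get_json_object

def extract_fields(batch_df):
    # Pure column expressions: nothing here needs the SparkSession on the workers.
    return batch_df.select(
        get_json_object("data", "$.id").alias("bk_data_id"),
        get_json_object("data", "$.shippingUnit.status").alias("shippingUnit_status"),
    )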