I am new to PySpark and am trying to flatten a JSON file using PySpark, but I am not getting the desired output.
Here is my JSON file:
{
    "events": [
        {
            "event_name": "start",
            "event_properties": ["property1", "property2", "property3"],
            "entities": ["entityI", "entityII", "entityIII"],
            "event_timestamp": "2022-05-01 00:00:00"
        },
        {
            "event_name": "stop",
            "event_properties": ["propertyA", "propertyB", "propertyC", "propertyD"],
            "entities": ["entityW", "entityX", "entityY", "entityZ"],
            "event_timestamp": "2022-05-01 01:00:00"
        }
    ]
}
This is the output I want, using PySpark:
event_name | event_properties | entities  | event_timestamp
start      | property1        | entityI   | 2022-05-01 00:00:00
start      | property2        | entityII  | 2022-05-01 00:00:00
start      | property3        | entityIII | 2022-05-01 00:00:00
stop       | propertyA        | entityW   | 2022-05-01 01:00:00
stop       | propertyB        | entityX   | 2022-05-01 01:00:00
stop       | propertyC        | entityY   | 2022-05-01 01:00:00
stop       | propertyD        | entityZ   | 2022-05-01 01:00:00
The code I have tried:

# Implementing JSON file in PySpark
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StructType
from pyspark.sql.functions import explode_outer, col

spark = SparkSession.builder \
    .master("local[1]") \
    .appName("PySpark Read JSON") \
    .getOrCreate()

df = spark.read.option("multiline", "true").json(r"C:\Users\Lajo\Downloads\spark_ex1_input.json")
def flatten(df):
    # compute Complex Fields (Lists and Structs) in Schema
    complex_fields = dict([(field.name, field.dataType)
                           for field in df.schema.fields
                           if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
    while len(complex_fields) != 0:
        col_name = list(complex_fields.keys())[0]
        print("Processing :" + col_name + " Type : " + str(type(complex_fields[col_name])))

        # if StructType then convert all sub elements to columns,
        # i.e. flatten structs
        if type(complex_fields[col_name]) == StructType:
            expanded = [col(col_name + '.' + k).alias(col_name + '_' + k)
                        for k in [n.name for n in complex_fields[col_name]]]
            df = df.select("*", *expanded).drop(col_name)

        # if ArrayType then add the Array elements as rows using the explode function,
        # i.e. explode Arrays
        elif type(complex_fields[col_name]) == ArrayType:
            df = df.withColumn(col_name, explode_outer(col_name))

        # recompute remaining Complex Fields in Schema
        complex_fields = dict([(field.name, field.dataType)
                               for field in df.schema.fields
                               if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
    return df
df_flatten = flatten(df)
df_flatten.show()
This runs, but exploding both arrays one after the other gives me every combination of property and entity (a cross product) instead of pairing them up by position. Can someone help?
2 Answers
First, we read the JSON file and import the needed functions.
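A minimal sketch of that first step (the file path, and the event_id column used to tag each event, are placeholders of my own):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, posexplode, monotonically_increasing_id

spark = SparkSession.builder.appName("FlattenJSON").getOrCreate()

# read the multiline JSON shown in the question (path is a placeholder)
df = spark.read.option("multiline", "true").json("spark_ex1_input.json")

# one row per event, plus a unique id so exploded elements stay grouped per event
events = df.select(explode("events").alias("event")) \
           .withColumn("event_id", monotonically_increasing_id())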
When dealing with nested JSON structures in PySpark and needing to flatten arrays side by side, the traditional explode function can lead to incorrect combinations if not used cautiously: exploding each array independently produces a cross product of their elements. The solution is to maintain a unique identifier for each event to keep track of the exploded elements. The posexplode function is very useful here: it not only explodes the array into separate rows but also provides the position (index) of each element in the original array, which lets you pair the two arrays element by element.
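A sketch of this approach, continuing from the events DataFrame above (the pos and event_id column names are mine):

props = events.select(
    "event_id",
    col("event.event_name").alias("event_name"),
    col("event.event_timestamp").alias("event_timestamp"),
    posexplode("event.event_properties").alias("pos", "event_properties"),
)

ents = events.select(
    "event_id",
    posexplode("event.entities").alias("pos", "entities"),
)

# join on the event id and the element position, so property N pairs with entity N
result = (props.join(ents, on=["event_id", "pos"])
               .select("event_name", "event_properties", "entities", "event_timestamp")
               .orderBy("event_timestamp", "pos"))

result.show(truncate=False)

With the sample input, the result:

+----------+----------------+---------+-------------------+
|event_name|event_properties|entities |event_timestamp    |
+----------+----------------+---------+-------------------+
|start     |property1       |entityI  |2022-05-01 00:00:00|
|start     |property2       |entityII |2022-05-01 00:00:00|
|start     |property3       |entityIII|2022-05-01 00:00:00|
|stop      |propertyA       |entityW  |2022-05-01 01:00:00|
|stop      |propertyB       |entityX  |2022-05-01 01:00:00|
|stop      |propertyC       |entityY  |2022-05-01 01:00:00|
|stop      |propertyD       |entityZ  |2022-05-01 01:00:00|
+----------+----------------+---------+-------------------+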
Alternatively, please find a simple Spark SQL query to flatten the required columns.
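A sketch of the SQL route (the view name and aliases are my own); it uses posexplode in LATERAL VIEW clauses and keeps only rows where the two positions match, which pairs the arrays element by element:

df.createOrReplaceTempView("input_json")

result = spark.sql("""
    SELECT t.event.event_name      AS event_name,
           p.event_properties,
           n.entities,
           t.event.event_timestamp AS event_timestamp
    FROM (SELECT explode(events) AS event FROM input_json) t
    LATERAL VIEW posexplode(t.event.event_properties) p AS prop_pos, event_properties
    LATERAL VIEW posexplode(t.event.entities) n AS ent_pos, entities
    WHERE prop_pos = ent_pos
""")

result.show(truncate=False)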