
I am new to PySpark and am trying to flatten a JSON file using PySpark, but I am not getting the desired output.

Here is my JSON file:

{
    "events": [
        {
            "event_name": "start",
            "event_properties": ["property1", "property2", "property3"],
            "entities": ["entityI", "entityII", "entityIII"],
            "event_timestamp": "2022-05-01 00:00:00"
        },
        {
            "event_name": "stop",
            "event_properties": ["propertyA", "propertyB", "propertyC", "propertyD"],
            "entities": ["entityW", "entityX", "entityY", "entityZ"],
            "event_timestamp": "2022-05-01 01:00:00"
        }
    ]
}

This is the output I want using PySpark:

event_name | event_properties | entities  | event_timestamp
start      | property1        | entityI   | 2022-05-01 00:00:00
start      | property2        | entityII  | 2022-05-01 00:00:00
start      | property3        | entityIII | 2022-05-01 00:00:00
stop       | propertyA        | entityW   | 2022-05-01 01:00:00
stop       | propertyB        | entityX   | 2022-05-01 01:00:00
stop       | propertyC        | entityY   | 2022-05-01 01:00:00
stop       | propertyD        | entityZ   | 2022-05-01 01:00:00

The code I have tried (reading the JSON file in PySpark):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[1]") \
    .appName("PySpark Read JSON") \
    .getOrCreate()

df = spark.read.option("multiline", "true").json(r"C:\Users\Lajo\Downloads\spark_ex1_input.json")

from pyspark.sql.types import *
from pyspark.sql.functions import explode_outer, col


def flatten(df):
    # compute complex fields (lists and structs) in the schema
    complex_fields = dict([(field.name, field.dataType)
                           for field in df.schema.fields
                           if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
    while len(complex_fields) != 0:
        col_name = list(complex_fields.keys())[0]
        print("Processing :" + col_name + " Type : " + str(type(complex_fields[col_name])))

        # if StructType, convert all sub-elements to columns,
        # i.e. flatten structs
        if type(complex_fields[col_name]) == StructType:
            expanded = [col(col_name + '.' + k).alias(col_name + '_' + k)
                        for k in [n.name for n in complex_fields[col_name]]]
            df = df.select("*", *expanded).drop(col_name)

        # if ArrayType, add the array elements as rows using the explode function,
        # i.e. explode arrays
        elif type(complex_fields[col_name]) == ArrayType:
            df = df.withColumn(col_name, explode_outer(col_name))

        # recompute the remaining complex fields in the schema
        complex_fields = dict([(field.name, field.dataType)
                               for field in df.schema.fields
                               if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
    return df


df_flatten = flatten(df)

df_flatten.show()

Can someone help?

2 Answers


  1. First, read the JSON file and import the needed functions.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import explode, posexplode, col,concat_ws
    
    spark = SparkSession.builder \
        .master("local[1]") \
        .appName("PySpark Read JSON") \
        .getOrCreate()
    
    df = spark.read.option("multiline", "true").json(r"input.json")
    

    When dealing with nested JSON structures in PySpark and needing to flatten two arrays side by side, the traditional explode function can lead to incorrect combinations if not used cautiously: exploding each array independently produces every pairing of their elements (a cross product) rather than the position-wise pairing you want.
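    For example, exploding both arrays one after the other (which is roughly what a generic flatten loop ends up doing) pairs every property with every entity. A minimal sketch of the pitfall, reusing the df and the imports from above:

    # Pitfall sketch: two independent explodes yield the cross product
    # (3*3 + 4*4 = 25 rows for this input) instead of the 7 position-wise pairs.
    naive_df = (
        df.select(explode(col("events")).alias("e"))
          .select("e.event_name", "e.event_properties", "e.entities", "e.event_timestamp")
          .withColumn("event_properties", explode("event_properties"))
          .withColumn("entities", explode("entities"))
    )
    print(naive_df.count())  # 25, not 7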

    The solution is to keep a unique identifier for each event and to track the position of each exploded element. The posexplode function is useful here: it explodes the array into separate rows and also returns the position (index) of each element in the original array, so the two exploded arrays can be joined back together on the event identifier and index.

    # Explode the events
    df_events = df.select(explode(col("events")).alias("events_data"))
    
    # Create a unique event identifier
    df_events = df_events.withColumn("event_id", concat_ws('_', "events_data.event_name", "events_data.event_timestamp"))
    
    # Explode the event_properties with their index and keep the event_id
    df_properties = df_events.select(
            "event_id",
            "events_data.event_name",
            "events_data.event_timestamp",
            posexplode("events_data.event_properties").alias("index", "event_properties")
        )
    
    # Explode the entities with their index and keep the event_id
    df_entities = df_events.select(
            "event_id",
            posexplode("events_data.entities").alias("index", "entities")
        )
    
    # Join on the event_id and index
    final_df = df_properties.join(df_entities, on=["event_id", "index"]).select(
            "event_name",
            "event_properties",
            "entities",
            "event_timestamp"
        )
    final_df.show(truncate=False)
    

    Result:

    +----------+----------------+---------+-------------------+
    |event_name|event_properties|entities |event_timestamp    |
    +----------+----------------+---------+-------------------+
    |start     |property1       |entityI  |2022-05-01 00:00:00|
    |start     |property2       |entityII |2022-05-01 00:00:00|
    |start     |property3       |entityIII|2022-05-01 00:00:00|
    |stop      |propertyA       |entityW  |2022-05-01 01:00:00|
    |stop      |propertyB       |entityX  |2022-05-01 01:00:00|
    |stop      |propertyC       |entityY  |2022-05-01 01:00:00|
    |stop      |propertyD       |entityZ  |2022-05-01 01:00:00|
    +----------+----------------+---------+-------------------+
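    A note on the design choice: joining on event_id and index assumes the two arrays are aligned by position and have the same length within each event. Under that same assumption, a more compact alternative (not part of the answer above, just a sketch) is to zip the arrays with arrays_zip and explode once, which avoids the join entirely:

    from pyspark.sql.functions import explode, arrays_zip, col

    # arrays_zip pairs the two arrays element by element, so a single explode
    # keeps matching positions together and no join is needed.
    alt_df = (
        df.select(explode(col("events")).alias("e"))
          .select("e.event_name", "e.event_properties", "e.entities", "e.event_timestamp")
          .withColumn("zipped", explode(arrays_zip("event_properties", "entities")))
          .select(
              "event_name",
              col("zipped.event_properties").alias("event_properties"),
              col("zipped.entities").alias("entities"),
              "event_timestamp",
          )
    )
    alt_df.show(truncate=False)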
    
  2. Here is a simple Spark SQL query that flattens the required columns.

    scala> spark.table("input").toJSON.show(false)
    +--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |value                                                                                                                                                                                                                                                                                                                                                                     |
    +--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |{"events":[{"entities":["entityI","entityII","entityIII"],"event_name":"start","event_properties":["property1","property2","property3"],"event_timestamp":"2022-05-01 00:00:00"},{"entities":["entityW","entityX","entityY","entityZ"],"event_name":"stop","event_properties":["propertyA","propertyB","propertyC","propertyD"],"event_timestamp":"2022-05-01 01:00:00"}]}|
    +--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    
    scala> :paste
    // Entering paste mode (ctrl-D to finish)
    
    spark.sql("""
        WITH explode_events AS (
            SELECT
                inline_outer(events),
                explode_outer(entities) AS entity,
                inline_outer(transform(event_properties, e -> struct(e))) AS event_property
            FROM input)
        select
            distinct
            event_name,
            event_property,
            entity,
            event_timestamp
        from explode_events
    """).show(100, false)
    
    // Exiting paste mode, now interpreting.
    
    +----------+--------------+---------+-------------------+
    |event_name|event_property|entity   |event_timestamp    |
    +----------+--------------+---------+-------------------+
    |start     |property3     |entityII |2022-05-01 00:00:00|
    |stop      |propertyA     |entityX  |2022-05-01 01:00:00|
    |stop      |propertyC     |entityY  |2022-05-01 01:00:00|
    |stop      |propertyA     |entityY  |2022-05-01 01:00:00|
    |stop      |propertyD     |entityY  |2022-05-01 01:00:00|
    |stop      |propertyD     |entityX  |2022-05-01 01:00:00|
    |stop      |propertyC     |entityZ  |2022-05-01 01:00:00|
    |start     |property3     |entityIII|2022-05-01 00:00:00|
    |stop      |propertyD     |entityZ  |2022-05-01 01:00:00|
    |stop      |propertyA     |entityW  |2022-05-01 01:00:00|
    |start     |property1     |entityII |2022-05-01 00:00:00|
    |start     |property2     |entityII |2022-05-01 00:00:00|
    |stop      |propertyB     |entityX  |2022-05-01 01:00:00|
    |stop      |propertyC     |entityW  |2022-05-01 01:00:00|
    |stop      |propertyD     |entityW  |2022-05-01 01:00:00|
    |stop      |propertyB     |entityW  |2022-05-01 01:00:00|
    |start     |property1     |entityIII|2022-05-01 00:00:00|
    |stop      |propertyB     |entityY  |2022-05-01 01:00:00|
    |stop      |propertyA     |entityZ  |2022-05-01 01:00:00|
    |start     |property3     |entityI  |2022-05-01 00:00:00|
    |stop      |propertyC     |entityX  |2022-05-01 01:00:00|
    |start     |property2     |entityI  |2022-05-01 00:00:00|
    |stop      |propertyB     |entityZ  |2022-05-01 01:00:00|
    |start     |property2     |entityIII|2022-05-01 00:00:00|
    |start     |property1     |entityI  |2022-05-01 00:00:00|
    +----------+--------------+---------+-------------------+
    
    
    scala>
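    For readers unfamiliar with inline_outer: it explodes an array of structs into one row per element, with one column per struct field. That is why the query wraps the plain string array with transform(event_properties, e -> struct(e)) before calling it. A tiny, self-contained illustration (a sketch with hypothetical literal values, shown here from a PySpark session):

    # inline_outer turns an array of structs into rows, one column per struct field.
    spark.sql("""
        SELECT inline_outer(array(
            named_struct('name', 'start', 'idx', 1),
            named_struct('name', 'stop', 'idx', 2)
        ))
    """).show()
    # Produces two rows with the columns name and idx.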
    