
I am new to PySpark and I am struggling to flatten a nested JSON file into a PySpark DataFrame.

I need to define the schema for the JSON data. I know how to define a schema for regular JSON files, but I get stuck on this case. I need a generalised version of this example, not a hard-coded one (one generic schema approach is sketched after the expected output below).

In this example I have a verified manifest: the string AB1 should go under verifiedManifest_name and its quantity under verifiedManifest_quantity. I don't know how many entries verifiedManifest will have, nor how they will be named.

"verifiedManifest":{"AB1":{"quantity":1}, "DE5":{"quantity":5}, "AG1":{"quantity":10}}

Output for this case will look like this:

verifiedManifest_name    verifiedManifest_quantity
AB1                      1
DE5                      5
AG1                      10
DOL1                     100
BG1                      3
etc...
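
One way to keep the schema generic for this shape is a MapType keyed by the manifest name, so none of the entry names (AB1, DE5, ...) are hard coded. A minimal sketch, assuming the JSON above lives in a file ./test.json and spark is an active SparkSession:

    from pyspark.sql import functions as F
    from pyspark.sql.types import LongType, MapType, StringType, StructField, StructType

    # a map from manifest name to a struct keeps the entry names open-ended
    schema = StructType([
        StructField("verifiedManifest", MapType(
            StringType(),
            StructType([StructField("quantity", LongType())])
        ))
    ])

    df = spark.read.json("./test.json", schema=schema)

    result = (
        df.select(F.explode("verifiedManifest"))  # one row per map entry
          .select(F.col("key").alias("verifiedManifest_name"),
                  F.col("value.quantity").alias("verifiedManifest_quantity"))
    )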

2 Answers


  1. Suppose you have your data in a JSON file:

    import json
    from pyspark.sql import functions as f

    # read the whole file as a single string
    input_df = spark.read.text('./test.json', wholetext=True)
    input_string = input_df.collect()[0]['value']
    # parse the JSON text (json.loads instead of eval)
    input_dict = json.loads(input_string)

    # a Python dict becomes a MapType column, so the keys stay dynamic
    df = spark.createDataFrame([
        (input_dict['verifiedManifest'],)
    ], ['verifiedManifest'])

    df = (
        df
        # exploding a map yields one row per entry, in columns `key` and `value`
        .select(f.explode(f.col('verifiedManifest')))
        .withColumnRenamed('key', 'verifiedManifest_name')
        # `value` is itself a map, so element_at pulls out the quantity
        .withColumn('verifiedManifest_value', f.element_at(f.col('value'), 'quantity'))
        .select('verifiedManifest_name', 'verifiedManifest_value')
    )

    df.show(truncate=False)
    

    and the output will look like:

    +---------------------+----------------------+                                  
    |verifiedManifest_name|verifiedManifest_value|
    +---------------------+----------------------+
    |AB1                  |1                     |
    |AG1                  |10                    |
    |DE5                  |5                     |
    +---------------------+----------------------+
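
    The rename step works because exploding a MapType column always produces two columns named key and value. Both output names can also be assigned directly on the explode; a small variant of the snippet above, under the same assumptions:

    result = (
        df.select(f.explode('verifiedManifest')
                   .alias('verifiedManifest_name', 'value'))  # name both outputs at once
          .withColumn('verifiedManifest_value', f.element_at(f.col('value'), 'quantity'))
          .drop('value')
    )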
    
  2. Read the data with the read.json method, then use the stack function to unpivot all the columns dynamically.

    Example:

    from pyspark.sql.functions import expr

    json_str = """{"verifiedManifest":{"AB1":{"quantity":1}, "DE5":{"quantity":5}, "AG1":{"quantity":10}}}"""
    #read the json string using spark read json
    df = spark.read.json(spark.sparkContext.parallelize([json_str]))

    #unnest the struct and select all of its fields dynamically
    df1 = df.select("verifiedManifest.*")

    #number of columns to stack
    sz_cols = len(df1.columns)

    #build the stack expression dynamically, with aliases for the output columns
    stack_expr = f"stack({sz_cols}," + ','.join(f'"{f}",{f}.*' for f in df1.columns) + ") as (verifiedManifest_name, verifiedManifest_quantity)"
    df1.select(expr(stack_expr)).show(10, False)
    #+---------------------+-------------------------+
    #|verifiedManifest_name|verifiedManifest_quantity|
    #+---------------------+-------------------------+
    #|AB1                  |1                        |
    #|AG1                  |10                       |
    #|DE5                  |5                        |
    #+---------------------+-------------------------+
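
    For the sample data, the dynamically built expression expands as below; each "name",name.* pair contributes the struct's single quantity field, so every manifest entry becomes one output row:

    print(stack_expr)
    #stack(3,"AB1",AB1.*,"AG1",AG1.*,"DE5",DE5.*) as (verifiedManifest_name, verifiedManifest_quantity)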
    