
I have a JSON file and I need to convert it into tabular form using only PySpark.
My JSON file:

{
    "records": [
        {
            "name": "Priya",
            "last_name": "Munjal",
            "special_values": [
                {
                    "name": "adress",
                    "value": "some adress"
                },
                {
                    "name": "city",
                    "value": "Chd"
                },
                {
                    "name": "zip_code",
                    "value": "134112"
                }
            ]
        },
        {
            "name": "Neha",
            "last_name": "Miglani",
            "special_values": [
                {
                    "name": "adress",
                    "value": "some adress"
                },
                {
                    "name": "city",
                    "value": "kkr"
                },
                {
                    "name": "zip_code",
                    "value": "02221"
                }
            ]
        }
    ]
}

Result that I want:

name|last_name|address|city|zip_code
priya|munjal|some adress|Chd|134112
neha|miglani|some adress|kkr|02221

I have tried this code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode

# Initialize SparkSession
spark = SparkSession.builder.appName("JSONTransformation").getOrCreate()

# Read the input JSON file
input_path = "path_to_your_spark_ex.json"
df = spark.read.json(input_path)

# Explode the 'records' array and select required columns
flattened_df = df.select(
    explode(col('records')).alias('record')
).select(
    col('record.name').alias('name'),
    col('record.last_name').alias('last_name'),
    col('record.special_values').alias('special_values')
)

# Create a DataFrame from the 'special_values' array
values_df = flattened_df.select(
    col('name'),
    col('last_name'),
    col('special_values')[0]['value'].alias('address'),
    col('special_values')[1]['value'].alias('city'),
    col('special_values')[2]['value'].alias('zip_code')
)

# Show the result
values_df.show()

# Stop SparkSession
spark.stop()

but I am not getting the expected result. I have already done this with Pandas, but now it needs to be done only in PySpark.
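
As a quick sanity check, here is a minimal sketch (reusing the file path from above) that shows what Spark actually parsed. By default, spark.read.json expects JSON Lines (one JSON object per line), so a pretty-printed file like the one above typically comes back as a single _corrupt_record column:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("JSONTransformation").getOrCreate()

# The default JSON reader expects one JSON object per line (JSON Lines).
df = spark.read.json("path_to_your_spark_ex.json")

# For a pretty-printed, multi-line JSON document, the inferred schema
# typically collapses to a single _corrupt_record column instead of 'records'.
df.printSchema()
df.show(truncate=False)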

2 Answers


    1. Extract the value field from the array using special_values.value.
    2. Use the concat_ws function to join the array of strings into a comma-separated string.
    3. Use the from_csv function to apply a schema to that comma-separated string.
    4. Use * to expand the attributes of the resulting struct into separate columns.
    The core expression:
    from_csv( 
         concat_ws( 
            ',',
            special_values.value
         ), 
         'address string, city string, zip_code string'
    )
    
    from pyspark.sql.functions import *
    
    >>> df.show(10, False)
    +-----------------------------------------------------------------------------------------------------------------------------------------------------+
    |records                                                                                                                                              |
    +-----------------------------------------------------------------------------------------------------------------------------------------------------+
    |[{Munjal, Priya, [{adress, some adress}, {city, Chd}, {zip_code, 134112}]}, {Miglani, Neha, [{adress, some adress}, {city, kkr}, {zip_code, 02221}]}]|
    +-----------------------------------------------------------------------------------------------------------------------------------------------------+
    
    (
        df
        .select(explode(col('records')).alias('records'))
        .select(col('records.*'))
        .withColumn('special_values', expr('''from_csv(concat_ws(',', special_values.value), 'address string, city string, zip_code string')'''))
        .select('name', 'last_name', 'special_values.*')
        .show(10, False)
    )
    
    +-----+---------+-----------+----+--------+
    |name |last_name|address    |city|zip_code|
    +-----+---------+-----------+----+--------+
    |Priya|Munjal   |some adress|Chd |134112  |
    |Neha |Miglani  |some adress|kkr |02221   |
    +-----+---------+-----------+----+--------+
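
    Note that both the positional indexing in the question and the fixed CSV schema above assume the entries in special_values always appear in the same order. If that order can vary, here is a sketch of an order-independent variant (Spark 2.4+) that turns the array into a name -> value map with map_from_entries and looks each field up by key (the keys follow the source spelling, e.g. 'adress'):

    from pyspark.sql.functions import col, explode, map_from_entries

    (
        df
        .select(explode(col('records')).alias('record'))
        .select(
            col('record.name').alias('name'),
            col('record.last_name').alias('last_name'),
            # Build a name -> value map so lookups don't depend on element order.
            map_from_entries(col('record.special_values')).alias('sv')
        )
        .select(
            'name',
            'last_name',
            col('sv')['adress'].alias('address'),
            col('sv')['city'].alias('city'),
            col('sv')['zip_code'].alias('zip_code'),
        )
        .show(10, False)
    )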
    
  2. Your code works for me after I add .option('multiLine', True) to the read method.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, explode
    
    spark = SparkSession.builder.appName("JSONTransformation").getOrCreate()
    
    df = (
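        # multiLine=True lets Spark parse a single JSON document that spans
        # multiple lines; by default the reader expects one object per line.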
        spark.read
        .option('multiLine', True)
        .json('data.json')
    )
    
    # Explode the 'records' array and select required columns
    flattened_df = df.select(
        explode(col('records')).alias('record')
    ).select(
        col('record.name').alias('name'),
        col('record.last_name').alias('last_name'),
        col('record.special_values').alias('special_values')
    )
    
    # Create a DataFrame from the 'special_values' array
    values_df = flattened_df.select(
        col('name'),
        col('last_name'),
        col('special_values')[0]['value'].alias('address'),
        col('special_values')[1]['value'].alias('city'),
        col('special_values')[2]['value'].alias('zip_code')
    )
    
    # Show the result
    values_df.show()
    

    Result:

    +-----+---------+-----------+----+--------+
    | name|last_name|    address|city|zip_code|
    +-----+---------+-----------+----+--------+
    |Priya|   Munjal|some adress| Chd|  134112|
    | Neha|  Miglani|some adress| kkr|   02221|
    +-----+---------+-----------+----+--------+
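
    If you also need the pipe-separated layout from the question written out as a file, a minimal sketch (the output path is just an example):

    (
        values_df
        .coalesce(1)  # write a single output file; fine for data this small
        .write
        .option('header', True)
        .option('sep', '|')
        .mode('overwrite')
        .csv('output_path')
    )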
    