I have a DataFrame with a column, substitutions, that holds a JSON string. The string is an array with multiple elements, and I want to explode it so that each element of the array becomes its own row. The DataFrame has other columns as well.

My dataframe looks like this:

+--------------------+----------+--------------------+--------------------+----------+--------------------+---------+--------+--------------------+
|           requestid|sourcepage|              cartid|                  tm|        dt|          customerId| usItemId|prefType|       substitutions|
+--------------------+----------+--------------------+--------------------+----------+--------------------+---------+--------+--------------------+
|00-efbedfe05b4482...|  CHECKOUT|808b44cc-1a38-4dd...|2023-04-25 00:07:...|2023-04-25|f1a34e16-a6d0-6f5...|862776084| NO_PREF|{"id":{"productId...|
+--------------------+----------+--------------------+--------------------+----------+--------------------+---------+--------+--------------------+

The JSON string in the substitutions column:

[
    {
        "id": {
            "productId": "2N3UYGUTROQK",
            "usItemId": "32667929"
        },
        "usItemId": "32667929",
        "itemRank": 1,
        "customerChoice": false
    },
    {
        "id": {
            "productId": "2N3UYGUTRHQK",
            "usItemId": "32667429"
        },
        "usItemId": "32667429",
        "itemRank": 2,
        "customerChoice": true
    },
    {
        "id": {
            "productId": "2N3UYGUTRYQK",
            "usItemId": "32667529"
        },
        "usItemId": "32667529",
        "itemRank": 3,
        "customerChoice": false
    },
    {
        "id": {
            "productId": "2N3UYGUTIQK",
            "usItemId": "32667329"
        },
        "usItemId": "32667329",
        "itemRank": 4,
        "customerChoice": false
    },
    {"id": {
        "productId": "2N3UYGUTYOQK",
        "usItemId": "32663929"
    },
    "usItemId": "32663929",
    "itemRank": 5,
    "customerChoice": false
    }
]

I tried the following, but it does not give the desired result:

df.select("*", f.explode(f.from_json("substitutions", MapType(StringType(),StringType()))))


+--------------------+----------+--------------------+--------------------+----------+--------------------+---------+--------+--------------------+-------+
|           requestid|sourcepage|              cartid|                  tm|        dt|          customerId| usItemId|prefType|       substitutions|entries|
+--------------------+----------+--------------------+--------------------+----------+--------------------+---------+--------+--------------------+-------+
|00-efbedfe05b4482...|  CHECKOUT|808b44cc-1a38-4dd...|2023-04-25 00:07:...|2023-04-25|f1a34e16-a6d0-6f5...|862776084| NO_PREF|[{"id":{"productI...|   null|
+--------------------+----------+--------------------+--------------------+----------+--------------------+---------+--------+--------------------+-------+

What am I doing wrong here?

2 Answers


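  1. Your issue is that the schema you pass to from_json does not match your JSON. The string is an array of objects, each with a nested id object, not a flat map of strings, so MapType(StringType(), StringType()) cannot parse it and you get null.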

    Let’s start by making a simplified version of your df (you don’t need to do this):

    someField='someVal'
    substitutions="""
    [
        {
            "id": {
                "productId": "2N3UYGUTROQK",
                "usItemId": "32667929"
            },
            "usItemId": "32667929",
            "itemRank": 1,
            "customerChoice": false
        },
        {
            "id": {
                "productId": "2N3UYGUTRHQK",
                "usItemId": "32667429"
            },
            "usItemId": "32667429",
            "itemRank": 2,
            "customerChoice": true
        },
        {
            "id": {
                "productId": "2N3UYGUTRYQK",
                "usItemId": "32667529"
            },
            "usItemId": "32667529",
            "itemRank": 3,
            "customerChoice": false
        },
        {
            "id": {
                "productId": "2N3UYGUTIQK",
                "usItemId": "32667329"
            },
            "usItemId": "32667329",
            "itemRank": 4,
            "customerChoice": false
        },
        {
            "id": {
                "productId": "2N3UYGUTYOQK",
                "usItemId": "32663929"
            },
            "usItemId": "32663929",
            "itemRank": 5,
            "customerChoice": false
        }
    ]"""
    
    df = spark.createDataFrame([(someField, substitutions)], ["someField", "substitutions"])
    

    To parse the JSON with from_json, as you are doing, you need to define a schema that matches it. Once that is done, you can explode the resulting array:

    from pyspark.sql.types import *
    from pyspark.sql.functions import from_json, explode, col
    
    # Defining the json schema
    schema = ArrayType(StructType([
        StructField("id", StructType([
            StructField("productId", StringType(), True),
            StructField("usItemId", StringType(), True),
        ]), True),
        StructField("usItemId", StringType(), True),
        StructField("itemRank", IntegerType(), True),
        StructField("customerChoice", BooleanType(), True)
    ]))
    
    # Parentheses let the chained call continue on the next line
    output = (
        df.withColumn("jsonStruct", explode(from_json(col("substitutions"), schema)))
          .select("someField", "jsonStruct.*")
    )
    
    >>> output.show(truncate=False)
    +---------+------------------------+--------+--------+--------------+
    |someField|id                      |usItemId|itemRank|customerChoice|
    +---------+------------------------+--------+--------+--------------+
    |someVal  |{2N3UYGUTROQK, 32667929}|32667929|1       |false         |
    |someVal  |{2N3UYGUTRHQK, 32667429}|32667429|2       |true          |
    |someVal  |{2N3UYGUTRYQK, 32667529}|32667529|3       |false         |
    |someVal  |{2N3UYGUTIQK, 32667329} |32667329|4       |false         |
    |someVal  |{2N3UYGUTYOQK, 32663929}|32663929|5       |false         |
    +---------+------------------------+--------+--------+--------------+
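
One way to see why the map schema fails is to look at the shape of a sample value outside Spark. The snippet below is only a diagnostic sketch in plain Python: `sample` is a shortened, hypothetical two-element copy of the substitutions string, and the variable names are made up for illustration.

```python
import json

# Hypothetical, shortened copy of the `substitutions` string (two elements only).
sample = """
[
    {"id": {"productId": "2N3UYGUTROQK", "usItemId": "32667929"},
     "usItemId": "32667929", "itemRank": 1, "customerChoice": false},
    {"id": {"productId": "2N3UYGUTRHQK", "usItemId": "32667429"},
     "usItemId": "32667429", "itemRank": 2, "customerChoice": true}
]
"""

parsed = json.loads(sample)

# The top-level value is a list, so the Spark schema has to start with ArrayType.
top_level = type(parsed).__name__

# Each element is an object, which corresponds to a StructType.
element = type(parsed[0]).__name__

# The values are not all strings: "id" is itself an object, and the other
# fields are a string, an integer and a boolean.
value_types = {key: type(value).__name__ for key, value in parsed[0].items()}

print(top_level, element, value_types)
```

The mixed value types, and the nested object under "id", are what the explicit StructType schema above spells out field by field.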
    
    
  2. Is the substitutions column a string, or has Spark already parsed it into an array column?

    If it is already an array column, you can simply drop the from_json:

    df.select("*", f.explode("substitutions"))
    

    Otherwise you have to provide the entire schema of your JSON. This worked for me with the JSON you posted above:

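    from pyspark.sql import functions as f
    from pyspark.sql.types import (ArrayType, BooleanType, LongType,
                                   StringType, StructField, StructType)

    # Fields can be listed in any order; from_json matches them by name
    schema = ArrayType(StructType([
        StructField('customerChoice', BooleanType()),
        StructField('id', StructType([StructField('productId', StringType()),
                                      StructField('usItemId', StringType())])),
        StructField('itemRank', LongType()),
        StructField('usItemId', StringType())
    ]))
    df.select("*", f.explode(f.from_json('substitutions', schema)))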
    