
I have a JSON file that looks like this:

[
    {
        "event_date": "20221207",
        "user_properties": [
            {
                "key": "user_id",
                "value": {
                    "set_timestamp_micros": "1670450329209558"
                }
            },
            {
                "key": "doc_id",
                "value": {
                    "set_timestamp_micros": "1670450329209558"
                }
            }
        ]
    },
    {
        "event_date": "20221208",
        "user_properties": [
            {
                "key": "account_id",
                "value": {
                    "int_value": "3176465",
                    "set_timestamp_micros": "1670450323992556"
                }
            },
            {
                "key": "user_id",
                "value": {
                    "string_value": "430fdfc579f55f9859173c1bea39713dc11c3ba62e83c24830e3d5936f43c26d",
                    "set_timestamp_micros": "1670450323992556"
                }
            }
        ]
    }
]

When I read it using spark.read.json(JSON_PATH), I got the following schema:

root
 |-- event_date: string (nullable = true)
 |-- user_properties: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- value: struct (nullable = true)
 |    |    |    |-- int_value: string (nullable = true)
 |    |    |    |-- set_timestamp_micros: string (nullable = true)
 |    |    |    |-- string_value: string (nullable = true)

I need to parse it using PySpark, and the resulting dataframe should look like this:

+----------+-----------------+----------------------------------+------------------------------+-------------------------------+----------------------------------------------------------------+
|event_date|up_account_id_int|up_account_id_set_timestamp_micros|up_doc_id_set_timestamp_micros|up_user_id_set_timestamp_micros|up_user_id_string                                               |
+----------+-----------------+----------------------------------+------------------------------+-------------------------------+----------------------------------------------------------------+
|20221208  |3176465          |1670450323992556                  |null                          |1670450323992556               |430fdfc579f55f9859173c1bea39713dc11c3ba62e83c24830e3d5936f43c26d|
|20221207  |null             |null                              |1670450329209558              |1670450329209558               |null                                                            |
+----------+-----------------+----------------------------------+------------------------------+-------------------------------+----------------------------------------------------------------+

Any ideas on how I can accomplish this?

2 Answers


  1. You can use this function (Scala shown; a rough PySpark equivalent is sketched after the example output below):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.{ArrayType, StructType}
    
    def flattenDataframe(df: DataFrame): DataFrame = {
      val fields = df.schema.fields
      val fieldNames = fields.map(_.name)
    
      for (field <- fields) {
        val fieldName = field.name
        field.dataType match {
          case _: ArrayType =>
            // Explode the array column, keeping the other columns as-is.
            val fieldNamesExcludingArray = fieldNames.filter(_ != fieldName)
            val fieldNamesAndExplode = fieldNamesExcludingArray ++ Array(s"explode_outer($fieldName) as $fieldName")
            val explodedDf = df.selectExpr(fieldNamesAndExplode: _*)
            return flattenDataframe(explodedDf)
          case structType: StructType =>
            // Expand the struct, prefixing child column names with the parent name.
            val childFieldNames = structType.fieldNames.map(childName => fieldName + "." + childName)
            val newFieldNames = fieldNames.filter(_ != fieldName) ++ childFieldNames
            val renamedCols = newFieldNames.map(x => col(x).as(x.replace(".", "_")))
            return flattenDataframe(df.select(renamedCols: _*))
          case _ =>
        }
      }
      df
    }
    
    val flattenedJSON = flattenDataframe(df)
    

    Before flattening:

    +----------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
    |event_date|user_properties                                                                                                                                         |
    +----------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
    |20221207  |[{user_id, {null, 1670450329209558, null}}, {doc_id, {null, 1670450329209558, null}}]                                                                   |
    |20221208  |[{account_id, {3176465, 1670450323992556, null}}, {user_id, {null, 1670450323992556, 430fdfc579f55f9859173c1bea39713dc11c3ba62e83c24830e3d5936f43c26d}}]|
    +----------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
    

    After flattening:

    +----------+-------------------+-------------------------------+------------------------------------------+----------------------------------------------------------------+
    |event_date|user_properties_key|user_properties_value_int_value|user_properties_value_set_timestamp_micros|user_properties_value_string_value                              |
    +----------+-------------------+-------------------------------+------------------------------------------+----------------------------------------------------------------+
    |20221207  |user_id            |null                           |1670450329209558                          |null                                                            |
    |20221207  |doc_id             |null                           |1670450329209558                          |null                                                            |
    |20221208  |account_id         |3176465                        |1670450323992556                          |null                                                            |
    |20221208  |user_id            |null                           |1670450323992556                          |430fdfc579f55f9859173c1bea39713dc11c3ba62e83c24830e3d5936f43c26d|
    +----------+-------------------+-------------------------------+------------------------------------------+----------------------------------------------------------------+
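
    Since the question asks for PySpark, here is a rough Python translation of the same recursive idea. It is a minimal sketch, assuming the DataFrame was loaded with spark.read.json(JSON_PATH); the function and variable names are illustrative:

    from pyspark.sql import DataFrame
    from pyspark.sql import functions as F
    from pyspark.sql.types import ArrayType, StructType
    
    def flatten_dataframe(df: DataFrame) -> DataFrame:
        """Recursively explode array columns and expand struct columns."""
        for field in df.schema.fields:
            others = [c for c in df.columns if c != field.name]
            if isinstance(field.dataType, ArrayType):
                # Explode the array column, keeping the other columns as-is.
                exploded = df.select(*others, F.explode_outer(field.name).alias(field.name))
                return flatten_dataframe(exploded)
            if isinstance(field.dataType, StructType):
                # Expand the struct, prefixing child columns with the parent name.
                children = [F.col(f"{field.name}.{c}").alias(f"{field.name}_{c}")
                            for c in field.dataType.fieldNames()]
                return flatten_dataframe(df.select(*others, *children))
        return df
    
    flattened = flatten_dataframe(df)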
    
  2. First you can explode the array, then flatten the struct with select.

    from pyspark.sql import functions as F
    
    # Explode the user_properties array, then flatten the value struct.
    df = (df.select('event_date', F.explode('user_properties').alias('user_properties'))
          .select('event_date', 'user_properties.key', 'user_properties.value.*')
    )
    

    And it seems you are pivoting the data. This won't give you the exact dataframe you posted, but you should be able to transform it as you like (see the renaming sketch after the code below).

    df = (df.groupby('event_date')
          .pivot('key')
          .agg(F.max('int_value').alias('id_int'),
               F.max('set_timestamp_micros').alias('set_timestamp_micros'),
               F.max('string_value').alias('string')))
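
    For example, to get closer to the column names in the question, a hedged follow-up sketch (the "up_" prefix and the null-column drop are assumptions about the desired layout) could rename the pivoted columns and remove columns that are null for every row:

    renamed = df.select(
        'event_date',
        *[F.col(c).alias(f"up_{c}") for c in df.columns if c != 'event_date'])
    
    # Drop columns that contain no values at all (e.g. the doc_id int/string columns).
    non_null = renamed.select([F.count(c).alias(c) for c in renamed.columns]).first()
    result = renamed.select([c for c in renamed.columns if non_null[c] > 0])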
    