
I have a Spark dataset with the fields "identifier_id", "inner_blob", and "json_blob":

{
  "inner_blob": {
    "identifier_id": 2.0,
    "name": "test1",
    "age": 30.0
  },
  "identifier_id": 2.0,
  "json_blob": {
    "identifier_id": 2.0,
    "order_id": 2.0,
    "inner_blob": [
      {
        "item_id": 23.0,
        "item_name": "airpods2",
        "item_price": 300.0
      },
      {
        "item_id": 23.0,
        "item_name": "airpods1",
        "item_price": 600.0
      }
    ]
  }
}

How can I merge the values of the two columns "inner_blob" and "json_blob" into a single column "json_blob", while the "identifier_id" column remains the same? The expected output looks like this:

{
  "identifier_id": 2.0,
  "json_blob": {
    "identifier_id": 2.0,
    "name": "test1",
    "age": 30.0,
    "order_id": 2.0,
    "inner_blob": [
      {
        "item_id": 23.0,
        "item_name": "airpods2",
        "item_price": 300.0
      },
      {
        "item_id": 23.0,
        "item_name": "airpods1",
        "item_price": 600.0
      }
    ]
  }
}

2 Answers


  1. To add fields from another column into a column of type struct in Apache Spark, you can use the struct function:

    val resultDf = df.withColumn("json_blob", struct(
        col("inner_blob.name").alias("name"),
        col("inner_blob.age").alias("age"),
        col("json_blob.*")))
      .drop("inner_blob")
    

    UPDATE:

    If you are using Spark >= 3.1, you can use dropFields to drop the unneeded fields from a struct:

    val res = df.withColumn("inner_blob", col("inner_blob").dropFields("identifier_id"))
      .withColumn("json_blob", struct(col("inner_blob.*"), col("json_blob.*"))).drop("inner_blob")
    
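    If it helps to see the semantics outside Spark, here is a plain-Scala sketch of what the two steps above do, using Maps in place of structs (the field names come from the sample data; mergeBlobs is a hypothetical helper for illustration, not a Spark API):

    ```scala
    // Hypothetical helper mirroring dropFields + struct: remove the duplicate
    // key from inner_blob, then concatenate its fields with json_blob's.
    def mergeBlobs(inner: Map[String, Any], json: Map[String, Any]): Map[String, Any] =
      (inner - "identifier_id") ++ json

    val innerBlob = Map("identifier_id" -> 2.0, "name" -> "test1", "age" -> 30.0)
    val jsonBlob  = Map("identifier_id" -> 2.0, "order_id" -> 2.0)

    // merged keeps name and age from inner_blob, plus identifier_id and
    // order_id from json_blob.
    val merged = mergeBlobs(innerBlob, jsonBlob)
    ```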
  2. If we want to be completely agnostic about which fields are inside inner_blob and json_blob, we can use the schema to get the field names. Then we need to decide what to do when a name is present in both structs. Here we take the one from inner_blob and drop the one from json_blob (the diff in the code below), but the code can be adjusted for any other logic.

    def extractStructFields(schema : StructType, structName : String) =
        schema.fields.find(_.name == structName).get
              .dataType.asInstanceOf[StructType].fieldNames
    val inner_fields = extractStructFields(df.schema, "inner_blob")
    val json_fields = extractStructFields(df.schema, "json_blob")
    
    val result = df
        .withColumn("json_blob", struct(
               inner_fields.map("inner_blob." + _).map(col) ++
               json_fields.diff(inner_fields).map("json_blob." + _).map(col) : _*))
        .drop("inner_blob")
    
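    The key step is the diff on the field-name lists. As a standalone sketch (field lists taken from the sample schema in the question), the resulting field order would be:

    ```scala
    // Field names as extractStructFields would return them for the sample data.
    val innerFields = Seq("identifier_id", "name", "age")
    val jsonFields  = Seq("identifier_id", "order_id", "inner_blob")

    // Take every field of inner_blob, then only those json_blob fields whose
    // names do not already appear in inner_blob.
    val mergedFields = innerFields ++ jsonFields.diff(innerFields)
    // -> Seq("identifier_id", "name", "age", "order_id", "inner_blob")
    ```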