
I have the following PySpark Row data:

indv_msg = [Row(cbm_json_output=Row(country_code='USA', date='06-10-2023', date_epoch='1696550400', id='USA-001535-1696550400', interfaceVersion='1.0.0', opmode_car_door=Row(health_category='GREEN', msg_id='1', num_yellow_preds_in_last_14_days=0, reason=None, reasonDetail=None), opmode_landing_door=Row(health_category='GREEN', msg_id='1', reason=None, reasonDetail=None), sensor=Row(component_type=None, health_category=None, landing_priority=None, msg_id='1', num_yellow_preds_in_last_14_days=None, reason=None, reasonDetail=None), unit_id='001535'))]

When I convert it to a JSON string, the field names such as "country_code" and "date" are dropped and only the values remain.

user_encode_data = json.dumps(indv_msg, indent=2)

Result:
indv_msg

 [
  [
    "USA",
    "06-10-2023",
    "1696550400",
    "USA-001535-1696550400",
    "1.0.0",
    [
      "GREEN",
      "1",
      0,
      null,
      null
    ],
    [
      "GREEN",
      "1",
      null,
      null
    ]
]

Expected result:
indv_msg

[
  {
    "country_code" : "USA",
    "date" : "06-10-2023",
    "date_epoch": "1696550400",
     ....
     ....
   }
 ]

2 Answers


  1. Define a function (to_dict) that recursively converts a PySpark Row into the corresponding dictionary, map it over each row in indv_msg, and then serialize the result with json.dumps:

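    import json

    from pyspark.sql import Row

    def to_dict(row):
        # Convert a Row to a dict, recursing into nested Row values.
        return {
            k: to_dict(v) if isinstance(v, Row) else v
            for k, v in row.asDict().items()
        }

    data = json.dumps([to_dict(msg) for msg in indv_msg], indent=4)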
    

    print(data)
    [
        {
            "cbm_json_output": {
                "country_code": "USA",
                "date": "06-10-2023",
                "date_epoch": "1696550400",
                "id": "USA-001535-1696550400",
                "interfaceVersion": "1.0.0",
                "opmode_car_door": {
                    "health_category": "GREEN",
                    "msg_id": "1",
                    "num_yellow_preds_in_last_14_days": 0,
                    "reason": null,
                    "reasonDetail": null
                },
                "opmode_landing_door": {
                    "health_category": "GREEN",
                    "msg_id": "1",
                    "reason": null,
                    "reasonDetail": null
                },
                "sensor": {
                    "component_type": null,
                    "health_category": null,
                    "landing_priority": null,
    ...
                "unit_id": "001535"
            }
        }
    ]
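
One caveat with to_dict as written: it only recurses into values that are themselves a Row, so a Row sitting inside a list (for example an array column) would still come out as a bare JSON array. Below is a rough sketch of a more general converter. The namedtuple classes Door and Unit and the name to_plain are hypothetical stand-ins I made up so the snippet runs without Spark; with real data the asDict branch is the one that would apply.

```python
import json
from collections import namedtuple


def to_plain(value):
    """Recursively turn Row-like objects into dicts, descending into lists and dicts."""
    if hasattr(value, "asDict"):       # pyspark.sql.Row exposes asDict()
        return {k: to_plain(v) for k, v in value.asDict().items()}
    if hasattr(value, "_asdict"):      # namedtuple stand-in used in this sketch
        return {k: to_plain(v) for k, v in value._asdict().items()}
    if isinstance(value, dict):
        return {k: to_plain(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_plain(v) for v in value]
    return value


# Hypothetical stand-ins for Row, not part of the original data.
Door = namedtuple("Door", ["health_category", "msg_id"])
Unit = namedtuple("Unit", ["unit_id", "doors"])

unit = Unit(unit_id="001535", doors=[Door("GREEN", "1"), Door("YELLOW", "2")])
plain = to_plain(unit)
print(json.dumps(plain, indent=2))
```

The Row-like checks come before the list/tuple check on purpose: Row and namedtuple are both tuple subclasses, so testing for tuple first would throw the field names away again.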
    
  2. In addition to @Shubham Sharma's answer: you can simply call row.asDict(True), which sets recursive to True so that nested Rows are converted as well.

    When I tried this in my environment, I got the same results.


    So you can use the code block below to get the JSON output.

    indv_msg_dict = [row.asDict(True) for row in indv_msg]
    
    user_encode_data = json.dumps(indv_msg_dict, indent=2)
    
    print(user_encode_data)
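
For context on why the original json.dumps call loses the names: Row is a subclass of tuple, and json.dumps writes any tuple as a JSON array. Here is a small sketch of that behaviour. It uses a namedtuple called Msg as a hypothetical stand-in for Row (also a tuple subclass) so it runs without Spark, and the record function is just an illustrative hook.

```python
import json
from collections import namedtuple

# Hypothetical stand-in for pyspark.sql.Row: both are tuple subclasses.
Msg = namedtuple("Msg", ["country_code", "date"])
msg = Msg(country_code="USA", date="06-10-2023")

as_array = json.dumps(msg)             # tuple -> JSON array, names lost
as_object = json.dumps(msg._asdict())  # dict  -> JSON object, names kept

# The default= hook is only consulted for objects json cannot serialize,
# so it is never called for a tuple subclass.
calls = []


def record(obj):
    calls.append(obj)
    return str(obj)


with_hook = json.dumps(msg, default=record)

print(as_array)
print(as_object)
```

So passing a custom default function to json.dumps would not help here; the conversion to a dict has to happen before serialization, which is what asDict(True) does.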
    

