
I have a view in Databricks, and in one column the content is JSON, for example:

[{"ECID":100017056,"FIRST_NAME":"Ioannis","LAST_NAME":"CHATZIZYRLIS","TITLE":"Mr","GENDER":"M","DATE_OF_BIRTH":"1995-04-14","PLACE_OF_BIRTH":"Greece","COUNTRY_OF_BIRTH":"GR","NATIONALITY":"GR","RESIDENCE":"GR","ADDRESS":[{"TYPE":1,"STREET_1":"Gizi 6","CITY":"Agios Doe","POSTAL_CODE":"111 34","COUNTRY":"GR","MOBILE":"0000000","EMAIL":" "}]}]

I want to retrieve the content of that column so that I can POST it to an API.

I tried loading it into a DataFrame and then flattening it, among various other options, but each time I get it back as below, encapsulated in [Row()]:

[Row(ECID=100016583, CID=None, FIRST_NAME='test', LAST_NAME='test', TITLE='Ms', GENDER='F', DATE_OF_BIRTH=datetime.date(1989, 10, 22), PLACE_OF_BIRTH='test', COUNTRY_OF_BIRTH='RS', NATIONALITY='RS', RESIDENCE='RS', AGENCY_ID=None, ADDRESS=[Row(TYPE=1, STREET_1='test 6', STREET_2=None, STREET_3=None, CITY='test', POSTAL_CODE='12226', COUNTRY='RS', PHONE=None, MOBILE='1111111', EMAIL='[email protected]')], PASSPORT=[Row(DOCUMENT_TYPE_ID=1, DOCUMENT_TYPE='PASSPORT', NUMBER='test', FIRST_NAME='test', LAST_NAME='test', ISSUE_COUNTRY='RS', ISSUE_CITY='test', ISSUE_DATE=datetime.date(2019, 10, 24), EXPIRATION_DATE=datetime.date(2029, 10, 24), PRIMARY=True)], VISAS=[Row(VISA_NAME_ID='Mandatory; Internal MXP identifier of Visa name. -Probably mapping', VISA_NAME='German Embassy', VISA_NUMBER='test', VISA_ISSUED_CITY_NAME='test', VISA_ISSUED_COUNTRY='RS', VISA_ISSUED_DATE=datetime.date(2021, 12, 28), VISA_EXPIRATION_DATE=datetime.date(2023, 1, 2))])]

How can I retrieve it in the correct format so that I can post it to the API?

2 Answers


  1. Try using df.rdd.flatMap to get the values extracted from each Row.

    Example:

    json = """[{"ECID":100017056,"FIRST_NAME":"Ioannis","LAST_NAME":"CHATZIZYRLIS","TITLE":"Mr","GENDER":"M","DATE_OF_BIRTH":"1995-04-14","PLACE_OF_BIRTH":"Greece","COUNTRY_OF_BIRTH":"GR","NATIONALITY":"GR","RESIDENCE":"GR"}]"""
    df = spark.read.json(sc.parallelize([json]), multiLine=True)
    print(df.columns)
    #['COUNTRY_OF_BIRTH', 'DATE_OF_BIRTH', 'ECID', 'FIRST_NAME', 'GENDER', 'LAST_NAME', 'NATIONALITY', 'PLACE_OF_BIRTH', 'RESIDENCE', 'TITLE']
    print(df.rdd.flatMap(lambda x: x).collect())
    #['GR', '1995-04-14', 100017056, 'Ioannis', 'M', 'CHATZIZYRLIS', 'GR', 'Greece', 'GR', 'Mr']
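    Why this works: a PySpark Row is iterable over its field values, much like a namedtuple, so flatMap(lambda x: x) simply unpacks each Row into its values. A driver-side sketch of the same flattening, using a namedtuple as a stand-in for Row so it runs without Spark:

```python
from collections import namedtuple
from itertools import chain

# Stand-in for pyspark.sql.Row: both are iterable over their field values.
Row = namedtuple("Row", ["COUNTRY_OF_BIRTH", "ECID", "FIRST_NAME"])
rows = [Row("GR", 100017056, "Ioannis")]

# Mirrors df.rdd.flatMap(lambda x: x).collect() on the driver side:
# each Row is unpacked into its values and the results are concatenated.
flattened = list(chain.from_iterable(rows))
print(flattened)
# ['GR', 100017056, 'Ioannis']
```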
    
  2. Once you have done all the necessary transformations and collected the data to the driver node, you can call .asDict() on the Row object to get a Python dict representation of that row:

    df = ... # transform
    data = [r[0].asDict() for r in df.collect()]
    
    • for r in df.collect() iterates over the rows of the DataFrame
    • r[0] gives you the struct column, represented as a Row object
    • .asDict() converts it into a Python dictionary (pass recursive=True to also convert nested Rows such as ADDRESS)
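    One thing to watch out for before posting: the collected dicts still contain datetime.date values (e.g. DATE_OF_BIRTH), which json.dumps cannot serialize by default. A minimal sketch of preparing the payload, using hand-written sample records in place of the real df.collect() output:

```python
import datetime
import json

# Hypothetical result of [r[0].asDict(recursive=True) for r in df.collect()]
records = [
    {
        "ECID": 100017056,
        "FIRST_NAME": "Ioannis",
        "DATE_OF_BIRTH": datetime.date(1995, 4, 14),
        "ADDRESS": [{"TYPE": 1, "CITY": "Agios Doe"}],
    }
]

# datetime.date is not JSON-serializable out of the box; default=str
# falls back to str() for such values, rendering dates as ISO strings.
payload = json.dumps(records, default=str)
print(payload)
```

    The resulting string can then be sent with, for example, requests.post(url, data=payload, headers={"Content-Type": "application/json"}) (endpoint and headers here are placeholders for your API).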

    P.S. If you have a lot of data to send, you may consider using a Pandas UDF instead, so you can call the API in parallel from all executors.
