
I have a list of .json files that contain person information, one file per person. I want to load this data into a table using PySpark in an Azure Databricks notebook.

Let’s say the files are built like this:

{
    "id": 1,
    "name": "Homer",
    "address": {
        "street": "742 Evergreen Terrace"
        "city": "Springfield"
    }
}

Fairly simple JSON here, which I can read into a dataframe with this code:

from pyspark.sql.functions import col

sourcejson = spark.read.json("path/to/json")

df = (
    sourcejson.select(
        col('id'),
        col('name'),
        col('address.street').alias('street'),
        col('address.city').alias('city')
    )
)

which gives the expected result:

id | name  | street                | city
1  | Homer | 742 Evergreen Terrace | Springfield

However, the problem starts when the address is unknown. In that case, the whole address struct in the JSON will just be null:

{
    "id": 2,
    "name": "Ned",
    "address": null
}

In the example file above, we don’t know Ned’s address so we have a null. Using the code from before, I would expect a result like this:

id | name | street | city
2  | Ned  | null   | null

However, running the code results in an error:

[INVALID_EXTRACT_BASE_FIELD_TYPE] Can't extract a value from "address". Need a complex type [STRUCT, ARRAY, MAP] but got "STRING"

I understand the reason behind the error, but I can't find a solution to it. Any ideas on how we could handle this?

2 Answers


  1. You’re creating an avoidable problem by reading one file at a time. Read all the files at once with spark.read.json('folder/with/all/json/files') instead of:

    • spark.read.json('folder/with/all/json/files/file1') and then
    • spark.read.json('folder/with/all/json/files/file2')

    There is a little gotcha here: in the OP you’re reading one file at a time, but in practice you’ll want to read all the files at once.

    • When you read one file at a time (which makes little sense here, since you want a single dataframe containing rows from all the JSON files), Spark will infer the type of address as STRING for null values, unless you specify the schema while reading the file (see the sketch after this list).
    >>> spark.read.json(sc.parallelize(['{"id": 2, "name": "Marge", "address": null}'])).printSchema()
    root
     |-- address: string (nullable = true)
     |-- id: long (nullable = true)
     |-- name: string (nullable = true)
    
    >>> 
    
    • If you read all files at once, it’ll infer the type of address as StructType([StructField('city', StringType(), True), StructField('street', StringType(), True)]) for null values, and your original code will work as is.
    >>> from pyspark.sql import functions as F
    >>> df = spark.read.json(sc.parallelize([
    ...   '{"id": 2, "name": "Marge", "address": null}',
    ...   '{"id": 1, "name": "Homer", "address": {"street": "742 Evergreen Terrace", "city": "Springfield"} }'
    ... ]))
    >>> df.printSchema()
    root
     |-- address: struct (nullable = true)
     |    |-- city: string (nullable = true)
     |    |-- street: string (nullable = true)
     |-- id: long (nullable = true)
     |-- name: string (nullable = true)
    
    >>> df.select(
    ...     F.col('id'),
    ...     F.col('name'),
    ...     F.col('address.street').alias('street'),
    ...     F.col('address.city').alias('city')
    ... ).show(truncate=False)
    +---+-----+---------------------+-----------+
    |id |name |street               |city       |
    +---+-----+---------------------+-----------+
    |2  |Marge|null                 |null       |
    |1  |Homer|742 Evergreen Terrace|Springfield|
    +---+-----+---------------------+-----------+
    
    >>>
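
    If you really do need to read one file at a time, a minimal sketch of that schema workaround (the file path is illustrative):

    from pyspark.sql.types import StructType, StructField, StringType, LongType

    # An explicit schema stops Spark from inferring a null address as STRING.
    address_struct = StructType([
        StructField('street', StringType(), True),
        StructField('city', StringType(), True),
    ])
    schema = StructType([
        StructField('id', LongType(), True),
        StructField('name', StringType(), True),
        StructField('address', address_struct, True),
    ])

    # 'folder/with/all/json/files/file2' stands in for a single-person file.
    df = spark.read.json('folder/with/all/json/files/file2', schema=schema)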
    

    Use coalesce() if you actually want some specific default value for nulls. E.g. the code below translates address=null in the JSON file to {city='', street=null} in the dataframe, instead of the {city=null, street=null} that Spark produces by default when you read all files at once.

    >>> json_strings = [
    ...   '{"id": 1, "name": "Homer", "address": {"street": "742 Evergreen Terrace", "city": "Springfield"} }',
    ...   '{"id": 2, "name": "Marge", "address": null}',
    ... ]
    >>> df = spark.read.json(sc.parallelize(json_strings))
    >>> df.show(truncate=False)
    +------------------------------------+---+-----+
    |address                             |id |name |
    +------------------------------------+---+-----+
    |{Springfield, 742 Evergreen Terrace}|1  |Homer|
    |null                                |2  |Marge|
    +------------------------------------+---+-----+
    
    >>>
    
    >>> default_value = F.struct(F.lit('').alias('city'), F.lit(None).alias('street'))
    >>> df2 = df.select('id', 'name', F.coalesce('address', default_value).alias('address'))
    >>> df2.show(truncate=False)
    +---+-----+------------------------------------+
    |id |name |address                             |
    +---+-----+------------------------------------+
    |1  |Homer|{Springfield, 742 Evergreen Terrace}|
    |2  |Marge|{, null}                            |
    +---+-----+------------------------------------+
    
    >>>
    >>> df2.select(
    ...     F.col('id'),
    ...     F.col('name'),
    ...     F.col('address.street').alias('street'),
    ...     F.col('address.city').alias('city')
    ... ).show(truncate=False)
    +---+-----+---------------------+-----------+
    |id |name |street               |city       |
    +---+-----+---------------------+-----------+
    |1  |Homer|742 Evergreen Terrace|Springfield|
    |2  |Marge|null                 |           |
    +---+-----+---------------------+-----------+
    
    >>>
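
    Alternatively, if you only need the default in the final flat columns, you can coalesce at the leaf level instead of rebuilding the struct; a sketch assuming the same df with the struct schema from above:

    from pyspark.sql import functions as F

    # A null struct propagates null through nested field access,
    # so the default can be applied per leaf column.
    df.select(
        F.col('id'),
        F.col('name'),
        F.col('address.street').alias('street'),
        F.coalesce(F.col('address.city'), F.lit('')).alias('city')
    ).show(truncate=False)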
    
  2. When you don’t provide a schema to spark.read.json, it is inferred from the data. So when address is null in every object being read, Spark infers it as a StringType, and that’s why you are getting the error. One possible solution is to read the data with an explicit schema:

    from pyspark.sql import functions as F
    from pyspark.sql.types import StructType, StructField, StringType, LongType
    
    raw_data = spark.sparkContext.parallelize(
        ['{"id": 2, "name": "Marge", "address": null}']
    )
    address_struct = StructType([
        StructField('street', StringType(), True),
        StructField('city', StringType(), True),
    ])
    schema = StructType([
        StructField('id', LongType(), True),
        StructField('name', StringType(), True),
        StructField('address', address_struct, True),
    ])
    sourcejson = spark.read.json(raw_data, schema=schema)
    
    res = (
        sourcejson.select(
            F.col('id'),
            F.col('name'),
            F.col('address.street').alias('street'),
            F.col('address.city').alias('city')
        )
    )
    res.show(10, False)
    
    # +---+-----+------+----+
    # |id |name |street|city|
    # +---+-----+------+----+
    # |2  |Marge|NULL  |NULL|
    # +---+-----+------+----+
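
    Since the goal is to land the data in a table in Databricks, the flattened dataframe can then be written out; a minimal sketch, where the table name persons is hypothetical:

    # 'persons' is a hypothetical table name; mode('overwrite')
    # replaces the table if it already exists.
    res.write.mode('overwrite').saveAsTable('persons')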
    