
We are trying to read a table from BigQuery into a Spark DataFrame.

The structure of the table is shown below.

(screenshot: simple_table)

The following PySpark code is used to read the data.

    from google.oauth2 import service_account
    from google.cloud import bigquery
    import json
    import base64 as bs
    from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DoubleType, DecimalType
    
    schema = "schema_name"
    project_id = "project_id"
    
    table_name = "simple"
    # table_name = "jsonres"
    schema_table_name = str(project_id) + "." + str(schema) + "." + str(table_name)
    credentials_dict = {"Insert_actual_credentials": "here"}

    credentials = service_account.Credentials.from_service_account_info(credentials_dict)
    client = bigquery.Client(credentials=credentials, project=project_id)
    
    query = "SELECT * FROM `{}`;".format(schema_table_name)
    # print(query)
    query_job = client.query(query)
    query_job.result()
    
    s = json.dumps(credentials_dict)
    res = bs.b64encode(s.encode('utf-8'))
    ans = res.decode("utf-8")
    
    try:
        df = spark.read.format('bigquery') \
            .option("credentials", ans) \
            .option("parentProject", project_id) \
            .option("project", project_id) \
            .option("mode", "DROPMALFORMED") \
            .option('dataset', query_job.destination.dataset_id) \
            .load(query_job.destination.table_id)
        df.printSchema()
        print(df)
        df.show()
    except Exception as exp:
        print(exp)

For simple tables, we are able to read the table as a DataFrame successfully.

But when the BigQuery table has a JSON column, as shown below, reading fails.

(screenshot: json_col_table)

We get the following error:

    An error occurred while calling o1138.load. :
    java.lang.IllegalStateException: Unexpected type: JSON
        at com.google.cloud.spark.bigquery.SchemaConverters.getStandardDataType(SchemaConverters.java:355)
        at com.google.cloud.spark.bigquery.SchemaConverters.lambda$getDataType$3(SchemaConverters.java:303)

We also tried providing a schema while reading the data.

    structureSchema = StructType([
        StructField('x', StructType([
            StructField('name', StringType(), True)
        ])),
        StructField("y", DecimalType(), True)
    ])
    print(structureSchema)

    try:
        df = spark.read.format('bigquery') \
            .option("credentials", ans) \
            .option("parentProject", project_id) \
            .option("project", project_id) \
            .option("mode", "DROPMALFORMED") \
            .option('dataset', query_job.destination.dataset_id) \
            .schema(structureSchema) \
            .load(query_job.destination.table_id)
        df.printSchema()
        print(df)
        df.show()
    except Exception as exp:
        print(exp)

Still, we faced the same error: 'java.lang.IllegalStateException: Unexpected type: JSON'.

How can we read a BigQuery table with a JSON-type column into a Spark DataFrame?

Update 1:
There is an open GitHub issue regarding this:

"While reading a bigquery table, having a JSON type field from Apache Spark throws exception."

Is there any workaround for this?
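One query-level idea, sketched below as an untested assumption (TO_JSON_STRING is a standard BigQuery function, and x stands in for the actual JSON column name), is to convert the JSON column to STRING in the query itself, so the destination table the connector reads never contains a JSON-typed column:

    # Sketch of a possible workaround: cast the JSON column to STRING in
    # the query, so the materialized destination table has no JSON type.
    # 'x' is a placeholder for the actual JSON column name.
    query = "SELECT * EXCEPT(x), TO_JSON_STRING(x) AS x FROM `{}`;".format(schema_table_name)
    query_job = client.query(query)
    query_job.result()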

2 Answers


  1. Chosen as BEST ANSWER

    There is a corresponding GitHub issue for this in the BigQuery connector repository:

    While reading a bigquery table, having a JSON type field from Apache Spark throws exception. · Issue #804 · GoogleCloudDataproc/spark-bigquery-connector

    They say the issue is fixed in version 0.28.0 and above.

    Commit containing their fix: json related changes #804 · abhijeet-lele/spark-bigquery-connector@0c7de63

    (screenshot: commit_Screenshot)

    Maven Repository: com.google.cloud.spark » spark-bigquery

    (screenshot: maven_ss)
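    For completeness, a minimal sketch of pulling in a fixed connector release when building the Spark session. The Maven coordinate below assumes a Scala 2.12 build of Spark and the "with-dependencies" artifact; adjust both for your environment:

        from pyspark.sql import SparkSession

        # Sketch: pin a connector release (>= 0.28.0) that includes the JSON fix.
        # The artifact name assumes Scala 2.12; use the _2.13 artifact on
        # Scala 2.13 builds of Spark.
        spark = (SparkSession.builder
                 .appName("bigquery-json-read")
                 .config("spark.jars.packages",
                         "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.28.0")
                 .getOrCreate())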


  2. Try the code below and check if it works for you. Basically, you keep the JSON column as a string, and then use a Spark function to extract the JSON content:

    import pyspark.sql.functions as f
    from pyspark.sql.types import StructType, StructField, StringType, DecimalType

    # Declare the JSON column 'x' as a plain string in the Spark schema.
    structureSchema = StructType([
        StructField('x', StringType()),
        StructField("y", DecimalType())
    ])
    
    df = (spark.read.format('bigquery')
            .option("credentials", ans)
            .option("parentProject", project_id)
            .option("project", project_id)
            .option("mode", "DROPMALFORMED")
            .option('dataset', query_job.destination.dataset_id)
            .schema(structureSchema)
            .load(query_job.destination.table_id)
         )
    
    # Extract a single field from the JSON string via a JSONPath expression.
    df = df.withColumn("jsonColumnName", f.get_json_object(f.col("x"), "$.name"))
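    If the JSON column has a known, fixed structure, another option is to parse the whole string into a struct with from_json instead of pulling out fields one at a time. A minimal sketch, assuming the inner schema is the single 'name' field used in the question:

        from pyspark.sql.types import StructType, StructField, StringType
        import pyspark.sql.functions as f

        # Sketch: parse the JSON string column into a struct in one pass.
        # The inner schema (a single 'name' field) is assumed from the question.
        json_schema = StructType([StructField("name", StringType(), True)])
        df = df.withColumn("x_parsed", f.from_json(f.col("x"), json_schema))
        df.select("x_parsed.name", "y").show()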
    