
I am trying to read data from parquet files in blob storage in Databricks and write it to a Delta table.

Cluster config = 14.3 LTS (includes Apache Spark 3.5.0, Scala 2.12)

1. df = spark.read.format("parquet").load("/mnt/path") - reads successfully
2. df.write.format("delta").mode("overwrite").saveAsTable(path) - fails with the error below

Step 2 gives this error: SchemaColumnConvertNotSupportedException: column: [Col_Name], physicalType: INT64, logicalType: string

I have tried extracting the schema from the parquet files and enforcing it while reading, but I still get the error. I have also tried different Spark conf settings with no result.

Thanks

Detailed approach

# Extracting the schema
myschema = spark.read.option("header","true").option("recursiveFileLookup","true").parquet("/mnt/path").schema
print(myschema)

StructType([StructField('GatewayID', StringType(), True), StructField('Version', StringType(), True), StructField('Generation', StringType(), True), StructField('AppVersion', StringType(), True), StructField('UnitEntity', StringType(), True), StructField('SubID', StringType(), True), StructField('SIMCardID', StringType(), True), StructField('UnitNumber', StringType(), True), StructField('UnitType', StringType(), True), StructField('ISOCountryCode', StringType(), True), StructField('ReportTime', LongType(), True), StructField('MessageFormat', StringType(), True), StructField('MessagesAsString', StringType(), True)])

# Imposing the schema

df = spark.read.format("parquet").schema(myschema).load("/mnt/path")

# Writing to datalake
df.write.format("delta").mode("overwrite").saveAsTable(path)

Error:
Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException: column: [SubID], physicalType: INT64, logicalType: string

Analysis:
As you can see, SubID is a string in the extracted schema, but according to the error it is INT64 in at least one of the parquet files. I have tried converting the data type in the DataFrame before writing to Delta, but I get the same schema error.

from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
df = df.withColumn("SubID", col("SubID").cast(IntegerType()))
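
One way to confirm where the mismatch comes from is to inspect each parquet file's own schema, since different files under the same folder can disagree on SubID. A rough sketch (assumes the files sit directly under /mnt/path; nested folders would need recursion or recursiveFileLookup):

# Sketch: print the SubID type recorded in each individual parquet file
from pyspark.sql.types import StringType

for f in dbutils.fs.ls("/mnt/path"):
    if f.path.endswith(".parquet"):
        file_type = spark.read.parquet(f.path).schema["SubID"].dataType
        if not isinstance(file_type, StringType):
            print(f.path, file_type)  # files where SubID is not stored as a string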

2 Answers


  1. How are you enforcing the schema while reading from parquet?
    Please pass the schema while reading, as below, and check:

    // assuming `schema` holds the StructType extracted from the source files
    val df = spark.read.schema(schema).parquet(path)
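
    The PySpark equivalent, reusing the myschema StructType already extracted in the question, would be something like:

    # PySpark sketch: enforce the extracted schema at read time
    df = spark.read.schema(myschema).parquet("/mnt/path")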

  2. As per this article by @shanmugavel.chandrakasu,

    From Databricks Runtime 7.3 onwards, Spark reads parquet files with the vectorized reader. This might be why it keeps the native data type recorded in the parquet file instead of casting it to the type in the schema you supply.

    First, try disabling the vectorized reader and then read the parquet files from the source.

    spark.conf.set("spark.sql.parquet.enableVectorizedReader","false")
    

    Now read the parquet file and check whether the data types of the source and target columns are the same.
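
    A minimal way to do that comparison in PySpark (a sketch; "my_table" stands in for whatever table name is passed to saveAsTable, and it assumes the Delta table already exists):

    # Sketch: compare source parquet types against the existing Delta table's types
    src_types = dict(spark.read.parquet("/mnt/path").dtypes)
    tgt_types = dict(spark.table("my_table").dtypes)   # hypothetical table name
    for column, src_type in src_types.items():
        if tgt_types.get(column) != src_type:
            print(column, src_type, "vs", tgt_types.get(column))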

    If not, use the code below to cast the column. The error shows the physical type is INT64, which is a long, so cast to LongType rather than IntegerType.

    from pyspark.sql.functions import col
    from pyspark.sql.types import LongType
    df = df.withColumn("SubID", col("SubID").cast(LongType()))
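
    If the Delta table was already created with the old type for SubID, the overwrite will also need to replace the stored schema. A sketch of the final write ("my_table" again stands in for the table name passed to saveAsTable):

    # Sketch: overwrite the Delta table and allow its stored schema to be replaced
    (df.write.format("delta")
       .mode("overwrite")
       .option("overwriteSchema", "true")   # needed because a column type changed
       .saveAsTable("my_table"))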
    