I am trying to read data from Parquet files in Blob Storage in Databricks and write it to a Delta table.
Cluster config = 14.3 LTS (includes Apache Spark 3.5.0, Scala 2.12)
1. df = spark.read.format("parquet").load("/mnt/path") — reads successfully
2. df.write.format("delta").mode("overwrite").saveAsTable(path)
Step 2 fails with this error: SchemaColumnConvertNotSupportedException: column: [Col_Name], physicalType: INT64, logicalType: string
I have tried getting the schema from the Parquet files and enforcing it while reading, but I still get the error. I also tried different Spark conf settings, with no result.
Thanks
Detailed approach
# Extracting the schema
myschema = spark.read.option("header","true").option("recursiveFileLookup","true").parquet("/mnt/path").schema
print(myschema)
StructType([StructField('GatewayID', StringType(), True), StructField('Version', StringType(), True), StructField('Generation', StringType(), True), StructField('AppVersion', StringType(), True), StructField('UnitEntity', StringType(), True), StructField('SubID', StringType(), True), StructField('SIMCardID', StringType(), True), StructField('UnitNumber', StringType(), True), StructField('UnitType', StringType(), True), StructField('ISOCountryCode', StringType(), True), StructField('ReportTime', LongType(), True), StructField('MessageFormat', StringType(), True), StructField('MessagesAsString', StringType(), True)])
# Imposing the schema
df = spark.read.format("parquet").schema(myschema).load("/mnt/path")
# Writing to datalake
df.write.format("delta").mode("overwrite").saveAsTable(path)
Error:
Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException: column: [SubID], physicalType: INT64, logicalType: string
Analysis:
As you can see, the SubID column is a string in the extracted schema, but according to the error its physical type in the underlying Parquet files is INT64.
I have also tried converting the data type in the DataFrame and writing to Delta, but I get the same schema error.
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
df = df.withColumn("SubID", col("SubID").cast(IntegerType()))
2 Answers
How are you enforcing the schema while reading it from Parquet? Please pass the schema while reading as below and check:
val schema = StructType(...)  // your schema definition
val df = spark.read.schema(schema).parquet(path)
As per this article by @shanmugavel.chandrakasu:
First, try disabling the vectorized Parquet reader and then read the Parquet file from the source.
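In PySpark that can be done from the notebook session, for example (a sketch using the standard Spark conf key and the path from the question):
# Disable the vectorized Parquet reader so Spark falls back to the row-based reader
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
# Re-read the source after changing the setting
df = spark.read.parquet("/mnt/path")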
Now, read the Parquet file and check whether the data types of the source and target columns match.
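For example, compare the schema Spark infers from the Parquet footers against the schema of the Delta table (assuming the table already exists under the name used in saveAsTable):
# Schema inferred from the source Parquet files
df.printSchema()
# Schema of the existing Delta table, if it was created earlier
spark.table(path).printSchema()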
If not, use the code below to cast it to the INT64 data type. It is mentioned that your target has INT64, which is long, so use LongType while casting.
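A minimal sketch of that cast, assuming the read succeeds once the vectorized reader is disabled (the column name is taken from the error message):
from pyspark.sql.functions import col
from pyspark.sql.types import LongType

# Spark's LongType maps to Parquet's INT64, so cast SubID to long before writing
df = df.withColumn("SubID", col("SubID").cast(LongType()))
df.write.format("delta").mode("overwrite").saveAsTable(path)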