Using Auto Loader, I am reading continuous data from storage into a Databricks Delta Live Table. The data pipeline is declared as follows.
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *

sch = "StructType([StructField('Date', StringType(), True), StructField('machine', StringType(), True), StructField('temperature', DecimalType(), True), StructField('time', StringType(), True)])"

@dlt.create_table(
    comment="The raw machine data, ingested from azure storage.",
    table_properties={
        "myCompanyPipeline.quality": "raw",
        "pipelines.autoOptimize.managed": "true"
    }
)
def test_raw():
    return (
        spark.readStream.format("cloudFiles")
        .option("schema", sch)
        .option("cloudFiles.schemaLocation", "/FileStore/schema")
        .option("cloudFiles.format", "json")
        .load("..../")
    )
The dataset I am reading from storage looks like this.
{"Date":"2023-10-16","time":"12:00:00","machine":"Machine1","temperature":"23.50"}
{"Date":"2023-10-16","time":"12:00:01","machine":"Machine2","temperature":"...corrupt temp..."}
{"Date":"2023-10-16","time":"12:00:02","machine":"Machine3","temperature":"27.50"}
Unfortunately, the pipeline does not fail on the invalid (non-decimal) "temperature" value; it processes all records successfully.
Ideally it should fail, because the temperature column is defined with a Decimal data type.
Can someone please explain why schema enforcement is not working here?
2 Answers
The problem was resolved after applying .schema(sch) in place of .option("schema", sch).
You won't get an error whenever there is a mismatch between the schema and the data type; the value is simply set to null when the types do not match.

Note: the value is set to null only when you run the pipeline for the first time. If a table already exists with a column of one type and the provided schema declares a different type, you will get an error.

For the json file type, everything is read as a string if you don't provide the schema properly. That is what happened here: you passed the schema through option("schema", sch) instead of schema(sch). So you won't get any error, and every column is read as a string.
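As a minimal sketch of the fix described above, the pipeline could pass a real StructType object to schema(...) and, since mismatched values become null rather than raising an error, add a Delta Live Tables expectation so the update stops on a null temperature. This only runs inside a DLT pipeline, where spark is provided by the runtime. The names machine_schema and valid_temperature are hypothetical, and DecimalType(10, 2) is an assumption: the question's DecimalType() defaults to precision 10 and scale 0, which would not keep the fractional digits.

```python
import dlt
from pyspark.sql.types import DecimalType, StringType, StructField, StructType

# A real StructType object, not a string containing Python source.
# DecimalType(10, 2) is an assumption; adjust precision/scale to your data.
machine_schema = StructType([
    StructField("Date", StringType(), True),
    StructField("machine", StringType(), True),
    StructField("temperature", DecimalType(10, 2), True),
    StructField("time", StringType(), True),
])


@dlt.table(
    comment="The raw machine data, ingested from azure storage.",
    table_properties={
        "myCompanyPipeline.quality": "raw",
        "pipelines.autoOptimize.managed": "true",
    },
)
# Hypothetical expectation name; fails the update when temperature is null,
# which is what a non-decimal value turns into under the enforced schema.
@dlt.expect_or_fail("valid_temperature", "temperature IS NOT NULL")
def test_raw():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", "/FileStore/schema")
        .schema(machine_schema)  # enforced schema, instead of option("schema", ...)
        .load("..../")  # path left elided, as in the question
    )
```

Note that this expectation also rejects records whose temperature is genuinely missing in the source. If that is too strict, dlt.expect or dlt.expect_or_drop with the same constraint would record or drop such rows instead of failing the update.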