
Using Auto Loader, I am reading continuous data from storage into a Databricks Delta Live Tables table. The data pipeline is declared as follows.

import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
    
sch = "StructType([StructField('Date', StringType(), True), StructField('machine', StringType(), True), StructField('temperature', DecimalType(), True), StructField('time', StringType(), True)])"

@dlt.create_table(
  comment="The raw machine data, ingested from Azure storage.",
  table_properties={
    "myCompanyPipeline.quality": "raw",
    "pipelines.autoOptimize.managed": "true"
  }
)
def test_raw():
  return (
    spark.readStream.format("cloudFiles")
    .option("schema", sch)
    .option("cloudFiles.schemaLocation", "/FileStore/schema")
    .option("cloudFiles.format", "json")
    .load("..../")
  )

The dataset I am reading from storage looks like this.

{"Date":"2023-10-16","time":"12:00:00","machine":"Machine1","temperature":"23.50"}
{"Date":"2023-10-16","time":"12:00:01","machine":"Machine2","temperature":"...corrupt temp..."}
{"Date":"2023-10-16","time":"12:00:02","machine":"Machine3","temperature":"27.50"}

Unfortunately, the pipeline does not fail on the invalid (non-decimal) "temperature" value; it processes all records successfully.
Ideally it should fail, because the temperature column is defined with the Decimal data type.

Can someone please explain why this schema enforcement is not working?

2 Answers


  1. Chosen as BEST ANSWER

    The problem was resolved by using

    spark.readStream.format("cloudFiles").schema(sch)
    

    in place of

    spark.readStream.format("cloudFiles").option("schema",sch)
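
The difference between the two calls can be sketched as follows. This is a minimal sketch under stated assumptions, not the asker's exact pipeline: it assumes the schema is written as a DDL string (which `.schema()` accepts in place of a `StructType`), it picks `DECIMAL(10,2)` as an illustrative precision, and `read_raw` is a hypothetical helper that takes the `spark` session, the source path and the schema location as parameters.

```python
# Assumed DDL form of the schema; DECIMAL(10,2) is an illustrative precision,
# not something stated in the question.
SCHEMA_DDL = "Date STRING, machine STRING, temperature DECIMAL(10,2), time STRING"


def read_raw(spark, source_path, schema_location="/FileStore/schema"):
    """Hypothetical helper: build the Auto Loader stream with an explicit schema."""
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", schema_location)
        .schema(SCHEMA_DDL)  # passed via .schema(...), not .option("schema", ...)
        .load(source_path)
    )
```

Inside the pipeline, the table function would then return `read_raw(spark, <your source path>)`.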
    

  2. From the question: But unfortunately, the pipeline is not failing for wrong "temperature" data (non-decimal) and the pipeline is processing all records successfully. Ideally this should fail because the temperature column is defined as a Decimal data type.

    You won’t get an error when a value does not match the type declared in the schema; the mismatched value is simply set to null.

    Note: Values are set to null this way only when you run the pipeline for the first time. If the table already exists with a column of one type and the schema you provide declares a different type for that column, you get an error instead.
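
The null-on-mismatch behaviour described above can be illustrated in plain Python on the three sample records from the question. This is only an analogy under stated assumptions, not Spark or Auto Loader code: `to_decimal_or_none`, `parse_rows` and `rows_with_null_temperature` are hypothetical helpers, and Python's `Decimal` stands in for the Decimal column type.

```python
import json
from decimal import Decimal, InvalidOperation

# The three sample records from the question.
SAMPLE_LINES = [
    '{"Date":"2023-10-16","time":"12:00:00","machine":"Machine1","temperature":"23.50"}',
    '{"Date":"2023-10-16","time":"12:00:01","machine":"Machine2","temperature":"...corrupt temp..."}',
    '{"Date":"2023-10-16","time":"12:00:02","machine":"Machine3","temperature":"27.50"}',
]


def to_decimal_or_none(raw):
    """Return a Decimal, or None when the value cannot be parsed (no exception)."""
    try:
        return Decimal(raw)
    except (InvalidOperation, TypeError, ValueError):
        return None


def parse_rows(lines):
    """Parse each JSON line and convert 'temperature' permissively."""
    rows = []
    for line in lines:
        record = json.loads(line)
        record["temperature"] = to_decimal_or_none(record.get("temperature"))
        rows.append(record)
    return rows


def rows_with_null_temperature(rows):
    """List the machines whose temperature ended up as None."""
    return [row["machine"] for row in rows if row["temperature"] is None]


rows = parse_rows(SAMPLE_LINES)
bad_machines = rows_with_null_temperature(rows)
```

Here every record is kept and only the corrupt value becomes `None`, so nothing fails unless the nulls are checked explicitly. In a Delta Live Tables pipeline, an expectation such as `@dlt.expect_or_fail` on `temperature IS NOT NULL` is one documented way to turn such nulls into a failure, if that is the desired outcome.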

    For JSON files, every column is read as a string if the schema is not supplied properly.

    That is what happened here: you passed the schema with option("schema",sch) instead of schema(sch).

    So you get no error, and every column is read as a string.
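
One quick way to tell the two cases apart is to look at the column types of the resulting DataFrame. The sketch below assumes you pass it the list returned by `df.dtypes` (pairs of column name and type string); `schema_was_applied` is a hypothetical helper, and the two sample lists are written by hand to match the cases described above, not captured from a real run.

```python
def column_type(dtypes, column):
    """Look up a column's type string in a DataFrame.dtypes-style list."""
    return dict(dtypes).get(column)


def schema_was_applied(dtypes, column="temperature", expected_prefix="decimal"):
    """True when the column exists and its type string starts with the expected prefix."""
    found = column_type(dtypes, column)
    return found is not None and found.startswith(expected_prefix)


# Hand-written examples of what df.dtypes might contain in each case.
without_schema = [("Date", "string"), ("machine", "string"),
                  ("temperature", "string"), ("time", "string")]
with_schema = [("Date", "string"), ("machine", "string"),
               ("temperature", "decimal(10,2)"), ("time", "string")]

applied_without = schema_was_applied(without_schema)
applied_with = schema_was_applied(with_schema)
```

If the check returns `False` for the temperature column, the schema was most likely not picked up and the values are still strings.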
