
I was able to get row values from a Delta table using ForeachWriter in spark-shell and from the command line, but the same code doesn't work in Azure Databricks.

import org.apache.spark.sql.{ForeachWriter, Row}

val process_deltatable = read_deltatable.writeStream.foreach(new ForeachWriter[Row] {
  // Called once for every row in each micro-batch
  def process(value: Row): Unit = {
    val Telemetery = value.getString(0)
    println(Telemetery)
  }
  def open(partitionId: Long, epochId: Long): Boolean = true
  def close(errorOrNull: Throwable): Unit = {}
})

val xyz = process_deltatable.start()
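
For reference, read_deltatable here is a streaming read of the source Delta table, roughly as in the sketch below (the source path is a placeholder, not the actual path):

// Placeholder source path; read_deltatable is assumed to be a streaming read of the source Delta table
val read_deltatable = spark.readStream
  .format("delta")
  .load("dbfs:/user/hive/warehouse/desdelta/source")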

Is there any way to get row-by-row values from a streaming Delta table in Azure Databricks?

2 Answers


  1. We can get the streaming data by saving it as a table to a specific location and then loading it back.
    Here is the altered code.

    val process_deltatable = read_deltatable.writeStream.foreach(new ForeachWriter[Row] {
      def process(value: Row): Unit = {
        print(value)
      }
      def open(partitionId: Long, epochId: Long): Boolean = true
      def close(errorOrNull: Throwable): Unit = {}
    })

    val xyz = process_deltatable
      .option("checkpointLocation", "dbfs:/user/hive/warehouse/desdelta/chkpoints")
      .format("delta")
      .start("dbfs:/user/hive/warehouse/desdelta/data")
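
    Once started, the handle returned by start() (xyz above) can be used to monitor or stop the stream, for example:

    // Inspect the running streaming query (output depends on the stream's progress)
    println(xyz.status)
    println(xyz.lastProgress)

    // Stop the stream when finished
    xyz.stop()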
    

    While saving, I specified the checkpoint location and the data save location.
    After saving, you can retrieve the data by running the following code:

    val df = spark.read.format("delta").load("dbfs:/user/hive/warehouse/desdelta/data/")
    display(df)
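
    Since display() in Databricks also accepts streaming DataFrames, you can instead read the destination as a stream for a continuously updating view (a sketch using the same destination path):

    // Continuously updating view of the destination Delta table
    val liveDf = spark.readStream.format("delta").load("dbfs:/user/hive/warehouse/desdelta/data/")
    display(liveDf)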
    

    Here are the results.

    These are the rows in the source Delta table. Let's run the writestream code.

    This is the result of running the writestream code. Now I insert a single row into the source table (a sketch of this insert is shown below).

    The new row is added to the source; now let's check the destination. Yes, you can see that the new row has been added, and the Databricks dashboard shows a spike when the new record arrives.
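
    For reference, inserting a test row into the source Delta table might look like the sketch below (the source path and column names are placeholders, since the source schema isn't shown):

    // Hypothetical schema and path; appends one row to the source table to trigger the stream
    import spark.implicits._
    Seq(("device-42", "98.6"))
      .toDF("device", "telemetry")
      .write
      .format("delta")
      .mode("append")
      .save("dbfs:/user/hive/warehouse/desdelta/source")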

    If you need to see the full list of files and their path, you can run the code below in a cell:

    %fs
    ls user/hive/warehouse/desdelta/data
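
    The same listing can also be done from Scala with dbutils (using the same path):

    // List the files written by the stream
    dbutils.fs.ls("dbfs:/user/hive/warehouse/desdelta/data")
      .foreach(f => println(f.path))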
    

    Here you can see the files generated for each update to the source table.

  2. If you just need to apply a transformation to each row, then you either need to use the map function on the DataFrame (doc), or define and use a user-defined function (UDF), and then write to the Delta Lake table as a normal write operation:

    // Adjust Record to match your row structure
    case class Record(timestamp: java.sql.Timestamp, value: Long)

    // Needed for the Dataset encoder used by .as[Record] and .map
    import spark.implicits._

    val df = spark.readStream.format("rate").load()
    val df2 = df.as[Record].map(x => {
      // do something with your record
      x
    })
    df2.writeStream.format("delta")
      .option("checkpointLocation", "/tmp/stream1-cp")
      .start("/tmp/stream1")
    