I was able to get row values from a Delta table using ForeachWriter in spark-shell and from the command line, but the same code does not work when I run it in Azure Databricks.
import org.apache.spark.sql.{ForeachWriter, Row}

val process_deltatable = read_deltatable.writeStream.foreach(new ForeachWriter[Row] {
  // Called once for every row in each micro-batch
  def process(value: Row): Unit = {
    val telemetry = value.getString(0)
    println(telemetry)
  }
  // Return true to process this partition for this epoch
  def open(partitionId: Long, epochId: Long): Boolean = true
  def close(errorOrNull: Throwable): Unit = {}
})

val xyz = process_deltatable.start()
Is there any way to get row-by-row values from a streaming Delta table in Azure Databricks?
2 Answers
We can get the streaming data by writing it out as a table to a specific location and then loading it.
Here is the altered code. While saving, I specified both a checkpoint location and a data-saving location.
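A minimal sketch of that write, assuming the streaming source is the read_deltatable DataFrame from the question; the paths are placeholders, not the originals:

// Sketch: write the streaming Delta source out to a destination path.
// Checkpoint and output paths below are placeholder assumptions.
val saveQuery = read_deltatable.writeStream
  .format("delta")
  .option("checkpointLocation", "/mnt/demo/_checkpoint") // checkpoint location
  .start("/mnt/demo/streamed_output")                    // data-saving location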
After saving it, you can retrieve it by running the following code:
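For example, using the same placeholder path (display is the Databricks notebook helper):

// Read the saved Delta output back as a batch DataFrame.
val result = spark.read.format("delta").load("/mnt/demo/streamed_output")
display(result)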
Here are the results.
These are the rows in the source Delta table.
Let's run the writeStream code.
This is the result of running the writeStream code.
Now I insert a single row into the source table.
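Something like this, assuming the source is registered as a table named source_table with a single string column (both are assumptions for illustration):

// Hedged example: insert one test row into the source Delta table.
spark.sql("INSERT INTO source_table VALUES ('new telemetry value')")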
The new row is added to the source; now let's check the destination.
Yes, you can see the new row has been added.
And you can see a spike in the Databricks dashboard when the new record is added.
If you need to see the full list of files and their paths, you can run the code below in a cell.
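A sketch of that listing, assuming the same placeholder output path (dbutils is available in Databricks notebooks):

// List the files Delta wrote to the destination path.
display(dbutils.fs.ls("/mnt/demo/streamed_output"))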
Here you can see the files generated for each update to the source table.
If you just need to apply a transformation to each row, then you can either use the map function on the DataFrame (doc), or define and use a user-defined function (UDF), and then write to the Delta Lake table as a normal write operation:
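For example, a hedged sketch using a UDF; the column name telemetry and the paths are assumptions for illustration:

import org.apache.spark.sql.functions.{col, udf}

// Define a per-row transformation as a UDF (here: uppercase a string column).
val toUpper = udf((s: String) => s.toUpperCase)

// Apply the UDF and write the result to Delta as a normal streaming write.
read_deltatable
  .withColumn("telemetry", toUpper(col("telemetry")))
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/mnt/demo/_checkpoint_udf")
  .start("/mnt/demo/transformed_output")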