I’ve got an issue that I cannot make sense of. The objective for us is pretty straightforward:
- We have an ADLS Gen2 storage account that has JSON files stored on it
- We have a Fabric Eventhouse (KQL) database set up
- We want to be able to take the JSON content of any blob file on that storage account and ingest it into our Eventhouse database
I have done this successfully end to end using a full-fledged Azure Data Explorer database, including continuous updates via a data connection and the appropriate Event Grid / Event Hub connections. It took some doing to figure out the nuances, but it works. However, we would like to build this new KQL db directly in Fabric, since there are capabilities you get there that aren't available if we just add the existing Data Explorer database as a "Shortcut/Follower" db.
For a quick run-down of our approach:
I have created a "raw" table to hold the incoming data, an ingestion mapping to make sure the data comes in properly, a final table we'll ultimately use for querying, a function to transform the data from the raw table to the final table schema, and an update policy on the final table. As I said, it works beautifully... until I want to do all of the same things in a Microsoft Fabric Eventhouse database.
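For context, the transform layer follows the usual update-policy pattern. Below is a minimal sketch, assuming a final table named History_EMR and a function named Expand_HistoryRaw_EMR; both names and the expanded column are illustrative stand-ins, since the real final schema isn't relevant to the question:

.create table History_EMR (CreatedUtc: datetime, TraceId: string, OktaUserId: string, TableName: string, ActionId: int, KeyValues: dynamic, FieldChange: dynamic)

// Produce one row per entry in the FieldHistory array; the projected columns are illustrative
.create-or-alter function Expand_HistoryRaw_EMR() {
    HistoryRaw_EMR
    | mv-expand FieldChange = FieldHistory
    | project CreatedUtc, TraceId, OktaUserId, TableName, ActionId, KeyValues, FieldChange
}

// Run the function automatically whenever new rows land in the raw table
.alter table History_EMR policy update
@'[{"IsEnabled": true, "Source": "HistoryRaw_EMR", "Query": "Expand_HistoryRaw_EMR()", "IsTransactional": true}]'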
The reason I cannot do it the same way is that there is no way to set up the data connection in Fabric Eventhouse (so the KQL db knows where to find the content of the blob that triggered the event). After a lot of going round and round, we decided we'd just set up an Eventstream, a Reflex item, and a Data Pipeline in Fabric to do the same task. But that just trades one problem for another.
At the end of the day, we're trying to do something pretty simple. We've got some JSON files sitting in storage and we want to suck them into a db. I know I have to be missing something stupid simple here; I just can't figure out what it might be. We've reached out to MS Support to no avail.
Raw table schema:
.create table HistoryRaw_EMR (CreatedUtc: datetime, TraceId: string, OktaUserId: string, TableName: string, HistoryTableNameId: int, ActionId: int, KeyValues: dynamic, FieldHistory: dynamic)
Ingestion mapping:
.create-or-alter table ['HistoryRaw_EMR'] ingestion json mapping 'HistoryRaw_EMR_mapping' '[{"column":"CreatedUtc", "Properties":{"Path":"$.CreatedUtc"}},{"column":"TraceId", "Properties":{"Path":"$[\'TraceId\']"}},{"column":"OktaUserId", "Properties":{"Path":"$[\'OktaUserId\']"}},{"column":"TableName", "Properties":{"Path":"$[\'TableName\']"}},{"column":"HistoryTableNameId", "Properties":{"Path":"$[\'HistoryTableNameId\']"}},{"column":"ActionId", "Properties":{"Path":"$[\'ActionId\']"}},{"column":"KeyValues", "Properties":{"Path":"$[\'KeyValues\']"}},{"column":"FieldHistory", "Properties":{"Path":"$[\'FieldHistory\']"}}]'
For the purposes of this question, the schema of the final table doesn’t matter. The problem is that if I try to use the Copy activity in a Fabric Data Pipeline, I get an error:
ErrorCode=KustoMappingReferenceHasWrongKind,'Type=Microsoft.DataTransfer.Common.Shared.HybridDeliveryException,Message=Mapping reference should be of kind 'Csv'. Mapping reference: 'HistoryRaw_EMR_New_mapping'. Kind 'Json'.,Source=Microsoft.DataTransfer.Runtime.KustoConnector,'
I have specified every single ingestion property I can to make sure I am effectively matching the direct "ingest" command, but the destination is clearly being handed some sort of CSV format. Is there a way to "intercept" exactly what a copy activity hands to the destination? I have tried removing the "Mapping" property from the copy activity and instead specifying each column's mapping in the "Destination" settings, but that doesn't work because one of the properties is an array.
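For reference, the kind of direct ingestion this is meant to reproduce looks roughly like the following sketch (the storage URL and SAS token are placeholders):

// 'multijson' handles JSON records that span multiple lines or arrive as an array
.ingest into table HistoryRaw_EMR (
    h'https://<storageaccount>.blob.core.windows.net/<container>/<blob>.json;<SAS token>'
) with (
    format = 'multijson',
    ingestionMappingReference = 'HistoryRaw_EMR_mapping'
)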
Answers
Hoping this helps others...
The solution to this was a bit of a head-scratcher, but here is the gist:
It sounds like the source data format isn’t set correctly in your pipeline. Edit your copy activity and ensure that the Data Format for the source is set to JSON.
In the copy activity sink settings, ensure the mapping reference kind is also set to JSON.
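To double-check how the mapping was actually created on the database side, you can list the table's mappings and look at the Kind column:

.show table HistoryRaw_EMR ingestion mappings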
Here’s an example of a pipeline configuration:
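A rough sketch of the relevant pieces follows, using the Azure Data Factory Copy activity JSON schema that Fabric pipelines are largely based on; the activity name is made up, and the exact type strings and where these settings surface in the Fabric editor may differ:

{
  "name": "CopyHistoryJsonToEventhouse",
  "type": "Copy",
  "typeProperties": {
    "source": {
      "type": "JsonSource",
      "storeSettings": { "type": "AzureBlobFSReadSettings", "recursive": true },
      "formatSettings": { "type": "JsonReadSettings" }
    },
    "sink": {
      "type": "AzureDataExplorerSink",
      "ingestionMappingName": "HistoryRaw_EMR_mapping"
    }
  }
}

In the pipeline UI, these correspond to setting the source file format to JSON and pointing the destination at the JSON ingestion mapping by name.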