
I’m working on a data flow to ingest XML files and I’m trying to pull a deeply nested node’s value up into another part of the flow. For example:

<payload>
  <header>
    <customerID>1234</customerID>
    <createdDate>1/2/2023T15:33:22</createdDate>
    ....
  </header>
  <body>
    ...
    <compartments>
      <compartment>
        <compartmentNo>1</compartmentNo>
        <product>2</product>
        <quantity>5000</quantity>
        ...
        <references>
          <reference>
            <referenceType>ShipmentID</referenceType>
            <referenceValue>23434</referenceValue>
          </reference>
          <reference>
            ...
          </reference>
        </references>
      </compartment>
    </compartments>
  </body>
</payload>

Note: this XML is not complete and not a sensible structure, but it’s what we’ve got from the vendor.

The file is ingested into two tables, Shipments and ShipmentCompartments; however, the ShipmentID belongs in the Shipments table.

I’m using a flatten transformation to get all the compartments and then flattening the references, but I’m unsure how to get the ShipmentID up to the Shipments sink activity, especially since it is part of an array: I need to find the correct reference node (by filtering on referenceType = 'ShipmentID') and then extract the value from the adjacent referenceValue node.

Source: XML File from a Blob storage container
Target: Azure SQL Server (split into multiple tables)

Table structure where data is currently being landed:
[screenshot: current target table structure]
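
For illustration, the two target tables are shaped something like this (column names and types below are placeholders, not the real schema):

CREATE TABLE dbo.Shipments (
    ShipmentID  INT         NOT NULL PRIMARY KEY, -- to be taken from referenceValue where referenceType = 'ShipmentID'
    CustomerID  INT         NOT NULL,
    CreatedDate VARCHAR(20) NOT NULL              -- vendor date format is non-standard, so landed as text
);

CREATE TABLE dbo.ShipmentCompartments (
    ShipmentID    INT NOT NULL REFERENCES dbo.Shipments (ShipmentID),
    CompartmentNo INT NOT NULL,
    Product       INT NOT NULL,
    Quantity      INT NOT NULL
);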

Any help would be appreciated.

2 Answers


    • I have used the following XML data as my source for demonstration:
    <payload>
      <header>
        <customerID>1234</customerID>
        <createdDate>1/2/2023T15:33:22</createdDate>
      </header>
      <body>
        <compartments>
          <compartment>
            <compartmentNo>1</compartmentNo>
            <product>2</product>
            <references>
              <reference>
                <referenceType>ShipmentID</referenceType>
                <referenceValue>23434</referenceValue>
              </reference>
            </references>
          </compartment>
          <compartment>
            <compartmentNo>2</compartmentNo>
            <product>22</product>
            <references>
              <reference>
                <referenceType>ShipmentID</referenceType>
                <referenceValue>123434</referenceValue>
              </reference>
            </references>
          </compartment>
        </compartments>
      </body>
    </payload>
    
    • I have used a select transformation with a rule-based mapping to unpack the header property and a fixed mapping to select body.

    [screenshot: select1 settings]

    • I have used a flatten transformation, unrolling by body.compartments.compartment, with the following configuration:

    [screenshot: flatten1 settings]

    • I have used another select transformation to select customerID and createdDate as fixed mappings and a rule-based mapping to unpack the references.reference hierarchy:

    [screenshot: select2 settings]

    • The data preview after the above operations is shown in the image below:

    [screenshot: data preview]

    • If you have referenceType values other than ShipmentID, you can use the filter transformation with the condition referenceType=='ShipmentID':

    [screenshot: filter1 settings]

    • Now, you can select the sink table (Shipments) and manually map the values to the respective columns. The following is the entire dataflow JSON:
    {
        "name": "dataflow2",
        "properties": {
            "type": "MappingDataFlow",
            "typeProperties": {
                "sources": [
                    {
                        "dataset": {
                            "referenceName": "Xml1",
                            "type": "DatasetReference"
                        },
                        "name": "source1"
                    }
                ],
                "sinks": [
                    {
                        "name": "sink1"
                    }
                ],
                "transformations": [
                    {
                        "name": "select1"
                    },
                    {
                        "name": "flatten1"
                    },
                    {
                        "name": "select2"
                    },
                    {
                        "name": "filter1"
                    }
                ],
                "scriptLines": [
                    "source(output(",
                    "          payload as (body as (compartments as (compartment as (compartmentNo as short, product as short, references as (reference as (referenceType as string, referenceValue as integer)))[])), header as (createdDate as string, customerID as short))",
                    "     ),",
                    "     allowSchemaDrift: true,",
                    "     validateSchema: false,",
                    "     ignoreNoFilesFound: false,",
                    "     validationMode: 'none',",
                    "     namespaces: true) ~> source1",
                    "source1 select(mapColumn(",
                    "          each(payload.header,match(true())),",
                    "          body = payload.body",
                    "     ),",
                    "     skipDuplicateMapInputs: true,",
                    "     skipDuplicateMapOutputs: true) ~> select1",
                    "select1 foldDown(unroll(body.compartments.compartment),",
                    "     mapColumn(",
                    "          customerID,",
                    "          createdDate,",
                    "          references = body.compartments.compartment.references",
                    "     ),",
                    "     skipDuplicateMapInputs: false,",
                    "     skipDuplicateMapOutputs: false) ~> flatten1",
                    "flatten1 select(mapColumn(",
                    "          customerID,",
                    "          createdDate,",
                    "          each(references.reference,match(true()))",
                    "     ),",
                    "     skipDuplicateMapInputs: true,",
                    "     skipDuplicateMapOutputs: true) ~> select2",
                    "select2 filter(referenceType=='ShipmentID') ~> filter1",
                    "filter1 sink(validateSchema: false,",
                    "     skipDuplicateMapInputs: true,",
                    "     skipDuplicateMapOutputs: true,",
                    "     store: 'cache',",
                    "     format: 'inline',",
                    "     output: false,",
                    "     saveOrder: 1,",
                    "     mapColumn(",
                    "          customerID,",
                    "          createdDate,",
                    "          ShipmentID = referenceValue",
                    "     )) ~> sink1"
                ]
            }
        }
    }
    
  1. Azure SQL DB can read directly from blob storage using OPENROWSET. A simple example:

    CREATE MASTER KEY ENCRYPTION BY PASSWORD = 'super$trongPassw0rd!';
    
    CREATE DATABASE SCOPED CREDENTIAL cred_datalake
    WITH IDENTITY = 'SHARED ACCESS SIGNATURE',
    SECRET = 'sv=2020-02-10<rest of your SAS token - no question mark at the front>';
    
    CREATE EXTERNAL DATA SOURCE ds_datalake
    WITH (
        TYPE = BLOB_STORAGE,
        LOCATION = 'https://yourStorageAccount.blob.core.windows.net/datalake',
        CREDENTIAL = cred_datalake
    );
    GO
    
    DECLARE @xml XML;
    
    SELECT @xml = BulkColumn 
    FROM OPENROWSET(
        BULK 'raw/xml/payload.xml', 
        DATA_SOURCE = 'ds_datalake', 
        SINGLE_BLOB
    ) AS x;
    
    SELECT @xml;
    
    --INSERT INTO ...
    SELECT
        p.c.value('(header/customerID/text())[1]', 'INT') AS customerID,
        p.c.value('(header/createdDate/text())[1]', 'VARCHAR(20)') AS createdDate,
    
        c.c.value('(compartmentNo/text())[1]', 'INT') AS compartmentNo,
        c.c.value('(product/text())[1]', 'INT') AS product,
        c.c.value('(quantity/text())[1]', 'INT') AS quantity,
    
        r.c.value('(referenceType/text())[1]', 'VARCHAR(20)') AS referenceType,
        r.c.value('(referenceValue/text())[1]', 'VARCHAR(20)') AS referenceValue,
    
        p.c.query('.') payload
    FROM @xml.nodes('payload') p(c)
        CROSS APPLY p.c.nodes('body/compartments/compartment') c(c)
            CROSS APPLY c.c.nodes('references/reference') r(c);
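
    To finish the split into the two tables the question describes, the commented-out INSERT step could be expanded along these lines. This is a sketch only: dbo.Shipments, dbo.ShipmentCompartments and their columns are assumed names, so adjust them to the real schema. The WHERE clause plays the same role as the filter transformation in the data flow answer: it keeps only the reference whose referenceType is ShipmentID.

    -- Sketch with assumed table/column names. Run in the same batch as the
    -- DECLARE @xml above so the variable is still in scope.
    INSERT INTO dbo.Shipments (ShipmentID, CustomerID, CreatedDate)
    SELECT DISTINCT
        r.c.value('(referenceValue/text())[1]', 'INT'),
        p.c.value('(header/customerID/text())[1]', 'INT'),
        p.c.value('(header/createdDate/text())[1]', 'VARCHAR(20)')
    FROM @xml.nodes('payload') p(c)
        CROSS APPLY p.c.nodes('body/compartments/compartment') c(c)
            CROSS APPLY c.c.nodes('references/reference') r(c)
    WHERE r.c.value('(referenceType/text())[1]', 'VARCHAR(20)') = 'ShipmentID';

    INSERT INTO dbo.ShipmentCompartments (ShipmentID, CompartmentNo, Product, Quantity)
    SELECT
        r.c.value('(referenceValue/text())[1]', 'INT'),
        c.c.value('(compartmentNo/text())[1]', 'INT'),
        c.c.value('(product/text())[1]', 'INT'),
        c.c.value('(quantity/text())[1]', 'INT')
    FROM @xml.nodes('payload') p(c)
        CROSS APPLY p.c.nodes('body/compartments/compartment') c(c)
            CROSS APPLY c.c.nodes('references/reference') r(c)
    WHERE r.c.value('(referenceType/text())[1]', 'VARCHAR(20)') = 'ShipmentID';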
    

    Other setup required:

    • Enable the system-assigned managed identity for the SQL server which hosts your database
    • Grant that identity the Storage Blob Data Reader RBAC role on the storage account

    Adapt this example to your requirements. Use ADF for orchestration and make use of the compute you already have in Azure SQL DB.
