
The problem is that after each row is inserted, I need to capture the generated identity value and write it back to the source. How can this be done in ADF? In SSMS we can use IDENTITY_INSERT.

In the first database, I have this table:

ID1, name, ID2
1,'a',null
2,'b',null

In the second database, the table is:

ID2, name
1,'a'

So, after inserting rows from the table in the first database into the table in the second database, I need to update ID2 in the first database with the identity values generated in the second.

2 Answers


  1. It would help to see the full pipeline, along with a clearer explanation. Are you using stored procedures here to modify the databases?

    Also, if you want the changes applied every time the table in the first database is updated, you can use CDC (change data capture).

  2. If your source and sink databases were the same, you could get the desired result with the copy activity's pre-copy script.

    But you mentioned that the source and sink are two different databases.
    I was able to achieve your requirement as follows.

    NOTE: This approach only works for inserting records.

    My source table has an identity column, ID1. My target table has an identity column, ID2.

    • First, I get the first ID1 value from the source table with a Lookup activity, using the query select TOP (1) ID1 from [dbo].[mysource].

    • After the copy activity from the source table to the target table, I generate an array of source IDs with range. It starts at the first row's ID1 and contains as many values as the copy activity copied rows, so it ends at the last row's ID1, provided the ID1 values are consecutive. I used the dynamic content below for it.
      @range(activity('Source First ID').output.value[0].ID1,activity('Copy data1').output.rowsCopied)


    • Then I use another Lookup activity on the target table, with the query select IDENT_CURRENT('mytarget2') as ID2;, to get the current identity value (the last inserted ID2). From the copy activity's rows-copied count and this lookup output, I generate the target ID2 array with the dynamic content below.

      @range(int(string(add(sub(activity('get last ID2').output.value[0].ID2,activity('Copy data1').output.rowsCopied),1))), activity('Copy data1').output.rowsCopied)
      


    • Next, I use the two arrays above to update ID2 in the source table with a Script activity inside a ForEach. To iterate over both arrays at once inside the ForEach, I generate an index array with @range(0, activity('Copy data1').output.rowsCopied).

    • Pass this index array to the ForEach activity, and inside the ForEach use a Script activity against the source database with the following dynamic content.

      UPDATE mysource SET ID2=@{variables('targetID2array')[item()]} WHERE ID1 = @{variables('sourceidsarray')[item()]};
      

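To make the array arithmetic in the steps above easier to follow, here is a minimal Python sketch of the same logic. It is not ADF code: `adf_range` and `build_updates` are hypothetical helpers written for illustration, with `adf_range` mimicking the expression function `range(startIndex, count)`, and the input values are made up.

```python
def adf_range(start: int, count: int) -> list[int]:
    """Mimic ADF's range(startIndex, count): `count` integers starting at `start`."""
    return list(range(start, start + count))


def build_updates(first_id1: int, last_id2: int, rows_copied: int) -> list[str]:
    """Pair source ID1 values with target ID2 values by position."""
    # Source IDs: start at the first ID1, one value per copied row.
    source_ids = adf_range(first_id1, rows_copied)
    # Target IDs: the first new ID2 is (last identity - rows copied + 1).
    target_ids = adf_range(last_id2 - rows_copied + 1, rows_copied)
    # Index array: lets a single loop address both arrays at once.
    index_array = adf_range(0, rows_copied)
    return [
        f"UPDATE mysource SET ID2={target_ids[i]} WHERE ID1 = {source_ids[i]};"
        for i in index_array
    ]


# Hypothetical run: ID1 starts at 1, 3 rows were copied,
# and IDENT_CURRENT on the target returned 12.
updates = build_updates(first_id1=1, last_id2=12, rows_copied=3)
for statement in updates:
    print(statement)
```

With these inputs the target IDs come out as 10, 11 and 12, paired in order with source IDs 1, 2 and 3. The pairing is purely positional, so it only holds if the rows land in the target in the same order as the source IDs.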

    In my pipeline, the activities run one after another, each depending on the success of the previous one.

    My Pipeline JSON:

    {
    "name": "pipeline2",
    "properties": {
        "activities": [
            {
                "name": "Source First ID",
                "type": "Lookup",
                "dependsOn": [],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "source": {
                        "type": "AzureSqlSource",
                        "sqlReaderQuery": "select TOP (1) ID1 from [dbo].[mysource]",
                        "queryTimeout": "02:00:00",
                        "partitionOption": "None"
                    },
                    "dataset": {
                        "referenceName": "AzureSqlTable1",
                        "type": "DatasetReference"
                    },
                    "firstRowOnly": false
                }
            },
            {
                "name": "Copy data1",
                "type": "Copy",
                "dependsOn": [
                    {
                        "activity": "Source First ID",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "source": {
                        "type": "AzureSqlSource",
                        "sqlReaderQuery": "select name from mysource;",
                        "queryTimeout": "02:00:00",
                        "partitionOption": "None"
                    },
                    "sink": {
                        "type": "AzureSqlSink",
                        "writeBehavior": "insert",
                        "sqlWriterUseTableLock": false
                    },
                    "enableStaging": false,
                    "translator": {
                        "type": "TabularTranslator",
                        "typeConversion": true,
                        "typeConversionSettings": {
                            "allowDataTruncation": true,
                            "treatBooleanAsNumber": false
                        }
                    }
                },
                "inputs": [
                    {
                        "referenceName": "AzureSqlTable1",
                        "type": "DatasetReference"
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "target",
                        "type": "DatasetReference"
                    }
                ]
            },
            {
                "name": "SourceIDSarray",
                "type": "SetVariable",
                "dependsOn": [
                    {
                        "activity": "Copy data1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "variableName": "sourceidsarray",
                    "value": {
                        "value": "@range(activity('Source First ID').output.value[0].ID1,activity('Copy data1').output.rowsCopied)",
                        "type": "Expression"
                    }
                }
            },
            {
                "name": "targetID2 array",
                "type": "SetVariable",
                "dependsOn": [
                    {
                        "activity": "get last ID2",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "variableName": "targetID2array",
                    "value": {
                        "value": "@range(int(string(add(sub(activity('get last ID2').output.value[0].ID2,activity('Copy data1').output.rowsCopied),1))), activity('Copy data1').output.rowsCopied)",
                        "type": "Expression"
                    }
                }
            },
            {
                "name": "get last ID2",
                "type": "Lookup",
                "dependsOn": [
                    {
                        "activity": "SourceIDSarray",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "source": {
                        "type": "AzureSqlSource",
                        "sqlReaderQuery": "select IDENT_CURRENT('mytarget2') as ID2;",
                        "queryTimeout": "02:00:00",
                        "partitionOption": "None"
                    },
                    "dataset": {
                        "referenceName": "target",
                        "type": "DatasetReference"
                    },
                    "firstRowOnly": false
                }
            },
            {
                "name": "Index array",
                "type": "SetVariable",
                "dependsOn": [
                    {
                        "activity": "targetID2 array",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "variableName": "index_array",
                    "value": {
                        "value": "@range(0, activity('Copy data1').output.rowsCopied)",
                        "type": "Expression"
                    }
                }
            },
            {
                "name": "ForEach1",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "Index array",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@variables('index_array')",
                        "type": "Expression"
                    },
                    "isSequential": true,
                    "activities": [
                        {
                            "name": "Script1",
                            "type": "Script",
                            "dependsOn": [],
                            "policy": {
                                "timeout": "0.12:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "linkedServiceName": {
                                "referenceName": "AzureSqlDatabase1",
                                "type": "LinkedServiceReference"
                            },
                            "typeProperties": {
                                "scripts": [
                                    {
                                        "type": "Query",
                                        "text": {
                                            "value": "UPDATE mysource SET ID2=@{variables('targetID2array')[item()]} WHERE ID1 = @{variables('sourceidsarray')[item()]};",
                                            "type": "Expression"
                                        }
                                    }
                                ],
                                "scriptBlockExecutionTimeout": "02:00:00"
                            }
                        }
                    ]
                }
            }
        ],
        "variables": {
            "sourceidsarray": {
                "type": "Array"
            },
            "targetID2array": {
                "type": "Array"
            },
            "index_array": {
                "type": "Array"
            },
            "ok": {
                "type": "Array"
            }
        },
        "annotations": []
    }
    }
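The activities in the JSON are not listed in the order they run ("targetID2 array" appears before "get last ID2", for example), so the execution order has to be read from the dependsOn entries. As a rough aid, the Python sketch below copies those dependencies into a dictionary by hand and sorts them with the standard-library `graphlib` module (Python 3.9+). The dictionary is a transcription for illustration, not something ADF produces.

```python
from graphlib import TopologicalSorter

# Activity name -> activities it depends on, transcribed by hand from the
# "dependsOn" entries in the pipeline JSON.
depends_on = {
    "Source First ID": [],
    "Copy data1": ["Source First ID"],
    "SourceIDSarray": ["Copy data1"],
    "get last ID2": ["SourceIDSarray"],
    "targetID2 array": ["get last ID2"],
    "Index array": ["targetID2 array"],
    "ForEach1": ["Index array"],
}

# static_order() yields each activity only after everything it depends on.
execution_order = list(TopologicalSorter(depends_on).static_order())
print(" -> ".join(execution_order))
```

Because every activity depends on exactly one predecessor, the dependencies form a single chain and there is only one valid order.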
    

    After the pipeline runs, the target table contains the copied rows, and the source table has its ID2 values updated.
