skip to Main Content

ImageI am currently reading a SQL Table which has more than 5000 records. Since Lookup activity doesnt support more than 5000 Records. I had to create a foreach loop which will iterate based on totalrecords/5000 and inside lookup will fetch first 5000 records then for next ietration it will fetch another 5000 and so on. however i am stuck on how to pass the each lookup activity output array to a variable.

My Pipeline look like this.

{
    "name": "pipeline2",
    "properties": {
        "activities": [
            {
                "name": "GetRowCount_FromMyTable",
                "type": "Lookup",
                "dependsOn": [],
                "policy": {
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "source": {
                        "type": "AzureSqlSource",
                        "sqlReaderQuery": {
                            "value": "SELECT COUNT(*) as TotalCount FROM MyTable",
                            "type": "Expression"
                        },
                        "queryTimeout": "02:00:00",
                        "partitionOption": "None"
                    },
                    "dataset": {
                        "referenceName": "ds_sql_extraction",
                        "type": "DatasetReference"
                    }
                },
                "inputs": [
                    {
                        "referenceName": "ds_sql_extraction",
                        "type": "DatasetReference"
                    }
                ],
                "linkedServiceName": {
                    "referenceName": "MyDatabase",
                    "type": "LinkedServiceReference"
                }
            },
            {
                "name": "IterativeLookup",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "GetRowCount_OffSetTable",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@range(0, add(div(activity('GetRowCount_OffSetTable').output.firstRow.TotalCount, 5000), 1))",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "LookupActivity",
                            "type": "Lookup",
                            "dependsOn": [],
                            "policy": {
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [
                                {
                                    "name": "LookupIterations",
                                    "value": "@{item()}"
                                }
                            ],
                            "typeProperties": {
                                "source": {
                                    "type": "AzureSqlSource",
                                    "sqlReaderQuery": {
                                        "value": "SELECT * FROM MyTable ORDER BY OffsetValue OFFSET @{mul(int(item()), 5000)} ROWS FETCH NEXT 5000 ROWS ONLYnn",
                                        "type": "Expression"
                                    },
                                    "queryTimeout": "02:00:00",
                                    "partitionOption": "None"
                                },
                                "dataset": {
                                    "referenceName": "ds_sql_extraction",
                                    "type": "DatasetReference"
                                },
                                "firstRowOnly": false
                            },
                            "inputs": [
                                {
                                    "referenceName": "ds_sql_extraction",
                                    "type": "DatasetReference"
                                }
                            ]
                        },
                        {
                            "name": "Set variable1",
                            "type": "SetVariable",
                            "dependsOn": [
                                {
                                    "activity": "LookupActivity",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "LookupArray",
                                "value": {
                                    "value": "@string(item().value)",
                                    "type": "Expression"
                                }
                            }
                        }
                    ]
                }
            }
        ],
        "variables": {
            "LookupArray": {
                "type": "Array"
            },
            "AnotherArray": {
                "type": "Array"
            },
            "LookupString": {
                "type": "String"
            },
            "Stringg": {
                "type": "Array"
            },
            "Test": {
                "type": "String"
            },
            "test2": {
                "type": "Array"
            },
            "test1": {
                "type": "String"
            },
            "NewArraySet": {
                "type": "Array"
            }
        },
        "folder": {
            "name": "Data_Extraction"
        },
        "annotations": []
    }
}

how to get the two LookupActivity (Iterated) outputs to one single variable?

2

Answers


  1. Chosen as BEST ANSWER

    Image of Result

    Hi Thank you for your quick revert back. appreciate it. However, it still result in two different arrays rather in one array. res_arr resulted two times for first and second 5000 records. Image attached. How to put these into one array. As i have to use entire array to pass to next foreach copy activity. Or could you suggest a way where i can pass both outputs of array res_arr to next activity one by one?

    Following is new pipeline code based on your suggestions

    { "name": "pipeline2", "properties": { "activities": [ { "name": "GetRowCount_OffSetTable", "type": "Lookup", "dependsOn": [], "policy": { "retry": 0, "retryIntervalInSeconds": 30, "secureOutput": false, "secureInput": false }, "userProperties": [], "typeProperties": { "source": { "type": "AzureSqlSource", "sqlReaderQuery": { "value": "SELECT COUNT(*) as TotalCount FROM MyOffSetTable", "type": "Expression" }, "queryTimeout": "02:00:00", "partitionOption": "None" }, "dataset": { "referenceName": "ds_sql_extraction", "type": "DatasetReference" } }, "inputs": [ { "referenceName": "ds_sql_extraction", "type": "DatasetReference" } ], "linkedServiceName": { "referenceName": "MyDatabase", "type": "LinkedServiceReference" } }, { "name": "IterativeLookup", "type": "ForEach", "dependsOn": [ { "activity": "GetRowCount_OffSetTable", "dependencyConditions": [ "Succeeded" ] } ], "userProperties": [], "typeProperties": { "items": { "value": "@range(0, add(div(activity('GetRowCount_OffSetTable').output.firstRow.TotalCount, 5000), 1))", "type": "Expression" }, "activities": [ { "name": "LookupActivity", "type": "Lookup", "dependsOn": [], "policy": { "retry": 0, "retryIntervalInSeconds": 30, "secureOutput": false, "secureInput": false }, "userProperties": [ { "name": "LookupIterations", "value": "@{item()}" } ], "typeProperties": { "source": { "type": "AzureSqlSource", "sqlReaderQuery": { "value": "SELECT * FROM MyOffSetTable ORDER BY OffsetValue OFFSET @{mul(int(item()), 5000)} ROWS FETCH NEXT 5000 ROWS ONLYnn", "type": "Expression" }, "queryTimeout": "02:00:00", "partitionOption": "None" }, "dataset": { "referenceName": "ds_sql_extraction", "type": "DatasetReference" }, "firstRowOnly": false }, "inputs": [ { "referenceName": "ds_sql_extraction", "type": "DatasetReference" } ] }, { "name": "temp_arr", "type": "SetVariable", "dependsOn": [ { "activity": "LookupActivity", "dependencyConditions": [ "Succeeded" ] } ], "policy": { "secureOutput": false, "secureInput": false }, "userProperties": [], "typeProperties": { "variableName": "temp_arr", "value": { "value": "@variables('res_arr')", "type": "Expression" } } }, { "name": "res_arr", "type": "SetVariable", "dependsOn": [ { "activity": "temp_arr", "dependencyConditions": [ "Succeeded" ] } ], "policy": { "secureOutput": false, "secureInput": false }, "userProperties": [], "typeProperties": { "variableName": "res_arr", "value": { "value": "@union(variables('temp_arr'),activity('LookupActivity').output.value)", "type": "Expression" } } } ] } } ], "variables": { "temp_arr": { "type": "Array" }, "res_arr": { "type": "Array" } }, "folder": { "name": "M3_Extraction" }, "annotations": [] } }


  2. As you don’t have any duplicates in your table, you can union the lookup output array in each iteration with its previous iteration lookup output array to get the final array.

    Here, for sample instead of 5000 rows, I took 4 rows as limit where my table consists of total 16 rows and my id column is equivalent to your OffsetValue column.

    First, create two array variables temp_arr and res_arr with empty default values like below.

    enter image description here

    I have followed same approach and same queries as yours till the second lookup activity.

    Inside For-Each, after second lookup activity, take a set variable activity for temp_arr and give @variables('res_arr') to it.

    enter image description here

    Next, take another set variable activity for res_arr variable and give the below expression.

    @union(variables('temp_arr'),activity('Lookup2').output.value)
    

    enter image description here

    In ADF, self-referencing variables is not supported, that is the reason why the temp_arr variable was used.

    At the end of the For-loop, the result final array will be stored in the res_arr variable.

    Result:

    enter image description here

    UPDATE:

    Your activities are running parallelly inside For-Each. You need to check the Sequential check box in the For-Each activity to run the activities sequentially.

    enter image description here

    The output array will be stored in the res_arr variable. If you want to access the array after For-Each, you need to use this array.

    My pipeline JSON for your reference:

    Change the datasets name and query and activity names as per your requirement and use it.

    {
        "name": "lookup concat pipeline",
        "properties": {
            "activities": [
                {
                    "name": "Lookup1",
                    "type": "Lookup",
                    "dependsOn": [],
                    "policy": {
                        "timeout": "0.12:00:00",
                        "retry": 0,
                        "retryIntervalInSeconds": 30,
                        "secureOutput": false,
                        "secureInput": false
                    },
                    "userProperties": [],
                    "typeProperties": {
                        "source": {
                            "type": "AzureSqlSource",
                            "sqlReaderQuery": "SELECT COUNT(*) as TotalCount FROM lookup1",
                            "queryTimeout": "02:00:00",
                            "partitionOption": "None"
                        },
                        "dataset": {
                            "referenceName": "lookup_table",
                            "type": "DatasetReference"
                        },
                        "firstRowOnly": true
                    }
                },
                {
                    "name": "ForEach1",
                    "type": "ForEach",
                    "dependsOn": [
                        {
                            "activity": "Lookup1",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "userProperties": [],
                    "typeProperties": {
                        "items": {
                            "value": "@range(0, add(div(activity('Lookup1').output.firstRow.TotalCount, 4), 1))",
                            "type": "Expression"
                        },
                        "isSequential": true,
                        "activities": [
                            {
                                "name": "Lookup2",
                                "type": "Lookup",
                                "dependsOn": [],
                                "policy": {
                                    "timeout": "0.12:00:00",
                                    "retry": 0,
                                    "retryIntervalInSeconds": 30,
                                    "secureOutput": false,
                                    "secureInput": false
                                },
                                "userProperties": [],
                                "typeProperties": {
                                    "source": {
                                        "type": "AzureSqlSource",
                                        "sqlReaderQuery": {
                                            "value": "SELECT * FROM lookup1 ORDER BY id OFFSET @{mul(int(item()), 4)} ROWS FETCH NEXT 4 ROWS ONLY",
                                            "type": "Expression"
                                        },
                                        "queryTimeout": "02:00:00",
                                        "partitionOption": "None"
                                    },
                                    "dataset": {
                                        "referenceName": "lookup_table",
                                        "type": "DatasetReference"
                                    },
                                    "firstRowOnly": false
                                }
                            },
                            {
                                "name": "temp_arr",
                                "type": "SetVariable",
                                "dependsOn": [
                                    {
                                        "activity": "Lookup2",
                                        "dependencyConditions": [
                                            "Succeeded"
                                        ]
                                    }
                                ],
                                "policy": {
                                    "secureOutput": false,
                                    "secureInput": false
                                },
                                "userProperties": [],
                                "typeProperties": {
                                    "variableName": "temp_arr",
                                    "value": {
                                        "value": "@variables('res_arr')",
                                        "type": "Expression"
                                    }
                                }
                            },
                            {
                                "name": "union temp and and lookup array",
                                "type": "SetVariable",
                                "dependsOn": [
                                    {
                                        "activity": "temp_arr",
                                        "dependencyConditions": [
                                            "Succeeded"
                                        ]
                                    }
                                ],
                                "policy": {
                                    "secureOutput": false,
                                    "secureInput": false
                                },
                                "userProperties": [],
                                "typeProperties": {
                                    "variableName": "res_arr",
                                    "value": {
                                        "value": "@union(variables('temp_arr'),activity('Lookup2').output.value)",
                                        "type": "Expression"
                                    }
                                }
                            }
                        ]
                    }
                }
            ],
            "variables": {
                "temp_arr": {
                    "type": "Array"
                },
                "res_arr": {
                    "type": "Array"
                }
            },
            "annotations": [],
            "lastPublishTime": "2024-05-29T04:16:16Z"
        },
        "type": "Microsoft.DataFactory/factories/pipelines"
    }
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search