
I am copying data from Blob Storage to a SQL database inside a ForEach activity. The source has multiple files with different schemas, so when a file's schema doesn't match the destination table, the Copy activity fails for that file.

I want the pipeline to succeed even if one iteration of the ForEach activity fails.

Please help me to resolve the issue.

2 Answers


  1. Can you predict how the schema of the file differs when it fails? Suppose that if the file has the correct schema, it will certainly have Column A, and if the file does not have the correct schema, it will not have A. You can add steps before the copy so that you only try to copy when A is present. This is in fact what I am doing in a similar situation.

    You can check for the presence of any given column name by adding a Get Metadata activity (with the Structure field selected) inside the ForEach, followed by an If Condition activity whose expression looks for that column: if true, run the copy; if false, do nothing.

    My method is a bit more complicated: I keep this logic in a pipeline that I call from other pipelines, and there is an extra ForEach for this reason. You could skip that. The daisy-chained parameters, which let the calling pipelines change the column name to search for, make it a bit more complex, so I don’t recommend trying to get it working this way the first time. But this will at least show the general idea.


    {
        "name": "If Column Found then Proceed",
        "properties": {
            "activities": [
                {
                    "name": "Get Blob File Metadata",
                    "type": "GetMetadata",
                    "dependsOn": [],
                    "policy": {
                        "timeout": "0.12:00:00",
                        "retry": 0,
                        "retryIntervalInSeconds": 30,
                        "secureOutput": false,
                        "secureInput": false
                    },
                    "userProperties": [],
                    "typeProperties": {
                        "dataset": {
                            "referenceName": "BlobDelimWithColumnHeaders",
                            "type": "DatasetReference",
                            "parameters": {
                                "container": {
                                    "value": "@pipeline().parameters.container",
                                    "type": "Expression"
                                },
                                "directory": {
                                    "value": "@pipeline().parameters.Directory",
                                    "type": "Expression"
                                },
                                "blob_file": {
                                    "value": "@pipeline().parameters.blobfile",
                                    "type": "Expression"
                                },
                                "delimiter": {
                                    "value": "@pipeline().parameters.delimiter",
                                    "type": "Expression"
                                },
                                "dataset_env": {
                                    "value": "@pipeline().globalParameters.adf_environment",
                                    "type": "Expression"
                                }
                            }
                        },
                        "fieldList": [
                            "structure"
                        ],
                        "storeSettings": {
                            "type": "AzureBlobStorageReadSettings",
                            "recursive": true,
                            "enablePartitionDiscovery": false
                        },
                        "formatSettings": {
                            "type": "DelimitedTextReadSettings"
                        }
                    }
                },
                {
                    "name": "Check each column name",
                    "description": "I hate that we have to loop over each column and can't use an array function, but \"structure\" is an array of JSON objects and there seems to be no clean way to pull just the names out of the parent array into a new array.\n\nWhy can't we do something like this instead?\n@contains(activity('Get Blob File Metadata').output.structure.name, pipeline().parameters.ColumnName)\nIt would make the pipeline 2 steps.\n\nIf any column matches, the Success variable gets set to true, which makes the next activity take its true branch.",
                    "type": "ForEach",
                    "dependsOn": [
                        {
                            "activity": "Get Blob File Metadata",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "userProperties": [],
                    "typeProperties": {
                        "items": {
                            "value": "@activity('Get Blob File Metadata').output.structure",
                            "type": "Expression"
                        },
                        "isSequential": false,
                        "activities": [
                            {
                                "name": "Check If Column Matches",
                                "type": "IfCondition",
                                "dependsOn": [],
                                "userProperties": [],
                                "typeProperties": {
                                    "expression": {
                                        "value": "@equals(item().name, pipeline().parameters.ColumnName)",
                                        "type": "Expression"
                                    },
                                    "ifTrueActivities": [
                                        {
                                            "name": "Success",
                                            "description": "The column name was found!",
                                            "type": "SetVariable",
                                            "dependsOn": [],
                                            "policy": {
                                                "secureOutput": false,
                                                "secureInput": false
                                            },
                                            "userProperties": [],
                                            "typeProperties": {
                                                "variableName": "Success",
                                                "value": true
                                            }
                                        }
                                    ]
                                }
                            }
                        ]
                    }
                },
                {
                    "name": "Determine Success or Fail",
                    "description": "If the desired column wasn't found in any of the columns, the false branch runs a no-op Wait here instead of failing. Put the Copy activity in ifTrueActivities.",
                    "type": "IfCondition",
                    "dependsOn": [
                        {
                            "activity": "Check each column name",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "userProperties": [],
                    "typeProperties": {
                        "expression": {
                            "value": "@variables('Success')",
                            "type": "Expression"
                        },
                        "ifFalseActivities": [
                            {
                                "name": "Exit pipeline",
                                "type": "Wait",
                                "dependsOn": [],
                                "userProperties": [],
                                "typeProperties": {
                                    "waitTimeInSeconds": 1
                                }
                            }
                        ]
                    }
                }
            ],
            "parameters": {
                "container": {
                    "type": "string"
                },
                "Directory": {
                    "type": "string"
                },
                "blobfile": {
                    "type": "string"
                },
                "ColumnName": {
                    "type": "string"
                },
                "delimiter": {
                    "type": "string"
                }
            },
            "variables": {
                "Success": {
                    "type": "Boolean",
                    "defaultValue": false
                }
            },
            "folder": {
                "name": "Utilities"
            },
            "annotations": [],
            "lastPublishTime": "2023-09-26T17:42:24Z"
        },
        "type": "Microsoft.DataFactory/factories/pipelines"
    }
    
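  2. To make the pipeline succeed even when a single iteration fails, handle the failure path: add an activity that runs on the ForEach activity's Failed dependency condition (its "Upon Failure" output).

    In the sample below, the item "a" makes @int(item()) fail, so ForEach1 fails and Wait1 runs on that failure. Because the failure is handled, the pipeline run still ends in the Succeeded state.

    Sample code: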
    
    {
        "name": "pipeline1",
        "properties": {
            "activities": [
                {
                    "name": "ForEach1",
                    "type": "ForEach",
                    "dependsOn": [],
                    "userProperties": [],
                    "typeProperties": {
                        "items": {
                            "value": "@pipeline().parameters.test",
                            "type": "Expression"
                        },
                        "isSequential": true,
                        "activities": [
                            {
                                "name": "Set variable1",
                                "type": "SetVariable",
                                "dependsOn": [],
                                "policy": {
                                    "secureOutput": false,
                                    "secureInput": false
                                },
                                "userProperties": [],
                                "typeProperties": {
                                    "variableName": "Vt",
                                    "value": {
                                        "value": "@int(item())",
                                        "type": "Expression"
                                    }
                                }
                            }
                        ]
                    }
                },
                {
                    "name": "Wait1",
                    "type": "Wait",
                    "dependsOn": [
                        {
                            "activity": "ForEach1",
                            "dependencyConditions": [
                                "Failed"
                            ]
                        }
                    ],
                    "userProperties": [],
                    "typeProperties": {
                        "waitTimeInSeconds": 1
                    }
                }
            ],
            "parameters": {
                "test": {
                    "type": "array",
                    "defaultValue": [
                        "1",
                        "a",
                        "2"
                    ]
                }
            },
            "variables": {
                "Vt": {
                    "type": "Integer"
                }
            },
            "annotations": []
        }
    }
    
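The column check in the first answer (Get Metadata "structure" output, a ForEach over it, an If Condition with @equals(item().name, ...) and a Set Variable) boils down to a membership test on the column names. The following is a minimal Python sketch of that logic, not ADF code. It assumes "structure" is a list of {"name": ..., "type": ...} objects; the helpers has_column and files_to_copy, the file names and the marker column "A" are all hypothetical.

```python
from typing import Any

# One entry of Get Metadata's "structure" output (assumed shape),
# e.g. {"name": "A", "type": "String"}.
StructureEntry = dict[str, Any]


def has_column(structure: list[StructureEntry], column_name: str) -> bool:
    """True if any column in the structure has exactly this name.

    This collapses the ForEach + If Condition + Set Variable chain into one
    step: project out the names, then test membership (case-sensitive here).
    """
    names = [entry.get("name") for entry in structure]
    return column_name in names


def files_to_copy(schemas: dict[str, list[StructureEntry]], column_name: str) -> list[str]:
    """Keep only files whose schema contains the marker column; the rest are
    skipped instead of being handed to a Copy activity that would fail."""
    return [path for path, structure in schemas.items() if has_column(structure, column_name)]


if __name__ == "__main__":
    # Hypothetical files: only good.csv has the expected column "A".
    schemas = {
        "good.csv": [{"name": "A", "type": "String"}, {"name": "B", "type": "String"}],
        "bad.csv": [{"name": "X", "type": "String"}],
    }
    print(files_to_copy(schemas, "A"))  # ['good.csv']
```

In the pipeline itself, the true branch of the If Condition plays the role of files_to_copy: only files that pass the check reach the Copy activity.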
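Why the Wait1 handler in the second answer turns a failed loop into a successful run can be modelled the same way. The sketch below is a toy Python model, assuming ADF's rule for deriving a pipeline's status from its leaf activities: evaluate each leaf; if a leaf was skipped, evaluate its parent instead; the run succeeds only if every evaluated activity succeeded. Activity, foreach_status and pipeline_status are hypothetical helpers, and only exact Succeeded/Failed dependency conditions are modelled.

```python
from dataclasses import dataclass, field


@dataclass
class Activity:
    """A pipeline activity and its dependencies (hypothetical model)."""
    name: str
    # (upstream activity name, required condition), e.g. ("ForEach1", "Failed")
    depends_on: list[tuple[str, str]] = field(default_factory=list)


def foreach_status(items: list[str]) -> str:
    """Mimic ForEach1 in the sample: each iteration evaluates @int(item()),
    and the ForEach ends Failed if any iteration failed."""
    any_failed = False
    for item in items:
        try:
            int(item)  # stands in for Set Variable with @int(item())
        except ValueError:
            any_failed = True
    return "Failed" if any_failed else "Succeeded"


def pipeline_status(activities: list[Activity], outcomes: dict[str, str]) -> str:
    """Derive the run status from the leaf activities.

    `activities` must be listed in dependency order; `outcomes` gives the
    status each activity ends in if it actually runs.
    """
    status: dict[str, str] = {}
    for act in activities:
        # An activity runs only if every upstream reached the required condition.
        runs = all(status[up] == cond for up, cond in act.depends_on)
        status[act.name] = outcomes[act.name] if runs else "Skipped"

    parents = {act.name: [up for up, _ in act.depends_on] for act in activities}
    has_downstream = {up for act in activities for up, _ in act.depends_on}
    leaves = [act.name for act in activities if act.name not in has_downstream]

    def evaluate(name: str) -> bool:
        if status[name] == "Skipped":
            # A skipped leaf is judged by its parent(s) instead.
            return all(evaluate(parent) for parent in parents[name])
        return status[name] == "Succeeded"

    return "Succeeded" if all(evaluate(leaf) for leaf in leaves) else "Failed"


if __name__ == "__main__":
    loop_result = foreach_status(["1", "a", "2"])  # "a" is not an integer
    with_handler = [Activity("ForEach1"), Activity("Wait1", [("ForEach1", "Failed")])]
    print(loop_result)  # Failed
    print(pipeline_status(with_handler, {"ForEach1": loop_result, "Wait1": "Succeeded"}))  # Succeeded
    print(pipeline_status([Activity("ForEach1")], {"ForEach1": loop_result}))  # Failed
```

Without Wait1, ForEach1 is itself the leaf, so its failure fails the run. With Wait1 on the Failed path, the evaluated leaf is Wait1 when the loop fails and ForEach1 (through the skipped Wait1) when it succeeds, so the run reports Succeeded either way.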