skip to Main Content

I am copying data from ADLS csv file to Azure SQL DB.
File contains 9 columns and mapping is done according to that, but sometimes the source file is only having 6 or 7 columns. In that case the pipeline is failing with error

The name of column index ‘x’ is empty. Make sure column name is properly specified in the header row

Even some columns are missing I want to copy other columns data which is available and ignore the missing columns.
How can I dynamically handle this case and do mapping in case some columns are missing? As the files from source will have different set of columns each time.

2

Answers


  1. You can remove the mapping from the copy activity so it will try to map automatically.

    Note: the column order in csv file should match the column order in table

    Login or Signup to reply.
  2. I created pipeline and performed get metadata activity with Column count field to get the number of columns of my csv file which is in ADLS account by selecting the ADLS dataset. After successful execution of metadata activity connected with lookup activity. selected sql database dataset for lookup and executed below code to find the number of columns of my sql table.

    SELECT  COUNT(COLUMN_NAME) as c
    FROM INFORMATION_SCHEMA.COLUMNS
    WHERE TABLE_CATALOG =  '<database>'  AND TABLE_SCHEMA =  '<schema>'
    AND TABLE_NAME =  '<table>' 
    

    After successful execution of look up connected to if condition and given below expression to compare the columns of meta data activity and lookup activity

    @equals(activity('Get Metadata1').output.columnCount,activity('Lookup1').output.firstRow.c) 
    

    If it is true I implemented copy activity to copy data from adls to sql by following below procedure

    Selected delimited text dataset by selecting csv file from adls linked service as source enable first row as header in dataset

    enter image description here

    and select sql database dataset as sink and imported the schema in mapping of copy activity.
    If it is false I implemented dynamic mapping following below procedure:
    I created table in sql database with below columns

    create  table table_mappings(
    
        sourceFile nvarchar(30),
        sinkTableSchema nvarchar(30),
        sinkTableName nvarchar(30),
        jsonMapping nvarchar(max)
    )
    

    Inserted below values

    insert  into table_mappings values('<sourcefilename>','<schema>','<tablename>','{"type": "TabularTranslator",
    
    "mappings": [
                 {
                    "source": {
                        "name": "EMPLOYEE_ID",
                        "type": "String",
                        "physicalType": "String"
                     },
                    "sink": {
                        "name": "EMPLOYEE_ID",
                        "type": "String",
                        "physicalType": "String"
                    }
                },
                {
                    "source": {
                        "name": "FIRST_NAME",
                        "type": "String",
                        "physicalType": "String"
                     },
                     "sink": {
                        "name": "FIRST_NAME",
                        "type": "String",
                        "physicalType": "String"
                    }
               }
           ]}')    
    

    In jsonMapping column inserted columns of source and columns of table where the values need to store with above format.

    enter image description here

    I implemented lookup activity to by selecting sql database dataset and entered below query

    select * from table_mappings where sourceFile = '<filename>'
    

    enter image description here

    After successful execution of lookup created connected to copy activity. selected adls delimited text as source and sql database dataset as sink and created two parameters in sink named schema with @activity('Lookup1').output.firstRow.sinkTableSchema value and table with @activity('Lookup1').output.firstRow.sinkTableName
    and used those parameters for selecting the table in dataset as @dataset().schema.@dataset().table I added mapping dynamically using @json(activity('Lookup1').output.firstRow.jsonMapping)
    executed the pipeline, it executed successfully.

    enter image description here

    Here is my Json of pipeline:

    {
        "name": "pipeline3",
        "properties": {
            "activities": [
                {
                    "name": "Lookup1",
                    "type": "Lookup",
                    "dependsOn": [
                        {
                            "activity": "Get Metadata1",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "policy": {
                        "timeout": "0.12:00:00",
                        "retry": 0,
                        "retryIntervalInSeconds": 30,
                        "secureOutput": false,
                        "secureInput": false
                    },
                    "userProperties": [],
                    "typeProperties": {
                        "source": {
                            "type": "AzureSqlSource",
                            "sqlReaderQuery": "SELECT COUNT(COLUMN_NAME) as cnFROM INFORMATION_SCHEMA.COLUMNS nWHERE TABLE_CATALOG = 'db' AND TABLE_SCHEMA = 'dbo'nAND TABLE_NAME = 'employees'  ",
                            "queryTimeout": "02:00:00",
                            "partitionOption": "None"
                        },
                        "dataset": {
                            "referenceName": "AzureSqlTable1",
                            "type": "DatasetReference"
                        }
                    }
                },
                {
                    "name": "Get Metadata1",
                    "type": "GetMetadata",
                    "dependsOn": [],
                    "policy": {
                        "timeout": "0.12:00:00",
                        "retry": 0,
                        "retryIntervalInSeconds": 30,
                        "secureOutput": false,
                        "secureInput": false
                    },
                    "userProperties": [],
                    "typeProperties": {
                        "dataset": {
                            "referenceName": "DelimitedText2",
                            "type": "DatasetReference"
                        },
                        "fieldList": [
                            "columnCount"
                        ],
                        "storeSettings": {
                            "type": "AzureBlobFSReadSettings",
                            "enablePartitionDiscovery": false
                        },
                        "formatSettings": {
                            "type": "DelimitedTextReadSettings"
                        }
                    }
                },
                {
                    "name": "If Condition1",
                    "type": "IfCondition",
                    "dependsOn": [
                        {
                            "activity": "Lookup1",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "userProperties": [],
                    "typeProperties": {
                        "expression": {
                            "value": "@equals(activity('Get Metadata1').output.columnCount,activity('Lookup1').output.firstRow.c)",
                            "type": "Expression"
                        },
                        "ifFalseActivities": [
                            {
                                "name": "Copy data1_copy1",
                                "type": "Copy",
                                "dependsOn": [
                                    {
                                        "activity": "Lookup1_copy1",
                                        "dependencyConditions": [
                                            "Succeeded"
                                        ]
                                    }
                                ],
                                "policy": {
                                    "timeout": "0.12:00:00",
                                    "retry": 0,
                                    "retryIntervalInSeconds": 30,
                                    "secureOutput": false,
                                    "secureInput": false
                                },
                                "userProperties": [],
                                "typeProperties": {
                                    "source": {
                                        "type": "DelimitedTextSource",
                                        "storeSettings": {
                                            "type": "AzureBlobFSReadSettings",
                                            "recursive": true,
                                            "enablePartitionDiscovery": false
                                        },
                                        "formatSettings": {
                                            "type": "DelimitedTextReadSettings"
                                        }
                                    },
                                    "sink": {
                                        "type": "AzureSqlSink",
                                        "writeBehavior": "insert",
                                        "sqlWriterUseTableLock": false
                                    },
                                    "enableStaging": false,
                                    "translator": {
                                        "value": "@json(activity('Lookup1').output.firstRow.jsonMapping)",
                                        "type": "Expression"
                                    }
                                },
                                "inputs": [
                                    {
                                        "referenceName": "DelimitedText2",
                                        "type": "DatasetReference"
                                    }
                                ],
                                "outputs": [
                                    {
                                        "referenceName": "AzureSqlTable2",
                                        "type": "DatasetReference",
                                        "parameters": {
                                            "schema": {
                                                "value": "@activity('Lookup1').output.firstRow.sinkTableSchema",
                                                "type": "Expression"
                                            },
                                            "table": {
                                                "value": "@activity('Lookup1').output.firstRow.sinkTableName",
                                                "type": "Expression"
                                            }
                                        }
                                    }
                                ]
                            },
                            {
                                "name": "Lookup1_copy1",
                                "type": "Lookup",
                                "dependsOn": [],
                                "policy": {
                                    "timeout": "0.12:00:00",
                                    "retry": 0,
                                    "retryIntervalInSeconds": 30,
                                    "secureOutput": false,
                                    "secureInput": false
                                },
                                "userProperties": [],
                                "typeProperties": {
                                    "source": {
                                        "type": "AzureSqlSource",
                                        "sqlReaderQuery": "select * from table_mappings where sourceFile = 'employees.csv'",
                                        "queryTimeout": "02:00:00",
                                        "partitionOption": "None"
                                    },
                                    "dataset": {
                                        "referenceName": "AzureSqlTable1",
                                        "type": "DatasetReference"
                                    }
                                }
                            }
                        ],
                        "ifTrueActivities": [
                            {
                                "name": "Copy data1",
                                "type": "Copy",
                                "dependsOn": [],
                                "policy": {
                                    "timeout": "0.12:00:00",
                                    "retry": 0,
                                    "retryIntervalInSeconds": 30,
                                    "secureOutput": false,
                                    "secureInput": false
                                },
                                "userProperties": [],
                                "typeProperties": {
                                    "source": {
                                        "type": "DelimitedTextSource",
                                        "storeSettings": {
                                            "type": "AzureBlobFSReadSettings",
                                            "recursive": true,
                                            "enablePartitionDiscovery": false
                                        },
                                        "formatSettings": {
                                            "type": "DelimitedTextReadSettings"
                                        }
                                    },
                                    "sink": {
                                        "type": "AzureSqlSink",
                                        "writeBehavior": "insert",
                                        "sqlWriterUseTableLock": false,
                                        "disableMetricsCollection": false
                                    },
                                    "enableStaging": false,
                                    "translator": {
                                        "type": "TabularTranslator",
                                        "mappings": [
                                            {
                                                "source": {
                                                    "name": "EMPLOYEE_ID",
                                                    "type": "String",
                                                    "physicalType": "String"
                                                },
                                                "sink": {
                                                    "name": "EMPLOYEE_ID",
                                                    "type": "String",
                                                    "physicalType": "String"
                                                }
                                            },
                                            {
                                                "source": {
                                                    "name": "FIRST_NAME",
                                                    "type": "String",
                                                    "physicalType": "String"
                                                },
                                                "sink": {
                                                    "name": "FIRST_NAME",
                                                    "type": "String",
                                                    "physicalType": "String"
                                                }
                                            },
                                            {
                                                "source": {
                                                    "name": "LAST_NAME",
                                                    "type": "String",
                                                    "physicalType": "String"
                                                },
                                                "sink": {
                                                    "name": "LAST_NAME",
                                                    "type": "String",
                                                    "physicalType": "String"
                                                }
                                            },
                                            {
                                                "source": {
                                                    "name": "EMAIL",
                                                    "type": "String",
                                                    "physicalType": "String"
                                                },
                                                "sink": {
                                                    "name": "EMAIL",
                                                    "type": "String",
                                                    "physicalType": "String"
                                                }
                                            },
                                            {
                                                "source": {
                                                    "name": "PHONE_NUMBER",
                                                    "type": "String",
                                                    "physicalType": "String"
                                                },
                                                "sink": {
                                                    "name": "PHONE_NUMBER",
                                                    "type": "String",
                                                    "physicalType": "String"
                                                }
                                            },
                                            {
                                                "source": {
                                                    "name": "HIRE_DATE",
                                                    "type": "String",
                                                    "physicalType": "String"
                                                },
                                                "sink": {
                                                    "name": "HIRE_DATE",
                                                    "type": "String",
                                                    "physicalType": "String"
                                                }
                                            },
                                            {
                                                "source": {
                                                    "name": "JOB_ID",
                                                    "type": "String",
                                                    "physicalType": "String"
                                                },
                                                "sink": {
                                                    "name": "JOB_ID",
                                                    "type": "String",
                                                    "physicalType": "String"
                                                }
                                            },
                                            {
                                                "source": {
                                                    "name": "SALARY",
                                                    "type": "String",
                                                    "physicalType": "String"
                                                },
                                                "sink": {
                                                    "name": "SALARY",
                                                    "type": "String",
                                                    "physicalType": "String"
                                                }
                                            },
                                            {
                                                "source": {
                                                    "name": "COMMISSION_PCT",
                                                    "type": "String",
                                                    "physicalType": "String"
                                                },
                                                "sink": {
                                                    "name": "COMMISSION_PCT",
                                                    "type": "String",
                                                    "physicalType": "String"
                                                }
                                            },
                                            {
                                                "source": {
                                                    "name": "MANAGER_ID",
                                                    "type": "String",
                                                    "physicalType": "String"
                                                },
                                                "sink": {
                                                    "name": "MANAGER_ID",
                                                    "type": "String",
                                                    "physicalType": "String"
                                                }
                                            },
                                            {
                                                "source": {
                                                    "name": "DEPARTMENT_ID",
                                                    "type": "String",
                                                    "physicalType": "String"
                                                },
                                                "sink": {
                                                    "name": "DEPARTMENT_ID",
                                                    "type": "String",
                                                    "physicalType": "String"
                                                }
                                            }
                                        ],
                                        "typeConversion": true,
                                        "typeConversionSettings": {
                                            "allowDataTruncation": true,
                                            "treatBooleanAsNumber": false
                                        }
                                    }
                                },
                                "inputs": [
                                    {
                                        "referenceName": "DelimitedText1",
                                        "type": "DatasetReference"
                                    }
                                ],
                                "outputs": [
                                    {
                                        "referenceName": "AzureSqlTable1",
                                        "type": "DatasetReference"
                                    }
                                ]
                            }
                        ]
                    }
                }
            ],
            "variables": {
                "hello": {
                    "type": "String"
                }
            },
            "annotations": []
        }
    }
    

    If there are multiple files then perform above activities in foreach loop.

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search