I’m working with a dataset where I need to drop some columns which contain only NULL values. The issue is that the column names are not consistent or similar, and can change with time. I was wondering if there is a way in ADF to drop a column if all instances are NULL without having drifted columns?

I have tried unpivoting, removing rows, then re-pivoting, however after I pivot the data back to its original format, I get the following message:

"This drifted column is not in the source schema and therefore can only be referenced with pattern matching expressions"

The drifted columns don’t seem to join in subsequent join transformations. I have also tried setting derived columns with regex column patterns to make all the drifted columns explicit; however, the byName() function doesn’t seem to work with the $$ syntax, namely:

toString(byName($$))

Any ideas of how to solve this within Azure Data Factory – Data Flows would be very much appreciated!

2 Answers


  1. If the source column names can change, you have to use column patterns. Once columns are matched by pattern, you can project them into explicit columns with the Select transformation: choose the rule-based mapping option, use true() as the matching expression, and use $$ as the Name As property.

  2. I used a combination of Data Factory pipeline activities and a data flow to meet the requirement.

    • First, I built a data flow that outputs a file. In it, I added a new column whose value is 1 in every row, so that an aggregate grouped on this column treats all rows as a single group.

    • In the aggregate, grouped by the column created above, I used collect() to build an array of values for each of the other columns.

    • Next, add another derived column step that replaces each array with the length of its string form (convert the array to a string, then take its length). A length of 2 indicates that the array is empty, i.e. the column contains only nulls.

    • Write this data flow output to a file. The file has a header row with the column names and a single data row holding the length computed for each column.
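The data flow steps above can be sketched outside ADF in Python. This is only an illustration of the logic, not data flow script: the function name `collected_lengths` and the sample rows are hypothetical, and it assumes that collect() leaves nulls out of the array, which is what the length-2 check implies.

```python
import json

def collected_lengths(rows):
    """Mimic the aggregate + derived column steps for a list of row dicts.

    For each column, collect its non-null values into a list (assumption:
    this matches how collect() treats nulls) and return the length of the
    list's compact string form. A length of 2 means the string is "[]".
    """
    columns = list(rows[0].keys()) if rows else []
    lengths = {}
    for col in columns:
        values = [row[col] for row in rows if row[col] is not None]
        lengths[col] = len(json.dumps(values, separators=(",", ":")))
    return lengths

# Hypothetical sample data: "empty" holds only nulls.
rows = [
    {"id": 1, "name": "a", "empty": None},
    {"id": 2, "name": None, "empty": None},
]
lengths = collected_lengths(rows)
all_null = [col for col, n in lengths.items() if n == 2]
print(all_null)
```

A column with at least one value always yields a string longer than 2 characters, so the check only flags columns that are null in every row.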

    • Create a Data Flow activity to run the above data flow, then add an Execute Pipeline activity that calls pipeline2 and passes the following dynamic content to it (pipeline2 declares a count_of_rows parameter). pipeline2 filters out the all-null columns and writes only the required ones.
    @activity('Data flow1').output.runstatus.profile.sink1.total

    • In pipeline2, I used activities to get the columns that are not entirely null, build a dynamic schema from them, and use that schema as the copy mapping so that only the required columns are written to a file.

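    • First, I read the file written by the data flow with the first-row-as-header option turned off (even though the file has a header). The Lookup then returns the column names as its first row and the lengths as its second row, keyed by the generated names Prop_0, Prop_1, and so on.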

    • You can directly use the following pipeline JSON to build the pipeline:
    {
        "name": "pipeline2",
        "properties": {
            "activities": [
                {
                    "name": "Lookup1",
                    "type": "Lookup",
                    "dependsOn": [],
                    "policy": {
                        "timeout": "0.12:00:00",
                        "retry": 0,
                        "retryIntervalInSeconds": 30,
                        "secureOutput": false,
                        "secureInput": false
                    },
                    "userProperties": [],
                    "typeProperties": {
                        "source": {
                            "type": "DelimitedTextSource",
                            "storeSettings": {
                                "type": "AzureBlobFSReadSettings",
                                "recursive": true,
                                "enablePartitionDiscovery": false
                            },
                            "formatSettings": {
                                "type": "DelimitedTextReadSettings"
                            }
                        },
                        "dataset": {
                            "referenceName": "cols",
                            "type": "DatasetReference"
                        },
                        "firstRowOnly": false
                    }
                },
                {
                    "name": "ForEach1",
                    "type": "ForEach",
                    "dependsOn": [
                        {
                            "activity": "Lookup1",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "userProperties": [],
                    "typeProperties": {
                        "items": {
                            "value": "@range(0,pipeline().parameters.count_of_rows)",
                            "type": "Expression"
                        },
                        "isSequential": true,
                        "activities": [
                            {
                                "name": "Append variable1",
                                "type": "AppendVariable",
                                "dependsOn": [],
                                "userProperties": [],
                                "typeProperties": {
                                    "variableName": "props",
                                    "value": {
                                        "value": "Prop_@{item()}",
                                        "type": "Expression"
                                    }
                                }
                            }
                        ]
                    }
                },
                {
                    "name": "ForEach2",
                    "type": "ForEach",
                    "dependsOn": [
                        {
                            "activity": "ForEach1",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "userProperties": [],
                    "typeProperties": {
                        "items": {
                            "value": "@variables('props')",
                            "type": "Expression"
                        },
                        "isSequential": true,
                        "activities": [
                            {
                                "name": "Append variable2",
                                "type": "AppendVariable",
                                "dependsOn": [],
                                "userProperties": [],
                                "typeProperties": {
                                    "variableName": "req_cols",
                                    "value": {
                                        "value": "@if(and(not(equals(activity('Lookup1').output.value[0][item()],'tp')),not(equals(activity('Lookup1').output.value[1][item()],'2'))),activity('Lookup1').output.value[0][item()],'')",
                                        "type": "Expression"
                                    }
                                }
                            }
                        ]
                    }
                },
                {
                    "name": "Filter1",
                    "type": "Filter",
                    "dependsOn": [
                        {
                            "activity": "ForEach2",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "userProperties": [],
                    "typeProperties": {
                        "items": {
                            "value": "@variables('req_cols')",
                            "type": "Expression"
                        },
                        "condition": {
                            "value": "@not(equals(item(),''))",
                            "type": "Expression"
                        }
                    }
                },
                {
                    "name": "ForEach3",
                    "type": "ForEach",
                    "dependsOn": [
                        {
                            "activity": "Filter1",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "userProperties": [],
                    "typeProperties": {
                        "items": {
                            "value": "@activity('Filter1').output.Value",
                            "type": "Expression"
                        },
                        "isSequential": true,
                        "activities": [
                            {
                                "name": "Append variable3",
                                "type": "AppendVariable",
                                "dependsOn": [],
                                "userProperties": [],
                                "typeProperties": {
                                    "variableName": "mapping",
                                    "value": {
                                        "value": "@json(concat('{\"source\":{\"name\":\"',item(),'\"},\"sink\":{\"name\":\"',item(),'\"}}'))",
                                        "type": "Expression"
                                    }
                                }
                            }
                        ]
                    }
                },
                {
                    "name": "Set variable1",
                    "type": "SetVariable",
                    "dependsOn": [
                        {
                            "activity": "ForEach3",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "userProperties": [],
                    "typeProperties": {
                        "variableName": "dynamic_schema",
                        "value": {
                            "value": "@concat('{\"type\":\"TabularTranslator\",\"mappings\":',string(variables('mapping')),'}')",
                            "type": "Expression"
                        }
                    }
                },
                {
                    "name": "Copy data1",
                    "type": "Copy",
                    "dependsOn": [
                        {
                            "activity": "Set variable1",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "policy": {
                        "timeout": "0.12:00:00",
                        "retry": 0,
                        "retryIntervalInSeconds": 30,
                        "secureOutput": false,
                        "secureInput": false
                    },
                    "userProperties": [],
                    "typeProperties": {
                        "source": {
                            "type": "DelimitedTextSource",
                            "storeSettings": {
                                "type": "AzureBlobFSReadSettings",
                                "recursive": true,
                                "enablePartitionDiscovery": false
                            },
                            "formatSettings": {
                                "type": "DelimitedTextReadSettings"
                            }
                        },
                        "sink": {
                            "type": "DelimitedTextSink",
                            "storeSettings": {
                                "type": "AzureBlobFSWriteSettings"
                            },
                            "formatSettings": {
                                "type": "DelimitedTextWriteSettings",
                                "quoteAllText": true,
                                "fileExtension": ".txt"
                            }
                        },
                        "enableStaging": false,
                        "translator": {
                            "value": "@json(variables('dynamic_schema'))",
                            "type": "Expression"
                        }
                    },
                    "inputs": [
                        {
                            "referenceName": "csv1",
                            "type": "DatasetReference"
                        }
                    ],
                    "outputs": [
                        {
                            "referenceName": "req_file",
                            "type": "DatasetReference"
                        }
                    ]
                }
            ],
            "parameters": {
                "count_of_rows": {
                    "type": "int"
                }
            },
            "variables": {
                "props": {
                    "type": "Array"
                },
                "req_cols": {
                    "type": "Array"
                },
                "test": {
                    "type": "String"
                },
                "mapping": {
                    "type": "Array"
                },
                "dynamic_schema": {
                    "type": "String"
                }
            },
            "annotations": []
        }
    }
    

    NOTE: In the copy data activity, the source is the original file.
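
To make the control flow of pipeline2 easier to follow, here is a rough Python sketch of what its activities compute. It is not ADF code: `build_translator`, its parameters, and the sample Lookup output are hypothetical, and the helper column name 'tp' is taken from the expression in Append variable2.

```python
import json

def build_translator(lookup_rows, column_count, helper_column="tp"):
    """Mirror pipeline2: Lookup1 output -> TabularTranslator mapping.

    lookup_rows[0] holds the column names and lookup_rows[1] the lengths,
    both keyed by Prop_0, Prop_1, ... because the file is read without a
    header.
    """
    # ForEach1: generate the keys Prop_0 .. Prop_{n-1}.
    props = [f"Prop_{i}" for i in range(column_count)]
    names, lengths = lookup_rows[0], lookup_rows[1]
    # ForEach2: keep a name unless it is the helper column or its length
    # is '2' (all nulls); otherwise append an empty string.
    req_cols = [
        names[p] if names[p] != helper_column and lengths[p] != "2" else ""
        for p in props
    ]
    # Filter1: drop the empty strings.
    kept = [c for c in req_cols if c != ""]
    # ForEach3: one source-to-sink mapping per remaining column.
    mappings = [{"source": {"name": c}, "sink": {"name": c}} for c in kept]
    # Set variable1: wrap the mappings in the translator object.
    return {"type": "TabularTranslator", "mappings": mappings}

# Hypothetical Lookup1 output for a file with columns id, empty, tp.
lookup_rows = [
    {"Prop_0": "id", "Prop_1": "empty", "Prop_2": "tp"},
    {"Prop_0": "5", "Prop_1": "2", "Prop_2": "3"},
]
translator = build_translator(lookup_rows, column_count=3)
print(json.dumps(translator))
```

The resulting object corresponds to the value that the Copy activity receives in its translator property, so only the columns listed in the mappings are copied from the original file.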
