
In PySpark, I have two dataframes. The first dataframe, df1, is an empty dataframe created from a schema. The second dataframe, df2, is a non-empty dataframe loaded from a CSV file.
Now I want to merge them such that all the scenarios below are covered.

  1. If both dataframes contain the same number of columns, merge them.
  2. If the 2nd dataframe contains additional columns, then drop those columns.
  3. If the 2nd dataframe contains fewer columns, then populate the missing columns with null values.

I tried iterating over the fields of the empty dataframe's schema:

schema = ...  # some schema with a few columns

for field in schema.fields:
    if field.name in df2.columns:
        final_df = df1.withColumn(field.name, df2[field.name].cast(field.dataType))

2 Answers


  1. Basically, you have an input dataframe with columns A, B and C, and a reference schema with columns A, B and D. You want your output dataframe to match this "reference" schema by dropping column C and adding a NULL column D.

    Assuming df_ref is your reference dataframe and df is the dataframe read from your CSV file.

    from pyspark.sql import functions as F
    
    
    # Adding missing columns
    for col in df_ref.columns: 
        if col not in df.columns:
            df = df.withColumn(col, F.lit(None))
    
    # Keep only the reference columns, in the reference order
    df = df.select(df_ref.columns)
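
    If you also want each column cast to the reference schema's types, here is a sketch (same df and df_ref as above) that builds the whole projection in one select:

    from pyspark.sql import functions as F

    # One expression per reference column: reuse the input column when it
    # exists, otherwise a NULL literal, cast to the reference data type.
    select_exprs = [
        (F.col(f.name) if f.name in df.columns else F.lit(None)).cast(f.dataType).alias(f.name)
        for f in df_ref.schema.fields
    ]
    df = df.select(*select_exprs)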
    
  2. You can achieve the desired merge using the join operation in PySpark. Here’s an example code snippet that covers all the scenarios you mentioned:

    from pyspark.sql.functions import lit
    
    # Check if both dataframes have the same number of columns
    if len(df1.columns) == len(df2.columns):
        # Merge the dataframes using a join operation
        final_df = df1.join(df2, on=df1.columns, how='inner')
    
    # Check if the 2nd dataframe has additional columns
    elif len(df1.columns) < len(df2.columns):
        # Drop the additional columns from the 2nd dataframe
        common_columns = [c for c in df1.columns if c in df2.columns]  # list keeps df1's column order
        final_df = df2.select(*common_columns)
    
    # Check if the 2nd dataframe has fewer columns
    elif len(df1.columns) > len(df2.columns):
        # Add null columns to the 2nd dataframe for the missing columns
        missing_columns = set(df1.columns).difference(df2.columns)
        for column in missing_columns:
            df2 = df2.withColumn(column, lit(None))
    
        final_df = df1.join(df2, on=df1.columns, how='inner')
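
    Note that the joins above only keep rows that match on every column; if the goal is simply to append df2's rows to df1 once the columns are aligned, one alternative sketch (assuming Spark 3.1+ for allowMissingColumns) uses unionByName:

    # Drop columns df1 doesn't have, then union by name; columns missing
    # from df2 are filled with nulls by allowMissingColumns=True.
    extra_columns = [c for c in df2.columns if c not in df1.columns]
    df2_aligned = df2.drop(*extra_columns) if extra_columns else df2
    final_df = df1.unionByName(df2_aligned, allowMissingColumns=True)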
    