
I have a .csv file (in Azure Data Lake Storage), which looks approximately like this ->
[screenshot of the source .csv]

I want to create a PySpark (Python) notebook that can be run from a Synapse Analytics pipeline (Integrate -> Pipeline).

The code in the notebook should split the 2nd column into two and convert all rows to the GB unit, so that it looks like this:
[screenshot of the desired output]

Could you please help with the PySpark code? I am a beginner in Azure Synapse Analytics and not sure how to do it.

!! IMPORTANT: The problem I have is that it all should be done in the same file (no new files should be created).

Thanks in advance

2 Answers


  1. One way to do this is to use the split function:

    Input:

    +----+--------+-----+
    |  ID|Consumed|Total|
    +----+--------+-----+
    |1234|80.14 GB|    x|
    |2345|  670 KB|    y|
    |3456| 9.38 GB|    Z|
    +----+--------+-----+
    

    Code:

    from pyspark.sql.functions import split, col, lit
    from pyspark.sql.types import DoubleType
    
    # Sample data matching the input above.
    df = spark.createDataFrame([(1234,'80.14 GB','x',),
                                (2345,'670 KB','y',),
                                (3456,'9.38 GB','Z',)]
                                , ['ID','Consumed','Total']
                              )
    # Lookup table: factor that converts each unit to GB.
    dfUnit = spark.createDataFrame([  (1.0,'GB',),
                                      (0.001,'MB',),
                                      (0.000001,'KB',),]
                                    ,['Scale','Unit']
                                  )
    
    df = (df
          .withColumn("ConsumedSplit",split(col("Consumed"), " "))              # e.g. ['80.14', 'GB']
          .withColumn("Unit",col("ConsumedSplit")[1])                           # unit token
          .withColumn("Consumed",(col("ConsumedSplit")[0]).cast(DoubleType()))  # numeric value
          .join(dfUnit, on="Unit")                                              # attach the scale factor
          .withColumn("Consumed",col("Consumed")*col("Scale"))                  # convert to GB
          .withColumn("Unit",lit("GB"))
          .select("ID", "Consumed", "Unit", "Total")
         )
    df.show()
    

    Result:

    +----+--------+----+-----+
    |  ID|Consumed|Unit|Total|
    +----+--------+----+-----+
    |1234|   80.14|  GB|    x|
    |3456|    9.38|  GB|    Z|
    |2345|  6.7E-4|  GB|    y|
    +----+--------+----+-----+
    

    I would not recommend overwriting the same file. It’s always good practice to separate your stages: treat the files you are reading as raw files, e.g. saved in a .../Raw/ folder, and write the newly generated files somewhere like a .../Preprocessed/ folder.

    It might also be a good idea to save the new file in a binary format like Parquet, both for compression/file size and because the datatype of each column is stored in the file itself.
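
    As a rough sketch of that staged layout (the Raw/Preprocessed folder names and the abfss URL below are placeholders, not something from the question), the read and write could look like this:

    # Hypothetical paths -- adjust container, storage account, folders and file name.
    raw_path = "abfss://<container>@<storageacc>.dfs.core.windows.net/Raw/<filename>.csv"
    preprocessed_path = "abfss://<container>@<storageacc>.dfs.core.windows.net/Preprocessed/<filename>"
    
    # Read the raw csv with a header row and let Spark infer the column types.
    df = spark.read.csv(raw_path, header=True, inferSchema=True)
    
    # ... apply the split/join transformation shown above ...
    
    # Write the result as Parquet so the column datatypes travel with the data.
    df.write.mode("overwrite").parquet(preprocessed_path)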

  2. I read the csv file from my storage into a dataframe.
    Here is my file:

    [screenshot of the input csv]
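
    The read itself might look something like the sketch below (the abfss URL and file name are placeholders for my storage account; header=True keeps the first row as the column names):

    # Placeholder path -- substitute your own container, storage account and file name.
    df = spark.read.csv(
        "abfss://<container>@<storageacc>.dfs.core.windows.net/<filename>.csv",
        header=True
    )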

    I created a new column called ‘Unit’ by splitting the Consumed column:

    from pyspark.sql.functions import split, when, col, round, regexp_extract, format_number
    
    # Take the token after the space as the unit, then relabel KB rows as GB
    # (the value itself is converted in the next step).
    split_cols = split(df['Consumed'], ' ')
    
    df = df.withColumn('Unit', split_cols.getItem(1))
    df = df.withColumn('Unit', when(df['Unit'] == 'KB', 'GB').otherwise(df['Unit']))
    

    I converted the KB values of the Consumed column into GB using the code below:

    df = df.withColumn("Consumed",
                       when(df["Consumed"].contains("GB"),
                            round(regexp_extract(df["Consumed"], r"(\d+\.?\d*)", 1).cast("double"), 2)
                            )
                       .when(df["Consumed"].contains("KB"),
                             round(regexp_extract(df["Consumed"], r"(\d+\.?\d*)", 1).cast("double")/1000000, 5)
                            )
                       .otherwise(0)
                      )
    

    When I try the above, the 670 KB value comes out as 6.7E-4, i.e. it is shown in scientific notation.

    [screenshot of the dataframe with the value in scientific notation]

    So, I formatted it using the command below:

    df = df.withColumn("Consumed",
                       when(col("Consumed") == 6.7E-4, format_number(col("Consumed"), 5))
                       .otherwise(col("Consumed")))
    

    And selected the columns in the required order:

    df = df.select('ID', 'Consumed', 'Unit', 'Total')
    

    Output:

    [screenshot of the output dataframe]

    Regarding the requirement that everything should be done in the same file (no new files should be created):

    Mount the storage path using the below procedure:
    
    Create a linked service for the path, then create a mount using the code below:

    mssparkutils.fs.mount(
        "abfss://<container>@<storageacc>.dfs.core.windows.net",
        "<mount container>",
        {"linkedService":"AzureDataLakeStorage1"}
    )
    


    # The mount is exposed to local file APIs under /synfs/<jobId>/<mount point>/
    jobid=mssparkutils.env.getJobId()
    path='/synfs/'+jobid+'/<mount container>/<filename>.csv'
    

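    As a side note, the mounted file should also be readable with the Spark API itself; a small sketch, assuming the mount above succeeded (same placeholders as in the mount call):

    # Spark uses the synfs scheme for mounted paths, while local file APIs (e.g. pandas) use /synfs/.
    spark_path = 'synfs:/' + jobid + '/<mount container>/<filename>.csv'
    df_check = spark.read.csv(spark_path, header=True)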

    I overwrote the updated dataframe to the same file using the code below:

    df.toPandas().to_csv(path,index = False)
    


    Updated file:

    [screenshot of the updated csv file]
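
    Note that df.toPandas() collects the whole dataframe onto the Spark driver, so this only suits small files like this one; the upside is that pandas writes a single .csv back to the mounted path, whereas df.write.csv() would produce a folder of part files instead of overwriting the original file in place.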
