I'm trying to read a CSV file in Databricks using PySpark, but the columns come in shuffled: instead of A, B, C they are randomly arranged, e.g. C, A, B.

I tried using map(), but it throws the error TypeError: cannot pickle '_thread.RLock' object.

I need to reorder the columns correctly using PySpark in Databricks. I referred to the example at https://sparkbyexamples.com/pyspark/pyspark-map-transformation/, but it does not help me because my DataFrame is created by reading a CSV file.

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

appName = "AddColumnUsingMap"
spark = SparkSession.builder.appName(appName).getOrCreate()
parentname = 'xx'
filename = 'Test (2)'
todayDate = '2022-05-24 1:48:42'
extension = '.csv'
filePath = "dbfs:/mnt/bronze/landing/x/" + parentname + "/" + "current/" + filename + extension

# Formats
read_format = "csv"
write_format = "csv"
claimdenials_df_raw = (spark
                  .read
                  .format(read_format)
                  .option("multiLine", "true")
                  .option("header", "true")
                  .option("escape", '"')
                  .load(filePath))
display(claimdenials_df_raw)

# This line raises: TypeError: cannot pickle '_thread.RLock' object
rdd = spark.sparkContext.parallelize(claimdenials_df_raw)

def func1(x):
    DenialId = x["Id"]
    PatientFirstName = x["First Name"]
    PatientLastName = x[" Last Name"]
    PatientDateOfBirth = x["Date of Birth"]
    PatientId = x["PatientId"]

    return (DenialId, PatientFirstName, PatientLastName, PatientDateOfBirth, PatientId)

rdd2 = claimdenials_df_raw.rdd.map(lambda x: func1(x))
print(rdd2.collect())

2 Answers


  1. You can convert the RDD back to a DataFrame with the correct column order.
    I am not sure, but you can try it with rdd2:

    df = rdd2.toDF(["Id", "First Name", "Last Name", "Date of Birth", "PatientId"])
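    Since toDF() assigns the given names positionally, the field order of the tuples returned by func1 becomes the column order of the new DataFrame. A quick sanity check (a sketch, assuming rdd2 from the question; printSchema() and show() are standard DataFrame methods):

    df.printSchema()  # columns should now list as Id, First Name, Last Name, Date of Birth, PatientId
    df.show(5)        # spot-check a few rows in the new order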
    
  2. sparkContext.parallelize() creates an RDD from a list of elements or a collection. Your code throws TypeError: cannot pickle '_thread.RLock' object because you are trying to create an RDD by passing the DataFrame claimdenials_df_raw to it directly.

    So, you need to modify your code as shown below:

    rdd=spark.sparkContext.parallelize(claimdenials_df_raw.collect()) 
    
    • Here claimdenials_df_raw.collect() returns all the rows of your DataFrame as a list. You can pass this list to successfully create an RDD.
    rdd = claimdenials_df_raw.rdd 
    
    • The line above also gives you an RDD to work with, without first collecting the data to the driver.

    Either of these avoids the error. You can use the following modified code directly to get rdd2:

    claimdenials_df_raw = (spark
                      .read
                      .format(read_format)
                      .option("multiLine", "true")
                      .option("header", "true")
                      .option("escape", '"')
                      .load(filePath))
    display(claimdenials_df_raw)
    rdd = spark.sparkContext.parallelize(claimdenials_df_raw.collect())
    def func1(x):
        DenialId = x["Id"]
        PatientFirstName = x["First Name"]
        PatientLastName = x[" Last Name"]
        PatientDateOfBirth = x["Date of Birth"]
        PatientId = x["PatientId"]

        return (DenialId, PatientFirstName, PatientLastName, PatientDateOfBirth, PatientId)

    rdd2 = rdd.map(lambda x: func1(x))
    print(rdd2.collect())
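
    As mentioned above, claimdenials_df_raw.rdd gives you the same rows as an RDD without collecting them to the driver first; a minimal sketch reusing func1 from the code above:

    rdd2 = claimdenials_df_raw.rdd.map(func1)  # each x is a Row, so x["Id"] etc. still work
    print(rdd2.collect())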
    

    Additional approach without RDD:

    Instead of using an RDD to reorder the columns, you can simply use DataFrame.select() and pass the column names in the required order:

    claimdenials_df_raw_updated = claimdenials_df_raw.select("Id", "First Name", " Last Name", "Date of Birth", "PatientId")  # note the leading space in " Last Name", matching the CSV header used in func1
    
    claimdenials_df_raw_updated.show()
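
    If you also want to clean up the awkward header while reordering, a sketch under the assumption that the target order is known up front (desired_order is a hypothetical list introduced here for illustration): rename the column with withColumnRenamed(), then select the desired order.

    desired_order = ["Id", "First Name", "Last Name", "Date of Birth", "PatientId"]  # hypothetical target order
    cleaned = claimdenials_df_raw.withColumnRenamed(" Last Name", "Last Name")  # strip the leading space
    claimdenials_df_reordered = cleaned.select(*desired_order)  # select() accepts unpacked column names
    claimdenials_df_reordered.show()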
    