
I want to store a dataframe read from a Parquet file into a PostgreSQL database using Polars, with this code:

from sqlalchemy import create_engine


def store_in_postgresql(df):
    password = 'anon'
    username = 'postgres'
    database = 'nyc_taxis'
    uri = f'postgresql://{username}:{password}@localhost:5432/{database}'
    engine = create_engine(uri)
    # SQLSTATE 42P07 = "duplicate table", i.e. the table already exists
    common_sql_state = "SQLSTATE: 42P07"

    try:
        # table_name is defined at module level
        df.write_database(table_name, connection=uri,
                          engine='adbc', if_exists='replace')
        print('loading has been completed!')
    except Exception as e:
        if common_sql_state in str(e):
            df.write_database(table_name, connection=uri,
                              engine='adbc', if_exists='append')
            print('loading has been completed!')
        else:
            print(e)

but I’m getting this error:

INVALID_ARGUMENT: [libpq] Failed to execute COPY statement: PGRES_FATAL_ERROR ERROR:  COPY file signature not recognized
. SQLSTATE: 22P04

The code stores dataframes of smaller sizes, like 4 million rows (200 MB), but when I try to store a big dataframe with 18 million rows (500 MB) I get the error above. Is there a way to fix the code, or perhaps slice the dataframe so it can be stored in the database? Thank you in advance.

2 Answers


  1. Chosen as BEST ANSWER

    I think I've figured it out: it was a memory problem. Since the function works for smaller dataframes but not big ones, I wrote a new function that splits the big dataframe into smaller chunks, which are then processed by the function above:

    def store_by_chunking(df, num_of_chunks):
        try:
            # row offsets at which each chunk starts, ending with the total row count
            length = len(df) // num_of_chunks
            j = 0
            list_of_chunks = []

            for i in range(num_of_chunks):
                list_of_chunks.append(j)
                j += length
            list_of_chunks.append(len(df))

            print(list_of_chunks)

            # slice(offset, length) expects a length as its second argument,
            # so pass the difference between consecutive offsets
            for i in range(num_of_chunks):
                sliced_df = df.slice(list_of_chunks[i],
                                     list_of_chunks[i + 1] - list_of_chunks[i])
                store_in_postgresql(sliced_df)
        except Exception as e:
            print(e)
    

    This code has not given me an error so far; hope it helps people with the same problem.
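
    For reference, a minimal usage sketch; the Parquet file name, chunk count, and table name below are hypothetical values used only for illustration:

    import polars as pl

    # hypothetical values, just to show how the two functions fit together;
    # table_name is the module-level name that store_in_postgresql reads
    table_name = 'taxi_trips'

    df = pl.read_parquet('yellow_tripdata.parquet')
    store_by_chunking(df, num_of_chunks=10)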


  2. An elegant way to achieve this (at least for me):

        batch_size = 10000
        for i in range(0, df.height, batch_size):
            sub_df = df[i:i + batch_size]
            # replace the table on the first batch only, then append;
            # replacing on every batch would leave just the last one in the table
            sub_df.write_database(table_name=table_name,
                connection=connection_string,
                engine='adbc',
                if_exists='replace' if i == 0 else 'append')
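
     If your version of Polars provides DataFrame.iter_slices, you can also let Polars produce the batches instead of computing the offsets yourself. A minimal sketch, assuming the same table_name and connection_string as above:

        for i, batch in enumerate(df.iter_slices(n_rows=10000)):
            # the first batch recreates the table, later batches append to it
            batch.write_database(table_name=table_name,
                connection=connection_string,
                engine='adbc',
                if_exists='replace' if i == 0 else 'append')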
    