
I want to write a script for mass importing hundreds of CSV files with differing columns/formatting into PostgreSQL

How do you bulk import data with differing columns/formats into a single table in PostgreSQL using CSV files?

For example, the code below outlines how to bulk import a single CSV file into PostgreSQL, yet I do not know how to import data that has different columns.

So how would we modify the code below to accomplish this task? I am new to PostgreSQL, so please pardon my inexperience.

import pandas as pd
import psycopg2
from io import StringIO

# Database connection parameters
db_params = {
    "dbname": "your_database",
    "user": "your_username",
    "password": "your_password",
    "host": "your_host",
    "port": "your_port",
}

# File path to your CSV
csv_file = "your_file.csv"
table_name = "your_table_name"

# Connect to PostgreSQL
conn = psycopg2.connect(**db_params)
cur = conn.cursor()

# Step 1: Read CSV into Pandas DataFrame
df = pd.read_csv(csv_file)

# Step 2: Create table dynamically
columns = df.columns
col_str = ", ".join([f'"{col}" TEXT' for col in columns])  # Assuming TEXT for simplicity
create_table_query = f"CREATE TABLE IF NOT EXISTS {table_name} ({col_str});"
cur.execute(create_table_query)
conn.commit()

# Step 3: Use COPY to load the data
buffer = StringIO()
df.to_csv(buffer, index=False, header=False)  # Write DataFrame to buffer without header
buffer.seek(0)

# COPY data into the table
copy_query = f"COPY {table_name} ({', '.join(columns)}) FROM STDIN WITH CSV"
cur.copy_expert(copy_query, buffer)
conn.commit()

# Close connections
cur.close()
conn.close()

print(f"Data from {csv_file} has been successfully imported into {table_name}.")

I tried to aggregate all the column names and was able to create a file containing every column name, yet I was unsure how to align the data with those columns.
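
For reference, once a master list of column names exists, pandas can align each file's data to it in one step. A minimal sketch, assuming all_columns is the aggregated list (reindex fills columns absent from a given file with NaN, which COPY's CSV mode loads as NULL):

import pandas as pd

# Hypothetical aggregated list of every column seen across the files
all_columns = ["id", "name", "price", "quantity"]

df = pd.read_csv("one_of_the_files.csv")
# Reorder to match the master list; columns missing from this file become NaN
df = df.reindex(columns=all_columns)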

2 Answers


  1. Chosen as BEST ANSWER

    Okay, so here is what I did, and it worked!

    import os
    import pandas as pd
    import psycopg2
    from io import StringIO
    
    # Database connection details
    db_params = {
        "dbname": "xxx",
        "user": "xxx",
        "password": "xxx",
        "host": "xxx",
        "port": xxx
    }
    csv_folder = "xxx"  # folder containing the CSV files (referenced below)
    table_name = "xxx"
    
    
    # Connect to PostgreSQL
    conn = psycopg2.connect(**db_params)
    cur = conn.cursor()
    
    # Function to create the table dynamically
    def create_table_from_csv(file_path, table_name):
        df = pd.read_csv(file_path)
        columns = df.columns
        col_defs = []
    
        for col in columns:
            if pd.api.types.is_integer_dtype(df[col]):
                col_defs.append(f'"{col}" INTEGER')
            elif pd.api.types.is_float_dtype(df[col]):
                col_defs.append(f'"{col}" FLOAT')
            elif pd.api.types.is_bool_dtype(df[col]):
                col_defs.append(f'"{col}" BOOLEAN')
            elif pd.api.types.is_datetime64_any_dtype(df[col]):
                col_defs.append(f'"{col}" TIMESTAMP')
            else:
                col_defs.append(f'"{col}" TEXT')
    
        col_str = ", ".join(col_defs)
        create_table_query = f"CREATE TABLE IF NOT EXISTS {table_name} ({col_str});"
        cur.execute(create_table_query)
        conn.commit()
        print(f"Table {table_name} has been created based on {file_path}.")
    
    # Function to alter the table to include missing columns
    def add_missing_columns(df, table_name):
        # Get existing table columns from PostgreSQL
        cur.execute(f"SELECT column_name FROM information_schema.columns WHERE table_name = '{table_name}';")
        existing_columns = set(row[0] for row in cur.fetchall())
    
        # Find missing columns
        missing_columns = set(df.columns) - existing_columns
        for col in missing_columns:
            if pd.api.types.is_integer_dtype(df[col]):
                col_type = "INTEGER"
            elif pd.api.types.is_float_dtype(df[col]):
                col_type = "FLOAT"
            elif pd.api.types.is_bool_dtype(df[col]):
                col_type = "BOOLEAN"
            elif pd.api.types.is_datetime64_any_dtype(df[col]):
                col_type = "TIMESTAMP"
            else:
                col_type = "TEXT"
    
            # Add the missing column to the table
            alter_query = f'ALTER TABLE {table_name} ADD COLUMN "{col}" {col_type};'
            cur.execute(alter_query)
            conn.commit()
            print(f"Added column {col} ({col_type}) to table {table_name}.")
    
    # Function to import CSV data into the table
    def import_csv_to_table(file_path, table_name):
        # Read the CSV into a DataFrame
        df = pd.read_csv(file_path)
    
        # Remove extra spaces from column names
        df.columns = df.columns.str.strip()
    
        # Ensure all table columns are present
        add_missing_columns(df, table_name)
    
        # Align DataFrame columns to match table columns
        cur.execute(f"SELECT column_name FROM information_schema.columns WHERE table_name = '{table_name}';")
        table_columns = [row[0] for row in cur.fetchall()]
        
        # Add any missing columns with NULL values
        for col in table_columns:
            if col not in df.columns:
                df[col] = None  # Add missing columns with NULL values
    
        df = df[table_columns]  # Ensure column order matches the table
    
        # Use COPY to load the data
        buffer = StringIO()
        df.to_csv(buffer, index=False, header=False)  # Default is comma-delimited
        buffer.seek(0)
    
        # Construct the COPY query
        columns_str = ", ".join([f'"{col}"' for col in table_columns])
        copy_query = f"COPY {table_name} ({columns_str}) FROM STDIN WITH CSV"
    
        # Debugging: print buffer content and query
        print(f"Buffer content:n{buffer.getvalue()}")
        print(f"Executing query: {copy_query}")
    
        cur.copy_expert(copy_query, buffer)
        conn.commit()
        print(f"Data from {file_path} has been successfully imported into {table_name}.")
    
    
    # Process all CSV files in the folder
    csv_files = [f for f in os.listdir(csv_folder) if f.endswith('.csv')]
    if not csv_files:
        print("No CSV files found in the folder.")
    else:
        # Create the table from the first file
        first_csv = os.path.join(csv_folder, csv_files[0])
        create_table_from_csv(first_csv, table_name)
    
        # Import all files, dynamically adjusting the table as needed
        for filename in csv_files:
            file_path = os.path.join(csv_folder, filename)
            print(f"Processing file: {file_path}")
            import_csv_to_table(file_path, table_name)
    
    # Close connections
    cur.close()
    conn.close()
    
    print("All CSV files have been success")
    

  2. The idea is not bad, but if I understand correctly, you copied the hundreds of CSVs into one file? That is a bad idea! Instead, solve it with an outer loop: go through the files and create a table named after each file in turn; your program can determine the number of columns from each CSV.
    Build the loop so that you pass in the path where the files live, iterate through them, create the table for each file, and upload its data, as sketched below.
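
    A minimal sketch of that per-file approach (db_params uses the same placeholders as the question; the folder path and the file-to-table naming rule are illustrative choices):

    import os
    from io import StringIO

    import pandas as pd
    import psycopg2

    db_params = {"dbname": "xxx", "user": "xxx", "password": "xxx", "host": "xxx", "port": "xxx"}
    csv_folder = "path/to/csvs"  # hypothetical folder of CSV files

    conn = psycopg2.connect(**db_params)
    cur = conn.cursor()

    for filename in sorted(os.listdir(csv_folder)):
        if not filename.endswith(".csv"):
            continue
        # One table per file, named after the file: "sales_2023.csv" -> "sales_2023"
        table_name = os.path.splitext(filename)[0]
        df = pd.read_csv(os.path.join(csv_folder, filename))

        # Create the table with one TEXT column per CSV header
        col_str = ", ".join(f'"{col}" TEXT' for col in df.columns)
        cur.execute(f'CREATE TABLE IF NOT EXISTS "{table_name}" ({col_str});')

        # Stream the rows in with COPY
        buffer = StringIO()
        df.to_csv(buffer, index=False, header=False)
        buffer.seek(0)
        cols = ", ".join(f'"{col}"' for col in df.columns)
        cur.copy_expert(f'COPY "{table_name}" ({cols}) FROM STDIN WITH CSV', buffer)
        conn.commit()

    cur.close()
    conn.close()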
