
What is the best way to insert very large amounts of data into a PostgreSQL table?
OS: Ubuntu 22.04 LTS
DB: PostgreSQL 14
Framework: Django on Python 3.11

For now I am using an INSERT INTO statement with 100,000 rows at a time. The whole process of inserting an average of 1,000,000 rows takes about 2 minutes, which is within my acceptable range, but I want to know if there is a better way to do this.

It was working fine, but it has started taking more time and sometimes fails with errors like:

OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly

from django.db import connection

cursor = connection.cursor()

batch_size = 100000
offset = 0

while True:
    transaction_list_query = f"SELECT * FROM {source_table} LIMIT {batch_size} OFFSET {offset};"
    cursor.execute(transaction_list_query)
    transaction_list = dictfetchall(cursor)
    if not transaction_list:
        break

    data_to_insert = []
    for transaction in transaction_list:
        # Some process-intensive calculations, then build one "(val1, val2, ...)" string per row
        data_to_insert.append(...)

    insert_query = (
        f"INSERT INTO {per_transaction_table} ({company_ref_id_id_column}, {rrn_column}, "
        f"{transaction_type_ref_id_id_column}, {transactionamount_column}) "
        f"VALUES {','.join(data_to_insert)} "
        f"ON CONFLICT ({rrn_column}) DO UPDATE SET {company_ref_id_id_column} = EXCLUDED.{company_ref_id_id_column};"
    )
    cursor.execute(insert_query)
    offset += batch_size

2 Answers


  1. A faster way could be to use a prepared statement in a session, and then repeatedly executing it with a new batch of rows.

    An even faster way would be to use COPY (optionally WITH (FREEZE)) on unlogged tables: https://www.postgresql.org/docs/current/sql-copy.html
    and add indexes and constraints later.
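
    A rough sketch of the COPY route with the psycopg2 driver (copy_expert streams a file-like object to the server; the table and column names are reused from the question, and the copy_batch helper is illustrative). Note that COPY has no ON CONFLICT handling, so if the upsert is required you would COPY into a staging table and merge from it afterwards:

    import io
    from django.db import connection

    def copy_batch(rows):
        # rows: iterable of (company_ref_id, rrn, transaction_type_ref_id, amount) tuples,
        # assumed free of tabs/newlines so the default COPY text format can be used
        buf = io.StringIO()
        for row in rows:
            buf.write("\t".join(str(value) for value in row) + "\n")
        buf.seek(0)

        with connection.cursor() as cursor:
            cursor.copy_expert(
                f"COPY {per_transaction_table} ({company_ref_id_id_column}, {rrn_column}, "
                f"{transaction_type_ref_id_id_column}, {transactionamount_column}) FROM STDIN",
                buf,
            )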

  2. I’d recommend using a task queue like Celery to handle large database insertions in Django. Here’s why:

    Prevents Request Timeouts: Long-running database operations within a request context can exceed timeout limits, leading to errors. Task queues enable asynchronous execution, so the request can return immediately while the database task runs in the background.
    Improves Scalability: Task queues distribute work across multiple workers, enhancing performance and handling larger workloads efficiently.
    Provides Monitoring and Retry Mechanisms: Celery offers tools to monitor task execution, retry failed tasks, and handle errors gracefully.
    Here’s a simple example of how to use Celery for this purpose:

    1. Install Celery:

    pip install celery

    2. Configure Celery:

    Set up a Celery broker (e.g., RabbitMQ or Redis) for task communication.
    Define Celery tasks in your Django app.
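
    A minimal configuration sketch (assuming a Redis broker and a Django project package named myproject; both are illustrative, adjust to your setup):

    # myproject/celery.py
    import os
    from celery import Celery

    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")

    app = Celery("myproject", broker="redis://localhost:6379/0")
    app.config_from_object("django.conf:settings", namespace="CELERY")
    app.autodiscover_tasks()  # picks up tasks.py modules from installed apps
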
    3. Create a Celery Task:

    from celery import shared_task
    from django.db import connection

    @shared_task
    def process_and_insert_data():
        batch_size = 100000  # Adjust as needed
        offset = 0

        while True:
            with connection.cursor() as cursor:
                transaction_list_query = f"SELECT * FROM {source_table} LIMIT {batch_size} OFFSET {offset};"
                cursor.execute(transaction_list_query)
                transaction_list = dictfetchall(cursor)

                if not transaction_list:
                    break

                data_to_insert = []
                for transaction in transaction_list:
                    # Perform process-intensive calculations here, then append one
                    # (company_ref_id, rrn, transaction_type_ref_id, amount) tuple per row
                    data_to_insert.append(...)

                insert_query = f"""
                    INSERT INTO {per_transaction_table} ({company_ref_id_id_column}, {rrn_column},
                        {transaction_type_ref_id_id_column}, {transactionamount_column})
                    VALUES (%s, %s, %s, %s)
                    ON CONFLICT ({rrn_column}) DO UPDATE SET {company_ref_id_id_column} = EXCLUDED.{company_ref_id_id_column};
                """
                # executemany runs the single-row statement once per tuple in data_to_insert
                cursor.executemany(insert_query, data_to_insert)

            offset += batch_size


    4. Initiate the Task:

    from .tasks import process_and_insert_data
    
    # Within a view or other part of your code:
    process_and_insert_data.delay()
    

    Remember:

    Adjust batch size and other parameters based on your specific needs.
    Consider using Django’s ORM for database interactions within tasks if applicable (a rough sketch follows this list).
    Explore Celery’s advanced features for monitoring, error handling, and task prioritization.
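
    If the ORM route fits, a rough sketch of the same upsert using bulk_create with update_conflicts (Django 4.1+; PerTransaction and its field names are hypothetical stand-ins for the real model behind per_transaction_table):

    from .models import PerTransaction  # hypothetical model

    def upsert_batch(rows):
        # rows: list of (company_ref_id, rrn, transaction_type_ref_id, amount) tuples
        objs = [
            PerTransaction(
                company_ref_id_id=company_ref_id,
                rrn=rrn,
                transaction_type_ref_id_id=txn_type_id,
                transactionamount=amount,
            )
            for company_ref_id, rrn, txn_type_id, amount in rows
        ]
        # Equivalent of INSERT ... ON CONFLICT (rrn) DO UPDATE SET company_ref_id_id = EXCLUDED.company_ref_id_id
        PerTransaction.objects.bulk_create(
            objs,
            update_conflicts=True,
            unique_fields=["rrn"],
            update_fields=["company_ref_id"],
        )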
