
I currently have a script that fires a request to an API endpoint, which returns a csv.gzip file containing roughly 75,000 rows and 15 columns. I download this file to the web server’s disk storage, unzip it to .csv, and then loop through every row and add the data to my database, finally deleting the files saved to disk. The process currently takes between 5 and 10 minutes to complete.

I’m sure there are areas for improvement, but I’m not sure how to implement them. Some of them are:

  1. Save the csv data to a variable rather than on disk (a rough sketch of what I mean is below, after my current code).
  2. Bulk import the data into my database.

I’m sure there are other improvements to be made, so any advice would be appreciated.

response = oauth.get(realm)

# Parse the XML response and base64-decode the embedded price file
content = ET.fromstring(response.content)
coded_string = content.find('.//pricefile')
decoded_string = base64.b64decode(coded_string.text)

# Write the gzipped file to disk, then decompress it to test.csv
with open('test.csv.gzip', 'wb') as f:
    f.write(decoded_string)

with gzip.open('test.csv.gzip', 'rb') as f_in:
    with open('test.csv', 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)

with open('test.csv') as f:
    reader = csv.reader(f)
    next(reader)  # skip the header row
    for row in reader:
        pricing.objects.update_or_create(
            product_id=row[0],
            date=datetime.now(),

            defaults={
                'average_price': Decimal(row[1] or 0),
                'low_price': Decimal(row[2] or 0),
                'high_price': Decimal(row[3] or 0),
                ...
            })

os.remove('test.csv')
os.remove('test.csv.gzip')
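
For point 1, the rough idea I have is to keep everything in memory with gzip.decompress and io.StringIO instead of writing temporary files, something like this (untested, and it assumes the csv is UTF-8):

import gzip
import io

decoded_string = base64.b64decode(coded_string.text)

# Decompress in memory instead of writing test.csv.gzip / test.csv to disk
csv_text = gzip.decompress(decoded_string).decode('utf-8')

reader = csv.reader(io.StringIO(csv_text))
next(reader)  # skip the header row
for row in reader:
    ...  # same per-row handling as above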

2 Answers


  1. You should probably use bulk_create. Something like:

    pricing.objects.bulk_create(
        [
            pricing(
                product_id=row[0],
                date=datetime.now(),
                average_price=Decimal(row[1] or 0),
                …
            )
            for row in reader
        ],
        update_conflicts=True,
        update_fields=["date", "average_price", …],
        unique_fields=["product_id"]
    )
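
    As a side note, update_conflicts requires Django 4.1 or newer. If building all ~75,000 objects in one go turns out to be heavy, bulk_create also accepts a batch_size argument, or you can feed it chunks straight from the csv reader, roughly like this (the chunk size of 1,000 is an arbitrary guess, and the field lists are abbreviated as above):

    from itertools import islice

    while True:
        batch = [
            pricing(
                product_id=row[0],
                date=datetime.now(),
                average_price=Decimal(row[1] or 0),
                # ...remaining fields as above
            )
            for row in islice(reader, 1000)
        ]
        if not batch:
            break
        pricing.objects.bulk_create(
            batch,
            update_conflicts=True,
            update_fields=["date", "average_price"],
            unique_fields=["product_id"],
        )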
    
  2. An example using psycopg2 directly. names.csv is derived from the US Social Security names database and has 260,000 rows in four columns.

    import psycopg2
    import csv
    import time
    
    con = psycopg2.connect(dbname="test", host='localhost', user='postgres', port=5432) 
    cur = con.cursor()   
    
    with open("/home/aklaver/software_projects/test_data/data_sets/names/names.csv", newline="") as csv_file:
       t1 = time.time()
       cur.execute('create temp table names_csv_tmp(name varchar, rank integer, gender varchar, year integer, PRIMARY KEY(name, gender,year))')
       cur.execute('create table if not exists names_csv(name varchar, rank integer, gender varchar, year integer, PRIMARY KEY(name, gender,year))')  
       cur.copy_expert("COPY names_csv_tmp FROM STDIN WITH CSV HEADER", csv_file)
       cur.execute("insert into names_csv select * from names_csv_tmp ON CONFLICT (name,gender, year) DO UPDATE SET name = EXCLUDED.name, gender=EXCLUDED.gender")
       cur.execute("drop table names_csv_tmp")
       con.commit()
       t2 = time.time()
       print(t2 - t1)
    

    For the first run, where the table names_csv was empty and the INSERT was just inserting all the records from names_csv_tmp, the timing was 3.66 seconds.

    For the second run where the ON CONFLICT triggered updates the run time was 7.29 seconds.

    UPDATE

    To clarify: ON CONFLICT relies, per the ON CONFLICT documentation, on there being a "... unique violation or exclusion constraint violation error" that triggers the alternate ON CONFLICT action instead of the original INSERT action.
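
    One more note that ties back to point 1 of the question: copy_expert accepts any file-like object, not just a file on disk, so the CSV never has to be written out. A minimal sketch, assuming the decompressed CSV (header included) is already held in a string csv_text:

    import io

    # csv_text: the decompressed CSV as a single string (assumption, see point 1 of the question)
    cur.copy_expert("COPY names_csv_tmp FROM STDIN WITH CSV HEADER", io.StringIO(csv_text))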
