I am using Python to query an external API, transform the data, and write it to an internal PostgreSQL database.
In that process, I compare the API result with the existing data in the database using pandas and build a single dataframe that contains both the new records and the existing records that have changed.
What I want to do is hand the dataframe (or a dictionary) over to SQLAlchemy and have it processed so that:
- new records are just appended
- existing records are updated
This is how I approached it (I am new to Python, so please be patient with my beginner skills…):
def update_absence(year):
    api_result = get_absence(year)
    db_result = get_database_absence(year)
    df = compare_dataframes(api_result, db_result, 'id')
    metadata_obj = MetaData()
    metadata_obj.reflect(bind=engine)
    some_table = Table("tb_absence", metadata_obj, autoload_with=engine)
    for item in df.to_dict('records'):
        insert_stmt = insert(some_table).values(item).on_conflict_do_update(constraint='tb_absence_pkey', set_=item)
        print(insert_stmt.compile())
        with engine.connect() as conn:
            result = conn.execute(insert_stmt)
            print(result.rowcount)
            conn.commit()
The output of insert_stmt.compile() is the following:
INSERT INTO tb_absence (id, start_date, end_date, half_day, morning, user_id, employee_id, type, extra_vacation, state, substitute_state, workdays, hours, medical_certificate, comments, substitute_user_id, name) VALUES (%(id)s, %(start_date)s, %(end_date)s, %(half_day)s, %(morning)s, %(user_id)s, %(employee_id)s, %(type)s, %(extra_vacation)s, %(state)s, %(substitute_state)s, %(workdays)s, %(hours)s, %(medical_certificate)s, %(comments)s, %(substitute_user_id)s, %(name)s) ON CONFLICT ON CONSTRAINT tb_absence_pkey DO UPDATE SET id = %(param_1)s, start_date = %(param_2)s, end_date = %(param_3)s, half_day = %(param_4)s, morning = %(param_5)s, user_id = %(param_6)s, employee_id = %(param_7)s, type = %(param_8)s, extra_vacation = %(param_9)s, state = %(param_10)s, substitute_state = %(param_11)s, workdays = %(param_12)s, hours = %(param_13)s, medical_certificate = %(param_14)s, comments = %(param_15)s, substitute_user_id = %(param_16)s, name = %(param_17)s
The rowcount is 1 for each item that I iterate over (the print statement will be replaced by a real log entry once I have understood the approach). However, the database is never updated. I can't really get my head around on_conflict_do_update and how to handle the connection and engine.
I guess I have some fundamental gaps in my understanding, and the examples in that part of the SQLAlchemy tutorial are hard for me to follow, as they only give small snippets and fragments of the solution. I would probably need a complete working example. Reviewing other questions here on SO did not lead to a good understanding on my side either.
All methods for identifying the differences of the datasets are working fine.
I highly appreciate any hint that could help me progress here.
2 Answers
The problem was related to the pandas dataframe comparison operation and the wrong keep parameter (last instead of first) in the drop_duplicates operation.
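To illustrate how the keep parameter can cause this symptom, here is a minimal sketch. The asker's compare_dataframes is not shown, so the function below is a hypothetical reconstruction; the concatenation order (API rows first, database rows second) and the column names are assumptions made for the example.

```python
import pandas as pd


def compare_dataframes(api_df, db_df, key):
    """Hypothetical helper: return new rows and changed rows, in their API version."""
    # Assumption: API rows come first, database rows second.
    combined = pd.concat([api_df, db_df], ignore_index=True)
    # Rows that are identical in both sources appear twice; keep=False drops both copies.
    changed = combined.drop_duplicates(keep=False)
    # A changed record is still present in two versions (API and database).
    # With the order above, keep="first" retains the API version.
    # keep="last" would retain the stale database version instead, so a later
    # upsert would write back the values the database already holds.
    return changed.drop_duplicates(subset=key, keep="first").reset_index(drop=True)


api_df = pd.DataFrame({"id": [1, 2, 3], "name": ["a", "b2", "c"]})
db_df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})

result = compare_dataframes(api_df, db_df, "id")
print(result)
```

Under these assumptions the result contains the changed record (id 2, with the API value) and the new record (id 3). With keep="last", the upsert would still report rowcount 1 for every row, yet no value in the table would change.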
By looping through
for item in df.to_dict('records'):
you are creating and sending a separate INSERT for each row. For example, with my table …… and DataFrame …
…
engine.echo = True
shows that your code results in a separate INSERT statement being sent for each row.
We can turn that into an "executemany" using textual SQL
which produces
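The answer's original code was lost here, so the following is only a sketch of what a textual-SQL "executemany" upsert might look like, not the answerer's code. The three-column table and the sample rows are made up for the example, and an in-memory SQLite database stands in for PostgreSQL so the snippet runs without a server (this particular ON CONFLICT statement is accepted by both).

```python
from sqlalchemy import create_engine, text

# Assumption: in-memory SQLite as a stand-in for the PostgreSQL engine.
engine = create_engine("sqlite://")

# Set up a hypothetical, trimmed-down tb_absence with one existing row.
with engine.begin() as conn:
    conn.execute(text(
        "CREATE TABLE tb_absence (id INTEGER PRIMARY KEY, name TEXT, state TEXT)"
    ))
    conn.execute(text(
        "INSERT INTO tb_absence (id, name, state) VALUES (1, 'Alice', 'requested')"
    ))

# EXCLUDED refers to the row that was proposed for insertion.
upsert = text(
    "INSERT INTO tb_absence (id, name, state) "
    "VALUES (:id, :name, :state) "
    "ON CONFLICT (id) DO UPDATE SET "
    "name = EXCLUDED.name, state = EXCLUDED.state"
)

# In the question this list would come from df.to_dict('records').
rows = [
    {"id": 1, "name": "Alice", "state": "approved"},   # existing row, changed
    {"id": 2, "name": "Bob", "state": "requested"},    # new row
]

# Passing a list of dicts makes SQLAlchemy use executemany;
# engine.begin() commits on success and rolls back on error.
with engine.begin() as conn:
    conn.execute(upsert, rows)

with engine.connect() as conn:
    stored = [tuple(r) for r in conn.execute(
        text("SELECT id, name, state FROM tb_absence ORDER BY id")
    )]
print(stored)
```

Building the statement once and passing all rows in a single execute call avoids opening a connection and sending a statement per row.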
Or, SQLAlchemy Core can build the statement for us:
which produces
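As the answer's Core example is also missing, here is a hedged sketch of how such a statement could be built with the PostgreSQL dialect's insert construct. The table definition is a hypothetical three-column stand-in for the reflected tb_absence, and the statement is only compiled, not executed, so no database is needed.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert

metadata_obj = MetaData()

# Hypothetical stand-in; the question reflects the real table with autoload_with.
tb_absence = Table(
    "tb_absence",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("name", String),
    Column("state", String),
)

insert_stmt = insert(tb_absence)

# Update every non-key column from the row proposed for insertion ("excluded"),
# instead of binding a second copy of each value as set_=item does.
update_cols = {
    col.name: insert_stmt.excluded[col.name]
    for col in tb_absence.columns
    if not col.primary_key
}

upsert_stmt = insert_stmt.on_conflict_do_update(
    index_elements=[tb_absence.c.id],
    set_=update_cols,
)

# Compile against the PostgreSQL dialect to inspect the generated SQL.
compiled_sql = str(upsert_stmt.compile(dialect=postgresql.dialect()))
print(compiled_sql)
```

To run it against the real database, the statement would be executed once with the whole list of records, for example conn.execute(upsert_stmt, df.to_dict('records')) inside a with engine.begin() as conn: block, assuming engine and df are the objects from the question.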