I am running an Airflow job to load data into a table.
The task is:
- query a database -> get the results in a pandas DataFrame -> pass the result set to worker processes -> each worker processes its rows and loads the data into a different database.
The following is a simplified version of the DAG file:
from process import process  # import the task callable, not the module
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.operators.python import PythonOperator

LOADING = PythonOperator(
    task_id='LOADING',
    python_callable=process,
    op_kwargs={
        'source_DB': MySqlHook(mysql_conn_id='source_DB'),
        'destination_DB': MySqlHook(mysql_conn_id='destination_DB')
    },
    dag=dag,
)

start >> LOADING >> end  # dag, start and end are defined elsewhere in the real file
This is the code of the task:
import os
import logging
import billiard as mp

CUR_DIR = os.path.abspath(os.path.dirname(__file__))

def process(source_DB, destination_DB):
    get_data = open(f"{CUR_DIR}/path/to/get_data.sql").read()
    data = source_DB.get_pandas_df(
        sql=get_data,
        parameters={}
    )
    # One destination connection per worker process, created by the initializer.
    with mp.Pool(processes=mp.cpu_count(), initializer=init_worker, initargs=(destination_DB,)) as pool:
        items = [(idx, row) for idx, row in data.iterrows()]
        pool.map(load_data, items)

def init_worker(destination_DB):
    global conn
    conn = destination_DB.get_conn()

def load_data(args):
    index, data = args
    insert_sql = open(f"{CUR_DIR}/path/to/insert.sql").read()
    conn.autocommit(True)
    destination_DB_cur = conn.cursor()
    params = {
        'para1': data['para1'],
        'para2': data['para2']
    }
    # Plain string substitution; parameterized queries would be safer
    # against quoting and injection issues.
    for word, replacement in params.items():
        insert_sql = insert_sql.replace(
            '{{' + str(word) + '}}', str(replacement))
    try:
        destination_DB_cur.execute(insert_sql)
    except Exception as e:
        # The error is only logged, so a failed insert never fails the task.
        logging.error(e)
    destination_DB_cur.close()
The job runs without any errors, but I have noticed that sometimes the loaded data is duplicated 3 times.
I did some research: some say it has to do with the billiard library, others say I have to use connection pooling to ensure synchronization and coordination.
Can someone please help me understand what exactly the issue is and what to do to prevent it from happening?
Answer:
Create a primary key or a unique index (constraint) on the destination table. Duplicates like this typically appear when the same insert runs more than once for a row, for example when the Airflow task is retried or the whole run is repeated, and a unique key makes the database reject the repeats instead of silently storing them.
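A minimal sketch of that fix, assuming a destination table named destination_table whose rows are uniquely identified by (para1, para2); both names are placeholders, so substitute your real table and key columns:

-- One-time DDL: with this constraint in place, MySQL rejects any
-- duplicate (para1, para2) pair no matter how often it is re-inserted.
ALTER TABLE destination_table
    ADD CONSTRAINT uq_destination UNIQUE (para1, para2);

With the unique key in place, you can also make insert.sql itself idempotent, so a retried or re-run task becomes a harmless no-op instead of an error:

-- insert.sql: INSERT IGNORE skips rows that would violate the unique key,
-- so rows that are already loaded are simply passed over on a re-run.
INSERT IGNORE INTO destination_table (para1, para2)
VALUES ('{{para1}}', '{{para2}}');

INSERT ... ON DUPLICATE KEY UPDATE works as well if you would rather have re-runs refresh existing rows; keep in mind that INSERT IGNORE downgrades some other errors to warnings too, so check the warnings if rows seem to go missing.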