
I am running an Airflow job to load data into a table.
The task is:

  • query a database -> get the results in a pandas DataFrame -> pass the result set to worker processes -> each worker process processes its rows and loads the data into a different database.

The following is a simplified version of the DAG file:

from process import process  # the task callable defined in process.py (the task code below)
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.operators.python import PythonOperator

LOADING = PythonOperator(
            task_id='LOADING',
            python_callable=process,
            op_kwargs={
                'source_DB': MySqlHook(mysql_conn_id='source_DB'),
                'destination_DB': MySqlHook(mysql_conn_id='destination_DB')
            },
            dag=dag,
        )

start >> LOADING >> end

This is the code of the task:

import os
import logging
import billiard as mp

CUR_DIR = os.path.abspath(os.path.dirname(__file__))

def process(source_DB, destination_DB):

    with open(f"{CUR_DIR}/path/to/get_data.sql") as sql_file:
        get_data = sql_file.read()

    data = source_DB.get_pandas_df(
        sql=get_data,
        parameters={}
    )

    # Fan the rows out to a pool of workers; each worker gets its own DB
    # connection via the initializer below.
    with mp.Pool(processes=mp.cpu_count(), initializer=init_worker, initargs=(destination_DB,)) as pool:
        items = [(idx, row) for idx, row in data.iterrows()]
        pool.map(load_data, items)


def init_worker(destination_DB):
    # Runs once in each worker process: open a dedicated connection and keep it
    # in a module-level global for load_data to use.
    global conn
    conn = destination_DB.get_conn()


def load_data(args):

    index, data = args
    with open(f"{CUR_DIR}/path/to/insert.sql") as sql_file:
        insert_sql = sql_file.read()

    conn.autocommit(True)
    destination_DB_cur = conn.cursor()

    # Substitute the {{para1}}/{{para2}} placeholders in the SQL template.
    params = {
        'para1': data['para1'],
        'para2': data['para2']
    }
    for word, replacement in params.items():
        insert_sql = insert_sql.replace(
            '{{' + str(word) + '}}', str(replacement))

    try:
        destination_DB_cur.execute(insert_sql)
    except Exception as e:
        logging.error(e)
    finally:
        destination_DB_cur.close()

The job runs without any errors, but I have noticed that the loaded data is sometimes duplicated three times.

I did some research: some say it has to do with the billiard library, others say I have to use connection pooling to ensure synchronization and coordination.

Can someone please help me understand what exactly the issue is and how to prevent it from happening?

2 Answers


  1. Create a primary key or a unique index (constraint) on the table, so that duplicate inserts are rejected by the database itself. A minimal sketch follows below.

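A sketch of this approach, assuming purely for illustration a destination table named my_table whose natural key is (para1, para2); the real table and column names come from whatever schema insert.sql targets:

from airflow.providers.mysql.hooks.mysql import MySqlHook

destination_DB = MySqlHook(mysql_conn_id='destination_DB')

# One-off DDL: with a unique key in place, inserting the same (para1, para2)
# pair again fails (or is ignored) instead of silently creating a duplicate row.
destination_DB.run(
    "ALTER TABLE my_table ADD CONSTRAINT uq_my_table UNIQUE (para1, para2)"
)

If insert.sql is then written as INSERT IGNORE ... (or INSERT ... ON DUPLICATE KEY UPDATE ...), a repeated attempt becomes a no-op rather than a second copy of the row.
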
  2. Data duplication can occur when multiple processes insert data into the data warehouse simultaneously without proper synchronization.
    • One possible reason for the duplication is that each worker process in the multiprocessing pool establishes its own connection to the data warehouse and inserts data independently, which can lead to concurrent inserts and potential duplicates.
    • Try establishing a connection pool so that connections are created and handed out in one coordinated place.
    • Consider using a different multiprocessing library: although billiard is a fork of the standard multiprocessing library, it may have some subtle differences. You could try the standard multiprocessing library to see whether it resolves the data duplication issue. A sketch combining these suggestions follows after this list.
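
As a sketch of the last two suggestions, assuming the same process/init_worker/load_data layout as in the question: the standard multiprocessing module replaces billiard, and only the connection id (a plain string) is passed to the pool, so each worker builds its own hook and connection instead of receiving a pickled hook. The table and column names in the insert are placeholders, not the real schema:

import multiprocessing as mp  # standard library instead of billiard

from airflow.providers.mysql.hooks.mysql import MySqlHook

conn = None  # one connection per worker process, created by init_worker


def init_worker(destination_conn_id):
    # Each pool worker opens its own connection from its own hook, so no
    # connection object is ever pickled and shared between processes.
    global conn
    conn = MySqlHook(mysql_conn_id=destination_conn_id).get_conn()


def load_data(args):
    index, row = args
    cur = conn.cursor()
    # Hypothetical parameterised insert -- replace with the real insert.sql contents.
    cur.execute(
        "INSERT IGNORE INTO my_table (para1, para2) VALUES (%s, %s)",
        (row['para1'], row['para2']),
    )
    conn.commit()
    cur.close()


def process(source_DB, destination_conn_id):
    data = source_DB.get_pandas_df(
        sql="SELECT para1, para2 FROM source_table",  # placeholder query
        parameters={}
    )
    with mp.Pool(processes=mp.cpu_count(),
                 initializer=init_worker,
                 initargs=(destination_conn_id,)) as pool:
        pool.map(load_data, list(data.iterrows()))

Whether this alone removes the duplication depends on what is actually re-running the inserts, so it is best combined with the unique-key constraint from the first answer.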