
I’m running the following script both in MWAA and in my local environment.

from airflow import DAG, XComArg
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
from airflow.models.connection import Connection
from airflow.operators.dummy_operator import DummyOperator 
from airflow.utils.task_group import TaskGroup
# from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
from airflow.decorators import dag, task
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python_operator import PythonOperator
import json
import requests
import time
from airflow.hooks.base_hook import BaseHook

from datetime import datetime, timedelta

default_args = {'start_date': datetime(2023, 3, 21),
                'retries': 1,
                'retry_delay': timedelta(seconds=20),
                'email_on_failure': False}

with DAG(
    default_args=default_args,
    dag_id="load_dag", 
    schedule_interval="5 */2 * * 1-5",  # at minute 5 past every 2nd hour, Mon-Fri
    start_date=datetime(2023, 3, 21), 
    tags=['load'],
    description='DAG to run datawarehouse.load',
    default_view='graph', #grid, graph, duration, gantt, landing_times
    on_failure_callback = None,
    on_success_callback = None,
    catchup=False,
    # max_active_runs = 1,
    # concurrency=5
    ) as dag:
    person_duplicate_check = RedshiftSQLOperator(
        task_id = 'person_duplicate_check',
        sql="""
            with x as (
                select
                    contact_id,
                    count(1) as rows_count
                from person
                where contact_id is not null
                group by contact_id
            )
            select
                count(1) as id_count,
                count(case when rows_count > 1 then 1 end) as duplicate_id_count,
                sum(rows_count) as total_row_count,
                sum(case when rows_count > 1 then rows_count else 0 end) as duplicate_row_count
            from x;
        """,
        on_failure_callback = None,
        )

Locally I get an XCom value that looks like [[99796, 0, 99796, 0]]. However, when I run the same DAG in MWAA, the XCom is empty. I have checked my Redshift connection and it is working.
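
For reference, a minimal downstream task like the following (hypothetical, shown only to illustrate how the value is read) can pull that XCom:

def print_duplicate_counts(ti):
    # RedshiftSQLOperator pushes its query result as the task's return_value XCom
    rows = ti.xcom_pull(task_ids="person_duplicate_check")
    print(rows)  # e.g. [[99796, 0, 99796, 0]]

inspect_xcom = PythonOperator(
    task_id="inspect_xcom",
    python_callable=print_duplicate_counts,
)

person_duplicate_check >> inspect_xcom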

Am I missing something?


2 Answers


  1. Chosen as BEST ANSWER

    This is how I finally got it working on my end.

    def table_duplication_call(tbl_name, sql):
        # Read the Slack bot token from the 'slack_default' Airflow connection
        slack_token = BaseHook.get_connection('slack_default').password
        url = 'https://slack.com/api/chat.postMessage'
        headers = {
            'Authorization': f'Bearer {slack_token}',
            'Content-Type': 'application/json'
        }
        data = {'channel': '#your_channel_name',
                'mrkdwn': True,
                'text': f""":red_circle: Duplicates in {tbl_name} Table.
                        ```{sql}```
                    """}
        response = requests.post(url, headers=headers, json=data)
        # raise_for_status() must come before the return, or it never runs
        response.raise_for_status()
        return response.json()
    
    
    default_args = {'start_date': datetime(2023, 3, 2),
                    'retries': 1,
                    'retry_delay': timedelta(seconds=20),
                    'email_on_failure': False}
    
    with DAG(
        dag_id="load_dag", 
        start_date=datetime(2023, 3, 2), 
        schedule_interval="15 */4 * * *", 
        tags=['some_tag'],
        default_args=default_args,
        catchup = False,
        description='Some description',
        default_view='graph', #grid, graph, duration, gantt, landing_times
        on_failure_callback=None,
        on_success_callback=None,
        # on_failure_callback=send_email
        ) as dag:
    
    
        start = DummyOperator(
            task_id = 'start',
            dag=dag,
        )
        
        end = DummyOperator(
            task_id = 'end',
            dag=dag
        )
    
    
    tbl_ls = ['table1','table2','table3']
    
    
    # PostgresHook also works against Redshift; this import was missing above
    from airflow.providers.postgres.hooks.postgres import PostgresHook


    def duplication_check_func():
        hook = PostgresHook(postgres_conn_id='redshift_default')
        for tbl in tbl_ls:
            sql = f"""
                select
                    {tbl}.{tbl}_id as duplicated_id,
                    count({tbl}.{tbl}_id) as cnt
                from
                    datawarehouse.{tbl}
                group by
                    {tbl}_id
                having
                    count({tbl}.{tbl}_id) > 1;
            """
            results = hook.get_records(sql)
            if len(results) > 0:
                # pass the table name itself, not a set literal {tbl}
                table_duplication_call(tbl_name=tbl, sql=sql)
            
    
    db_duplication_notification = PythonOperator(
        task_id='db_duplication_notification',
        python_callable=duplication_check_func,
        dag=dag,
    )
    
    start >> db_duplication_notification >> end
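
    A possible variation (a sketch for illustration, not what I actually run): create one PythonOperator per table so a failing check on one table does not stop the alerts for the others. It reuses tbl_ls, table_duplication_call, start, and end from above; the task names are made up.

    from functools import partial

    def check_one_table(tbl):
        hook = PostgresHook(postgres_conn_id='redshift_default')
        sql = f"""
            select {tbl}.{tbl}_id as duplicated_id, count({tbl}.{tbl}_id) as cnt
            from datawarehouse.{tbl}
            group by {tbl}_id
            having count({tbl}.{tbl}_id) > 1;
        """
        if hook.get_records(sql):
            table_duplication_call(tbl_name=tbl, sql=sql)

    for tbl in tbl_ls:
        check = PythonOperator(
            task_id=f'duplicate_check_{tbl}',
            python_callable=partial(check_one_table, tbl),
            dag=dag,
        )
        start >> check >> end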
    

  2. I had a look at this on my MWAA 2.4.3 and 2.2.2 environments, running versions 6.0.0 and 2.4.0 of the Amazon provider package respectively.

    This is the version of the DAG I used (I was testing against the sample data you can optionally install when setting up a Redshift cluster).

    from airflow.utils.dates import days_ago
    from airflow import DAG
    from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
    
    POLL_INTERVAL = 10
    DAG_ID = 'example_redshift'
    DB_LOGIN = 'awsuser'
    DB_NAME = 'dev'
    CLUSTER = "mwaa-redshift"
    
    with DAG(
        dag_id=DAG_ID, 
        schedule_interval=None,
        start_date=days_ago(1), 
        tags=['load'],
        description='DAG to run datawarehouse.load',
        default_view='graph', #grid, graph, duration, gantt, landing_times
        catchup=False
        ) as dag:
        run_query = RedshiftDataOperator(
            task_id='create_table_redshift_data',
            cluster_identifier=CLUSTER,
            database=DB_NAME,
            db_user=DB_LOGIN,
            sql="""
                with x as (
                    select
                        sellerid,
                        count(1) as rows_count
                    from listing
                    where sellerid is not null
                    group by sellerid
                )
                select
                    count(1) as id_count,
                    count(case when rows_count > 1 then 1 end) as duplicate_id_count,
                    sum(rows_count) as total_row_count,
                    sum(case when rows_count > 1 then rows_count else 0 end) as duplicate_row_count
                from x;
            """,
            poll_interval=POLL_INTERVAL,
            await_result=True,
        )
    

    I found that the older provider did not generate any XCom output, but the newer provider did. That said, I could not get it to output the results of the query, only the Redshift query ID.
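
    If you only get the query ID back, one way to fetch the actual rows is the Redshift Data API (a sketch using boto3; it assumes the operator's return_value XCom holds the statement ID, and the fetch_query_rows task is hypothetical):

    import boto3
    from airflow.operators.python import PythonOperator

    def fetch_query_rows(ti):
        # RedshiftDataOperator pushes the ID of the statement it submitted;
        # with await_result=True the statement has already finished
        statement_id = ti.xcom_pull(task_ids='create_table_redshift_data')
        client = boto3.client('redshift-data')
        result = client.get_statement_result(Id=statement_id)
        # Each record is a list of typed cells, e.g. [{'longValue': 99796}, ...]
        print(result['Records'])

    fetch_rows = PythonOperator(
        task_id='fetch_query_rows',
        python_callable=fetch_query_rows,
        dag=dag,
    )

    run_query >> fetch_rows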

    Are you using mwaa-local-runner and have you checked the provider package versions to see what you are using?
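
    One quick way to check from inside the environment (a sketch; it can go in a throwaway PythonOperator task on MWAA, or run directly in mwaa-local-runner):

    # Print the installed Amazon provider version for the running environment
    from importlib.metadata import version

    print(version("apache-airflow-providers-amazon"))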

    Update, 5th March

    I upgraded my MWAA 2.4.3 environment to newer providers. Here is my requirements.txt file:

    --constraint "/usr/local/airflow/dags/updated-constraints.txt"
    apache-airflow-providers-amazon==7.3.0
    apache-airflow-providers-mysql==3.2.1
    apache-airflow-providers-common-sql==1.3.4
    

    After my MWAA environment restarted, I checked the provider versions:

    [screenshot: installed provider package versions]

    And when I re-run the same DAG above, I now get what you are expecting.

    [screenshot: XCom output from the re-run DAG]

    Hope this helps.
