I'm running the following script both in MWAA and in my local environment.
```python
from airflow import DAG, XComArg
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
from airflow.models.connection import Connection
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.task_group import TaskGroup
# from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
from airflow.decorators import dag, task
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python_operator import PythonOperator
import json
import requests
import time
from airflow.hooks.base_hook import BaseHook
from datetime import datetime, timedelta

default_args = {
    'start_date': datetime(2023, 3, 21),
    'retries': 1,
    'retry_delay': timedelta(seconds=20),
    'email_on_failure': False,
}

with DAG(
    default_args=default_args,
    dag_id="load_dag",
    schedule_interval="5 */2 * * 1-5",
    start_date=datetime(2023, 3, 21),
    tags=['load'],
    description='DAG to run datawarehouse.load',
    default_view='graph',  # grid, graph, duration, gantt, landing_times
    on_failure_callback=None,
    on_success_callback=None,
    catchup=False,
    # max_active_runs=1,
    # concurrency=5
) as dag:
    # Single-row summary of duplicated contact_id values in the person table.
    person_duplicate_check = RedshiftSQLOperator(
        task_id='person_duplicate_check',
        sql="""
            with x as (
                select
                    contact_id,
                    count(1) as rows_count
                from person
                where contact_id is not null
                group by contact_id
            )
            select
                count(1) as id_count,
                count(case when rows_count > 1 then 1 end) as duplicate_id_count,
                sum(rows_count) as total_row_count,
                sum(case when rows_count > 1 then rows_count else 0 end) as duplicate_row_count
            from x;
        """,
        on_failure_callback=None,
    )
```
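To sanity-check what the query should return, its aggregation can be mirrored in plain Python. This is a minimal sketch for checking expectations offline, not part of the DAG: `duplicate_stats` is a hypothetical helper, and it takes the `contact_id` values as an in-memory list instead of reading them from Redshift.

```python
from collections import Counter
from typing import Iterable, Optional, Tuple


def duplicate_stats(contact_ids: Iterable[Optional[int]]) -> Tuple[int, int, int, int]:
    """Hypothetical mirror of the person_duplicate_check query.

    Returns (id_count, duplicate_id_count, total_row_count, duplicate_row_count),
    in the same column order as the SELECT list.
    """
    # CTE "x": one entry per non-null contact_id with its row count.
    rows_count = Counter(cid for cid in contact_ids if cid is not None)

    id_count = len(rows_count)  # count(1) over x
    duplicate_id_count = sum(1 for n in rows_count.values() if n > 1)
    total_row_count = sum(rows_count.values())
    duplicate_row_count = sum(n for n in rows_count.values() if n > 1)
    # Note: in SQL, SUM over zero rows returns NULL; this sketch returns 0 instead.
    return id_count, duplicate_id_count, total_row_count, duplicate_row_count


print(duplicate_stats([101, 101, 102, 103, None]))  # (3, 1, 4, 2)
```

When there are no duplicates, the second and fourth values are 0 and the first and third are equal, which is the shape of the local result below.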
Locally, I get an XCom value that looks like [[99796, 0, 99796, 0]]. However, when I run the same DAG in MWAA, the XCom is empty. I have checked my Redshift connection and it is working.
Am I missing something?
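For reference, the local XCom value is a list of rows, each row holding the columns in SELECT order. In a downstream task it would typically be fetched with `ti.xcom_pull(task_ids="person_duplicate_check")`. The sketch below (the helper `summarize_xcom` and the `COLUMNS` list are hypothetical) names those columns and returns `None` for an empty XCom, as seen on MWAA, so a downstream check can tell "no result" apart from "no duplicates".

```python
from typing import Any, Dict, List, Optional

# Hypothetical: column names in the order of the SELECT list in person_duplicate_check.
COLUMNS = ["id_count", "duplicate_id_count", "total_row_count", "duplicate_row_count"]


def summarize_xcom(value: Optional[List[List[Any]]]) -> Optional[Dict[str, Any]]:
    """Turn the operator's XCom value (a list of rows) into a dict.

    Returns None when the XCom is missing or empty.
    """
    if not value:
        return None
    first_row = value[0]  # the query produces a single summary row
    return dict(zip(COLUMNS, first_row))


local = summarize_xcom([[99796, 0, 99796, 0]])
print(local)
# {'id_count': 99796, 'duplicate_id_count': 0, 'total_row_count': 99796, 'duplicate_row_count': 0}
print(summarize_xcom(None))  # None
```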
This is how I finally got it working on my end.
I had a look at this on my MWAA 2.4.3 and 2.2.2 environments, running apache-airflow-providers-amazon 6.0.0 and 2.4.0 respectively.
This is the version of the DAG I used (I was using the sample data you can optionally install when setting up a Redshift cluster)
I found that with the older provider the DAG did not generate any XCom output, whereas with the newer provider it did. That said, I could not get it to output the results of the query, only the Redshift query ID.
Are you using mwaa-local-runner, and have you checked which provider package versions you are running?
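One way to check the versions from Python, assuming Python 3.8+ for `importlib.metadata`, is to look up the installed distributions directly; locally, `airflow providers list` gives similar information. On MWAA, where there is no shell, the same function could be called from a PythonOperator task and its output read from the task log. The helper name `installed_versions` is hypothetical, and the package list is only the ones relevant to this question.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Dict, Iterable, Optional


def installed_versions(packages: Iterable[str]) -> Dict[str, Optional[str]]:
    """Return the installed version of each distribution, or None if it is absent."""
    result: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            result[name] = version(name)
        except PackageNotFoundError:
            result[name] = None
    return result


if __name__ == "__main__":
    for pkg, ver in installed_versions([
        "apache-airflow",
        "apache-airflow-providers-amazon",
        "apache-airflow-providers-common-sql",
    ]).items():
        print(f"{pkg}: {ver or 'not installed'}")
```

Comparing this output between mwaa-local-runner and the MWAA environment shows whether the two are running the same provider releases.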
Update (5th March)
I upgraded my MWAA 2.4.3 environment to use newer providers. Here is my requirements.txt file:
After the MWAA environment restarted, I checked the provider versions:
When I re-ran the same DAG, I got the output you were expecting.
Hope this helps.