Problem: The start date of my DAG is not being set properly. Can anyone tell me why?
Here is sample code:
from datetime import date, datetime, timedelta
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    "owner": "hello",
    "email_on_failure": "false",
    "retries": 1,
    "retry_delay": timedelta(minutes=1),
    "start_date": datetime(2022, 7, 20),
    "catchup": True,
    "schedule_interval": "@weekly",
}


def dummy_function():
    # just some test function, ignore
    file_name = str(datetime.today()) + "_dummy.csv"
    with open(file_name, "w") as f:
        pass


def trigger_extractor_lambda(ds, **kwargs):
    logging.info(ds)
    logging.info(date.fromisoformat(ds))
    # further code ...


with DAG("ufc-main-dag", default_args=default_args) as dag:
    dummy_task = PythonOperator(
        task_id="dummy_task", python_callable=dummy_function, dag=dag
    )

    # lambda pulls raw data into S3
    extractor_task = PythonOperator(
        task_id="extractor_task",
        python_callable=trigger_extractor_lambda,
        provide_context=True,
        dag=dag,
    )

    dummy_task >> extractor_task
The logging of ds shows the current date, yet I explicitly set the start date to be in July. What am I missing? I am using MWAA, for what it's worth.
Thanks in advance.
3 Answers
The solution was to run a backfill rather than rely on catchup: previous DAG runs prevented Airflow from seeing that there were still missing runs to schedule.
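For reference, a backfill for a specific window can be triggered from the Airflow 2.x CLI (the date range here is illustrative; substitute your own):

```shell
# Re-run ufc-main-dag for a historical window, regardless of catchup state
airflow dags backfill \
    --start-date 2022-07-20 \
    --end-date 2022-08-20 \
    ufc-main-dag
```

On MWAA the same command is typically issued through the environment's CLI endpoint rather than a local shell.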
The start_date parameter does not specify the date of the first DAG run. Instead, it defines the beginning of a DAG's first data interval (along with end_date and schedule_interval), as described in the Airflow documentation.

Reference: Data Interval (Airflow)
Since you have catchup=True and schedule_interval="@weekly", you'll have to set start_date=2022-07-13 (one week before 2022-07-20) if you want your DAG to begin running from 2022-07-20. With this configuration, the Airflow scheduler will schedule DAG runs for 2022-07-20 through the latest completed data interval.

Reference: Catchup (Airflow)
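The interval arithmetic above can be sketched with plain datetime (a simplified illustration, not the Airflow API; it ignores the exact cron alignment of @weekly):

```python
from datetime import datetime, timedelta

# Simplified model of a weekly data interval: a run fires only after
# its data interval has ended, so with start_date=2022-07-13 the first
# run executes at the end of that first interval, 2022-07-20.
start_date = datetime(2022, 7, 13)
interval = timedelta(weeks=1)

first_run_time = start_date + interval
print(first_run_time.date())  # 2022-07-20
```

This is why the observed run date is one interval later than start_date, not equal to it.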
It’s because you’re setting DAG-level settings in default_args. You need to set start_date and schedule_interval in the DAG definition itself. default_args is a set of arguments that gets passed to each Airflow operator, NOT to the DAG itself. See the base definition of DAG, and the example in the official tutorial, where the common DAG-level vars are set in the DAG definition.

I’m not sure exactly how Airflow behaves when you pass it args the way you have in your sample code, but the correct way to do it would be: