
Problem: The start date of my DAG is not being set properly; can anyone tell me why?
Here is the sample code:

import logging
from datetime import date, datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    "owner": "hello",
    "email_on_failure": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=1),
    "start_date": datetime(2022, 7, 20),
    "catchup": True,
    "schedule_interval": "@weekly",
}


def dummy_function():
    # just some test function, ignore
    file_name = str(datetime.today()) + "_dummy.csv"
    with open(file_name, "w") as f:
        pass


def trigger_extractor_lambda(ds, **kwargs):

    logging.info(ds)
    logging.info(date.fromisoformat(ds))
    # further code ...

with DAG("ufc-main-dag", default_args=default_args) as dag:
    dummy_task = PythonOperator(
        task_id="dummy_task", python_callable=dummy_function, dag=dag
    )
    # lambda pulls raw data into S3
    extractor_task = PythonOperator(
        task_id="extractor_task",
        python_callable=trigger_extractor_lambda,
        provide_context=True,
        dag=dag,
    )

dummy_task >> extractor_task

The logging of ds shows the current date, yet I explicitly set the start date to be in July. What am I missing? I am using MWAA, FWIW.
Thanks in advance.

3 Answers


  1. Chosen as BEST ANSWER

    The solution was to run a backfill first rather than rely on catchup: previous DAG runs prevented Airflow from seeing that there were still missing intervals to schedule.
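
    For reference, a sketch of the backfill invocation (the date range here is hypothetical, and on MWAA the Airflow CLI is reached through MWAA's CLI endpoint rather than a local shell):

    airflow dags backfill --start-date 2022-07-13 --end-date 2022-08-24 ufc-main-dag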


  2. The start_date parameter does not specify the first date of a DAG run. Instead, it marks the beginning of the DAG’s first data interval (together with end_date and schedule_interval).

    From Airflow documentation:

    Similarly, since the start_date argument for the DAG and its tasks points to the same logical date, it marks the start of the DAG’s first data interval, not when tasks in the DAG will start running. In other words, a DAG run will only be scheduled one interval after start_date.

    Reference: Data Interval (Airflow)

    Since you have catchup=True and schedule_interval=@weekly, you’ll have to set start_date=2022-07-13 (one week, i.e. one schedule interval, before 2022-07-20) if you want your DAG’s first run to execute on 2022-07-20. With this configuration, the Airflow scheduler will create DAG runs from 2022-07-20 through the latest completed data interval.

    Reference: Catchup (Airflow)
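
    As a minimal sketch (reusing the dag_id and weekly schedule from the question; the pendulum timezone is an addition to avoid a naive datetime):

    import pendulum
    from airflow import DAG

    with DAG(
        dag_id="ufc-main-dag",
        # one schedule interval (a week) before the first run you want executed
        start_date=pendulum.datetime(2022, 7, 13, tz="UTC"),
        schedule_interval="@weekly",
        catchup=True,
    ) as dag:
        ...  # first data interval: 2022-07-13 to 2022-07-20; its run executes on 2022-07-20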

  3. It’s because you’re setting DAG-level settings in default_args. You need to set start_date and schedule in the DAG definition itself.

    default_args is a set of args that gets passed to each Airflow operator, NOT to the DAG itself (see the BaseOperator definition in the Airflow source). You can also see the example in the Airflow tutorial, where the common DAG-level vars are set in the DAG definition.

    I’m not sure exactly how Airflow behaves if you pass it args the way you have in your sample code, but the correct way to do it would be:

    import pendulum
    from airflow import DAG

    default_args = {
        "retries": 0,  # applied to each Airflow task
        # any other task-level args to pass
    }

    with DAG(
        dag_id="ufc-main-dag",
        start_date=pendulum.datetime(2022, 7, 20, tz="UTC"),
        schedule_interval="@weekly",
        catchup=True,
        default_args=default_args,
    ) as dag:
        ...  # DAG things here
        # All your tasks will inherit default_args unless you explicitly override them
