
The basic concept of Airflow does not allow triggering a DAG on an irregular interval. What I actually want is to trigger a DAG every time a new file is placed on a remote server (HTTPS, SFTP, S3, …), but Airflow requires a defined data_interval, and using e.g. an HttpSensor only works once within the scheduled time window. In my current example I am using Redis to persist the current file state.

""" DAG for operational District heating """
import json
from datetime import datetime

import redis
import requests
from airflow import DAG
from airflow.providers.amazon.aws.operators.aws_lambda import AwsLambdaInvokeFunctionOperator
from airflow.providers.http.sensors.http import HttpSensor


def check_up_for_new_file(
        response: requests.models.Response,
) -> bool:
    """ uses redis to check if a new file is on the server"""
    current_header = {
        key.decode() if isinstance(key, bytes) else key: value.decode() if isinstance(value, bytes) else value
        for key, value in response.headers.items()
    }

    conn = redis.Redis(host='redis', port=6379)
    recent_header = conn.hgetall("header_dict")

    recent_header = {
        key.decode() if isinstance(key, bytes) else key: value.decode() if isinstance(value, bytes) else value
        for key, value in recent_header.items()
    }

    if 'Content-Length' not in recent_header.keys():
        conn.hmset("header_dict", current_header)
        return False

    if recent_header['Content-Length'] != current_header['Content-Length']:
        conn.hmset("header_dict", current_header)
        return True
    else:
        return False


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'concurrency': 6
}

with DAG(
        dag_id='check_ext',
        start_date=datetime(2022, 11, 24),
        tags=['test'],
        catchup=False,
        default_args=default_args,
) as dag:
    check_for_new_file = HttpSensor(
        task_id='check_up_for_new_file',
        http_conn_id='_conn_id',
        endpoint='<some-url>',
        poke_interval=20,
        dag=dag,
        response_check=check_up_for_new_file
    )
    invoke_lambda_function = AwsLambdaInvokeFunctionOperator(
        task_id='run_process_with_external_files',
        function_name='LAMBDA_FUNCTION',
        payload=json.dumps({"source_type": "some stuff"}),
    )
    check_for_new_file >> invoke_lambda_function

How does this DAG restart after a success so that it checks for new files again?

2 Answers


  1. Chosen as BEST ANSWER

    You have to take care of the following two points to have a DAG that runs every time a sensor recognizes an external event.

    1. schedule_interval: Use the preset None
    2. Use TriggerDagRunOperator

    It is by design that this creates an infinite loop that keeps checking the external source.

    """ DAG for operational District heating """
    import json
    from datetime import datetime
    
    import redis
    import requests
    from airflow import DAG
    from airflow.providers.amazon.aws.operators.aws_lambda import AwsLambdaInvokeFunctionOperator
    from airflow.providers.http.sensors.http import HttpSensor
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator
    
    
    def check_up_for_new_file(
            response: requests.models.Response,
    ) -> bool:
        """ uses redis to check if a new file is on the server"""
        current_header = {
            key.decode() if isinstance(key, bytes) else key: value.decode() if isinstance(value, bytes) else value
            for key, value in response.headers.items()
        }
    
        conn = redis.Redis(host='redis', port=6379)
        recent_header = conn.hgetall("header_dict")
    
        recent_header = {
            key.decode() if isinstance(key, bytes) else key: value.decode() if isinstance(value, bytes) else value
            for key, value in recent_header.items()
        }
    
        if 'Content-Length' not in recent_header.keys():
            conn.hmset("header_dict", current_header)
            return False
    
        if recent_header['Content-Length'] != current_header['Content-Length']:
            conn.hmset("header_dict", current_header)
            return True
        else:
            return False
    
    
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'email': ['[email protected]'],
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 2,
        'concurrency': 6
    }
    
    with DAG(
            dag_id='check_ext',
            start_date=datetime(2022, 11, 24),
            tags=['test'],
            catchup=False,
            schedule_interval=None,
            default_args=default_args,
    ) as dag:
        check_for_new_file = HttpSensor(
            task_id='check_up_for_new_file',
            http_conn_id='_conn_id',
            endpoint='<some-url>',
            poke_interval=20,
            dag=dag,
            response_check=check_up_for_new_file
        )
        invoke_lambda_function = AwsLambdaInvokeFunctionOperator(
            task_id='run_process_with_external_files',
            function_name='LAMBDA_FUNCTION_NAME',
            payload=json.dumps({"source_type": "some stuff"}),
        )
        restart_dag = TriggerDagRunOperator(
            task_id='restart_dag',
            trigger_dag_id='check_ext',
            dag=dag
        )
        check_for_new_file >> invoke_lambda_function >> restart_dag
    
    

    For anyone not familiar with HttpSensor: the base URL of the server has to be defined as an environment variable of the form AIRFLOW_CONN_{_CONN_ID}=https://remote_server.com . The sensor then resolves the connection by matching its http_conn_id against that _CONN_ID.
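    Below is a minimal sketch (my own addition, not from the original post) of how that name mapping works, assuming the connection id "_conn_id" used in the DAG above and an Airflow 2.x installation; in a real deployment you would export the variable in the scheduler/worker environment (docker-compose, systemd, ...) rather than set it from Python.

    import os

    # Airflow looks up connections defined as AIRFLOW_CONN_<CONN_ID_IN_UPPERCASE>,
    # so the conn id "_conn_id" maps to the variable AIRFLOW_CONN__CONN_ID.
    # (Assumption: set here only for illustration; normally exported in the deployment.)
    os.environ["AIRFLOW_CONN__CONN_ID"] = "https://remote_server.com"

    from airflow.hooks.base import BaseHook

    # The environment-variable secrets backend parses the URI into a Connection;
    # HttpSensor(http_conn_id='_conn_id', ...) resolves it the same way.
    conn = BaseHook.get_connection("_conn_id")
    print(conn.conn_type, conn.host)  # e.g. "https remote_server.com"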


  2. As already mentioned in the question itself, Airflow is not an event-driven trigger system; its main paradigm is pre-scheduled batch processing.
    Nevertheless, this is definitely achievable, in multiple ways:

    1. As suggested in the answer by @dl.meteo, you can run a sensor (many are supported: HTTP, FTP, FTPS, etc.) in an endless loop with a pre-defined poke interval (every 30 s, every minute, and so on), and when the sensor fires (the task completes successfully) you trigger a specific DAG (with TriggerDagRunOperator). This approach has a downside: it holds Airflow's resources forever, and while that can work for a few isolated use cases, it definitely does NOT scale to hundreds of DAGs.
    2. A better approach IMHO would be one of the following (see the sketch after this list):
      a. use Airflow's REST API to trigger a specific DAG (the external event system calls Airflow's REST API directly)
      b. if the external event system has a webhook mechanism, use it to PUSH the event to a queue (e.g. Kinesis) and call Airflow's REST API (to trigger a specific DAG)
      c. if there is no webhook mechanism, use a Lambda (or a lightweight Docker container service) to PULL new events from the external event system (on a scheduled/pre-defined interval) and call Airflow's REST API (to trigger a specific DAG)
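
    As a rough illustration of option 2a (my own sketch, not part of the original answer), triggering a DAG through Airflow 2's stable REST API can look like this; the base URL, the credentials, and the basic-auth API backend are assumptions about the deployment:

    import requests

    AIRFLOW_BASE_URL = "http://localhost:8080"  # assumption: local webserver
    DAG_ID = "check_ext"                        # dag_id from the example above

    # POST /api/v1/dags/{dag_id}/dagRuns creates a new DAG run; the optional
    # "conf" dict is passed on to the triggered run.
    response = requests.post(
        f"{AIRFLOW_BASE_URL}/api/v1/dags/{DAG_ID}/dagRuns",
        auth=("airflow", "airflow"),            # assumption: basic-auth enabled
        json={"conf": {"source_type": "some stuff"}},
        timeout=10,
    )
    response.raise_for_status()
    print(response.json()["dag_run_id"])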

    https://brocktibert.com/post/trigger-airflow-dags-via-the-rest-api/

    Triggering Airflow DAG via API
