Out of the box, Airflow does not allow triggering a DAG on an irregular interval. What I actually want is to trigger a DAG every time a new file is placed on a remote server (e.g. HTTPS, SFTP, S3 …). But Airflow requires a defined data_interval, so using e.g. HttpSensor works only once during the scheduled time window. In my current example, I am using Redis to persist the current file state.
""" DAG for operational District heating """
import json
from datetime import datetime
import redis
import requests
from airflow import DAG
from airflow.providers.amazon.aws.operators.aws_lambda import AwsLambdaInvokeFunctionOperator
from airflow.providers.http.sensors.http import HttpSensor
def check_up_for_new_file(
response: requests.models.Response,
) -> bool:
""" uses redis to check if a new file is on the server"""
current_header = {
key.decode() if isinstance(key, bytes) else key: value.decode() if isinstance(value, bytes) else value
for key, value in response.headers.items()
}
conn = redis.Redis(host='redis', port=6379)
recent_header = conn.hgetall("header_dict")
recent_header = {
key.decode() if isinstance(key, bytes) else key: value.decode() if isinstance(value, bytes) else value
for key, value in recent_header.items()
}
if 'Content-Length' not in recent_header.keys():
conn.hmset("header_dict", current_header)
return False
if recent_header['Content-Length'] != current_header['Content-Length']:
conn.hmset("header_dict", current_header)
return True
else:
return False
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'concurrency': 6
}
with DAG(
dag_id='check_ext',
start_date=datetime(2022, 11, 24),
tags=['test'],
catchup=False,
default_args=default_args,
) as dag:
check_for_new_file = HttpSensor(
task_id='check_up_for_new_file',
http_conn_id='_conn_id',
endpoint='<some-url>',
poke_interval=20,
dag=dag,
response_check=check_up_for_new_file
)
invoke_lambda_function = AwsLambdaInvokeFunctionOperator(
task_id='run_process_with_external_files',
function_name='LAMBDA_FUNCTION',
payload=json.dumps({"source_type": "some stuff"}),
)
check_for_new_file >> invoke_lambda_function
How does this DAG restart after success to check again for new files?
Answers
You have to take care of the following two points to have a DAG that runs every time a sensor recognizes an external event.

It is by design that the DAG forms an infinite loop: after a successful run it is started again, so the sensor keeps checking the external server for new files.
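The answer does not spell out how to wire this loop, so the following is only a sketch of one possible implementation (my assumption, using TriggerDagRunOperator; the dag_id and task names are placeholders): the last task of the DAG re-triggers the same DAG, so a new run starts as soon as the previous one succeeds.

"""Sketch of a self-restarting DAG: once the work is done, the last task
triggers the same DAG again, so the sensor starts poking anew."""
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
    dag_id='self_restarting_example',    # placeholder dag_id
    start_date=datetime(2022, 11, 24),
    schedule_interval=None,              # never scheduled, only (re-)triggered
    catchup=False,
) as dag:
    # Stand-in for the HttpSensor >> AwsLambdaInvokeFunctionOperator chain
    # from the question.
    do_work = EmptyOperator(task_id='do_work')

    restart_dag = TriggerDagRunOperator(
        task_id='restart_dag',
        trigger_dag_id='self_restarting_example',  # the DAG triggers itself
    )

    do_work >> restart_dag

The very first run still has to be triggered once (manually or via the REST API); after that, every successful run queues the next one.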
For those of you not familiar with HttpSensor: the base path to the server has to be defined as a connection, e.g. through an environment variable

AIRFLOW_CONN_{_CONN_ID}=https://remote_server.com

Then you can reference the connection by matching the _CONN_ID.
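As a minimal illustration of that lookup (the URL is a placeholder): Airflow upper-cases the conn_id and prefixes it with AIRFLOW_CONN_, so the http_conn_id='_conn_id' from the question resolves to the variable AIRFLOW_CONN__CONN_ID.

# Sketch of the connection lookup; the URL is a placeholder.
import os

os.environ["AIRFLOW_CONN__CONN_ID"] = "https://remote_server.com"

from airflow.hooks.base import BaseHook

conn = BaseHook.get_connection("_conn_id")   # resolved from the environment variable
print(conn.host)                             # remote_server.com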
As already mentioned in the question itself, Airflow is not an event-based trigger system; its main paradigm is pre-scheduled batch processing. Nevertheless, it is definitely achievable, in multiple ways:
a. use Airflow's REST API to trigger a specific DAG (the external event system calls Airflow's REST API directly; see the sketch after this list)

b. if the external event system has a webhook mechanism, use it to push the event to a queue (e.g. Kinesis) and call Airflow's REST API from there (to trigger a specific DAG)

c. if there is no webhook mechanism, use a Lambda (or a lightweight Docker container service) to pull new events from the external event system on a scheduled/pre-defined period of time, and call Airflow's REST API (to trigger a specific DAG)
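For option (a), here is a minimal sketch of such a REST call (Airflow 2.x stable REST API; the host, credentials and conf payload are placeholders/assumptions, and the basic-auth API backend is assumed to be enabled):

"""Trigger the DAG from an external system via Airflow's stable REST API."""
import requests

AIRFLOW_URL = "http://localhost:8080"   # placeholder webserver URL
DAG_ID = "check_ext"

response = requests.post(
    f"{AIRFLOW_URL}/api/v1/dags/{DAG_ID}/dagRuns",
    auth=("airflow", "airflow"),         # assumes the basic-auth API backend
    json={"conf": {"source_type": "some stuff"}},
)
response.raise_for_status()
print(response.json())                   # metadata of the dag run just created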
Triggering Airflow DAG via API: https://brocktibert.com/post/trigger-airflow-dags-via-the-rest-api/