I want to send a notification when the Airflow DockerOperator can't create the Docker image. For DockerOperator execution errors, a notification can be sent using on_failure_callback.
To be more specific, I want to catch two errors:
- the private Docker repository is not running (10.11.12.13 is not running in the example below)
- the execution server is not running (20.21.22.23:2375 is not running in the example below)
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator


def send_slack(context):
    # Airflow passes the task context to failure callbacks as a single argument.
    print('send error message')


default_args = {
    'on_failure_callback': send_slack,
}

with DAG(
    dag_id='test_dag',
    default_args=default_args,
    schedule_interval='45 * * * *',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    dagrun_timeout=timedelta(minutes=420),
    concurrency=1,
    tags=['test'],
) as dag:
    t = DockerOperator(
        task_id="test_operator",
        container_name="test_container",
        image="10.11.12.13/myapp:latest",  # private Docker repository
        force_pull=False,
        auto_remove=True,
        command=" python my_test.py ",
        docker_url="tcp://20.21.22.23:2375",  # execution server
        cpus=1,
        mem_limit="1g",
        mount_tmp_dir=False,
    )
    t

if __name__ == "__main__":
    dag.cli()
2 Answers
Following Jarek's answer, I implemented a very rough custom operator, shown below.
If you want to handle specific errors, you need to look at the code of DockerOperator and create your own custom operator.
You can look at how the operator is implemented, override specific methods, catch the specific exceptions, and handle them in whatever way works best for you.
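As a rough illustration of that idea (not the code from the original answer), here is a minimal sketch that overrides execute, reports whatever exception was raised, and re-raises it so Airflow still marks the task as failed. The helpers notify and describe_docker_failure are hypothetical names introduced here. The fallback DockerOperator stub exists only so the sketch can run without Airflow installed. Which exception types you actually see for an unreachable registry or Docker host depends on your provider and docker-py versions, so the sketch catches Exception broadly and leaves narrowing to you.

```python
try:
    from airflow.providers.docker.operators.docker import DockerOperator
except ImportError:
    # Stand-in used only when Airflow is not installed, so the sketch can run.
    # In a real deployment the import above is the one that takes effect.
    class DockerOperator:
        def __init__(self, image=None, docker_url=None, **kwargs):
            self.image = image
            self.docker_url = docker_url

        def execute(self, context):
            raise NotImplementedError


def describe_docker_failure(exc, image, docker_url):
    """Hypothetical helper: build a readable message from the raised exception."""
    return (
        f"DockerOperator failed with {type(exc).__name__}: {exc} "
        f"(image={image}, docker_url={docker_url})"
    )


def notify(message):
    # Hypothetical placeholder; replace with a real Slack (or other) call.
    print(message)


class MyCustomDockerOperator(DockerOperator):
    def execute(self, context):
        try:
            return super().execute(context)
        except Exception as exc:
            # Assumption: catching everything is acceptable as a first pass.
            # Once you know what your versions raise (for example
            # docker.errors.APIError or docker.errors.DockerException),
            # narrow this clause to those types.
            notify(describe_docker_failure(exc, self.image, self.docker_url))
            raise  # re-raise so the task is still marked as failed
```

If on_failure_callback is also set for the task, both notifications will fire, so you may want to keep only one of the two mechanisms. Note also that a broad except Exception will see Airflow's own control-flow exceptions (such as skips), which is another reason to narrow it.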
Then use MyCustomDockerOperator in your DAG. You can even add it to shared utility code if you want to use it in multiple DAGs:
https://airflow.apache.org/docs/apache-airflow/stable/modules_management.html