skip to Main Content

I want to send notification when Airflow DockerOperator can’t create Docker image. Notification can be sent in case of DockerOperator execution errors using on_failure_callback.

To be more specific, I want to catch 2 errors.

  1. private Docker repository is not running(10.11.12.13 is not running in example below)
  2. execution server is not running(20.21.22.23:2345 is not running in example below)
from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.docker.operators.docker import DockerOperator

def send_slack():
    print('send error message')

default_args = {
    'on_failure_callback': send_slack,
}

with DAG(
    dag_id='test_dag',
    default_args=default_args,
    schedule_interval='45 * * * *',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    dagrun_timeout=timedelta(minutes=420),
    concurrency=1,
    tags=['test']
) as dag:

    t = DockerOperator(
        task_id="test_operator",
        container_name="test_container",
        image=f"10.11.12.13/myapp:latest",
        force_pull=False,
        auto_remove=True,
        command = " python my_test.py ",
        docker_url="tcp://20.21.22.23:2375",
        cpus=1,
        mem_limit="1g",
        mount_tmp_dir=False
    )

    t

if __name__ == "__main__":
    dag.cli()

2

Answers


  1. Chosen as BEST ANSWER

    According to Jarek's answer, I implement very rough CustomOperator below.

    class MyCustomDockerOperator(DockerOperator):
        def execute(self, context) -> Option[str]:
            try:
                super().execute(context)
            except AirflowException:
                raise
            except Exception as e:
                raise AirflowException('something is wrong in creating docker image.') from e
    

  2. If you want to handle the specific errors, you need to look at the code of the DockerOperator and create your own custom operator:

    class MyCustomDockerOperator(DockerOperator):
        ... # here implement any customisations
    

    You can look at how the operator is implemented and override specific methods, catch specific exception and handle it in the way you see best for you.

    And then – use MyCustomDockerOperator in your DAG. You can even add it to a shared util code if you want to use it in multiple DAGs.

    https://airflow.apache.org/docs/apache-airflow/stable/modules_management.html

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search