I have a working TelegramOperator task:


    send_telegram_message = TelegramOperator(
        task_id="send_telegram_message",
        token="<token>",
        chat_id="<chat ID>",
        text="Some notification"
    )

It works when chained as a regular downstream task:

    t1 >> send_telegram_message

But I want it to notify me about failed tasks. I tried putting it into default_args, but that causes a 'Broken DAG' error for some reason, and I cannot see the cause in the UI:


    default_args={
        "depends_on_past": False,
        "email_on_failure": False,
        "email_on_retry": False,
        "on_failure_callback": send_telegram_message,
        "retries": 1,
        "retry_delay": timedelta(seconds=30)
    },


If I pass it as on_failure_callback (or on_success_callback) in the task parameters, nothing happens in either case: no error traceback, just a successfully completed task in the UI:


    t1 = BashOperator(
        task_id="entering_virtual_environment",
        depends_on_past=False,
        bash_command="source /home/fitwist/airflow/airflow_env/bin/activate",
        retries=2,
        on_failure_callback=send_telegram_message
    )


I've also tried creating a callback function and setting the callback on the DAG itself (nothing is sent to Telegram):


    def task_success_callback(context):
        op = TelegramOperator(
            task_id="send_telegram_message",
            token="<token>",
            chat_id="<chat ID>",
            text="Some notification"
        )
        op.execute(context)
    
    
    with DAG(
        "monthly_effectiveness",
        default_args={
            ...
        },
        ...
        on_failure_callback=send_telegram_message,
        ...
    ) as dag:
        ...


Chaining the operators directly in the task flow does nothing either, not even an error:


    t1 >> t2 >> send_telegram_message
    send_telegram_message >> send_failed_telegram_message

How do I get failure notifications in Telegram?

2 Answers


  1. Chosen as BEST ANSWER

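    This thread helped me work out the logic I needed: a downstream task that fires when any upstream task fails.

    # Import path assumes Airflow 2.x; newer releases provide EmptyOperator instead.
    from airflow.operators.dummy import DummyOperator
    from airflow.utils.trigger_rule import TriggerRule
    
    # ONE_FAILED triggers task_a as soon as at least one upstream task has failed.
    [T1, T2, T3, ...] >> DummyOperator(
        dag=dag,
        task_id="task_a",
        trigger_rule=TriggerRule.ONE_FAILED
    )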
    

  2. From docs:

    on_failure_callback: A function or list of functions to be called when a DagRun of this dag fails. A context dictionary is passed as a single parameter to this function.

    So your callback function should accept one positional argument.

    Example callback function:

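    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator


    # The callback is a plain function; Airflow passes the context dict to it.
    def my_failure_callback(_context):
        print("*****************DAG failed****************")
    
    
    dag = DAG("my_dag",
              start_date=datetime(2023, 1, 1),
              catchup=False,
              on_failure_callback=my_failure_callback)
    
    
    # "fail this" is not a valid command, so the task fails on purpose.
    op1 = BashOperator(task_id="explicit_failure",
                       bash_command="fail this",
                       dag=dag)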
    

    Errors raised when a callback function cannot be executed, and anything printed inside the callback, appear in the scheduler logs at <airflow-logs-path>/logs/scheduler/<date>/<dag-name>.py.log.
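
For the Telegram case in the question, one option in line with the second answer is a plain function callback that calls the Telegram Bot API `sendMessage` endpoint directly, instead of passing an operator instance. The following is only a minimal sketch under assumptions: the function names and the `TELEGRAM_TOKEN` / `TELEGRAM_CHAT_ID` environment variable names are hypothetical, and it assumes the context dict carries `task_instance` and `exception` entries, as it normally does for task-level failure callbacks.

```python
import json
import os
import urllib.parse
import urllib.request

# Hypothetical environment variable names; keep the real values out of the DAG file.
TELEGRAM_TOKEN = os.environ.get("TELEGRAM_TOKEN", "<token>")
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "<chat ID>")


def build_failure_text(context):
    """Build a short message from the context dict passed to the callback."""
    ti = context.get("task_instance")
    dag_id = getattr(ti, "dag_id", "unknown")
    task_id = getattr(ti, "task_id", "unknown")
    exception = context.get("exception")
    return f"Airflow failure: dag={dag_id} task={task_id} error={exception}"


def notify_telegram_on_failure(context):
    """Callback taking the single positional `context` argument."""
    url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
    data = urllib.parse.urlencode(
        {"chat_id": TELEGRAM_CHAT_ID, "text": build_failure_text(context)}
    ).encode()
    # POST to the Bot API; the timeout keeps a slow network call from hanging.
    with urllib.request.urlopen(url, data=data, timeout=10) as response:
        return json.load(response)


# Wiring: pass the function itself, not an operator instance, e.g.
#   default_args={"on_failure_callback": notify_telegram_on_failure, ...}
# or on a single task:
#   BashOperator(..., on_failure_callback=notify_telegram_on_failure)
```

When `retries` is set, the failure callback runs only after the final attempt has failed, so a task that eventually succeeds sends nothing.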
