I have a working TelegramOperator task:
send_telegram_message = TelegramOperator(
    task_id="send_telegram_message",
    token="<token>",
    chat_id="<chat ID>",
    text="Some notification"
)
It works when I use it as a regular task:
t1 >> send_telegram_message
But I want it to notify me about failed tasks. I've tried putting it into default_args, but that causes a "Broken DAG" error for some reason, and I cannot see the cause in the UI:
default_args={
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "on_failure_callback": send_telegram_message,
    "retries": 1,
    "retry_delay": timedelta(seconds=30)
},
If I put the on_failure_callback parameter on the task itself, it doesn't work either, whether I use on_failure_callback or on_success_callback (no error traceback, just a successfully completed task in the UI):
t1 = BashOperator(
    task_id="entering_virtual_environment",
    depends_on_past=False,
    bash_command="source /home/fitwist/airflow/airflow_env/bin/activate",
    retries=2,
    on_failure_callback=send_telegram_message
)
I've also tried wrapping the operator in a callback function and setting the callback on the DAG (nothing is sent to Telegram):
def task_success_callback(context):
    op = TelegramOperator(
        task_id="send_telegram_message",
        token="<token>",
        chat_id="<chat ID>",
        text="Some notification"
    )
    op.execute(context)
with DAG(
    "monthly_effectiveness",
    default_args={
        ...
    },
    ...
    on_failure_callback=send_telegram_message,
    ...
) as dag:
    ...
Chaining the operator directly into the task flow does nothing either, not even an error:
t1 >> t2 >> send_telegram_message
send_telegram_message >> send_failed_telegram_message
How do I get failure notifications in Telegram?
2 Answers
This thread helped me to get the logic needed:
From docs:
So your callback function should accept one positional argument.
Example callback function:
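A minimal sketch of such a callback, assuming the Telegram provider package is installed and that the context carries the usual "task_instance" and "exception" entries. The names build_failure_text and task_failure_callback are made up for illustration, and the token and chat ID are the placeholders from the question:

```python
from types import SimpleNamespace


def build_failure_text(context):
    """Format a short failure message from the callback context."""
    ti = context["task_instance"]   # the task instance that triggered the callback
    exc = context.get("exception")  # may be missing, in which case this is None
    return f"Task {ti.task_id} in DAG {ti.dag_id} failed: {exc}"


def task_failure_callback(context):
    """Takes the single positional argument (the context dict) Airflow passes in."""
    # Imported lazily so build_failure_text can be tried without Airflow installed.
    from airflow.providers.telegram.operators.telegram import TelegramOperator

    op = TelegramOperator(
        task_id="send_failed_telegram_message",
        token="<token>",
        chat_id="<chat ID>",
        text=build_failure_text(context),
    )
    # The operator is built and executed inside the callback, not scheduled as a task.
    op.execute(context)


# Quick local check of the message format with a stand-in context (no Airflow needed).
fake_context = {
    "task_instance": SimpleNamespace(task_id="t1", dag_id="monthly_effectiveness"),
    "exception": ValueError("boom"),
}
print(build_failure_text(fake_context))
```

The key point is to pass the function itself, for example on_failure_callback=task_failure_callback in default_args or on a single task, rather than a TelegramOperator instance, since an operator instance is not a callable that accepts the context.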
Errors raised when a callback function cannot be executed, and logs printed inside callback functions, are available in the scheduler logs at <airflow-logs-path>/logs/scheduler/<date>/<dag-name>.py.log.
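Since a failing callback shows up only in the logs, it can help to log explicitly inside it. The sketch below wraps a notifier so that any exception is recorded with its traceback instead of vanishing silently. The names make_safe_callback and send are hypothetical, and where the records end up depends on your Airflow logging configuration:

```python
import logging

log = logging.getLogger(__name__)


def make_safe_callback(send):
    """Wrap `send` (any function taking the context) so its errors are logged."""

    def callback(context):
        try:
            send(context)
            log.info("Failure notification sent")
        except Exception:
            # log.exception writes the message plus the full traceback at ERROR level.
            log.exception("Failure notification could not be sent")

    return callback
```

The wrapped function still takes exactly one positional argument, so it can be used wherever a plain callback is accepted, for example on_failure_callback=make_safe_callback(my_notifier), where my_notifier is whatever function sends the Telegram message.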