In Django, I have created an app for sending emails to clients. I am using Celery Beat to schedule a daily task that executes the schedule_emails function at 12:30 AM. This function runs perfectly. However, I am encountering a problem where the send_emails task runs multiple times and sends duplicate emails to specific clients from my email account at the same time. The Celery worker logs show that a particular task_id was received multiple times and succeeded multiple times. I am using Supervisor to run the celery and celery_beat services in the background.
settings.py
# Celery Settings
from celery.schedules import crontab  # needed for the beat schedule below

CELERY_BROKER_URL = 'redis://127.0.0.1:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Kolkata'
CELERY_TASK_ACKS_LATE = True
CELERY_RESULT_BACKEND = 'django-db'
CELERY_BEAT_SCHEDULE_FILENAME = '.celery/beat-schedule'
CELERYD_LOG_FILE = '.celery/celery.log'
CELERYBEAT_LOG_FILE = '.celery/celerybeat.log'
CELERY_BEAT_SCHEDULE = {
    'schedule_emails': {
        'task': 'myapp.tasks.schedule_emails',
        'schedule': crontab(hour=0, minute=30),
    },
}
celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
from decouple import config
if config('ENVIRONMENT') == 'development':
    os.environ.setdefault('DJANGO_SETTINGS_MODULE',
                          'MyProject.settings.development')
else:
    os.environ.setdefault('DJANGO_SETTINGS_MODULE',
                          'MyProject.settings.production')
app = Celery('MyProject')
app.conf.enable_utc = False
app.conf.update(timezone='Asia/Kolkata')
app.config_from_object(settings, namespace='CELERY')
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
    print(f'Request : {self.request!r}')
tasks.py
from celery import shared_task

@shared_task
def send_emails(client_email):
    # Code to send Email
    pass

@shared_task
def schedule_emails():
    client_data = []  # List of dictionaries with client email and the time
    for data in client_data:
        send_emails.apply_async(args=[data.get('email')], eta=data.get('time'))
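For illustration, the loop above assumes client_data entries of roughly this shape (the addresses and times below are hypothetical):

```python
from datetime import datetime, timedelta

# Hypothetical example of the shape client_data is assumed to take:
# one dict per client, holding the address and the future datetime
# passed to apply_async as eta. Addresses and offsets are made up.
client_data = [
    {'email': 'client1@example.com', 'time': datetime.now() + timedelta(hours=2)},
    {'email': 'client2@example.com', 'time': datetime.now() + timedelta(hours=8)},
]
```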
Supervisor Settings
[program:celery_worker]
command=path-to-environment/bin/celery -A MyProject worker --loglevel=info
directory=project-path
user=admin
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=project-path/.celery/celery_worker.log
[program:celery_beat]
command=path-to-environment/bin/celery -A MyProject beat --loglevel=info
directory=project-path
user=username
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=project-path/MyProject/.celery/celery_beat.log
celery_beat.log
[2023-07-01 19:23:51,519: INFO/MainProcess] beat: Starting...
[2023-07-02 00:30:00,056: INFO/MainProcess] Scheduler: Sending due task schedule_emails (auto_email.tasks.schedule_emails)
[2023-07-02 04:00:00,091: INFO/MainProcess] Scheduler: Sending due task celery.backend_cleanup (celery.backend_cleanup)
[2023-07-03 00:30:00,091: INFO/MainProcess] Scheduler: Sending due task schedule_emails (auto_email.tasks.schedule_emails)
[2023-07-03 04:00:00,092: INFO/MainProcess] Scheduler: Sending due task celery.backend_cleanup (celery.backend_cleanup)
[2023-07-04 00:30:00,091: INFO/MainProcess] Scheduler: Sending due task schedule_emails (auto_email.tasks.schedule_emails)
[2023-07-04 04:00:00,090: INFO/MainProcess] Scheduler: Sending due task celery.backend_cleanup (celery.backend_cleanup)
celery_worker.log
[2023-07-04 00:30:37,760: INFO/MainProcess] Task auto_email.tasks.send_emails[66c8ef59-34e2-48be-85e4-ed14cd9b56cf] received
[2023-07-04 01:32:16,692: INFO/MainProcess] Task auto_email.tasks.send_emails[66c8ef59-34e2-48be-85e4-ed14cd9b56cf] received
[2023-07-04 01:35:23,098: INFO/MainProcess] Task auto_email.tasks.send_emails[66c8ef59-34e2-48be-85e4-ed14cd9b56cf] received
[2023-07-04 02:36:54,644: INFO/MainProcess] Task auto_email.tasks.send_emails[66c8ef59-34e2-48be-85e4-ed14cd9b56cf] received
[2023-07-04 03:38:36,868: INFO/MainProcess] Task auto_email.tasks.send_emails[66c8ef59-34e2-48be-85e4-ed14cd9b56cf] received
[2023-07-04 03:38:44,074: INFO/ForkPoolWorker-1] Task auto_email.tasks.send_emails[66c8ef59-34e2-48be-85e4-ed14cd9b56cf] succeeded in 3.069899449998047s: None
[2023-07-04 03:38:44,081: INFO/ForkPoolWorker-1] Task auto_email.tasks.send_emails[66c8ef59-34e2-48be-85e4-ed14cd9b56cf] succeeded in 0.004782014002557844s: None
[2023-07-04 03:38:44,190: INFO/ForkPoolWorker-2] Task auto_email.tasks.send_emails[66c8ef59-34e2-48be-85e4-ed14cd9b56cf] succeeded in 3.1878137569874525s: None
2 Answers
I have used Redis as the broker, in which the default value of visibility_timeout is 3600 seconds (1 hour). visibility_timeout controls how long a task stays invisible to other workers; within that window the worker is expected to execute the task and send an acknowledgement to the Redis broker. A task scheduled with an eta further in the future than the visibility_timeout is not acknowledged within that window, so the Redis broker redelivers it to a worker, producing the duplicate executions. I have increased visibility_timeout to 1 day, because I schedule my tasks at most 1 day ahead.
settings.py
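A minimal sketch of that change, assuming the broker transport options live in the same settings.py shown above (86400 seconds = 24 hours):

```python
# Raise the Redis visibility_timeout so tasks scheduled up to a day
# ahead with eta are not redelivered to another worker before they
# become due. 86400 seconds = 24 hours.
CELERY_BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 86400}
```

With the CELERY_ namespace used in celery.py, this maps to Celery's broker_transport_options setting for the Redis transport.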
This is in your for loop code: with each apply_async you are executing a new task, and Celery receives all of those tasks. So if the emails are duplicated, first make sure client_data itself contains no duplicates; if it is valid, try fetching the latest data inside your send_emails task.
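One way to make the task defensive against broker redelivery is an idempotency guard: record a per-client, per-day key before sending, and skip the send if the key already exists. A minimal sketch, using an in-process set as a stand-in for a shared store such as Redis (the names send_once and _already_sent are hypothetical, not part of the question's code):

```python
# Idempotency guard sketch: skip any send whose (email, date) key was
# already recorded. The in-process set below is only a stand-in for a
# shared store such as Redis SET NX; in production the key must live
# in a store all workers share, or duplicates across processes remain
# possible.
_already_sent = set()

def send_once(client_email, date_key, send_fn):
    key = (client_email, date_key)
    if key in _already_sent:
        return False          # duplicate delivery: do nothing
    _already_sent.add(key)
    send_fn(client_email)     # the actual email-sending call
    return True
```

The first delivery for a given client and day performs the send; a redelivered copy of the same task finds the key present and becomes a no-op.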