
In Django, I have created an app for sending emails to clients. I am using Celery Beat to schedule a daily task that executes the schedule_emails function at 12:30 AM. That function runs perfectly. However, the send_emails task is running multiple times and sending duplicate emails to certain clients from my email account at the same time.

I have checked the logs of the Celery worker, and they show that a particular task_id was received multiple times and succeeded multiple times.

I am using Supervisor to run the celery and celery_beat services in the background.

settings.py

from celery.schedules import crontab

# Celery Settings
CELERY_BROKER_URL = 'redis://127.0.0.1:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Kolkata'
CELERY_TASK_ACKS_LATE = True
CELERY_RESULT_BACKEND = 'django-db'
CELERY_BEAT_SCHEDULE_FILENAME = '.celery/beat-schedule'
CELERYD_LOG_FILE = '.celery/celery.log'
CELERYBEAT_LOG_FILE = '.celery/celerybeat.log'

CELERY_BEAT_SCHEDULE = {
    'schedule_emails': {
        'task': 'myapp.tasks.schedule_emails',
        'schedule': crontab(hour=0, minute=30),
    },
}

celery.py

from __future__ import absolute_import, unicode_literals
import os

from celery import Celery
from django.conf import settings
from decouple import config

if config('ENVIRONMENT') == 'development':
    os.environ.setdefault('DJANGO_SETTINGS_MODULE',
                            'MyProject.settings.development')
else:
    os.environ.setdefault('DJANGO_SETTINGS_MODULE',
                            'MyProject.settings.production')

app = Celery('MyProject')
app.conf.enable_utc = False

app.conf.update(timezone='Asia/Kolkata')

app.config_from_object(settings, namespace='CELERY')

app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print(f'Request : {self.request!r}')

tasks.py

from celery import shared_task


@shared_task
def send_emails(client_email):
    # Code to send the email
    pass


@shared_task
def schedule_emails():
    client_data = []  # List of dictionaries with client email and send time

    for data in client_data:
        send_emails.apply_async(args=[data.get('email')], eta=data.get('time'))

Supervisor Settings

[program:celery_worker]
command=path-to-environment/bin/celery -A MyProject worker --loglevel=info
directory=project-path
user=admin
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=project-path/.celery/celery_worker.log

[program:celery_beat]
command=path-to-environment/bin/celery -A MyProject beat --loglevel=info
directory=project-path
user=username
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=project-path/MyProject/.celery/celery_beat.log

celery_beat.log

[2023-07-01 19:23:51,519: INFO/MainProcess] beat: Starting...
[2023-07-02 00:30:00,056: INFO/MainProcess] Scheduler: Sending due task schedule_emails (auto_email.tasks.schedule_emails)
[2023-07-02 04:00:00,091: INFO/MainProcess] Scheduler: Sending due task celery.backend_cleanup (celery.backend_cleanup)
[2023-07-03 00:30:00,091: INFO/MainProcess] Scheduler: Sending due task schedule_emails (auto_email.tasks.schedule_emails)
[2023-07-03 04:00:00,092: INFO/MainProcess] Scheduler: Sending due task celery.backend_cleanup (celery.backend_cleanup)
[2023-07-04 00:30:00,091: INFO/MainProcess] Scheduler: Sending due task schedule_emails (auto_email.tasks.schedule_emails)
[2023-07-04 04:00:00,090: INFO/MainProcess] Scheduler: Sending due task celery.backend_cleanup (celery.backend_cleanup)

celery_worker.log

[2023-07-04 00:30:37,760: INFO/MainProcess] Task auto_email.tasks.send_emails[66c8ef59-34e2-48be-85e4-ed14cd9b56cf] received
[2023-07-04 01:32:16,692: INFO/MainProcess] Task auto_email.tasks.send_emails[66c8ef59-34e2-48be-85e4-ed14cd9b56cf] received
[2023-07-04 01:35:23,098: INFO/MainProcess] Task auto_email.tasks.send_emails[66c8ef59-34e2-48be-85e4-ed14cd9b56cf] received
[2023-07-04 02:36:54,644: INFO/MainProcess] Task auto_email.tasks.send_emails[66c8ef59-34e2-48be-85e4-ed14cd9b56cf] received
[2023-07-04 03:38:36,868: INFO/MainProcess] Task auto_email.tasks.send_emails[66c8ef59-34e2-48be-85e4-ed14cd9b56cf] received
[2023-07-04 03:38:44,074: INFO/ForkPoolWorker-1] Task auto_email.tasks.send_emails[66c8ef59-34e2-48be-85e4-ed14cd9b56cf] succeeded in 3.069899449998047s: None
[2023-07-04 03:38:44,081: INFO/ForkPoolWorker-1] Task auto_email.tasks.send_emails[66c8ef59-34e2-48be-85e4-ed14cd9b56cf] succeeded in 0.004782014002557844s: None
[2023-07-04 03:38:44,190: INFO/ForkPoolWorker-2] Task auto_email.tasks.send_emails[66c8ef59-34e2-48be-85e4-ed14cd9b56cf] succeeded in 3.1878137569874525s: None



2 Answers


  1. Chosen as BEST ANSWER

    I have used Redis as the broker, and the default value of its visibility_timeout is 3600 seconds (1 hour). visibility_timeout controls how long a delivered task stays invisible to other workers: within that window the worker must execute the task and send an acknowledgement back to the Redis broker. If the task is not acknowledged within that period, Redis redelivers it to a worker. Tasks scheduled with an eta are held unacknowledged until their scheduled time, so any task scheduled more than an hour ahead keeps getting redelivered, which is why the same task_id appears multiple times in the worker log. I have increased visibility_timeout to 1 day because I schedule my tasks at most 1 day ahead.

    settings.py

    CELERY_BROKER_TRANSPORT_OPTIONS = {
        'visibility_timeout': 86400  # Set the visibility timeout to 1 Day
    }
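
    As a defensive complement to the broker setting, the task itself can be made idempotent so that a redelivered copy becomes a no-op. Below is a minimal sketch (not from the original answer), assuming the Django cache is backed by a store shared by all workers, such as Redis; the key name is illustrative.

    # tasks.py -- idempotency guard sketch. Assumes the Django cache backend
    # is shared by all workers (e.g. Redis). cache.add() is atomic and returns
    # False if the key already exists, so only the first delivery of a given
    # task id passes the guard.
    from celery import shared_task
    from django.core.cache import cache


    @shared_task(bind=True)
    def send_emails(self, client_email):
        # One key per task id, kept for 2 days so redeliveries are recognised.
        if not cache.add(f'email-sent:{self.request.id}', '1', timeout=2 * 86400):
            return  # duplicate delivery of the same task -- skip it
        # Code to send the email

    cache.add() is used here instead of a get/set pair because it creates the key in a single atomic operation, so two concurrent deliveries cannot both pass the check.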
    
    

  2. This is in your for-loop code:

    for data in client_data:
        send_emails.apply_async(args=[data.get('email')], eta=data.get('time'))
    

    With each apply_async call you enqueue a new task, and Celery receives all of those tasks.

    So if the emails are duplicated, first make sure client_data is valid and contains no duplicate entries.

    If it is valid, then try re-fetching the latest data inside your send_emails task, as in the sketch below.
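
    A minimal sketch of both suggestions, assuming a hypothetical Client model with email and unsubscribed fields (the model and field names are illustrative, not from the question):

    from celery import shared_task
    from myapp.models import Client  # hypothetical model, for illustration


    @shared_task
    def send_emails(client_email):
        # Re-read the latest data at execution time, so an eta task scheduled
        # hours earlier does not act on stale information.
        client = Client.objects.filter(email=client_email).first()
        if client is None or client.unsubscribed:
            return  # nothing to send
        # Code to send the email


    @shared_task
    def schedule_emails():
        client_data = []  # List of dictionaries with client email and send time
        seen = set()
        for data in client_data:
            email = data.get('email')
            if email in seen:  # skip duplicate entries in client_data
                continue
            seen.add(email)
            send_emails.apply_async(args=[email], eta=data.get('time'))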
