skip to Main Content

I’m using Django 4.0.4 and huey 2.4.3. What I would like to atchieve it’s to run a task everyday at 10am, using a periodic task.

My task folder path => project/apps/utils/tasks

+-- project/
|   +-- apps/
|       +-- utils/
|           +-- tasks/
|               +--__init__.py
|               +--sms_task.py
|               +--tasks.py

In the init.py file in tasks folder I’ve imported all tasks:

__all__ = ["tasks", "sms_task"]

Here my Huey Config in settings.py file:

    HUEY = {
    'huey_class': 'huey.RedisHuey',  # Huey implementation to use.
    'name': 'ASISPO',  # Name of the Redis connection.
    'immediate': False,
    'connection': {
        'url': env('REDIS_URL', default=None),  # Allow Redis config via a DSN.
    },
    'consumer': {
        'blocking': True,
        'loglevel': True,
        'workers': 8,  # Number of consumer workers.
        'scheduler_interval': 1,  # Check schedule every second, -s.
        'health_check_interval': 5,  # Check worker health every second.
        'simple_log': True,
        },
    }

What I’ve done so far in my tasks.py file:

@periodic_task(crontab(hour='10'))
def getjplus1():
    calculation = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
    filter_fields = {
        'date_ope': calculation,
        'escalade': 'False',
        'rel_dou': 'False',
        'rel_dou_ok':'False',
        'rel_hemo':'False',
        'rel_hemo_ok':'False'
        }
    for key, value in filter_fields.items():
        suivis_patient = SuiviPatient.objects.filter(
            response_suivi_patient__suivi_field_name__icontains=key ,
            response_suivi_patient__response__icontains=value, 
            status='planifié',
            archived=False
            )
    
    get_suivi_patient_j1(list(set(suivis_patient)))

But it’s not running at 10am.

however it works when I run periodic_task every minute like so:

@periodic_task(crontab(minute="*/1"))
def getjplus1():
    calculation = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
    filter_fields = {
        'date_ope': calculation,
        'escalade': 'False',
        'rel_dou': 'False',
        'rel_dou_ok':'False',
        'rel_hemo':'False',
        'rel_hemo_ok':'False'
        }
    for key, value in filter_fields.items():
        suivis_patient = SuiviPatient.objects.filter(
            response_suivi_patient__suivi_field_name__icontains=key ,
            response_suivi_patient__response__icontains=value, 
            status='planifié',
            archived=False
            )
    
    get_suivi_patient_j1(list(set(suivis_patient))

)

2

Answers


  1. For this you can use via pip install django-celery and use celery.

    from celery.schedules import crontab
    from celery.task import periodic_task
    
    @periodic_task(run_every=crontab(hour=10, minute=0))
    def every_day():
        print("This is run every Monday morning at 10:00")
    
    Login or Signup to reply.
  2. Since you mentioned that you are using huey package as part of your code. You can utilize its periodic_task decorator and add crontab within it.

    from huey import crontab
    
    @huey.periodic_task(crontab(hour='10'))
    def every_ten_in_the_morning():
        print('This task runs every 10 in the morning.')
    

    You can also check its documentation to know more about huey periodic task : https://huey.readthedocs.io/en/latest/guide.html#periodic-tasks

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search