I’m using Django 4.0.4 and huey 2.4.3. What I would like to atchieve it’s to run a task everyday at 10am, using a periodic task.
My task folder path => project/apps/utils/tasks
+-- project/
| +-- apps/
| +-- utils/
| +-- tasks/
| +--__init__.py
| +--sms_task.py
| +--tasks.py
In the init.py file in tasks folder I’ve imported all tasks:
__all__ = ["tasks", "sms_task"]
Here my Huey Config in settings.py file:
HUEY = {
'huey_class': 'huey.RedisHuey', # Huey implementation to use.
'name': 'ASISPO', # Name of the Redis connection.
'immediate': False,
'connection': {
'url': env('REDIS_URL', default=None), # Allow Redis config via a DSN.
},
'consumer': {
'blocking': True,
'loglevel': True,
'workers': 8, # Number of consumer workers.
'scheduler_interval': 1, # Check schedule every second, -s.
'health_check_interval': 5, # Check worker health every second.
'simple_log': True,
},
}
What I’ve done so far in my tasks.py file:
@periodic_task(crontab(hour='10'))
def getjplus1():
calculation = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
filter_fields = {
'date_ope': calculation,
'escalade': 'False',
'rel_dou': 'False',
'rel_dou_ok':'False',
'rel_hemo':'False',
'rel_hemo_ok':'False'
}
for key, value in filter_fields.items():
suivis_patient = SuiviPatient.objects.filter(
response_suivi_patient__suivi_field_name__icontains=key ,
response_suivi_patient__response__icontains=value,
status='planifié',
archived=False
)
get_suivi_patient_j1(list(set(suivis_patient)))
But it’s not running at 10am.
however it works when I run periodic_task every minute like so:
@periodic_task(crontab(minute="*/1"))
def getjplus1():
calculation = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
filter_fields = {
'date_ope': calculation,
'escalade': 'False',
'rel_dou': 'False',
'rel_dou_ok':'False',
'rel_hemo':'False',
'rel_hemo_ok':'False'
}
for key, value in filter_fields.items():
suivis_patient = SuiviPatient.objects.filter(
response_suivi_patient__suivi_field_name__icontains=key ,
response_suivi_patient__response__icontains=value,
status='planifié',
archived=False
)
get_suivi_patient_j1(list(set(suivis_patient))
)
2
Answers
For this you can use via
pip install django-celery
and use celery.Since you mentioned that you are using
huey
package as part of your code. You can utilize itsperiodic_task
decorator and add crontab within it.You can also check its documentation to know more about huey periodic task : https://huey.readthedocs.io/en/latest/guide.html#periodic-tasks