I was carrying out an experiment where I set a job with a cron trigger
30 seconds into the future, killed the scheduler process, waited a minute or two, and then restarted it, expecting the missed job to be executed as soon as the process started again. The Redis server was left running the entire time, and I confirmed that the state was persisted there by looking up the key in redis-cli. I have also reproduced the same problem using the SQLAlchemy job store.
However, the job wasn't executed at all after the restart; it stayed missed. I need to figure out how to get missed jobs to recover before using this in production.
Here are the contents of the script clock.py so you can reproduce. Any help is appreciated. I assume I have some configuration wrong, or am perhaps misunderstanding the features of this system.
# apscheduler version 3.10.4
import logging
import os
import sys
import urllib.parse
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.schedulers.blocking import BlockingScheduler
from pytz import utc
# Without this, the paths to `snake` modules etc. will not be located.
sys.path.append(os.path.join(os.path.dirname(__file__), "mysite"))
import django
django.setup()
from django.conf import settings
logging.basicConfig()
# The string "apscheduler" is necessary for logs to give good output.
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
# WEB_CONCURRENCY is set by Heroku dynos automatically
number_of_cores = int(os.environ.get("WEB_CONCURRENCY", os.cpu_count() or 1))
parsed_redis_url = urllib.parse.urlparse(settings.REDIS_URL)
redis_job_store = RedisJobStore(
    host=parsed_redis_url.hostname,
    port=parsed_redis_url.port,
    password=parsed_redis_url.password,
    db=settings.CACHE_REDIS_DB,
    ssl=parsed_redis_url.scheme == "rediss",
    ssl_cert_reqs=None,
)
# ==========
## APScheduler Configuration
job_defaults = {
    'max_instances': 1,
    'misfire_grace_time': 60 * 50,  # 50 minutes
    'coalesce': True,
}
executors = {'default': ThreadPoolExecutor(number_of_cores)}
jobstores = {
    'default': redis_job_store,
}
scheduler = BlockingScheduler(
    executors=executors,
    logger=logging.getLogger('apscheduler'),  # pass a Logger instance, not the module
    job_defaults=job_defaults,
    timezone=utc,
    jobstores=jobstores,
)
@scheduler.scheduled_job('cron', hour=20, minute=38, second=40, name="print_ongoing", id="1")
def print_ongoing():
    from time import sleep
    from django.utils.crypto import get_random_string
    unique_id = get_random_string(length=4)
    s = 0
    while s < 1000:
        print(unique_id)
        s += 1
        sleep(1)
scheduler.start()
Here are the contents of the serialized job from Redis:
{
    'version': 1,
    'id': '1',
    'func': '__main__:print_ongoing',
    'trigger': {
        '__type__': 'apscheduler.triggers.cron.CronTrigger',
        'timezone': 'pytz._UTC',
        'start_date': None,
        'end_date': None,
        'fields': [
            {
                '__type__': 'apscheduler.triggers.cron.fields.BaseField',
                'name': 'year',
                'is_default': True,
                'expressions': [
                    {
                        '__type__': 'apscheduler.triggers.cron.expressions.AllExpression',
                        'step': None
                    }
                ]
            },
            {
                '__type__': 'apscheduler.triggers.cron.fields.MonthField',
                'month': None,
                'range': None,
                'expressions': [
                    {
                        'day': None,
                        'range': None
                    }
                ]
            },
            {
                '__type__': 'apscheduler.triggers.cron.fields.DayOfMonthField',
                'day': None,
                'range': None,
                'expressions': [
                    {
                        'day': None,
                        'range': None
                    }
                ]
            },
            {
                '__type__': 'apscheduler.triggers.cron.fields.WeekField',
                'week': None,
                'range': None,
                'expressions': [
                    {
                        'week': None,
                        'range': None
                    }
                ]
            },
            {
                '__type__': 'apscheduler.triggers.cron.fields.DayOfWeekField',
                'day_of_week': None,
                'range': None,
                'expressions': [
                    {
                        'day_of_week': None,
                        'range': None
                    }
                ]
            },
            {
                'hour': None,
                'range': {
                    'first': 20,
                    'last': 20
                }
            },
            {
                'minute': None,
                'range': {
                    'first': 20,
                    'last': 20
                }
            },
            {
                'second': None,
                'range': {
                    'first': 20,
                    'last': 20
                }
            }
        ]
    },
    'jitter': None,
    'executor': 'default',
    'args': None,
    'kwargs': {
        'print_ongoing': None
    },
    'misfire_grace_time': 3000,
    'coalesce': True,
    'max_instances': 1,
    'next_run_time': '2024-02-16 00:20:00'
}
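(For anyone reproducing this: the dump above is what I got by unpickling the entry RedisJobStore keeps in its jobs hash. A minimal sketch of the decoding step, assuming the store's default jobs_key of 'apscheduler.jobs' and a local Redis; the HGET call is illustrative, not part of clock.py.)

```python
import pickle

def load_job_state(raw: bytes) -> dict:
    # RedisJobStore stores each job as pickle.dumps(job.__getstate__())
    # in a Redis hash, keyed by the job id.
    return pickle.loads(raw)

# Against a live Redis (not run here):
#   from redis import Redis
#   raw = Redis().hget("apscheduler.jobs", "1")  # "1" is the job id above
#   print(load_job_state(raw))
```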
2 Answers
Answer 1:

After speaking with the maintainer of the library, Alex Grönholm, on Gitter, we figured out that the

@scheduler.scheduled_job('cron', hour=20, minute=38, second=40, name="print_ongoing", id="1")

decorator always passes replace_existing=True, and that replace_existing=True will always remove the previous job. Registering the job with

scheduler.add_job(print_ongoing, 'cron', hour=19, minute=28, second=40, replace_existing=False)

instead makes the system catch up on old jobs... however, it also creates a 2nd (3rd, 4th, etc.) copy of the job on each start-up, because no identifier is given.

scheduler.add_job(print_ongoing, 'cron', hour=19, minute=28, second=40, replace_existing=False, id="2")

prevents clones of the same job from being created... but still does not solve the problem on its own, because now it's impossible to restart the script after it's already been running unless the KeyErrors raised for the duplicate id are rescued. Then stop that scheduler and switch to a blocking scheduler.

Answer 2:

I think the issue may just be that you're using coalesce:
https://apscheduler.readthedocs.io/en/3.x/userguide.html#missed-job-executions-and-coalescing