
I ran an experiment: I scheduled a cron job 30 seconds into the future, killed the scheduler process, waited a minute or two, and then restarted it, expecting the missed job to be executed as soon as the process came back up. The Redis server stayed up the whole time, and I confirmed the job state was persisted by looking up the key in redis-cli. I've also reproduced the same problem with the SQLAlchemy job store.

However, the job was never executed after the restart; it simply stayed missed. I need to figure out how to get missed jobs to recover before using this in production.

Here are the contents of the script clock.py so you can reproduce it. Any help is appreciated; I assume I have something misconfigured or am misunderstanding how the library works.

# apscheduler version 3.10.4

import logging
import os
import sys
import urllib.parse

from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.schedulers.blocking import BlockingScheduler
from pytz import utc

# Without this, the paths to `snake` modules etc. will not be located.
sys.path.append(os.path.join(os.path.dirname(__file__), "mysite"))

import django

django.setup()

from django.conf import settings

logging.basicConfig()
# Enable DEBUG output for the "apscheduler" logger so scheduler activity shows up in the logs.
logging.getLogger('apscheduler').setLevel(logging.DEBUG)


# WEB_CONCURRENCY is set by Heroku dynos automatically
number_of_cores = int(os.environ.get("WEB_CONCURRENCY", os.cpu_count() or 1))


parsed_redis_url = urllib.parse.urlparse(settings.REDIS_URL)
redis_job_store = RedisJobStore(
    host=parsed_redis_url.hostname,
    port=parsed_redis_url.port,
    password=parsed_redis_url.password,
    db=settings.CACHE_REDIS_DB,
    ssl=parsed_redis_url.scheme == "rediss",
    ssl_cert_reqs=None,
)

# ==========
## APScheduler Configuration

job_defaults = {
    'max_instances': 1,
    'misfire_grace_time': 60 * 50,
    'coalesce': True,
}

executors = {'default': ThreadPoolExecutor(number_of_cores)}


jobstores = {
    'default': redis_job_store,
}

scheduler = BlockingScheduler(
    executors=executors,
    logger=logging,
    job_defaults=job_defaults,
    timezone=utc,
    jobstores=jobstores,
)


@scheduler.scheduled_job('cron', hour=20, minute=38, second=40, name="print_ongoing", id="1")
def print_ongoing():
    from time import sleep

    from django.utils.crypto import get_random_string

    unique_id = get_random_string(length=4)
    s = 0
    while s < 1000:
        print(unique_id)
        s += 1
        sleep(1)


scheduler.start()

Here are the contents of the serialized job as stored in Redis:

{
    'version': 1,
    'id': '1',
    'func': '__main__:print_ongoing',
    'trigger': {
        '__type__': 'apscheduler.triggers.cron.CronTrigger',
        'timezone': 'pytz._UTC',
        'start_date': None,
        'end_date': None,
        'fields': [
            {
                '__type__': 'apscheduler.triggers.cron.fields.BaseField',
                'name': 'year',
                'is_default': True,
                'expressions': [
                    {
                        '__type__': 'apscheduler.triggers.cron.expressions.AllExpression',
                        'step': None
                    }
                ]
            },
            {
                '__type__': 'apscheduler.triggers.cron.fields.MonthField',
                'month': None,
                'range': None,
                'expressions': [
                    {
                        'day': None,
                        'range': None
                    }
                ]
            },
            {
                '__type__': 'apscheduler.triggers.cron.fields.DayOfMonthField',
                'day': None,
                'range': None,
                'expressions': [
                    {
                        'day': None,
                        'range': None
                    }
                ]
            },
            {
                '__type__': 'apscheduler.triggers.cron.fields.WeekField',
                'week': None,
                'range': None,
                'expressions': [
                    {
                        'week': None,
                        'range': None
                    }
                ]
            },
            {
                '__type__': 'apscheduler.triggers.cron.fields.DayOfWeekField',
                'day_of_week': None,
                'range': None,
                'expressions': [
                    {
                        'day_of_week': None,
                        'range': None
                    }
                ]
            },
            {
                'hour': None,
                'range': {
                    'first': 20,
                    'last': 20
                }
            },
            {
                'minute': None,
                'range': {
                    'first': 20,
                    'last': 20
                }
            },
            {
                'second': None,
                'range': {
                    'first': 20,
                    'last': 20
                }
            }
        ]
    },
    'jitter': None,
    'executor': 'default',
    'args': None,
    'kwargs': {
        'print_ongoing': None
    },
    'misfire_grace_time': 3000,
    'coalesce': True,
    'max_instances': 1,
    'next_run_time': '2024-02-16 00:20:00'
}
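
For reference, this is roughly how the dump above can be pulled out of Redis (a sketch assuming RedisJobStore's default jobs key, "apscheduler.jobs"; each job is stored as a pickled state dict in a hash, keyed by job id):

import pickle

from redis import Redis

r = Redis.from_url(settings.REDIS_URL, db=settings.CACHE_REDIS_DB)
raw_job = r.hget("apscheduler.jobs", "1")  # "1" is the job id used in clock.py
print(pickle.loads(raw_job))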

2 Answers


  1. Chosen as BEST ANSWER

    After speaking with the library's maintainer, Alex Grönholm, on Gitter, we figured out that:

    1. The decorator form (@scheduler.scheduled_job('cron', hour=20, minute=38, second=40, name="print_ongoing", id="1")) implicitly adds replace_existing=True.
    2. With replace_existing=True, the previously stored job is always removed and replaced on startup, so its missed run is discarded.
    3. Using the non-decorator version, scheduler.add_job(print_ongoing, 'cron', hour=19, minute=28, second=40, replace_existing=False), the scheduler does catch up on missed jobs... however, it also creates a 2nd (3rd, 4th, etc.) copy of the job on every start-up, because no identifier is given.
    4. Adding an id, e.g. scheduler.add_job(print_ongoing, 'cron', hour=19, minute=28, second=40, replace_existing=False, id="2"), prevents clones of the same job from being created... but still doesn't solve the problem, because the duplicate id now raises an error on startup, so the script cannot be restarted once the job is already stored.
    5. The ultimate solution was to start a paused, non-blocking scheduler and add the jobs through it, so the ConflictingIdError raised for duplicates can be rescued, then stop that scheduler and switch to a blocking one (full configuration below, with the hand-off sketched after it):
    from apscheduler.executors.pool import ProcessPoolExecutor, ThreadPoolExecutor
    from apscheduler.jobstores.base import ConflictingIdError
    from apscheduler.schedulers.background import BackgroundScheduler
    from apscheduler.schedulers.blocking import BlockingScheduler

    number_of_cores = os.cpu_count()
    
    
    # ==========
    ## APScheduler Configuration
    
    job_defaults = {
        'max_instances': 1,  # Only one instance of each job can run at a time
        # If the clock crashes, still run a missed job as long as it is no more than this many seconds late.
        'misfire_grace_time': 60 * 60,
        # Collapse several missed runs of the same job into a single run instead of running each one after the downtime.
        'coalesce': True,
    }
    
    jobstores = {
        'default': redis_job_store,
    }
    
    executors = {'default': ThreadPoolExecutor(), 'processpool': ProcessPoolExecutor(number_of_cores)}
    
    background_scheduler = BackgroundScheduler(
        executors=executors,
        logger=logging,
        job_defaults=job_defaults,
        jobstores=jobstores,
    )
    
    
    background_scheduler.start(paused=True)
    
    blocking_scheduler = BlockingScheduler(
        executors=executors,
        logger=logging,
        job_defaults=job_defaults,
        jobstores=jobstores,
    )
    
    
    
    def schedule(func, schedule_type, **kwargs):
        job_id = func.__name__
    
        try:
            background_scheduler.add_job(func, schedule_type, **kwargs, id=job_id, replace_existing=False)
        except ConflictingIdError as e:
            logging.info(f"Job {job_id} already scheduled. Skipping.")
            next_run_time = background_scheduler.get_job(job_id).next_run_time
            logging.info(f"It will next run at {next_run_time}.")
    
    
    def heartbeat():
        logging.info("Heartbeat heard.")
    
    
    schedule(heartbeat, 'interval', minutes=10)
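
    The hand-off from step 5, stopping the paused background scheduler and switching to the blocking one, isn't shown above; here is a minimal sketch of how it can look, assuming both schedulers are configured with the same Redis jobstore as above:

    # All jobs are now registered in the shared Redis jobstore, so the paused
    # background scheduler can be shut down.
    background_scheduler.shutdown()

    # The blocking scheduler reads the jobs back from the jobstore and runs them,
    # applying misfire_grace_time to any runs missed while the process was down.
    blocking_scheduler.start()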
    
    

  2. I think the issue may just be that you’re using coalesce:

    if coalescing is enabled for the job and the scheduler sees one or more queued executions for the job, it will only trigger it once. No misfire events will be sent for the “bypassed” runs.

    https://apscheduler.readthedocs.io/en/3.x/userguide.html#missed-job-executions-and-coalescing
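
    If you want to see which runs APScheduler actually reports as missed, one option (a sketch using the library's event system, not something from the original post) is to register a listener for missed-job events before starting the scheduler; with coalesce enabled, the bypassed runs won't show up here:

    from apscheduler.events import EVENT_JOB_MISSED

    def log_missed(event):
        # event.job_id and event.scheduled_run_time identify the run that was skipped
        logging.warning("Missed run of job %s scheduled for %s", event.job_id, event.scheduled_run_time)

    scheduler.add_listener(log_missed, EVENT_JOB_MISSED)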
