
Let’s suppose we have a list of tasks to be executed and some workers that pop items from that list.
If a worker crashes unexpectedly before it finishes executing a task, that task is lost.
What kind of mechanism could prevent that so we can reprocess abandoned tasks?

2 Answers


  1. Chosen as BEST ANSWER

    There is no EXPIRE for set or zset members, and there is no atomic operation to pop from a zset and push to a list. So I ended up writing this Lua script, which runs atomically.

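    First I add a task to the executing-tasks zset with a timestamp score ((new Date()).valueOf() in JavaScript). ZADD takes the key first, then the score, then the member; <task-id> below is a placeholder for the task's identifier:

    ZADD executing-tasks 1619028226766 <task-id>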

    Then I run the script:

    EVAL [THE SCRIPT] 2 executing-tasks tasks 1619028196766

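    The last argument is the cutoff timestamp (here, 30 seconds before the score added above). The script pops the oldest task from the zset. If its score is below the cutoff, meaning the task is more than 30 seconds old, it is pushed onto the tasks list. If not, it is put back into the executing-tasks zset.

    Here is the script: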

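      local source = KEYS[1]
      local destination = KEYS[2]
      -- ARGV values arrive as strings; convert so the comparison is numeric
      local min_score = tonumber(ARGV[1])
      -- ZPOPMIN returns { member, score } for the lowest score, or an empty table
      local popped = redis.call('zpopmin', source)
      if #popped == 0 then
        return { "NOTHING_DONE" }
      end
      local id = popped[1]
      local score = popped[2]
      if tonumber(score) < min_score then
        -- Older than the cutoff: hand the task back to the work list
        redis.call('rpush', destination, id)
        return { "RESTORED", id }
      end
      -- Still within the time window: put it back with its original score
      redis.call('zadd', source, score, id)
      return { "SENT_BACK", id }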
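
Each call to the script inspects only the oldest entry, so a caller would probably invoke it repeatedly until it stops returning RESTORED. The sketch below shows one way to do that from Python; it is not part of the original answer. The function name restore_stale_tasks and the constants are hypothetical, and client is assumed to be any object with an eval(script, numkeys, *keys_and_args) method, which is the shape redis-py clients expose.

```python
import time

EXECUTING_KEY = "executing-tasks"  # zset named in the answer
TASKS_KEY = "tasks"                # list named in the answer
TIMEOUT_MS = 30_000                # the 30-second window from the answer


def restore_stale_tasks(client, script, now_ms=None, timeout_ms=TIMEOUT_MS):
    """Run the script until no stale task is left; return the restored ids.

    Assumption: `client.eval(script, numkeys, *keys_and_args)` forwards the
    call to Redis EVAL and returns the script's reply as a list.
    """
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    cutoff = now_ms - timeout_ms  # scores below this count as abandoned
    restored = []
    while True:
        reply = client.eval(script, 2, EXECUTING_KEY, TASKS_KEY, cutoff)
        status = reply[0]
        if isinstance(status, bytes):  # clients may return bytes or str
            status = status.decode()
        if status != "RESTORED":
            # SENT_BACK or NOTHING_DONE: nothing older than the cutoff remains
            return restored
        restored.append(reply[1])
```

Because ZPOPMIN always returns the lowest score, the first SENT_BACK means every remaining task is newer than the cutoff, so the loop can stop there.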
    

  2. You need to use a ZSET to solve this issue.

    Pop operation

    • Add to ZSET with expiry time
    • Remove from list

    Ack operation

    • Remove from ZSET

    Worker

    You need to run a scheduled worker that moves expired items from the ZSET back to the list.
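
The three pieces above (pop, ack, scheduled worker) can be modelled in a few lines of plain Python to show how they fit together. This is a toy in-memory sketch, not Rqueue's implementation and not Redis code: the class InMemoryReliableQueue and its method names are hypothetical, a deque stands in for the Redis list, and a dict of task id to expiry time stands in for the ZSET. In Redis, each method would need to run atomically, for example as a Lua script.

```python
from collections import deque


class InMemoryReliableQueue:
    """Toy model: `tasks` plays the Redis list, `executing` the ZSET."""

    def __init__(self, timeout_ms=30_000):
        self.tasks = deque()
        self.executing = {}  # task id -> expiry time in ms (the ZSET score)
        self.timeout_ms = timeout_ms

    def push(self, task_id):
        self.tasks.append(task_id)

    def pop(self, now_ms):
        """Pop operation: remove from the list and record an expiry time."""
        if not self.tasks:
            return None
        task_id = self.tasks.popleft()
        self.executing[task_id] = now_ms + self.timeout_ms
        return task_id

    def ack(self, task_id):
        """Ack operation: the worker finished, so drop the ZSET entry."""
        return self.executing.pop(task_id, None) is not None

    def requeue_expired(self, now_ms):
        """Scheduled worker: move expired entries back to the list."""
        expired = sorted(
            (t for t, expiry in self.executing.items() if expiry <= now_ms),
            key=self.executing.get,
        )
        for task_id in expired:
            del self.executing[task_id]
            self.tasks.append(task_id)
        return expired
```

A worker that crashes never calls ack, so its task stays in executing until requeue_expired finds it past its expiry and returns it to the list for another worker to pick up.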

    For details on how I did this in Rqueue, see https://medium.com/@sonus21/introducing-rqueue-redis-queue-d344f5c36e1b

    GitHub code: https://github.com/sonus21/rqueue
