
Each row of my table tasks is a task, and I have N concurrently running task processors.

tasks {
   creation_time: timestamp,
   processing_attempts: smallint,
   # ... <more fields related to the task>
}

The tasks are unsanitized user input, so I should be prepared for anything, even a crash of the processor. I want each processor to select the oldest unlocked row with fewer than K processing attempts (I could do that with SELECT ... FOR UPDATE SKIP LOCKED).
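
Roughly, the selection I have in mind looks like the sketch below. The id primary key column and K = 3 are placeholders for illustration; they are not part of the schema shown above.

```sql
-- Sketch only: assumes a primary key column "id" and K = 3.
BEGIN;

SELECT id
FROM tasks
WHERE processing_attempts < 3      -- K: give up on tasks that failed too often
ORDER BY creation_time             -- oldest task first
LIMIT 1
FOR UPDATE SKIP LOCKED;            -- ignore rows already locked by other processors

-- ... process the task while the transaction (and its row lock) stays open ...

COMMIT;
```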

Then I want to immediately increment processing_attempts and commit that change, so that if the processor crashes, the culprit task already carries a mark; once the count reaches K, the task is excluded from the queue. After that, I want to continue processing the task without the risk of another processor taking it.

Apparently this can’t be done inside a single transaction, because there is no way to commit and keep the exclusive lock on the row (i.e. commit and immediately start a new transaction holding the same locks).

So, what would be a solution that guarantees that:

  • Each task is taken by just one processor;
  • Once a task is taken by a processor, its processing_attempts is guaranteed to be incremented, even if the session ends unexpectedly.

2 Answers


  1. Chosen as BEST ANSWER

    After posting this question, I had an idea, which seems to be similar to @JohnH's comment, but I am not sure.

    I need two simultaneous connections to the database for it.

    I add another table to hold the processing_attempts, and do the following:

    1. From connection 1, I start a transaction and run SELECT ... FOR UPDATE SKIP LOCKED;
    2. When I get the task, from connection 2 I UPDATE the processing_attempts for that task (or INSERT the row if it is missing) and commit;
    3. From the same connection 2, I open a transaction and delete the row I just updated, but leave the transaction open;
    4. Then I process the task;
    5. If everything went right, I delete the task via connection 1;
    6. I commit both transactions.
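
    In SQL, the steps above might look roughly like the following. This is an untested sketch: the id column on tasks, the side table task_attempts(task_id, processing_attempts), the example id 42, and K = 3 are all assumed names and values used only for illustration.

    ```sql
    -- Sketch only. Assumed: tasks.id is the primary key, and a side table
    -- task_attempts(task_id PRIMARY KEY, processing_attempts smallint). K = 3.

    -- Connection 1, step 1: lock the oldest eligible task.
    BEGIN;
    SELECT t.id
    FROM tasks t
    LEFT JOIN task_attempts a ON a.task_id = t.id
    WHERE COALESCE(a.processing_attempts, 0) < 3
    ORDER BY t.creation_time
    LIMIT 1
    FOR UPDATE OF t SKIP LOCKED;   -- lock only the tasks row; suppose it returns id 42

    -- Connection 2, step 2: record the attempt; the statement commits on its own
    -- when the connection is in autocommit mode.
    INSERT INTO task_attempts (task_id, processing_attempts)
    VALUES (42, 1)
    ON CONFLICT (task_id)
    DO UPDATE SET processing_attempts = task_attempts.processing_attempts + 1;

    -- Connection 2, step 3: delete the counter row, but do not commit yet.
    -- If the processor crashes, this DELETE rolls back and the counter survives.
    BEGIN;
    DELETE FROM task_attempts WHERE task_id = 42;

    -- Step 4: the application processes the task here.

    -- Connection 1, step 5: on success, remove the task.
    DELETE FROM tasks WHERE id = 42;

    -- Step 6: commit both transactions.
    COMMIT;  -- run on connection 1
    COMMIT;  -- run on connection 2
    ```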

    I haven't tested it, and my fear is that the transactions would be rolled back under the SERIALIZABLE isolation level, because I would need to join both tables in order to get the task.

    And after writing all this, it sounds like a terrible idea.


  2. Add an additional column started_processing of type timestamp with time zone with a default value of -infinity.

    Whenever a processor grabs a task, it sets the column to current_timestamp and commits right away. The column acts as a lock that can time out, so if the processor crashes, the row won’t be locked forever.

    Processors search for tasks with a query like

    SELECT ...
    FROM tasks
    WHERE started_processing < current_timestamp - INTERVAL '5 minutes'
    LIMIT 1
    FOR UPDATE SKIP LOCKED;
    

    Here, 5 minutes stands for the maximum execution time that a task is allowed.
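
    Combined with the requirements from the question, the whole claim step could be sketched as a single statement. The id primary key, K = 3, the example id 42, and the choice to increment processing_attempts in the same statement are assumptions added for illustration.

    ```sql
    -- One-time setup: the timeout "lock" column with its -infinity default.
    ALTER TABLE tasks
        ADD COLUMN started_processing timestamp with time zone
        NOT NULL DEFAULT '-infinity';

    -- Claim step, run in autocommit mode so it commits right away.
    -- Assumes a primary key column "id" and K = 3.
    WITH next_task AS (
        SELECT id
        FROM tasks
        WHERE started_processing < current_timestamp - INTERVAL '5 minutes'
          AND processing_attempts < 3
        ORDER BY creation_time          -- oldest task first
        LIMIT 1
        FOR UPDATE SKIP LOCKED          -- two processors never pick the same row
    )
    UPDATE tasks t
    SET started_processing  = current_timestamp,
        processing_attempts = t.processing_attempts + 1
    FROM next_task
    WHERE t.id = next_task.id
    RETURNING t.id;

    -- After successful processing, remove the task (42 is a placeholder id).
    DELETE FROM tasks WHERE id = 42;
    ```

    Because the attempt counter and the timestamp are committed before processing starts, a crash leaves the task marked, and it becomes eligible again only after the 5 minutes have passed.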
