
I have a SQL table that is filled with pending orders on day one. On the next day, at a specific time, I have to fetch the ‘PENDING’ orders from the DB and process each of them by calling an external API. This is what my code flow looks like:

SELECT * FROM orders WHERE status = 'PENDING' LIMIT 200

Now I will call the external API for each of those orders, which will return either success or failure per order, and then I’ll update the order statuses in the DB.

UPDATE orders SET status = 'COMPLETED' WHERE id = ANY(<success list>)

UPDATE orders SET status = 'FAILED' WHERE id = ANY(<failure list>)

The above flow will continue to run multiple times until the SELECT query returns 0 rows. I’ve put a LIMIT in the query to avoid memory issues and to stay within the external API’s throughput limits.
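
For reference, here is a minimal sketch of that flow in Go with pgx; callExternalAPI stands in for the real API call, and the connection string is a placeholder:

package main

import (
    "context"
    "fmt"

    "github.com/jackc/pgx/v4/pgxpool"
)

// callExternalAPI is a placeholder for the real API call; it reports
// whether the order was processed successfully.
func callExternalAPI(orderID int64) bool { return true }

// processBatch runs one round of the flow above and reports how many
// PENDING orders it picked up.
func processBatch(ctx context.Context, pool *pgxpool.Pool) (int, error) {
    rows, err := pool.Query(ctx,
        "SELECT id FROM orders WHERE status = 'PENDING' LIMIT 200")
    if err != nil {
        return 0, err
    }
    defer rows.Close()

    var ids []int64
    for rows.Next() {
        var id int64
        if err := rows.Scan(&id); err != nil {
            return 0, err
        }
        ids = append(ids, id)
    }
    if err := rows.Err(); err != nil {
        return 0, err
    }

    var succeeded, failed []int64
    for _, id := range ids {
        if callExternalAPI(id) {
            succeeded = append(succeeded, id)
        } else {
            failed = append(failed, id)
        }
    }

    // the crash window from issue 1 below sits here: the API calls
    // are done, but the status updates may never run
    if _, err := pool.Exec(ctx,
        "UPDATE orders SET status = 'COMPLETED' WHERE id = ANY($1)", succeeded); err != nil {
        return 0, err
    }
    if _, err := pool.Exec(ctx,
        "UPDATE orders SET status = 'FAILED' WHERE id = ANY($1)", failed); err != nil {
        return 0, err
    }
    return len(ids), nil
}

func main() {
    ctx := context.Background()
    pool, err := pgxpool.Connect(ctx, "postgres://user:pass@localhost:5432/mydb")
    if err != nil {
        panic(err)
    }
    defer pool.Close()

    // repeat until the SELECT returns 0 rows
    for {
        n, err := processBatch(ctx, pool)
        if err != nil {
            panic(err)
        }
        if n == 0 {
            break
        }
        fmt.Printf("processed %d orders\n", n)
    }
}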

Now there are a couple of issues with the above flow:

  1. Let’s say my code executed the SELECT query and started processing the orders. What if my service crashes in the middle? Some orders will already have gone through the API and received success or failure responses, but I missed updating their status in the DB, so when my service starts again it will pick those same orders up and process them a second time, which I don’t want.
  2. My service can be running as multiple instances, so the same orders with status = ‘PENDING’ can be picked up by several instances, leading to double processing of the same order. How do I avoid this?

If it helps, my tech stack is Go and PostgreSQL. I am sure these are common issues and there must be some standard ways to approach them. I am open to changing any part, whether it’s the Go code or the DB, which may include locks or transactions. I just want to know which direction to look in for the solution. Any help would be appreciated.

3 Answers


  1. First of all, use the pgx library for accessing PostgreSQL. Then you need to use transactions and row locking. For performance you can use goroutines to do the selection and updates concurrently.

    Below is sample code for the same. Goroutines are not included in the code; a sketch of the concurrent part follows after it.

        package main

        import (
            "context"
            "fmt"
            "time"

            "github.com/georgysavva/scany/pgxscan"
            "github.com/jackc/pgx/v4/pgxpool"
        )

        type Employee struct {
            ID        int       `db:"id"`
            Name      string    `db:"name"`
            CreatedAt time.Time `db:"created_at"`
            UpdatedAt time.Time `db:"updated_at"`
            Status    string    `db:"status"`
        }

        func main() {
            pool, err := pgxpool.Connect(context.Background(), "postgres://kaushik:abcd@localhost:5432/users")
            if err != nil {
                panic(err)
            }
            defer pool.Close()

            ctx := context.Background()

            tx, err := pool.Begin(ctx)
            if err != nil {
                panic(err)
            }

            // Roll back on error, commit otherwise. Deferred functions
            // also run during a panic, so a failed step below still
            // triggers the rollback.
            defer func() {
                if err != nil {
                    if rbErr := tx.Rollback(ctx); rbErr != nil {
                        fmt.Printf("Error while rolling back transaction: %v\n", rbErr)
                    }
                    return
                }
                if err = tx.Commit(ctx); err != nil {
                    fmt.Printf("Error while committing transaction: %v\n", err)
                }
            }()

            _, err = tx.Exec(ctx, "LOCK TABLE employee IN ROW EXCLUSIVE MODE")
            if err != nil {
                panic(err)
            }

            // FOR UPDATE keeps the selected row locked until the
            // transaction ends, so no other worker can pick it up.
            var e Employee
            err = pgxscan.Get(ctx, tx, &e, "SELECT * FROM employee WHERE id = $1 FOR UPDATE", 1)
            if err != nil {
                panic(err)
            }

            if e.Status == "active" {
                fmt.Println("Employee is active, deactivating...")
                e.Status = "inactive"

                _, err = tx.Exec(ctx, "UPDATE employee SET status = $1 WHERE id = $2", e.Status, e.ID)
                if err != nil {
                    panic(err)
                }
            }
        }
    
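    Since goroutines are left out above, here is a rough sketch of how the
    concurrent part could look: a fixed worker pool draining a channel of
    order IDs. processOrder is a hypothetical helper that would wrap the
    per-row transaction shown in the code, and the connection string is a
    placeholder.

        package main

        import (
            "context"
            "fmt"
            "sync"

            "github.com/jackc/pgx/v4/pgxpool"
        )

        // processOrder is a hypothetical helper that would wrap the
        // SELECT ... FOR UPDATE / UPDATE transaction shown above.
        func processOrder(ctx context.Context, pool *pgxpool.Pool, id int64) error {
            // ... per-row transaction as in the code above ...
            return nil
        }

        // processConcurrently fans the IDs out to a fixed number of
        // workers so the batch is processed in parallel.
        func processConcurrently(ctx context.Context, pool *pgxpool.Pool, ids []int64) {
            const workers = 10
            jobs := make(chan int64)

            var wg sync.WaitGroup
            for i := 0; i < workers; i++ {
                wg.Add(1)
                go func() {
                    defer wg.Done()
                    for id := range jobs {
                        if err := processOrder(ctx, pool, id); err != nil {
                            fmt.Printf("order %d: %v\n", id, err)
                        }
                    }
                }()
            }

            for _, id := range ids {
                jobs <- id
            }
            close(jobs)
            wg.Wait()
        }

        func main() {
            ctx := context.Background()
            pool, err := pgxpool.Connect(ctx, "postgres://user:pass@localhost:5432/mydb")
            if err != nil {
                panic(err)
            }
            defer pool.Close()

            // the IDs would come from a SELECT on the pending rows
            processConcurrently(ctx, pool, []int64{1, 2, 3})
        }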
  2. Instead of

    SELECT * FROM orders WHERE status = 'PENDING' LIMIT 200
    

    do

    WITH cte AS (
        -- this is needed only because PostgreSQL doesn't
        -- support the LIMIT clause in UPDATE queries
        SELECT * FROM orders WHERE status = 'PENDING' LIMIT 200
    )
    
    -- use UPDATE-RETURNING to first change
    -- the status and then retrieve the rows
    UPDATE orders AS o SET status='PROCESSING'
    
    -- you can use this column, together with the status column,
    -- to later determine if some orders were left in limbo because
    -- of a crash *after* they were sent off to the processor
    , updated_at=now()
    
    -- use the CTE result to limit the update to
    -- the 200 rows selected in the WITH query above
    FROM cte WHERE o.id=cte.id
    
    -- make sure that this worker ignores rows that
    -- were, in the meantime, updated by another worker
    --
    -- this works because the workers will re-check
    -- the WHERE condition after the lock is released
    -- (see the linked answer below)
    AND o.status='PENDING'
    
    -- return the matched & updated set/subset of rows
    RETURNING o.*;
    

    https://stackoverflow.com/a/11769059/965900
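
    In Go, a worker built around this claim query could look roughly like
    the sketch below (pgx is assumed; callExternalAPI and the connection
    string are placeholders). Because the claim is a single atomic UPDATE,
    each instance only ever sees the rows it managed to flip to
    'PROCESSING':

        package main

        import (
            "context"

            "github.com/jackc/pgx/v4/pgxpool"
        )

        // claimQuery is the UPDATE ... RETURNING statement from the
        // answer, trimmed to return only the id.
        const claimQuery = `
            WITH cte AS (
                SELECT * FROM orders WHERE status = 'PENDING' LIMIT 200
            )
            UPDATE orders AS o SET status = 'PROCESSING', updated_at = now()
            FROM cte
            WHERE o.id = cte.id AND o.status = 'PENDING'
            RETURNING o.id`

        // callExternalAPI is a placeholder for the real API call.
        func callExternalAPI(orderID int64) bool { return true }

        func runWorker(ctx context.Context, pool *pgxpool.Pool) error {
            for {
                // atomically claim up to 200 PENDING rows
                rows, err := pool.Query(ctx, claimQuery)
                if err != nil {
                    return err
                }
                var ids []int64
                for rows.Next() {
                    var id int64
                    if err := rows.Scan(&id); err != nil {
                        rows.Close()
                        return err
                    }
                    ids = append(ids, id)
                }
                rows.Close()
                if err := rows.Err(); err != nil {
                    return err
                }
                if len(ids) == 0 {
                    return nil // nothing left to claim
                }

                // the rows are now PROCESSING, so no other instance
                // will claim them while the API calls run
                for _, id := range ids {
                    status := "FAILED"
                    if callExternalAPI(id) {
                        status = "COMPLETED"
                    }
                    if _, err := pool.Exec(ctx,
                        "UPDATE orders SET status = $1 WHERE id = $2", status, id); err != nil {
                        return err
                    }
                }
            }
        }

        func main() {
            ctx := context.Background()
            pool, err := pgxpool.Connect(ctx, "postgres://user:pass@localhost:5432/mydb")
            if err != nil {
                panic(err)
            }
            defer pool.Close()

            if err := runWorker(ctx, pool); err != nil {
                panic(err)
            }
        }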

  3. As you are calling an external API, which might be broken or time out etc., you might want a more row-by-row approach (perhaps not fashionable, but still useful in some situations). Assuming you want to use Postgres itself and not some external programming, this might work for you:

    Create a PL/pgSQL block to process the pending orders in a cursor loop:

    DO LANGUAGE plpgsql $$
    DECLARE
        r RECORD;
    BEGIN
        -- a FOR loop's implicit cursor is converted to a holdable
        -- cursor at the first COMMIT inside the loop (PostgreSQL 11+),
        -- so the per-row commits in process_order are safe here
        FOR r IN SELECT id FROM orders WHERE status = 'PENDING' LIMIT 200 LOOP
            CALL process_order(r.id);
        END LOOP;
    END;
    $$;
    

    The heavy lifting is inside the called routine, which has to be a PROCEDURE rather than a FUNCTION, because PostgreSQL only allows COMMIT inside procedures (version 11 and later). This example commits explicitly per row, which allows for the possibility that some rows will work whilst others don’t, so the failures can be picked up and re-processed later. You could also include logic for an "in progress" status, depending on your needs and how slow/flaky that API call actually is.

    CREATE OR REPLACE PROCEDURE process_order(order_id INTEGER)
    LANGUAGE plpgsql AS $$
    DECLARE
        api_result BOOLEAN;
    BEGIN
        BEGIN
            -- just a placeholder, how you call that api isn't known
            api_result := call_external_api(order_id);

            IF api_result THEN
                UPDATE orders SET status = 'COMPLETED' WHERE id = order_id;
            ELSE
                UPDATE orders SET status = 'FAILED' WHERE id = order_id;
            END IF;
        EXCEPTION
            WHEN OTHERS THEN
                -- an error rolls this inner block back to its implicit
                -- savepoint, so the row stays PENDING for a later retry
                NULL;
        END;

        -- commit this row's work; COMMIT is not allowed inside a block
        -- with an EXCEPTION clause, and is only legal in a procedure,
        -- which is why this is a PROCEDURE and not a FUNCTION
        COMMIT;
    END;
    $$;
    

    Alternatively, you could move the commit to wrap the whole cursor loop instead if this level of paranoia isn’t needed. Note that per-row commits will add time to the process (how much I cannot tell). It might also have an effect on things like rollback log size, so that may affect your choice.
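
    Since PL/pgSQL cannot call an HTTP API without an extension, here is a
    rough Go translation of the same row-by-row, commit-per-row idea (an
    assumption, not part of the answer): each transaction locks one PENDING
    row with FOR UPDATE SKIP LOCKED, so concurrent workers skip rather than
    block on each other's rows. callExternalAPI and the connection string
    are placeholders.

        package main

        import (
            "context"

            "github.com/jackc/pgx/v4"
            "github.com/jackc/pgx/v4/pgxpool"
        )

        // callExternalAPI is a placeholder for the real API call.
        func callExternalAPI(orderID int64) bool { return true }

        // processOneByOne claims and processes exactly one row per
        // transaction, committing (or rolling back) row by row.
        func processOneByOne(ctx context.Context, pool *pgxpool.Pool) error {
            for {
                tx, err := pool.Begin(ctx)
                if err != nil {
                    return err
                }

                var id int64
                err = tx.QueryRow(ctx,
                    `SELECT id FROM orders WHERE status = 'PENDING'
                     LIMIT 1 FOR UPDATE SKIP LOCKED`).Scan(&id)
                if err == pgx.ErrNoRows {
                    _ = tx.Rollback(ctx)
                    return nil // no pending work left
                }
                if err != nil {
                    _ = tx.Rollback(ctx)
                    return err
                }

                // the row stays locked while the API call runs, and
                // SKIP LOCKED makes other workers pass over it
                status := "FAILED"
                if callExternalAPI(id) {
                    status = "COMPLETED"
                }

                if _, err := tx.Exec(ctx,
                    "UPDATE orders SET status = $1 WHERE id = $2", status, id); err != nil {
                    _ = tx.Rollback(ctx)
                    return err
                }
                if err := tx.Commit(ctx); err != nil {
                    return err
                }
            }
        }

        func main() {
            ctx := context.Background()
            pool, err := pgxpool.Connect(ctx, "postgres://user:pass@localhost:5432/mydb")
            if err != nil {
                panic(err)
            }
            defer pool.Close()

            if err := processOneByOne(ctx, pool); err != nil {
                panic(err)
            }
        }

    As with the PL/pgSQL version, a crash between the API call and the
    commit leaves the row PENDING, so it may hit the API twice on retry;
    the "in progress" status mentioned above is the usual remedy for that.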
