
I’m building a back office tool with a microservices architecture, and I’m facing challenges with handling concurrent and out-of-order events.

Context:

  • I have microservices that emit events which are used to update a read model in Elasticsearch.
  • The back office tool needs to display data such as orders, customers, addresses, and more.
  • For some entities, like orders, the read model is built from events coming from more than one microservice.
  • Events can sometimes arrive out of order or be delayed.

Let’s imagine I have an order index, and the document looks like this:

{
    "id": "2bc3ec03-eaf4-46cb-9a41-3854098c8763", // comes from order.* events
    "line_items": [], // comes from order.* events
    "shipping_address": { // comes from shipping.* events
        "name": "",
        "surname": ""
    },
    "timestamp": 12381231238 // timestamp of the most recent event altering the data
}

Problem

Let’s imagine an order event arrives with timestamp 2. It is processed and the timestamp in Elasticsearch is updated. Ten minutes later a shipping_address.changed event arrives with a timestamp of 1. If I handle out-of-order events based on this timestamp, I will not update the shipping address in my read model.

Handling Concurrency

In all of the approaches below I deal with concurrency by handling one event for a given id at a time. When an event arrives I queue a job, and Laravel’s WithoutOverlapping middleware guarantees that only one worker changes a given document at a time.
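A minimal sketch of such a job, assuming a queue job keyed by the document id (the class name, property, and lock timeout here are illustrative, not from the actual codebase):

<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\Middleware\WithoutOverlapping;

class UpdateOrderReadModel implements ShouldQueue
{
    use Queueable;

    public function __construct(public string $orderId)
    {
    }

    // Laravel acquires a Redis lock on this key, so at most one worker
    // processes events for a given order at any moment.
    public function middleware(): array
    {
        return [(new WithoutOverlapping($this->orderId))->releaseAfter(30)];
    }

    public function handle(): void
    {
        // ... rebuild or patch the Elasticsearch document for $this->orderId
    }
}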

Handling out-of-order events

  1. Redis-Based Temporary Event Storage:

    • When an event arrives, I store it in Redis temporarily.
    • The queued job fetches all the events for a given key and orders them by the timestamp from the event.
    • The events in Redis have a ~15-minute TTL, which seems enough for my use case.
    • Ensures that out-of-order events are handled correctly, but adds complexity and memory-usage concerns.
    • If for some reason an event is delayed for more than 15 minutes it won’t be processed, because its timestamp would be earlier than the most recent timestamp in the Elasticsearch document.
  2. Separate Timestamps for Different Data Sources:

    • Each part of the document that comes from a different data source has its own timestamp:
// Elasticsearch document with per-source timestamps
{
    "id": "2bc3ec03-eaf4-46cb-9a41-3854098c8763",
    "line_items": [],
    "shipping_address": { // source of this data are shipping.* events
        "timestamp": 123123123, // timestamp for this part
        "name": "",
        "surname": ""
    },
    "timestamp": 1123123123 // timestamp for order.* events
}
  • Updates are made only if the new event’s timestamp is more recent than the stored timestamp. For comparison I take the timestamp of the part of the data I want to change: for an order.* event I compare against the "root" timestamp, and when updating shipping_address I compare shipping_address.timestamp with the shipping event’s timestamp to decide whether to apply the change (see the sketch after this list).
  • Even if the event arrives a couple of hours late, it will still be processed.
  3. Store All the Events in a Database:

    • Upon receiving an event, I store it in a database.
    • I then queue a job to update the index.
    • The job takes all the available events that affect a given document, orders them, and processes them from the first event that ever arrived to the most recent one.
    • I think that would guarantee the highest data integrity, but I’m concerned about the amount of data that will end up in the database. Technically, at some point I can remove or archive old events for orders that have been completed for a long time.
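To make option 2 concrete, here is a rough sketch of the shipping_address path, assuming the official elasticsearch-php client and an event array with order_id, timestamp, name, and surname keys (all names are illustrative). The read-compare-write is only safe because WithoutOverlapping serializes jobs per document:

<?php

use Elastic\Elasticsearch\ClientBuilder;

function applyShippingAddressChanged(array $event): void
{
    $client = ClientBuilder::create()->build();

    // Assumes the order document already exists; creation/upsert handling is elided.
    $doc = $client->get(['index' => 'orders', 'id' => $event['order_id']])['_source'];

    // Compare against the timestamp of the part we want to change,
    // not the root timestamp of the document.
    $current = $doc['shipping_address']['timestamp'] ?? 0;
    if ($event['timestamp'] <= $current) {
        return; // stale event: a newer shipping.* event was already applied
    }

    $client->update([
        'index' => 'orders',
        'id'    => $event['order_id'],
        'body'  => [
            'doc' => [
                'shipping_address' => [
                    'timestamp' => $event['timestamp'],
                    'name'      => $event['name'],
                    'surname'   => $event['surname'],
                ],
            ],
        ],
    ]);
}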

Are there any other strategies for dealing with out-of-order events? Which of these makes the most sense? Do you see any flaws in any of the solutions?

Maybe I’m doing this wrong, and there is an easier way to get data into my back office tool. I ditched the idea of querying microservices via API and merging the data, because it was too slow.

2 Answers


  1. Chosen as BEST ANSWER

    Finally, after a whole day of thinking about this, trying different approaches, a few dog walks, and some coffee, I think number 3 is the one I will follow.

    Mainly because my back office tool requires high data integrity: I have to minimize the risk of having inconsistent data for a long time.

    Below I attach a diagram of the solution. As you can see, I have an event store, and every time a new event comes in I essentially reduce all the stored events, ordered by timestamp, to get the final document (a rough sketch of that reduce step is below).

    [Solution diagram]

    For anybody wondering what WithoutOverlapping is: it’s a Laravel framework mechanism that guarantees only one job matching a given key is executed at a time. Even if you have, say, 100 workers, Laravel will lock a key using Redis to make sure no two jobs alter the same data simultaneously. You can probably implement something similar in your technology of choice.
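    A rough sketch of that reduce step, assuming an order_events table with order_id, type, payload, and timestamp columns; the table, column, and helper names (applyOrderEvent, applyShippingEvent) are made up for illustration:

    <?php

    use Illuminate\Support\Facades\DB;

    function projectOrder(string $orderId): array
    {
        // Replay every stored event from the first to the most recent,
        // so a late event simply slots into place on the next rebuild.
        $events = DB::table('order_events')
            ->where('order_id', $orderId)
            ->orderBy('timestamp')
            ->get();

        $doc = ['id' => $orderId, 'line_items' => [], 'shipping_address' => null];

        foreach ($events as $event) {
            $payload = json_decode($event->payload, true);

            if (str_starts_with($event->type, 'order.')) {
                $doc = applyOrderEvent($doc, $payload);    // hypothetical helper
            } elseif (str_starts_with($event->type, 'shipping.')) {
                $doc = applyShippingEvent($doc, $payload); // hypothetical helper
            }
        }

        return $doc; // index this into Elasticsearch, replacing the old document
    }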


  2. This is how I have dealt with this kind of problem.

    Order references the shipping

    In Elasticsearch, I have a Shipping index:

    {
      "id": "456",
      "name": "",
      "surname": "",
      "timestamp": "xxx"
    }
    

    and an OrderWithShipping index:

    {
      "id": "123",
      "order": {
        "id": "123",
        "lines": [],
        "shipping": "456",
        "timestamp": "xxx"
      },
      "shipping": {
        "id": "456",
        "name": "",
        "surname": "",
        "timestamp": "xxx"
      }
    }
    

    The job listens to order and shipping events.

    When an order event arrives, it looks up the shipping in the Shipping index and indexes an OrderWithShipping element (the shipping might be null).

    When a shipping event arrives, it updates the Shipping index, then updates all the OrderWithShipping elements that reference the shipping (a sketch of this fan-out follows below).

    Ideally, you already have a service with all the shippings that you can query, so the job does not need to maintain its own index of shippings.
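    As a sketch of that fan-out, assuming the official elasticsearch-php client and an order_with_shipping index matching the example above (the index name and document shape are assumptions):

    <?php

    use Elastic\Elasticsearch\Client;

    // Propagate fresh shipping data to every OrderWithShipping document
    // that references it, via a scripted update-by-query.
    function propagateShipping(Client $client, array $shipping): void
    {
        $client->updateByQuery([
            'index' => 'order_with_shipping',
            'body'  => [
                'query'  => [
                    'term' => ['order.shipping' => $shipping['id']],
                ],
                'script' => [
                    'source' => 'ctx._source.shipping = params.shipping;',
                    'params' => ['shipping' => $shipping],
                ],
            ],
        ]);
    }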

    Shipping references the order

    In Elasticsearch, I have an OrderWithShipping index:

    {
      "id": "123",
      "order": {
        "id": "123",
        "lines": [],
        "timestamp": "xxx"
      },
      "shipping": {
        "id": "456",
        "order": "123",
        "name": "",
        "surname": "",
        "timestamp": "xxx"
      }
    }
    

    The job listens to order and shipping events.

    When an order event arrives, it tries to get the OrderWithShipping element with the id of the order. If it exists, the job updates the order field of the element. If it does not exist, it creates a new element with the id of the order and the order data (the shipping is null).

    When a shipping event arrives, it tries to get the OrderWithShipping element with the id of the order. If it exists, the job updates the shipping field of the element. If it does not exist, it creates a new element with the id of the order and the shipping data (the order is null).

    At some point you may have some OrderWithShipping elements with the shipping but not the order. You can discard them from your back office by querying OrderWithShipping for documents where the order field exists.

    Handle conflicts

    In Elasticsearch, you have an optimistic locking mechanism that you should definitely use: get the current version of the element from the index, apply your update, and try to index the new version. If you get a 409, repeat!
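    A minimal sketch of that retry loop, assuming elasticsearch-php v8, where a 4xx response surfaces as a ClientResponseException; the function name and $mutate callback are illustrative:

    <?php

    use Elastic\Elasticsearch\Client;
    use Elastic\Elasticsearch\Exception\ClientResponseException;

    function updateWithRetry(Client $client, string $index, string $id, callable $mutate): void
    {
        while (true) {
            $current = $client->get(['index' => $index, 'id' => $id]);

            try {
                $client->index([
                    'index'           => $index,
                    'id'              => $id,
                    // Succeed only if nobody wrote to the document since our read.
                    'if_seq_no'       => $current['_seq_no'],
                    'if_primary_term' => $current['_primary_term'],
                    'body'            => $mutate($current['_source']),
                ]);
                return;
            } catch (ClientResponseException $e) {
                if ($e->getResponse()->getStatusCode() !== 409) {
                    throw $e;
                }
                // 409 conflict: someone updated the document concurrently. Repeat!
            }
        }
    }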
