skip to Main Content

I have an external service notification hitting an endpoint when there has been a change in my database. This pulls the updated data from the database and fires it out to connected clients via SignalR.

This service can send multiple calls within the span of a few seconds, but I’d like to only respond to the final one. I’d like to throttle (not sure if that’s the right word in this case) the calls to the database and notifications to the client. The important part here is I’d like to respond to the FINAL request withing a timeframe as I don’t want to miss any changes sent to clients.

Here’s an example of the problem I’d like to avoid. This is a log of hits to the API ending in a deadlock error (I can handle that, but would like to avoid the whole situation altogether).

RefreshOrderNotifications received at Tue Oct 03 2023 09:34:19 GMT-0400 (Eastern Daylight Time)
RefreshOrderNotifications received at Tue Oct 03 2023 09:34:57 GMT-0400 (Eastern Daylight Time)
RefreshOrderNotifications received at Tue Oct 03 2023 09:34:57 GMT-0400 (Eastern Daylight Time)
RefreshOrderNotifications received at Tue Oct 03 2023 09:34:57 GMT-0400 (Eastern Daylight Time)
Server - Error: Notification Server connection to DB failed. Transaction (Process ID 68) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.

I’m having a hard time conceptualizing how to approach this problem. As each request is on a different thread, I’m not quite sure how to communicate across those threads. Would a static property even be useful or wise in this case? I’m not sure if it’s shared across threads.

I can stand to have a 1-3 second delay in the response to the API call, if that’s helpful.

Adding clarification based on comments (All good questions, thank you)

  • There is no payload to process in the API call other than to query the SQL server and send off notifications via SignalR.
  • In the endpoint call, the caller doesn’t get anything back other than an empty 200 OK. The caller just wants to kick off a notification.
  • The call sends off a notification to the connected clients.
  • I’m trying to avoid unnecessary messages to the clients and hits to the DB. I’d like to simply discard the first calls, then send a notification off for the perceived last one.

Second edit:
Based on the really helpful response from @qkhanhpro, I put this together. It has some holes as the actions aren’t atomic (I believe that’s how you refer to it), so if there is a quick succession of calls at the same time, then they will slip by. This is as good as I need for now. I’ll clean it up and do some further testing.

 public class ThrottleManager
{
    private List<string> _queue = new List<string>();

    public async Task<ThrottleResult> QueueThenRun(string runKey, int delayMilliseconds, Func<Task<ThrottleResult>> funcToRun)
    {
        var result = new ThrottleResult();

        if (_queue.Contains(runKey) || funcToRun == null)
        {           
            //we're just queuing up the call, so return success
            return result;
        }
        _queue.Add(runKey);

        await Task.Delay(delayMilliseconds);
        _queue.Remove(runKey);

        var returnMe = await funcToRun();
        return returnMe;
    }
}

public class ThrottleResult
{
    public bool Success { get; set; } = true;
    public string Message { get; set; } = string.Empty;
    public Exception? Exception { get; set; } = null;
}

2

Answers


  1. You can create a singleton class that hold a single ConcurrentDictionary. The "Key" would be the criteria you would like to "group" by, such as the OrderId or UserId, the Value should be a Task

    Whenever a request/thread hit you

    • Try to see if the Key-Value pair exists
    • If the Key exits, await the corresponding Task
    • If the Key does not exists, create the Key-Value pair and start the Task
    • The Task should incorporate a small wait time using Task.Delay()
    • The Task, after its delay should do its processing, and remove the Key-Value pair from ConcurrentDictionary as it finish
    Login or Signup to reply.
  2. This is generally known as request deduplication.

    You can achieve that with ThrottlingTroll’s SemaphoreRateLimitMethod.

    Configure ThrottlingTroll like this:

    app.UseThrottlingTroll(options =>
    {
        options.Config = new ThrottlingTrollConfig
        {
            Rules = new[]
            {
                new ThrottlingTrollRule
                {
                    // Organizing a distributed critical section (one request allowed at a time)
                    LimitMethod = new SemaphoreRateLimitMethod
                    {
                        PermitLimit = 1
                    }
                }
            }
        };
    
        options.ResponseFabric = async (check, request, response, token) =>
        {
            // Shortcutting extra requests to 200 status code and empty response
            response.StatusCode = 200;
        };
    });
    

    This will put a distributed critical section on top of your endpoint – only one request will be allowed at a time, the rest will be dismissed with 200 status code.

    Then put an await Task.Delay(numOfMilliseconds); line in front of your API method. While the first request sleeps at that line, other unwanted ones are dismissed by ThrottlingTroll. Then the first request resumes and does the job.

    And yes, by adding a Redis Counter Store you can make this truly distributed – it will behave as expected even across multiple computing instances.

    And yes, they now require me to explicitly state that ThrottlingTroll is my library.

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search