skip to Main Content

I have a Windows service that reads data from the database and processes this data using multiple REST API calls.

Originally, this service ran on a timer where it would read unprocessed data from the database and process it using multiple threads limited using SemaphoreSlim. This worked well except that the database read had to wait for all processing to finish before reading again.

ServicePointManager.DefaultConnectionLimit = 10;

Original that works:

// Runs every 5 seconds on a timer
private void ProcessTimer_Elapsed(object sender, ElapsedEventArgs e)
{
    var hasLock = false;
    try
    {
        Monitor.TryEnter(timerLock, ref hasLock);
        if (hasLock)
        {
            ProcessNewData();
        }
        else
        {
            log.Info("Failed to acquire lock for timer."); // This happens all of the time
        }
    }
    finally
    {
        if (hasLock)
        {
            Monitor.Exit(timerLock);
        }
    }
}

public void ProcessNewData()
{
    var unproceesedItems = GetDatabaseItems();

    if (unproceesedItems.Count > 0)
    {
        var downloadTasks = new Task[unproceesedItems.Count];
        var maxThreads = new SemaphoreSlim(semaphoreSlimMinMax, semaphoreSlimMinMax); // semaphoreSlimMinMax = 10 is max threads

        for (var i = 0; i < unproceesedItems .Count; i++)
        {
            maxThreads.Wait();
            var iClosure = i;
            downloadTasks[i] =
            Task.Run(async () =>
                {
                    try
                    {
                        await ProcessItemsAsync(unproceesedItems[iClosure]);
                    }
                    catch (Exception ex)
                    {
                        // handle exception
                    }
                    finally
                    {
                        maxThreads.Release();
                    }
                });
        }

        Task.WaitAll(downloadTasks);
    }
}

To improve efficiency, I rewrite the service to run GetDatabaseItems in a separate thread from the rest so that there is a ConcurrentDictionary of unprocessed items between them that GetDatabaseItems fills and ProcessNewData empties.

The problem is that while 10 unprocessed items are send to ProcessItemsAsync, they are processed two at a time instead of all 10.

The code inside of ProcessItemsAsync calls var response = await client.SendAsync(request); where the delay occurs. All 10 threads make it to this code but come out of it two at a time. None of this code changed between the old version and the new.

Here is the code in the new version that did change:

public void Start()
{
    ServicePointManager.DefaultConnectionLimit = maxSimultaneousThreads;  // 10

    // Start getting unprocessed data
    getUnprocessedDataTimer.Interval = getUnprocessedDataInterval; // 5 seconds
    getUnprocessedDataTimer.Elapsed += GetUnprocessedData; // writes data into a ConcurrentDictionary
    getUnprocessedDataTimer.Start();

    cancellationTokenSource = new CancellationTokenSource();

    // Create a new thread to process data
    Task.Factory.StartNew(() =>
       {
           try
           {
               ProcessNewData(cancellationTokenSource.Token);
           }
           catch (Exception ex)
           {
               // error handling
           }
       }, TaskCreationOptions.LongRunning
    );

}

private void ProcessNewData(CancellationToken token)
{
    // Check if task has been canceled.
    while (!token.IsCancellationRequested)
    {
        if (unprocessedDictionary.Count > 0)
        {
            try
            {
                var throttler = new SemaphoreSlim(maxSimultaneousThreads, maxSimultaneousThreads); // maxSimultaneousThreads = 10
                var tasks = unprocessedDictionary.Select(async item =>
                {
                    await throttler.WaitAsync(token);
                    try
                    {
                        if (unprocessedDictionary.TryRemove(item.Key, out var item))
                        {
                            await ProcessItemsAsync(item);
                        }
                    }
                    catch (Exception ex)
                    {
                        // handle error
                    }
                    finally
                    {
                        throttler.Release();
                    }
                });
                Task.WhenAll(tasks);
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }

        Thread.Sleep(1000);
    }
}

Environment

  • .NET Framework 4.7.1
  • Windows Server 2016
  • Visual Studio 2019

Attempts to fix:

I tried the following with the same bad result (two await client.SendAsync(request) completing at a time):

  • Set Max threads and ServicePointManager.DefaultConnectionLimit to 30
  • Manually create threads using Thread.Start()
  • Replace async/await pattern with sync HttpClient calls
  • Call data processing using Task.Run(async () => and Task.WaitAll(downloadTasks);
  • Replace the new long-running thread for ProcessNewData with a timer

What I want is to run GetUnprocessedData and ProcessNewData concurrently with an HttpClient connection limit of 10 (set in config) so that 10 requests are processed at the same time.

Note: the issue is similar to HttpClient.GetAsync executes only 2 requests at a time? but the DefaultConnectionLimit is increased and the service runs on a Windows Server. It also creates more than 2 connections when original code runs.

Update

I went back to the original project to make sure it still worked, it did. I added a new timer to perform some unrelated operations and the httpClient issue came back. I removed the timer, everything worked. I added a new thread to do parallel processing, the problem came back.

2

Answers


  1. Chosen as BEST ANSWER

    The issue turned out to be the place where ServicePointManager.DefaultConnectionLimit is set.

    In the version where HttpClient was only doing two requests at a time, ServicePointManager.DefaultConnectionLimit was being set before the threads were being created but after the HttpClient was initialized.

    Once I moved it into the constructor before the HttpClient is initialized, everything started working.

    Thank you very much to @Theodor Zoulias for the help.

    TLDR; Set ServicePointManager.DefaultConnectionLimit before initializing the HttpClient.


  2. This is not a direct answer to your question, but a suggestion for simplifying your service that could make the debugging of any problem easier. My suggestion is to implement the producer-consumer pattern using an iterator for producing the unprocessed items, and a parallel loop for consuming them. Ideally the parallel loop would have async delegates, but since you are targeting the .NET Framework you don’t have access to the .NET 6 method Parallel.ForEachAsync. So I will suggest the slightly wasteful approach of using a synchronous parallel loop that blocks threads. You could use either the Parallel.ForEach method, or the PLINQ like in the example below:

    private IEnumerable<Item> Iterator(CancellationToken token)
    {
        while (true)
        {
            Task delayTask = Task.Delay(5000, token);
            foreach (Item item in GetDatabaseItems()) yield return item;
            delayTask.GetAwaiter().GetResult();
        }
    }
    
    public void Start()
    {
        //...
    
        ThreadPool.SetMinThreads(degreeOfParallelism, Environment.ProcessorCount);
    
        new Thread(() =>
        {
            try
            {
                Partitioner
                    .Create(Iterator(token), EnumerablePartitionerOptions.NoBuffering)
                    .AsParallel()
                    .WithDegreeOfParallelism(degreeOfParallelism)
                    .WithCancellation(token)
                    .ForAll(item => ProcessItemAsync(item).GetAwaiter().GetResult());
            }
            catch (OperationCanceledException) { } // Ignore
        }).Start();
    }
    

    Online demo.

    The Iterator fetches unprocessed items from the database in batches, and yields them one by one. The database won’t be hit more frequently than once every 5 seconds.

    The PLINQ query is going to fetch a new item from the Iterator each time it has a worker available, according to the WithDegreeOfParallelism policy. The setting EnumerablePartitionerOptions.NoBuffering ensures that it won’t try to fetch more items in advance.

    The ThreadPool.SetMinThreads is used in order to boost the availability of ThreadPool threads, since the PLINQ is going to use lots of them. Without it the ThreadPool will not be able to satisfy the demand immediately, although it will gradually inject more threads and eventually will catch up. But since you already know how many threads you’ll need, you can configure the ThreadPool from the start.

    In case you dislike the idea of blocking threads, you can find a simple substitute of the Parallel.ForEachAsync here, based on the TPL Dataflow library. It requires installing a NuGet package.

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search