Explicit questions are in a bulleted list at the end.
I am creating a new ASP.NET solution that will be deployed as a microservice to handle processing and storing audits for transactions in our system. The data that needs to be audited is placed on an Azure Service Bus queue, and I would like the service I’m developing to continuously process those messages as they come in. To achieve this I am creating a BackgroundService that, when started, runs continuously and processes messages that enter the queue. I started with a simple single-queue case, and the code looked like the following.
Program.cs (entrypoint/startup code)
using System.Data.SqlClient;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.Extensions.Azure;
var builder = WebApplication.CreateBuilder(args);
// Various builder initializations that are not relevant for this post
builder.Services.AddAzureClients(clientBuilder =>
{
clientBuilder.AddServiceBusClient("<AzureServiceBusConnectionString>").WithName("queue1");
});
//QueueProcessorService is the custom background service I created
builder.Services.AddHostedService<QueueProcessorService>();
// We have a custom extension for the web app builder that builds/runs
builder.CustomRunWebApi();
QueueProcessorService
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Azure;
using Newtonsoft.Json.Linq;
public class QueueProcessorService : BackgroundService
{
private readonly List<ServiceBusProcessor> _queueProcessors = new();
public QueueProcessorService(IAzureClientFactory<ServiceBusClient> serviceBusClientFactory)
{
// I have tried various options for this and it performs about the same every time
var processorOptions = new ServiceBusProcessorOptions
{
MaxConcurrentCalls = 20,
PrefetchCount = 50,
};
var client = serviceBusClientFactory.CreateClient("queue1");
var processor = client.CreateProcessor("queue1", processorOptions);
processor.ProcessMessageAsync += MessageHandler;
processor.ProcessErrorAsync += ErrorHandler;
processor.StartProcessingAsync();
_queueProcessors.Add(processor);
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// Through testing and messing around I need to yield here so the web api builds and runs as expected.
await Task.Delay(1, stoppingToken);
stoppingToken.Register(async () =>
{
foreach (var processor in _queueProcessors)
{
await processor.DisposeAsync();
}
});
while (!stoppingToken.IsCancellationRequested)
{
continue;
}
return;
}
private static async Task MessageHandler(ProcessMessageEventArgs args)
{
Console.WriteLine($"Starting processing message for queue {args.EntityPath}");
try
{
var body = args.Message.Body.ToString();
var claimJson = JObject.Parse(body);
// Process JSON
// Eventually this will have business logic but for testing it's doing nothing
Console.WriteLine($"Completed processing message for queue {args.EntityPath}");
}
catch (Exception e)
{
Console.WriteLine(e);
}
}
private static Task ErrorHandler(ProcessErrorEventArgs args)
{
Console.WriteLine(args.Exception.ToString());
return Task.CompletedTask;
}
}
I ran some simple test cases on the above and it worked fine. I then wanted to figure out how to appropriately handle the scale of our transactions. We have fewer than 70 customers, each of which has its own isolated system that processes transactions. I want this to be multi-tenant, so I would like to use a shared Service Bus. For logical separation I was leaning towards one queue per customer, simply instantiating one processor per queue. Here’s how I tried to implement one queue per customer in my code.
I made this change in Program.cs to add multiple clients. Note that the connection string is the same; is this actually creating multiple instances? Based on this article I am trying to create one client per queue. Additionally, that article mentions using multiple factories, but I don’t really see the benefit of creating a separate factory versus a separate instance.
builder.Services.AddAzureClients(clientBuilder =>
{
// QueueNames is just a list of queue names
foreach (var queueName in QueueConfiguration.QueueNames)
{
clientBuilder.AddServiceBusClient("<ConnectionString>").WithName(queueName);
}
});
I made the following changes to the QueueProcessorService constructor
public QueueProcessorService(IAzureClientFactory<ServiceBusClient> serviceBusClientFactory)
{
foreach (var queueName in QueueConfiguration.QueueNames)
{
var processorOptions = new ServiceBusProcessorOptions
{
MaxConcurrentCalls = 20,
PrefetchCount = 10,
};
var client = serviceBusClientFactory.CreateClient(queueName);
var processor = client.CreateProcessor(queueName, processorOptions);
processor.ProcessMessageAsync += MessageHandler;
processor.ProcessErrorAsync += ErrorHandler;
processor.StartProcessingAsync();
_queueProcessors.Add(processor);
}
}
I created a simple script which runs for a configurable number of seconds and just sends messages to random queues within QueueNames, and it takes a significant amount of time after that script finishes to fully empty all the queues. For reference, I was testing with 50 queues and this is a Standard tier Service Bus. We will use Premium in production, but I wanted to ensure that my code/structure is sound, and with what I am seeing I do not think that it is, hence this post.
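To illustrate, the sender looks roughly like this (a simplified sketch; the payload shape, the fixed duration, and <ConnectionString> are placeholders, not my real script):
using System.Text.Json;
using Azure.Messaging.ServiceBus;
// Sends messages to random queues for a configurable number of seconds.
var queueNames = QueueConfiguration.QueueNames;
var durationSeconds = 30;
await using var client = new ServiceBusClient("<ConnectionString>");
var senders = queueNames.ToDictionary(name => name, name => client.CreateSender(name));
var random = new Random();
var stopAt = DateTime.UtcNow.AddSeconds(durationSeconds);
var sent = 0;
while (DateTime.UtcNow < stopAt)
{
    // Pick a random queue and send a small JSON payload to it.
    var queueName = queueNames[random.Next(queueNames.Count)];
    var payload = JsonSerializer.Serialize(new { TransactionId = Guid.NewGuid(), Timestamp = DateTime.UtcNow });
    await senders[queueName].SendMessageAsync(new ServiceBusMessage(payload));
    sent++;
}
Console.WriteLine($"Sent {sent} messages across {queueNames.Count} queues.");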
I have a few questions after this experiment:
- Am I simply trying to process too many queues in the background service? I know I could probably handle this all in one queue, but I like the logical separation, and the queue count is far below the limit provided by Azure Service Bus, so I would have thought it would not be an issue.
- Is there a more appropriate way to configure and initialize the N queue processors on startup? Based on the Learn article I linked previously, I was trying to create one client per queue. I also followed this post for my approach, although in that post the setup is different in that each client has a different namespace/connection string.
- Is this performance issue likely due to using a Standard tier Azure Service Bus? I doubt it. For reference, the script I was using did not produce exceptionally high throughput: anywhere from 440 to 600 messages in 30 seconds. When the script finished, there were anywhere from 130-200 messages spread across the queues, and it took roughly sixty to ninety seconds to process the rest of them.
- I have tried tweaking some of the ServiceBusProcessorOptions, but it doesn’t seem to magically fix the issue. I followed the general advice of the article I linked earlier in my post, but is there some other advice or input that could help me here?
My thought is that I am not getting the right client/queue granularity during DI and/or my configuration is off. Thanks for any help; if I can clarify anything or more details are needed, please let me know.
2 Answers
You can find answers to your questions below:
Handling multiple queues is feasible and shouldn’t inherently cause issues, especially since you’re well within Azure’s limits. The logical separation per customer is a good practice in a multi-tenant environment for both security and organization. However, managing a large number of queues can introduce complexity and potential bottlenecks, depending on how the processing is implemented and the resources available.
Your approach of creating one client per queue is aligned with best practices for isolation and scalability. However, make sure that each processor instance is using resources efficiently and not leading to resource contention. The standard practice is to have a client per queue for better isolation and control. Regarding multiple factories versus multiple instances, the main benefit of multiple factories would be in scenarios where you need distinctly different configurations or lifecycles for the clients they create.
The performance issue you’re experiencing might not be solely due to using the standard tier of Azure Service Bus, though the premium tier offers better throughput and more features that might be beneficial for high load scenarios. It’s essential to analyze the performance metrics in the context of the resources and limits of the standard tier. The processing delay could be due to various factors including the way the Service Bus is being interacted with, network latency, or resource constraints on the processing side.
Tweaking ServiceBusProcessorOptions can impact performance, but it’s not a guaranteed fix. It’s crucial to find the right balance in MaxConcurrentCalls and PrefetchCount to optimize throughput without overwhelming your service. Monitor the metrics and adjust these settings based on the actual load and performance. Sometimes, less aggressive settings can lead to more stable and predictable performance, especially under varying load conditions.

How many CPU cores does your host machine have, and what else is running on the machine? From your description, you’re creating 70 processors, each of which uses a persistent background task, and each will launch an additional 20 concurrent processing tasks. So you’ve got 70 + (70 × 20) = 1,470 background tasks competing for resources on the machine at all times, which is very likely too much work for a single machine.
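If you keep one processor per queue, one way to bound the total is to derive MaxConcurrentCalls from the host’s core count instead of using a flat 20 per queue (an illustrative sketch with assumed numbers, not tuned values):
// Illustrative only: cap total concurrent handlers at roughly 2x the core count,
// spread across all queue processors, instead of 20 per queue (70 x 20 = 1,400 handlers).
var queueCount = QueueConfiguration.QueueNames.Count;
var totalConcurrency = Environment.ProcessorCount * 2; // assumption: ~2x cores as an overall ceiling
var perQueueConcurrency = Math.Max(1, totalConcurrency / queueCount);
var processorOptions = new ServiceBusProcessorOptions
{
    MaxConcurrentCalls = perQueueConcurrency,
    // Keep prefetch in the same order of magnitude as concurrency so messages
    // are not sitting locked in memory waiting for a free handler.
    PrefetchCount = perQueueConcurrency * 2,
};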
Factories or factories?
The article uses the term "factory" because the legacy packages had this concept for pooling connections. This has been replaced in Azure.Messaging.ServiceBus by ServiceBusClient, which performs the same function.

The factory that you’re seeing in the DI registration is not the same concept as in your referenced article. It is used to allow named registrations, as prior to .NET 8 the built-in DI container did not natively support the concept.
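In .NET 8 and later you could express the same idea with the built-in keyed services instead (a sketch, assuming you register one client per queue name yourself and pull the connection string from a configuration entry named "ServiceBus"):
// .NET 8+ keyed services: one named ServiceBusClient per queue, without IAzureClientFactory.
foreach (var queueName in QueueConfiguration.QueueNames)
{
    builder.Services.AddKeyedSingleton<ServiceBusClient>(queueName, (sp, key) =>
        new ServiceBusClient(builder.Configuration.GetConnectionString("ServiceBus")));
}
// Resolving a specific client later, e.g. in the background service:
// var client = serviceProvider.GetRequiredKeyedService<ServiceBusClient>(queueName);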
To DI or not to DI
There is inherently no difference between you creating the client manually and caching it in _queueProcessors versus using DI with named clients. However, you don’t want to do both: when DI is used, it explicitly owns the client, manages its lifetime, and will dispose the clients on your behalf.
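In practice that means the service should only tear down what it created itself, the processors, and leave the clients to the container. A sketch of what that could look like, assuming the processors are kept in _queueProcessors:
// Dispose only the processors this service created; the DI container owns the
// ServiceBusClient instances and will dispose them when the host shuts down.
public override async Task StopAsync(CancellationToken cancellationToken)
{
    foreach (var processor in _queueProcessors)
    {
        await processor.StopProcessingAsync(cancellationToken); // stop receiving new messages
        await processor.DisposeAsync();                         // release the processor itself
    }
    await base.StopAsync(cancellationToken);
}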
Execute is heavily CPU-bound
One other thing that I’d call attention to is that this implementation is going to hurt your overall performance in a non-trivial way. This loop is going to spin quickly and hammer your CPU.
I’d suggest restructuring to the following, which keeps the method from completing until the token is signaled, but in a paused state that isn’t resource-intensive:
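For example, something along these lines (a sketch, keeping the original DisposeAsync cleanup):
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    stoppingToken.Register(async () =>
    {
        foreach (var processor in _queueProcessors)
        {
            await processor.DisposeAsync();
        }
    });
    try
    {
        // Completes only when the token is signaled, so the method stays alive
        // without burning CPU in a tight loop.
        await Task.Delay(Timeout.Infinite, stoppingToken);
    }
    catch (OperationCanceledException)
    {
        // Expected on shutdown; nothing else to do.
    }
}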