I’m trying to process multiple messages in parallel from a queue with sessions enabled. I’ve tried setting MaxConcurrentCallsPerSession to 5, for example, but I still receive one message at a time.
I wrote a console application to demonstrate what I’m trying to do:
static void Main()
{
    MainAsync().Wait();
}
static async Task MainAsync()
{
    //create the queue
    await CreateQueue();
    //initialize queue client
    ServiceBusClient queueClient = new ServiceBusClient(_serviceBusConnectionString, new ServiceBusClientOptions
    {
        TransportType = ServiceBusTransportType.AmqpWebSockets,
    });
    //initialize the sender
    ServiceBusSender sender = queueClient.CreateSender(_queueName);
    //queue 3 messages
    await sender.SendMessageAsync(new ServiceBusMessage() { SessionId = _sessionId, MessageId = "1" });
    await sender.SendMessageAsync(new ServiceBusMessage() { SessionId = _sessionId, MessageId = "2" });
    await sender.SendMessageAsync(new ServiceBusMessage() { SessionId = _sessionId, MessageId = "3" });
    //initialize processor
    ServiceBusSessionProcessor processor = queueClient.CreateSessionProcessor(_queueName, new ServiceBusSessionProcessorOptions()
    {
        AutoCompleteMessages = false,
        ReceiveMode = ServiceBusReceiveMode.PeekLock,
        SessionIds = { _sessionId },
        PrefetchCount = 5,
        MaxConcurrentCallsPerSession = 5
    });
    //add message handler
    processor.ProcessMessageAsync += HandleReceivedMessage;
    //add error handler
    processor.ProcessErrorAsync += ErrorHandler;
    //start the processor
    await processor.StartProcessingAsync();
    Console.ReadLine();
}
static async Task CreateQueue()
{
    ServiceBusAdministrationClient client = new ServiceBusAdministrationClient(_serviceBusConnectionString);
    bool doesQueueExist = await client.QueueExistsAsync(_queueName);
    //check if the queue exists, if not then create one
    if (!doesQueueExist)
    {
        _ = await client.CreateQueueAsync(new CreateQueueOptions(_queueName)
        {
            RequiresSession = true,
            DeadLetteringOnMessageExpiration = true,
            MaxDeliveryCount = 3,
            EnableBatchedOperations = true,
        });
    }
}
static async Task HandleReceivedMessage(ProcessSessionMessageEventArgs sessionMessage)
{
    Console.WriteLine("Received message: " + sessionMessage.Message.MessageId);
    await Task.Delay(5000).ConfigureAwait(false);
    await sessionMessage.CompleteMessageAsync(sessionMessage.Message);
    Console.WriteLine("Completed message: " + sessionMessage.Message.MessageId);
}
static Task ErrorHandler(ProcessErrorEventArgs e)
{
    Console.WriteLine("Error received");
    return Task.CompletedTask;
}
When executing the program, what I expect to receive is:
Received message: 1
Received message: 2
Received message: 3
Completed message: 1
Completed message: 2
Completed message: 3
But what I am getting is:
Received message: 1
Completed message: 1
Received message: 2
Completed message: 2
Received message: 3
Completed message: 3
Is what I am trying to achieve possible?
I am using .NET Framework 4.7.2 and Azure.Messaging.ServiceBus 7.17.4.
2 Answers
I tried the code below with .NET Framework 4.7.2 and the Azure.Messaging.ServiceBus 7.17.4 package to receive messages from the Azure Service Bus queue.
Code :
Output :
The following code ran successfully, and I received the exact messages at the output as shown below.
When sessions are used, Service Bus enforces FIFO (first-in, first-out) processing order within a session. Thus it cannot start processing message 2 until message 1 has been marked completed.
This page https://learn.microsoft.com/en-us/azure/service-bus-messaging/message-sessions has this excerpt:
I interpret that to mean that it cannot start processing message 2 until message 1 has been marked complete, because if processing message 1 fails it will still be the first message on the queue. That means that, for a queue using sessions, the receiver cannot process the messages of a session in parallel. If you want parallel processing then you should not use sessions, or you should give each message a unique session as proposed in the answer by Dasari Kamili.
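As a rough illustration of the unique-session idea, here is a minimal sketch (not the code from the other answer). It assumes a session-enabled queue already exists; the queue name and the SERVICEBUS_CONNECTION_STRING environment variable are hypothetical placeholders. Each message gets its own SessionId, and MaxConcurrentSessions lets the processor work on several sessions at once.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

static class UniqueSessionSketch
{
    // Hypothetical queue name; the queue must already exist with RequiresSession = true.
    const string QueueName = "my-session-queue";

    static void Main() => MainAsync().GetAwaiter().GetResult();

    static async Task MainAsync()
    {
        // Hypothetical environment variable holding the connection string.
        string connectionString = Environment.GetEnvironmentVariable("SERVICEBUS_CONNECTION_STRING");
        var client = new ServiceBusClient(connectionString);
        try
        {
            ServiceBusSender sender = client.CreateSender(QueueName);

            // Give every message its own session so no message waits behind another.
            for (int i = 1; i <= 3; i++)
            {
                await sender.SendMessageAsync(new ServiceBusMessage
                {
                    SessionId = Guid.NewGuid().ToString(),
                    MessageId = i.ToString()
                });
            }

            // No SessionIds filter: the processor accepts whichever sessions are available.
            ServiceBusSessionProcessor processor = client.CreateSessionProcessor(
                QueueName,
                new ServiceBusSessionProcessorOptions
                {
                    AutoCompleteMessages = false,
                    MaxConcurrentSessions = 5,       // sessions handled at the same time
                    MaxConcurrentCallsPerSession = 1 // one handler call per session
                });

            processor.ProcessMessageAsync += async args =>
            {
                Console.WriteLine("Received message: " + args.Message.MessageId);
                await Task.Delay(5000); // simulated work
                await args.CompleteMessageAsync(args.Message);
                Console.WriteLine("Completed message: " + args.Message.MessageId);
            };
            processor.ProcessErrorAsync += args =>
            {
                Console.WriteLine("Error received: " + args.Exception.Message);
                return Task.CompletedTask;
            };

            await processor.StartProcessingAsync();
            Console.ReadLine();
            await processor.StopProcessingAsync();
        }
        finally
        {
            await client.DisposeAsync();
        }
    }
}
```

The trade-off is that ordering between the messages is no longer guaranteed, since they no longer share a session.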
You could get the processing you want while using sessions by completing each message as soon as it is retrieved from the queue. But then you need to handle any failure to process the message yourself, because it has already been completed before you get to that step.
You need to ask yourself why you are using sessions and how you want to process the messages. You may need to adjust your process to accomplish your goal.
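A minimal sketch of the complete-first ordering might look like the handler below. ProcessAsync and RecordFailureAsync are hypothetical helpers standing in for your own logic; they are not part of the Service Bus SDK. The sketch only shows the ordering and the failure handling. As far as I understand, the processor still awaits each handler call, so whether handlers actually overlap depends on the processor's concurrency settings.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

static class CompleteFirstSketch
{
    // Wire up with: processor.ProcessMessageAsync += CompleteFirstSketch.HandleReceivedMessage;
    public static async Task HandleReceivedMessage(ProcessSessionMessageEventArgs args)
    {
        ServiceBusReceivedMessage message = args.Message;

        // Complete immediately: from here on Service Bus will not redeliver this message.
        await args.CompleteMessageAsync(message);

        try
        {
            await ProcessAsync(message);
        }
        catch (Exception ex)
        {
            // The broker no longer holds the message, so recovery is the application's job.
            await RecordFailureAsync(message, ex);
        }
    }

    // Hypothetical placeholder for the real work.
    static async Task ProcessAsync(ServiceBusReceivedMessage message)
    {
        await Task.Delay(5000);
        Console.WriteLine("Processed message: " + message.MessageId);
    }

    // Hypothetical placeholder: persist the failed message somewhere durable for a later retry.
    static Task RecordFailureAsync(ServiceBusReceivedMessage message, Exception ex)
    {
        Console.WriteLine("Failed message " + message.MessageId + ": " + ex.Message);
        return Task.CompletedTask;
    }
}
```

With this ordering, MaxDeliveryCount and dead-lettering no longer protect you against processing failures, which is why the failure path needs its own durable store.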