
I’m trying to process multiple messages in parallel from a queue with sessions enabled. I’ve tried setting MaxConcurrentCallsPerSession to 5, for example, but I am still receiving one message at a time.

I wrote a console application to demonstrate what I’m trying to do:

        static void Main()
        {
            MainAsync().Wait();
        }

        static async Task MainAsync()
        {
            //create the queue
            await CreateQueue();

            //initialize queue client
            ServiceBusClient queueClient = new ServiceBusClient(_serviceBusConnectionString, new ServiceBusClientOptions
            {
                TransportType = ServiceBusTransportType.AmqpWebSockets,
            });

            //initialize the sender
            ServiceBusSender sender = queueClient.CreateSender(_queueName);

            //queue 3 messages
            await sender.SendMessageAsync(new ServiceBusMessage() { SessionId = _sessionId, MessageId = "1" });
            await sender.SendMessageAsync(new ServiceBusMessage() { SessionId = _sessionId, MessageId = "2" });
            await sender.SendMessageAsync(new ServiceBusMessage() { SessionId = _sessionId, MessageId = "3" });

            //initialize processor
            ServiceBusSessionProcessor processor = queueClient.CreateSessionProcessor(_queueName, new ServiceBusSessionProcessorOptions()
            {
                AutoCompleteMessages = false,
                ReceiveMode = ServiceBusReceiveMode.PeekLock,
                SessionIds = { _sessionId },
                PrefetchCount = 5,
                MaxConcurrentCallsPerSession = 5
            });

            //add message handler
            processor.ProcessMessageAsync += HandleReceivedMessage;

            //add error handler
            processor.ProcessErrorAsync += ErrorHandler;

            //start the processor
            await processor.StartProcessingAsync();

            Console.ReadLine();
        }

        static async Task CreateQueue()
        {
            ServiceBusAdministrationClient client = new ServiceBusAdministrationClient(_serviceBusConnectionString);

            bool doesQueueExist = await client.QueueExistsAsync(_queueName);

            //check if the queue exists, if not then create one
            if (!doesQueueExist)
            {
                _ = await client.CreateQueueAsync(new CreateQueueOptions(_queueName)
                {
                    RequiresSession = true,
                    DeadLetteringOnMessageExpiration = true,
                    MaxDeliveryCount = 3,
                    EnableBatchedOperations = true,
                });
            }
        }

        static async Task HandleReceivedMessage(ProcessSessionMessageEventArgs sessionMessage)
        {
            Console.WriteLine("Received message: " + sessionMessage.Message.MessageId);

            await Task.Delay(5000).ConfigureAwait(false);

            await sessionMessage.CompleteMessageAsync(sessionMessage.Message);

            Console.WriteLine("Completed message: " + sessionMessage.Message.MessageId);
        }

        static Task ErrorHandler(ProcessErrorEventArgs e)
        {
            Console.WriteLine("Error received");

            return Task.CompletedTask;
        }

When executing the program, what I expect to receive is:

Received message: 1
Received message: 2
Received message: 3
Completed message: 1
Completed message: 2
Completed message: 3

But what I am getting is:

Received message: 1
Completed message: 1
Received message: 2
Completed message: 2
Received message: 3
Completed message: 3

Is what I am trying to achieve possible?

I am using .NET Framework 4.7.2 and Azure.Messaging.ServiceBus 7.17.4.

2 Answers


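  1. I tried the code below with .NET Framework 4.7.2 and the Azure.Messaging.ServiceBus 7.17.4 package to receive messages from the Azure Service Bus queue. It sends each message to a different session and starts one session processor per session, so the three messages are handled concurrently.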

    Code:

    using Azure.Messaging.ServiceBus;
    using Azure.Messaging.ServiceBus.Administration;
    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    
    namespace ConsoleApp75
    {
        class Program
        {
            private const string _serviceBusConnectionString = "<ServiceBus_Connection_String>";
            private const string _queueName = "<queue_name>";
    
            static async Task MainAsync()
            {
                await CreateQueue();
                ServiceBusClient queueClient = new ServiceBusClient(_serviceBusConnectionString, new ServiceBusClientOptions
                {
                    TransportType = ServiceBusTransportType.AmqpWebSockets,
                });
    
                ServiceBusSender sender = queueClient.CreateSender(_queueName);
                var processors = new List<ServiceBusSessionProcessor>();
    
                try
                {
                    // Start one session processor per session ID ("1", "2", "3").
                    // Each processor handles its own session, so the three run concurrently.
                    var processorTasks = new List<Task>();
                    for (int i = 0; i < 3; i++)
                    {
                        ServiceBusSessionProcessor processor = queueClient.CreateSessionProcessor(_queueName, new ServiceBusSessionProcessorOptions()
                        {
                            AutoCompleteMessages = false,
                            ReceiveMode = ServiceBusReceiveMode.PeekLock,
                            SessionIds = { (i + 1).ToString() },
                            PrefetchCount = 5,
                            MaxConcurrentCallsPerSession = 1
                        });
    
                        // Subscribe the handlers directly; no wrapper lambda is needed.
                        processor.ProcessMessageAsync += HandleReceivedMessage;
                        processor.ProcessErrorAsync += ErrorHandler;
                        processors.Add(processor);
                        processorTasks.Add(processor.StartProcessingAsync());
                    }
                    // Wait until every processor has started before sending.
                    await Task.WhenAll(processorTasks);
    
                    await sender.SendMessageAsync(new ServiceBusMessage() { SessionId = "1", MessageId = "1" });
                    await sender.SendMessageAsync(new ServiceBusMessage() { SessionId = "2", MessageId = "2" });
                    await sender.SendMessageAsync(new ServiceBusMessage() { SessionId = "3", MessageId = "3" });
    
                    Console.ReadLine();
                }
                finally
                {
                    foreach (var processor in processors)
                    {
                        await processor.StopProcessingAsync();
                    }
                }
            }
    
            static async Task CreateQueue()
            {
                ServiceBusAdministrationClient client = new ServiceBusAdministrationClient(_serviceBusConnectionString);
                bool doesQueueExist = await client.QueueExistsAsync(_queueName);
                if (!doesQueueExist)
                {
                    _ = await client.CreateQueueAsync(new CreateQueueOptions(_queueName)
                    {
                        RequiresSession = true, 
                        DeadLetteringOnMessageExpiration = true,
                        MaxDeliveryCount = 3,
                        EnableBatchedOperations = true,
                    });
                }
            }
    
            static async Task HandleReceivedMessage(ProcessSessionMessageEventArgs sessionMessage)
            {
                Console.WriteLine("Received message: " + sessionMessage.Message.MessageId);
                await Task.Delay(5000).ConfigureAwait(false);
                await sessionMessage.CompleteMessageAsync(sessionMessage.Message);
                Console.WriteLine("Completed message: " + sessionMessage.Message.MessageId);
            }
    
            static Task ErrorHandler(ProcessErrorEventArgs e)
            {
                Console.WriteLine($"Error received. Error source: {e.ErrorSource}, Exception: {e.Exception}");
                return Task.CompletedTask;
            }
    
            static void Main(string[] args)
            {
                MainAsync().Wait();
            }
        }
    }
    

    Output:

    The code above ran successfully and produced the expected output:

    Received message: 1
    Received message: 2
    Received message: 3
    Completed message: 1
    Completed message: 2
    Completed message: 3
    
  2. When sessions are used, Service Bus enforces FIFO (first-in, first-out) processing order. It therefore cannot start processing message 2 until message 1 has been marked completed.

    The documentation at https://learn.microsoft.com/en-us/azure/service-bus-messaging/message-sessions includes this excerpt:

    The session lock held by the session receiver is an umbrella for the
    message locks used by the peek-lock settlement mode. Only one receiver
    can have a lock on a session. A receiver might have many in-flight
    messages, but the messages are received in order. Abandoning a message
    causes the same message to be served again with the next receive
    operation.

    I interpret that to mean that processing of message 2 cannot start until message 1 has been marked complete, because if processing message 1 fails it will still be the first message on the queue. For a queue that uses sessions, this means the receiver cannot process the messages in parallel. If you want parallel processing, either do not use sessions or give each message a unique session, as proposed in the answer by Dasari Kamili.
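
    A minimal sketch of the unique-session option on the sending side, assuming the Azure.Messaging.ServiceBus package from the question. The UniqueSessionMessages class and its Create method are hypothetical names used only for illustration; each message gets a fresh GUID as its SessionId, so no two messages share a session:

    ```csharp
    using System;
    using System.Collections.Generic;
    using Azure.Messaging.ServiceBus;

    // Hypothetical helper, not part of the SDK.
    static class UniqueSessionMessages
    {
        // Builds `count` messages, each in its own session.
        public static List<ServiceBusMessage> Create(int count)
        {
            var messages = new List<ServiceBusMessage>();
            for (int i = 1; i <= count; i++)
            {
                messages.Add(new ServiceBusMessage
                {
                    // A new GUID per message means no two messages share a session.
                    SessionId = Guid.NewGuid().ToString(),
                    MessageId = i.ToString()
                });
            }
            return messages;
        }
    }
    ```

    The result could be sent with sender.SendMessagesAsync(UniqueSessionMessages.Create(3)). Because the session IDs are not known in advance, the processor would probably need to leave SessionIds empty and rely on MaxConcurrentSessions instead. Messages in different sessions have no ordering guarantee relative to each other, so this only fits if ordering does not matter.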

    You could get the processing you wanted while still using sessions by completing each message as soon as it is retrieved from the queue. But then you need to handle any failure to process the message yourself, because it has already been completed before you get to that step.
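
    A rough sketch of that complete-first handler, assuming the processor is created with AutoCompleteMessages = false as in the question. CompleteFirstHandler and ProcessAsync are hypothetical names, and the 5-second delay only stands in for real work. Whether this produces the interleaved output the question expects has not been verified here:

    ```csharp
    using System;
    using System.Threading.Tasks;
    using Azure.Messaging.ServiceBus;

    // Hypothetical handler class, for illustration only.
    static class CompleteFirstHandler
    {
        // Placeholder for the real per-message work.
        static Task ProcessAsync(ServiceBusReceivedMessage message) => Task.Delay(5000);

        public static async Task HandleReceivedMessage(ProcessSessionMessageEventArgs args)
        {
            ServiceBusReceivedMessage message = args.Message;

            // Settle the message before doing any work on it.
            await args.CompleteMessageAsync(message, args.CancellationToken);
            Console.WriteLine("Completed message: " + message.MessageId);

            try
            {
                await ProcessAsync(message);
                Console.WriteLine("Processed message: " + message.MessageId);
            }
            catch (Exception ex)
            {
                // The broker will not redeliver a completed message, so any
                // retry or compensation has to happen here (for example, by
                // re-sending the message or recording it for later handling).
                Console.WriteLine($"Processing failed for {message.MessageId}: {ex.Message}");
            }
        }
    }
    ```

    It would be wired up with processor.ProcessMessageAsync += CompleteFirstHandler.HandleReceivedMessage. The trade-off is that delivery becomes at-most-once from the broker's point of view: MaxDeliveryCount and dead-lettering no longer protect against a failed handler.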

    You need to ask yourself why you are using sessions and how you want to process the messages; you may need to adjust your process to accomplish your goal.
