
I am using the Azure.Messaging.ServiceBus NuGet package to work with Azure Service Bus. We have created a topic and a subscription. The subscription has 100+ messages. We want to read all of the messages and continue to read messages as they arrive.

The Microsoft.Azure.ServiceBus package (now deprecated) provided RegisterMessageHandler, which was used to process every incoming message. I am not able to find a similar option in the Azure.Messaging.ServiceBus NuGet package.

I am able to read one message at a time, but I have to call await receiver.ReceiveMessageAsync(); manually every time.

2 Answers


  1. Chosen as BEST ANSWER

    As suggested by @Gaurav Mantri, I used the ServiceBusProcessor class to implement an event-based model for processing messages:

     public async Task ReceiveAll()
        {
            string connectionString = "Endpoint=sb://sb-test-today.servicebus.windows.net/;SharedAccessKeyName=manage;SharedAccessKey=8e+6SWp3skB3Aedsadsadasdwz5DU=;";
            string topicName = "topicone";
            string subscriptionName = "subone";
            await using var client = new ServiceBusClient(connectionString, new ServiceBusClientOptions
            {
                TransportType = ServiceBusTransportType.AmqpWebSockets
            });
    
            var options = new ServiceBusProcessorOptions
            {
                // By default (AutoCompleteMessages = true), the processor completes the message after the message handler returns.
                // Set AutoCompleteMessages to false to settle messages on your own; see
                // https://learn.microsoft.com/en-us/azure/service-bus-messaging/message-transfers-locks-settlement#peeklock
                // In both cases, if the message handler throws an exception without settling the message, the processor will abandon the message.
                AutoCompleteMessages = false,
    
                // Raise MaxConcurrentCalls to let the processor handle several messages concurrently
                MaxConcurrentCalls = 1
            };
    
            await using ServiceBusProcessor processor = client.CreateProcessor(topicName, subscriptionName, options);
    
            processor.ProcessMessageAsync += MessageHandler;
            processor.ProcessErrorAsync += ErrorHandler;
    
            await processor.StartProcessingAsync();
    
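            // Keep processing until a key is pressed, then stop the processor gracefully
            Console.ReadKey();
            await processor.StopProcessingAsync();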
        }
    
    
        public async Task MessageHandler(ProcessMessageEventArgs args)
        {
            string body = args.Message.Body.ToString();
            Console.WriteLine(body);
    
            // we can evaluate application logic and use that to determine how to settle the message.
            await args.CompleteMessageAsync(args.Message);
        }
    
        public Task ErrorHandler(ProcessErrorEventArgs args)
        {
            // the error source tells me at what point in the processing an error occurred
            Console.WriteLine(args.ErrorSource);
            // the fully qualified namespace is available
            Console.WriteLine(args.FullyQualifiedNamespace);
            // as well as the entity path
            Console.WriteLine(args.EntityPath);
            Console.WriteLine(args.Exception.ToString());
            return Task.CompletedTask;
        }
    

  2. To receive multiple messages (a batch), you should use ServiceBusReceiver.ReceiveMessagesAsync() (note the plural "Messages", not the singular "Message"). This method returns however many messages are available at the time, up to the maximum you request, so a single call is not guaranteed to return everything. To ensure you retrieve all 100+ messages, you'll need to loop until no messages are available.

    If you’d like to use a processor, that’s also available in the new SDK. See my answer to a similar question here.
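
    The batch loop described above could look roughly like the following. This is a minimal sketch, not a definitive implementation: the `BatchDrain` class, the `DrainAsync` method, the batch size of 50, and the 5-second wait are all illustrative assumptions, and the connection string and entity names are placeholders supplied by the caller. It relies on the default PeekLock receive mode, so each message is completed explicitly.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using Azure.Messaging.ServiceBus;

    // Hypothetical helper for illustration; not part of the SDK.
    public static class BatchDrain
    {
        public static async Task DrainAsync(string connectionString, string topicName, string subscriptionName)
        {
            await using var client = new ServiceBusClient(connectionString);
            await using ServiceBusReceiver receiver = client.CreateReceiver(topicName, subscriptionName);

            while (true)
            {
                // Request up to 50 messages (assumed batch size), waiting at most
                // 5 seconds (assumed timeout) for any message to arrive.
                IReadOnlyList<ServiceBusReceivedMessage> batch =
                    await receiver.ReceiveMessagesAsync(maxMessages: 50, maxWaitTime: TimeSpan.FromSeconds(5));

                // An empty batch means nothing arrived within the wait time, so stop looping.
                if (batch.Count == 0)
                {
                    break;
                }

                foreach (ServiceBusReceivedMessage message in batch)
                {
                    Console.WriteLine(message.Body.ToString());

                    // Settle each message so it is removed from the subscription.
                    await receiver.CompleteMessageAsync(message);
                }
            }
        }
    }
    ```

    Note that this loop exits once the subscription looks empty, so it drains the backlog but does not keep listening. To continue handling messages as they arrive, the processor approach is the better fit.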
