skip to Main Content

I have a simple Spring Cloud Azure Service Bus consumer defined as simply as follows, following the official documentation:

    @Bean
    public Consumer<Message<byte[]>> consume() {
        return message -> someProcessing(message);
    }

The default policy, if the spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.requeue-rejected flag is false, is that messages that fail during consummation are requeued on the main queue for MaxDeliveryCount times, before going to the DLQ. I tested this behaviour and it consistently works – if, for example, I update the code to

    @Bean
    public Consumer<Message<byte[]>> consume() {
        return message -> {
           someProcessing(message);
           throw new IllegalStateException();
        }
    }

the message gets requeued as expected.

If I bump the number of consecutive calls, however, by setting spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.max-concurrent-calls=2, the behaviour becomes undefined. Some times it abandons the messages altogether, other times it requeues them in the main queue. I don’t understand why the behaviour is not consistent, i.e., the messages are always requeued. Part of the stack trace is as following.

2023-05-26 12:03:47.955 ERROR 52176 --- [undedElastic-12] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@71c11ee9]; nested exception is java.lang.IllegalStateException, failedMessage=GenericMessage [payload=byte[1948], headers={azure_service_bus_expires_at=2023-06-11T11:29:47.566Z, OwnedBy=owner, azure_service_bus_received_message_context=com.azure.messaging.servicebus.ServiceBusReceivedMessageContext@780ad3f2, azure_service_bus_message_id=ede22e431e0642ee9d3be263b3989c77, azure_service_bus_enqueued_sequence_number=132449, azure_service_bus_enqueued_time=2023-05-12T11:29:47.566Z, azure_service_bus_lock_token=c2167e9a-b6f5-4aed-b390-0f9714caf2a9, azure_service_bus_sequence_number=6969, azure_service_bus_time_to_live=PT720H, azure_service_bus_delivery_count=0, azure_service_bus_state=ACTIVE, id=fd556d60-a851-e39c-bca0-59e90d83722d, contentType=application/json, azure_service_bus_locked_until=2023-05-26T09:04:47.840Z, timestamp=1685091827953}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:216)
    at com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter.access$600(ServiceBusInboundChannelAdapter.java:73)
    at com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter$IntegrationRecordMessageListener.onMessage(ServiceBusInboundChannelAdapter.java:201)
    at com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter$IntegrationRecordMessageListener.onMessage(ServiceBusInboundChannelAdapter.java:186)
    at com.azure.messaging.servicebus.ServiceBusProcessorClient$1.onNext(ServiceBusProcessorClient.java:383)
    at com.azure.messaging.servicebus.ServiceBusProcessorClient$1.onNext(ServiceBusProcessorClient.java:362)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.IllegalStateException
    at com.subscriptions.Consumer.lambda$consume$0(Consumer.java:70)
    at com.helper.AnonymousAuthenticationRunner.runAnonymously(AnonymousAuthenticationRunner.java:24)
    at com.subscriptions.Consumer.lambda$consume$1(Consumer.java:50)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeConsumer(SimpleFunctionRegistry.java:1007)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:742)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:588)
    at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:84)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:791)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:623)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    ... 25 more

2023-05-26 12:03:47.955 ERROR 52176 --- [undedElastic-12] .s.i.s.i.ServiceBusInboundChannelAdapter : Error in the operation USER_CALLBACK occurred on entity service/subscriptions/Subscription. Error: {}

com.azure.messaging.servicebus.ServiceBusException: failed to send Message to channel 'service.Subscription.errors'; nested exception is com.azure.messaging.servicebus.ServiceBusException: A disposition requested earlier is waiting for the broker's ack; a new disposition request is not allowed.
    at com.azure.messaging.servicebus.ServiceBusProcessorClient$1.onNext(ServiceBusProcessorClient.java:387)
    at com.azure.messaging.servicebus.ServiceBusProcessorClient$1.onNext(ServiceBusProcessorClient.java:362)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'service.Subscription.errors'; nested exception is com.azure.messaging.servicebus.ServiceBusException: A disposition requested earlier is waiting for the broker's ack; a new disposition request is not allowed.
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:166)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:339)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendErrorMessageIfNecessary(MessageProducerSupport.java:262)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:219)
    at com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter.access$600(ServiceBusInboundChannelAdapter.java:73)
    at com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter$IntegrationRecordMessageListener.onMessage(ServiceBusInboundChannelAdapter.java:201)
    at com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter$IntegrationRecordMessageListener.onMessage(ServiceBusInboundChannelAdapter.java:186)
    at com.azure.messaging.servicebus.ServiceBusProcessorClient$1.onNext(ServiceBusProcessorClient.java:383)
    ... 10 common frames omitted
Caused by: com.azure.messaging.servicebus.ServiceBusException: A disposition requested earlier is waiting for the broker's ack; a new disposition request is not allowed.
    at com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient.lambda$updateDisposition$52(ServiceBusReceiverAsyncClient.java:1480)
    at reactor.core.publisher.Mono.lambda$onErrorMap$31(Mono.java:3733)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106)
    at reactor.core.publisher.Operators.error(Operators.java:198)
    at reactor.core.publisher.MonoError.subscribe(MonoError.java:53)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
    at reactor.core.publisher.Operators$MonoSubscriber.onError(Operators.java:1863)
    at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.signalCached(MonoCacheTime.java:340)
    at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.onError(MonoCacheTime.java:363)
    at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:201)
    at com.azure.core.amqp.implementation.handler.ReceiverUnsettledDeliveries$DispositionWork.onComplete(ReceiverUnsettledDeliveries.java:736)
    at com.azure.core.amqp.implementation.handler.ReceiverUnsettledDeliveries.lambda$sendDispositionImpl$3(ReceiverUnsettledDeliveries.java:334)
    at com.azure.core.amqp.implementation.handler.DispatchHandler.onTimerTask(DispatchHandler.java:32)
    at com.azure.core.amqp.implementation.ReactorDispatcher$WorkScheduler.run(ReactorDispatcher.java:207)
    at org.apache.qpid.proton.reactor.impl.SelectableImpl.readable(SelectableImpl.java:118)
    at org.apache.qpid.proton.reactor.impl.IOHandler.handleQuiesced(IOHandler.java:61)
    at org.apache.qpid.proton.reactor.impl.IOHandler.onUnhandled(IOHandler.java:390)
    at org.apache.qpid.proton.engine.BaseHandler.onReactorQuiesced(BaseHandler.java:87)
    at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:206)
    at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:292)
    at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:91)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
    ... 5 common frames omitted
    Suppressed: java.lang.Exception: #block terminated with an error
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
        at reactor.core.publisher.Mono.block(Mono.java:1707)
        at com.azure.messaging.servicebus.ServiceBusReceivedMessageContext.abandon(ServiceBusReceivedMessageContext.java:65)
        at com.azure.spring.cloud.stream.binder.servicebus.ServiceBusMessageChannelBinder.abandon(ServiceBusMessageChannelBinder.java:230)
        at com.azure.spring.cloud.stream.binder.servicebus.ServiceBusMessageChannelBinder.lambda$getErrorMessageHandler$1(ServiceBusMessageChannelBinder.java:189)
        at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:222)
        at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:178)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
        at org.springframework.integration.endpoint.MessageProducerSupport.sendErrorMessageIfNecessary(MessageProducerSupport.java:262)
        at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:219)
        at com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter.access$600(ServiceBusInboundChannelAdapter.java:73)
        at com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter$IntegrationRecordMessageListener.onMessage(ServiceBusInboundChannelAdapter.java:201)
        at com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter$IntegrationRecordMessageListener.onMessage(ServiceBusInboundChannelAdapter.java:186)
        at com.azure.messaging.servicebus.ServiceBusProcessorClient$1.onNext(ServiceBusProcessorClient.java:383)
        at com.azure.messaging.servicebus.ServiceBusProcessorClient$1.onNext(ServiceBusProcessorClient.java:362)
        at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
        at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
        ... 5 common frames omitted
Caused by: com.azure.core.amqp.exception.AmqpException: A disposition requested earlier is waiting for the broker's ack; a new disposition request is not allowed.
    ... 19 common frames omitted

I noticed that setting spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false makes messages be delivered correctly, as I am checkpointing the messages manually. Could the issue be from somewhere around that?

2

Answers


  1. so, it looks like you have concurrent messages? if a few fail at the same time, it can be hard to understand the issue and fix it.

    I’m going to give this a try for you…but I hate java.

    Question 1. have you tried messageContent.abandon()? next, i would find a video breaking down concurrent flows. My limited understanding of the logic is it helps with speed and scale. It is weak at providing and order to the process for a human lol. I understand it like set() vs array in programing python (the language I know best). concurrency will split the load across CPU’s in an order that will increase the speed of processing the data over all. it does this by not repeating its self using a data structure like or better than the set() function in python.

    (here is a video i watch to research the problem: https://www.youtube.com/watch?v=0LQPNjjfolk)

    I speed up looping through data by using set() over arrays. This is because only unique values can go in set() and a true or false check is quicker than indexing the data. It also saves memory in some languages too.

    if you are processing many concurrent events, you would want to design an approach similar to set().

    my understanding of service bus consumers, when you use auto-complete and its set to true if an exception is thrown the message is considered failed and is automatically abandon and requeued.

    the system needs a broker acknowledgment. it may never get it during concurrent processing. Logging this kind of exception is a must in Networking for IT systems. as an ex-support engineer i remember many hard days analysis Unidirectional data flow with packet capture:(

    If this is so, you should try messageContent.abandon(). This will let you further process even with concurrent message processing.

    I would recommend reviewing best practice architecture for service bus:

    1. https://learn.microsoft.com/en-us/azure/architecture/patterns/competing-consumers

    2. https://learn.microsoft.com/en-us/azure/well-architected/services/messaging/service-bus/operational-excellence

    I would also recommend planning testing your system. by thinking about how you will test your system, you can better, plan, research and solve new issues in house:). I got to a solution by review best practices for java and service bus architecture (the video i shared), then i looked at the Microsoft documentation for service bus, then I thought of the logic in python (because i hate java 🙂

    I am in my first year at Microsoft as an Azure Cloud Architect. Thank you for choosing Microsoft and please let me know your feedback. I will update this post if you have any questions.

    Update:

    As i contune to read through the documentation it looks like we may have a solution for your use case: PeekLock.

    https://learn.microsoft.com/en-us/azure/architecture/patterns/competing-consumers

    Please tell me if the above fit your use case.

    Final update (I found you a solution in java :):

    according to the MS Java SDK documentation here is example code for using PeekLock in java:

    docs: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/servicebus/azure-m

    code:

    send messages

    ServiceBusSenderClient sender = new ServiceBusClientBuilder()
        .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
        .sender()
        .queueName("<< QUEUE NAME >>")
        .buildClient();
    List<ServiceBusMessage> messages = Arrays.asList(
        new ServiceBusMessage("Hello world").setMessageId("1"),
        new ServiceBusMessage("Bonjour").setMessageId("2"));
    
    sender.sendMessages(messages);
    
    // When you are done using the sender, dispose of it.
    sender. Close();
    

    Receive messages:

     // Sample code that processes a single message which is received in PeekLock mode.
    Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
        final ServiceBusReceivedMessage message = context.getMessage();
        // Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic
        // handling message reaches desired state such that it doesn't require Service Bus to redeliver
        // the same message, then context.complete() should be called otherwise context.abandon().
        final boolean success = Math.random() < 0.5;
        if (success) {
            try {
                context.complete();
            } catch (Exception completionError) {
                System.out.printf("Completion of the message %s failedn", message.getMessageId());
                completionError.printStackTrace();
            }
        } else {
            try {
                context.abandon();
            } catch (Exception abandonError) {
                System.out.printf("Abandoning of the message %s failedn", message.getMessageId());
                abandonError.printStackTrace();
            }
        }
    };
    
    // Sample code that gets called if there's an error
    Consumer<ServiceBusErrorContext> processError = errorContext -> {
        System.err.println("Error occurred while receiving message: " + errorContext.getException());
    };
    
    // create the processor client via the builder and its sub-builder
    ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
                                    .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
                                    .processor()
                                    .queueName("<< QUEUE NAME >>")
                                    .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
                                    .disableAutoComplete() // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
                                    .processMessage(processMessage)
                                    .processError(processError)
                                    .disableAutoComplete()
                                    .buildProcessorClient();
    
    // Starts the processor in the background and returns immediately
    processorClient.start();
    

    The above is an example for the java SDK. Here is a questions in relation to Spring Java and PeekLock:

    For the Azure servicebus jms spring boot starter, is there a properties setting to requeue messages if the processing fails? – see here about PeekLock with spring java

    Login or Signup to reply.
  2. According to the Javadoc there : https://github.com/Azure/azure-sdk-for-java/blob/fd4ed4402038bf529d02642ed037a4669b396f1a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/models/ServiceBusReceiveMode.java#L50

    when you use the default "ReceiveMode", you are supposed to let know the service what you want it to do with the message if you fail to process it.

    So, I think @BlackFox is right even if the API he’s using is not the same as yours. You cannot just throw an exception, you must tell to the azure bus what to do with the message. Until you tell it what to do, the message is locked.

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search