I have a simple Spring Cloud Azure Service Bus consumer defined as simply as follows, following the official documentation:
@Bean
public Consumer<Message<byte[]>> consume() {
return message -> someProcessing(message);
}
The default policy, if the spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.requeue-rejected
flag is false
, is that messages that fail during consummation are requeued on the main queue for MaxDeliveryCount
times, before going to the DLQ. I tested this behaviour and it consistently works – if, for example, I update the code to
@Bean
public Consumer<Message<byte[]>> consume() {
return message -> {
someProcessing(message);
throw new IllegalStateException();
}
}
the message gets requeued as expected.
If I bump the number of consecutive calls, however, by setting spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.max-concurrent-calls=2
, the behaviour becomes undefined. Some times it abandons the messages altogether, other times it requeues them in the main queue. I don’t understand why the behaviour is not consistent, i.e., the messages are always requeued. Part of the stack trace is as following.
2023-05-26 12:03:47.955 ERROR 52176 --- [undedElastic-12] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@71c11ee9]; nested exception is java.lang.IllegalStateException, failedMessage=GenericMessage [payload=byte[1948], headers={azure_service_bus_expires_at=2023-06-11T11:29:47.566Z, OwnedBy=owner, azure_service_bus_received_message_context=com.azure.messaging.servicebus.ServiceBusReceivedMessageContext@780ad3f2, azure_service_bus_message_id=ede22e431e0642ee9d3be263b3989c77, azure_service_bus_enqueued_sequence_number=132449, azure_service_bus_enqueued_time=2023-05-12T11:29:47.566Z, azure_service_bus_lock_token=c2167e9a-b6f5-4aed-b390-0f9714caf2a9, azure_service_bus_sequence_number=6969, azure_service_bus_time_to_live=PT720H, azure_service_bus_delivery_count=0, azure_service_bus_state=ACTIVE, id=fd556d60-a851-e39c-bca0-59e90d83722d, contentType=application/json, azure_service_bus_locked_until=2023-05-26T09:04:47.840Z, timestamp=1685091827953}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:216)
at com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter.access$600(ServiceBusInboundChannelAdapter.java:73)
at com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter$IntegrationRecordMessageListener.onMessage(ServiceBusInboundChannelAdapter.java:201)
at com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter$IntegrationRecordMessageListener.onMessage(ServiceBusInboundChannelAdapter.java:186)
at com.azure.messaging.servicebus.ServiceBusProcessorClient$1.onNext(ServiceBusProcessorClient.java:383)
at com.azure.messaging.servicebus.ServiceBusProcessorClient$1.onNext(ServiceBusProcessorClient.java:362)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.IllegalStateException
at com.subscriptions.Consumer.lambda$consume$0(Consumer.java:70)
at com.helper.AnonymousAuthenticationRunner.runAnonymously(AnonymousAuthenticationRunner.java:24)
at com.subscriptions.Consumer.lambda$consume$1(Consumer.java:50)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeConsumer(SimpleFunctionRegistry.java:1007)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:742)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:588)
at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:84)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:791)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:623)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
... 25 more
2023-05-26 12:03:47.955 ERROR 52176 --- [undedElastic-12] .s.i.s.i.ServiceBusInboundChannelAdapter : Error in the operation USER_CALLBACK occurred on entity service/subscriptions/Subscription. Error: {}
com.azure.messaging.servicebus.ServiceBusException: failed to send Message to channel 'service.Subscription.errors'; nested exception is com.azure.messaging.servicebus.ServiceBusException: A disposition requested earlier is waiting for the broker's ack; a new disposition request is not allowed.
at com.azure.messaging.servicebus.ServiceBusProcessorClient$1.onNext(ServiceBusProcessorClient.java:387)
at com.azure.messaging.servicebus.ServiceBusProcessorClient$1.onNext(ServiceBusProcessorClient.java:362)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'service.Subscription.errors'; nested exception is com.azure.messaging.servicebus.ServiceBusException: A disposition requested earlier is waiting for the broker's ack; a new disposition request is not allowed.
at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:166)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:339)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendErrorMessageIfNecessary(MessageProducerSupport.java:262)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:219)
at com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter.access$600(ServiceBusInboundChannelAdapter.java:73)
at com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter$IntegrationRecordMessageListener.onMessage(ServiceBusInboundChannelAdapter.java:201)
at com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter$IntegrationRecordMessageListener.onMessage(ServiceBusInboundChannelAdapter.java:186)
at com.azure.messaging.servicebus.ServiceBusProcessorClient$1.onNext(ServiceBusProcessorClient.java:383)
... 10 common frames omitted
Caused by: com.azure.messaging.servicebus.ServiceBusException: A disposition requested earlier is waiting for the broker's ack; a new disposition request is not allowed.
at com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient.lambda$updateDisposition$52(ServiceBusReceiverAsyncClient.java:1480)
at reactor.core.publisher.Mono.lambda$onErrorMap$31(Mono.java:3733)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106)
at reactor.core.publisher.Operators.error(Operators.java:198)
at reactor.core.publisher.MonoError.subscribe(MonoError.java:53)
at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
at reactor.core.publisher.Operators$MonoSubscriber.onError(Operators.java:1863)
at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.signalCached(MonoCacheTime.java:340)
at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.onError(MonoCacheTime.java:363)
at reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:201)
at com.azure.core.amqp.implementation.handler.ReceiverUnsettledDeliveries$DispositionWork.onComplete(ReceiverUnsettledDeliveries.java:736)
at com.azure.core.amqp.implementation.handler.ReceiverUnsettledDeliveries.lambda$sendDispositionImpl$3(ReceiverUnsettledDeliveries.java:334)
at com.azure.core.amqp.implementation.handler.DispatchHandler.onTimerTask(DispatchHandler.java:32)
at com.azure.core.amqp.implementation.ReactorDispatcher$WorkScheduler.run(ReactorDispatcher.java:207)
at org.apache.qpid.proton.reactor.impl.SelectableImpl.readable(SelectableImpl.java:118)
at org.apache.qpid.proton.reactor.impl.IOHandler.handleQuiesced(IOHandler.java:61)
at org.apache.qpid.proton.reactor.impl.IOHandler.onUnhandled(IOHandler.java:390)
at org.apache.qpid.proton.engine.BaseHandler.onReactorQuiesced(BaseHandler.java:87)
at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:206)
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:292)
at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:91)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
... 5 common frames omitted
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
at reactor.core.publisher.Mono.block(Mono.java:1707)
at com.azure.messaging.servicebus.ServiceBusReceivedMessageContext.abandon(ServiceBusReceivedMessageContext.java:65)
at com.azure.spring.cloud.stream.binder.servicebus.ServiceBusMessageChannelBinder.abandon(ServiceBusMessageChannelBinder.java:230)
at com.azure.spring.cloud.stream.binder.servicebus.ServiceBusMessageChannelBinder.lambda$getErrorMessageHandler$1(ServiceBusMessageChannelBinder.java:189)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:222)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:178)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendErrorMessageIfNecessary(MessageProducerSupport.java:262)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:219)
at com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter.access$600(ServiceBusInboundChannelAdapter.java:73)
at com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter$IntegrationRecordMessageListener.onMessage(ServiceBusInboundChannelAdapter.java:201)
at com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter$IntegrationRecordMessageListener.onMessage(ServiceBusInboundChannelAdapter.java:186)
at com.azure.messaging.servicebus.ServiceBusProcessorClient$1.onNext(ServiceBusProcessorClient.java:383)
at com.azure.messaging.servicebus.ServiceBusProcessorClient$1.onNext(ServiceBusProcessorClient.java:362)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
... 5 common frames omitted
Caused by: com.azure.core.amqp.exception.AmqpException: A disposition requested earlier is waiting for the broker's ack; a new disposition request is not allowed.
... 19 common frames omitted
I noticed that setting spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false
makes messages be delivered correctly, as I am checkpointing the messages manually. Could the issue be from somewhere around that?
2
Answers
so, it looks like you have concurrent messages? if a few fail at the same time, it can be hard to understand the issue and fix it.
I’m going to give this a try for you…but I hate java.
Question 1. have you tried messageContent.abandon()? next, i would find a video breaking down concurrent flows. My limited understanding of the logic is it helps with speed and scale. It is weak at providing and order to the process for a human lol. I understand it like set() vs array in programing python (the language I know best). concurrency will split the load across CPU’s in an order that will increase the speed of processing the data over all. it does this by not repeating its self using a data structure like or better than the set() function in python.
(here is a video i watch to research the problem: https://www.youtube.com/watch?v=0LQPNjjfolk)
I speed up looping through data by using set() over arrays. This is because only unique values can go in set() and a true or false check is quicker than indexing the data. It also saves memory in some languages too.
if you are processing many concurrent events, you would want to design an approach similar to set().
my understanding of service bus consumers, when you use auto-complete and its set to true if an exception is thrown the message is considered failed and is automatically abandon and requeued.
the system needs a broker acknowledgment. it may never get it during concurrent processing. Logging this kind of exception is a must in Networking for IT systems. as an ex-support engineer i remember many hard days analysis Unidirectional data flow with packet capture:(
If this is so, you should try messageContent.abandon(). This will let you further process even with concurrent message processing.
I would recommend reviewing best practice architecture for service bus:
https://learn.microsoft.com/en-us/azure/architecture/patterns/competing-consumers
https://learn.microsoft.com/en-us/azure/well-architected/services/messaging/service-bus/operational-excellence
I would also recommend planning testing your system. by thinking about how you will test your system, you can better, plan, research and solve new issues in house:). I got to a solution by review best practices for java and service bus architecture (the video i shared), then i looked at the Microsoft documentation for service bus, then I thought of the logic in python (because i hate java 🙂
I am in my first year at Microsoft as an Azure Cloud Architect. Thank you for choosing Microsoft and please let me know your feedback. I will update this post if you have any questions.
Update:
As i contune to read through the documentation it looks like we may have a solution for your use case: PeekLock.
https://learn.microsoft.com/en-us/azure/architecture/patterns/competing-consumers
Please tell me if the above fit your use case.
Final update (I found you a solution in java :):
according to the MS Java SDK documentation here is example code for using PeekLock in java:
docs: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/servicebus/azure-m
code:
send messages
Receive messages:
The above is an example for the java SDK. Here is a questions in relation to Spring Java and PeekLock:
For the Azure servicebus jms spring boot starter, is there a properties setting to requeue messages if the processing fails? – see here about PeekLock with spring java
According to the Javadoc there : https://github.com/Azure/azure-sdk-for-java/blob/fd4ed4402038bf529d02642ed037a4669b396f1a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/models/ServiceBusReceiveMode.java#L50
when you use the default "ReceiveMode", you are supposed to let know the service what you want it to do with the message if you fail to process it.
So, I think @BlackFox is right even if the API he’s using is not the same as yours. You cannot just throw an exception, you must tell to the azure bus what to do with the message. Until you tell it what to do, the message is locked.