What I want to Achieve – Azure Service Bus Message Listener to start / stop receiving messages from queue/topic.
Below is a detailed explanation.
Currently I have integrated Azure Service Bus in my application and we listen message as soon as spring boot application starts. Now I want to modify this logic. By default Azure Service Bus Message Listener will be disable. On ApplicationReadyEvent
I want to perform some task and after that again I want to enable Azure Service Bus Message Listener to start listening from topic or queue.
So how can I achieve that ?
application.yml
spring:
cloud:
azure:
servicebus:
namespace: **********
xxx:
azure:
servicebus:
connection: ***********
queue: **********
AzureConfiguration.java
import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import com.azure.spring.messaging.servicebus.core.ServiceBusProcessorFactory;
import com.azure.spring.messaging.servicebus.core.listener.ServiceBusMessageListenerContainer;
import com.azure.spring.messaging.servicebus.core.properties.ServiceBusContainerProperties;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessageChannel;
@Configuration
public class AzureConfiguration{
@Value("${xxx.azure.servicebus.connection}")
private String serviceBusConnection;
@Value("${xxx.azure.servicebus.queue}")
private String serviceBusQueue;
private static final String SERVICE_BUS_INPUT_CHANNEL = "yyyyy";
private static final String SENSOR_DATA_CHANNEL = "zzzzz";
private static final String SERVICE_BUS_LISTENER_CONTAINER = "aaaaa";
@Bean(name = SERVICE_BUS_LISTENER_CONTAINER)
public ServiceBusMessageListenerContainer serviceBusMessageListenerContainer(ServiceBusProcessorFactory processorFactory) {
ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
containerProperties.setConnectionString(serviceBusConnection);
containerProperties.setEntityName(serviceBusQueue);
containerProperties.setAutoComplete(true);
return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
}
@Bean
public ServiceBusInboundChannelAdapter serviceBusInboundChannelAdapter(
@Qualifier(SERVICE_BUS_INPUT_CHANNEL) MessageChannel inputChannel,
@Qualifier(SERVICE_BUS_LISTENER_CONTAINER) ServiceBusMessageListenerContainer listenerContainer) {
ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean(name = SERVICE_BUS_INPUT_CHANNEL)
public MessageChannel serviceBusInputChannel() {
return new DirectChannel();
}
@Bean(name = SENSOR_DATA_CHANNEL)
public MessageChannel sensorDataChannel() {
return new DirectChannel();
}
@Bean
public IntegrationFlow serviceBusMessageFlow() {
return IntegrationFlows.from(SERVICE_BUS_INPUT_CHANNEL)
.<byte[], String>transform(String::new)
.channel(SENSOR_DATA_CHANNEL)
.get();
}
}
AppEventListenerService.java
import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import java.util.List;
@Slf4j
@Service
@AllArgsConstructor
public class AppEventListenerService{
@EventListener(ApplicationReadyEvent.class)
public void OnApplicationStarted() {
log.debug("Enter OnApplicationStarted");
// By Default Azure Service Bus Message Listener will be disable
// do some task
// Enable Azure Bus Message Listener
log.debug("Exit OnApplicationStarted");
}
}
In above code in AppEventListenerService.java ,
// Enable Azure Bus Message Listener – Here I want to start ServiceBusConsumer to receive message from topic/queue.
3
Answers
Here I have a work around where we use the JMS to consume the service bus message
The reason to use JMS is that when we use the
@JMSListener
we can start stop it.Now to Implement JMS with ServiceBus refer this MSDOC
Now you have to
Autowired
thisJmsListenerEndpointRegistry
object and stop the listener.To stop the JMS you will have to use the stop function:
/stop
Api which will stop the JMS and the message will only start coming once the/start
api is called .output:
As you’re using an integration flow registered as a bean the simplest way to start/stop it is to autowire it as
StandardIntegrationFlow
and call the corresponding method like so:Note the
@DependsOn
annotation might be needed to force the flow bean to be initialized before the event listener bean.Also, it should be noted that some messages might happen to go through after the flow is initialized and before the listener is triggered.
Literally, if you just want to stop the listener and then start it on
ApplicationReadyEvent
, then you can autowire theServiceBusInboundChannelAdapter
(orServiceBusMessageListenerContainer
) in yourAppEventListenerService.java
and then simply call the its stop() and start() API in theAppEventListenerService#OnApplicationStarted
method.However, both the
ServiceBusMessageListenerContainer
andServiceBusInboundChannelAdapter
implementsSmartLifecycle
interface and is enabled auto-start-up by default. So if you use the above solution, the listener (as well as adapter) has been triggered to start beforeApplicationReadyEvent
, which means there will still be a period that the listener is consuming messages.So I assume you may want to turn off the listener till your own business logic has been done. If so, then currently
ServiceBusMessageListenerContainer
doesn’t provide the function to disable auto-start-up, and we will put your feature request to our backlog.But you could still use the below workarounds to meet your request.
Workaround-1
ServiceBusMessageListenerContainer
to override the auto-start-up behavior,ServiceBusMessageListenerContainer
andServiceBusInboundChannelAdapter
bean, disable their auto-start-up function.ServiceBusInboundChannelAdapter
after your business logic inAppEventListenerService#OnApplicationStarted
.Workaround-2
This might be a bit hack, since we don’t expose the api to disable auto-start-up in
ServiceBusMessageListenerContainer
, but it can be done inServiceBusInboundChannelAdapter
. So you can choose to not declare a bean ofServiceBusMessageListenerContainer
but change it as a local variable for the adapter,then start the
ServiceBusInboundChannelAdapter
after your business logic inAppEventListenerService#OnApplicationStarted
.