skip to Main Content

What I want to AchieveAzure Service Bus Message Listener to start / stop receiving messages from queue/topic.

Below is a detailed explanation.

Currently I have integrated Azure Service Bus in my application and we listen message as soon as spring boot application starts. Now I want to modify this logic. By default Azure Service Bus Message Listener will be disable. On ApplicationReadyEvent I want to perform some task and after that again I want to enable Azure Service Bus Message Listener to start listening from topic or queue.

So how can I achieve that ?

application.yml

spring:
  cloud:
    azure:
      servicebus:
        namespace: **********
        
xxx:
  azure:
    servicebus:
      connection: ***********
      queue: **********

AzureConfiguration.java

import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import com.azure.spring.messaging.servicebus.core.ServiceBusProcessorFactory;
import com.azure.spring.messaging.servicebus.core.listener.ServiceBusMessageListenerContainer;
import com.azure.spring.messaging.servicebus.core.properties.ServiceBusContainerProperties;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessageChannel;


@Configuration
public class AzureConfiguration{

    @Value("${xxx.azure.servicebus.connection}")
    private String serviceBusConnection;

    @Value("${xxx.azure.servicebus.queue}")
    private String serviceBusQueue;

    private static final String SERVICE_BUS_INPUT_CHANNEL = "yyyyy";
    private static final String SENSOR_DATA_CHANNEL = "zzzzz";
    private static final String SERVICE_BUS_LISTENER_CONTAINER = "aaaaa";


    @Bean(name = SERVICE_BUS_LISTENER_CONTAINER)
    public ServiceBusMessageListenerContainer serviceBusMessageListenerContainer(ServiceBusProcessorFactory processorFactory) {

        ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
        containerProperties.setConnectionString(serviceBusConnection);
        containerProperties.setEntityName(serviceBusQueue);
        containerProperties.setAutoComplete(true);
        return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
    }


    @Bean
    public ServiceBusInboundChannelAdapter serviceBusInboundChannelAdapter(
            @Qualifier(SERVICE_BUS_INPUT_CHANNEL) MessageChannel inputChannel,
            @Qualifier(SERVICE_BUS_LISTENER_CONTAINER) ServiceBusMessageListenerContainer listenerContainer) {

        ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
        adapter.setOutputChannel(inputChannel);
        
        return adapter;
    }


    @Bean(name = SERVICE_BUS_INPUT_CHANNEL)
    public MessageChannel serviceBusInputChannel() {

        return new DirectChannel();
    }


    @Bean(name = SENSOR_DATA_CHANNEL)
    public MessageChannel sensorDataChannel() {

        return new DirectChannel();
    }


    @Bean
    public IntegrationFlow serviceBusMessageFlow() {

        return IntegrationFlows.from(SERVICE_BUS_INPUT_CHANNEL)
                .<byte[], String>transform(String::new)
                .channel(SENSOR_DATA_CHANNEL)
                .get();
    }
}

AppEventListenerService.java

import com.azure.spring.integration.servicebus.inbound.ServiceBusInboundChannelAdapter;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;

import java.util.List;

@Slf4j
@Service
@AllArgsConstructor
public class AppEventListenerService{

   
    @EventListener(ApplicationReadyEvent.class)
    public void OnApplicationStarted() {
        log.debug("Enter OnApplicationStarted");
        // By Default Azure Service Bus Message Listener will be disable
        // do some task
        // Enable Azure Bus Message Listener
        log.debug("Exit OnApplicationStarted");
    }
}

In above code in AppEventListenerService.java ,

// Enable Azure Bus Message Listener – Here I want to start ServiceBusConsumer to receive message from topic/queue.

3

Answers


    • Here I have a work around where we use the JMS to consume the service bus message

    • The reason to use JMS is that when we use the @JMSListener we can start stop it.

    • Now to Implement JMS with ServiceBus refer this MSDOC

    • Now you have to Autowired this JmsListenerEndpointRegistry object and stop the listener.

    @Autowired  
    JmsListenerEndpointRegistry registry;
    

    To stop the JMS you will have to use the stop function:

    registry.stop();
    
    • Here I have create two Api which will start/Stop the JMS and receiver of messages:
    @Component  
    @RestController  
    public class Reciever {  
      
        @Autowired  
      JmsListenerEndpointRegistry registry;  
      
      @GetMapping("/stop")  
        public String readBlobFile ()  
        {  
            registry.stop();  
     return "Stopped" ;  
      }  
      
        @GetMapping("/start")  
        public String readBlobFile1 ()  
        {  
            registry.start();  
     return "StARTED" ;  
      }  
    
    
        private static final String QUEUE_NAME = "test";  
     private final Logger logger = LoggerFactory.getLogger(Reciever.class);  
      
      
      
      @JmsListener(destination = QUEUE_NAME, containerFactory = "jmsListenerContainerFactory")  
        public void receiveMessage(String s) {  
            logger.info("Received message: {}", s);  
      }  
    }
    
    • Now First I call the /stop Api which will stop the JMS and the message will only start coming once the /startapi is called .

    output:

    enter image description here

    Login or Signup to reply.
  1. As you’re using an integration flow registered as a bean the simplest way to start/stop it is to autowire it as StandardIntegrationFlow and call the corresponding method like so:

    @Slf4j
    @Service
    @DependsOn({"serviceBusMessageFlow"})
    @RequiredArgsConstructor
    public class AppEventListenerService {
        private final StandardIntegrationFlow serviceBusMessageFlow;
    
        @EventListener(ApplicationReadyEvent.class)
        public void OnApplicationStarted() {
            log.debug("Enter OnApplicationStarted");
            // Disable Azure Bus Message Listener
            serviceBusMessageFlow.stop();
    
            // do some task
    
            // Enable Azure Bus Message Listener
            serviceBusMessageFlow.start();
    
            log.debug("Exit OnApplicationStarted");
        }
    }
    

    Note the @DependsOn annotation might be needed to force the flow bean to be initialized before the event listener bean.
    Also, it should be noted that some messages might happen to go through after the flow is initialized and before the listener is triggered.

    Login or Signup to reply.
  2. Literally, if you just want to stop the listener and then start it on ApplicationReadyEvent, then you can autowire the ServiceBusInboundChannelAdapter(or ServiceBusMessageListenerContainer) in your AppEventListenerService.java and then simply call the its stop() and start() API in the AppEventListenerService#OnApplicationStarted method.

    However, both the ServiceBusMessageListenerContainer and ServiceBusInboundChannelAdapter implements SmartLifecycle interface and is enabled auto-start-up by default. So if you use the above solution, the listener (as well as adapter) has been triggered to start before ApplicationReadyEvent, which means there will still be a period that the listener is consuming messages.

    So I assume you may want to turn off the listener till your own business logic has been done. If so, then currently ServiceBusMessageListenerContainer doesn’t provide the function to disable auto-start-up, and we will put your feature request to our backlog.

    But you could still use the below workarounds to meet your request.

    Workaround-1

    1. You can extend the ServiceBusMessageListenerContainer to override the auto-start-up behavior,
    public class CustomServiceBusMessageListenerContainer extends ServiceBusMessageListenerContainer {
    
        private boolean autoStartUp = true;
        /**
         * Create an instance using the supplied processor factory and container properties.
         *  @param processorFactory the processor factory.
         * @param containerProperties the container properties.
         */
        public CustomServiceBusMessageListenerContainer(ServiceBusProcessorFactory processorFactory, ServiceBusContainerProperties containerProperties) {
            super(processorFactory, containerProperties);
        }
    
        public void setAutoStartUp(boolean autoStartUp) {
            this.autoStartUp = autoStartUp;
        }
    
        @Override
        public final boolean isAutoStartup() {
            return this.autoStartUp;
        }
    }
    
    1. When declaring the ServiceBusMessageListenerContainer and ServiceBusInboundChannelAdapter bean, disable their auto-start-up function.
        @Bean(SERVICE_BUS_LISTENER_CONTAINER)
        public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
            ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
            containerProperties.setEntityName(QUEUE_NAME);
            ...
            CustomServiceBusMessageListenerContainer listenerContainer = new CustomServiceBusMessageListenerContainer(processorFactory, containerProperties);
            listenerContainer.setAutoStartUp(false);
            return listenerContainer;
        }
    
        @Bean
        public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
            @Qualifier(SERVICE_BUS_INPUT_CHANNEL) MessageChannel inputChannel,
            @Qualifier(SERVICE_BUS_LISTENER_CONTAINER) ServiceBusMessageListenerContainer listenerContainer) {
            ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            adapter.setAutoStartup(false);
            return adapter;
        }
    
    1. Start the ServiceBusInboundChannelAdapter after your business logic in AppEventListenerService#OnApplicationStarted.

    Workaround-2

    This might be a bit hack, since we don’t expose the api to disable auto-start-up in ServiceBusMessageListenerContainer, but it can be done in ServiceBusInboundChannelAdapter. So you can choose to not declare a bean of ServiceBusMessageListenerContainer but change it as a local variable for the adapter,

        @Bean
        public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
            @Qualifier(SERVICE_BUS_INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusProcessorFactory processorFactory) {
            ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
            containerProperties.setEntityName(QUEUE_NAME);
            ...
            ServiceBusMessageListenerContainer listenerContainer = new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
    
            ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            adapter.setAutoStartup(false);
            return adapter;
        }
    

    then start the ServiceBusInboundChannelAdapter after your business logic in AppEventListenerService#OnApplicationStarted.

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search