skip to Main Content

I am new to working with Azure and I am trying to retrieve some messages from the Azure Service Bus queue that was set up by a client, but nothing happens when I run the code. It just waits for the timeout to complete and then stops–no errors and/or exceptions.

In addition to that, I stumbled upon the peekMessages() function and was able to successfully retrieve the messages.

The connection string and queue name are both valid. And as per client, they already had the "Listen" permission set up as well.

This is what I have done so far:

AzureServiceBusQueue queue = new AzureServiceBusQueue(MSCConstants.AZURE_UAT_CONNECTION_STRING, MSCConstants.AZURE_UAT_QUEUE_MAME);
queue.receiveMessages();
public void receiveMessages() throws Exception {
    AtomicBoolean sampleSuccessful = new AtomicBoolean(true);
    CountDownLatch countdownLatch = new CountDownLatch(1);

    try {
        ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
                .connectionString(connectionString)
                .receiver()
                .queueName(queueName)
                .maxAutoLockRenewDuration(Duration.ofMinutes(1))
                .disableAutoComplete()
                .buildAsyncClient();

        Disposable subscription = receiver.receiveMessages()
                .subscribe(message -> {
                            System.out.printf("Sequence #: %s. Contents: %s%n", message.getSequenceNumber(),
                                    message.getBody());
                        },
                        error -> {
                            System.err.println("Error occurred while receiving message: " + error);
                            sampleSuccessful.set(false);
                        });


        countdownLatch.await(60, TimeUnit.SECONDS);
        subscription.dispose();
        receiver.close();
    } catch (Exception ex) {
        ex.printStackTrace();
        throw ex;
    }
}

And when I ran this code, the program just waits for 60 seconds and then it stops. No errors, exceptions, or any log messages in the console.

Do you have any ideas/suggestions on what’s happening and what to do?

UPDATE:

I have also tried the approach below.

When I tried executing the code, it first prints "Testing." Then it waits for a couple of minutes before printing "123" next. The printing of the message details in between is never executed.

public void receiveMessagesV2() throws Exception {
        ServiceBusReceiverAsyncClient receiver = null;
        try {
            receiver = new ServiceBusClientBuilder()
                    .connectionString(getConnectionString())
                    .receiver()
                    .queueName(getQueueName())
                    .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
                    .disableAutoComplete()
                    .buildAsyncClient();

            for (int i = 0; i < 3; i++) {
                System.out.println("Testing");
                ServiceBusReceiverAsyncClient finalReceiver = receiver;
                receiver.receiveMessages().subscribe(message -> {
                    System.out.printf("Sequence #: %s. Contents: %s%n", message.getSequenceNumber(), message.getBody());

                    // Messages from the sync receiver MUST be settled explicitly.
                    finalReceiver.complete(message);
                });
                System.out.println("123");
            }
        } catch (Exception ex) {
            ex.printStackTrace();
            throw ex;
        } finally {
             receiver.close();
        }
    }

2

Answers


  1. Chosen as BEST ANSWER

    Looks like this is caused by a compatibility issue of the azure-messaging-servicebus library.

    I was initially using the 7.14 version, and after downgrading to 7.10.1 (as suggested by this post), I was finally able to receive the messages without any issues.


  2. I tried the below Java code to receive messages from an Azure Service Bus Queue. The code leverages the Azure SDK for Java along with the Reactor and RxJava libraries for handling asynchronous operations.

    Code :

    import com.azure.messaging.servicebus.*;
    import reactor.core.Disposable;
    import java.time.Duration;
    
    public class AzureServiceBusQueue {
    
        private final String connectionString;
        private final String queueName;
    
        public AzureServiceBusQueue(String connectionString, String queueName) {
            this.connectionString = connectionString;
            this.queueName = queueName;
        }
    
        public void receiveMessages() throws Exception {
            ServiceBusReceiverAsyncClient receiver = null;
    
            try {
                receiver = new ServiceBusClientBuilder()
                        .connectionString(connectionString)
                        .receiver()
                        .queueName(queueName)
                        .maxAutoLockRenewDuration(Duration.ofMinutes(1))
                        .disableAutoComplete()
                        .buildAsyncClient();
    
                final ServiceBusReceiverAsyncClient finalReceiver = receiver; 
    
                Disposable subscription = receiver.receiveMessages()
                        .subscribe(message -> {
                                    System.out.printf("Sequence #: %s. Contents: %s%n", message.getSequenceNumber(),
                                            message.getBody());
    
                                    finalReceiver.complete(message);
                                },
                                error -> {
                                    System.err.println("Error occurred while receiving message: " + error);
                                },
                                () -> {
                                    System.out.println("Subscription completed");
                                });
    
                Thread.sleep(60000);
    
                subscription.dispose();
            } catch (Exception ex) {
                ex.printStackTrace();
                throw ex;
            } finally {
                if (receiver != null) {
                    receiver.close();
                }
            }
        }
    
        public static void main(String[] args) {
            AzureServiceBusQueue queue = new AzureServiceBusQueue("<ServiceBusQueue_connecString>", "<queue_name>");
            try {
                queue.receiveMessages();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    pom.xml :

        <dependencies>
            <dependency>
                <groupId>com.azure</groupId>
                <artifactId>azure-messaging-servicebus</artifactId>
                <version>7.8.0</version>
            </dependency>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-core</artifactId>
                <version>3.4.14</version>
            </dependency>
            <dependency>
                <groupId>io.projectreactor.netty</groupId>
                <artifactId>reactor-netty</artifactId>
                <version>1.0.15</version>
            </dependency>
            <dependency>
                <groupId>io.reactivex.rxjava3</groupId>
                <artifactId>rxjava</artifactId>
                <version>3.1.0</version>
            </dependency>
        </dependencies>
    

    I gave the Listen permissions in the shared access policies in the Service Bus Queue at the Azure Portal as below,

    enter image description here

    I sent five messages to the Azure Service Bus Queue in Portal as below,

    enter image description here

    Output :

    The code executed successfully and retrieved the five messages from the Azure Service Bus Queue.

    enter image description here

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search