skip to Main Content

im using a Java project and i manage to create a new subscription for my topic.
i have created my subscription with corelationID filter and added to the filter a cusom properties.

enter image description here

now i want to write/receive a message for the specific topic with the correlationId filter with the specific custom properties ..

my final goal is that i’ll have service that write/read to and from a specific topic, with a specific subscription with specific correlationID filter and specificcustom properties .

this is my sender :

   public void sendMessage(String message) {
    Map<String, Object> customProperties = new HashMap<>();
    customProperties.put("version", "1.0");
    ServiceBusMessage serviceBusMessage = new ServiceBusMessage(message+"  "+new Date());
    String correlationId = serviceBusMessage.getCorrelationId();
    System.out.println("CorrelationId: " + correlationId);
    serviceBusMessage.setSubject("version");
    Map<String, Object> applicationProperties = serviceBusMessage.getApplicationProperties();
    applicationProperties.put("version", "1.0");
    senderClient.sendMessage(serviceBusMessage);
    System.out.println("Message sent: " + message);
}

this is my receiver :

    public ServiceBusReceiver(String connectionString, String topicName, String subscriptionName) {
    this.processorClient = new ServiceBusClientBuilder()
            .proxyOptions(getProxyOptions())
            .transportType(AmqpTransportType.AMQP_WEB_SOCKETS)
            .connectionString(connectionString)
            .processor()
            .topicName(topicName)
            .subscriptionName(subscriptionName)
            .processMessage(this::processMessage)
         /*   .processMessage(context -> {
                String message = context.getMessage().getBody().toString();
                System.out.println("Received message: " + message);
            })*/.receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
            .maxConcurrentCalls(1)   // Process one message at a time
            .processError(context -> {
                System.err.println("Error occurred: " + context.getException().getMessage());
            })
            .buildProcessorClient();
}

This is my subscription creator :

   CreateSubscriptionOptions options = new CreateSubscriptionOptions()
            .setMaxDeliveryCount(10)
            .setDefaultMessageTimeToLive(Duration.ofDays(1))
            .setDeadLetteringOnMessageExpiration(true);
    try {
        
        ServiceBusAdministrationClient adminClient
                = new ServiceBusAdministrationClientBuilder()
                .connectionString(connectionString)
                .buildClient();

        CorrelationRuleFilter correlationRuleFilter = new CorrelationRuleFilter("version");
        correlationRuleFilter.setLabel("version");
        correlationRuleFilter.getProperties().put("version", "1.0");

        CreateRuleOptions ruleOptions = new CreateRuleOptions();
        ruleOptions.setFilter(correlationRuleFilter);

        SubscriptionProperties subscription = adminClient.createSubscription(topicName, subscriptionName, "version", options, ruleOptions);
        System.out.println("Subscription created: " + subscription.getSubscriptionName());


    } catch (Exception e) {
        System.out.println("ERROR : " + e.getMessage());

    }

i keep getting the system properties and the custom properties but i only want the custom properties

hope you can help me understand how my code should look like ..

Thank you
Oded

2

Answers


  1. I have tried the below code using Java to send and receive messages to Azure Service Bus Topic to a specific subscription with a specific correlationID filter and custom properties.

    • I have added the correlation filter in the Azure Service Bus Topic’s Subscription as shown below.

    enter image description here

    KamASBReceiver.java :

    import com.azure.messaging.servicebus.*;
     
    public class KamASBReceiver {
        private static final String connectionString = "<TopicConneString>";
        private static final String topicName = "<TpicName>";
        private static final String subscriptionName = "<subName>";
     
        public static void main(String[] args) {
            ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
                    .connectionString(connectionString)
                    .processor()
                    .topicName(topicName)
                    .subscriptionName(subscriptionName)
                    .processMessage(KamASBReceiver::processMessage)
                    .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
                    .processError(context -> System.out.println("Error occurred: " + context.getException()))
                    .buildProcessorClient();
            processorClient.start();
            System.out.println("Started receiving messages.");
            Runtime.getRuntime().addShutdownHook(new Thread(processorClient::close));
        }
     
        private static void processMessage(ServiceBusReceivedMessageContext context) {
            ServiceBusReceivedMessage message = context.getMessage();
            if ("version".equals(message.getCorrelationId()) &&
                    "1.0".equals(message.getApplicationProperties().get("version"))) {
                System.out.println("Received message with CorrelationId and custom property.");
                System.out.printf("Message ID: %s, Content: %s%n", message.getMessageId(), message.getBody().toString());
            } else {
                System.out.println("Received message that doesn't match the specified filter criteria.");
            }
        }
    }
    

    KamASBSender.java :

    import com.azure.messaging.servicebus.*;
     
    public class KamASBSender {
        private static final String connectionString = "<TopicConnecString>";
        private static final String topicName = "<TopicName>";
     
        public static void main(String[] args) {
            ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
                    .connectionString(connectionString)
                    .sender()
                    .topicName(topicName)
                    .buildClient();
            try {
                ServiceBusMessage message = new ServiceBusMessage("Hi Kamali. Welcome to my world!")
                        .setCorrelationId("version");
                message.getApplicationProperties().put("version", "1.0");
                senderClient.sendMessage(message);
                System.out.println("Message sent successfully.");
            } finally {
                senderClient.close();
            }
        }
    }
    

    pom.xml :

    <dependencies>  
        <dependency>  
            <groupId>com.azure</groupId>  
            <artifactId>azure-messaging-servicebus</artifactId>  
            <version>7.17.4</version>  
        </dependency>  
    </dependencies>
    

    Output :

    I run the receiver code first and then the sender.

    I got the following output for the receiver code.

    enter image description here

    I got the below output for sender code.

    enter image description here

    Login or Signup to reply.
  2. i keep getting the system properties and the custom properties but i only want the custom properties

    That’s because of the correlation filter you’re constructing.

    You can create a correlation filter that’s using a custom header (application property) value, but you need to provision the filter correctly. I have a post that shows a somewhat similar request.

    If you want correlation filter on the Label system property, set it with the value of the version you want to filter messages based on. If you want to use a custom header (application header), use that. Don’t use both as it seems redundant.

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search