I'm working on a Java project and managed to create a new subscription for my topic.
I created the subscription with a correlation ID filter and added a custom property to the filter.
Now I want to send and receive messages on that topic through the correlation ID filter and its custom properties.
My final goal is a service that writes to and reads from a specific topic, using a specific subscription with a specific correlation ID filter and specific custom properties.
This is my sender:
    public void sendMessage(String message) {
        Map<String, Object> customProperties = new HashMap<>();
        customProperties.put("version", "1.0");
        ServiceBusMessage serviceBusMessage = new ServiceBusMessage(message + " " + new Date());
        String correlationId = serviceBusMessage.getCorrelationId();
        System.out.println("CorrelationId: " + correlationId);
        serviceBusMessage.setSubject("version");
        Map<String, Object> applicationProperties = serviceBusMessage.getApplicationProperties();
        applicationProperties.put("version", "1.0");
        senderClient.sendMessage(serviceBusMessage);
        System.out.println("Message sent: " + message);
    }
This is my receiver:
    public ServiceBusReceiver(String connectionString, String topicName, String subscriptionName) {
        this.processorClient = new ServiceBusClientBuilder()
            .proxyOptions(getProxyOptions())
            .transportType(AmqpTransportType.AMQP_WEB_SOCKETS)
            .connectionString(connectionString)
            .processor()
            .topicName(topicName)
            .subscriptionName(subscriptionName)
            .processMessage(this::processMessage)
            /* .processMessage(context -> {
                String message = context.getMessage().getBody().toString();
                System.out.println("Received message: " + message);
            }) */
            .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
            .maxConcurrentCalls(1) // Process one message at a time
            .processError(context -> {
                System.err.println("Error occurred: " + context.getException().getMessage());
            })
            .buildProcessorClient();
    }
This is my subscription creator:
    CreateSubscriptionOptions options = new CreateSubscriptionOptions()
        .setMaxDeliveryCount(10)
        .setDefaultMessageTimeToLive(Duration.ofDays(1))
        .setDeadLetteringOnMessageExpiration(true);
    try {
        ServiceBusAdministrationClient adminClient
            = new ServiceBusAdministrationClientBuilder()
                .connectionString(connectionString)
                .buildClient();
        CorrelationRuleFilter correlationRuleFilter = new CorrelationRuleFilter("version");
        correlationRuleFilter.setLabel("version");
        correlationRuleFilter.getProperties().put("version", "1.0");
        CreateRuleOptions ruleOptions = new CreateRuleOptions();
        ruleOptions.setFilter(correlationRuleFilter);
        SubscriptionProperties subscription = adminClient.createSubscription(topicName, subscriptionName, "version", options, ruleOptions);
        System.out.println("Subscription created: " + subscription.getSubscriptionName());
    } catch (Exception e) {
        System.out.println("ERROR : " + e.getMessage());
    }
I keep getting both the system properties and the custom properties, but I only want the custom properties.
I hope you can help me understand what my code should look like.
Thank you,
Oded
2 Answers
I tried the code below in Java to send and receive messages on an Azure Service Bus topic, through a specific subscription with a specific correlationID filter and custom properties.
I set up the correlation filter in the Azure Service Bus topic's subscription as shown below.
KamASBReceiver.java :
KamASBSender.java :
pom.xml :
Output :
I ran the receiver code first and then the sender.
I got the following output for the receiver code.
I got the following output for the sender code.
That’s because of the correlation filter you’re constructing.
You can create a correlation filter that uses a custom header (application property) value, but you need to provision the filter correctly. I have a post that shows a somewhat similar request.
If you want the correlation filter to match on the Label system property, set it to the version value you want to filter messages on. If you want to use a custom header (application property), use that instead. Don't use both, as it seems redundant.
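
To illustrate why the filter as constructed is a problem, here is a minimal sketch in plain Java (no Azure SDK) of how a correlation filter combines its conditions: every condition that is set must match (logical AND). The `Message` and `Filter` classes and the helper methods are hypothetical stand-ins, not SDK types. The sketch also assumes the string passed to the `CorrelationRuleFilter` constructor in the question is the correlation ID, so that filter requires a correlation ID, a label and a `version` property all at once, while the sender only sets the subject (label) and the `version` property.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class CorrelationFilterSketch {

    /** Hypothetical stand-in for a message: two system fields plus application properties. */
    static final class Message {
        String correlationId; // stays null when the sender never sets it
        String subject;       // the "Label" system property
        final Map<String, Object> applicationProperties = new HashMap<>();
    }

    /** Hypothetical stand-in for a correlation filter; a null field is not part of the filter. */
    static final class Filter {
        String correlationId;
        String label;
        final Map<String, Object> properties = new HashMap<>();

        /** All conditions that are set must match (logical AND). */
        boolean matches(Message m) {
            if (correlationId != null && !correlationId.equals(m.correlationId)) {
                return false;
            }
            if (label != null && !label.equals(m.subject)) {
                return false;
            }
            for (Map.Entry<String, Object> e : properties.entrySet()) {
                if (!Objects.equals(e.getValue(), m.applicationProperties.get(e.getKey()))) {
                    return false;
                }
            }
            return true;
        }
    }

    /** Mirrors the sender in the question: subject and "version" set, no correlation ID. */
    static Message questionMessage() {
        Message m = new Message();
        m.subject = "version";
        m.applicationProperties.put("version", "1.0");
        return m;
    }

    /** Mirrors the filter in the question: correlation ID, label and custom property together. */
    static Filter questionFilter() {
        Filter f = new Filter();
        f.correlationId = "version";
        f.label = "version";
        f.properties.put("version", "1.0");
        return f;
    }

    /** The simpler alternative: filter on the custom property only. */
    static Filter propertyOnlyFilter() {
        Filter f = new Filter();
        f.properties.put("version", "1.0");
        return f;
    }

    public static void main(String[] args) {
        Message m = questionMessage();
        System.out.println("question filter matches: " + questionFilter().matches(m));          // false
        System.out.println("property-only filter matches: " + propertyOnlyFilter().matches(m)); // true
    }
}
```

In the real SDK, the property-only variant would roughly correspond to leaving the correlation ID and label unset on the `CorrelationRuleFilter` and only calling `getProperties().put("version", "1.0")`, with the sender putting the same key and value into the message's application properties.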