I have an issue where, if I subscribe multiple times to the same Redis channel, my subscribers start receiving duplicate messages.
Here is my subscription method:
    public void SubscribeToUserNotifications(string userId, Action<string?> onMessage)
    {
        _subscriber.Subscribe(
            new RedisChannel($"orders:notifications:user:{userId}", RedisChannel.PatternMode.Literal),
            (channel, message) =>
            {
                onMessage(message);
            });
    }
I also have a method that publishes a message when a new order is added for a user:
    public async Task AddOrderForUserAsync(string userId, OrderInfo order)
    {
        var db = _redis.GetDatabase();
        string orderKey = $"orders:user:{userId}";
        string orderData = JsonConvert.SerializeObject(order);
        await db.HashSetAsync(orderKey, order.Id.ToString(), orderData);
        _subscriber.Publish(
            new RedisChannel($"orders:notifications:user:{userId}", RedisChannel.PatternMode.Literal),
            orderData
        );
    }
I reviewed my code and realized that each time a user subscribes, a new subscription to the same channel is created, which leads to the same message being sent to the same subscriber multiple times.
I tried to check if the subscription already exists, but Redis itself doesn’t seem to provide a direct way to avoid duplicate subscriptions.
I want the subscriber to receive only one notification per published message, even if they subscribe multiple times to the same channel.
Question:
How can I avoid or prevent duplicate notifications when subscribing multiple times to the same Redis channel? Is there a built-in way in Redis to manage this, or should I handle it on the application side?
2 Answers
Issue: How to properly subscribe to Redis Pub/Sub with an asynchronous handler in C#?
If you're trying to use Redis Pub/Sub with StackExchange.Redis in C# and you want to handle messages asynchronously, you may run into the problem that Subscribe expects a synchronous delegate (Action<RedisChannel, RedisValue>), while your handler is asynchronous.
To solve this, you can wrap your asynchronous logic inside a Task.Run call, which allows you to execute asynchronous code within the synchronous delegate.
Here's how this issue was resolved:
Solution: Method to add an order and notify subscribers:
Method to subscribe to notifications with asynchronous handling:
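The code for this step is not shown above. As a rough sketch of the Task.Run approach this answer describes, the adapter below turns an asynchronous handler into the synchronous callback shape used in the question. The names `AsyncHandlerAdapter`, `ToSyncCallback`, and `onError` are hypothetical and do not come from StackExchange.Redis or from the original answer.

```csharp
#nullable enable
using System;
using System.Threading.Tasks;

// Hypothetical helper: adapts an async handler to a synchronous callback.
public static class AsyncHandlerAdapter
{
    // Returns an Action<string?> that starts the async handler on the
    // thread pool and returns immediately (fire-and-forget).
    public static Action<string?> ToSyncCallback(
        Func<string?, Task> handler,
        Action<Exception> onError)
    {
        return message =>
        {
            // Discard the task on purpose; exceptions are routed to onError
            // so they are not silently lost.
            _ = Task.Run(async () =>
            {
                try
                {
                    await handler(message);
                }
                catch (Exception ex)
                {
                    onError(ex);
                }
            });
        };
    }
}
```

Inside the subscription method, the returned callback could be passed in place of `onMessage`, for example `(channel, message) => callback(message)`. Because the work is fire-and-forget, messages may be processed out of order; whether that is acceptable depends on the application.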
There is no built-in way to do this. You could set a flag and check it before subscribing.
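One way to apply the flag idea on the application side is to remember which channels already have a handler and skip the second registration. As far as I can tell, StackExchange.Redis calls every handler registered for a channel, so registering the same callback twice results in two calls per message. The sketch below assumes a hypothetical helper, `ChannelSubscriptionGuard`, which is not part of StackExchange.Redis; the actual subscribe call is passed in as a delegate.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper: tracks which channels already have a handler so that
// a repeated subscribe call for the same channel becomes a no-op.
public sealed class ChannelSubscriptionGuard
{
    private readonly ConcurrentDictionary<string, bool> _subscribed =
        new ConcurrentDictionary<string, bool>();

    // Runs `subscribe` only for the first caller of a given channel name.
    // Returns true if this call created the subscription.
    public bool SubscribeOnce(string channel, Action subscribe)
    {
        // TryAdd is atomic, so only one concurrent caller wins per channel.
        if (!_subscribed.TryAdd(channel, true))
            return false;

        try
        {
            subscribe();
            return true;
        }
        catch
        {
            // Roll back so a later attempt can retry after a failure.
            _subscribed.TryRemove(channel, out _);
            throw;
        }
    }

    // Forgets the channel and runs `unsubscribe` if it was registered.
    public bool Unsubscribe(string channel, Action unsubscribe)
    {
        if (!_subscribed.TryRemove(channel, out _))
            return false;

        unsubscribe();
        return true;
    }
}
```

In `SubscribeToUserNotifications`, the existing `_subscriber.Subscribe(...)` call could be wrapped as `_guard.SubscribeOnce(channelName, () => _subscriber.Subscribe(...))`, where `_guard` is a shared instance of this class. Note that this keeps only the first handler per channel; if different callers need their own callbacks, the key would have to include something that identifies the caller.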