
I have an issue where, if I subscribe multiple times to the same Redis channel, my subscribers start receiving duplicate messages.

Here is my subscription method:

public void SubscribeToUserNotifications(string userId, Action<string?> onMessage)
{
    _subscriber.Subscribe(
        new RedisChannel($"orders:notifications:user:{userId}", RedisChannel.PatternMode.Literal),
        (channel, message) => onMessage(message));
}

I also have a method that publishes a message when a new order is added for a user:

public async Task AddOrderForUserAsync(string userId, OrderInfo order)
{
    var db = _redis.GetDatabase();

    string orderKey = $"orders:user:{userId}";

    string orderData = JsonConvert.SerializeObject(order);

    await db.HashSetAsync(orderKey, order.Id.ToString(), orderData);

    _subscriber.Publish(
        new RedisChannel($"orders:notifications:user:{userId}", RedisChannel.PatternMode.Literal),
        orderData
    );
}

I reviewed my code and realized that each time a user subscribes, a new subscription to the same channel is created, which leads to the same message being sent to the same subscriber multiple times.
I tried to check if the subscription already exists, but Redis itself doesn’t seem to provide a direct way to avoid duplicate subscriptions.

I want the subscriber to receive only one notification per published message, even if they subscribe multiple times to the same channel.

Question:
How can I avoid or prevent duplicate notifications when subscribing multiple times to the same Redis channel? Is there a built-in way in Redis to manage this, or should I handle it on the application side?

2 Answers


  1. Chosen as BEST ANSWER

    Issue: How do you subscribe to Redis Pub/Sub with an asynchronous handler in C#?

    With StackExchange.Redis, Subscribe expects a synchronous delegate (Action<RedisChannel, RedisValue>), so an asynchronous handler cannot be passed to it directly.

    To work around this, wrap the asynchronous logic in a Task.Run call inside the synchronous delegate. The delegate returns immediately and the asynchronous work continues on the thread pool.

    Here is how the issue was resolved. The Pub/Sub message now acts only as a wake-up signal; the order itself is read from a Redis Stream through a consumer group. Because a consumer group delivers each new entry to only one reader, a channel that has several handlers registered should still pass each order to onMessage once.

    Solution: Method to add an order and notify subscribers:

    public async Task AddOrderForUserAsync(string userId, OrderInfo order)
    {
        var db = _redis.GetDatabase();

        string streamKey = $"{_streamKeyPrefix}{userId}";

        string orderData = JsonConvert.SerializeObject(order);

        // Add the order to the user's Redis Stream
        await db.StreamAddAsync(streamKey, new NameValueEntry[]
        {
            new NameValueEntry("orderId", order.Id.ToString()),
            new NameValueEntry("orderData", orderData)
        });

        // Publish a notification to subscribers via Pub/Sub
        // (awaited so the async method does not block on the network call)
        await _subscriber.PublishAsync(
            new RedisChannel($"orders:notifications:user:{userId}", RedisChannel.PatternMode.Literal),
            orderData);
    }
    

    Method to subscribe to notifications with asynchronous handling:

    public void SubscribeToUserNotifications(string userId, Action<string?> onMessage)
    {
        string streamKey = $"{_streamKeyPrefix}{userId}";

        // Subscribe to Pub/Sub notifications for the user's orders
        _subscriber.Subscribe(
            new RedisChannel($"orders:notifications:user:{userId}", RedisChannel.PatternMode.Literal),
            (channel, message) =>
            {
                // Run the asynchronous code inside Task.Run (fire and forget)
                _ = Task.Run(async () =>
                {
                    var db = _redis.GetDatabase();

                    // Read the next undelivered entry from the stream.
                    // The consumer group _groupName must already exist on the stream.
                    var entries = await db.StreamReadGroupAsync(streamKey, _groupName, userId, ">", count: 1);

                    foreach (var entry in entries)
                    {
                        // Process the received data
                        var orderData = entry.Values.FirstOrDefault(x => x.Name == "orderData").Value;
                        onMessage(orderData);

                        // Acknowledge the processing of the message
                        await db.StreamAcknowledgeAsync(streamKey, _groupName, entry.Id);
                    }
                });
            });
    }
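
    The Task.Run wrapping described in this answer can also be pulled into a small helper. The following is only a sketch: AsyncHandlerAdapter, ToSyncHandler and the onError callback are hypothetical names, not part of StackExchange.Redis, and plain strings stand in for RedisChannel and RedisValue so the example has no dependencies. The try/catch is an addition of this sketch; without it, an exception thrown inside a fire-and-forget task goes unobserved.

    ```csharp
    using System;
    using System.Threading.Tasks;

    // Hypothetical helper, not part of StackExchange.Redis.
    public static class AsyncHandlerAdapter
    {
        // Turns an async handler into the synchronous Action shape that a
        // callback-based Subscribe API expects.
        public static Action<string, string> ToSyncHandler(
            Func<string, string, Task> asyncHandler,
            Action<Exception> onError)
        {
            return (channel, message) =>
            {
                // Fire and forget: the callback returns immediately and the
                // async work continues on the thread pool.
                _ = Task.Run(async () =>
                {
                    try
                    {
                        await asyncHandler(channel, message);
                    }
                    catch (Exception ex)
                    {
                        // Report the failure instead of losing it silently.
                        onError(ex);
                    }
                });
            };
        }
    }
    ```

    Because the tasks are not awaited, handlers for consecutive messages may run concurrently and finish out of order; if ordering matters, a queue drained by a single worker would be a safer design.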
    

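  2. There is no built-in way. You could guard the Subscribe call with a flag so that it runs only once.

    // Note: a single flag covers every userId. With one channel per user,
    // keep one flag per channel instead.
    private bool _hasSubscribed = false;

    public void SubscribeToUserNotifications(string userId, Action<string?> onMessage)
    {
        if (!_hasSubscribed)
        {
            _subscriber.Subscribe(...);
            _hasSubscribed = true;
        }
    }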
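
    Since the channel name in the question contains the userId, the flag idea can be extended to one flag per user. The following is a minimal sketch under that assumption: SubscriptionRegistry, SubscribeOnce and Release are hypothetical names introduced for illustration, and the actual Subscribe call is passed in as a delegate so the sketch does not depend on StackExchange.Redis.

    ```csharp
    using System;
    using System.Collections.Concurrent;

    // Hypothetical helper: remembers which users already have a subscription.
    public sealed class SubscriptionRegistry
    {
        private readonly ConcurrentDictionary<string, bool> _active =
            new ConcurrentDictionary<string, bool>();

        // Runs 'subscribe' only for the first call per userId.
        // Returns true if this call performed the subscription.
        public bool SubscribeOnce(string userId, Action subscribe)
        {
            // TryAdd is atomic, so concurrent callers cannot both succeed.
            if (!_active.TryAdd(userId, true))
            {
                return false;
            }

            try
            {
                subscribe();
                return true;
            }
            catch
            {
                // Roll back so a later attempt can retry the subscription.
                _active.TryRemove(userId, out _);
                throw;
            }
        }

        // Forgets the user, e.g. after unsubscribing from the channel.
        public bool Release(string userId)
        {
            return _active.TryRemove(userId, out _);
        }
    }
    ```

    Inside SubscribeToUserNotifications this would be used as registry.SubscribeOnce(userId, () => _subscriber.Subscribe(...)), with the arguments to Subscribe unchanged from the question. Note that this keeps only the first onMessage callback per user; later callers are ignored rather than replaced.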
    