skip to Main Content

I am using Stackexchange.Redis package in .NET Core 3.1
I can use pub-sub method like this:-

            var channel = connectionMultiplexer.GetSubscriber().Subscribe("channel1");
             channel.OnMessage(msg =>
             {
                 var message = msg.Message;
                 Console.Out.WriteLine(message);
             });

But pub-sub is not stored anywhere in the Redis server. I found redis stream persists in the redis store. So I wanted to replace pub-sub with the stream. But I don’t see any way to implement this as I did for pub-sub. How can I implement pub-sub like way for stream data?

2

Answers


  1. Try with:

    connectionMultiplexer.GetSubscriber().Subscribe("channel", (channel, message) => {
        await DoSomethingWith(message);
    });
    
    Login or Signup to reply.
  2. So using the Stackexchange.Redis package, I have also searched and could not find an answer, however, I came across this question and in the question the user refers to this github issue . In the Github issue, chrisckc proposes a slight work around that uses both the Streams as well as Pub/Sub to achieve an almost Pub/Sub for streams. I encourage you to read the Github issue for some of the cons. So this is my take on how to implement it using chrisckc suggestion

    var db = connectionMultiplexer.GetDatabase();
    var subscriber = connectionMultiplexer.GetSubscriber();
    
    /////////////The Subscriber
    string channelAndStreamName = "channel1";
    var channel = connectionMultiplexer.GetSubscriber().Subscribe(channelAndStreamName);
    channel.OnMessage(msg =>
    {
        bool ifAllSubscriberShouldProcessAMessage = true;
    
        if (ifAllSubscriberShouldProcessAMessage)
        {
            //You then can retrieve your message normally
            var message = msg.Message;
            Console.Out.WriteLine(message);
    
            //Or you can even retrieve it from the actual stream. 
            var streamEntryArray = db.StreamRead(channelAndStreamName, "0-0");
    
            //Then Parse your streamEntryArray objects To what you expect
            //See example https://github.com/redis-developer/redis-streams-with-dotnet/blob/main/RedisStreamsStackExchange/Program.cs
        }
        else
        {
    
            //In this case you would only want one consumer to process the message,
            //e.g. if you have multiple instances of your app running for load balancing purposes
            //Firstly follow https://developer.redis.com/develop/dotnet/streams/stream-basics/
            //So that you understand the groups and how to set it up
            //Then instead of having a continuous loop as the article suggests, you instead
            //subscribe using the example I have shown here, but your subscribers should only process
            //the message if they are the ones who received it via the StreamRead, just like below.
            //Remember that all subscribers will receive the message via Pub/Sub, but in your
            //processing logic, retrieve the message via streams, if the message is there
            //then that instance is the one meant to process the actual message
    
            string groupName = "group1";
            var result = db.StreamReadGroup(channelAndStreamName, groupName, "avg-1", ">", 1);
    
            if (result.Any())
            {
                //If there is a result, then this subscriber instance is the one in the group who
                //received the message, in theory, the others did not
    
                var id = result.First().Id;
                db.StreamAcknowledge(channelAndStreamName, groupName, id);
    
                var message = msg.Message;
                Console.Out.WriteLine(message);
            }
        }
    });
    
    
    
    /////////////The Publisher
    
    //This is the value that we are going to first add to the stream
    //then after we add, we also publish the same value so that it is received by subscribers.
    //For easy use, the stream name as well as the channel name are the same
    string valueToAdd = $"This is the new entry {DateTime.UtcNow}";
    
    //Adding to persistent stream first
    await db.StreamAddAsync(channelAndStreamName,
               new NameValueEntry[]
                   {new("temp", valueToAdd), new NameValueEntry("time", DateTimeOffset.Now.ToUnixTimeSeconds())});
    
    //after adding the new value to the stream, you can then publish to subscribers
    await subscriber.PublishAsync(channelAndStreamName, valueToAdd);
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search