
I could not find any implementation that reads the stream content without a loop, so I started to implement one. I’m facing several problems, though, and maybe some of you can point me in the right direction.

The implementation uses a combination of Pub/Sub and a stream:
* log -> the stream
* log:notify -> Pub/Sub notification channel
* log:lastReadMessage -> contains the ID of the last message read from the stream

Publisher

    using System;
    using System.Threading.Tasks;
    using StackExchange.Redis;

    class Program
    {
        static async Task Main(string[] args)
        {
            var connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync("localhost");
            var redisDb = connectionMultiplexer.GetDatabase(1);
            var publisher = connectionMultiplexer.GetSubscriber();

            while (true)
            {
                var value = new NameValueEntry[]
                {
                    new NameValueEntry("id", Guid.NewGuid().ToString()),
                    // Round-trip ("O") format, so the value does not depend on the current culture
                    new NameValueEntry("timestamp", DateTime.UtcNow.ToString("O"))
                };

                // Append the entry to the stream, then notify subscribers that new data exists
                await redisDb.StreamAddAsync("log", value);
                await publisher.PublishAsync("log:notify", string.Empty);
                await Task.Delay(TimeSpan.FromSeconds(1));
            }
        }
    }

Subscriber

    using System;
    using System.Reactive.Disposables;
    using System.Reactive.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using StackExchange.Redis;

    class Program
    {
        static async Task Main(string[] args)
        {
            var connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync("localhost");
            var redisDb = connectionMultiplexer.GetDatabase(1);

            // Keep the subscription alive until Enter is pressed
            using var subscription = CreateTaskFromStream(connectionMultiplexer, redisDb, "log")
                .Subscribe(x => Console.WriteLine(x));

            Console.ReadLine();
        }

        // Allows only one stream read at a time (shared by every stream in this process)
        private static readonly SemaphoreSlim taskFromStreamBlocker = new SemaphoreSlim(1);

        private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel)
        {
            // Resume from the saved position, or from the start of the stream
            var lastReadMessage = "0-0";

            var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage");
            if (lastReadMessageData.IsNullOrEmpty)
            {
                redisDb.StringSet($"{channel}:lastReadMessage", lastReadMessage);
            }
            else
            {
                lastReadMessage = lastReadMessageData;
            }

            return Observable.Create<string>(obs =>
            {
                var subscriber = connection.GetSubscriber();
                var notifyChannel = $"{channel}:notify";

                subscriber.Subscribe(notifyChannel, async (ch, msg) =>
                {
                    // Skip this notification if a read is already in progress
                    if (!await taskFromStreamBlocker.WaitAsync(0).ConfigureAwait(false))
                    {
                        return;
                    }

                    try
                    {
                        var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);

                        foreach (var message in messages)
                        {
                            obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
                            lastReadMessage = message.Id;
                        }

                        // Persist the position so a restart resumes after the last emitted message
                        await redisDb.StringSetAsync($"{channel}:lastReadMessage", lastReadMessage);
                    }
                    finally
                    {
                        // Always release, even if the read fails, or the subscriber stalls forever
                        taskFromStreamBlocker.Release();
                    }
                });

                // Unsubscribe from the notification channel (not from the stream key)
                return Disposable.Create(() => subscriber.Unsubscribe(notifyChannel));
            });
        }
    }

Why the semaphore?

Because lots of messages could be added to the stream, and I don’t want the same message to be processed twice.

THE PROBLEMS

  1. If there are unprocessed messages in the stream, how can we process them without an event from Pub/Sub?
    When we start, we can check for unprocessed messages and process them. But if a new message is added to the stream during that time, while we are not yet subscribed to Pub/Sub, the subscriber will not process it until the next notification arrives through Pub/Sub.

  2. The semaphore is important to avoid processing the same message twice, but at the same time it’s a curse. While a message is being processed, another one can be added to the stream. When that happens, the subscriber will not process it right away, only the next time it’s notified (and at that point it will process both messages).
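One way to keep "one reader at a time" without losing notifications is to coalesce them instead of dropping them as `WaitAsync(0)` does. The sketch below is written as C# 9+ top-level code using only the BCL; `CreateCoalescingTrigger` is a hypothetical name (not part of StackExchange.Redis or Rx), and `drain` is an assumed delegate standing in for the read/emit/save-position body of the Pub/Sub handler above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Returns a "notify" action that runs 'drain' at most once at a time. A
// notification that arrives while a drain is running is not dropped; it queues
// exactly one follow-up pass, and further notifications are merged into it.
static Action CreateCoalescingTrigger(Func<Task> drain)
{
    // 0 = idle, 1 = draining, 2 = draining and one more pass requested
    int state = 0;

    async Task RunAsync()
    {
        do
        {
            try
            {
                await drain();
            }
            catch (Exception ex)
            {
                // Keep the trigger usable even if one pass fails
                Console.Error.WriteLine($"Drain failed: {ex.Message}");
            }
        }
        // 2 -> 1: a notification arrived mid-pass, so drain again; 1 -> 0: done
        while (Interlocked.Decrement(ref state) > 0);
    }

    void Notify()
    {
        while (true)
        {
            int current = Volatile.Read(ref state);
            if (current == 2)
            {
                return; // a follow-up pass is already queued
            }
            if (Interlocked.CompareExchange(ref state, current + 1, current) == current)
            {
                if (current == 0)
                {
                    _ = RunAsync(); // nothing running yet: start a pass
                }
                return;
            }
        }
    }

    return Notify;
}
```

In the subscriber, the Pub/Sub handler would only call the returned action (for example `subscriber.Subscribe("log:notify", (_, _) => notify());`), and calling it once more right after subscribing drains the backlog: anything added before the subscription is read by that pass, and anything added after it produces a notification that queues the follow-up pass. There is still a small loop inside `RunAsync`, but it only repeats while notifications keep arriving during a pass.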

How would you implement this?
Is there an implementation of Redis streams using only Rx?
The solution should not use any kind of loop and should be memory efficient. Is this possible?

Best wishes

Paulo Aboim Pinto

3 Answers


  1. Chosen as BEST ANSWER

    This is the solution with a while loop that I want to avoid:

            private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel, CancellationToken cancellationToken)
            {
                var lastReadMessage = "0-0";

                var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage");
                if (lastReadMessageData.IsNullOrEmpty)
                {
                    redisDb.StringSet($"{channel}:lastReadMessage", lastReadMessage);
                }
                else
                {
                    lastReadMessage = lastReadMessageData;
                }

                return Observable.Create<string>(async obs =>
                {
                    // Poll the stream every 500 ms until cancellation is requested
                    while (!cancellationToken.IsCancellationRequested)
                    {
                        var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);

                        foreach (var message in messages)
                        {
                            obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
                            lastReadMessage = message.Id;
                        }

                        // Save the position so a restart resumes after the last emitted message
                        await redisDb.StringSetAsync($"{channel}:lastReadMessage", lastReadMessage);

                        await Task.Delay(TimeSpan.FromMilliseconds(500));
                    }

                    return Disposable.Empty;
                });
            }
    

  2. And this is another solution, using a timer that fires every 200 ms:

    
            private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel, CancellationToken cancellationToken)
            {
                var lastReadMessage = "0-0";

                var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage");
                if (lastReadMessageData.IsNullOrEmpty)
                {
                    redisDb.StringSet($"{channel}:lastReadMessage", lastReadMessage);
                }
                else
                {
                    lastReadMessage = lastReadMessageData;
                }

                var instance = ThreadPoolScheduler.Instance;

                return Observable.Create<string>(obs =>
                {
                    var disposable = Observable
                        .Interval(TimeSpan.FromMilliseconds(200), instance)
                        // FromAsync + Concat runs one read at a time, so a read that takes
                        // longer than 200 ms cannot overlap the next one and emit duplicates
                        .Select(_ => Observable.FromAsync(async () =>
                        {
                            var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);

                            foreach (var message in messages)
                            {
                                obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
                                lastReadMessage = message.Id;
                            }

                            await redisDb.StringSetAsync($"{channel}:lastReadMessage", lastReadMessage);
                        }))
                        .Concat()
                        .Subscribe(_ => { }, obs.OnError);

                    cancellationToken.Register(() => disposable.Dispose());

                    // Return the subscription so that disposing it also stops the timer
                    return disposable;
                });
            }
    
    
  3. I use a tight loop: just do an XRANGE and save the position. KISS. If there is no work it backs off, and when there is a lot going on it runs as a tight loop, so it is pretty fast.
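    A minimal sketch of such a back-off loop, written as C# 9+ top-level code with only the BCL. It assumes the Redis part (XRANGE from the saved position, processing, saving the new position) sits behind a hypothetical `readAndProcessBatch` delegate that returns how many entries it handled; `RunPollingLoopAsync` and its delay parameters are illustrative names, not from any library.

    ```csharp
    using System;
    using System.Threading;
    using System.Threading.Tasks;

    // While there is work, go straight back for the next batch (tight loop);
    // while the stream is idle, wait, doubling the delay each time up to maxDelay.
    // Exceptions thrown by readAndProcessBatch propagate to the caller, which
    // decides how to retry.
    static async Task RunPollingLoopAsync(
        Func<CancellationToken, Task<int>> readAndProcessBatch,
        TimeSpan minDelay,
        TimeSpan maxDelay,
        CancellationToken cancellationToken)
    {
        var delay = minDelay;
        while (!cancellationToken.IsCancellationRequested)
        {
            int handled = await readAndProcessBatch(cancellationToken);
            if (handled > 0)
            {
                delay = minDelay; // work found: reset the back-off
                continue;
            }

            try
            {
                await Task.Delay(delay, cancellationToken);
            }
            catch (OperationCanceledException)
            {
                break; // stopped while idle
            }
            delay = TimeSpan.FromTicks(Math.Min(delay.Ticks * 2, maxDelay.Ticks));
        }
    }
    ```

    Because the next read starts only after the current batch has been processed, a slow consumer simply slows the loop down rather than piling up tasks.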

    If you need higher performance (e.g. reading while processing), you could go further, but I would caution against this in most cases:

    1. It creates a lot of complexity, and this part needs to be rock solid.
    2. Redis is normally fast enough.
    3. “I don’t want to have the same message processed twice.” Almost every system has at-least-once delivery; eliminating duplicates around crashes is mind-bogglingly difficult and slow. You can partially remove them with a hash set of IDs, but it is pretty trivial for consumers to deal with duplicates if messages are designed to be idempotent, so the root cause is probably a message-design issue. If you partition the readers (a separate stream and one worker per stream), you can keep the hash set in memory and avoid scaling and distribution issues. Note that a Redis stream preserves order; use this to make idempotent messages simpler.
    4. Exceptions. You don’t want to stop processing a stream because a consumer has a logic exception on one message (e.g. you get a call at night because the whole system has stopped); locks make this worse. Event data can’t be changed, since it has already happened, so handling it is best effort. However, infrastructure/Redis exceptions do need to throw and be retried. Managing this outside a loop is very painful.
    5. Simple back pressure. If you can’t process the work fast enough, the loop slows down instead of creating a lot of tasks and blowing up your memory.
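    Because entries in one stream are returned in ID order, a single reader per stream (the partitioning suggested in point 3) can detect redeliveries by comparing IDs instead of keeping every ID in a hash set. Redis stream entry IDs have the form `<milliseconds>-<sequence>` and increase monotonically within a stream. A sketch under that one-reader assumption, as C# 9+ top-level code; the helper names are hypothetical.

    ```csharp
    using System;

    // Parses a Redis stream entry ID of the form "<milliseconds>-<sequence>".
    static (ulong Ms, ulong Seq) ParseStreamId(string id)
    {
        int dash = id.IndexOf('-');
        if (dash < 0)
        {
            throw new FormatException($"Not a stream entry ID: {id}");
        }
        return (ulong.Parse(id.Substring(0, dash)), ulong.Parse(id.Substring(dash + 1)));
    }

    // Compares IDs numerically; a plain string comparison would put "10-0" before "9-0".
    static int CompareStreamIds(string a, string b)
    {
        var (aMs, aSeq) = ParseStreamId(a);
        var (bMs, bSeq) = ParseStreamId(b);
        int byMs = aMs.CompareTo(bMs);
        return byMs != 0 ? byMs : aSeq.CompareTo(bSeq);
    }

    // With a single ordered reader, anything at or below the last processed ID has
    // already been handled (e.g. re-read after a crash before the position was saved).
    static bool IsNewEntry(string entryId, string lastProcessedId) =>
        CompareStreamIds(entryId, lastProcessedId) > 0;
    ```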

    I don’t use distributed locks or semaphores anymore.

    If you’re dealing with commands (e.g. “do something”) rather than events (“xyz has happened”), these can fail. Again, the consumer should deal with the case where the command has already been applied; that is not the job of the Redis/stream-reading part.
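    On the consumer side, handling “it has already happened” can be as simple as remembering which command IDs were applied. A minimal sketch as C# 9+ top-level code; `TryApplyOnce` is a hypothetical helper, and an in-memory `HashSet` only works when a single worker owns the stream (otherwise the set would need shared storage).

    ```csharp
    using System;
    using System.Collections.Generic;

    // Applies a command only if its ID has not been applied before. The ID is
    // recorded after apply() succeeds, so a command whose handler threw can be
    // retried later. The set grows without bound unless old IDs are trimmed.
    static bool TryApplyOnce(HashSet<string> appliedIds, string commandId, Action apply)
    {
        if (appliedIds.Contains(commandId))
        {
            return false; // redelivery of a command that was already applied
        }

        apply();
        appliedIds.Add(commandId);
        return true;
    }
    ```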

    Some libraries with magic callbacks don’t solve these issues: the callbacks still have to retry, handle time-outs, run on any node, etc. The complexity and the issues are still there; they just move somewhere else.

    You may have an observable on top for consumers, but this is basically cosmetic: it does not solve the problem, and if you look under many implementations you will see the same loop somewhere. I would not use one; instead, have the consumer register an action.

    For example:

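        using System;
        using System.Collections.Generic;
        using System.Threading.Tasks;

        public interface IStreamSubscriber
        {
            // Per-message callback: the message object plus its metadata
            void RegisterEventCallBack(Func<object, IReadOnlyDictionary<string, string>, Task> callback);
            // Per-batch callback for consumers that want to process several messages at once
            void RegisterBatchEventCallBack(Func<IEnumerable<(object msg, IReadOnlyDictionary<string, string> metaData)>, Task> batchCallback);
            // Starts the underlying read loop
            void Start();
        }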
    

    In your case, the callback could feed the observable so your code does not use the loop directly, but there is still a low-level loop underneath, which can also do the message-to-object conversion for the consumer.
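    A sketch of the kind of low-level dispatch step that could sit under `RegisterEventCallBack`, written as C# 9+ top-level code. It assumes the read step has already produced a batch of `(Id, Fields)` pairs; `DispatchBatchAsync` is hypothetical, and its callback takes only the field dictionary (no message-to-object conversion). It follows point 4 above: a logic exception from the consumer is logged and skipped, while Redis errors from the read step are not caught here and still propagate to the loop to be retried.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;

    // Invokes the consumer callback for each entry in order. A consumer exception
    // is logged and the entry is skipped, so one bad message does not stop the
    // stream. Returns the ID to save as the new read position.
    static async Task<string> DispatchBatchAsync(
        IReadOnlyList<(string Id, IReadOnlyDictionary<string, string> Fields)> batch,
        Func<IReadOnlyDictionary<string, string>, Task> callback,
        string lastId)
    {
        foreach (var (id, fields) in batch)
        {
            try
            {
                await callback(fields);
            }
            catch (Exception ex)
            {
                Console.Error.WriteLine($"Consumer failed on {id}: {ex.Message}");
            }
            lastId = id;
        }
        return lastId;
    }
    ```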
