
Whenever a new message arrives in the stream, the OnMessageReceived event should be raised.

public event EventHandler<MqMessageReceivedEventArgs> OnMessageReceived;


var result = await redisDatabase.StreamRangeAsync(StreamName, "-", "+", 1, Order.Ascending);

  protected virtual void OnMessageReceivedEvent(MqMessageReceivedEventArgs e)
  {
      OnMessageReceived?.Invoke(this, e);
  }


2 Answers


  1. According to official documentation, you have to do the following:

    Define a parsing function:

    public static class RedisHelper
    {
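        // Members of a static class must be static; public so the consumer task can call it.
        public static Dictionary<string, string> ParseResult(StreamEntry entry) =>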
            entry.Values.ToDictionary(x => x.Name.ToString(), x => x.Value.ToString());
    }
    

    Start consumer task:

    
    var tokenSource = new CancellationTokenSource();
    var token = tokenSource.Token;
    
    var readTask = Task.Run(async () =>
    {
        while (!token.IsCancellationRequested)
        {
            var result = await db.StreamRangeAsync(streamName, "-", "+", 1, Order.Descending);
            if (result.Any())
            {
                var dictionaries = result.Select(r => RedisHelper.ParseResult(r)).ToList();
                // or invoke event in loop
                OnMessageReceivedEvent(new MqMessageReceivedEventArgs(dictionaries));
            }
    
            await Task.Delay(1000);
        }
    });
    
  2. It looks like you want to raise OnMessageReceived whenever a new message arrives in the stream. Assuming the rest of the logic is already implemented, a polling loop like this one listens for messages and raises the event:

    public async Task ListenForMessages()
    {
        while (true)
        {
            var result = await redisDatabase.StreamRangeAsync(StreamName, "-", "+", 1, Order.Ascending);
            if (result.Length > 0)
            {
                foreach (var message in result)
                {
                    OnMessageReceived?.Invoke(this, new MqMessageReceivedEventArgs(message));
                }
            }
            await Task.Delay(1000); // Delay before checking for new messages again
        }
    }
    public class MqMessageReceivedEventArgs : EventArgs
    {
        public MqMessageReceivedEventArgs(StreamEntry message)
        {
            // Initialize any properties you want to pass along with the event
        }
    }
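
One caveat about the polling loops above: they call StreamRangeAsync over the full "-" to "+" range with a count of 1, so every poll returns the same entry again until the stream changes, and the event would fire repeatedly for it. A possible way around this, sketched here under assumptions, is to remember the ID of the last entry seen and ask only for newer entries with StreamReadAsync. The `StreamListener` class, its `ListenAsync` method, the `Message` property, and the batch size of 10 are hypothetical names and values chosen for illustration; they are not part of the question or of the answers.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using StackExchange.Redis;

public class MqMessageReceivedEventArgs : EventArgs
{
    public MqMessageReceivedEventArgs(StreamEntry message) => Message = message;

    // Hypothetical property exposing the raw stream entry to subscribers.
    public StreamEntry Message { get; }
}

// Hypothetical wrapper class; only the event name comes from the question.
public class StreamListener
{
    private readonly IDatabase _db;
    private readonly string _streamName;

    public StreamListener(IDatabase db, string streamName)
    {
        _db = db;
        _streamName = streamName;
    }

    public event EventHandler<MqMessageReceivedEventArgs> OnMessageReceived;

    public async Task ListenAsync(CancellationToken token)
    {
        // "0-0" starts from the first entry in the stream.
        RedisValue lastId = "0-0";

        while (!token.IsCancellationRequested)
        {
            // XREAD returns only entries whose ID is greater than lastId.
            StreamEntry[] entries =
                await _db.StreamReadAsync(_streamName, lastId, count: 10);

            foreach (var entry in entries)
            {
                OnMessageReceived?.Invoke(this, new MqMessageReceivedEventArgs(entry));
                lastId = entry.Id; // remember how far we have read
            }

            if (entries.Length == 0)
            {
                // Nothing new: wait before polling again.
                // Task.Delay throws OperationCanceledException when the token is cancelled.
                await Task.Delay(1000, token);
            }
        }
    }
}
```

Starting from "0-0" replays the whole stream on startup. If only entries added after startup matter, `lastId` could instead be seeded with the ID of the newest existing entry. If several consumers need to share the work, consumer groups (StreamReadGroupAsync) are the usual alternative to tracking the ID by hand.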
    