skip to Main Content

Problem

I am trying to create a method to listen to a stream from a Redis server and return list of stream-entries depending on the current type of stream. For this I created a simple abstract base class with the properties I expect from all stream-entry types

public abstract class RedisStreamEntryDto
{
    public required float Progress { get; set; }
    public required bool RunDone { get; set; }
}

The class to listen to the stream would look something like the following code, which already illustrates the problem:

public class RedisUtils(ConnectionMultiplexer redis)
{

    public async Task<List<RedisStreamEntryDto>> GetStreamEntriesAsync(string streamName): 
    {

        var db = redis.GetDatabase();

        bool stopListening = false;
        List<RedisStreamEntryDto> streamItems = [];

        while (!stopListening)
        {
            StreamEntry[] streamEntries = await db.StreamReadAsync(streamName, "0-0");

            foreach (var entry in streamEntries)
            {
                if(entry.IsNull)
                {
                    continue;
                }

                var add_entry = // how to instantiate?

                streamItems.Add(add_entry);

                if (add_entry.RunDone)
                {
                    stopListening = true;
                    break;
                }
            }

        }
        return streamItems;
    }
}

Usually I would handle this by passing around JSONS and call JsonSerializer.Deserialize<T>(entry), but the received values are RedisValue types and have no built-in deserialization or rather I would first have to create a JSON string which seems hacky.

Approach

The Idea was to simply force all classes, which inherit from RedisStreamEntryDto to implement a static FromStreamEntry(StreamEntry entry) method that returns the correct instance. Where I am stuck at is how to implement it in the base class in order to be able to use it. Abstract static methods are not possible, I have also already tried to use an interface instead of the abstract class, but this basically leads to the same problem.
Only solution I can think of so far is to create a separate factory class, which is a bit annoying for this "simple" issue. Is there no better solution?

Edit
Some additional info based on the comments below: I know from the caller, which type of stream entry is expected, so I could make the method generic:

public class RedisUtils(ConnectionMultiplexer redis)
{

    public async Task<List<T>> GetStreamEntriesAsync<T>(string streamName) where T: RedisStreamEntryDto
    {

        var db = redis.GetDatabase();

        bool stopListening = false;
        List<T> streamItems = [];

        while (!stopListening)
        {
            StreamEntry[] streamEntries = await db.StreamReadAsync(streamName, "0-0");

            foreach (var entry in streamEntries)
            {
                if(entry.IsNull)
                {
                    continue;
                }

                var add_entry = T.FromStreamEntry(entry) // how to instantiate?

                streamItems.Add(add_entry);

                if (add_entry.RunDone)
                {
                    stopListening = true;
                    break;
                }
            }

        }
        return streamItems;
    }
}

Question still remains, how to implement the base abstract class (or interface) to be able to do this.

2

Answers


  1. Chosen as BEST ANSWER

    So after the discussion in the comments above, my implementation looks as follows (obviously this is a test implementation in a single file only, without error checks and all that other needed stuff):

    public interface IRedisStreamEntryFactory
    {
        RedisStreamEntryDto CreateEntryFromStreamEntry(StreamEntry entry);
    }
    
    public class ConcreteTaskStreamEntryFactory : IRedisStreamEntryFactory
    {
        public RedisStreamEntryDto CreateEntryFromStreamEntry(StreamEntry entry)
        {
            return new MyConcreteTaskStreamEntryDto()
            {
                Message = entry["message"].ToString(),
                Progress = float.Parse(entry["progress"]),
                RunDone = bool.Parse(entry["run_done"])
            };
        }
    }
    
    public class OtherConcreteTaskStreamEntryFactory : IRedisStreamEntryFactory
    {
        public RedisStreamEntryDto CreateEntryFromStreamEntry(StreamEntry entry)
        {
            return new MyOtherConcreteTaskStreamEntryDto()
            {
                OtherProperty = entry["other_property"].ToString(),
                Progress = float.Parse(entry["progress"]),
                RunDone = bool.Parse(entry["run_done"])
            };
        }
    }
    
    public class RedisUtils(ConnectionMultiplexer redis)
    {
    
        public async Task<List<RedisStreamEntryDto>> GetStreamEntriesAsync(IRedisStreamEntryFactory factory, string streamName)
        {
    
            var db = redis.GetDatabase();
    
            bool stopListening = false;
            List<RedisStreamEntryDto> streamItems = [];
    
            while (!stopListening)
            {
                StreamEntry[] streamEntries = await db.StreamReadAsync(streamName, "0-0");
    
                foreach (var entry in streamEntries)
                {
                    if(entry.IsNull)
                    {
                        continue;
                    }
    
                    var add_entry = factory.CreateEntryFromStreamEntry(entry);
    
                    streamItems.Add(add_entry);
    
                    if (add_entry.RunDone)
                    {
                        stopListening = true;
                        break;
                    }
                }
    
            }
            return streamItems;
        }
    }
    

    Which I guess is just a simple factory pattern anyway.


  2. if the requirement is just to be able to instantiate within your method without knowing the concrete class then it is easily done using an additional constraint. Note, your concrete classes must have a parameterless constructor.

    This can be an interface:

    public interface IRedisStreamEntryDto
    {
        float Progress { get; set; }
        bool RunDone { get; set; }
    }
    

    Now in your method signature, add the new() constraint which tells the compiler that the class will have a parameterless constructor:

    public async Task<List<T>> GetStreamEntriesAsync<T>(string streamName) where T : class, IRedisStreamEntryDto, new()
    

    Now you can instantiate within your method with:

    T add_entry = new T();
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search