
I have used ServiceStack for a while and am familiar with its preferred message-based API design, which I already use in some REST-based APIs.

I am now looking into the Redis MQ library and, as always, enjoy ServiceStack’s structure and functionality. I want to replace some legacy communication code with an MQ server; I have tried some of the ServiceStack examples and they work well.

However, some of the legacy code uses the same class for both the outgoing request and the response: a GetSomething is sent, and the reply is another instance of GetSomething, with a property such as GetSomething.Result that contains the result.

Since I wanted a drop-in replacement for the current communication model, I looked for a way to support this scenario “out of the box”, but I didn’t find one. I register a handler like this in the Consumer:

    mqHost.RegisterHandler<GetSomething>(base.ExecuteMessage);

and one like this in the Publisher, which wants the reply:

    mqServer.RegisterHandler<GetSomething>(m => {...});

What happens is that the Publisher picks up the request immediately, so it never reaches the Consumer. If I remove the reply handler from the Publisher, the request does reach the Consumer, but when the Consumer replies with the same DTO, GetSomething, it gets stuck in an endless loop, presumably because the reply is placed in the same MQ queue as the request.

Is there a smart way to solve this using ServiceStack?

I have some ideas for possible workarounds, but I’d like to know whether this can be solved in a better way.

2 Answers


  1. Chosen as BEST ANSWER

    I want to share one workaround. It is not the most elegant, but it seems to work. I am still interested in better ways to do this.

    Publisher:

    The Publisher assigns a RequestFilter to the RedisMqServer. That method modifies the message’s .Body, replacing the wrapper with the actual request it contains.

    The Publisher then calls .RegisterHandler once for the response wrapper class, and once for each real handler, as intended. This results in the correct handler being called:

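        // Members of the publisher's RedisClient class; the class declaration and its
        // fields (Name, redisFactory, mqServer) are not shown in the original post.
        public RedisClient(string name)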
        {
            Name = name;
            redisFactory = new PooledRedisClientManager("localhost:6379");
            mqServer = new RedisMqServer(redisFactory, retryCount: 2);
    
            mqServer.RequestFilter = RequestFilter;
    
            // Response wrapper, ContainerResponse implements IProtocolContainer
            mqServer.RegisterHandler<ContainerResponse>(m => 
            {
                return m;
            });
    
            mqServer.RegisterHandler<GetSomething>(m =>
            {
                // the message body is a GetSomething here
                return null;
            });
            mqServer.Start();
        }
    
        private ServiceStack.Messaging.IMessage RequestFilter(ServiceStack.Messaging.IMessage message)
        {
            if (message.Body is IProtocolContainer protocolContainer)
            {
                message.Body = protocolContainer.TheRequest;
            }
            return message;
        }
    
        public void AddMessage<T>(T theRequest) where T : CoreRequest
        {
            using (var mqClient = mqServer.CreateMessageQueueClient())
            {
                mqClient.Publish(new ContainerRequest(theRequest));
            }
        }
    }
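
    The wrapper types used above are not shown in this answer. A minimal sketch of what they might look like follows; the shapes of CoreRequest, IProtocolContainer, ContainerRequest and ContainerResponse are assumptions inferred from how the code uses them, and ContainerDemo is a hypothetical helper added only for illustration:

    ```csharp
    // Hypothetical shapes: the original post does not show these types.
    public abstract class CoreRequest { }

    public class GetSomething : CoreRequest
    {
        public string Result { get; set; }   // filled in by the consumer
    }

    public interface IProtocolContainer
    {
        CoreRequest TheRequest { get; }
    }

    // Distinct wrapper types, so that requests and replies travel under different DTO types.
    public class ContainerRequest : IProtocolContainer
    {
        public ContainerRequest() { }   // parameterless constructor for deserialization
        public ContainerRequest(CoreRequest request) { TheRequest = request; }
        public CoreRequest TheRequest { get; set; }
    }

    public class ContainerResponse : IProtocolContainer
    {
        public ContainerResponse() { }
        public ContainerResponse(CoreRequest request) { TheRequest = request; }
        public CoreRequest TheRequest { get; set; }
    }

    public static class ContainerDemo
    {
        // Mirrors what the RequestFilter does to message.Body: unwrap if it is a container.
        public static object Unwrap(object body) =>
            body is IProtocolContainer container ? container.TheRequest : body;
    }
    ```

    Because TheRequest is declared as the abstract CoreRequest, this sketch relies on the serializer preserving the concrete type of the wrapped DTO across the queue; that has not been verified here.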
    

    Consumer:

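    The same principle applies to the Consumer. It additionally sets a ResponseFilter that wraps every response in a ContainerResponse, so the reply is published under the wrapper type rather than as GetSomething: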

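        // Members of the consumer's AppHost class; the class declaration is not shown in the original post.
        public override void Configure(Container container)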
        {
            container.Register(new ConsumerInfo() { Name = ServiceName });
    
            var redisFactory = new PooledRedisClientManager("localhost:6379");
            container.Register<IRedisClientsManager>(redisFactory);
            var mqHost = new RedisMqServer(redisFactory, retryCount: 2);
    
            mqHost.RequestFilter = RequestFilter;
            mqHost.ResponseFilter = ResponseFilter;
    
            mqHost.RegisterHandler<ContainerRequest>(base.ExecuteMessage);
            mqHost.RegisterHandler<GetSomething>(base.ExecuteMessage);
            mqHost.Start();
        }
    
        private object ResponseFilter(object arg)
        {
            return new ContainerResponse(arg as CoreRequest);
        }
    
        private ServiceStack.Messaging.IMessage RequestFilter(ServiceStack.Messaging.IMessage message)
        {
            if (message.Body is IProtocolContainer protocolContainer)
            {
                System.Diagnostics.Debug.WriteLine($"\tReplaced Body with {protocolContainer.TheRequest.GetType().Name}");
                message.Body = protocolContainer.TheRequest;
            }
            return message;
        }
    }
    

  2. The ServiceStack MQ message workflow is defined in the docs, under “ServiceStack MQ Message Workflow”, which explains what happens in each case:

    - Messages with no responses are sent to the ‘.outq’ Topic
    - Messages with Responses are published to the Response .inq
    - Responses from Messages with ReplyTo are published to that address
    - Messages with exceptions are re-tried, then published to the .dlq dead-letter-queue
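
    The four rules above can be modelled as a small routing function. This is a toy model for illustration only, not ServiceStack’s implementation; MqRouting and Route are hypothetical names, the rule order is simplified, and the queue names follow the `mq:{Type}.inq` pattern:

    ```csharp
    // Toy model of the routing rules listed above; not ServiceStack's implementation.
    public static class MqRouting
    {
        // Returns the queue that the outcome of processing a message would be published to.
        public static string Route(string requestType, string responseType,
                                   string replyTo = null, bool failed = false)
        {
            if (failed) return $"mq:{requestType}.dlq";                 // retries exhausted
            if (responseType == null) return $"mq:{requestType}.outq";  // handler returned null/void
            if (replyTo != null) return replyTo;                        // explicit reply address
            return $"mq:{responseType}.inq";                            // default: in-queue of the response type
        }
    }
    ```

    In this model, `Route("GetSomething", "GetSomething")` yields the same queue that the request arrived on, while passing a `replyTo` address or returning a different response type yields a different queue.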

    So returning the Request DTO puts it in the .inq of that Request DTO, where it is picked up again by the handler registered for it. In this case that handler is the one that just returned it, hence the loop. In Redis MQ, a Service that returns null or void instead has its message published to the transient/rolling .outq.
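
    Given the ReplyTo rule, one option that may avoid the loop without wrapper types is to publish the request with an explicit ReplyTo queue and read the reply from there. The following is an untested sketch: it assumes a Redis instance at localhost:6379, runs publisher and consumer in one process for brevity, and uses a simplified GetSomething with a hypothetical Result value:

    ```csharp
    using System;
    using ServiceStack.Messaging;
    using ServiceStack.Messaging.Redis;
    using ServiceStack.Redis;

    // Simplified stand-in for the legacy DTO that serves as both request and reply.
    public class GetSomething
    {
        public string Result { get; set; }
    }

    public static class ReplyToSketch
    {
        public static void Main()
        {
            var redisFactory = new PooledRedisClientManager("localhost:6379");
            var mqServer = new RedisMqServer(redisFactory, retryCount: 2);

            // Consumer side: fill in Result and return the same DTO type.
            mqServer.RegisterHandler<GetSomething>(m =>
            {
                var request = m.GetBody();
                request.Result = "done";   // hypothetical result value
                return request;
            });
            mqServer.Start();

            using (var mqClient = mqServer.CreateMessageQueueClient())
            {
                // Publisher side: ask for the reply on a temporary queue instead of
                // the default in-queue of GetSomething.
                var replyTo = mqClient.GetTempQueueName();
                mqClient.Publish(new Message<GetSomething>(new GetSomething()) { ReplyTo = replyTo });

                // Block for up to five seconds waiting for the reply.
                var reply = mqClient.Get<GetSomething>(replyTo, TimeSpan.FromSeconds(5));
                if (reply != null)
                {
                    mqClient.Ack(reply);
                    Console.WriteLine(reply.GetBody().Result);
                }
            }

            mqServer.Stop();
        }
    }
    ```

    With this approach the Publisher does not register a handler for GetSomething at all, so it no longer competes with the Consumer for the request.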
