skip to Main Content

The codes below show how I test the Redis stream functions.

And I found that different processes with the same consumer name are competing to consume messages in the same stream. In my understanding, if this performance is normal, Redis should not design a function to specify the consumer name.

Is there any problem with my understanding? Or am I using the wrong method?

import asyncio
import aioredis

# consumer with name "a", subscribing two streams
async def consume_a():
    try:
        while True:
            redis = aioredis.from_url("redis://localhost:36379")
            res = await redis.xreadgroup(
                "test_consumer_group",
                "consumer_a",
                streams={"test_stream": ">", "test_stream_1": ">"},
                count=1,
                block=10000,
                noack=True,
            )
            print(res)
    finally:
        await redis.close()

consumer with name "b", subscribing two streams

async def consume_b():
    try:
        while True:
            redis = aioredis.from_url("redis://localhost:36379")
            res = await redis.xreadgroup(
                "test_consumer_group",
                "consumer_b",
                streams={"test_stream": ">", "test_stream_1": ">"},
                count=1,
                block=10000,
                noack=True,
            )
            print(res)
    finally:
        await redis.close()

create group before runing script

async def config_group_0():
    try:
        redis = aioredis.from_url("redis://localhost:36379")
        res = await redis.xgroup_create("test_stream", "test_consumer_group")
        print(res)
    finally:
        await redis.close()

async def config_group_1():
    try:
        redis = aioredis.from_url("redis://localhost:36379")
        res = await redis.xgroup_create("test_stream_1", "test_consumer_group")
        print(res)
    finally:
        await redis.close()

producers

async def produce_0():
    try:
        redis = aioredis.from_url("redis://localhost:36379")
        i = 0
        while i < 100:
            res = await redis.xadd(
                "test_stream",
                {"domain_name": "test_domain_name_0", "sid": 0},
                maxlen=5,
            )
            print(res)
            i += 1
    finally:
        await redis.close()

async def produce_1():
    try:
        redis = aioredis.from_url("redis://localhost:36379")
        i = 0
        while i < 100:
            res = await redis.xadd(
                "test_stream_1",
                {"domain_name": "test_domain_name_1", "sid": 1},
                maxlen=2,
            )
            print(res)
            i += 1
    finally:
        await redis.close()

test code

if __name__ == "__main__":
    # two coroutines consume messages from two streams with the same consumer name
    asyncio.run(asyncio.gather(consume_a(), consume_a(), produce_0(), produce_1()))

2

Answers


  1. Base on the Redis document:

    One of the guarantees of consumer groups is that a given consumer can only see the history of messages that were delivered to it, so a message has just a single owner.

    Read these document for more information:

    Login or Signup to reply.
  2. for consumer to get self PEL, or compete and repeated consume PEL

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search