The code below shows how I test the Redis stream functions.
I found that different processes using the same consumer name compete with each other to consume messages from the same stream. In my understanding, if this behavior is normal, Redis would have no reason to provide a way to specify the consumer name.
Is there any problem with my understanding? Or am I using the wrong method?
import asyncio
import aioredis
# consumer with name "a", subscribing two streams
async def consume_a():
    """Read forever from both test streams as consumer ``consumer_a``.

    Repeatedly calls ``XREADGROUP`` on group ``test_consumer_group`` for
    ``test_stream`` and ``test_stream_1`` (ID ``">"``), requesting at most
    one entry per call (``count=1``) with ``block=10000`` and
    ``noack=True``, and prints every reply.

    The loop never terminates by itself; the client is closed when the
    coroutine is cancelled or an exception propagates.
    """
    # Create the client once, before the ``try``: building a new client on
    # every iteration leaks all but the last one, and binding the name
    # inside ``try`` would make ``finally`` fail with UnboundLocalError if
    # construction itself raised.
    redis = aioredis.from_url("redis://localhost:36379")
    try:
        while True:
            res = await redis.xreadgroup(
                "test_consumer_group",
                "consumer_a",
                streams={"test_stream": ">", "test_stream_1": ">"},
                count=1,
                block=10000,
                noack=True,
            )
            print(res)
    finally:
        await redis.close()
Consumer with name "b", subscribing to two streams:
async def consume_b():
    """Read forever from both test streams as consumer ``consumer_b``.

    Repeatedly calls ``XREADGROUP`` on group ``test_consumer_group`` for
    ``test_stream`` and ``test_stream_1`` (ID ``">"``), requesting at most
    one entry per call (``count=1``) with ``block=10000`` and
    ``noack=True``, and prints every reply.

    The loop never terminates by itself; the client is closed when the
    coroutine is cancelled or an exception propagates.
    """
    # Create the client once, before the ``try``: building a new client on
    # every iteration leaks all but the last one, and binding the name
    # inside ``try`` would make ``finally`` fail with UnboundLocalError if
    # construction itself raised.
    redis = aioredis.from_url("redis://localhost:36379")
    try:
        while True:
            res = await redis.xreadgroup(
                "test_consumer_group",
                "consumer_b",
                streams={"test_stream": ">", "test_stream_1": ">"},
                count=1,
                block=10000,
                noack=True,
            )
            print(res)
    finally:
        await redis.close()
Create the consumer groups before running the script:
async def config_group_0():
    """Create consumer group ``test_consumer_group`` on ``test_stream``.

    Prints the reply of ``xgroup_create`` and always closes the client.
    Any error raised by the server call propagates to the caller.
    """
    # Bind the client before ``try`` so that ``finally`` never references
    # an unassigned name and masks the original error.
    redis = aioredis.from_url("redis://localhost:36379")
    try:
        res = await redis.xgroup_create("test_stream", "test_consumer_group")
        print(res)
    finally:
        await redis.close()
async def config_group_1():
    """Create consumer group ``test_consumer_group`` on ``test_stream_1``.

    Prints the reply of ``xgroup_create`` and always closes the client.
    Any error raised by the server call propagates to the caller.
    """
    # Bind the client before ``try`` so that ``finally`` never references
    # an unassigned name and masks the original error.
    redis = aioredis.from_url("redis://localhost:36379")
    try:
        res = await redis.xgroup_create("test_stream_1", "test_consumer_group")
        print(res)
    finally:
        await redis.close()
producers
async def produce_0():
    """Append 100 entries to ``test_stream`` and print each returned ID.

    Every ``xadd`` call passes ``maxlen=5``. The client is closed when the
    loop finishes or an exception propagates.
    """
    # Bind the client before ``try`` so that ``finally`` never references
    # an unassigned name and masks the original error.
    redis = aioredis.from_url("redis://localhost:36379")
    try:
        for _ in range(100):
            res = await redis.xadd(
                "test_stream",
                {"domain_name": "test_domain_name_0", "sid": 0},
                maxlen=5,
            )
            print(res)
    finally:
        await redis.close()
async def produce_1():
    """Append 100 entries to ``test_stream_1`` and print each returned ID.

    Every ``xadd`` call passes ``maxlen=2``. The client is closed when the
    loop finishes or an exception propagates.
    """
    # Bind the client before ``try`` so that ``finally`` never references
    # an unassigned name and masks the original error.
    redis = aioredis.from_url("redis://localhost:36379")
    try:
        for _ in range(100):
            res = await redis.xadd(
                "test_stream_1",
                {"domain_name": "test_domain_name_1", "sid": 1},
                maxlen=2,
            )
            print(res)
    finally:
        await redis.close()
test code
async def _main():
    """Run two ``consume_a`` coroutines concurrently with both producers."""
    # two coroutines consume messages from two streams with the same consumer name
    await asyncio.gather(consume_a(), consume_a(), produce_0(), produce_1())


if __name__ == "__main__":
    # asyncio.run() requires a coroutine; asyncio.gather() returns a Future
    # and must be called while a loop is running, so it is awaited inside
    # the _main() coroutine instead of being passed to run() directly.
    asyncio.run(_main())
2
Answers
Based on the Redis documentation:
Read these documents for more information:
for a consumer to read its own PEL (pending entries list), or to compete for and repeatedly consume entries in the PEL