
A system I’m currently working on receives messages in Redis. I would like to write a Python script that can read those messages, modify them, and write them back to a different channel/key in Redis.

For example:

 redis-cli  -a 'redis' monitor | grep '"pushName"'

will return:

1707963448.404618 [0 192.168.86.75:51520] "LPUSH" "pushName" "{"uuid":"be70"...,{}}

How do I subscribe to get the messages from pushName? When I try this:

mobile = r.pubsub()
mobile.subscribe('pushName')
for message in mobile.listen():
    print(message)

nothing displays except for:

{'type': 'subscribe', 'pattern': None, 'channel': 'pushName', 'data': 1}

I already know my connection settings are fine because I can get a list of the keys when I do this:

index_list = []
for key in r.scan_iter('*'):
    index_list.append(key)

But messages are flying by when I run:

 redis-cli  -a 'redis' monitor | grep '"pushName"'

2 Answers


  1. You seem to be conflating three different things: the debug output of MONITOR, Redis LISTs, and Redis Pub/Sub.

    If your other (real) code is doing LPUSH operations it is appending items to a Redis LIST.

    Those LPUSH operations will show up in your redis-cli monitor command because that is a debugging tool showing you all Redis internal operations.

    The LPUSH commands (which operate on a Redis LIST) will not show up when you subscribe via Redis Pub/Sub because that is a completely separate feature of Redis. You only see messages when you are subscribed to a channel and somebody publishes on that channel. Here nobody is publishing anything; they are only doing LPUSH to a LIST.
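
    If the goal is simply to read what the other code LPUSHes, the LIST itself can be consumed directly rather than through Pub/Sub. The following is a minimal sketch, not a definitive implementation: it assumes `client` is a `redis.Redis` instance (or any object with the same `brpop` and `lpush` methods), that each item is a JSON object, and that removing the item from the source LIST is acceptable. The destination key and the `processed` field are hypothetical.

```python
import json


def relay_one(client, source_key, dest_key, timeout=1):
    """Pop one JSON message from source_key, modify it, push it to dest_key.

    Returns the modified dict, or None if nothing arrived within `timeout` seconds.
    """
    # BRPOP blocks until an item is available and REMOVES it from the list.
    # The producer uses LPUSH, so popping from the right yields the oldest item.
    popped = client.brpop(source_key, timeout=timeout)
    if popped is None:
        return None
    _key, raw = popped
    message = json.loads(raw)
    message["processed"] = True  # hypothetical modification
    client.lpush(dest_key, json.dumps(message))
    return message
```

    With redis-py this might be called in a loop as `relay_one(redis.Redis(...), 'pushName', 'pushNameModified')`. Because BRPOP removes items, this only suits a setup where no other consumer needs the same entries.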

  2. Maybe you could adapt things so they work more like you are expecting. If you want to be notified every time there is an LPUSH onto a LIST, you could change your LPUSH to a MULTI/EXEC transaction that does the LPUSH and then does a PUBLISH to notify all interested parties, all in one single transaction. That would make your sender look like this:

    #!/usr/bin/env python3
    
    from time import sleep
    import redis
    
    # Redis connection
    r = redis.Redis(host='192.168.0.10', port=6379, db=0)
    
    # Empty list at start of each run
    r.delete('myList')
    
    # Push 10 items to Redis LIST, publishing that they are available
    N = 10
    for i in range(N):
        print(f'DEBUG: Publishing item {i} and notifying subscribers')
        # Start pipeline, i.e. MULTI/EXEC transaction
        pipe = r.pipeline()
        pipe.lpush('myList', i)
        pipe.publish('notifications', 'LPUSHed to myList')
        pipe.execute()
        sleep(1)
    

    Then your receiver might look like this, subscribing to the notifications channel, and grabbing the LIST when notified:

    #!/usr/bin/env python3
    
    from time import time, sleep
    import redis
    
    def msgHandler(message):
        """Called whenever a message is published"""
        print(message)
        # See what's in our LIST
        items = r.lrange('myList',0,100)
        print(items)
        # You could put the items into a regular Python queue here for main program to read
    
    # Redis connection
    r = redis.Redis(host='192.168.0.10', port=6379, db=0)
    
    # Subscribe to notifications and register callback for when messages arrive
    pubsub = r.pubsub(ignore_subscribe_messages=True)
    pubsub.subscribe(**{'notifications': msgHandler})
    # Start a background message receiver
    # See https://redis.readthedocs.io/en/stable/advanced_features.html
    thread = pubsub.run_in_thread(sleep_time=0.001)
    
    # Do something else for 15 seconds
    endTime = time() + 15
    while time() < endTime:
        print('Waiting for data:')
        sleep(0.3)
    
    # Stop the redis pub/sub listener
    thread.stop()
    

    When you run the receiver, it looks like this:

    Waiting for data:
    Waiting for data:
    {'type': 'message', 'pattern': None, 'channel': b'notifications', 'data': b'LPUSHed to myList'}
    [b'0']
    Waiting for data:
    Waiting for data:
    Waiting for data:
    Waiting for data:
    {'type': 'message', 'pattern': None, 'channel': b'notifications', 'data': b'LPUSHed to myList'}
    [b'1', b'0']
    Waiting for data:
    Waiting for data:
    Waiting for data:
    {'type': 'message', 'pattern': None, 'channel': b'notifications', 'data': b'LPUSHed to myList'}
    [b'2', b'1', b'0']
    ...
    ...
    Waiting for data:
    Waiting for data:
    {'type': 'message', 'pattern': None, 'channel': b'notifications', 'data': b'LPUSHed to myList'}
    [b'9', b'8', b'7', b'6', b'5', b'4', b'3', b'2', b'1', b'0']
    Waiting for data:
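
    The comment in msgHandler suggests handing the items to a regular Python queue so the main program can read them. One way that might look is sketched below. The names `make_handler` and `work_queue` are hypothetical, and `client` is assumed to be a `redis.Redis` instance (only its `lrange` method is used).

```python
import queue

# Hypothetical queue shared between the pub/sub callback and the main program
work_queue = queue.Queue()


def make_handler(client, list_key, out_queue):
    """Build a pub/sub callback that copies the current LIST contents into out_queue."""
    def handler(message):
        # The notification itself carries no payload we need; it is only the trigger.
        items = client.lrange(list_key, 0, -1)  # 0..-1 means the whole list
        out_queue.put(items)
    return handler
```

    The callback would be registered as `pubsub.subscribe(**{'notifications': make_handler(r, 'myList', work_queue)})`, and the main loop could then poll with `work_queue.get(timeout=...)` instead of just sleeping.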
    

    Note that there are actually 3 ways of waiting for messages you are subscribed to. They are described in the redis-py documentation linked in the receiver code above. Basically, you can use:

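    import time

    # p is a subscribed pubsub object, e.g. p = r.pubsub(); p.subscribe('notifications')
    while True:
        message = p.get_message()
        if message:
            print(message)  # do something with the message
        time.sleep(0.001)  # be nice to the system :)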
    

    Or you can use:

    for message in p.listen():
        print(message)  # do something with the message
    

    Or you can use a thread like I did.


    Note that I am using a pipeline (MULTI/EXEC) for 2 reasons:

    • it is an atomic transaction, so we know the item will be pushed on the list before the PUBLISH
    • there is less overhead/latency because it is a single round-trip to the server, rather than one round-trip per operation
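
    Both points can be seen in a small wrapper around the pipeline. This is only a sketch under assumptions: `push_and_notify` is a hypothetical helper, and `client` is assumed to be a `redis.Redis` instance. In redis-py, `execute()` returns one result per queued command, so here it yields the reply to LPUSH (the new list length) and the reply to PUBLISH (the number of subscribers that received the message).

```python
def push_and_notify(client, list_key, channel, value):
    """LPUSH value and PUBLISH a notice in one MULTI/EXEC round-trip.

    Returns (new list length, number of subscribers that received the notice).
    """
    # transaction=True (the redis-py default) wraps the queued commands in MULTI/EXEC
    pipe = client.pipeline(transaction=True)
    pipe.lpush(list_key, value)
    pipe.publish(channel, f'LPUSHed to {list_key}')
    # Nothing is sent to the server until execute(); results come back in order
    list_length, receivers = pipe.execute()
    return list_length, receivers
```

    A `receivers` value of 0 would indicate that no subscriber was listening at the moment of the PUBLISH; the item is still on the LIST in that case.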

    Note that you can chain the pipeline elements together, so that the 4-line pipeline in my sender code becomes:

    r.pipeline().lpush('myList', i).publish('notifications', 'LPUSHed to myList').execute()
    