skip to Main Content

I created two programs to send and receive video feed using ZeroMQ. However, the receiving program always gets stuck on the .recv()-method.

I have used two libraries of ZeroMQ for this program: one, the native zmq, the other a derived imagezmq. The imagezmq is used for sending and receiving frame data from the video while the native zmq library is used for sending and receiving the time, when the image has been sent.

The imagezmq part works fine.

The program only gets stuck on the zmq part.

Following are my two programs :

FinalCam.py

import struct
import time
import imutils
import imagezmq
import cv2
import zmq    
import socket
import pickle

#                                         # image sending
sender = imagezmq.ImageSender(connect_to='tcp://localhost:5555')

hostName = socket.gethostname()           # send RPi hostname with each image

vid_dir = "/root/redis-opencv-videostream/vtest.avi"
cap = cv2.VideoCapture(vid_dir)           # init the camera

context = zmq.Context()                   # setup for sending time
socket = context.socket(zmq.PUB)
socket.connect("tcp://localhost:6666")

while True:                               # send images as stream until Ctrl-C

    ret, frame = cap.read()
    frame = imutils.resize(frame, width=400)  # resize without compressionq

    captureTime = time.time()
    sender.send_image(hostName, frame)
    print (captureTime)
    captureTime = struct.pack('d', captureTime)
    #msg = pickle.dumps(captureTime)
    print("message primed")
    socket.send(captureTime)
    print("time sent")

which generated this output :

1591824603.5772414
message primed
time sent

FinalRecieve.py

import cv2
import imagezmq
import time
import zmq
import struct

FRAMES = 5
image_hub = imagezmq.ImageHub()           # image socket

context = zmq.Context()                   #  time socket
socket = context.socket(zmq.SUB)
socket.bind("tcp://*:6666")

while True:                               # show streamed images until Ctrl-C

    loopTime = time.time()

    for i in range (0, FRAMES):
        hostName, frame = image_hub.recv_image()
        image_hub.send_reply(b'OK')
        print("recieved image, waiting for time")
        captureTime = socket.recv()
        print("meow")
        print(captureTime)

    finishTime = time.time()
    fpsTime = finishTime - loopTime

    fps = FRAMES / fpsTime
    print(fps)

which generated this output :

received image, waiting for time

2

Answers


  1. Here’s a couple of things to try to get the native-zmq parts working:

    Use .connect()-method for SUB-sockets :
    socket.connect("tcp://localhost:6666")

    And .bind()-method for your PUB-sockets :
    socket.bind("tcp://*:6666")

    It’s explained here in the guide that connect should be used to create an outgoing connection from a socket.

    In the sibling doc for .bind(), it explains that it’s for accepting connections.

    Also try setting socket options : socket.setsockopt(zmq.SUBSCRIBE, "")

    It is described here in the guide that the SUB-sockets initially filter out all messages, so that’d explain why you’re not receiving anything. The example above provides an empty filter, which accepts all incoming messages.

    It’s important to note that with PUB and SUB based distribution, that the SUB might not necessarily receive messages due to timing of its connection or network conditions. E.g. everything sent from the publisher before the subscriber connects isn’t receivable

    Login or Signup to reply.
  2. The Problem Definition :

    "The imagezmq part works fine. The program only gets stuck on the zmq part. "


    Solution ?

    ZeroMQ is so smart ( since v2.0+ ) that is does not require anyone to be rigid on whether to { .bind() | .connect() } at all and one AccessPoint may freely .bind() some and also .connect() other TransportClass-channels for 1:N communiation topologies as liked & needed ( in other words, the "reversed" .bind()/.connect() is always possible ).

    Not so the ImageZMQ-spin-off module. There are set of hard-wired choices made behind your domain of control (and any hardcore hijacking the Class-internal attributes { ImageHub.zmq_socket | ImageSender.zmq_socket } was hardly a dream of the ImageZMQ authors ( architecture-(co)-authors ) ).

    Given the imagezmq published internalities are themselves pre-decided on the hardcoded choices ( the same Class instances, depending on mode, sometimes .bind() and .connect() otherwise ) was declared as working, lets focus on using it to its (published) maximum.

    Based on the said, compose your working parts so as to send the time "inside" the working scheme :

    PAYLOAD_MASK = "{0:}|||{1:}"
    ...
    while ...
          sender.send_image( PAYLOAD_MASK.format( time.time(), hostName ),
                             frame
                             )
    

    and may easily decode accordingly on .recv()-side, without a need to repair the used struct.pack() problems ( In , the more in ZeroMQ interconnected worlds, one does never know, what the remote platform is, the less what byte-ordering ( Endian-convention ) will be assumed "there", so struct.pack() shall always explicitly declare Endian-type in the format-string. Always. If not sure, one may design a self-detecting message, that may salvage such naive uses and test / adjust your format-string for your local-side usage for .unpack()-method accordingly for either case of any implicitly blind .pack()-sender … goes beyond the scope of this post, yet all RPi / non-RPi platform projects are prone to this (only)-assumed-"same"-Endian caveat )


    Surprise :

    If using pyzmq version 18.., which is not listed in the compatibility list of the ImageZMQ and the use-case is there doomed to crash or cause hidden troubles.

    So one ought rather start with zmq.pyzmq_version() checks (a professional policy in Configuration Management for controlled environments, isn’t it?) and catch & resolve any non-matching cases.


    Why the zmq part did not work ?

    Hard to say. Without a full code, one may just guess. My candidate would be a wrong / missing termination with zmq.LINGER not explicitly set to zero, which causes hung-up Context()-instance(s) that block the actual resources until reboot. A proper use of the ZeroMQ tools reflects these methods of defensive-programming ( LINGER, CONFLATE, MAXMSGSIZE, IMMEDIATE, white-listing and many other defensive .setsockopt() and Context()-parametrisation options ), because is complex and may broke on many places outside of your domain of control or Line-of-Sight.

    So, do not hesitate to become a defensively-programming designer, if your architected systems are to become robust and self-healing.

    aLocalCONTEXT = zmq.Context( nIOthreads )
    aSUBsocket = aLocalCONTEXT.socket( zmq.SUB )
    try: 
          aSUBsocket.bind( "tcp://*:6666" )
    except:
          ...
    aSUBsocket.setsockopt( zmq.LINGER,      0 )
    aSUBsocket.setsockopt( zmq.SUBSCRIBE, b"" )
    aSUBsocket.setsockopt( zmq.MAXMSGSIZE, ...)
    ...
    

    Best re-read the native ZeroMQ API documentation about tuning other ISO-OSI-L2/L3 related paramters for best performance and safest strategies. Worth the time doing this indeed for each new API update, all the way since the v2.1+…


    A Counterexample of This ?

    Look at how .send_reply() method is unprotected from crashing for non-REQ-REP instances of its own ImageHub Class.

    Why?

    The so far non-protected call to the .send_reply()-method will crash the application once any such call for any ImageHub-class instance, that was initialised in it non-default REQ_REP = False ( i.e. on a SUB-side of the PUB/SUB mode available ).

    aNonDefaultImageHUB_instance.send( "this will crash" )
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/home/ms/anaconda2/lib/python2.7/site-packages/zmq/sugar/socket.py", line 395, in send
        return super(Socket, self).send(data, flags=flags, copy=copy, track=track)
      File "zmq/backend/cython/socket.pyx", line 725, in zmq.backend.cython.socket.Socket.send
      File "zmq/backend/cython/socket.pyx", line 772, in zmq.backend.cython.socket.Socket.send
      File "zmq/backend/cython/socket.pyx", line 247, in zmq.backend.cython.socket._send_copy
      File "zmq/backend/cython/socket.pyx", line 242, in zmq.backend.cython.socket._send_copy
      File "zmq/backend/cython/checkrc.pxd", line 25, in zmq.backend.cython.checkrc._check_rc
    zmq.error.ZMQError: Operation not supported
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search