I created two programs to send and receive video feed using ZeroMQ. However, the receiving program always gets stuck on the .recv()
-method.
I have used two libraries of ZeroMQ for this program: one, the native zmq
, the other a derived imagezmq
. The imagezmq
is used for sending and receiving frame data from the video while the native zmq
library is used for sending and receiving the time, when the image has been sent.
The imagezmq
part works fine.
The program only gets stuck on the zmq
part.
Following are my two programs :
FinalCam.py
import struct
import time
import imutils
import imagezmq
import cv2
import zmq
import socket
import pickle
# # image sending
sender = imagezmq.ImageSender(connect_to='tcp://localhost:5555')
hostName = socket.gethostname() # send RPi hostname with each image
vid_dir = "/root/redis-opencv-videostream/vtest.avi"
cap = cv2.VideoCapture(vid_dir) # init the camera
context = zmq.Context() # setup for sending time
socket = context.socket(zmq.PUB)
socket.connect("tcp://localhost:6666")
while True: # send images as stream until Ctrl-C
ret, frame = cap.read()
frame = imutils.resize(frame, width=400) # resize without compressionq
captureTime = time.time()
sender.send_image(hostName, frame)
print (captureTime)
captureTime = struct.pack('d', captureTime)
#msg = pickle.dumps(captureTime)
print("message primed")
socket.send(captureTime)
print("time sent")
which generated this output :
1591824603.5772414
message primed
time sent
FinalRecieve.py
import cv2
import imagezmq
import time
import zmq
import struct
FRAMES = 5
image_hub = imagezmq.ImageHub() # image socket
context = zmq.Context() # time socket
socket = context.socket(zmq.SUB)
socket.bind("tcp://*:6666")
while True: # show streamed images until Ctrl-C
loopTime = time.time()
for i in range (0, FRAMES):
hostName, frame = image_hub.recv_image()
image_hub.send_reply(b'OK')
print("recieved image, waiting for time")
captureTime = socket.recv()
print("meow")
print(captureTime)
finishTime = time.time()
fpsTime = finishTime - loopTime
fps = FRAMES / fpsTime
print(fps)
which generated this output :
received image, waiting for time
2
Answers
Here’s a couple of things to try to get the native-
zmq
parts working:Use
.connect()
-method forSUB
-sockets :socket.connect("tcp://localhost:6666")
And
.bind()
-method for yourPUB
-sockets :socket.bind("tcp://*:6666")
It’s explained here in the guide that connect should be used to create an outgoing connection from a socket.
In the sibling doc for
.bind()
, it explains that it’s for accepting connections.Also try setting socket options :
socket.setsockopt(zmq.SUBSCRIBE, "")
It is described here in the guide that the
SUB
-sockets initially filter out all messages, so that’d explain why you’re not receiving anything. The example above provides an empty filter, which accepts all incoming messages.It’s important to note that with
PUB
andSUB
based distribution, that theSUB
might not necessarily receive messages due to timing of its connection or network conditions. E.g. everything sent from the publisher before the subscriber connects isn’t receivableThe Problem Definition :
Solution ?
ZeroMQ is so smart ( since v2.0+ ) that is does not require anyone to be rigid on whether to
{ .bind() | .connect() }
at all and one AccessPoint may freely.bind()
some and also.connect()
otherTransportClass
-channels for1:N
communiation topologies as liked & needed ( in other words, the "reversed".bind()/.connect()
is always possible ).Not so the
ImageZMQ
-spin-off module. There are set of hard-wired choices made behind your domain of control (and any hardcore hijacking the Class-internal attributes{ ImageHub.zmq_socket | ImageSender.zmq_socket }
was hardly a dream of the ImageZMQ authors ( architecture-(co)-authors ) ).Given the
imagezmq
published internalities are themselves pre-decided on the hardcoded choices ( the same Class instances, depending on mode, sometimes.bind()
and.connect()
otherwise ) was declared as working, lets focus on using it to its (published) maximum.Based on the said, compose your working parts so as to send the time "inside" the working scheme :
and may easily decode accordingly on
.recv()
-side, without a need to repair the usedstruct.pack()
problems ( In distributed-computing, the more in ZeroMQ interconnected worlds, one does never know, what the remote platform is, the less what byte-ordering ( Endian-convention ) will be assumed "there", sostruct.pack()
shall always explicitly declare Endian-type in theformat
-string. Always. If not sure, one may design a self-detecting message, that may salvage such naive uses and test / adjust yourformat
-string for your local-side usage for.unpack()
-method accordingly for either case of any implicitly blind .pack()-sender … goes beyond the scope of this post, yet all RPi / non-RPi platform projects are prone to this (only)-assumed-"same"-Endian caveat )Surprise :
If using
pyzmq
version 18.., which is not listed in the compatibility list of the ImageZMQ and the use-case is there doomed to crash or cause hidden troubles.So one ought rather start with
zmq.pyzmq_version()
checks (a professional policy in Configuration Management for controlled environments, isn’t it?) and catch & resolve any non-matching cases.Why the
zmq
part did not work ?Hard to say. Without a full code, one may just guess. My candidate would be a wrong / missing termination with
zmq.LINGER
not explicitly set to zero, which causes hung-upContext()
-instance(s) that block the actual resources until reboot. A proper use of the ZeroMQ tools reflects these methods of defensive-programming (LINGER
,CONFLATE
,MAXMSGSIZE
,IMMEDIATE
, white-listing and many other defensive.setsockopt()
andContext()
-parametrisation options ), because distributed-computing is complex and may broke on many places outside of your domain of control or Line-of-Sight.So, do not hesitate to become a defensively-programming designer, if your architected systems are to become robust and self-healing.
Best re-read the native ZeroMQ API documentation about tuning other ISO-OSI-L2/L3 related paramters for best performance and safest distributed-computing strategies. Worth the time doing this indeed for each new API update, all the way since the v2.1+…
A Counterexample of This ?
Look at how
.send_reply()
method is unprotected from crashing for non-REQ-REP
instances of its ownImageHub
Class.Why?
The so far non-protected call to the
.send_reply()
-method will crash the application once any such call for anyImageHub
-class instance, that was initialised in it non-defaultREQ_REP = False
( i.e. on aSUB
-side of thePUB/SUB
mode available ).