Richard Bucker

Pyzmq - not a lot of best practices

Posted at — Jun 29, 2011

ZeroMQ is a Message Queue (MQ) framework that plainly works. Two of its most interesting elements are 1) ZMQ supports a number of client languages; 2) the broker (generally an application that exists to route traffic from a producer to a consumer) is left to the programmer to implement.

If you’ve read about or used any of the other MQs, or if you’ve done some interprocess communication (IPC) before, you probably have a good general idea of how this is supposed to work. RabbitMQ does a really good job of naming the different patterns and keeping the list to something manageable. The ZMQ doc is long and detailed, but it lacks examples for each of the client languages, and the examples that do exist are simple and often buggy or old. ZMQ does, however, have many more patterns than RMQ.

One idea that keeps getting trapped in my head is: how do I send a request to the broker and wait for a response? And if the response does not arrive, then what? Basically I’m looking for a best practice here.

In my application design I have the following stack:

json_rpc -> broker -> worker -> remote_http

- a user application sends my stack a json-rpc call
- the call is forwarded to the broker
- the broker routes the request to a worker
- the worker forwards the request to a remote service provider
- and whatever response is returned… bubbles up through the call stack

So in my json-rpc application I have a module that waits for requests; each message is authenticated, and the input data is validated. Since the response time can be anywhere between 250ms and 90secs, we’ll keep the socket open and wait for the reply.
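The validation step in the json-rpc module could look something like the sketch below. It assumes JSON-RPC 2.0 (the stack above does not say which version is in use), `validate_request` is a hypothetical name, and authentication is left out entirely.

```python
import json


def validate_request(raw):
    """Return (request, None) for an acceptable call, else (None, error).

    Assumption: JSON-RPC 2.0 framing; the error codes are the ones the
    2.0 specification reserves for parse and request errors.
    """
    try:
        req = json.loads(raw)
    except ValueError:
        # not JSON at all
        return None, {"code": -32700, "message": "Parse error"}

    if (not isinstance(req, dict)
            or req.get("jsonrpc") != "2.0"
            or not isinstance(req.get("method"), str)
            or not isinstance(req.get("params", []), (list, dict))):
        # JSON, but not a well-formed request object
        return None, {"code": -32600, "message": "Invalid Request"}

    return req, None
```

Only a request that passes this check would be forwarded to the broker; anything else can be answered immediately without touching ZMQ at all.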
The challenge here is getting the json-rpc app to make the request, detect errors, handle certain errors, forward the request to the broker, wait for a reply, parse the response message, and return to the caller.

Here is the pseudo code:

    retry = 3
    service.send(request, zmq.NOBLOCK)
    while True:
        # wait 7 seconds between poll timeouts
        socks = dict(poller.poll(7000))
        try:
            if socks.get(service) == zmq.POLLIN:
                # we got a message
                # NOBLOCK here almost makes no sense other than we want to get
                # all errors and we do not want to block at all
                # any errors will generate an exception
                # we could assert against an empty response but why?
                rsp = service.recv(zmq.NOBLOCK)
                break
            else:
                # timeout
                retry = retry - 1
        except zmq.ZMQError as e:
            retry = retry - 1
            if zmq.EFSM == e.errno:
                # wrong state
                pass
            elif zmq.EAGAIN == e.errno:
                # no data
                pass
            else:
                retry = 0
        if retry <= 0:
            # reconnect to the broker
            service.close()
            . . .
            retry = 3
            continue

The thing to keep in mind is that ZMQ will not return from a send or receive unless something good happened. That means you can expect an exception to be raised if your message did not get out, the response was not received, or some other app in the stack restarted and thus changed the state of the socket. This makes sending messages reliably very simple… even though my post is even simpler than that.
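The pseudo code above can be turned into something runnable along the following lines. This is only a sketch under a few assumptions: pyzmq on Python 3, a REQ socket on the json-rpc side (the post does not name the socket type), and hypothetical names `connect` and `request_with_retry`. It is also a simplified variant of the loop above, closer to the "Lazy Pirate" pattern from the ZeroMQ guide: rather than counting poll timeouts before reconnecting, it opens a fresh socket for every attempt so the REQ state machine can never get stuck.

```python
import zmq

REQUEST_TIMEOUT_MS = 7000  # same 7 second poll timeout as the pseudo code
REQUEST_RETRIES = 3


def connect(context, endpoint):
    """Open a fresh REQ socket to the broker (assumed socket type)."""
    sock = context.socket(zmq.REQ)
    sock.setsockopt(zmq.LINGER, 0)  # discard unsent messages on close
    sock.connect(endpoint)
    return sock


def request_with_retry(context, endpoint, request,
                       timeout_ms=REQUEST_TIMEOUT_MS,
                       retries=REQUEST_RETRIES):
    """Send `request` (bytes); return the reply, or None if every attempt timed out.

    Any zmq.ZMQError is deliberately left to propagate to the caller.
    """
    for _ in range(retries):
        sock = connect(context, endpoint)
        try:
            sock.send(request)
            # poll() returns 0 when nothing arrived within the timeout
            if sock.poll(timeout_ms, zmq.POLLIN):
                return sock.recv()
        finally:
            # a REQ socket that missed its reply is stuck mid-cycle,
            # so it is closed and replaced rather than reused
            sock.close()
    return None
```

A caller would create one `zmq.Context()` and pass it in together with the broker endpoint. With a 90 second worst case for the remote service, the timeout and retry count need to be chosen so that a slow but healthy request is not resent.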