[TIP] pytest, multiprocessing and pyzmq's ioloop

Stefan Scherfke stefan.scherfke at offis.de
Tue Oct 25 05:34:04 PDT 2011


Hello,

I'm currently playing around with PyZMQ and trying to figure out how to 
properly test PyZMQ apps that run as a `multiprocessing.Process` and use 
PyZMQ’s ioloop.

To test those apps, I create and start the process for them. I then
create a "client" socket and connect it to the "server" socket of the
process. I send requests and assert that the reply I get is what I
expect it to be.

If no exception occurs, everything runs fine. However, if the
`multiprocessing.Process` raises an exception and thus never sends a
reply, py.test waits forever without noticing the exception. The only
thing I can do about this is to run py.test with "-s" so that I can see
the exception's traceback on stderr and can then abort pytest
using Ctrl+C.

Here is a minimal example for a pytest test with a simple echo server. 
If you uncomment the line "x += 1", the server raises an exception and 
causes the test to hang:

---
import multiprocessing

from zmq.eventloop import ioloop, zmqstream
import zmq


class EchoServer(multiprocessing.Process):
     """Simple echo server with a zmq ioloop."""
     def __init__(self):
         super(EchoServer, self).__init__()

         self.rep_stream = None

     def run(self):
         context = zmq.Context()
         loop = ioloop.IOLoop.instance()

         rep_sock = context.socket(zmq.REP)
         rep_sock.bind('tcp://127.0.0.1:7777')
         self.rep_stream = zmqstream.ZMQStream(rep_sock, loop)
         self.rep_stream.on_recv(self.handle_request)

         loop.start()

     def handle_request(self, msg):
         # Code where something might break so that send(msg[0]) wouldn't
         # be called.
         # x += 1

         self.rep_stream.send(msg[0])


def pytest_funcarg__echo_server(request):
     """Creates echo server instances for the test(s)."""
     echo_server = EchoServer()

     # Terminate the process when done with the test
     request.addfinalizer(echo_server.terminate)

     return echo_server


def test_echo_server(echo_server):
     """Tests if the echo server responds properly."""
     echo_server.start()

     context = zmq.Context()
     req_sock = context.socket(zmq.REQ)
     req_sock.connect('tcp://127.0.0.1:7777')

     req_sock.send(b'ohai')
     rep = req_sock.recv()
     assert rep == b'ohai'

---

I tried replacing the `req_sock.recv()` call with a loop of several
non-blocking calls that raises an exception if they all fail, but that
didn't really work:

# Try to recv something for about 1 s (100 tries, 10 ms apart).
# Requires `import time` at the top of the module.
for i in range(100):
     try:
         rep = req_sock.recv(zmq.NOBLOCK)
         break
     except zmq.ZMQError:
         time.sleep(0.01)
else:
     raise RuntimeError('Remote process seems to hang.')
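
The same bounded wait can also be sketched with the socket's own `poll()`
instead of a sleep loop. This is only a sketch: `recv_or_fail` and
`timeout_ms` are hypothetical names, and it assumes `sock` behaves like a
pyzmq socket, whose `poll()` takes a timeout in milliseconds and returns 0
when nothing arrived in time. Whether it avoids the problem with the loop
above is untested here.

```python
def recv_or_fail(sock, timeout_ms=1000):
    """Return sock.recv() if a message arrives in time, else raise."""
    # Assumption: `sock` is a pyzmq-style socket. poll() blocks for at
    # most `timeout_ms` milliseconds and returns a non-zero event mask
    # once a message is ready to be received.
    if sock.poll(timeout_ms):
        return sock.recv()
    # Same error as in the loop above, so the test fails instead of
    # waiting forever.
    raise RuntimeError('Remote process seems to hang.')
```

In the test, `rep = req_sock.recv()` would then become
`rep = recv_or_fail(req_sock)`.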


I spent a few hours googling for related problems, but didn't find
anything useful.
What can I do to improve my tests?

Regards,
Stefan


