[TIP] pytest, multiprocessing and pyzmq's ioloop
Stefan Scherfke
stefan.scherfke at offis.de
Tue Oct 25 05:34:04 PDT 2011
Hello,
I'm currently playing around with PyZMQ and trying to figure out how to
properly test PyZMQ apps that run as a `multiprocessing.Process` and use
PyZMQ’s ioloop.
To test those apps, I create and start the process for them. I then
create a "client" socket and connect it with the "server" socket of the
process. I sent requests and assert that the reply I get is what I
expect it to be.
If no exceptions occurs, everything runs fine. However, if the
`multiprocessing.Process` raises an exception and thus never sends a
reply, py.test waits forever without noticing the exception. The only
thing I can do about this is to run py.test with "-s" so that I can see
exception's traceback in the captured stderr and can then abort pytest
using Ctrl+C.
Here is a minimal example for a pytest test with a simple echo server.
If you uncomment the line "x += 1", the server raises an exception and
causes the test to hang:
---
import multiprocessing
from zmq.eventloop import ioloop, zmqstream
import zmq
class EchoServer(multiprocessing.Process):
"""Simple echo server with a zmq ioloop."""
def __init__(self):
super(EchoServer, self).__init__()
self.rep_stream = None
def run(self):
context = zmq.Context()
loop = ioloop.IOLoop.instance()
rep_sock = context.socket(zmq.REP)
rep_sock.bind('tcp://127.0.0.1:7777')
self.rep_stream = zmqstream.ZMQStream(rep_sock, loop)
self.rep_stream.on_recv(self.handle_request)
loop.start()
def handle_request(self, msg):
# Code where something might break so that send(msg[0]) wouldn't
# be called.
# x += 1
self.rep_stream.send(msg[0])
def pytest_funcarg__echo_server(request):
"""Creates echo server instances for the test(s)."""
echo_server = EchoServer()
# Terminate the process when done with the test
request.addfinalizer(echo_server.terminate)
return echo_server
def test_echo_server(echo_server):
"""Tests if the echo server responds properly."""
echo_server.start()
context = zmq.Context()
req_sock = context.socket(zmq.REQ)
req_sock.connect('tcp://127.0.0.1:7777')
req_sock.send(b'ohai')
rep = req_sock.recv()
assert rep == b'ohai'
---
I tried replacing the `req_sock.recv()` call with a loop and several
non-blocking calls and raise an exceptions if they all fail, but that
didn't really work:
# Try to recv something for 1sec
for i in range(100):
try:
rep = req_sock.recv(zmq.NOBLOCK)
break
except zmq.ZMQError:
time.sleep(0.01)
else:
raise RuntimeError('Remote process seems to hang.')
I spent a few hours with googling for related problems, but didn't find
anything useful.
What can I do to improve my tests?
Regards,
Stefan
More information about the testing-in-python
mailing list