[TIP] pytest, multiprocessing and pyzmq's ioloop

Stefan Scherfke stefan.scherfke at offis.de
Wed Oct 26 01:42:40 PDT 2011


Holger, Pere,

thanks for your answers. They helped me find a solution that works
quite well for me and doesn't impose any additional dependencies.

I just wrapped the zmq socket in another class. Its recv method calls
zmq's recv in non-blocking mode up to 100 times, sleeping 10 ms between
attempts, and raises an exception if no reply arrives within about one
second. That makes py.test fail instead of hanging and shows me which
recv call caused the error.

Here is the updated minimal example:

---
import multiprocessing

from zmq.eventloop import ioloop, zmqstream
import zmq


class EchoServer(multiprocessing.Process):
     """Simple echo server with a zmq ioloop."""
     def __init__(self):
         super(EchoServer, self).__init__()

         self.rep_stream = None

     def run(self):
         context = zmq.Context()
         loop = ioloop.IOLoop.instance()

         rep_sock = context.socket(zmq.REP)
         rep_sock.bind('tcp://127.0.0.1:7777')
         self.rep_stream = zmqstream.ZMQStream(rep_sock, loop)
         self.rep_stream.on_recv(self.handle_request)

         loop.start()

     def handle_request(self, msg):
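         # Code where something might break so that send(msg[0]) is never
         # called. Here the break is deliberate: x is undefined, so this
         # line raises UnboundLocalError and no reply is sent.
         x += 1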

         self.rep_stream.send(msg[0])


class TestSocket(object):
     def __init__(self, context, sock_type):
         self._context = context
         self._sock = context.socket(sock_type)

     def connect(self, *args, **kwargs):
         return self._sock.connect(*args, **kwargs)

     def send(self, *args, **kwargs):
         return self._sock.send(*args, **kwargs)

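     def recv(self, *args, **kwargs):
         """Poll for a reply for about 1 s instead of blocking forever."""
         import time  # local import; the module-level imports lack it

         for i in range(100):
             try:
                 # Non-blocking receive on the wrapped socket; raises
                 # zmq.ZMQError while no message is available.
                 rep = self._sock.recv(zmq.NOBLOCK)
                 break
             except zmq.ZMQError:
                 time.sleep(0.01)
         else:
             raise zmq.ZMQError('Got no answer.')

         return rep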


def pytest_funcarg__echo_server(request):
     """Creates echo server instances for the test(s)."""
     echo_server = EchoServer()

     # Terminate the process when done with the test
     def terminate():
         echo_server.terminate()
         echo_server.join()

     request.addfinalizer(terminate)

     return echo_server


def test_echo_server(echo_server):
     """Tests if the echo server responds properly."""
     echo_server.start()
     context = zmq.Context()
     req_sock = TestSocket(context, zmq.REQ)
     req_sock.connect('tcp://127.0.0.1:7777')

     req_sock.send(b'ohai')

     rep = req_sock.recv()
     assert rep == b'ohai'

---
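
The retry loop does not depend on zmq itself, so it could also live in a
small helper that takes the non-blocking receive as a callable. What
follows is only a minimal sketch, assuming Python 3 (for time.monotonic).
The names recv_with_timeout and NoReplyError are made up for illustration
and are not part of pyzmq or py.test.

```python
import time


class NoReplyError(Exception):
    """No reply arrived in time (hypothetical name, not a pyzmq class)."""


def recv_with_timeout(try_recv, timeout=1.0, interval=0.01,
                      retry_on=(Exception,)):
    """Call try_recv() until it returns or `timeout` seconds have passed.

    try_recv is any zero-argument callable that raises one of the
    exception types in `retry_on` while no message is available yet.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            return try_recv()
        except retry_on:
            # Give up once the deadline has passed, otherwise wait a bit
            # and try again.
            if time.monotonic() >= deadline:
                raise NoReplyError('Got no answer within %.2f s.' % timeout)
            time.sleep(interval)
```

Inside the wrapper class this would presumably be called as
recv_with_timeout(lambda: self._sock.recv(zmq.NOBLOCK),
retry_on=(zmq.ZMQError,)). Using a deadline instead of a fixed attempt
count keeps the total wait close to the timeout even if a single attempt
is slow.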

Cheers,
Stefan

On 25.10.2011 22:26, holger krekel wrote:
> Hi Stefan,
>
> On Tue, Oct 25, 2011 at 14:34 +0200, Stefan Scherfke wrote:
>> Hello,
>>
>> I'm currently playing around with PyZMQ and trying to figure out how
>> to properly test PyZMQ apps that run as a `multiprocessing.Process`
>> and use PyZMQ’s ioloop.
>>
>> To test those apps, I create and start the process for them. I then
>> create a "client" socket and connect it with the "server" socket of
>> the process. I send requests and assert that the reply I get is what
>> I expect it to be.
>>
>> If no exception occurs, everything runs fine. However, if the
>> `multiprocessing.Process` raises an exception and thus never sends a
>> reply, py.test waits forever without noticing the exception. The
>> only thing I can do about this is to run py.test with "-s" so that I
>> can see the exception's traceback in the captured stderr and can then
>> abort pytest using Ctrl+C.
>
> Hmm, if there is no other way than to get error conditions from stderr,
> then maybe capturing stderr and looking for exceptions makes sense?
> Maybe this could be achieved from your EchoServer funcarg factory by
> using "capfd = request.getfuncargvalue('capfd')" and then you
> can look at the 'err' in "out, err = capfd.readouterr()" from time
> to time.
>
> The other solution I can think of is to try to use execnet to control
> your subprocess (http://codespeak.net/execnet).  The equivalent code
> for your test would do a "channel.receive()" where you currently
> do "req_sock.recv()", but channel.receive() guarantees to raise
> an exception if the remote side has died or closed the socket.
>
> HTH,
> holger
>
>> Here is a minimal example for a pytest test with a simple echo
>> server. If you uncomment the line "x += 1", the server raises an
>> exception and causes the test to hang:
>>
>> ---
>> import multiprocessing
>>
>> from zmq.eventloop import ioloop, zmqstream
>> import zmq
>>
>>
>> class EchoServer(multiprocessing.Process):
>>      """Simple echo server with a zmq ioloop."""
>>      def __init__(self):
>>          super(EchoServer, self).__init__()
>>
>>          self.rep_stream = None
>>
>>      def run(self):
>>          context = zmq.Context()
>>          loop = ioloop.IOLoop.instance()
>>
>>          rep_sock = context.socket(zmq.REP)
>>          rep_sock.bind('tcp://127.0.0.1:7777')
>>          self.rep_stream = zmqstream.ZMQStream(rep_sock, loop)
>>          self.rep_stream.on_recv(self.handle_request)
>>
>>          loop.start()
>>
>>      def handle_request(self, msg):
>>          # Code where something might break so that send(msg[0]) wouldn't
>>          # be called.
>>          # x += 1
>>
>>          self.rep_stream.send(msg[0])
>>
>>
>> def pytest_funcarg__echo_server(request):
>>      """Creates echo server instances for the test(s)."""
>>      echo_server = EchoServer()
>>
>>      # Terminate the process when done with the test
>>      request.addfinalizer(echo_server.terminate)
>>
>>      return echo_server
>>
>>
>> def test_echo_server(echo_server):
>>      """Tests if the echo server responds properly."""
>>      echo_server.start()
>>
>>      context = zmq.Context()
>>      req_sock = context.socket(zmq.REQ)
>>      req_sock.connect('tcp://127.0.0.1:7777')
>>
>>      req_sock.send(b'ohai')
>>      rep = req_sock.recv()
>>      assert rep == b'ohai'
>>
>> ---
>>
>> I tried replacing the `req_sock.recv()` call with a loop of several
>> non-blocking calls, raising an exception if they all fail, but
>> that didn't really work:
>>
>> # Try to recv something for 1sec
>> for i in range(100):
>>      try:
>>          rep = req_sock.recv(zmq.NOBLOCK)
>>          break
>>      except zmq.ZMQError:
>>          time.sleep(0.01)
>> else:
>>      raise RuntimeError('Remote process seems to hang.')
>>
>>
>> I spent a few hours googling for related problems, but didn't
>> find anything useful.
>> What can I do to improve my tests?
>>
>> Regards,
>> Stefan