Skip to content
This repository has been archived by the owner on Apr 13, 2024. It is now read-only.

Commit

Permalink
add test with multiple subscribers
Browse files Browse the repository at this point in the history
  • Loading branch information
pd0wm committed Nov 21, 2019
1 parent 84b3af5 commit f6a8e39
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 21 deletions.
2 changes: 1 addition & 1 deletion messaging/impl_zmq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ Message * ZMQSubSocket::receive(bool non_blocking){
// Make a copy to ensure the data is aligned
r = new ZMQMessage;
r->init((char*)zmq_msg_data(&msg), zmq_msg_size(&msg));
}
}

zmq_msg_close(&msg);
return r;
Expand Down
2 changes: 1 addition & 1 deletion messaging/messaging.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ cdef extern from "messaging.hpp":
@staticmethod
Poller * create()
void registerSocket(SubSocket *)
vector[SubSocket*] poll(int)
vector[SubSocket*] poll(int) nogil
9 changes: 8 additions & 1 deletion messaging/messaging_pyx.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ cdef class Context:
def __cinit__(self):
self.context = cppContext.create()

def term(self):
del self.context
self.context = NULL

def __dealloc__(self):
pass
# Deleting the context will hang if sockets are still active
Expand All @@ -43,8 +47,11 @@ cdef class Poller:

def poll(self, timeout):
sockets = []
cdef int t = timeout

with nogil:
result = self.poller.poll(t)

result = self.poller.poll(timeout)
for s in result:
socket = SubSocket()
socket.setPtr(s)
Expand Down
61 changes: 43 additions & 18 deletions messaging/tests/test_poller.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
import unittest
import os
import time
import cereal.messaging as messaging

from multiprocessing import Process, Pipe
import concurrent.futures


def poller(pipe):
def poller():
context = messaging.Context()

p = messaging.Poller()

sub = messaging.SubSocket()
sub.connect(context, 'controlsState')

p = messaging.Poller()
p.registerSocket(sub)

while True:
pipe.recv()
socks = p.poll(1000)
r = [s.receive(non_blocking=True) for s in socks]

socks = p.poll(1000)
pipe.send([s.receive(non_blocking=True) for s in socks])
return r


class TestPoller(unittest.TestCase):
Expand All @@ -28,19 +28,44 @@ def test_poll_once(self):
pub = messaging.PubSocket()
pub.connect(context, 'controlsState')

pipe, pipe_child = Pipe()
proc = Process(target=poller, args=(pipe_child,))
proc.start()
with concurrent.futures.ThreadPoolExecutor() as e:
poll = e.submit(poller)

time.sleep(0.1) # Slow joiner syndrome

# Send message
pub.send("a")

# Wait for poll result
result = poll.result()

del pub
context.term()

self.assertEqual(result, [b"a"])

@unittest.skipIf(os.environ.get('MSGQ'), "fails under msgq")
def test_poll_and_create_many_subscribers(self):
context = messaging.Context()

pub = messaging.PubSocket()
pub.connect(context, 'controlsState')

with concurrent.futures.ThreadPoolExecutor() as e:
poll = e.submit(poller)

time.sleep(.1)
time.sleep(0.1) # Slow joiner syndrome
c = messaging.Context()
for _ in range(10):
messaging.SubSocket().connect(c, 'controlsState')

# Start poll
pipe.send("go")
# Send message
pub.send("a")

# Send message
pub.send("a")
# Wait for poll result
result = poll.result()

result = pipe.recv()
proc.kill()
del pub
context.term()

self.assertEqual(result, [b"a"])

0 comments on commit f6a8e39

Please sign in to comment.