Skip to content

Commit

Permalink
zmq publisher after being opened then closed now returns to a useful …
Browse files Browse the repository at this point in the history
…state rather than dying. Fixes #110
  • Loading branch information
cboulay committed Apr 21, 2024
1 parent 2682137 commit 1f32a03
Showing 1 changed file with 22 additions and 21 deletions.
43 changes: 22 additions & 21 deletions extensions/ezmsg-zmq/src/ezmsg/zmq/units.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,34 +185,35 @@ async def socket_monitor(self) -> None:
if event == zmq.EVENT_CONNECTED:
self.socket_open = True
elif event == zmq.EVENT_CLOSED:
was_open = self.socket_open
# was_open = self.socket_open
self.socket_open = False
# ZMQ seems to be sending spurious CLOSED event when we
# try to connect before the source is running. Only give up
# if we were previously connected. If we give up now, we
# will never unblock zmq_publisher.
if was_open:
break
elif event in (
zmq.EVENT_DISCONNECTED,
zmq.EVENT_MONITOR_STOPPED,
):
# if was_open:
# break
elif event == zmq.EVENT_DISCONNECTED:
self.socket_open = False
# break
elif event == zmq.EVENT_MONITOR_STOPPED:
self.socket_open = False
break

@ez.publisher(OUTPUT)
async def zmq_publisher(self) -> AsyncGenerator:
# Wait for socket connection
while not self.socket_open:
await asyncio.sleep(POLL_TIME)

while self.socket_open:
poll_result = await self.STATE.socket.poll(
self.SETTINGS.poll_time * 1000, zmq.POLLIN
)
if poll_result:
if self.SETTINGS.multipart is True:
_, data = await self.STATE.socket.recv_multipart()
else:
data = await self.STATE.socket.recv()
yield self.OUTPUT, ZMQMessage(data)
while True:
# Wait for socket connection
if not self.socket_open:
await asyncio.sleep(POLL_TIME)

if self.socket_open:
poll_result = await self.STATE.socket.poll(
self.SETTINGS.poll_time * 1000, zmq.POLLIN
)
if poll_result:
if self.SETTINGS.multipart is True:
_, data = await self.STATE.socket.recv_multipart()
else:
data = await self.STATE.socket.recv()
yield self.OUTPUT, ZMQMessage(data)

0 comments on commit 1f32a03

Please sign in to comment.