diff --git a/messaging/__init__.py b/messaging/__init__.py index d5baaf467d8917..0b1204b47a550a 100644 --- a/messaging/__init__.py +++ b/messaging/__init__.py @@ -125,9 +125,8 @@ def recv_one_retry(sock: SubSocket) -> capnp.lib.capnp._DynamicStructReader: return log.Event.from_bytes(dat) class SubMaster(): - def __init__(self, services: List[str], ignore_alive: Optional[List[str]] = None, - addr:str ="127.0.0.1"): - self.poller = Poller() + def __init__(self, services: List[str], poll: Optional[List[str]] = None, + ignore_alive: Optional[List[str]] = None, addr:str ="127.0.0.1"): self.frame = -1 self.updated = {s: False for s in services} self.rcv_time = {s: 0. for s in services} @@ -136,8 +135,12 @@ def __init__(self, services: List[str], ignore_alive: Optional[List[str]] = None self.sock = {} self.freq = {} self.data = {} - self.logMonoTime = {} self.valid = {} + self.logMonoTime = {} + + self.poller = Poller() + self.non_polled_services = [s for s in services if poll is not None and + len(poll) and s not in poll] if ignore_alive is not None: self.ignore_alive = ignore_alive @@ -146,14 +149,14 @@ def __init__(self, services: List[str], ignore_alive: Optional[List[str]] = None for s in services: if addr is not None: - self.sock[s] = sub_sock(s, poller=self.poller, addr=addr, conflate=True) + p = self.poller if s not in self.non_polled_services else None + self.sock[s] = sub_sock(s, poller=p, addr=addr, conflate=True) self.freq[s] = service_list[s].frequency try: data = new_message(s) except capnp.lib.capnp.KjException: # pylint: disable=c-extension-no-member - # lists - data = new_message(s, 0) + data = new_message(s, 0) # lists self.data[s] = getattr(data, s) self.logMonoTime[s] = 0 @@ -166,10 +169,13 @@ def update(self, timeout: int = 1000) -> None: msgs = [] for sock in self.poller.poll(timeout): msgs.append(recv_one_or_none(sock)) + + # non-blocking receive for non-polled sockets + for s in self.non_polled_services: + msgs.append(recv_one_or_none(self.sock[s])) self.update_msgs(sec_since_boot(), msgs) def update_msgs(self, cur_time: float, msgs: List[capnp.lib.capnp._DynamicStructReader]) -> None: - # TODO: add optional input that specify the service to wait for self.frame += 1 self.updated = dict.fromkeys(self.updated, False) for msg in msgs: