Skip to content

Commit

Permalink
allow prioritization of services in SubMaster (commaai#84)
Browse files Browse the repository at this point in the history
* allow blocking on a service in SubMaster update

* more descriptive name

* more generic

* fix

* fix mypy

* priority poller

* only use one poller
  • Loading branch information
adeebshihadeh authored Sep 1, 2020
1 parent 1538183 commit 626679d
Showing 1 changed file with 14 additions and 8 deletions.
22 changes: 14 additions & 8 deletions messaging/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,8 @@ def recv_one_retry(sock: SubSocket) -> capnp.lib.capnp._DynamicStructReader:
return log.Event.from_bytes(dat)

class SubMaster():
def __init__(self, services: List[str], ignore_alive: Optional[List[str]] = None,
addr:str ="127.0.0.1"):
self.poller = Poller()
def __init__(self, services: List[str], poll: Optional[List[str]] = None,
ignore_alive: Optional[List[str]] = None, addr:str ="127.0.0.1"):
self.frame = -1
self.updated = {s: False for s in services}
self.rcv_time = {s: 0. for s in services}
Expand All @@ -136,8 +135,12 @@ def __init__(self, services: List[str], ignore_alive: Optional[List[str]] = None
self.sock = {}
self.freq = {}
self.data = {}
self.logMonoTime = {}
self.valid = {}
self.logMonoTime = {}

self.poller = Poller()
self.non_polled_services = [s for s in services if poll is not None and
len(poll) and s not in poll]

if ignore_alive is not None:
self.ignore_alive = ignore_alive
Expand All @@ -146,14 +149,14 @@ def __init__(self, services: List[str], ignore_alive: Optional[List[str]] = None

for s in services:
if addr is not None:
self.sock[s] = sub_sock(s, poller=self.poller, addr=addr, conflate=True)
p = self.poller if s not in self.non_polled_services else None
self.sock[s] = sub_sock(s, poller=p, addr=addr, conflate=True)
self.freq[s] = service_list[s].frequency

try:
data = new_message(s)
except capnp.lib.capnp.KjException: # pylint: disable=c-extension-no-member
# lists
data = new_message(s, 0)
data = new_message(s, 0) # lists

self.data[s] = getattr(data, s)
self.logMonoTime[s] = 0
Expand All @@ -166,10 +169,13 @@ def update(self, timeout: int = 1000) -> None:
msgs = []
for sock in self.poller.poll(timeout):
msgs.append(recv_one_or_none(sock))

# non-blocking receive for non-polled sockets
for s in self.non_polled_services:
msgs.append(recv_one_or_none(self.sock[s]))
self.update_msgs(sec_since_boot(), msgs)

def update_msgs(self, cur_time: float, msgs: List[capnp.lib.capnp._DynamicStructReader]) -> None:
# TODO: add optional input that specify the service to wait for
self.frame += 1
self.updated = dict.fromkeys(self.updated, False)
for msg in msgs:
Expand Down

0 comments on commit 626679d

Please sign in to comment.