Skip to content

Commit

Permalink
import messaging and services
Browse files Browse the repository at this point in the history
  • Loading branch information
geohot committed Nov 1, 2019
1 parent 90e48c5 commit 23ad256
Show file tree
Hide file tree
Showing 14 changed files with 1,048 additions and 0 deletions.
1 change: 1 addition & 0 deletions messaging/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
demo
63 changes: 63 additions & 0 deletions messaging/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
CXX := clang++
CC := clang

BASEDIR = ../..
PHONELIBS = ../../phonelibs

CXXFLAGS := -g -O3 -fPIC -std=c++11 -Wall -Wextra -Wshadow -Weffc++ -Wstrict-aliasing -Wpedantic -Werror -MMD -I$(BASEDIR)/selfdrive

LDLIBS=-lm -lstdc++ -lrt -lpthread

UNAME_M := $(shell uname -m)

YAML_FLAGS = -I$(PHONELIBS)/yaml-cpp/include
YAML_LIB = $(abspath $(PHONELIBS)/yaml-cpp/lib/libyaml-cpp.a)

ifeq ($(UNAME_M),aarch64)
LDFLAGS += -llog -lgnustl_shared
ZMQ_LIBS = /usr/lib/libzmq.a
endif
ifeq ($(UNAME_M),x86_64)
ZMQ_FLAGS = -I$(BASEDIR)/phonelibs/zmq/x64/include
ZMQ_LIBS = $(abspath $(BASEDIR)/external/zmq/lib/libzmq.a)
YAML_DIR = $(PHONELIBS)/yaml-cpp/x64/lib/
YAML_LIB = $(abspath $(PHONELIBS)/yaml-cpp/x64/lib/libyaml-cpp.a)
endif

ifdef ASAN
CXXFLAGS += -fsanitize=address -fno-omit-frame-pointer
LDFLAGS += -fsanitize=address
endif

CXXFLAGS += $(ZMQ_FLAGS) $(YAML_FLAGS)

OBJS := messaging.o impl_zmq.o
DEPS=$(OBJS:.o=.d)

.PRECIOUS: $(OBJS)
.PHONY: all clean
all: messaging.a messaging_pyx.so

demo: messaging.a demo.o
$(CC) $(LDFLAGS) $^ $(LDLIBS) -L. -l:messaging.a -o '$@'

messaging_pyx.so: messaging.a messaging_pyx_setup.py messaging_pyx.pyx messaging.pxd
python3 messaging_pyx_setup.py build_ext --inplace
rm -rf build
rm -f messaging_pyx.cpp

%.a: $(OBJS)
@echo "[ LINK ] $@"
mkdir -p libs; \
cd libs; \
ar -x $(ZMQ_LIBS); \
ar -x $(YAML_LIB);

ar rcsD '$@' $^ libs/*.o
rm -r libs

clean:
@echo "[ CLEAN ]"
rm -rf *.so *.a demo libs $(OBJS) $(DEPS)

-include $(DEPS)
212 changes: 212 additions & 0 deletions messaging/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
import os
import subprocess

can_dir = os.path.dirname(os.path.abspath(__file__))
subprocess.check_call(["make"], cwd=can_dir)
from .messaging_pyx import Context, Poller, SubSocket, PubSocket # pylint: disable=no-name-in-module, import-error

from cereal import log
from common.realtime import sec_since_boot
from selfdrive.services import service_list


context = Context()

def new_message():
dat = log.Event.new_message()
dat.logMonoTime = int(sec_since_boot() * 1e9)
dat.valid = True
return dat

def pub_sock(endpoint):
sock = PubSocket()
sock.connect(context, endpoint)
return sock

def sub_sock(endpoint, poller=None, addr="127.0.0.1", conflate=False, timeout=None):
sock = SubSocket()
sock.connect(context, endpoint, conflate)

if timeout is not None:
sock.setTimeout(timeout)

if addr != "127.0.0.1":
raise NotImplementedError("Only localhost supported")

if poller is not None:
poller.registerSocket(sock)
return sock


def drain_sock_raw(sock, wait_for_one=False):
"""Receive all message currently available on the queue"""
ret = []
while 1:
if wait_for_one and len(ret) == 0:
dat = sock.receive()
else:
dat = sock.receive(non_blocking=True)

if dat is None:
break

ret.append(dat)

return ret

def drain_sock(sock, wait_for_one=False):
"""Receive all message currently available on the queue"""
ret = []
while 1:
if wait_for_one and len(ret) == 0:
dat = sock.receive()
else:
dat = sock.receive(non_blocking=True)

if dat is None: # Timeout hit
break

dat = log.Event.from_bytes(dat)
ret.append(dat)

return ret


# TODO: print when we drop packets?
def recv_sock(sock, wait=False):
"""Same as drain sock, but only returns latest message. Consider using conflate instead."""
dat = None

while 1:
if wait and dat is None:
rcv = sock.receive()
else:
rcv = sock.receive(non_blocking=True)

if rcv is None: # Timeout hit
break

dat = rcv

if dat is not None:
dat = log.Event.from_bytes(dat)

return dat

def recv_one(sock):
return log.Event.from_bytes(sock.receive())

def recv_one_or_none(sock):
dat = sock.receive(non_blocking=True)

if dat is not None:
log.Event.from_bytes(dat)

return dat

def recv_one_retry(sock):
"""Keep receiving until we get a message"""
while True:
dat = sock.receive()
if dat is not None:
return log.Event.from_bytes(dat)

def get_one_can(logcan):
while True:
can = recv_one_retry(logcan)
if len(can.can) > 0:
return can

class SubMaster():
def __init__(self, services, ignore_alive=None, addr="127.0.0.1"):
self.poller = Poller()
self.frame = -1
self.updated = {s : False for s in services}
self.rcv_time = {s : 0. for s in services}
self.rcv_frame = {s : 0 for s in services}
self.alive = {s : False for s in services}
self.sock = {}
self.freq = {}
self.data = {}
self.logMonoTime = {}
self.valid = {}

if ignore_alive is not None:
self.ignore_alive = ignore_alive
else:
self.ignore_alive = []

for s in services:
# TODO: get address automatically from service_list
if addr is not None:
self.sock[s] = sub_sock(s, poller=self.poller, addr=addr, conflate=True)
self.freq[s] = service_list[s].frequency

data = new_message()
if s in ['can', 'sensorEvents', 'liveTracks', 'sendCan',
'ethernetData', 'cellInfo', 'wifiScan',
'trafficEvents', 'orbObservation', 'carEvents']:
data.init(s, 0)
else:
data.init(s)
self.data[s] = getattr(data, s)
self.logMonoTime[s] = 0
self.valid[s] = data.valid

def __getitem__(self, s):
return self.data[s]

def update(self, timeout=-1):
msgs = []
for sock in self.poller.poll(timeout):
msgs.append(recv_one(sock))
self.update_msgs(sec_since_boot(), msgs)

def update_msgs(self, cur_time, msgs):
# TODO: add optional input that specify the service to wait for
self.frame += 1
self.updated = dict.fromkeys(self.updated, False)
for msg in msgs:
s = msg.which()
self.updated[s] = True
self.rcv_time[s] = cur_time
self.rcv_frame[s] = self.frame
self.data[s] = getattr(msg, s)
self.logMonoTime[s] = msg.logMonoTime
self.valid[s] = msg.valid

for s in self.data:
# arbitrary small number to avoid float comparison. If freq is 0, we can skip the check
if self.freq[s] > 1e-5:
# alive if delay is within 10x the expected frequency
self.alive[s] = (cur_time - self.rcv_time[s]) < (10. / self.freq[s])
else:
self.alive[s] = True

def all_alive(self, service_list=None):
if service_list is None: # check all
service_list = self.alive.keys()
return all(self.alive[s] for s in service_list if s not in self.ignore_alive)

def all_valid(self, service_list=None):
if service_list is None: # check all
service_list = self.valid.keys()
return all(self.valid[s] for s in service_list)

def all_alive_and_valid(self, service_list=None):
if service_list is None: # check all
service_list = self.alive.keys()
return self.all_alive(service_list=service_list) and self.all_valid(service_list=service_list)


class PubMaster():
def __init__(self, services):
self.sock = {}
for s in services:
self.sock[s] = pub_sock(s)

def send(self, s, dat):
# accept either bytes or capnp builder
if not isinstance(dat, bytes):
dat = dat.to_bytes()
self.sock[s].send(dat)
50 changes: 50 additions & 0 deletions messaging/demo.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#include <iostream>
#include <cstddef>
#include <chrono>
#include <thread>
#include <cassert>

#include "messaging.hpp"
#include "impl_zmq.hpp"

#define MSGS 1e5

int main() {
Context * c = Context::create();
SubSocket * sub_sock = SubSocket::create(c, "controlsState");
PubSocket * pub_sock = PubSocket::create(c, "controlsState");

char data[8];

Poller * poller = Poller::create({sub_sock});

auto start = std::chrono::steady_clock::now();

for (uint64_t i = 0; i < MSGS; i++){
*(uint64_t*)data = i;
pub_sock->send(data, 8);

auto r = poller->poll(100);

for (auto p : r){
Message * m = p->receive();
uint64_t ii = *(uint64_t*)m->getData();
assert(i == ii);
delete m;
}
}


auto end = std::chrono::steady_clock::now();
double elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count() / 1e9;
double throughput = ((double) MSGS / (double) elapsed);
std::cout << throughput << " msg/s" << std::endl;

delete poller;
delete sub_sock;
delete pub_sock;
delete c;


return 0;
}
30 changes: 30 additions & 0 deletions messaging/demo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import time

from messaging_pyx import Context, Poller, SubSocket, PubSocket # pylint: disable=no-name-in-module, import-error

MSGS = 1e5

if __name__ == "__main__":
c = Context()
sub_sock = SubSocket()
pub_sock = PubSocket()

sub_sock.connect(c, "controlsState")
pub_sock.connect(c, "controlsState")


poller = Poller()
poller.registerSocket(sub_sock)

t = time.time()
for i in range(int(MSGS)):
bts = i.to_bytes(4, 'little')
pub_sock.send(bts)

for s in poller.poll(100):
dat = s.receive()
ii = int.from_bytes(dat, 'little')
assert(i == ii)

dt = time.time() - t
print("%.1f msg/s" % (MSGS / dt))
Loading

0 comments on commit 23ad256

Please sign in to comment.