diff --git a/messaging/.gitignore b/messaging/.gitignore new file mode 100644 index 00000000000000..1549b67ca5936e --- /dev/null +++ b/messaging/.gitignore @@ -0,0 +1 @@ +demo diff --git a/messaging/Makefile b/messaging/Makefile new file mode 100644 index 00000000000000..cb1bdea0e33589 --- /dev/null +++ b/messaging/Makefile @@ -0,0 +1,63 @@ +CXX := clang++ +CC := clang + +BASEDIR = ../.. +PHONELIBS = ../../phonelibs + +CXXFLAGS := -g -O3 -fPIC -std=c++11 -Wall -Wextra -Wshadow -Weffc++ -Wstrict-aliasing -Wpedantic -Werror -MMD -I$(BASEDIR)/selfdrive + +LDLIBS=-lm -lstdc++ -lrt -lpthread + +UNAME_M := $(shell uname -m) + +YAML_FLAGS = -I$(PHONELIBS)/yaml-cpp/include +YAML_LIB = $(abspath $(PHONELIBS)/yaml-cpp/lib/libyaml-cpp.a) + +ifeq ($(UNAME_M),aarch64) + LDFLAGS += -llog -lgnustl_shared + ZMQ_LIBS = /usr/lib/libzmq.a +endif +ifeq ($(UNAME_M),x86_64) + ZMQ_FLAGS = -I$(BASEDIR)/phonelibs/zmq/x64/include + ZMQ_LIBS = $(abspath $(BASEDIR)/external/zmq/lib/libzmq.a) + YAML_DIR = $(PHONELIBS)/yaml-cpp/x64/lib/ + YAML_LIB = $(abspath $(PHONELIBS)/yaml-cpp/x64/lib/libyaml-cpp.a) +endif + +ifdef ASAN + CXXFLAGS += -fsanitize=address -fno-omit-frame-pointer + LDFLAGS += -fsanitize=address +endif + +CXXFLAGS += $(ZMQ_FLAGS) $(YAML_FLAGS) + +OBJS := messaging.o impl_zmq.o +DEPS=$(OBJS:.o=.d) + +.PRECIOUS: $(OBJS) +.PHONY: all clean +all: messaging.a messaging_pyx.so + +demo: messaging.a demo.o + $(CC) $(LDFLAGS) $^ $(LDLIBS) -L. -l:messaging.a -o '$@' + +messaging_pyx.so: messaging.a messaging_pyx_setup.py messaging_pyx.pyx messaging.pxd + python3 messaging_pyx_setup.py build_ext --inplace + rm -rf build + rm -f messaging_pyx.cpp + +%.a: $(OBJS) + @echo "[ LINK ] $@" + mkdir -p libs; \ + cd libs; \ + ar -x $(ZMQ_LIBS); \ + ar -x $(YAML_LIB); + + ar rcsD '$@' $^ libs/*.o + rm -r libs + +clean: + @echo "[ CLEAN ]" + rm -rf *.so *.a demo libs $(OBJS) $(DEPS) + +-include $(DEPS) diff --git a/messaging/__init__.py b/messaging/__init__.py new file mode 100644 index 00000000000000..0b9db08d2aa4e2 --- /dev/null +++ b/messaging/__init__.py @@ -0,0 +1,212 @@ +import os +import subprocess + +can_dir = os.path.dirname(os.path.abspath(__file__)) +subprocess.check_call(["make"], cwd=can_dir) +from .messaging_pyx import Context, Poller, SubSocket, PubSocket # pylint: disable=no-name-in-module, import-error + +from cereal import log +from common.realtime import sec_since_boot +from selfdrive.services import service_list + + +context = Context() + +def new_message(): + dat = log.Event.new_message() + dat.logMonoTime = int(sec_since_boot() * 1e9) + dat.valid = True + return dat + +def pub_sock(endpoint): + sock = PubSocket() + sock.connect(context, endpoint) + return sock + +def sub_sock(endpoint, poller=None, addr="127.0.0.1", conflate=False, timeout=None): + sock = SubSocket() + sock.connect(context, endpoint, conflate) + + if timeout is not None: + sock.setTimeout(timeout) + + if addr != "127.0.0.1": + raise NotImplementedError("Only localhost supported") + + if poller is not None: + poller.registerSocket(sock) + return sock + + +def drain_sock_raw(sock, wait_for_one=False): + """Receive all message currently available on the queue""" + ret = [] + while 1: + if wait_for_one and len(ret) == 0: + dat = sock.receive() + else: + dat = sock.receive(non_blocking=True) + + if dat is None: + break + + ret.append(dat) + + return ret + +def drain_sock(sock, wait_for_one=False): + """Receive all message currently available on the queue""" + ret = [] + while 1: + if wait_for_one and len(ret) == 0: + dat = sock.receive() + else: + dat = sock.receive(non_blocking=True) + + if dat is None: # Timeout hit + break + + dat = log.Event.from_bytes(dat) + ret.append(dat) + + return ret + + +# TODO: print when we drop packets? +def recv_sock(sock, wait=False): + """Same as drain sock, but only returns latest message. Consider using conflate instead.""" + dat = None + + while 1: + if wait and dat is None: + rcv = sock.receive() + else: + rcv = sock.receive(non_blocking=True) + + if rcv is None: # Timeout hit + break + + dat = rcv + + if dat is not None: + dat = log.Event.from_bytes(dat) + + return dat + +def recv_one(sock): + return log.Event.from_bytes(sock.receive()) + +def recv_one_or_none(sock): + dat = sock.receive(non_blocking=True) + + if dat is not None: + log.Event.from_bytes(dat) + + return dat + +def recv_one_retry(sock): + """Keep receiving until we get a message""" + while True: + dat = sock.receive() + if dat is not None: + return log.Event.from_bytes(dat) + +def get_one_can(logcan): + while True: + can = recv_one_retry(logcan) + if len(can.can) > 0: + return can + +class SubMaster(): + def __init__(self, services, ignore_alive=None, addr="127.0.0.1"): + self.poller = Poller() + self.frame = -1 + self.updated = {s : False for s in services} + self.rcv_time = {s : 0. for s in services} + self.rcv_frame = {s : 0 for s in services} + self.alive = {s : False for s in services} + self.sock = {} + self.freq = {} + self.data = {} + self.logMonoTime = {} + self.valid = {} + + if ignore_alive is not None: + self.ignore_alive = ignore_alive + else: + self.ignore_alive = [] + + for s in services: + # TODO: get address automatically from service_list + if addr is not None: + self.sock[s] = sub_sock(s, poller=self.poller, addr=addr, conflate=True) + self.freq[s] = service_list[s].frequency + + data = new_message() + if s in ['can', 'sensorEvents', 'liveTracks', 'sendCan', + 'ethernetData', 'cellInfo', 'wifiScan', + 'trafficEvents', 'orbObservation', 'carEvents']: + data.init(s, 0) + else: + data.init(s) + self.data[s] = getattr(data, s) + self.logMonoTime[s] = 0 + self.valid[s] = data.valid + + def __getitem__(self, s): + return self.data[s] + + def update(self, timeout=-1): + msgs = [] + for sock in self.poller.poll(timeout): + msgs.append(recv_one(sock)) + self.update_msgs(sec_since_boot(), msgs) + + def update_msgs(self, cur_time, msgs): + # TODO: add optional input that specify the service to wait for + self.frame += 1 + self.updated = dict.fromkeys(self.updated, False) + for msg in msgs: + s = msg.which() + self.updated[s] = True + self.rcv_time[s] = cur_time + self.rcv_frame[s] = self.frame + self.data[s] = getattr(msg, s) + self.logMonoTime[s] = msg.logMonoTime + self.valid[s] = msg.valid + + for s in self.data: + # arbitrary small number to avoid float comparison. If freq is 0, we can skip the check + if self.freq[s] > 1e-5: + # alive if delay is within 10x the expected frequency + self.alive[s] = (cur_time - self.rcv_time[s]) < (10. / self.freq[s]) + else: + self.alive[s] = True + + def all_alive(self, service_list=None): + if service_list is None: # check all + service_list = self.alive.keys() + return all(self.alive[s] for s in service_list if s not in self.ignore_alive) + + def all_valid(self, service_list=None): + if service_list is None: # check all + service_list = self.valid.keys() + return all(self.valid[s] for s in service_list) + + def all_alive_and_valid(self, service_list=None): + if service_list is None: # check all + service_list = self.alive.keys() + return self.all_alive(service_list=service_list) and self.all_valid(service_list=service_list) + + +class PubMaster(): + def __init__(self, services): + self.sock = {} + for s in services: + self.sock[s] = pub_sock(s) + + def send(self, s, dat): + # accept either bytes or capnp builder + if not isinstance(dat, bytes): + dat = dat.to_bytes() + self.sock[s].send(dat) diff --git a/messaging/demo.cc b/messaging/demo.cc new file mode 100644 index 00000000000000..cfdf42209415fc --- /dev/null +++ b/messaging/demo.cc @@ -0,0 +1,50 @@ +#include +#include +#include +#include +#include + +#include "messaging.hpp" +#include "impl_zmq.hpp" + +#define MSGS 1e5 + +int main() { + Context * c = Context::create(); + SubSocket * sub_sock = SubSocket::create(c, "controlsState"); + PubSocket * pub_sock = PubSocket::create(c, "controlsState"); + + char data[8]; + + Poller * poller = Poller::create({sub_sock}); + + auto start = std::chrono::steady_clock::now(); + + for (uint64_t i = 0; i < MSGS; i++){ + *(uint64_t*)data = i; + pub_sock->send(data, 8); + + auto r = poller->poll(100); + + for (auto p : r){ + Message * m = p->receive(); + uint64_t ii = *(uint64_t*)m->getData(); + assert(i == ii); + delete m; + } + } + + + auto end = std::chrono::steady_clock::now(); + double elapsed = std::chrono::duration_cast(end - start).count() / 1e9; + double throughput = ((double) MSGS / (double) elapsed); + std::cout << throughput << " msg/s" << std::endl; + + delete poller; + delete sub_sock; + delete pub_sock; + delete c; + + + return 0; +} diff --git a/messaging/demo.py b/messaging/demo.py new file mode 100644 index 00000000000000..7906a41e203cdf --- /dev/null +++ b/messaging/demo.py @@ -0,0 +1,30 @@ +import time + +from messaging_pyx import Context, Poller, SubSocket, PubSocket # pylint: disable=no-name-in-module, import-error + +MSGS = 1e5 + +if __name__ == "__main__": + c = Context() + sub_sock = SubSocket() + pub_sock = PubSocket() + + sub_sock.connect(c, "controlsState") + pub_sock.connect(c, "controlsState") + + + poller = Poller() + poller.registerSocket(sub_sock) + + t = time.time() + for i in range(int(MSGS)): + bts = i.to_bytes(4, 'little') + pub_sock.send(bts) + + for s in poller.poll(100): + dat = s.receive() + ii = int.from_bytes(dat, 'little') + assert(i == ii) + + dt = time.time() - t + print("%.1f msg/s" % (MSGS / dt)) diff --git a/messaging/impl_zmq.cc b/messaging/impl_zmq.cc new file mode 100644 index 00000000000000..73962afae949a7 --- /dev/null +++ b/messaging/impl_zmq.cc @@ -0,0 +1,182 @@ +#include +#include +#include +#include + +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wshadow" +#include +#pragma GCC diagnostic pop + +#include "impl_zmq.hpp" + +static int get_port(std::string endpoint) { + char * base_dir_ptr = std::getenv("BASEDIR"); + + if (base_dir_ptr == NULL){ + base_dir_ptr = std::getenv("PYTHONPATH"); + } + + assert(base_dir_ptr); + std::string base_dir = base_dir_ptr; + std::string service_list_path = base_dir + "/selfdrive/service_list.yaml"; + YAML::Node service_list = YAML::LoadFile(service_list_path); + + int port = -1; + for (const auto& it : service_list) { + auto name = it.first.as(); + + if (name == endpoint){ + port = it.second[0].as(); + break; + } + } + + if (port == -1){ + std::cout << "Service " << endpoint << " not found" << std::endl; + } + + assert(port >= 0); + return port; +} + +ZMQContext::ZMQContext() { + context = zmq_ctx_new(); +} + +ZMQContext::~ZMQContext() { + zmq_ctx_term(context); +} + +void ZMQMessage::init(size_t sz) { + size = sz; + data = new char[size]; +} + +void ZMQMessage::init(char * d, size_t sz) { + size = sz; + data = new char[size]; + memcpy(data, d, size); +} + +void ZMQMessage::close() { + if (size > 0){ + delete[] data; + } + size = 0; +} + +ZMQMessage::~ZMQMessage() { + this->close(); +} + + +void ZMQSubSocket::connect(Context *context, std::string endpoint, bool conflate){ + sock = zmq_socket(context->getRawContext(), ZMQ_SUB); + assert(sock); + + zmq_setsockopt(sock, ZMQ_SUBSCRIBE, "", 0); + + if (conflate){ + int arg = 1; + zmq_setsockopt(sock, ZMQ_CONFLATE, &arg, sizeof(int)); + } + + int reconnect_ivl = 500; + zmq_setsockopt(sock, ZMQ_RECONNECT_IVL_MAX, &reconnect_ivl, sizeof(reconnect_ivl)); + + full_endpoint = "tcp://127.0.0.1:"; + full_endpoint += std::to_string(get_port(endpoint)); + + std::cout << "ZMQ SUB: " << full_endpoint << std::endl; + + assert(zmq_connect(sock, full_endpoint.c_str()) == 0); +} + + +Message * ZMQSubSocket::receive(bool non_blocking){ + zmq_msg_t msg; + assert(zmq_msg_init(&msg) == 0); + + int flags = non_blocking ? ZMQ_DONTWAIT : 0; + int rc = zmq_msg_recv(&msg, sock, flags); + Message *r = NULL; + + if (rc >= 0){ + // Make a copy to ensure the data is aligned + r = new ZMQMessage; + r->init((char*)zmq_msg_data(&msg), zmq_msg_size(&msg)); + } + // else { + // std::cout << "endpoint: " << full_endpoint << std::endl; + // std::cout << "Receive error: " << zmq_strerror(errno) << std::endl; + // std::cout << "non_blocking: " << non_blocking << std::endl; + // int timeout = 123; + // size_t sz = sizeof(int); + // zmq_getsockopt(sock, ZMQ_RCVTIMEO, &timeout, &sz); + // std::cout << "timeout: " << timeout << std::endl; + // } + + zmq_msg_close(&msg); + return r; +} + +void ZMQSubSocket::setTimeout(int timeout){ + zmq_setsockopt(sock, ZMQ_RCVTIMEO, &timeout, sizeof(int)); +} + +ZMQSubSocket::~ZMQSubSocket(){ + zmq_close(sock); +} + +void ZMQPubSocket::connect(Context *context, std::string endpoint){ + sock = zmq_socket(context->getRawContext(), ZMQ_PUB); + + full_endpoint = "tcp://*:"; + full_endpoint += std::to_string(get_port(endpoint)); + + std::cout << "ZMQ PUB: " << full_endpoint << std::endl; + + assert(zmq_bind(sock, full_endpoint.c_str()) == 0); +} + +int ZMQPubSocket::sendMessage(Message *message){ + return zmq_send(sock, message->getData(), message->getSize(), ZMQ_DONTWAIT); +} + +int ZMQPubSocket::send(char *data, size_t size){ + return zmq_send(sock, data, size, ZMQ_DONTWAIT); +} + +ZMQPubSocket::~ZMQPubSocket(){ + zmq_close(sock); +} + + +void ZMQPoller::registerSocket(SubSocket * socket){ + assert(num_polls + 1 < MAX_POLLERS); + polls[num_polls].socket = socket->getRawSocket(); + polls[num_polls].events = ZMQ_POLLIN; + + sockets.push_back(socket); + num_polls++; +} + +std::vector ZMQPoller::poll(int timeout){ + std::vector r; + + int rc = zmq_poll(polls, num_polls, timeout); + if (rc < 0){ + return r; + } + + for (size_t i = 0; i < num_polls; i++){ + if (polls[i].revents){ + r.push_back(sockets[i]); + } + } + + return r; +} diff --git a/messaging/impl_zmq.hpp b/messaging/impl_zmq.hpp new file mode 100644 index 00000000000000..cfe105f4422ea3 --- /dev/null +++ b/messaging/impl_zmq.hpp @@ -0,0 +1,63 @@ +#pragma once +#include "messaging.hpp" +#include +#include + +#define MAX_POLLERS 128 + +class ZMQContext : public Context { +private: + void * context = NULL; +public: + ZMQContext(); + void * getRawContext() {return context;} + ~ZMQContext(); +}; + +class ZMQMessage : public Message { +private: + char * data; + size_t size; +public: + void init(size_t size); + void init(char *data, size_t size); + size_t getSize(){return size;} + char * getData(){return data;} + void close(); + ~ZMQMessage(); +}; + +class ZMQSubSocket : public SubSocket { +private: + void * sock; + std::string full_endpoint; +public: + void connect(Context *context, std::string endpoint, bool conflate=false); + void setTimeout(int timeout); + void * getRawSocket() {return sock;} + Message *receive(bool non_blocking=false); + ~ZMQSubSocket(); +}; + +class ZMQPubSocket : public PubSocket { +private: + void * sock; + std::string full_endpoint; +public: + void connect(Context *context, std::string endpoint); + int sendMessage(Message *message); + int send(char *data, size_t size); + ~ZMQPubSocket(); +}; + +class ZMQPoller : public Poller { +private: + std::vector sockets; + zmq_pollitem_t polls[MAX_POLLERS]; + size_t num_polls = 0; + +public: + void registerSocket(SubSocket *socket); + std::vector poll(int timeout); + ~ZMQPoller(){}; +}; diff --git a/messaging/messaging.cc b/messaging/messaging.cc new file mode 100644 index 00000000000000..352b04a6d969d9 --- /dev/null +++ b/messaging/messaging.cc @@ -0,0 +1,44 @@ +#include "messaging.hpp" +#include "impl_zmq.hpp" + +Context * Context::create(){ + Context * c = new ZMQContext(); + return c; +} + +SubSocket * SubSocket::create(){ + SubSocket * s = new ZMQSubSocket(); + return s; +} + +SubSocket * SubSocket::create(Context * context, std::string endpoint){ + SubSocket *s = SubSocket::create(); + s->connect(context, endpoint); + + return s; +} + +PubSocket * PubSocket::create(){ + PubSocket * s = new ZMQPubSocket(); + return s; +} + +PubSocket * PubSocket::create(Context * context, std::string endpoint){ + PubSocket *s = PubSocket::create(); + s->connect(context, endpoint); + return s; +} + +Poller * Poller::create(){ + Poller * p = new ZMQPoller(); + return p; +} + +Poller * Poller::create(std::vector sockets){ + Poller * p = Poller::create(); + + for (auto s : sockets){ + p->registerSocket(s); + } + return p; +} diff --git a/messaging/messaging.hpp b/messaging/messaging.hpp new file mode 100644 index 00000000000000..f379dc14343e62 --- /dev/null +++ b/messaging/messaging.hpp @@ -0,0 +1,52 @@ +#pragma once +#include +#include +#include + +class Context { +public: + virtual void * getRawContext() = 0; + static Context * create(); + virtual ~Context(){}; +}; + +class Message { +public: + virtual void init(size_t size) = 0; + virtual void init(char * data, size_t size) = 0; + virtual void close() = 0; + virtual size_t getSize() = 0; + virtual char * getData() = 0; + virtual ~Message(){}; +}; + + +class SubSocket { +public: + virtual void connect(Context *context, std::string endpoint, bool conflate=false) = 0; + virtual void setTimeout(int timeout) = 0; + virtual Message *receive(bool non_blocking=false) = 0; + virtual void * getRawSocket() = 0; + static SubSocket * create(); + static SubSocket * create(Context * context, std::string endpoint); + virtual ~SubSocket(){}; +}; + +class PubSocket { +public: + virtual void connect(Context *context, std::string endpoint) = 0; + virtual int sendMessage(Message *message) = 0; + virtual int send(char *data, size_t size) = 0; + static PubSocket * create(); + static PubSocket * create(Context * context, std::string endpoint); + virtual ~PubSocket(){}; +}; + +class Poller { +public: + virtual void registerSocket(SubSocket *socket) = 0; + virtual std::vector poll(int timeout) = 0; + static Poller * create(); + static Poller * create(std::vector sockets); + virtual ~Poller(){}; +}; diff --git a/messaging/messaging.pxd b/messaging/messaging.pxd new file mode 100644 index 00000000000000..cb8ca07eab0f04 --- /dev/null +++ b/messaging/messaging.pxd @@ -0,0 +1,42 @@ +# distutils: language = c++ +#cython: language_level=3 + +from libcpp.string cimport string +from libcpp.vector cimport vector +from libcpp cimport bool + + +cdef extern from "messaging.hpp": + cdef cppclass Context: + @staticmethod + Context * create() + + cdef cppclass Message: + void init(size_t) + void init(char *, size_t) + void close() + size_t getSize() + char *getData() + + + + cdef cppclass SubSocket: + @staticmethod + SubSocket * create() + void connect(Context *, string, bool) + Message * receive(bool) + void setTimeout(int) + + + cdef cppclass PubSocket: + @staticmethod + PubSocket * create() + void connect(Context *, string) + int sendMessage(Message *) + int send(char *, size_t) + + cdef cppclass Poller: + @staticmethod + Poller * create() + void registerSocket(SubSocket *) + vector[SubSocket*] poll(int) diff --git a/messaging/messaging_pyx.pyx b/messaging/messaging_pyx.pyx new file mode 100644 index 00000000000000..976e2d4cc222dc --- /dev/null +++ b/messaging/messaging_pyx.pyx @@ -0,0 +1,92 @@ +# distutils: language = c++ +# cython: c_string_encoding=ascii, language_level=3 + +from libcpp.string cimport string +from libcpp cimport bool + + +from messaging cimport Context as cppContext +from messaging cimport SubSocket as cppSubSocket +from messaging cimport PubSocket as cppPubSocket +from messaging cimport Poller as cppPoller +from messaging cimport Message as cppMessage + + +cdef class Context: + cdef cppContext * context + def __cinit__(self): + self.context = cppContext.create() + + def __dealloc___(self): + del self.context + + +cdef class Poller: + cdef cppPoller * poller + + def __cinit__(self): + self.poller = cppPoller.create() + + def __dealloc___(self): + del self.poller + + def registerSocket(self, SubSocket socket): + self.poller.registerSocket(socket.socket) + + def poll(self, timeout): + sockets = [] + + result = self.poller.poll(timeout) + for s in result: + socket = SubSocket() + socket.setPtr(s) + sockets.append(socket) + + return sockets + +cdef class SubSocket: + cdef cppSubSocket * socket + + def __cinit__(self): + self.socket = cppSubSocket.create() + + def __dealloc___(self): + del self.socket + + cdef setPtr(self, cppSubSocket * ptr): + del self.socket + self.socket = ptr + + def connect(self, Context context, string endpoint, bool conflate=False): + self.socket.connect(context.context, endpoint, conflate) + + def setTimeout(self, int timeout): + self.socket.setTimeout(timeout) + + + def receive(self, bool non_blocking=False): + msg = self.socket.receive(non_blocking) + + if msg == NULL: + return None + else: + sz = msg.getSize() + m = msg.getData()[:sz] + del msg + + return m + + +cdef class PubSocket: + cdef cppPubSocket * socket + def __cinit__(self): + self.socket = cppPubSocket.create() + + def __dealloc___(self): + del self.socket + + def connect(self, Context context, string endpoint): + self.socket.connect(context.context, endpoint) + + def send(self, string data): + return self.socket.send(data.c_str(), len(data)) diff --git a/messaging/messaging_pyx_setup.py b/messaging/messaging_pyx_setup.py new file mode 100644 index 00000000000000..53cd73c7d76d3a --- /dev/null +++ b/messaging/messaging_pyx_setup.py @@ -0,0 +1,34 @@ +import os +import subprocess +from distutils.core import Extension, setup # pylint: disable=import-error,no-name-in-module + +from Cython.Build import cythonize + +from common.basedir import BASEDIR +from common.cython_hacks import BuildExtWithoutPlatformSuffix + +sourcefiles = ['messaging_pyx.pyx'] +extra_compile_args = ["-std=c++11"] +libraries = [] +ARCH = subprocess.check_output(["uname", "-m"], encoding='utf8').rstrip() # pylint: disable=unexpected-keyword-arg + +if ARCH == "aarch64": + extra_compile_args += ["-Wno-deprecated-register"] + libraries += ['gnustl_shared'] + +setup(name='CAN parser', + cmdclass={'build_ext': BuildExtWithoutPlatformSuffix}, + ext_modules=cythonize( + Extension( + "messaging_pyx", + language="c++", + sources=sourcefiles, + extra_compile_args=extra_compile_args, + libraries=libraries, + extra_objects=[ + os.path.join(BASEDIR, 'selfdrive', 'messaging', 'messaging.a'), + ] + ) + ), + nthreads=4, +) diff --git a/service_list.yaml b/service_list.yaml new file mode 100644 index 00000000000000..a38c6ad714ac60 --- /dev/null +++ b/service_list.yaml @@ -0,0 +1,163 @@ +# TODO: these port numbers are hardcoded in c, fix this + +# LogRotate: 8001 is a PUSH PULL socket between loggerd and visiond + +# all ZMQ pub sub: port, should_log, frequency, (qlog_decimation) + +# frame syncing packet +frame: [8002, true, 20., 1] +# accel, gyro, and compass +sensorEvents: [8003, true, 100., 100] +# GPS data, also global timestamp +gpsNMEA: [8004, true, 9.] # 9 msgs each sec +# CPU+MEM+GPU+BAT temps +thermal: [8005, true, 2., 1] +# List(CanData), list of can messages +can: [8006, true, 100.] +controlsState: [8007, true, 100., 100] +#liveEvent: [8008, true, 0.] +model: [8009, true, 20., 5] +features: [8010, true, 0.] +health: [8011, true, 2., 1] +radarState: [8012, true, 20.] +#liveUI: [8014, true, 0.] +encodeIdx: [8015, true, 20.] +liveTracks: [8016, true, 20.] +sendcan: [8017, true, 100.] +logMessage: [8018, true, 0.] +liveCalibration: [8019, true, 5.] +androidLog: [8020, true, 0.] +carState: [8021, true, 100., 10] +# 8022 is reserved for sshd +carControl: [8023, true, 100., 10] +plan: [8024, true, 20.] +liveLocation: [8025, true, 0.] +gpsLocation: [8026, true, 1., 1] +ethernetData: [8027, true, 0.] +navUpdate: [8028, true, 0.] +qcomGnss: [8029, true, 0.] +lidarPts: [8030, true, 0.] +procLog: [8031, true, 0.5] +gpsLocationExternal: [8032, true, 10., 1] +ubloxGnss: [8033, true, 10.] +clocks: [8034, true, 1., 1] +liveMpc: [8035, false, 20.] +liveLongitudinalMpc: [8036, false, 20.] +plusFrame: [8037, false, 0.] +navStatus: [8038, true, 0.] +gpsLocationTrimble: [8039, true, 0.] +trimbleGnss: [8041, true, 0.] +ubloxRaw: [8042, true, 20.] +gpsPlannerPoints: [8043, true, 0.] +gpsPlannerPlan: [8044, true, 0.] +applanixRaw: [8046, true, 0.] +orbLocation: [8047, true, 0.] +trafficEvents: [8048, true, 0.] +liveLocationTiming: [8049, true, 0.] +orbslamCorrection: [8050, true, 0.] +liveLocationCorrected: [8051, true, 0.] +orbObservation: [8052, true, 0.] +applanixLocation: [8053, true, 0.] +liveLocationKalman: [8054, true, 0.] +uiNavigationEvent: [8055, true, 0.] +orbOdometry: [8057, true, 0.] +orbFeatures: [8058, false, 0.] +orbKeyFrame: [8059, true, 0.] +uiLayoutState: [8060, true, 0.] +frontEncodeIdx: [8061, true, 5.] +orbFeaturesSummary: [8062, true, 0.] +driverMonitoring: [8063, true, 5., 1] +liveParameters: [8064, true, 10.] +liveMapData: [8065, true, 0.] +cameraOdometry: [8066, true, 5.] +pathPlan: [8067, true, 20.] +kalmanOdometry: [8068, true, 0.] +thumbnail: [8069, true, 0.2, 1] +carEvents: [8070, true, 1., 1] +carParams: [8071, true, 0.02, 1] + +testModel: [8040, false, 0.] +testLiveLocation: [8045, false, 0.] +testJoystick: [8056, false, 0.] + +# 8080 is reserved for slave testing daemon +# 8762 is reserved for logserver + +# manager -- base process to manage starting and stopping of all others +# subscribes: thermal + +# **** processes that communicate with the outside world **** + +# thermald -- decides when to start and stop onroad +# subscribes: health, location +# publishes: thermal + +# boardd -- communicates with the car +# subscribes: sendcan +# publishes: can, health, ubloxRaw + +# sensord -- publishes IMU and Magnetometer +# publishes: sensorEvents + +# gpsd -- publishes EON's gps +# publishes: gpsNMEA + +# visiond -- talks to the cameras, runs the model, saves the videos +# publishes: frame, model, driverMonitoring, cameraOdometry, thumbnail + +# **** stateful data transformers **** + +# plannerd -- decides where to drive the car +# subscribes: carState, model, radarState, controlsState, liveParameters +# publishes: plan, pathPlan, liveMpc, liveLongitudinalMpc + +# controlsd -- drives the car by sending CAN messages to panda +# subscribes: can, thermal, health, plan, pathPlan, driverMonitoring, liveCalibration +# publishes: carState, carControl, sendcan, controlsState, carEvents, carParams + +# radard -- processes the radar and vision data +# subscribes: can, controlsState, model, liveParameters +# publishes: radarState, liveTracks + +# params_learner -- learns vehicle params by observing the vehicle dynamics +# subscribes: controlsState, sensorEvents, cameraOdometry +# publishes: liveParameters + +# calibrationd -- reads posenet and applies a temporal filter on the frame region to look at +# subscribes: cameraOdometry +# publishes: liveCalibration + +# ubloxd -- read raw ublox data and converts them in readable format +# subscribes: ubloxRaw +# publishes: ubloxGnss + +# **** LOGGING SERVICE **** + +# loggerd +# subscribes: EVERYTHING + +# **** NON VITAL SERVICES **** + +# ui +# subscribes: thermal, model, controlsState, uiLayout, liveCalibration, radarState, liveMpc, plusFrame, liveMapData + +# uploader +# communicates through file system with loggerd + +# deleter +# communicates through file system with loggerd and uploader + +# logmessaged -- central logging service, can log to cloud +# publishes: logMessage + +# logcatd -- fetches logcat info from android +# publishes: androidLog + +# proclogd -- fetches process information +# publishes: procLog + +# tombstoned -- reports native crashes + +# athenad -- on request, open a sub socket and return the value + +# updated -- waits for network access and tries to update every hour diff --git a/services.py b/services.py new file mode 100644 index 00000000000000..24a96c4eb2e4e1 --- /dev/null +++ b/services.py @@ -0,0 +1,20 @@ +import os +import yaml + +class Service(): + def __init__(self, port, should_log, frequency, decimation=None): + self.port = port + self.should_log = should_log + self.frequency = frequency + self.decimation = decimation + +service_list_path = os.path.join(os.path.dirname(__file__), "service_list.yaml") + +service_list = {} +with open(service_list_path, "r") as f: + for k, v in yaml.safe_load(f).items(): + decimation = None + if len(v) == 4: + decimation = v[3] + + service_list[k] = Service(v[0], v[1], v[2], decimation)