diff --git a/Dockerfile b/Dockerfile index f681dce68ff4b5..c399e19a64e5ea 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ from ubuntu:16.04 -RUN apt-get update && apt-get install -y libzmq3-dev clang wget git autoconf libtool curl make build-essential libssl-dev zlib1g-dev libbz2-dev libreadline-dev libsqlite3-dev llvm libncurses5-dev libncursesw5-dev xz-utils tk-dev libffi-dev liblzma-dev python-openssl +RUN apt-get update && apt-get install -y libzmq3-dev capnproto libcapnp-dev clang wget git autoconf libtool curl make build-essential libssl-dev zlib1g-dev libbz2-dev libreadline-dev libsqlite3-dev llvm libncurses5-dev libncursesw5-dev xz-utils tk-dev libffi-dev liblzma-dev python-openssl RUN curl -L https://github.com/pyenv/pyenv-installer/raw/master/bin/pyenv-installer | bash ENV PATH="/root/.pyenv/bin:/root/.pyenv/shims:${PATH}" @@ -10,8 +10,6 @@ RUN pyenv rehash RUN pip3 install pyyaml==5.1.2 Cython==0.29.14 scons==3.1.1 pycapnp==0.6.4 WORKDIR /project/cereal -COPY install_capnp.sh . -RUN ./install_capnp.sh ENV PYTHONPATH=/project diff --git a/SConscript b/SConscript index ed63c1717b797f..a724dfa6b938a4 100644 --- a/SConscript +++ b/SConscript @@ -20,6 +20,7 @@ if shutil.which('capnpc-java'): cereal_objects = env.SharedObject([ 'gen/cpp/car.capnp.c++', 'gen/cpp/log.capnp.c++', + 'messaging/socketmaster.cc', ]) env.Library('cereal', cereal_objects) @@ -43,8 +44,10 @@ Depends('messaging/impl_zmq.cc', services_h) # note, this rebuilds the deps shared, zmq is statically linked to make APK happy # TODO: get APK to load system zmq to remove the static link -shared_lib_shared_lib = [zmq, 'm', 'stdc++'] + ["gnustl_shared"] if arch == "aarch64" else [zmq] -env.SharedLibrary('messaging_shared', messaging_objects, LIBS=shared_lib_shared_lib) +if arch == "aarch64": + zmq_static = FindFile("libzmq.a", "/usr/lib") + shared_lib_shared_lib = [zmq_static, 'm', 'stdc++', "gnustl_shared"] + env.SharedLibrary('messaging_shared', messaging_objects, LIBS=shared_lib_shared_lib) env.Program('messaging/bridge', ['messaging/bridge.cc'], LIBS=[messaging_lib, 'zmq']) Depends('messaging/bridge.cc', services_h) diff --git a/car.capnp b/car.capnp index 3ef7117384c311..f2ed72b2d6fef7 100644 --- a/car.capnp +++ b/car.capnp @@ -13,12 +13,12 @@ struct CarEvent @0x9b1657f34caf3ad3 { name @0 :EventName; enable @1 :Bool; noEntry @2 :Bool; - warning @3 :Bool; + warning @3 :Bool; # alerts presented only when enabled or soft disabling userDisable @4 :Bool; softDisable @5 :Bool; immediateDisable @6 :Bool; preEnable @7 :Bool; - permanent @8 :Bool; + permanent @8 :Bool; # alerts presented regardless of openpilot state enum EventName @0xbaa8c5d505f727de { # TODO: copy from error list @@ -38,7 +38,7 @@ struct CarEvent @0x9b1657f34caf3ad3 { pedalPressed @13; cruiseDisabled @14; radarCanError @15; - dataNeeded @16; + dataNeededDEPRECATED @16; speedTooLow @17; outOfSpace @18; overheat @19; @@ -55,23 +55,23 @@ struct CarEvent @0x9b1657f34caf3ad3 { manualRestart @30; lowSpeedLockout @31; plannerError @32; - ipasOverride @33; + ipasOverrideDEPRECATED @33; debugAlert @34; steerTempUnavailableMute @35; resumeRequired @36; preDriverDistracted @37; promptDriverDistracted @38; driverDistracted @39; - geofence @40; - driverMonitorOn @41; - driverMonitorOff @42; + geofenceDEPRECATED @40; + driverMonitorOnDEPRECATED @41; + driverMonitorOffDEPRECATED @42; preDriverUnresponsive @43; promptDriverUnresponsive @44; driverUnresponsive @45; belowSteerSpeed @46; - calibrationProgress @47; + calibrationProgressDEPRECATED @47; lowBattery @48; - invalidGiraffeHonda @49; + invalidGiraffeHondaDEPRECATED @49; vehicleModelInvalid @50; controlsFailed @51; sensorDataInvalid @52; @@ -93,10 +93,18 @@ struct CarEvent @0x9b1657f34caf3ad3 { driverMonitorLowAcc @68; invalidLkasSetting @69; speedTooHigh @70; - laneChangeBlocked @71; + laneChangeBlockedDEPRECATED @71; relayMalfunction @72; gasPressed @73; stockFcw @74; + startup @75; + startupNoCar @76; + startupNoControl @77; + startupMaster @78; + fcw @79; + steerSaturated @80; + whitePandaUnsupported @81; + startupWhitePanda @82; } } diff --git a/install_capnp.sh b/install_capnp.sh index cc570b60ffc884..2d2d43d6141ce9 100755 --- a/install_capnp.sh +++ b/install_capnp.sh @@ -10,31 +10,3 @@ CXXFLAGS="-fPIC" ./configure make -j$(nproc) make install - -# manually build binaries statically -g++ -std=gnu++11 -I./src -I./src -DKJ_HEADER_WARNINGS -DCAPNP_HEADER_WARNINGS -DCAPNP_INCLUDE_DIR=\"/usr/local/include\" -pthread -O2 -DNDEBUG -pthread -pthread -o .libs/capnp src/capnp/compiler/module-loader.o src/capnp/compiler/capnp.o ./.libs/libcapnpc.a ./.libs/libcapnp.a ./.libs/libkj.a -lpthread -pthread - -g++ -std=gnu++11 -I./src -I./src -DKJ_HEADER_WARNINGS -DCAPNP_HEADER_WARNINGS -DCAPNP_INCLUDE_DIR=\"/usr/local/include\" -pthread -O2 -DNDEBUG -pthread -pthread -o .libs/capnpc-c++ src/capnp/compiler/capnpc-c++.o ./.libs/libcapnp.a ./.libs/libkj.a -lpthread -pthread - -g++ -std=gnu++11 -I./src -I./src -DKJ_HEADER_WARNINGS -DCAPNP_HEADER_WARNINGS -DCAPNP_INCLUDE_DIR=\"/usr/local/include\" -pthread -O2 -DNDEBUG -pthread -pthread -o .libs/capnpc-capnp src/capnp/compiler/capnpc-capnp.o ./.libs/libcapnp.a ./.libs/libkj.a -lpthread -pthread - -cp .libs/capnp /usr/local/bin/ -cp .libs/capnpc-c++ /usr/local/bin/ -cp .libs/capnpc-capnp /usr/local/bin/ -cp .libs/*.a /usr/local/lib - -cd /tmp -echo "Installing c-capnp" -git clone https://github.com/commaai/c-capnproto.git -cd c-capnproto -git submodule update --init --recursive -autoreconf -f -i -s -CXXFLAGS="-fPIC" ./configure -make -j$(nproc) -make install - -# manually build binaries statically -gcc -fPIC -o .libs/capnpc-c compiler/capnpc-c.o compiler/schema.capnp.o compiler/str.o ./.libs/libcapnp_c.a - -cp .libs/capnpc-c /usr/local/bin/ -cp .libs/*.a /usr/local/lib diff --git a/log.capnp b/log.capnp index 3744c46126f7fb..c766ef78ec878c 100644 --- a/log.capnp +++ b/log.capnp @@ -855,6 +855,8 @@ struct LiveLocationKalman { gpsTimeOfWeek @14 :Float64; status @15 :Status; unixTimestampMillis @16 :Int64; + inputsOK @17 :Bool = true; + posenetOK @18 :Bool = true; enum Status { uninitialized @0; diff --git a/messaging/impl_msgq.cc b/messaging/impl_msgq.cc index d37b8c986d7237..0a51d12c17ff4e 100644 --- a/messaging/impl_msgq.cc +++ b/messaging/impl_msgq.cc @@ -143,7 +143,11 @@ int MSGQPubSocket::connect(Context *context, std::string endpoint){ assert(context); q = new msgq_queue_t; - msgq_new_queue(q, endpoint.c_str(), DEFAULT_SEGMENT_SIZE); + int r = msgq_new_queue(q, endpoint.c_str(), DEFAULT_SEGMENT_SIZE); + if (r != 0){ + return r; + } + msgq_init_publisher(q); return 0; diff --git a/messaging/messaging.cc b/messaging/messaging.cc index 1a9f860cd35666..9e2ae409d1dc08 100644 --- a/messaging/messaging.cc +++ b/messaging/messaging.cc @@ -2,9 +2,15 @@ #include "impl_zmq.hpp" #include "impl_msgq.hpp" +#ifdef __APPLE__ +const bool MUST_USE_ZMQ = true; +#else +const bool MUST_USE_ZMQ = false; +#endif + Context * Context::create(){ Context * c; - if (std::getenv("ZMQ")){ + if (std::getenv("ZMQ") || MUST_USE_ZMQ){ c = new ZMQContext(); } else { c = new MSGQContext(); @@ -14,7 +20,7 @@ Context * Context::create(){ SubSocket * SubSocket::create(){ SubSocket * s; - if (std::getenv("ZMQ")){ + if (std::getenv("ZMQ") || MUST_USE_ZMQ){ s = new ZMQSubSocket(); } else { s = new MSGQSubSocket(); @@ -60,7 +66,7 @@ SubSocket * SubSocket::create(Context * context, std::string endpoint, std::stri PubSocket * PubSocket::create(){ PubSocket * s; - if (std::getenv("ZMQ")){ + if (std::getenv("ZMQ") || MUST_USE_ZMQ){ s = new ZMQPubSocket(); } else { s = new MSGQPubSocket(); @@ -82,7 +88,7 @@ PubSocket * PubSocket::create(Context * context, std::string endpoint){ Poller * Poller::create(){ Poller * p; - if (std::getenv("ZMQ")){ + if (std::getenv("ZMQ") || MUST_USE_ZMQ){ p = new ZMQPoller(); } else { p = new MSGQPoller(); diff --git a/messaging/messaging.hpp b/messaging/messaging.hpp index 79968c97986f01..20bb35213c02ae 100644 --- a/messaging/messaging.hpp +++ b/messaging/messaging.hpp @@ -1,7 +1,10 @@ #pragma once #include -#include +#include #include +#include +#include +#include "../gen/cpp/log.capnp.h" #define MSG_MULTIPLE_PUBLISHERS 100 @@ -54,3 +57,36 @@ class Poller { static Poller * create(std::vector sockets); virtual ~Poller(){}; }; + +class SubMaster { + public: + SubMaster(const std::initializer_list &service_list, + const char *address = nullptr, const std::initializer_list &ignore_alive = {}); + int update(int timeout = 1000); + inline bool allAlive(const std::initializer_list &service_list = {}) { return all_(service_list, false, true); } + inline bool allValid(const std::initializer_list &service_list = {}) { return all_(service_list, true, false); } + inline bool allAliveAndValid(const std::initializer_list &service_list = {}) { return all_(service_list, true, true); } + bool updated(const char *name) const; + void drain(); + cereal::Event::Reader &operator[](const char *name); + ~SubMaster(); + + private: + bool all_(const std::initializer_list &service_list, bool valid, bool alive); + Poller *poller_ = nullptr; + uint64_t frame_ = 0; + struct SubMessage; + std::map messages_; + std::map services_; +}; + +class PubMaster { + public: + PubMaster(const std::initializer_list &service_list); + inline int send(const char *name, capnp::byte *data, size_t size) { return sockets_.at(name)->send((char *)data, size); } + int send(const char *name, capnp::MessageBuilder &msg); + ~PubMaster(); + + private: + std::map sockets_; +}; diff --git a/messaging/messaging_pyx_setup.py b/messaging/messaging_pyx_setup.py index f5962d0c2b2310..3e1c41ce15f58b 100644 --- a/messaging/messaging_pyx_setup.py +++ b/messaging/messaging_pyx_setup.py @@ -30,7 +30,7 @@ def get_ext_filename(self, ext_name): sourcefiles = ['messaging_pyx.pyx'] -extra_compile_args = ["-std=c++11"] +extra_compile_args = ["-std=c++14"] libraries = ['zmq'] ARCH = subprocess.check_output(["uname", "-m"], encoding='utf8').rstrip() # pylint: disable=unexpected-keyword-arg diff --git a/messaging/msgq.cc b/messaging/msgq.cc index 4ccd13df44527a..7c6210ead16c22 100644 --- a/messaging/msgq.cc +++ b/messaging/msgq.cc @@ -90,8 +90,10 @@ int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){ auto fd = open(full_path, O_RDWR | O_CREAT, 0777); delete[] full_path; - if (fd < 0) + if (fd < 0) { + std::cout << "Warning, could not open: " << full_path << std::endl; return -1; + } int rc = ftruncate(fd, size + sizeof(msgq_header_t)); if (rc < 0) diff --git a/messaging/socketmaster.cc b/messaging/socketmaster.cc new file mode 100644 index 00000000000000..66b7696cf299dc --- /dev/null +++ b/messaging/socketmaster.cc @@ -0,0 +1,162 @@ +#include +#include +#include "messaging.hpp" +#include "services.h" +#ifdef __APPLE__ +#define CLOCK_BOOTTIME CLOCK_MONOTONIC +#endif +static inline uint64_t nanos_since_boot() { + struct timespec t; + clock_gettime(CLOCK_BOOTTIME, &t); + return t.tv_sec * 1000000000ULL + t.tv_nsec; +} +static const service *get_service(const char *name) { + for (const auto &it : services) { + if (strcmp(it.name, name) == 0) return ⁢ + } + return nullptr; +} +static inline bool inList(const std::initializer_list &list, const char *value) { + for (auto &v : list) { + if (strcmp(value, v) == 0) return true; + } + return false; +} + +class MessageContext { + public: + MessageContext() { ctx_ = Context::create(); } + ~MessageContext() { delete ctx_; } + Context *ctx_; +}; +MessageContext ctx; + +struct SubMaster::SubMessage { + std::string name; + SubSocket *socket = nullptr; + int freq = 0; + bool updated = false, alive = false, valid = false, ignore_alive; + uint64_t rcv_time = 0, rcv_frame = 0; + void *allocated_msg_reader = nullptr; + capnp::FlatArrayMessageReader *msg_reader = nullptr; + kj::Array buf; + cereal::Event::Reader event; +}; + +SubMaster::SubMaster(const std::initializer_list &service_list, const char *address, + const std::initializer_list &ignore_alive) { + poller_ = Poller::create(); + for (auto name : service_list) { + const service *serv = get_service(name); + assert(serv != nullptr); + SubSocket *socket = SubSocket::create(ctx.ctx_, name, address ? address : "127.0.0.1", true); + assert(socket != 0); + poller_->registerSocket(socket); + SubMessage *m = new SubMessage{ + .socket = socket, + .freq = serv->frequency, + .ignore_alive = inList(ignore_alive, name), + .allocated_msg_reader = malloc(sizeof(capnp::FlatArrayMessageReader)), + .buf = kj::heapArray(1024)}; + messages_[socket] = m; + services_[name] = m; + } +} + +int SubMaster::update(int timeout) { + if (++frame_ == UINT64_MAX) frame_ = 1; + for (auto &kv : messages_) kv.second->updated = false; + + int updated = 0; + auto sockets = poller_->poll(timeout); + uint64_t current_time = nanos_since_boot(); + for (auto s : sockets) { + Message *msg = s->receive(true); + if (msg == nullptr) continue; + + SubMessage *m = messages_.at(s); + const size_t size = (msg->getSize() / sizeof(capnp::word)) + 1; + if (m->buf.size() < size) { + m->buf = kj::heapArray(size); + } + memcpy(m->buf.begin(), msg->getData(), msg->getSize()); + delete msg; + + if (m->msg_reader) { + m->msg_reader->~FlatArrayMessageReader(); + } + m->msg_reader = new (m->allocated_msg_reader) capnp::FlatArrayMessageReader(kj::ArrayPtr(m->buf.begin(), size)); + m->event = m->msg_reader->getRoot(); + m->updated = true; + m->rcv_time = current_time; + m->rcv_frame = frame_; + m->valid = m->event.getValid(); + + ++updated; + } + + for (auto &kv : messages_) { + SubMessage *m = kv.second; + m->alive = (m->freq <= (1e-5) || ((current_time - m->rcv_time) * (1e-9)) < (10.0 / m->freq)); + } + return updated; +} + +bool SubMaster::all_(const std::initializer_list &service_list, bool valid, bool alive) { + int found = 0; + for (auto &kv : messages_) { + SubMessage *m = kv.second; + if (service_list.size() == 0 || inList(service_list, m->name.c_str())) { + found += (!valid || m->valid) && (!alive || (m->alive && !m->ignore_alive)); + } + } + return service_list.size() == 0 ? found == messages_.size() : found == service_list.size(); +} + +void SubMaster::drain() { + while (true) { + auto polls = poller_->poll(0); + if (polls.size() == 0) + break; + + for (auto sock : polls) { + Message *msg = sock->receive(true); + delete msg; + } + } +} + +bool SubMaster::updated(const char *name) const { return services_.at(name)->updated; } +cereal::Event::Reader &SubMaster::operator[](const char *name) { return services_.at(name)->event; }; + +SubMaster::~SubMaster() { + delete poller_; + for (auto &kv : messages_) { + SubMessage *m = kv.second; + if (m->msg_reader) { + m->msg_reader->~FlatArrayMessageReader(); + } + free(m->allocated_msg_reader); + delete m->socket; + delete m; + } +} + +PubMaster::PubMaster(const std::initializer_list &service_list) { + for (auto name : service_list) { + assert(get_service(name) != nullptr); + PubSocket *socket = PubSocket::create(ctx.ctx_, name); + assert(socket); + sockets_[name] = socket; + } +} + +int PubMaster::send(const char *name, capnp::MessageBuilder &msg) { + auto words = capnp::messageToFlatArray(msg); + auto bytes = words.asBytes(); + return send(name, bytes.begin(), bytes.size()); +} + +PubMaster::~PubMaster() { + for (auto s : sockets_) delete s.second; +}