Skip to content

Commit

Permalink
Squashed 'cereal/' changes from b8382bb..01942b8
Browse files Browse the repository at this point in the history
01942b8 add TODO
b74a456 don't hardcode the lists
ed5a4bf add face stds
396a2bb add can error counter to controlsState
c6b5c73 Switch default to msgq (#21)
a457ffa Fix indentation in readme.md
a1fc8c7 explicitly mention Python for syntax colouring (#20)
19e2393 Fix expected for cameraOdometry and liveCalibration
e7d2f97 Add radar comm issue error
db64cd4 Reserve safety #21 for VAG PQ35/PQ46/NMS (#19)
79d638d separate honda safety models between Bosch Giraffe and Bosch Nidec
2614a65 better name
b6b84cd add longitudinal
78f5934 Add canRxErrs to health
6758899 qlog liveCalibration
df80b87 add more stuff to fw log in CarParams
a87805a fix doxs
4746b20 got doxed
21cf3f5 build on mac
31ac47c Add carUnrecognized event

git-subtree-dir: cereal
git-subtree-split: 01942b8
  • Loading branch information
Vehicle Researcher committed Jan 15, 2020
1 parent e3b2117 commit 9504037
Show file tree
Hide file tree
Showing 11 changed files with 122 additions and 40 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ libmessaging.*
libmessaging_shared.*
services.h
.sconsign.dblite
libcereal_shared.so
libcereal_shared.*

42 changes: 42 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
What is cereal?
----

cereal is both a messaging spec for robotics systems as well as generic high performance IPC pub sub messaging with a single publisher and multiple subscribers.

Imagine this use case:
* A sensor process reads gyro measurements directly from an IMU and publishes a sensorEvents packet
* A calibration process subscribes to the sensorEvents packet to use the IMU
* A localization process subscribes to the sensorEvents packet to use the IMU also


Messaging Spec
----

You'll find the message types in [log.capnp](log.capnp). It uses [Cap'n proto](https://capnproto.org/capnp-tool.html) and defines one struct called Event.

All Events have a logMonoTime and a valid. Then a big union defines the packet type.


Pub Sub Backends
----

cereal supports two backends, one based on [zmq](https://zeromq.org/), the other called msgq, a custom pub sub based on shared memory that doesn't require the bytes to pass through the kernel.

Example
---
```python
import cereal.messaging as messaging

# in subscriber
sm = messaging.SubMaster(['sensorEvents'])
while 1:
sm.update()
print(sm['sensorEvents'])

# in publisher
pm = messaging.PubMaster(['sensorEvents'])
dat = messaging.new_message()
dat.init('sensorEvents', 1)
dat.sensorEvents[0] = {"gyro": {"v": [0.1, -0.1, 0.1]}}
pm.send('sensorEvents', dat)
```
4 changes: 2 additions & 2 deletions SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ cereal_objects = env.SharedObject([
])

env.Library('cereal', cereal_objects)
env.SharedLibrary('cereal_shared', cereal_objects)
env.SharedLibrary('cereal_shared', cereal_objects, LIBS=["capnp_c"])

cereal_dir = Dir('.')
services_h = env.Command(
Expand All @@ -49,7 +49,7 @@ Depends('messaging/impl_zmq.cc', services_h)

# note, this rebuilds the deps shared, zmq is statically linked to make APK happy
# TODO: get APK to load system zmq to remove the static link
shared_lib_shared_lib = [zmq, 'm', 'stdc++'] + ["gnustl_shared"] if arch == "aarch64" else []
shared_lib_shared_lib = [zmq, 'm', 'stdc++'] + ["gnustl_shared"] if arch == "aarch64" else [zmq]
env.SharedLibrary('messaging_shared', messaging_objects, LIBS=shared_lib_shared_lib)

env.Program('messaging/bridge', ['messaging/bridge.cc'], LIBS=[messaging_lib, 'zmq'])
Expand Down
20 changes: 16 additions & 4 deletions car.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ struct CarEvent @0x9b1657f34caf3ad3 {
lowMemory @63;
stockAeb @64;
ldw @65;
carUnrecognized @66;
radarCommIssue @67;
}
}

Expand Down Expand Up @@ -410,11 +412,11 @@ struct CarParams {

enum SafetyModel {
silent @0;
honda @1;
hondaNidec @1;
toyota @2;
elm327 @3;
gm @4;
hondaBosch @5;
hondaBoschGiraffe @5;
ford @6;
cadillac @7;
hyundai @8;
Expand All @@ -428,7 +430,9 @@ struct CarParams {
toyotaIpas @16;
allOutput @17;
gmAscm @18;
noOutput @19; # like silent but with silent CAN TXs
noOutput @19; # like silent but without silent CAN TXs
hondaBoschHarness @20;
volkswagenPq @21;
}

enum SteerControlType {
Expand All @@ -444,13 +448,21 @@ struct CarParams {

struct CarFw {
ecu @0 :Ecu;
fwVersion @1 :Text;
fwVersion @1 :Data;
address @2: UInt32;
subAddress @3: UInt8;
}

enum Ecu {
eps @0;
esp @1;
fwdRadar @2;
fwdCamera @3;
engine @4;
unknown @5;

# Toyota only
dsu @6;
apgs @7;
}
}
11 changes: 11 additions & 0 deletions log.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ struct HealthData {
hasGps @6 :Bool;
canSendErrs @7 :UInt32;
canFwdErrs @8 :UInt32;
canRxErrs @19 :UInt32;
gmlanSendErrs @9 :UInt32;
hwType @10 :HwType;
fanSpeedRpm @11 :UInt16;
Expand Down Expand Up @@ -484,6 +485,7 @@ struct ControlsState @0x97ff69c53601abf1 {
decelForTurn @47 :Bool;

decelForModel @54 :Bool;
canErrorCounter @57 :UInt32;

lateralControlState :union {
indiState @52 :LateralINDIState;
Expand Down Expand Up @@ -575,6 +577,7 @@ struct ModelData {
leadFuture @7 :LeadData;
speed @8 :List(Float32);
meta @10 :MetaData;
longitudinal @11 :LongitudinalData;

struct PathData {
points @0 :List(Float32);
Expand Down Expand Up @@ -605,13 +608,19 @@ struct ModelData {
yuvCorrection @5 :List(Float32);
inputTransform @6 :List(Float32);
}

struct MetaData {
engagedProb @0 :Float32;
desirePrediction @1 :List(Float32);
brakeDisengageProb @2 :Float32;
gasDisengageProb @3 :Float32;
steerOverrideProb @4 :Float32;
}

struct LongitudinalData {
speeds @0 :List(Float32);
accelerations @1 :List(Float32);
}
}

struct CalibrationFeatures {
Expand Down Expand Up @@ -1757,6 +1766,8 @@ struct DriverMonitoring {
leftBlinkProb @8 :Float32;
rightBlinkProb @9 :Float32;
irPwrDEPRECATED @10 :Float32;
faceOrientationStd @11 :List(Float32);
facePositionStd @12 :List(Float32);
}

struct Boot {
Expand Down
12 changes: 7 additions & 5 deletions messaging/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# must be build with scons
from .messaging_pyx import Context, Poller, SubSocket, PubSocket # pylint: disable=no-name-in-module, import-error
from .messaging_pyx import MultiplePublishersError, MessagingError # pylint: disable=no-name-in-module, import-error
import capnp

assert MultiplePublishersError
assert MessagingError
Expand Down Expand Up @@ -116,6 +117,7 @@ def recv_one_retry(sock):
if dat is not None:
return log.Event.from_bytes(dat)

# TODO: This does not belong in messaging
def get_one_can(logcan):
while True:
can = recv_one_retry(logcan)
Expand Down Expand Up @@ -147,12 +149,12 @@ def __init__(self, services, ignore_alive=None, addr="127.0.0.1"):
self.freq[s] = service_list[s].frequency

data = new_message()
if s in ['can', 'sensorEvents', 'liveTracks', 'sendCan',
'ethernetData', 'cellInfo', 'wifiScan',
'trafficEvents', 'orbObservation', 'carEvents']:
data.init(s, 0)
else:
try:
data.init(s)
except capnp.lib.capnp.KjException:
# lists
data.init(s, 0)

self.data[s] = getattr(data, s)
self.logMonoTime[s] = 0
self.valid[s] = data.valid
Expand Down
2 changes: 2 additions & 0 deletions messaging/bridge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include <csignal>
#include <map>

typedef void (*sighandler_t)(int sig);

#include "services.h"

#include "impl_msgq.hpp"
Expand Down
17 changes: 11 additions & 6 deletions messaging/impl_msgq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ Message * MSGQSubSocket::receive(bool non_blocking){
msgq_msg_t msg;

MSGQMessage *r = NULL;
r = NULL;

int rc = msgq_msg_recv(&msg, q);

Expand All @@ -109,17 +108,23 @@ Message * MSGQSubSocket::receive(bool non_blocking){
}
}

if (rc > 0){
r = new MSGQMessage;
r->takeOwnership(msg.data, msg.size);
}
errno = msgq_do_exit ? EINTR : 0;

if (!non_blocking){
std::signal(SIGINT, prev_handler_sigint);
std::signal(SIGTERM, prev_handler_sigterm);
}

errno = msgq_do_exit ? EINTR : 0;

if (rc > 0){
if (msgq_do_exit){
msgq_msg_close(&msg); // Free unused message on exit
} else {
r = new MSGQMessage;
r->takeOwnership(msg.data, msg.size);
}
}

return (Message*)r;
}

Expand Down
24 changes: 12 additions & 12 deletions messaging/messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@

Context * Context::create(){
Context * c;
if (std::getenv("MSGQ")){
c = new MSGQContext();
} else {
if (std::getenv("ZMQ")){
c = new ZMQContext();
} else {
c = new MSGQContext();
}
return c;
}

SubSocket * SubSocket::create(){
SubSocket * s;
if (std::getenv("MSGQ")){
s = new MSGQSubSocket();
} else {
if (std::getenv("ZMQ")){
s = new ZMQSubSocket();
} else {
s = new MSGQSubSocket();
}
return s;
}
Expand Down Expand Up @@ -60,10 +60,10 @@ SubSocket * SubSocket::create(Context * context, std::string endpoint, std::stri

PubSocket * PubSocket::create(){
PubSocket * s;
if (std::getenv("MSGQ")){
s = new MSGQPubSocket();
} else {
if (std::getenv("ZMQ")){
s = new ZMQPubSocket();
} else {
s = new MSGQPubSocket();
}
return s;
}
Expand All @@ -82,10 +82,10 @@ PubSocket * PubSocket::create(Context * context, std::string endpoint){

Poller * Poller::create(){
Poller * p;
if (std::getenv("MSGQ")){
p = new MSGQPoller();
} else {
if (std::getenv("ZMQ")){
p = new ZMQPoller();
} else {
p = new MSGQPoller();
}
return p;
}
Expand Down
24 changes: 16 additions & 8 deletions messaging/msgq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@

#include "msgq.hpp"

void sigusr1_handler(int signal) {
assert(signal == SIGUSR1);
void sigusr2_handler(int signal) {
assert(signal == SIGUSR2);
}

uint64_t msgq_get_uid(void){
Expand Down Expand Up @@ -80,7 +80,7 @@ void msgq_wait_for_subscriber(msgq_queue_t *q){
int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){
assert(size < 0xFFFFFFFF); // Buffer must be smaller than 2^32 bytes

std::signal(SIGUSR1, sigusr1_handler);
std::signal(SIGUSR2, sigusr2_handler);

const char * prefix = "/dev/shm/";
char * full_path = new char[strlen(path) + strlen(prefix) + 1];
Expand Down Expand Up @@ -136,7 +136,7 @@ void msgq_close_queue(msgq_queue_t *q){


void msgq_init_publisher(msgq_queue_t * q) {
std::cout << "Starting publisher" << std::endl;
//std::cout << "Starting publisher" << std::endl;
uint64_t uid = msgq_get_uid();

*q->write_uid = uid;
Expand All @@ -150,6 +150,15 @@ void msgq_init_publisher(msgq_queue_t * q) {
q->write_uid_local = uid;
}

static void thread_signal(uint32_t tid) {
#ifndef SYS_tkill
// TODO: this won't work for multithreaded programs
kill(tid, SIGUSR2);
#else
syscall(SYS_tkill, tid, SIGUSR2);
#endif
}

void msgq_init_subscriber(msgq_queue_t * q) {
assert(q != NULL);
assert(q->num_readers != NULL);
Expand All @@ -173,7 +182,7 @@ void msgq_init_subscriber(msgq_queue_t * q) {
*q->read_uids[i] = 0;

// Wake up reader in case they are in a poll
syscall(SYS_tkill, old_uid & 0xFFFFFFFF, SIGUSR1);
thread_signal(old_uid & 0xFFFFFFFF);
}

continue;
Expand All @@ -196,7 +205,7 @@ void msgq_init_subscriber(msgq_queue_t * q) {
}
}

std::cout << "New subscriber id: " << q->reader_id << " uid: " << q->read_uid_local << " " << q->endpoint << std::endl;
//std::cout << "New subscriber id: " << q->reader_id << " uid: " << q->read_uid_local << " " << q->endpoint << std::endl;
msgq_reset_reader(q);
}

Expand Down Expand Up @@ -278,8 +287,7 @@ int msgq_msg_send(msgq_msg_t * msg, msgq_queue_t *q){
// Notify readers
for (uint64_t i = 0; i < num_readers; i++){
uint64_t reader_uid = *q->read_uids[i];

syscall(SYS_tkill, reader_uid & 0xFFFFFFFF, SIGUSR1);
thread_signal(reader_uid & 0xFFFFFFFF);
}

return msg->size;
Expand Down
Loading

0 comments on commit 9504037

Please sign in to comment.