Skip to content

Commit

Permalink
C++ SubMaster: make readers always valid (commaai#139)
Browse files Browse the repository at this point in the history
* refactor submaster

* cleanup
  • Loading branch information
deanlee authored May 15, 2021
1 parent 4ecee78 commit a6f4b63
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 12 deletions.
2 changes: 1 addition & 1 deletion messaging/messaging.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class SubMaster {
bool valid(const char *name) const;
uint64_t rcv_frame(const char *name) const;
uint64_t rcv_time(const char *name) const;
cereal::Event::Reader &operator[](const char *name);
cereal::Event::Reader &operator[](const char *name) const;

private:
bool all_(const std::initializer_list<const char *> &service_list, bool valid, bool alive);
Expand Down
27 changes: 16 additions & 11 deletions messaging/socketmaster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <assert.h>
#include <stdlib.h>
#include <string>
#include <mutex>

#include "services.h"
#include "messaging.h"
Expand Down Expand Up @@ -30,11 +31,18 @@ static inline bool inList(const std::initializer_list<const char *> &list, const

class MessageContext {
public:
MessageContext() { ctx_ = Context::create(); }
MessageContext() : ctx_(nullptr) {};
~MessageContext() { delete ctx_; }
inline Context *context() {
std::call_once(init_flag, [=]() { ctx_ = Context::create(); });
return ctx_;
}
private:
Context *ctx_;
std::once_flag init_flag;
};
MessageContext ctx;

MessageContext message_context;

struct SubMaster::SubMessage {
std::string name;
Expand All @@ -54,7 +62,7 @@ SubMaster::SubMaster(const std::initializer_list<const char *> &service_list, co
for (auto name : service_list) {
const service *serv = get_service(name);
assert(serv != nullptr);
SubSocket *socket = SubSocket::create(ctx.ctx_, name, address ? address : "127.0.0.1", true);
SubSocket *socket = SubSocket::create(message_context.context(), name, address ? address : "127.0.0.1", true);
assert(socket != 0);
poller_->registerSocket(socket);
SubMessage *m = new SubMessage{
Expand All @@ -63,6 +71,7 @@ SubMaster::SubMaster(const std::initializer_list<const char *> &service_list, co
.freq = serv->frequency,
.ignore_alive = inList(ignore_alive, name),
.allocated_msg_reader = malloc(sizeof(capnp::FlatArrayMessageReader))};
m->msg_reader = new (m->allocated_msg_reader) capnp::FlatArrayMessageReader({});
messages_[socket] = m;
services_[name] = m;
}
Expand All @@ -82,9 +91,7 @@ void SubMaster::update(int timeout) {

SubMessage *m = messages_.at(s);

if (m->msg_reader) {
m->msg_reader->~FlatArrayMessageReader();
}
m->msg_reader->~FlatArrayMessageReader();
m->msg_reader = new (m->allocated_msg_reader) capnp::FlatArrayMessageReader(m->aligned_buf.align(msg));
delete msg;
messages.push_back({m->name, m->msg_reader->getRoot<cereal::Event>()});
Expand Down Expand Up @@ -162,17 +169,15 @@ uint64_t SubMaster::rcv_time(const char *name) const {
return services_.at(name)->rcv_time;
}

cereal::Event::Reader &SubMaster::operator[](const char *name) {
cereal::Event::Reader &SubMaster::operator[](const char *name) const {
return services_.at(name)->event;
};

SubMaster::~SubMaster() {
delete poller_;
for (auto &kv : messages_) {
SubMessage *m = kv.second;
if (m->msg_reader) {
m->msg_reader->~FlatArrayMessageReader();
}
m->msg_reader->~FlatArrayMessageReader();
free(m->allocated_msg_reader);
delete m->socket;
delete m;
Expand All @@ -182,7 +187,7 @@ SubMaster::~SubMaster() {
PubMaster::PubMaster(const std::initializer_list<const char *> &service_list) {
for (auto name : service_list) {
assert(get_service(name) != nullptr);
PubSocket *socket = PubSocket::create(ctx.ctx_, name);
PubSocket *socket = PubSocket::create(message_context.context(), name);
assert(socket);
sockets_[name] = socket;
}
Expand Down

0 comments on commit a6f4b63

Please sign in to comment.