Skip to content

Commit

Permalink
add all msgq files, but dont use as default
Browse files Browse the repository at this point in the history
  • Loading branch information
pd0wm committed Nov 5, 2019
1 parent a68a38f commit d35515a
Show file tree
Hide file tree
Showing 10 changed files with 879 additions and 8 deletions.
1 change: 1 addition & 0 deletions messaging/.gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
demo
bridge
*.o
*.d
*.a
Expand Down
27 changes: 20 additions & 7 deletions messaging/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,33 +31,46 @@ endif

CXXFLAGS += $(ZMQ_FLAGS) $(YAML_FLAGS)

OBJS := messaging.o impl_zmq.o
OBJS := messaging.o impl_zmq.o impl_msgq.o msgq.o
DEPS=$(OBJS:.o=.d)

.PRECIOUS: $(OBJS)
.PHONY: all clean
all: messaging.a messaging_pyx.so
all: bridge messaging.a messaging_pyx.so messaging.so

demo: messaging.a demo.o
$(CC) $(LDFLAGS) $^ $(LDLIBS) -L. -l:messaging.a -o '$@'

bridge: messaging.a bridge.o
$(CC) $(LDFLAGS) $^ $(LDLIBS) -L. -l:messaging.a -o '$@'

messaging_pyx.so: messaging.a messaging_pyx_setup.py messaging_pyx.pyx messaging.pxd
python3 messaging_pyx_setup.py build_ext --inplace
rm -rf build
rm -f messaging_pyx.cpp

messaging.so: $(OBJS)
@echo "[ LINK ] $@"
mkdir -p libs_so; \
cd libs_so; \
ar -x $(ZMQ_LIBS); \
ar -x $(YAML_LIB);

$(CXX) -shared $(LDFLAGS) $^ $(LDLIBS) libs_so/*.o -o '$@'
rm -r libs_so

%.a: $(OBJS)
@echo "[ LINK ] $@"
mkdir -p libs; \
cd libs; \
mkdir -p libs_a; \
cd libs_a; \
ar -x $(ZMQ_LIBS); \
ar -x $(YAML_LIB);

ar rcsD '$@' $^ libs/*.o
rm -r libs
ar rcsD '$@' $^ libs_a/*.o
rm -r libs_a

clean:
@echo "[ CLEAN ]"
rm -rf *.so *.a demo libs $(OBJS) $(DEPS)
rm -rf *.so *.a bridge demo libs_a libs_so $(OBJS) $(DEPS)

-include $(DEPS)
2 changes: 1 addition & 1 deletion messaging/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def __init__(self, services, ignore_alive=None, addr="127.0.0.1"):
def __getitem__(self, s):
return self.data[s]

def update(self, timeout=-1):
def update(self, timeout=1000):
msgs = []
for sock in self.poller.poll(timeout):
msgs.append(recv_one(sock))
Expand Down
77 changes: 77 additions & 0 deletions messaging/bridge.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#include <iostream>
#include <string>
#include <cassert>
#include <csignal>
#include <map>

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wshadow"
#include <yaml-cpp/yaml.h>
#pragma GCC diagnostic pop

#include "impl_msgq.hpp"
#include "impl_zmq.hpp"

void sigpipe_handler(int sig) {
assert(sig == SIGPIPE);
std::cout << "SIGPIPE received" << std::endl;
}

static std::vector<std::string> get_services() {
char * base_dir_ptr = std::getenv("BASEDIR");

if (base_dir_ptr == NULL){
base_dir_ptr = std::getenv("PYTHONPATH");
}

assert(base_dir_ptr);
std::string base_dir = base_dir_ptr;
std::string service_list_path = base_dir + "/selfdrive/service_list.yaml";
YAML::Node service_list = YAML::LoadFile(service_list_path);

std::vector<std::string> name_list;

for (const auto& it : service_list) {
auto name = it.first.as<std::string>();
if (name == "plusFrame" || name == "uiLayoutState") continue;
name_list.push_back(name);
}

return name_list;

}


int main(void){
signal(SIGPIPE, (sighandler_t)sigpipe_handler);

auto endpoints = get_services();

std::map<SubSocket*, PubSocket*> sub2pub;

Context *zmq_context = new ZMQContext();
Context *msgq_context = new MSGQContext();
Poller *poller = new MSGQPoller();

for (auto endpoint: endpoints){
SubSocket * msgq_sock = new MSGQSubSocket();
msgq_sock->connect(msgq_context, endpoint, "127.0.0.1", false);
poller->registerSocket(msgq_sock);

PubSocket * zmq_sock = new ZMQPubSocket();
zmq_sock->connect(zmq_context, endpoint);

sub2pub[msgq_sock] = zmq_sock;
}


while (true){
for (auto sub_sock : poller->poll(100)){
Message * msg = sub_sock->receive();
if (msg == NULL) continue;
sub2pub[sub_sock]->sendMessage(msg);
delete msg;
}
}
return 0;
}
175 changes: 175 additions & 0 deletions messaging/impl_msgq.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
#include <cassert>
#include <cstring>
#include <iostream>
#include <cstdlib>
#include <csignal>
#include <cerrno>


#include "impl_msgq.hpp"

volatile sig_atomic_t msgq_do_exit = 0;

void sig_handler(int signal) {
assert(signal == SIGINT || signal == SIGTERM);
msgq_do_exit = 1;
}


MSGQContext::MSGQContext() {
}

MSGQContext::~MSGQContext() {
}

void MSGQMessage::init(size_t sz) {
size = sz;
data = new char[size];
}

void MSGQMessage::init(char * d, size_t sz) {
size = sz;
data = new char[size];
memcpy(data, d, size);
}

void MSGQMessage::takeOwnership(char * d, size_t sz) {
size = sz;
data = d;
}

void MSGQMessage::close() {
if (size > 0){
delete[] data;
}
size = 0;
}

MSGQMessage::~MSGQMessage() {
this->close();
}


void MSGQSubSocket::connect(Context *context, std::string endpoint, std::string address, bool conflate){
assert(context);
assert(address == "127.0.0.1");

q = new msgq_queue_t;
msgq_new_queue(q, endpoint.c_str(), DEFAULT_SEGMENT_SIZE);
msgq_init_subscriber(q);

if (conflate){
q->read_conflate = true;
}

timeout = -1;

std::cout << "MSGQ SUB: " << endpoint << std::endl;
}


Message * MSGQSubSocket::receive(bool non_blocking){
msgq_do_exit = 0;

void (*prev_handler_sigint)(int);
void (*prev_handler_sigterm)(int);
if (!non_blocking){
prev_handler_sigint = std::signal(SIGINT, sig_handler);
prev_handler_sigterm = std::signal(SIGTERM, sig_handler);
}

msgq_msg_t msg;

MSGQMessage *r = NULL;
r = NULL;

int rc = msgq_msg_recv(&msg, q);

// Hack to implement blocking read with a poller. Don't use this
while (!non_blocking && rc == 0 && msgq_do_exit == 0){
msgq_pollitem_t items[1];
items[0].q = q;

int t = (timeout != -1) ? timeout : 100;

msgq_poll(items, 1, t);
rc = msgq_msg_recv(&msg, q);

if (timeout != -1){
break;
}
}

if (rc > 0){
r = new MSGQMessage;
r->takeOwnership(msg.data, msg.size);
}
errno = msgq_do_exit ? EINTR : 0;

if (!non_blocking){
std::signal(SIGINT, prev_handler_sigint);
std::signal(SIGTERM, prev_handler_sigterm);
}

return (Message*)r;
}

void MSGQSubSocket::setTimeout(int t){
timeout = t;
}

MSGQSubSocket::~MSGQSubSocket(){
delete q;
}

void MSGQPubSocket::connect(Context *context, std::string endpoint){
assert(context);

q = new msgq_queue_t;
msgq_new_queue(q, endpoint.c_str(), DEFAULT_SEGMENT_SIZE);
msgq_init_publisher(q);

std::cout << "MSGQ PUB: " << endpoint << std::endl;
}

int MSGQPubSocket::sendMessage(Message *message){
msgq_msg_t msg;
msg.data = message->getData();
msg.size = message->getSize();

return msgq_msg_send(&msg, q);
}

int MSGQPubSocket::send(char *data, size_t size){
msgq_msg_t msg;
msg.data = data;
msg.size = size;

return msgq_msg_send(&msg, q);
}

MSGQPubSocket::~MSGQPubSocket(){
delete q;
}


void MSGQPoller::registerSocket(SubSocket * socket){
assert(num_polls + 1 < MAX_POLLERS);
polls[num_polls].q = (msgq_queue_t*)socket->getRawSocket();

sockets.push_back(socket);
num_polls++;
}

std::vector<SubSocket*> MSGQPoller::poll(int timeout){
std::vector<SubSocket*> r;

msgq_poll(polls, num_polls, timeout);
for (size_t i = 0; i < num_polls; i++){
if (polls[i].revents){
r.push_back(sockets[i]);
}
}

return r;
}
64 changes: 64 additions & 0 deletions messaging/impl_msgq.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#pragma once
#include "messaging.hpp"
#include "msgq.hpp"
#include <zmq.h>
#include <string>

#define MAX_POLLERS 128

class MSGQContext : public Context {
private:
void * context = NULL;
public:
MSGQContext();
void * getRawContext() {return context;}
~MSGQContext();
};

class MSGQMessage : public Message {
private:
char * data;
size_t size;
public:
void init(size_t size);
void init(char *data, size_t size);
void takeOwnership(char *data, size_t size);
size_t getSize(){return size;}
char * getData(){return data;}
void close();
~MSGQMessage();
};

class MSGQSubSocket : public SubSocket {
private:
msgq_queue_t * q;
int timeout;
public:
void connect(Context *context, std::string endpoint, std::string address, bool conflate=false);
void setTimeout(int timeout);
void * getRawSocket() {return (void*)q;}
Message *receive(bool non_blocking=false);
~MSGQSubSocket();
};

class MSGQPubSocket : public PubSocket {
private:
msgq_queue_t * q;
public:
void connect(Context *context, std::string endpoint);
int sendMessage(Message *message);
int send(char *data, size_t size);
~MSGQPubSocket();
};

class MSGQPoller : public Poller {
private:
std::vector<SubSocket*> sockets;
msgq_pollitem_t polls[MAX_POLLERS];
size_t num_polls = 0;

public:
void registerSocket(SubSocket *socket);
std::vector<SubSocket*> poll(int timeout);
~MSGQPoller(){};
};
Loading

0 comments on commit d35515a

Please sign in to comment.