Skip to content

Commit

Permalink
Switch default to msgq (#21)
Browse files Browse the repository at this point in the history
* switch default to msgq

* SIGUSR1 is already used by the apks

* Don't return message upstream when exiting

* Remove debug print

* Remove more debug print
  • Loading branch information
pd0wm authored Jan 13, 2020
1 parent a457ffa commit c6b5c73
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 25 deletions.
17 changes: 11 additions & 6 deletions messaging/impl_msgq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ Message * MSGQSubSocket::receive(bool non_blocking){
msgq_msg_t msg;

MSGQMessage *r = NULL;
r = NULL;

int rc = msgq_msg_recv(&msg, q);

Expand All @@ -109,17 +108,23 @@ Message * MSGQSubSocket::receive(bool non_blocking){
}
}

if (rc > 0){
r = new MSGQMessage;
r->takeOwnership(msg.data, msg.size);
}
errno = msgq_do_exit ? EINTR : 0;

if (!non_blocking){
std::signal(SIGINT, prev_handler_sigint);
std::signal(SIGTERM, prev_handler_sigterm);
}

errno = msgq_do_exit ? EINTR : 0;

if (rc > 0){
if (msgq_do_exit){
msgq_msg_close(&msg); // Free unused message on exit
} else {
r = new MSGQMessage;
r->takeOwnership(msg.data, msg.size);
}
}

return (Message*)r;
}

Expand Down
24 changes: 12 additions & 12 deletions messaging/messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@

Context * Context::create(){
Context * c;
if (std::getenv("MSGQ")){
c = new MSGQContext();
} else {
if (std::getenv("ZMQ")){
c = new ZMQContext();
} else {
c = new MSGQContext();
}
return c;
}

SubSocket * SubSocket::create(){
SubSocket * s;
if (std::getenv("MSGQ")){
s = new MSGQSubSocket();
} else {
if (std::getenv("ZMQ")){
s = new ZMQSubSocket();
} else {
s = new MSGQSubSocket();
}
return s;
}
Expand Down Expand Up @@ -60,10 +60,10 @@ SubSocket * SubSocket::create(Context * context, std::string endpoint, std::stri

PubSocket * PubSocket::create(){
PubSocket * s;
if (std::getenv("MSGQ")){
s = new MSGQPubSocket();
} else {
if (std::getenv("ZMQ")){
s = new ZMQPubSocket();
} else {
s = new MSGQPubSocket();
}
return s;
}
Expand All @@ -82,10 +82,10 @@ PubSocket * PubSocket::create(Context * context, std::string endpoint){

Poller * Poller::create(){
Poller * p;
if (std::getenv("MSGQ")){
p = new MSGQPoller();
} else {
if (std::getenv("ZMQ")){
p = new ZMQPoller();
} else {
p = new MSGQPoller();
}
return p;
}
Expand Down
14 changes: 7 additions & 7 deletions messaging/msgq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@

#include "msgq.hpp"

void sigusr1_handler(int signal) {
assert(signal == SIGUSR1);
void sigusr2_handler(int signal) {
assert(signal == SIGUSR2);
}

uint64_t msgq_get_uid(void){
Expand Down Expand Up @@ -80,7 +80,7 @@ void msgq_wait_for_subscriber(msgq_queue_t *q){
int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){
assert(size < 0xFFFFFFFF); // Buffer must be smaller than 2^32 bytes

std::signal(SIGUSR1, sigusr1_handler);
std::signal(SIGUSR2, sigusr2_handler);

const char * prefix = "/dev/shm/";
char * full_path = new char[strlen(path) + strlen(prefix) + 1];
Expand Down Expand Up @@ -136,7 +136,7 @@ void msgq_close_queue(msgq_queue_t *q){


void msgq_init_publisher(msgq_queue_t * q) {
std::cout << "Starting publisher" << std::endl;
//std::cout << "Starting publisher" << std::endl;
uint64_t uid = msgq_get_uid();

*q->write_uid = uid;
Expand All @@ -153,9 +153,9 @@ void msgq_init_publisher(msgq_queue_t * q) {
static void thread_signal(uint32_t tid) {
#ifndef SYS_tkill
// TODO: this won't work for multithreaded programs
kill(tid, SIGUSR1);
kill(tid, SIGUSR2);
#else
syscall(SYS_tkill, tid, SIGUSR1);
syscall(SYS_tkill, tid, SIGUSR2);
#endif
}

Expand Down Expand Up @@ -205,7 +205,7 @@ void msgq_init_subscriber(msgq_queue_t * q) {
}
}

std::cout << "New subscriber id: " << q->reader_id << " uid: " << q->read_uid_local << " " << q->endpoint << std::endl;
//std::cout << "New subscriber id: " << q->reader_id << " uid: " << q->read_uid_local << " " << q->endpoint << std::endl;
msgq_reset_reader(q);
}

Expand Down

0 comments on commit c6b5c73

Please sign in to comment.