diff --git a/messaging/impl_msgq.cc b/messaging/impl_msgq.cc index bf017d900b6e3f..d37b8c986d7237 100644 --- a/messaging/impl_msgq.cc +++ b/messaging/impl_msgq.cc @@ -85,7 +85,6 @@ Message * MSGQSubSocket::receive(bool non_blocking){ msgq_msg_t msg; MSGQMessage *r = NULL; - r = NULL; int rc = msgq_msg_recv(&msg, q); @@ -109,17 +108,23 @@ Message * MSGQSubSocket::receive(bool non_blocking){ } } - if (rc > 0){ - r = new MSGQMessage; - r->takeOwnership(msg.data, msg.size); - } - errno = msgq_do_exit ? EINTR : 0; if (!non_blocking){ std::signal(SIGINT, prev_handler_sigint); std::signal(SIGTERM, prev_handler_sigterm); } + errno = msgq_do_exit ? EINTR : 0; + + if (rc > 0){ + if (msgq_do_exit){ + msgq_msg_close(&msg); // Free unused message on exit + } else { + r = new MSGQMessage; + r->takeOwnership(msg.data, msg.size); + } + } + return (Message*)r; } diff --git a/messaging/messaging.cc b/messaging/messaging.cc index 9b12b298495112..1a9f860cd35666 100644 --- a/messaging/messaging.cc +++ b/messaging/messaging.cc @@ -4,20 +4,20 @@ Context * Context::create(){ Context * c; - if (std::getenv("MSGQ")){ - c = new MSGQContext(); - } else { + if (std::getenv("ZMQ")){ c = new ZMQContext(); + } else { + c = new MSGQContext(); } return c; } SubSocket * SubSocket::create(){ SubSocket * s; - if (std::getenv("MSGQ")){ - s = new MSGQSubSocket(); - } else { + if (std::getenv("ZMQ")){ s = new ZMQSubSocket(); + } else { + s = new MSGQSubSocket(); } return s; } @@ -60,10 +60,10 @@ SubSocket * SubSocket::create(Context * context, std::string endpoint, std::stri PubSocket * PubSocket::create(){ PubSocket * s; - if (std::getenv("MSGQ")){ - s = new MSGQPubSocket(); - } else { + if (std::getenv("ZMQ")){ s = new ZMQPubSocket(); + } else { + s = new MSGQPubSocket(); } return s; } @@ -82,10 +82,10 @@ PubSocket * PubSocket::create(Context * context, std::string endpoint){ Poller * Poller::create(){ Poller * p; - if (std::getenv("MSGQ")){ - p = new MSGQPoller(); - } else { + if (std::getenv("ZMQ")){ p = new ZMQPoller(); + } else { + p = new MSGQPoller(); } return p; } diff --git a/messaging/msgq.cc b/messaging/msgq.cc index e185b65e69663a..4ccd13df44527a 100644 --- a/messaging/msgq.cc +++ b/messaging/msgq.cc @@ -23,8 +23,8 @@ #include "msgq.hpp" -void sigusr1_handler(int signal) { - assert(signal == SIGUSR1); +void sigusr2_handler(int signal) { + assert(signal == SIGUSR2); } uint64_t msgq_get_uid(void){ @@ -80,7 +80,7 @@ void msgq_wait_for_subscriber(msgq_queue_t *q){ int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){ assert(size < 0xFFFFFFFF); // Buffer must be smaller than 2^32 bytes - std::signal(SIGUSR1, sigusr1_handler); + std::signal(SIGUSR2, sigusr2_handler); const char * prefix = "/dev/shm/"; char * full_path = new char[strlen(path) + strlen(prefix) + 1]; @@ -136,7 +136,7 @@ void msgq_close_queue(msgq_queue_t *q){ void msgq_init_publisher(msgq_queue_t * q) { - std::cout << "Starting publisher" << std::endl; + //std::cout << "Starting publisher" << std::endl; uint64_t uid = msgq_get_uid(); *q->write_uid = uid; @@ -153,9 +153,9 @@ void msgq_init_publisher(msgq_queue_t * q) { static void thread_signal(uint32_t tid) { #ifndef SYS_tkill // TODO: this won't work for multithreaded programs - kill(tid, SIGUSR1); + kill(tid, SIGUSR2); #else - syscall(SYS_tkill, tid, SIGUSR1); + syscall(SYS_tkill, tid, SIGUSR2); #endif } @@ -205,7 +205,7 @@ void msgq_init_subscriber(msgq_queue_t * q) { } } - std::cout << "New subscriber id: " << q->reader_id << " uid: " << q->read_uid_local << " " << q->endpoint << std::endl; + //std::cout << "New subscriber id: " << q->reader_id << " uid: " << q->read_uid_local << " " << q->endpoint << std::endl; msgq_reset_reader(q); }