diff --git a/messaging/impl_msgq.cc b/messaging/impl_msgq.cc index 3b07fe5dcea303..a930a80d5b5d0a 100644 --- a/messaging/impl_msgq.cc +++ b/messaging/impl_msgq.cc @@ -119,6 +119,7 @@ void MSGQSubSocket::setTimeout(int t){ } MSGQSubSocket::~MSGQSubSocket(){ + msgq_close_queue(q); delete q; } @@ -149,6 +150,7 @@ int MSGQPubSocket::send(char *data, size_t size){ } MSGQPubSocket::~MSGQPubSocket(){ + msgq_close_queue(q); delete q; } diff --git a/messaging/msgq.cc b/messaging/msgq.cc index f58072a411947f..89be689854b156 100644 --- a/messaging/msgq.cc +++ b/messaging/msgq.cc @@ -77,17 +77,24 @@ int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){ auto fd = open(full_path, O_RDWR | O_CREAT, 0777); delete[] full_path; + assert(fd >= 0); // TODO: properly handle exit codes if (fd < 0) return -1; int rc = ftruncate(fd, size + sizeof(msgq_header_t)); + assert(rc == 0); // TODO: properly handle exit codes if (rc < 0) return -1; char * mem = (char*)mmap(NULL, size + sizeof(msgq_header_t), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + close(fd); + + assert(mem != NULL); // TODO: properly handle exit codes if (mem == NULL) return -1; + q->mmap_p = mem; + msgq_header_t *header = (msgq_header_t *)mem; // Setup pointers to header segment @@ -107,18 +114,34 @@ int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){ q->endpoint = path; q->read_conflate = false; + q->read_fifo = -1; return 0; } +void msgq_close_queue(msgq_queue_t *q){ + if (q->read_fifo >= 0){ + close(q->read_fifo); + } + + for (uint64_t i = 0; i < NUM_READERS; i++){ + if (q->read_fifos[i] >= 0){ + close(q->read_fifos[i]); + } + } + + if (q->mmap_p != NULL){ + munmap(q->mmap_p, q->size + sizeof(msgq_header_t)); + } +} + void msgq_init_publisher(msgq_queue_t * q) { std::cout << "Starting publisher" << std::endl; - std::random_device rd; - std::default_random_engine generator(rd()); + std::random_device rd("/dev/urandom"); std::uniform_int_distribution distribution(0,std::numeric_limits::max()); - uint64_t uid = distribution(generator); + uint64_t uid = distribution(rd); *q->write_uid = uid; *q->num_readers = 0; @@ -134,10 +157,12 @@ void msgq_init_publisher(msgq_queue_t * q) { } void msgq_init_subscriber(msgq_queue_t * q) { - std::random_device rd; - std::default_random_engine generator(rd()); + assert(q != NULL); + assert(q->num_readers != NULL); + + std::random_device rd("/dev/urandom"); std::uniform_int_distribution distribution(0,std::numeric_limits::max()); - uint64_t uid = distribution(generator); + uint64_t uid = distribution(rd); // Get reader id while (true){ @@ -179,12 +204,16 @@ void msgq_init_subscriber(msgq_queue_t * q) { std::cout << q->read_fifo_path << std::endl; int r = mkfifo(q->read_fifo_path.c_str(), 0777); + if (r != 0) + perror("Fifo: "); assert(r == 0); q->read_fifo = open(q->read_fifo_path.c_str(), O_RDWR | O_NONBLOCK); // Fysnc so the fifo shows up in the directory - fsync(open("/dev/shm", O_RDONLY)); + auto shm_fd = open("/dev/shm", O_RDONLY); + fsync(shm_fd); + close(shm_fd); std::cout << "New subscriber id: " << q->reader_id << " uid: " << q->read_uid_local << std::endl; msgq_reset_reader(q); @@ -271,6 +300,11 @@ int msgq_msg_send(msgq_msg_t * msg, msgq_queue_t *q){ // Open fifo when not set, or when reader changes if (q->read_fifos[i] == -1 || q->read_fifos_uid[i] != reader_uid){ + // Close old reader fifo + if (q->read_fifos[i] >= 0){ + close(q->read_fifos[i]); + } + q->read_fifos_uid[i] = reader_uid; std::string path = "/dev/shm/fifo-"; diff --git a/messaging/msgq.hpp b/messaging/msgq.hpp index eb5c1ed940a3ce..116523e38d8250 100644 --- a/messaging/msgq.hpp +++ b/messaging/msgq.hpp @@ -27,6 +27,7 @@ struct msgq_queue_t { std::atomic *read_pointers[NUM_READERS]; std::atomic *read_valids[NUM_READERS]; std::atomic *read_uids[NUM_READERS]; + char * mmap_p; char * data; size_t size; int reader_id; @@ -62,6 +63,7 @@ int msgq_msg_init_data(msgq_msg_t *msg, char * data, size_t size); int msgq_msg_close(msgq_msg_t *msg); int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size); +void msgq_close_queue(msgq_queue_t *q); void msgq_init_publisher(msgq_queue_t * q); void msgq_init_subscriber(msgq_queue_t * q);