From 4eebb95304594c063bf6cd79b1f95e4fd07daa87 Mon Sep 17 00:00:00 2001 From: deanlee Date: Fri, 15 Jan 2021 20:08:11 +0800 Subject: [PATCH 01/12] class SafeQueue --- release/files_common | 2 +- selfdrive/camerad/cameras/camera_common.cc | 22 +++------ selfdrive/camerad/cameras/camera_common.h | 5 +- selfdrive/common/SConscript | 2 +- selfdrive/common/cqueue.c | 54 ---------------------- selfdrive/common/cqueue.h | 33 ------------- selfdrive/common/queue.h | 38 +++++++++++++++ selfdrive/loggerd/omx_encoder.cc | 22 ++++----- selfdrive/loggerd/omx_encoder.h | 4 +- 9 files changed, 61 insertions(+), 121 deletions(-) delete mode 100644 selfdrive/common/cqueue.c delete mode 100644 selfdrive/common/cqueue.h create mode 100644 selfdrive/common/queue.h diff --git a/release/files_common b/release/files_common index fe12037f8fb468..05152663c62e12 100644 --- a/release/files_common +++ b/release/files_common @@ -193,7 +193,7 @@ selfdrive/common/swaglog.h selfdrive/common/swaglog.cc selfdrive/common/util.cc selfdrive/common/util.h -selfdrive/common/cqueue.[c,h] +selfdrive/common/queue.h selfdrive/common/clutil.cc selfdrive/common/clutil.h selfdrive/common/params.h diff --git a/selfdrive/camerad/cameras/camera_common.cc b/selfdrive/camerad/cameras/camera_common.cc index 8d63921f4530d3..93bb06c4e4ffb5 100644 --- a/selfdrive/camerad/cameras/camera_common.cc +++ b/selfdrive/camerad/cameras/camera_common.cc @@ -121,14 +121,7 @@ CameraBuf::~CameraBuf() { } bool CameraBuf::acquire() { - { - std::unique_lock lk(frame_queue_mutex); - bool got_frame = frame_queue_cv.wait_for(lk, std::chrono::milliseconds(1), [this]{ return !frame_queue.empty(); }); - if (!got_frame) return false; - - cur_buf_idx = frame_queue.front(); - frame_queue.pop(); - } + if (!safe_queue.try_pop(cur_buf_idx)) return false; const FrameMetadata &frame_data = camera_bufs_metadata[cur_buf_idx]; if (frame_data.frame_id == -1) { @@ -193,12 +186,8 @@ void CameraBuf::release() { } } -void CameraBuf::queue(size_t buf_idx){ - { - std::lock_guard lk(frame_queue_mutex); - frame_queue.push(buf_idx); - } - frame_queue_cv.notify_one(); +void CameraBuf::queue(size_t buf_idx) { + safe_queue.push(buf_idx); } // common functions @@ -342,7 +331,10 @@ void *processing_thread(MultiCameraState *cameras, const char *tname, set_thread_name(tname); for (int cnt = 0; !do_exit; cnt++) { - if (!cs->buf.acquire()) continue; + if (!cs->buf.acquire()) { + util::sleep_for(1); + continue; + } callback(cameras, cs, cnt); diff --git a/selfdrive/camerad/cameras/camera_common.h b/selfdrive/camerad/cameras/camera_common.h index 9582ca53953ce9..47c1edde830aec 100644 --- a/selfdrive/camerad/cameras/camera_common.h +++ b/selfdrive/camerad/cameras/camera_common.h @@ -10,6 +10,7 @@ #include #include "common/mat.h" #include "common/swaglog.h" +#include "common/queue.h" #include "visionbuf.h" #include "common/visionimg.h" #include "imgproc/utils.h" @@ -103,9 +104,7 @@ class CameraBuf { int cur_buf_idx; - std::mutex frame_queue_mutex; - std::condition_variable frame_queue_cv; - std::queue frame_queue; + SafeQueue safe_queue; int frame_buf_count; release_cb release_callback; diff --git a/selfdrive/common/SConscript b/selfdrive/common/SConscript index 567706f60a8bfc..ec3603d3af618f 100644 --- a/selfdrive/common/SConscript +++ b/selfdrive/common/SConscript @@ -5,7 +5,7 @@ if SHARED: else: fxn = env.Library -common_libs = ['params.cc', 'swaglog.cc', 'cqueue.c', 'util.cc', 'gpio.cc', 'i2c.cc'] +common_libs = ['params.cc', 'swaglog.cc', 'util.cc', 'gpio.cc', 'i2c.cc'] _common = fxn('common', common_libs, LIBS="json11") diff --git a/selfdrive/common/cqueue.c b/selfdrive/common/cqueue.c deleted file mode 100644 index 582a55fa510721..00000000000000 --- a/selfdrive/common/cqueue.c +++ /dev/null @@ -1,54 +0,0 @@ -#include -#include -#include - -#include "cqueue.h" - -// TODO: replace by C++ queue and CV. See camerad - -void queue_init(Queue *q) { - memset(q, 0, sizeof(*q)); - TAILQ_INIT(&q->q); - pthread_mutex_init(&q->lock, NULL); - pthread_cond_init(&q->cv, NULL); -} - -void* queue_pop(Queue *q) { - pthread_mutex_lock(&q->lock); - while (TAILQ_EMPTY(&q->q)) { - pthread_cond_wait(&q->cv, &q->lock); - } - QueueEntry *entry = TAILQ_FIRST(&q->q); - TAILQ_REMOVE(&q->q, entry, entries); - pthread_mutex_unlock(&q->lock); - - void* r = entry->data; - free(entry); - return r; -} - -void* queue_try_pop(Queue *q) { - pthread_mutex_lock(&q->lock); - - void* r = NULL; - if (!TAILQ_EMPTY(&q->q)) { - QueueEntry *entry = TAILQ_FIRST(&q->q); - TAILQ_REMOVE(&q->q, entry, entries); - r = entry->data; - free(entry); - } - - pthread_mutex_unlock(&q->lock); - return r; -} - -void queue_push(Queue *q, void *data) { - QueueEntry *entry = calloc(1, sizeof(QueueEntry)); - assert(entry); - entry->data = data; - - pthread_mutex_lock(&q->lock); - TAILQ_INSERT_TAIL(&q->q, entry, entries); - pthread_cond_signal(&q->cv); - pthread_mutex_unlock(&q->lock); -} diff --git a/selfdrive/common/cqueue.h b/selfdrive/common/cqueue.h deleted file mode 100644 index f2613660b21405..00000000000000 --- a/selfdrive/common/cqueue.h +++ /dev/null @@ -1,33 +0,0 @@ -#ifndef COMMON_CQUEUE_H -#define COMMON_CQUEUE_H - -#include -#include - -#ifdef __cplusplus -extern "C" { -#endif - -// a blocking queue - -typedef struct QueueEntry { - TAILQ_ENTRY(QueueEntry) entries; - void *data; -} QueueEntry; - -typedef struct Queue { - TAILQ_HEAD(queue, QueueEntry) q; - pthread_mutex_t lock; - pthread_cond_t cv; -} Queue; - -void queue_init(Queue *q); -void* queue_pop(Queue *q); -void* queue_try_pop(Queue *q); -void queue_push(Queue *q, void *data); - -#ifdef __cplusplus -} // extern "C" -#endif - -#endif diff --git a/selfdrive/common/queue.h b/selfdrive/common/queue.h new file mode 100644 index 00000000000000..f6775fa2964fab --- /dev/null +++ b/selfdrive/common/queue.h @@ -0,0 +1,38 @@ +#pragma once +#include +#include +#include + +template +class SafeQueue { +public: + SafeQueue() = default; + ~SafeQueue() {} + void push(T v) { + { + std::unique_lock lk(m); + q.push(v); + } + cv.notify_one(); + } + T pop() { + std::unique_lock lk(m); + cv.wait(lk, [this] { return !q.empty(); }); + T v = q.front(); + q.pop(); + return v; + } + bool try_pop(T& v) { + std::unique_lock lk(m); + if (q.empty()) return false; + + v = q.front(); + q.pop(); + return true; + } + +private: + mutable std::mutex m; + std::condition_variable cv; + std::queue q; +}; diff --git a/selfdrive/loggerd/omx_encoder.cc b/selfdrive/loggerd/omx_encoder.cc index 40b16739f9650f..a360a71bf8cf3a 100644 --- a/selfdrive/loggerd/omx_encoder.cc +++ b/selfdrive/loggerd/omx_encoder.cc @@ -75,7 +75,7 @@ OMX_ERRORTYPE OmxEncoder::empty_buffer_done(OMX_HANDLETYPE component, OMX_PTR ap OMX_BUFFERHEADERTYPE *buffer) { // printf("empty_buffer_done\n"); OmxEncoder *e = (OmxEncoder*)app_data; - queue_push(&e->free_in, (void*)buffer); + e->free_in.push(buffer); return OMX_ErrorNone; } @@ -83,7 +83,7 @@ OMX_ERRORTYPE OmxEncoder::fill_buffer_done(OMX_HANDLETYPE component, OMX_PTR app OMX_BUFFERHEADERTYPE *buffer) { // printf("fill_buffer_done\n"); OmxEncoder *e = (OmxEncoder*)app_data; - queue_push(&e->done_out, (void*)buffer); + e->done_out.push(buffer); return OMX_ErrorNone; } @@ -170,9 +170,6 @@ OmxEncoder::OmxEncoder(const char* filename, int width, int height, int fps, int this->fps = fps; this->remuxing = !h265; - queue_init(&this->free_in); - queue_init(&this->done_out); - mutex_init_reentrant(&this->lock); pthread_mutex_init(&this->state_lock, NULL); pthread_cond_init(&this->state_cv, NULL); @@ -418,7 +415,7 @@ int OmxEncoder::encode_frame(const uint8_t *y_ptr, const uint8_t *u_ptr, const u // this sometimes freezes... put it outside the encoder lock so we can still trigger rotates... // THIS IS A REALLY BAD IDEA, but apparently the race has to happen 30 times to trigger this //pthread_mutex_unlock(&this->lock); - OMX_BUFFERHEADERTYPE* in_buf = (OMX_BUFFERHEADERTYPE *)queue_pop(&this->free_in); + OMX_BUFFERHEADERTYPE* in_buf = this->free_in.pop(); //pthread_mutex_lock(&this->lock); int ret = this->counter; @@ -465,8 +462,8 @@ int OmxEncoder::encode_frame(const uint8_t *y_ptr, const uint8_t *u_ptr, const u // pump output while (true) { - OMX_BUFFERHEADERTYPE *out_buf = (OMX_BUFFERHEADERTYPE *)queue_try_pop(&this->done_out); - if (!out_buf) { + OMX_BUFFERHEADERTYPE *out_buf; + if (!this->done_out.try_pop(out_buf)) { break; } handle_out_buf(this, out_buf); @@ -547,7 +544,7 @@ void OmxEncoder::encoder_close() { if (this->dirty) { // drain output only if there could be frames in the encoder - OMX_BUFFERHEADERTYPE* in_buf = (OMX_BUFFERHEADERTYPE *)queue_pop(&this->free_in); + OMX_BUFFERHEADERTYPE* in_buf = this->free_in.pop(); in_buf->nFilledLen = 0; in_buf->nOffset = 0; in_buf->nFlags = OMX_BUFFERFLAG_EOS; @@ -556,7 +553,7 @@ void OmxEncoder::encoder_close() { OMX_CHECK(OMX_EmptyThisBuffer(this->handle, in_buf)); while (true) { - OMX_BUFFERHEADERTYPE *out_buf = (OMX_BUFFERHEADERTYPE *)queue_pop(&this->done_out); + OMX_BUFFERHEADERTYPE *out_buf = this->done_out.pop(); handle_out_buf(this, out_buf); @@ -604,8 +601,9 @@ OmxEncoder::~OmxEncoder() { OMX_CHECK(OMX_FreeHandle(this->handle)); - while (queue_try_pop(&this->free_in)); - while (queue_try_pop(&this->done_out)); + OMX_BUFFERHEADERTYPE *out_buf; + while (this->free_in.try_pop(out_buf)); + while (this->done_out.try_pop(out_buf)); if (this->codec_config) { free(this->codec_config); diff --git a/selfdrive/loggerd/omx_encoder.h b/selfdrive/loggerd/omx_encoder.h index 62912a65454c54..1a45a06be52242 100644 --- a/selfdrive/loggerd/omx_encoder.h +++ b/selfdrive/loggerd/omx_encoder.h @@ -65,8 +65,8 @@ class OmxEncoder : public VideoEncoder { uint64_t last_t; - Queue free_in; - Queue done_out; + SafeQueue free_in; + SafeQueue done_out; AVFormatContext *ofmt_ctx; AVCodecContext *codec_ctx; From 65fb8a42b2e474ab3000bcb98b4ec4f032c374b1 Mon Sep 17 00:00:00 2001 From: deanlee Date: Fri, 15 Jan 2021 20:24:45 +0800 Subject: [PATCH 02/12] cleanup includes --- selfdrive/camerad/cameras/camera_common.h | 3 --- 1 file changed, 3 deletions(-) diff --git a/selfdrive/camerad/cameras/camera_common.h b/selfdrive/camerad/cameras/camera_common.h index 47c1edde830aec..fb7018a2c2e3e4 100644 --- a/selfdrive/camerad/cameras/camera_common.h +++ b/selfdrive/camerad/cameras/camera_common.h @@ -1,7 +1,4 @@ #pragma once -#include -#include -#include #include #include From 2aca526656789c749bad20222a003775a1fc204a Mon Sep 17 00:00:00 2001 From: deanlee Date: Fri, 15 Jan 2021 22:59:48 +0800 Subject: [PATCH 03/12] space --- selfdrive/loggerd/omx_encoder.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/selfdrive/loggerd/omx_encoder.h b/selfdrive/loggerd/omx_encoder.h index 1a45a06be52242..c9f43352282d54 100644 --- a/selfdrive/loggerd/omx_encoder.h +++ b/selfdrive/loggerd/omx_encoder.h @@ -65,7 +65,7 @@ class OmxEncoder : public VideoEncoder { uint64_t last_t; - SafeQueue free_in; + SafeQueue free_in; SafeQueue done_out; AVFormatContext *ofmt_ctx; From aa1e5cda5bc8c8874e0a5242e273d6acf096629f Mon Sep 17 00:00:00 2001 From: deanlee Date: Sat, 16 Jan 2021 12:30:55 +0800 Subject: [PATCH 04/12] add timeout use try_poll --- selfdrive/camerad/cameras/camera_common.cc | 7 ++----- selfdrive/common/queue.h | 7 ++++--- selfdrive/loggerd/loggerd.cc | 13 +++++++++---- selfdrive/loggerd/omx_encoder.cc | 12 +++++++++++- 4 files changed, 26 insertions(+), 13 deletions(-) diff --git a/selfdrive/camerad/cameras/camera_common.cc b/selfdrive/camerad/cameras/camera_common.cc index 93bb06c4e4ffb5..b862ee1e6126b9 100644 --- a/selfdrive/camerad/cameras/camera_common.cc +++ b/selfdrive/camerad/cameras/camera_common.cc @@ -121,7 +121,7 @@ CameraBuf::~CameraBuf() { } bool CameraBuf::acquire() { - if (!safe_queue.try_pop(cur_buf_idx)) return false; + if (!safe_queue.try_pop(cur_buf_idx, 20)) return false; const FrameMetadata &frame_data = camera_bufs_metadata[cur_buf_idx]; if (frame_data.frame_id == -1) { @@ -331,10 +331,7 @@ void *processing_thread(MultiCameraState *cameras, const char *tname, set_thread_name(tname); for (int cnt = 0; !do_exit; cnt++) { - if (!cs->buf.acquire()) { - util::sleep_for(1); - continue; - } + if (!cs->buf.acquire()) continue; callback(cameras, cs, cnt); diff --git a/selfdrive/common/queue.h b/selfdrive/common/queue.h index f6775fa2964fab..c458f3fc5e57c5 100644 --- a/selfdrive/common/queue.h +++ b/selfdrive/common/queue.h @@ -22,10 +22,11 @@ class SafeQueue { q.pop(); return v; } - bool try_pop(T& v) { + bool try_pop(T& v, int timeout_ms = 0) { std::unique_lock lk(m); - if (q.empty()) return false; - + if (!cv.wait_for(lk, std::chrono::milliseconds(timeout_ms), [this] { return !q.empty(); })) { + return false; + } v = q.front(); q.pop(); return true; diff --git a/selfdrive/loggerd/loggerd.cc b/selfdrive/loggerd/loggerd.cc index 862d958047bd04..60a181264b1b33 100644 --- a/selfdrive/loggerd/loggerd.cc +++ b/selfdrive/loggerd/loggerd.cc @@ -270,12 +270,17 @@ void encoder_thread(int cam_idx) { int out_segment = -1; int out_id = encoders[0]->encode_frame(buf->y, buf->u, buf->v, buf->width, buf->height, - &out_segment, extra.timestamp_eof); + &out_segment, &extra); + if (out_id == -1) { + continue; + } if (encoders.size() > 1) { int out_segment_alt = -1; - encoders[1]->encode_frame(buf->y, buf->u, buf->v, - buf->width, buf->height, - &out_segment_alt, extra.timestamp_eof); + if (-1 == encoders[1]->encode_frame(buf->y, buf->u, buf->v, + buf->width, buf->height, + &out_segment_alt, &extra)) { + continue; + } } // publish encode index diff --git a/selfdrive/loggerd/omx_encoder.cc b/selfdrive/loggerd/omx_encoder.cc index a360a71bf8cf3a..9879bcb6825ce1 100644 --- a/selfdrive/loggerd/omx_encoder.cc +++ b/selfdrive/loggerd/omx_encoder.cc @@ -19,6 +19,7 @@ #include #include "common/mutex.h" +#include "common/util.h" #include "common/swaglog.h" #include "omx_encoder.h" @@ -29,6 +30,8 @@ assert(OMX_ErrorNone == _expr); \ } while (0) +extern ExitHandler do_exit; + // ***** OMX callback functions ***** void OmxEncoder::wait_for_state(OMX_STATETYPE state) { @@ -415,7 +418,14 @@ int OmxEncoder::encode_frame(const uint8_t *y_ptr, const uint8_t *u_ptr, const u // this sometimes freezes... put it outside the encoder lock so we can still trigger rotates... // THIS IS A REALLY BAD IDEA, but apparently the race has to happen 30 times to trigger this //pthread_mutex_unlock(&this->lock); - OMX_BUFFERHEADERTYPE* in_buf = this->free_in.pop(); + OMX_BUFFERHEADERTYPE* in_buf = nullptr; + while (!this->free_in.try_pop(in_buf, 20)) { + if (do_exit) { + pthread_mutex_unlock(&this->lock); + return -1; + } + } + //pthread_mutex_lock(&this->lock); int ret = this->counter; From d2dd618ed8a48f2d87546b958c73715f06710263 Mon Sep 17 00:00:00 2001 From: deanlee Date: Sat, 16 Jan 2021 20:46:06 +0800 Subject: [PATCH 05/12] add function empty() & size() --- selfdrive/common/queue.h | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/selfdrive/common/queue.h b/selfdrive/common/queue.h index c458f3fc5e57c5..39c29ec6dd64e3 100644 --- a/selfdrive/common/queue.h +++ b/selfdrive/common/queue.h @@ -1,20 +1,21 @@ #pragma once -#include -#include #include +#include +#include template class SafeQueue { public: SafeQueue() = default; - ~SafeQueue() {} - void push(T v) { + + void push(const T& v) { { std::unique_lock lk(m); q.push(v); } cv.notify_one(); } + T pop() { std::unique_lock lk(m); cv.wait(lk, [this] { return !q.empty(); }); @@ -22,6 +23,7 @@ class SafeQueue { q.pop(); return v; } + bool try_pop(T& v, int timeout_ms = 0) { std::unique_lock lk(m); if (!cv.wait_for(lk, std::chrono::milliseconds(timeout_ms), [this] { return !q.empty(); })) { @@ -32,6 +34,16 @@ class SafeQueue { return true; } + bool empty() const { + std::scoped_lock lk(m); + return q.empty(); + } + + size_t size() const { + std::scoped_lock lk(m); + return q.size(); + } + private: mutable std::mutex m; std::condition_variable cv; From ac4be66bb2319eb8d3f106a6235ecd794bd29f3c Mon Sep 17 00:00:00 2001 From: deanlee Date: Fri, 15 Jan 2021 20:08:11 +0800 Subject: [PATCH 06/12] class SafeQueue --- selfdrive/camerad/cameras/camera_common.cc | 7 +++++-- selfdrive/common/queue.h | 17 +++++++---------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/selfdrive/camerad/cameras/camera_common.cc b/selfdrive/camerad/cameras/camera_common.cc index b862ee1e6126b9..80b3d6aa6d96cc 100644 --- a/selfdrive/camerad/cameras/camera_common.cc +++ b/selfdrive/camerad/cameras/camera_common.cc @@ -121,7 +121,7 @@ CameraBuf::~CameraBuf() { } bool CameraBuf::acquire() { - if (!safe_queue.try_pop(cur_buf_idx, 20)) return false; + if (!safe_queue.try_pop(cur_buf_idx, 1)) return false; const FrameMetadata &frame_data = camera_bufs_metadata[cur_buf_idx]; if (frame_data.frame_id == -1) { @@ -331,7 +331,10 @@ void *processing_thread(MultiCameraState *cameras, const char *tname, set_thread_name(tname); for (int cnt = 0; !do_exit; cnt++) { - if (!cs->buf.acquire()) continue; + if (!cs->buf.acquire()) { + util::sleep_for(1); + continue; + } callback(cameras, cs, cnt); diff --git a/selfdrive/common/queue.h b/selfdrive/common/queue.h index 39c29ec6dd64e3..c5b353ccbc3043 100644 --- a/selfdrive/common/queue.h +++ b/selfdrive/common/queue.h @@ -1,21 +1,20 @@ #pragma once -#include -#include #include +#include +#include template class SafeQueue { public: SafeQueue() = default; - - void push(const T& v) { + ~SafeQueue() {} + void push(T v) { { std::unique_lock lk(m); q.push(v); } cv.notify_one(); } - T pop() { std::unique_lock lk(m); cv.wait(lk, [this] { return !q.empty(); }); @@ -23,12 +22,10 @@ class SafeQueue { q.pop(); return v; } - - bool try_pop(T& v, int timeout_ms = 0) { + bool try_pop(T& v) { std::unique_lock lk(m); - if (!cv.wait_for(lk, std::chrono::milliseconds(timeout_ms), [this] { return !q.empty(); })) { - return false; - } + if (q.empty()) return false; + v = q.front(); q.pop(); return true; From bfdc81a6195fc4b9643a64649bba303fbca28827 Mon Sep 17 00:00:00 2001 From: deanlee Date: Sat, 16 Jan 2021 12:30:55 +0800 Subject: [PATCH 07/12] add timeout use try_poll --- selfdrive/camerad/cameras/camera_common.cc | 5 +---- selfdrive/common/queue.h | 7 ++++--- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/selfdrive/camerad/cameras/camera_common.cc b/selfdrive/camerad/cameras/camera_common.cc index 80b3d6aa6d96cc..b3b44aadf547f0 100644 --- a/selfdrive/camerad/cameras/camera_common.cc +++ b/selfdrive/camerad/cameras/camera_common.cc @@ -331,10 +331,7 @@ void *processing_thread(MultiCameraState *cameras, const char *tname, set_thread_name(tname); for (int cnt = 0; !do_exit; cnt++) { - if (!cs->buf.acquire()) { - util::sleep_for(1); - continue; - } + if (!cs->buf.acquire()) continue; callback(cameras, cs, cnt); diff --git a/selfdrive/common/queue.h b/selfdrive/common/queue.h index c5b353ccbc3043..f6d9ac0aa198eb 100644 --- a/selfdrive/common/queue.h +++ b/selfdrive/common/queue.h @@ -22,10 +22,11 @@ class SafeQueue { q.pop(); return v; } - bool try_pop(T& v) { + bool try_pop(T& v, int timeout_ms = 0) { std::unique_lock lk(m); - if (q.empty()) return false; - + if (!cv.wait_for(lk, std::chrono::milliseconds(timeout_ms), [this] { return !q.empty(); })) { + return false; + } v = q.front(); q.pop(); return true; From d9b998adca6e1f33b44576ed12368db06ac31294 Mon Sep 17 00:00:00 2001 From: deanlee Date: Sat, 16 Jan 2021 20:46:06 +0800 Subject: [PATCH 08/12] add function empty() & size() --- selfdrive/common/queue.h | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/selfdrive/common/queue.h b/selfdrive/common/queue.h index f6d9ac0aa198eb..39c29ec6dd64e3 100644 --- a/selfdrive/common/queue.h +++ b/selfdrive/common/queue.h @@ -1,20 +1,21 @@ #pragma once -#include -#include #include +#include +#include template class SafeQueue { public: SafeQueue() = default; - ~SafeQueue() {} - void push(T v) { + + void push(const T& v) { { std::unique_lock lk(m); q.push(v); } cv.notify_one(); } + T pop() { std::unique_lock lk(m); cv.wait(lk, [this] { return !q.empty(); }); @@ -22,6 +23,7 @@ class SafeQueue { q.pop(); return v; } + bool try_pop(T& v, int timeout_ms = 0) { std::unique_lock lk(m); if (!cv.wait_for(lk, std::chrono::milliseconds(timeout_ms), [this] { return !q.empty(); })) { From f2008245010fc5186f76cb41ea66f87afadba9f6 Mon Sep 17 00:00:00 2001 From: deanlee Date: Mon, 18 Jan 2021 17:32:39 +0800 Subject: [PATCH 09/12] rebase master --- selfdrive/common/queue.h | 1 + selfdrive/loggerd/omx_encoder.cc | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/selfdrive/common/queue.h b/selfdrive/common/queue.h index 39c29ec6dd64e3..b3558b11a47f49 100644 --- a/selfdrive/common/queue.h +++ b/selfdrive/common/queue.h @@ -1,4 +1,5 @@ #pragma once + #include #include #include diff --git a/selfdrive/loggerd/omx_encoder.cc b/selfdrive/loggerd/omx_encoder.cc index 9879bcb6825ce1..b96d340f014477 100644 --- a/selfdrive/loggerd/omx_encoder.cc +++ b/selfdrive/loggerd/omx_encoder.cc @@ -330,7 +330,7 @@ OmxEncoder::OmxEncoder(const char* filename, int width, int height, int fps, int // fill the input free queue for (auto &buf : this->in_buf_headers) { - queue_push(&this->free_in, (void*)buf); + this->free_in.push(buf); } } From 0560d31acd046b4efd38e782c2d83d00647ff793 Mon Sep 17 00:00:00 2001 From: deanlee Date: Sun, 24 Jan 2021 13:34:53 +0800 Subject: [PATCH 10/12] rebase master --- selfdrive/loggerd/loggerd.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/selfdrive/loggerd/loggerd.cc b/selfdrive/loggerd/loggerd.cc index 60a181264b1b33..65fe618e912935 100644 --- a/selfdrive/loggerd/loggerd.cc +++ b/selfdrive/loggerd/loggerd.cc @@ -270,7 +270,7 @@ void encoder_thread(int cam_idx) { int out_segment = -1; int out_id = encoders[0]->encode_frame(buf->y, buf->u, buf->v, buf->width, buf->height, - &out_segment, &extra); + &out_segment, extra.timestamp_eof); if (out_id == -1) { continue; } @@ -278,7 +278,7 @@ void encoder_thread(int cam_idx) { int out_segment_alt = -1; if (-1 == encoders[1]->encode_frame(buf->y, buf->u, buf->v, buf->width, buf->height, - &out_segment_alt, &extra)) { + &out_segment_alt, extra.timestamp_eof)) { continue; } } From 22c806072c903867c6e9644bffab037e721fc3dc Mon Sep 17 00:00:00 2001 From: deanlee Date: Wed, 27 Jan 2021 21:22:21 +0800 Subject: [PATCH 11/12] for loop --- selfdrive/loggerd/loggerd.cc | 57 +++++++++++++++--------------------- 1 file changed, 24 insertions(+), 33 deletions(-) diff --git a/selfdrive/loggerd/loggerd.cc b/selfdrive/loggerd/loggerd.cc index 65fe618e912935..7453f3cc22cfc8 100644 --- a/selfdrive/loggerd/loggerd.cc +++ b/selfdrive/loggerd/loggerd.cc @@ -266,44 +266,35 @@ void encoder_thread(int cam_idx) { rotate_state.setStreamFrameId(extra.frame_id); // encode a frame - { + for (int i = 0; i < encoders.size(); ++i) { int out_segment = -1; - int out_id = encoders[0]->encode_frame(buf->y, buf->u, buf->v, + int out_id = encoders[i]->encode_frame(buf->y, buf->u, buf->v, buf->width, buf->height, &out_segment, extra.timestamp_eof); - if (out_id == -1) { - continue; - } - if (encoders.size() > 1) { - int out_segment_alt = -1; - if (-1 == encoders[1]->encode_frame(buf->y, buf->u, buf->v, - buf->width, buf->height, - &out_segment_alt, extra.timestamp_eof)) { - continue; + if (!out_id) break; + + if (i == 0) { + // publish encode index + MessageBuilder msg; + // this is really ugly + auto eidx = cam_idx == LOG_CAMERA_ID_DCAMERA ? msg.initEvent().initFrontEncodeIdx() : + (cam_idx == LOG_CAMERA_ID_ECAMERA ? msg.initEvent().initWideEncodeIdx() : msg.initEvent().initEncodeIdx()); + eidx.setFrameId(extra.frame_id); + eidx.setTimestampSof(extra.timestamp_sof); + eidx.setTimestampEof(extra.timestamp_eof); + #ifdef QCOM2 + eidx.setType(cereal::EncodeIndex::Type::FULL_H_E_V_C); + #else + eidx.setType(cam_idx == LOG_CAMERA_ID_DCAMERA ? cereal::EncodeIndex::Type::FRONT : cereal::EncodeIndex::Type::FULL_H_E_V_C); + #endif + eidx.setEncodeId(cnt); + eidx.setSegmentNum(out_segment); + eidx.setSegmentId(out_id); + if (lh) { + auto bytes = msg.toBytes(); + lh_log(lh, bytes.begin(), bytes.size(), false); } } - - // publish encode index - MessageBuilder msg; - // this is really ugly - auto eidx = cam_idx == LOG_CAMERA_ID_DCAMERA ? msg.initEvent().initFrontEncodeIdx() : - (cam_idx == LOG_CAMERA_ID_ECAMERA ? msg.initEvent().initWideEncodeIdx() : msg.initEvent().initEncodeIdx()); - eidx.setFrameId(extra.frame_id); - eidx.setTimestampSof(extra.timestamp_sof); - eidx.setTimestampEof(extra.timestamp_eof); - #ifdef QCOM2 - eidx.setType(cereal::EncodeIndex::Type::FULL_H_E_V_C); - #else - eidx.setType(cam_idx == LOG_CAMERA_ID_DCAMERA ? cereal::EncodeIndex::Type::FRONT : cereal::EncodeIndex::Type::FULL_H_E_V_C); - #endif - eidx.setEncodeId(cnt); - eidx.setSegmentNum(out_segment); - eidx.setSegmentId(out_id); - - if (lh) { - auto bytes = msg.toBytes(); - lh_log(lh, bytes.begin(), bytes.size(), false); - } } cnt++; From fe4171386407a21885f14ecc62c57008e427fa9a Mon Sep 17 00:00:00 2001 From: deanlee Date: Thu, 28 Jan 2021 05:10:23 +0800 Subject: [PATCH 12/12] fix bug --- selfdrive/loggerd/loggerd.cc | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/selfdrive/loggerd/loggerd.cc b/selfdrive/loggerd/loggerd.cc index 7453f3cc22cfc8..6b7520de560c83 100644 --- a/selfdrive/loggerd/loggerd.cc +++ b/selfdrive/loggerd/loggerd.cc @@ -271,9 +271,7 @@ void encoder_thread(int cam_idx) { int out_id = encoders[i]->encode_frame(buf->y, buf->u, buf->v, buf->width, buf->height, &out_segment, extra.timestamp_eof); - if (!out_id) break; - - if (i == 0) { + if (i == 0 && out_id != -1) { // publish encode index MessageBuilder msg; // this is really ugly