Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove cqueue, use class SafeQueue #19774

Merged
merged 12 commits into from
Jan 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion release/files_common
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ selfdrive/common/swaglog.h
selfdrive/common/swaglog.cc
selfdrive/common/util.cc
selfdrive/common/util.h
selfdrive/common/cqueue.[c,h]
selfdrive/common/queue.h
selfdrive/common/clutil.cc
selfdrive/common/clutil.h
selfdrive/common/params.h
Expand Down
17 changes: 3 additions & 14 deletions selfdrive/camerad/cameras/camera_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,7 @@ CameraBuf::~CameraBuf() {
}

bool CameraBuf::acquire() {
{
std::unique_lock<std::mutex> lk(frame_queue_mutex);
bool got_frame = frame_queue_cv.wait_for(lk, std::chrono::milliseconds(1), [this]{ return !frame_queue.empty(); });
if (!got_frame) return false;

cur_buf_idx = frame_queue.front();
frame_queue.pop();
}
if (!safe_queue.try_pop(cur_buf_idx, 1)) return false;

const FrameMetadata &frame_data = camera_bufs_metadata[cur_buf_idx];
if (frame_data.frame_id == -1) {
Expand Down Expand Up @@ -193,12 +186,8 @@ void CameraBuf::release() {
}
}

void CameraBuf::queue(size_t buf_idx){
{
std::lock_guard<std::mutex> lk(frame_queue_mutex);
frame_queue.push(buf_idx);
}
frame_queue_cv.notify_one();
void CameraBuf::queue(size_t buf_idx) {
safe_queue.push(buf_idx);
}

// common functions
Expand Down
8 changes: 2 additions & 6 deletions selfdrive/camerad/cameras/camera_common.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
#pragma once
#include <queue>
#include <mutex>
#include <condition_variable>

#include <stdlib.h>
#include <stdbool.h>
Expand All @@ -10,6 +7,7 @@
#include <thread>
#include "common/mat.h"
#include "common/swaglog.h"
#include "common/queue.h"
#include "visionbuf.h"
#include "common/visionimg.h"
#include "imgproc/utils.h"
Expand Down Expand Up @@ -103,9 +101,7 @@ class CameraBuf {

int cur_buf_idx;

std::mutex frame_queue_mutex;
std::condition_variable frame_queue_cv;
std::queue<size_t> frame_queue;
SafeQueue<int> safe_queue;

int frame_buf_count;
release_cb release_callback;
Expand Down
2 changes: 1 addition & 1 deletion selfdrive/common/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ if SHARED:
else:
fxn = env.Library

common_libs = ['params.cc', 'swaglog.cc', 'cqueue.c', 'util.cc', 'gpio.cc', 'i2c.cc']
common_libs = ['params.cc', 'swaglog.cc', 'util.cc', 'gpio.cc', 'i2c.cc']

_common = fxn('common', common_libs, LIBS="json11")

Expand Down
54 changes: 0 additions & 54 deletions selfdrive/common/cqueue.c

This file was deleted.

33 changes: 0 additions & 33 deletions selfdrive/common/cqueue.h

This file was deleted.

52 changes: 52 additions & 0 deletions selfdrive/common/queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#pragma once

#include <condition_variable>
#include <mutex>
#include <queue>

template <class T>
class SafeQueue {
public:
SafeQueue() = default;

void push(const T& v) {
{
std::unique_lock lk(m);
q.push(v);
}
cv.notify_one();
}

T pop() {
std::unique_lock lk(m);
cv.wait(lk, [this] { return !q.empty(); });
T v = q.front();
q.pop();
return v;
}

bool try_pop(T& v, int timeout_ms = 0) {
std::unique_lock lk(m);
if (!cv.wait_for(lk, std::chrono::milliseconds(timeout_ms), [this] { return !q.empty(); })) {
return false;
}
v = q.front();
q.pop();
return true;
}

bool empty() const {
std::scoped_lock lk(m);
return q.empty();
}

size_t size() const {
std::scoped_lock lk(m);
return q.size();
}

private:
mutable std::mutex m;
std::condition_variable cv;
std::queue<T> q;
};
52 changes: 23 additions & 29 deletions selfdrive/loggerd/loggerd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -266,38 +266,32 @@ void encoder_thread(int cam_idx) {
rotate_state.setStreamFrameId(extra.frame_id);

// encode a frame
{
for (int i = 0; i < encoders.size(); ++i) {
int out_segment = -1;
int out_id = encoders[0]->encode_frame(buf->y, buf->u, buf->v,
int out_id = encoders[i]->encode_frame(buf->y, buf->u, buf->v,
buf->width, buf->height,
&out_segment, extra.timestamp_eof);
if (encoders.size() > 1) {
int out_segment_alt = -1;
encoders[1]->encode_frame(buf->y, buf->u, buf->v,
buf->width, buf->height,
&out_segment_alt, extra.timestamp_eof);
}

// publish encode index
MessageBuilder msg;
// this is really ugly
auto eidx = cam_idx == LOG_CAMERA_ID_DCAMERA ? msg.initEvent().initFrontEncodeIdx() :
(cam_idx == LOG_CAMERA_ID_ECAMERA ? msg.initEvent().initWideEncodeIdx() : msg.initEvent().initEncodeIdx());
eidx.setFrameId(extra.frame_id);
eidx.setTimestampSof(extra.timestamp_sof);
eidx.setTimestampEof(extra.timestamp_eof);
#ifdef QCOM2
eidx.setType(cereal::EncodeIndex::Type::FULL_H_E_V_C);
#else
eidx.setType(cam_idx == LOG_CAMERA_ID_DCAMERA ? cereal::EncodeIndex::Type::FRONT : cereal::EncodeIndex::Type::FULL_H_E_V_C);
#endif
eidx.setEncodeId(cnt);
eidx.setSegmentNum(out_segment);
eidx.setSegmentId(out_id);

if (lh) {
auto bytes = msg.toBytes();
lh_log(lh, bytes.begin(), bytes.size(), false);
if (i == 0 && out_id != -1) {
// publish encode index
MessageBuilder msg;
// this is really ugly
auto eidx = cam_idx == LOG_CAMERA_ID_DCAMERA ? msg.initEvent().initFrontEncodeIdx() :
(cam_idx == LOG_CAMERA_ID_ECAMERA ? msg.initEvent().initWideEncodeIdx() : msg.initEvent().initEncodeIdx());
eidx.setFrameId(extra.frame_id);
eidx.setTimestampSof(extra.timestamp_sof);
eidx.setTimestampEof(extra.timestamp_eof);
#ifdef QCOM2
eidx.setType(cereal::EncodeIndex::Type::FULL_H_E_V_C);
#else
eidx.setType(cam_idx == LOG_CAMERA_ID_DCAMERA ? cereal::EncodeIndex::Type::FRONT : cereal::EncodeIndex::Type::FULL_H_E_V_C);
#endif
eidx.setEncodeId(cnt);
eidx.setSegmentNum(out_segment);
eidx.setSegmentId(out_id);
if (lh) {
auto bytes = msg.toBytes();
lh_log(lh, bytes.begin(), bytes.size(), false);
}
}
}

Expand Down
34 changes: 21 additions & 13 deletions selfdrive/loggerd/omx_encoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <msm_media_info.h>

#include "common/mutex.h"
#include "common/util.h"
#include "common/swaglog.h"

#include "omx_encoder.h"
Expand All @@ -29,6 +30,8 @@
assert(OMX_ErrorNone == _expr); \
} while (0)

extern ExitHandler do_exit;

// ***** OMX callback functions *****

void OmxEncoder::wait_for_state(OMX_STATETYPE state) {
Expand Down Expand Up @@ -75,15 +78,15 @@ OMX_ERRORTYPE OmxEncoder::empty_buffer_done(OMX_HANDLETYPE component, OMX_PTR ap
OMX_BUFFERHEADERTYPE *buffer) {
// printf("empty_buffer_done\n");
OmxEncoder *e = (OmxEncoder*)app_data;
queue_push(&e->free_in, (void*)buffer);
e->free_in.push(buffer);
return OMX_ErrorNone;
}

OMX_ERRORTYPE OmxEncoder::fill_buffer_done(OMX_HANDLETYPE component, OMX_PTR app_data,
OMX_BUFFERHEADERTYPE *buffer) {
// printf("fill_buffer_done\n");
OmxEncoder *e = (OmxEncoder*)app_data;
queue_push(&e->done_out, (void*)buffer);
e->done_out.push(buffer);
return OMX_ErrorNone;
}

Expand Down Expand Up @@ -170,9 +173,6 @@ OmxEncoder::OmxEncoder(const char* filename, int width, int height, int fps, int
this->fps = fps;
this->remuxing = !h265;

queue_init(&this->free_in);
queue_init(&this->done_out);

mutex_init_reentrant(&this->lock);
pthread_mutex_init(&this->state_lock, NULL);
pthread_cond_init(&this->state_cv, NULL);
Expand Down Expand Up @@ -330,7 +330,7 @@ OmxEncoder::OmxEncoder(const char* filename, int width, int height, int fps, int

// fill the input free queue
for (auto &buf : this->in_buf_headers) {
queue_push(&this->free_in, (void*)buf);
this->free_in.push(buf);
}
}

Expand Down Expand Up @@ -418,7 +418,14 @@ int OmxEncoder::encode_frame(const uint8_t *y_ptr, const uint8_t *u_ptr, const u
// this sometimes freezes... put it outside the encoder lock so we can still trigger rotates...
// THIS IS A REALLY BAD IDEA, but apparently the race has to happen 30 times to trigger this
//pthread_mutex_unlock(&this->lock);
OMX_BUFFERHEADERTYPE* in_buf = (OMX_BUFFERHEADERTYPE *)queue_pop(&this->free_in);
OMX_BUFFERHEADERTYPE* in_buf = nullptr;
while (!this->free_in.try_pop(in_buf, 20)) {
if (do_exit) {
pthread_mutex_unlock(&this->lock);
return -1;
}
}

//pthread_mutex_lock(&this->lock);

int ret = this->counter;
Expand Down Expand Up @@ -465,8 +472,8 @@ int OmxEncoder::encode_frame(const uint8_t *y_ptr, const uint8_t *u_ptr, const u

// pump output
while (true) {
OMX_BUFFERHEADERTYPE *out_buf = (OMX_BUFFERHEADERTYPE *)queue_try_pop(&this->done_out);
if (!out_buf) {
OMX_BUFFERHEADERTYPE *out_buf;
if (!this->done_out.try_pop(out_buf)) {
break;
}
handle_out_buf(this, out_buf);
Expand Down Expand Up @@ -547,7 +554,7 @@ void OmxEncoder::encoder_close() {
if (this->dirty) {
// drain output only if there could be frames in the encoder

OMX_BUFFERHEADERTYPE* in_buf = (OMX_BUFFERHEADERTYPE *)queue_pop(&this->free_in);
OMX_BUFFERHEADERTYPE* in_buf = this->free_in.pop();
in_buf->nFilledLen = 0;
in_buf->nOffset = 0;
in_buf->nFlags = OMX_BUFFERFLAG_EOS;
Expand All @@ -556,7 +563,7 @@ void OmxEncoder::encoder_close() {
OMX_CHECK(OMX_EmptyThisBuffer(this->handle, in_buf));

while (true) {
OMX_BUFFERHEADERTYPE *out_buf = (OMX_BUFFERHEADERTYPE *)queue_pop(&this->done_out);
OMX_BUFFERHEADERTYPE *out_buf = this->done_out.pop();

handle_out_buf(this, out_buf);

Expand Down Expand Up @@ -604,8 +611,9 @@ OmxEncoder::~OmxEncoder() {

OMX_CHECK(OMX_FreeHandle(this->handle));

while (queue_try_pop(&this->free_in));
while (queue_try_pop(&this->done_out));
OMX_BUFFERHEADERTYPE *out_buf;
while (this->free_in.try_pop(out_buf));
while (this->done_out.try_pop(out_buf));

if (this->codec_config) {
free(this->codec_config);
Expand Down
Loading