From abb47656db10e9e08c3569d05d6c9d19a7faae7e Mon Sep 17 00:00:00 2001 From: Julian Oes Date: Wed, 9 May 2018 22:28:47 -0400 Subject: [PATCH 1/9] core: mavlink command sync fixes These are a bunch of fixes to prevent promises getting set more than once which triggers an abort. This needs further cleanup, the changes are rather ugly. --- core/mavlink_commands.cpp | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/core/mavlink_commands.cpp b/core/mavlink_commands.cpp index a6be83d5d1..db318039a1 100644 --- a/core/mavlink_commands.cpp +++ b/core/mavlink_commands.cpp @@ -43,19 +43,18 @@ MAVLinkCommands::send_command(const MAVLinkCommands::CommandInt &command) std::shared_ptr> prom = std::make_shared>(); + std::future res = prom->get_future(); + queue_command_async(command, - [prom](Result result, float progress) { + [&prom](Result result, float progress) { PromiseResult promise_result {}; promise_result.result = result; promise_result.progress = progress; prom->set_value(promise_result); }); - std::future res = prom->get_future(); while (true) { // Block now to wait for result. - res.wait(); - PromiseResult promise_result = res.get(); if (promise_result.result == Result::IN_PROGRESS) { @@ -78,19 +77,18 @@ MAVLinkCommands::send_command(const MAVLinkCommands::CommandLong &command) std::shared_ptr> prom = std::make_shared>(); + std::future res = prom->get_future(); + queue_command_async(command, - [prom](Result result, float progress) { + [&prom](Result result, float progress) { PromiseResult promise_result {}; promise_result.result = result; promise_result.progress = progress; prom->set_value(promise_result); }); - std::future res = prom->get_future(); while (true) { // Block now to wait for result. - res.wait(); - PromiseResult promise_result = res.get(); if (promise_result.result == Result::IN_PROGRESS) { @@ -186,6 +184,7 @@ void MAVLinkCommands::receive_command_ack(mavlink_message_t message) if (work.callback) { work.callback(Result::SUCCESS, 1.0f); } + _work_queue.pop_front(); break; case MAV_RESULT_DENIED: @@ -194,6 +193,7 @@ void MAVLinkCommands::receive_command_ack(mavlink_message_t message) if (work.callback) { work.callback(Result::COMMAND_DENIED, NAN); } + _work_queue.pop_front(); break; case MAV_RESULT_UNSUPPORTED: @@ -202,6 +202,7 @@ void MAVLinkCommands::receive_command_ack(mavlink_message_t message) if (work.callback) { work.callback(Result::COMMAND_DENIED, NAN); } + _work_queue.pop_front(); break; case MAV_RESULT_TEMPORARILY_REJECTED: @@ -210,6 +211,7 @@ void MAVLinkCommands::receive_command_ack(mavlink_message_t message) if (work.callback) { work.callback(Result::COMMAND_DENIED, NAN); } + _work_queue.pop_front(); break; case MAV_RESULT_FAILED: @@ -218,6 +220,7 @@ void MAVLinkCommands::receive_command_ack(mavlink_message_t message) if (work.callback) { work.callback(Result::COMMAND_DENIED, NAN); } + _work_queue.pop_front(); break; case MAV_RESULT_IN_PROGRESS: @@ -285,6 +288,7 @@ void MAVLinkCommands::receive_timeout() } } _state = State::FAILED; + _work_queue.pop_front(); } } } @@ -305,7 +309,6 @@ void MAVLinkCommands::do_work() // FALLTHROUGH case State::FAILED: _parent.unregister_timeout_handler(_timeout_cookie); - _work_queue.pop_front(); _state = State::NONE; break; } @@ -344,6 +347,8 @@ void MAVLinkCommands::do_work() case State::DONE: // FALLTHROUGH case State::FAILED: + _work_queue.pop_front(); + _state = State::NONE; break; } } From 4e3dbfa009da38e222591da0d0d5a63e7e3d1aa3 Mon Sep 17 00:00:00 2001 From: Julian Oes Date: Fri, 11 May 2018 11:34:13 -0400 Subject: [PATCH 2/9] core: add test for locked queue, add borrowing This adds tests for the locked queue and also implements the mechanism to borrow and return an item from the queue. This allows to work on front without having to pop it. --- core/CMakeLists.txt | 1 + core/locked_queue.h | 40 ++++++++++----- core/locked_queue_test.cpp | 102 +++++++++++++++++++++++++++++++++++++ 3 files changed, 129 insertions(+), 14 deletions(-) create mode 100644 core/locked_queue_test.cpp diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index 44cbbbe437..f0321f4595 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -93,5 +93,6 @@ list(APPEND UNIT_TEST_SOURCES ${CMAKE_SOURCE_DIR}/core/curl_test.cpp ${CMAKE_SOURCE_DIR}/core/any_test.cpp ${CMAKE_SOURCE_DIR}/core/cli_arg_test.cpp + ${CMAKE_SOURCE_DIR}/core/locked_queue_test.cpp ) set(UNIT_TEST_SOURCES ${UNIT_TEST_SOURCES} PARENT_SCOPE) diff --git a/core/locked_queue.h b/core/locked_queue.h index d00cfa6f3c..3be4724ab6 100644 --- a/core/locked_queue.h +++ b/core/locked_queue.h @@ -1,8 +1,8 @@ #pragma once - #include #include +#include namespace dronecore { @@ -10,43 +10,55 @@ template class LockedQueue { public: - LockedQueue() : - _queue(), - _mutex() - {}; - + LockedQueue() {}; + ~LockedQueue() {}; void push_back(T item) { std::lock_guard lock(_mutex); - _queue.push_back(item); } - T &front() + // This allows to get access to the front and keep others + // from using it. This blocks if the front is already borrowed. + std::shared_ptr borrow_front() { - std::lock_guard lock(_mutex); + _mutex.lock(); + if (_queue.size() == 0) { + // We couldn't borrow anything, therefore don't keep the lock. + _mutex.unlock(); + return nullptr; + } + return std::make_shared(_queue.front()); + } - return _queue.front(); + // This allows to return a borrowed queue. + void return_front() + { + _mutex.unlock(); } void pop_front() { - std::lock_guard lock(_mutex); + // In case it's not returned, do that now. + return_front(); + std::lock_guard lock(_mutex); + if (_queue.size() == 0) { + return; + } _queue.pop_front(); } size_t size() { std::lock_guard lock(_mutex); - return _queue.size(); } private: - std::deque _queue; - std::mutex _mutex; + std::deque _queue {}; + std::mutex _mutex {}; }; } // namespace dronecore diff --git a/core/locked_queue_test.cpp b/core/locked_queue_test.cpp new file mode 100644 index 0000000000..48d56c0988 --- /dev/null +++ b/core/locked_queue_test.cpp @@ -0,0 +1,102 @@ + +#include "locked_queue.h" + +#include +#include +#include + +using namespace dronecore; + +TEST(LockedQueue, FillAndEmpty) +{ + int one = 1; + int two = 2; + int three = 3; + + LockedQueue locked_queue {}; + + locked_queue.push_back(one); + EXPECT_EQ(locked_queue.size(), 1); + locked_queue.push_back(two); + locked_queue.push_back(three); + EXPECT_EQ(locked_queue.size(), 3); + + locked_queue.pop_front(); + EXPECT_EQ(locked_queue.size(), 2); + locked_queue.pop_front(); + locked_queue.pop_front(); + EXPECT_EQ(locked_queue.size(), 0); + + // Popping an empty queue should just fail silently. + locked_queue.pop_front(); + EXPECT_EQ(locked_queue.size(), 0); +} + +TEST(LockedQueue, BorrowAndReturn) +{ + int one = 1; + int two = 2; + int three = 3; + + LockedQueue locked_queue {}; + + locked_queue.push_back(one); + locked_queue.push_back(two); + locked_queue.push_back(three); + + auto borrowed_item = locked_queue.borrow_front(); + EXPECT_EQ(*borrowed_item, 1); + locked_queue.return_front(); + locked_queue.pop_front(); + + borrowed_item = locked_queue.borrow_front(); + EXPECT_EQ(*borrowed_item, 2); + locked_queue.return_front(); + // Double returning shouldn't matter. + locked_queue.return_front(); + locked_queue.pop_front(); + + borrowed_item = locked_queue.borrow_front(); + EXPECT_EQ(*borrowed_item, 3); + // Popping without returning should automatically return it. + locked_queue.pop_front(); + EXPECT_EQ(locked_queue.size(), 0); + + borrowed_item = locked_queue.borrow_front(); + EXPECT_EQ(borrowed_item, nullptr); +} + +TEST(LockedQueue, ConcurrantAccess) +{ + int one = 1; + int two = 2; + + LockedQueue locked_queue {}; + + locked_queue.push_back(one); + locked_queue.push_back(two); + + auto borrowed_item = locked_queue.borrow_front(); + EXPECT_EQ(*borrowed_item, 1); + + auto prom = std::make_shared>(); + auto fut = prom->get_future(); + + auto some_future = std::async(std::launch::async | std::launch::deferred, + [&prom, &locked_queue]() { + // This will wait in the lock until the first item is returned. + auto second_borrowed_item = locked_queue.borrow_front(); + locked_queue.return_front(); + prom->set_value(); + }); + + // The promise should not be fulfilled yet because we have not + // returned the borrowed item. + auto status = fut.wait_for(std::chrono::milliseconds(10)); + EXPECT_EQ(status, std::future_status::timeout); + + locked_queue.return_front(); + + status = fut.wait_for(std::chrono::milliseconds(10)); + EXPECT_EQ(status, std::future_status::ready); +} From 518353160048c565f73b941cff403ea2354e699d Mon Sep 17 00:00:00 2001 From: Julian Oes Date: Fri, 11 May 2018 11:35:31 -0400 Subject: [PATCH 3/9] mavlink_commands: adapt to new locked queue This takes advantage of the new locked queue where we can borrow and return the front. Also, it removes the FAILED and DONE states which were not really having any purpose. This should properly fix the race conditions that we were seeing. --- core/mavlink_commands.cpp | 253 ++++++++++++++++---------------------- core/mavlink_commands.h | 12 +- 2 files changed, 113 insertions(+), 152 deletions(-) diff --git a/core/mavlink_commands.cpp b/core/mavlink_commands.cpp index db318039a1..04517d2254 100644 --- a/core/mavlink_commands.cpp +++ b/core/mavlink_commands.cpp @@ -34,69 +34,44 @@ MAVLinkCommands::~MAVLinkCommands() MAVLinkCommands::Result MAVLinkCommands::send_command(const MAVLinkCommands::CommandInt &command) { - struct PromiseResult { - Result result; - float progress; - }; - // We wrap the async call with a promise and future. - std::shared_ptr> prom = - std::make_shared>(); - - std::future res = prom->get_future(); + auto prom = std::make_shared>(); + auto res = prom->get_future(); queue_command_async(command, [&prom](Result result, float progress) { - PromiseResult promise_result {}; - promise_result.result = result; - promise_result.progress = progress; - prom->set_value(promise_result); + UNUSED(progress); + // We can only fulfill the promise once in C++11. + // Therefore we have to ignore the IN_PROGRESS state and wait + // for the final result. + if (result != Result::IN_PROGRESS) { + prom->set_value(result); + } }); - while (true) { - // Block now to wait for result. - PromiseResult promise_result = res.get(); - - if (promise_result.result == Result::IN_PROGRESS) { - LogInfo() << "In progress: " << promise_result.progress; - continue; - } - return promise_result.result; - } + // Block now to wait for result. + return res.get(); } MAVLinkCommands::Result MAVLinkCommands::send_command(const MAVLinkCommands::CommandLong &command) { - struct PromiseResult { - Result result; - float progress; - }; - // We wrap the async call with a promise and future. - std::shared_ptr> prom = - std::make_shared>(); - - std::future res = prom->get_future(); + auto prom = std::make_shared>(); + auto res = prom->get_future(); queue_command_async(command, [&prom](Result result, float progress) { - PromiseResult promise_result {}; - promise_result.result = result; - promise_result.progress = progress; - prom->set_value(promise_result); + UNUSED(progress); + // We can only fulfill the promise once in C++11. + // Therefore we have to ignore the IN_PROGRESS state and wait + // for the final result. + if (result != Result::IN_PROGRESS) { + prom->set_value(result); + } }); - while (true) { - // Block now to wait for result. - PromiseResult promise_result = res.get(); - - if (promise_result.result == Result::IN_PROGRESS) { - LogInfo() << "In progress: " << promise_result.progress; - continue; - } - return promise_result.result; - } + return res.get(); } void @@ -159,79 +134,83 @@ MAVLinkCommands::queue_command_async(const CommandLong &command, void MAVLinkCommands::receive_command_ack(mavlink_message_t message) { - // If nothing is in the queue, we ignore the message all together. - if (_work_queue.size() == 0) { - return; - } - - Work &work = _work_queue.front(); - mavlink_command_ack_t command_ack; mavlink_msg_command_ack_decode(&message, &command_ack); - // LogDebug() << "We got an ack: " << command_ack.command; + LogDebug() << "We got an ack: " << command_ack.command; + + auto work = _work_queue.borrow_front(); + if (!work) { + return; + } - if (work.mavlink_command != command_ack.command) { + if (work->mavlink_command != command_ack.command) { // If the command does not match with our current command, ignore it. - LogWarn() << "Command ack not matching our current command: " << work.mavlink_command; + LogWarn() << "Command ack not matching our current command: " << work->mavlink_command; + _work_queue.return_front(); return; } std::lock_guard lock(_state_mutex); switch (command_ack.result) { case MAV_RESULT_ACCEPTED: - _state = State::DONE; - if (work.callback) { - work.callback(Result::SUCCESS, 1.0f); + _state = State::NONE; + if (work->callback) { + work->callback(Result::SUCCESS, 1.0f); } _work_queue.pop_front(); + _parent.unregister_timeout_handler(_timeout_cookie); break; case MAV_RESULT_DENIED: - LogWarn() << "command denied (" << work.mavlink_command << ")."; - _state = State::FAILED; - if (work.callback) { - work.callback(Result::COMMAND_DENIED, NAN); + LogWarn() << "command denied (" << work->mavlink_command << ")."; + _state = State::NONE; + if (work->callback) { + work->callback(Result::COMMAND_DENIED, NAN); } _work_queue.pop_front(); + _parent.unregister_timeout_handler(_timeout_cookie); break; case MAV_RESULT_UNSUPPORTED: - LogWarn() << "command unsupported (" << work.mavlink_command << ")."; - _state = State::FAILED; - if (work.callback) { - work.callback(Result::COMMAND_DENIED, NAN); + LogWarn() << "command unsupported (" << work->mavlink_command << ")."; + _state = State::NONE; + if (work->callback) { + work->callback(Result::COMMAND_DENIED, NAN); } _work_queue.pop_front(); + _parent.unregister_timeout_handler(_timeout_cookie); break; case MAV_RESULT_TEMPORARILY_REJECTED: - LogWarn() << "command temporarily rejected (" << work.mavlink_command << ")."; - _state = State::FAILED; - if (work.callback) { - work.callback(Result::COMMAND_DENIED, NAN); + LogWarn() << "command temporarily rejected (" << work->mavlink_command << ")."; + _state = State::NONE; + if (work->callback) { + work->callback(Result::COMMAND_DENIED, NAN); } _work_queue.pop_front(); + _parent.unregister_timeout_handler(_timeout_cookie); break; case MAV_RESULT_FAILED: - LogWarn() << "command failed (" << work.mavlink_command << ")."; - _state = State::FAILED; - if (work.callback) { - work.callback(Result::COMMAND_DENIED, NAN); + LogWarn() << "command failed (" << work->mavlink_command << ")."; + _state = State::NONE; + if (work->callback) { + work->callback(Result::COMMAND_DENIED, NAN); } _work_queue.pop_front(); + _parent.unregister_timeout_handler(_timeout_cookie); break; case MAV_RESULT_IN_PROGRESS: if (static_cast(command_ack.progress) != 255) { LogInfo() << "progress: " << static_cast(command_ack.progress) - << " % (" << work.mavlink_command << ")."; + << " % (" << work->mavlink_command << ")."; } // FIXME: We can only call callbacks with promises once, so let's not do it // on IN_PROGRESS. - //if (work.callback) { - // work.callback(Result::IN_PROGRESS, command_ack.progress / 100.0f); + //if (work->callback) { + // work->callback(Result::IN_PROGRESS, command_ack.progress / 100.0f); //} _state = State::IN_PROGRESS; // If we get a progress update, we can raise the timeout @@ -242,113 +221,97 @@ void MAVLinkCommands::receive_command_ack(mavlink_message_t message) _parent.unregister_timeout_handler(_timeout_cookie); _parent.register_timeout_handler( std::bind(&MAVLinkCommands::receive_timeout, this), - work.retries_to_do * work.timeout_s, &_timeout_cookie); + work->retries_to_do * work->timeout_s, &_timeout_cookie); break; } } void MAVLinkCommands::receive_timeout() { - // If nothing is in the queue, we ignore the timeout. - if (_work_queue.size() == 0) { + std::lock_guard lock(_state_mutex); + + if (_state != State::WAITING) { + LogWarn() << "Received command timeout but not waiting."; return; } - Work &work = _work_queue.front(); - - std::lock_guard lock(_state_mutex); + // If we're not waiting, we ignore this. + auto work = _work_queue.borrow_front(); - if (_state == State::WAITING) { + if (!work) { + // Nevermind, there is nothing to do. + return; + } - if (work.retries_to_do > 0) { + if (work->retries_to_do > 0) { - LogInfo() << "sending again, retries to do: " << work.retries_to_do - << " (" << work.mavlink_command << ")."; - // We're not sure the command arrived, let's retransmit. - if (!_parent.send_message(work.mavlink_message)) { - LogErr() << "connection send error in retransmit (" << work.mavlink_command << ")."; - if (work.callback) { - work.callback(Result::CONNECTION_ERROR, NAN); - } - _state = State::FAILED; - } else { - --work.retries_to_do; - _parent.register_timeout_handler( - std::bind(&MAVLinkCommands::receive_timeout, this), - work.timeout_s, &_timeout_cookie); + // We're not sure the command arrived, let's retransmit. + LogWarn() << "sending again, retries to do: " << work->retries_to_do + << " (" << work->mavlink_command << ")."; + if (!_parent.send_message(work->mavlink_message)) { + LogErr() << "connection send error in retransmit (" << work->mavlink_command << ")."; + if (work->callback) { + work->callback(Result::CONNECTION_ERROR, NAN); } + _state = State::NONE; + _work_queue.pop_front(); - } else { - // We have tried retransmitting, giving up now. - LogErr() << "Retrying failed (" << work.mavlink_command << ")"; + } else { + --work->retries_to_do; + _parent.register_timeout_handler( + std::bind(&MAVLinkCommands::receive_timeout, this), + work->timeout_s, &_timeout_cookie); + _work_queue.return_front(); + } - if (work.callback) { - if (_state == State::WAITING) { - work.callback(Result::TIMEOUT, NAN); - } + } else { + // We have tried retransmitting, giving up now. + LogErr() << "Retrying failed (" << work->mavlink_command << ")"; + + if (work->callback) { + if (_state == State::WAITING) { + work->callback(Result::TIMEOUT, NAN); } - _state = State::FAILED; - _work_queue.pop_front(); } + _state = State::NONE; + _work_queue.pop_front(); } } void MAVLinkCommands::do_work() { - std::lock_guard lock(_state_mutex); - - // Clean up first - switch (_state) { - case State::NONE: - // FALLTHROUGH - case State::WAITING: - // FALLTHROUGH - case State::IN_PROGRESS: - break; - case State::DONE: - // FALLTHROUGH - case State::FAILED: - _parent.unregister_timeout_handler(_timeout_cookie); - _state = State::NONE; - break; - } - - // Check if there is work to do. - if (_work_queue.size() == 0) { + auto work = _work_queue.borrow_front(); + if (!work) { // Nothing to do. return; } - // If so, let's get the latest. - Work &work = _work_queue.front(); + std::lock_guard lock(_state_mutex); // If the work state is none, we can start the next command. switch (_state) { case State::NONE: - // LogDebug() << "sending it the first time (" << work.mavlink_command << ")"; - if (!_parent.send_message(work.mavlink_message)) { - LogErr() << "connection send error (" << work.mavlink_command << ")"; - if (work.callback) { - work.callback(Result::CONNECTION_ERROR, NAN); + // LogDebug() << "sending it the first time (" << work->mavlink_command << ")"; + if (!_parent.send_message(work->mavlink_message)) { + LogErr() << "connection send error (" << work->mavlink_command << ")"; + if (work->callback) { + work->callback(Result::CONNECTION_ERROR, NAN); } - _state = State::FAILED; + _work_queue.pop_front(); + _state = State::NONE; break; } else { _state = State::WAITING; _parent.register_timeout_handler( std::bind(&MAVLinkCommands::receive_timeout, this), - work.timeout_s, &_timeout_cookie); + work->timeout_s, &_timeout_cookie); + _work_queue.return_front(); } break; case State::WAITING: - case State::IN_PROGRESS: - // LogWarn() << "wait until we can deal with this"; - break; - case State::DONE: // FALLTHROUGH - case State::FAILED: - _work_queue.pop_front(); - _state = State::NONE; + case State::IN_PROGRESS: + _work_queue.return_front(); break; } } diff --git a/core/mavlink_commands.h b/core/mavlink_commands.h index c1cc210255..db351987ed 100644 --- a/core/mavlink_commands.h +++ b/core/mavlink_commands.h @@ -111,16 +111,14 @@ class MAVLinkCommands enum class State { NONE, WAITING, - IN_PROGRESS, - DONE, - FAILED - } _state = State::NONE; + IN_PROGRESS + } _state {State::NONE}; std::mutex _state_mutex {}; struct Work { - int retries_to_do = 3; - double timeout_s = 0.5; - uint16_t mavlink_command = 0; + int retries_to_do {3}; + double timeout_s {0.5}; + uint16_t mavlink_command {0}; mavlink_message_t mavlink_message {}; command_result_callback_t callback {}; }; From fc39f6cdabe2c541fc5132fbf75f31aad1ef590c Mon Sep 17 00:00:00 2001 From: Julian Oes Date: Fri, 11 May 2018 11:36:53 -0400 Subject: [PATCH 4/9] mavlink_parameters: adapt to new locked_queue API We now need to borrow and return the front. This should resolve some possible races for front. --- core/mavlink_parameters.cpp | 147 ++++++++++++++++++++---------------- 1 file changed, 80 insertions(+), 67 deletions(-) diff --git a/core/mavlink_parameters.cpp b/core/mavlink_parameters.cpp index 313961b1c5..57654a1220 100644 --- a/core/mavlink_parameters.cpp +++ b/core/mavlink_parameters.cpp @@ -92,21 +92,29 @@ void MAVLinkParameters::do_work() return; } - if (_set_param_queue.size() > 0) { - // There params to set which we always do first - SetParamWork &work = _set_param_queue.front(); + auto set_param_work = _set_param_queue.borrow_front(); + auto get_param_work = _get_param_queue.borrow_front(); + + if (!set_param_work && !get_param_work) { + return; + } + + // There params to set which we always do first + if (set_param_work) { + // We don't need get_param_work and can give it back. + _get_param_queue.return_front(); // We need to wait for this param to get sent back as confirmation. _state = State::SET_PARAM_BUSY; char param_id[PARAM_ID_LEN] = {}; - STRNCPY(param_id, work.param_name.c_str(), sizeof(param_id)); + STRNCPY(param_id, set_param_work->param_name.c_str(), sizeof(param_id)); mavlink_message_t message = {}; - if (work.extended) { + if (set_param_work->extended) { char param_value_buf[128] = {}; - work.param_value.get_128_bytes(param_value_buf); + set_param_work->param_value.get_128_bytes(param_value_buf); // FIXME: extended currently always go to the camera component mavlink_msg_param_ext_set_pack(GCSClient::system_id, @@ -116,7 +124,7 @@ void MAVLinkParameters::do_work() MAV_COMP_ID_CAMERA, param_id, param_value_buf, - work.param_value.get_mav_param_ext_type()); + set_param_work->param_value.get_mav_param_ext_type()); } else { // Param set is intended for Autopilot only. mavlink_msg_param_set_pack(GCSClient::system_id, @@ -125,16 +133,17 @@ void MAVLinkParameters::do_work() _parent.get_system_id(), _parent.get_autopilot_id(), param_id, - work.param_value.get_4_float_bytes(), - work.param_value.get_mav_param_type()); + set_param_work->param_value.get_4_float_bytes(), + set_param_work->param_value.get_mav_param_type()); } if (!_parent.send_message(message)) { LogErr() << "Error: Send message failed"; - if (work.callback) { - work.callback(false); + if (set_param_work->callback) { + set_param_work->callback(false); } _state = State::NONE; + _set_param_queue.pop_front(); return; } @@ -145,21 +154,23 @@ void MAVLinkParameters::do_work() 0.5, &_timeout_cookie); - } else if (_get_param_queue.size() > 0) { + _set_param_queue.return_front(); + + } else { - GetParamWork work = _get_param_queue.front(); + _set_param_queue.return_front(); // The busy flag gets reset when the param comes in // or after a timeout. _state = State::GET_PARAM_BUSY; char param_id[PARAM_ID_LEN] = {}; - STRNCPY(param_id, work.param_name.c_str(), sizeof(param_id)); + STRNCPY(param_id, get_param_work->param_name.c_str(), sizeof(param_id)); - // LogDebug() << "now getting: " << work.param_name; + // LogDebug() << "now getting: " << get_param_work->param_name; mavlink_message_t message = {}; - if (work.extended) { + if (get_param_work->extended) { mavlink_msg_param_ext_request_read_pack(GCSClient::system_id, GCSClient::component_id, &message, @@ -187,11 +198,12 @@ void MAVLinkParameters::do_work() if (!_parent.send_message(message)) { LogErr() << "Error: Send message failed"; - if (work.callback) { + if (get_param_work->callback) { ParamValue empty_param; - work.callback(false, empty_param); + get_param_work->callback(false, empty_param); } _state = State::NONE; + _get_param_queue.pop_front(); return; } @@ -201,6 +213,8 @@ void MAVLinkParameters::do_work() _parent.register_timeout_handler(std::bind(&MAVLinkParameters::receive_timeout, this), 0.5, &_timeout_cookie); + + _get_param_queue.return_front(); } } @@ -219,43 +233,44 @@ void MAVLinkParameters::process_param_value(const mavlink_message_t &message) if (_state == State::GET_PARAM_BUSY) { - // This means we should have a queue entry to use - if (_get_param_queue.size() > 0) { - GetParamWork &work = _get_param_queue.front(); - - if (strncmp(work.param_name.c_str(), param_value.param_id, PARAM_ID_LEN) == 0) { + auto work = _get_param_queue.borrow_front(); + if (work) { + if (strncmp(work->param_name.c_str(), param_value.param_id, PARAM_ID_LEN) == 0) { - if (work.callback) { + if (work->callback) { ParamValue value; value.set_from_mavlink_param_value(param_value); - work.callback(true, value); + work->callback(true, value); } _state = State::NONE; _parent.unregister_timeout_handler(_timeout_cookie); // LogDebug() << "time taken: " << _parent.get_time().elapsed_since_s(_last_request_time); _get_param_queue.pop_front(); + } else { + // No match, let's just return the borrowed work item. + _get_param_queue.return_front(); } } } else if (_state == State::SET_PARAM_BUSY) { - // This means we should have a queue entry to use - if (_set_param_queue.size() > 0) { - SetParamWork &work = _set_param_queue.front(); - + auto work = _set_param_queue.borrow_front(); + if (work) { // Now it still needs to match the param name - if (strncmp(work.param_name.c_str(), param_value.param_id, PARAM_ID_LEN) == 0) { + if (strncmp(work->param_name.c_str(), param_value.param_id, PARAM_ID_LEN) == 0) { // We are done, inform caller and go back to idle - if (work.callback) { - work.callback(true); + if (work->callback) { + work->callback(true); } _state = State::NONE; _parent.unregister_timeout_handler(_timeout_cookie); // LogDebug() << "time taken: " << _parent.get_time().elapsed_since_s(_last_request_time); _set_param_queue.pop_front(); + } else { + _set_param_queue.return_front(); } } } @@ -275,21 +290,21 @@ void MAVLinkParameters::process_param_ext_value(const mavlink_message_t &message if (_state == State::GET_PARAM_BUSY) { - // This means we should have a queue entry to use - if (_get_param_queue.size() > 0) { - GetParamWork &work = _get_param_queue.front(); - - if (strncmp(work.param_name.c_str(), param_ext_value.param_id, PARAM_ID_LEN) == 0) { + auto work = _get_param_queue.borrow_front(); + if (work) { + if (strncmp(work->param_name.c_str(), param_ext_value.param_id, PARAM_ID_LEN) == 0) { - if (work.callback) { + if (work->callback) { ParamValue value; value.set_from_mavlink_param_ext_value(param_ext_value); - work.callback(true, value); + work->callback(true, value); } _state = State::NONE; _parent.unregister_timeout_handler(_timeout_cookie); // LogDebug() << "time taken: " << _parent.get_time().elapsed_since_s(_last_request_time); _get_param_queue.pop_front(); + } else { + _get_param_queue.return_front(); } } } @@ -297,22 +312,22 @@ void MAVLinkParameters::process_param_ext_value(const mavlink_message_t &message #if 0 else if (_state == State::SET_PARAM_BUSY) { - // This means we should have a queue entry to use - if (_set_param_queue.size() > 0) { - SetParamWork &work = _set_param_queue.front(); - + auto work = _set_param_queue.borrow_front(); + if (work) { // Now it still needs to match the param name - if (strncmp(work.param_name.c_str(), param_ext_value.param_id, PARAM_ID_LEN) == 0) { + if (strncmp(work->param_name.c_str(), param_ext_value.param_id, PARAM_ID_LEN) == 0) { // We are done, inform caller and go back to idle - if (work.callback) { - work.callback(true); + if (work->callback) { + work->callback(true); } _state = State::NONE; _parent.unregister_timeout_handler(_timeout_cookie); // LogDebug() << "time taken: " << _parent.get_time().elapsed_since_s(_last_request_time); _set_param_queue.pop_front(); + } else { + _set_param_queue.return_front(); } } } @@ -332,17 +347,15 @@ void MAVLinkParameters::process_param_ext_ack(const mavlink_message_t &message) return; } - // This means we should have a queue entry to use - if (_set_param_queue.size() > 0) { - SetParamWork &work = _set_param_queue.front(); - + auto work = _set_param_queue.borrow_front(); + if (work) { // Now it still needs to match the param name - if (strncmp(work.param_name.c_str(), param_ext_ack.param_id, PARAM_ID_LEN) == 0) { + if (strncmp(work->param_name.c_str(), param_ext_ack.param_id, PARAM_ID_LEN) == 0) { if (param_ext_ack.param_result == PARAM_ACK_ACCEPTED) { // We are done, inform caller and go back to idle - if (work.callback) { - work.callback(true); + if (work->callback) { + work->callback(true); } _state = State::NONE; @@ -354,6 +367,7 @@ void MAVLinkParameters::process_param_ext_ack(const mavlink_message_t &message) // Reset timeout and wait again. _parent.refresh_timeout_handler(_timeout_cookie); + _set_param_queue.return_front(); } else { @@ -361,8 +375,8 @@ void MAVLinkParameters::process_param_ext_ack(const mavlink_message_t &message) // We are done but unsuccessful // TODO: we need better error feedback - if (work.callback) { - work.callback(false); + if (work->callback) { + work->callback(false); } _state = State::NONE; @@ -370,7 +384,8 @@ void MAVLinkParameters::process_param_ext_ack(const mavlink_message_t &message) // LogDebug() << "time taken: " << _parent.get_time().elapsed_since_s(_last_request_time); _set_param_queue.pop_front(); } - + } else { + _set_param_queue.return_front(); } } } @@ -386,16 +401,14 @@ void MAVLinkParameters::receive_timeout() if (_state == State::GET_PARAM_BUSY) { - // This means work has been going on that we should try again - if (_get_param_queue.size() > 0) { - GetParamWork &work = _get_param_queue.front(); - - if (work.callback) { + auto work = _get_param_queue.borrow_front(); + if (work) { + if (work->callback) { ParamValue empty_value; // Notify about timeout - LogErr() << "Error: get param busy timeout: " << work.param_name; + LogErr() << "Error: get param busy timeout: " << work->param_name; // LogErr() << "Got it after: " << _parent.get_time().elapsed_since_s(_last_request_time); - work.callback(false, empty_value); + work->callback(false, empty_value); } _state = State::NONE; _get_param_queue.pop_front(); @@ -406,13 +419,13 @@ void MAVLinkParameters::receive_timeout() // This means work has been going on that we should try again if (_set_param_queue.size() > 0) { - SetParamWork &work = _set_param_queue.front(); + auto work = _set_param_queue.borrow_front(); - if (work.callback) { + if (work->callback) { // Notify about timeout - LogErr() << "Error: set param busy timeout: " << work.param_name; + LogErr() << "Error: set param busy timeout: " << work->param_name; // LogErr() << "Got it after: " << _parent.get_time().elapsed_since_s(_last_request_time); - work.callback(false); + work->callback(false); } _state = State::NONE; _set_param_queue.pop_front(); From cb842926fd32593227d4f51f0ec4b0678e090005 Mon Sep 17 00:00:00 2001 From: Julian Oes Date: Fri, 11 May 2018 11:37:34 -0400 Subject: [PATCH 5/9] safe_queue: move initializers into declarations --- core/safe_queue.h | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/core/safe_queue.h b/core/safe_queue.h index 24e1de4753..261d8b59d0 100644 --- a/core/safe_queue.h +++ b/core/safe_queue.h @@ -16,11 +16,7 @@ template class SafeQueue { public: - SafeQueue() : - _queue(), - _mutex(), - _condition_var() - {} + SafeQueue() {} ~SafeQueue() {} void enqueue(T item) @@ -55,10 +51,10 @@ class SafeQueue } private: - std::queue _queue; - mutable std::mutex _mutex; - std::condition_variable _condition_var; - bool _should_exit = false; + std::queue _queue {}; + mutable std::mutex _mutex {}; + std::condition_variable _condition_var {}; + bool _should_exit {false}; }; } // namespace dronecore From d4ae2fadb415e0530b80883d8d237ee82cbe5ebd Mon Sep 17 00:00:00 2001 From: Julian Oes Date: Fri, 11 May 2018 12:51:36 -0400 Subject: [PATCH 6/9] locked_queue: fix unlocking on Windows It turns out that double unlocking a mutex is not possible on Windows. Therefore, we need to try to lock it before unlocking it. --- core/locked_queue.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/locked_queue.h b/core/locked_queue.h index 3be4724ab6..996a92eaa2 100644 --- a/core/locked_queue.h +++ b/core/locked_queue.h @@ -35,6 +35,9 @@ class LockedQueue // This allows to return a borrowed queue. void return_front() { + // We don't know if the mutex is locked and Windows doesn't let us + // unlock an unowned mutex. + _mutex.try_lock(); _mutex.unlock(); } From 6176c89380ccae25d3d60e39856b1b6662f6bfb0 Mon Sep 17 00:00:00 2001 From: Julian Oes Date: Fri, 11 May 2018 12:55:09 -0400 Subject: [PATCH 7/9] locked_queue: wait longer to pass CI --- core/locked_queue_test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/locked_queue_test.cpp b/core/locked_queue_test.cpp index 48d56c0988..3cca17859c 100644 --- a/core/locked_queue_test.cpp +++ b/core/locked_queue_test.cpp @@ -97,6 +97,6 @@ TEST(LockedQueue, ConcurrantAccess) locked_queue.return_front(); - status = fut.wait_for(std::chrono::milliseconds(10)); + status = fut.wait_for(std::chrono::milliseconds(100)); EXPECT_EQ(status, std::future_status::ready); } From d27044024b57d66444ecb130d1c0621d4ddd1b10 Mon Sep 17 00:00:00 2001 From: Julian Oes Date: Fri, 11 May 2018 13:15:17 -0400 Subject: [PATCH 8/9] core: fix style --- core/locked_queue_test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/locked_queue_test.cpp b/core/locked_queue_test.cpp index 3cca17859c..d93532e48c 100644 --- a/core/locked_queue_test.cpp +++ b/core/locked_queue_test.cpp @@ -83,7 +83,7 @@ TEST(LockedQueue, ConcurrantAccess) auto fut = prom->get_future(); auto some_future = std::async(std::launch::async | std::launch::deferred, - [&prom, &locked_queue]() { + [&prom, &locked_queue]() { // This will wait in the lock until the first item is returned. auto second_borrowed_item = locked_queue.borrow_front(); locked_queue.return_front(); From 30ed2d64223c465066d0c86a780042bf768ee38a Mon Sep 17 00:00:00 2001 From: Julian Oes Date: Fri, 11 May 2018 15:13:01 -0400 Subject: [PATCH 9/9] locked_queue: we can't use std::deferred It seems like with std::deferred this doesn't work as intended with older compilers. --- core/locked_queue_test.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/locked_queue_test.cpp b/core/locked_queue_test.cpp index d93532e48c..36594469a3 100644 --- a/core/locked_queue_test.cpp +++ b/core/locked_queue_test.cpp @@ -82,7 +82,7 @@ TEST(LockedQueue, ConcurrantAccess) auto prom = std::make_shared>(); auto fut = prom->get_future(); - auto some_future = std::async(std::launch::async | std::launch::deferred, + auto some_future = std::async(std::launch::async, [&prom, &locked_queue]() { // This will wait in the lock until the first item is returned. auto second_borrowed_item = locked_queue.borrow_front(); @@ -96,7 +96,6 @@ TEST(LockedQueue, ConcurrantAccess) EXPECT_EQ(status, std::future_status::timeout); locked_queue.return_front(); - - status = fut.wait_for(std::chrono::milliseconds(100)); + status = fut.wait_for(std::chrono::milliseconds(10)); EXPECT_EQ(status, std::future_status::ready); }