Skip to content

Commit

Permalink
Merge pull request #579 from Dronecode/fix-camera-status-race
Browse files Browse the repository at this point in the history
Fix camera status race
  • Loading branch information
julianoes authored Oct 26, 2018
2 parents 466e7e2 + 18cf93a commit 7dcc859
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 34 deletions.
54 changes: 30 additions & 24 deletions backend/src/plugins/camera/camera_service_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,16 +169,17 @@ class CameraServiceImpl final : public rpc::camera::CameraService::Service {
std::promise<void> stream_closed_promise;
auto stream_closed_future = stream_closed_promise.get_future();

bool is_finished = false;
auto is_finished = std::make_shared<bool>(false);

_camera.subscribe_mode([this, &writer, &stream_closed_promise, &is_finished](
_camera.subscribe_mode([this, &writer, &stream_closed_promise, is_finished](
const dronecode_sdk::Camera::Mode mode) {
rpc::camera::ModeResponse rpc_mode_response;
rpc_mode_response.set_camera_mode(translateCameraMode(mode));

std::lock_guard<std::mutex> lock(_subscribe_mutex);
if (!writer->Write(rpc_mode_response) && !is_finished) {
is_finished = true;
if (!*is_finished && !writer->Write(rpc_mode_response)) {
_camera.subscribe_mode(nullptr);
*is_finished = true;
stream_closed_promise.set_value();
}
});
Expand Down Expand Up @@ -294,19 +295,20 @@ class CameraServiceImpl final : public rpc::camera::CameraService::Service {
std::promise<void> stream_closed_promise;
auto stream_closed_future = stream_closed_promise.get_future();

bool is_finished = false;
auto is_finished = std::make_shared<bool>(false);

_camera.subscribe_video_stream_info(
[this, &writer, &stream_closed_promise, &is_finished](
[this, &writer, &stream_closed_promise, is_finished](
const dronecode_sdk::Camera::VideoStreamInfo video_info) {
rpc::camera::VideoStreamInfoResponse rpc_video_stream_info_response;
auto video_stream_info = translateVideoStreamInfo(video_info);
rpc_video_stream_info_response.set_allocated_video_stream_info(
video_stream_info.release());

std::lock_guard<std::mutex> lock(_subscribe_mutex);
if (!writer->Write(rpc_video_stream_info_response) && !is_finished) {
is_finished = true;
if (!*is_finished && !writer->Write(rpc_video_stream_info_response)) {
_camera.subscribe_video_stream_info(nullptr);
*is_finished = true;
stream_closed_promise.set_value();
}
});
Expand All @@ -324,17 +326,18 @@ class CameraServiceImpl final : public rpc::camera::CameraService::Service {
std::promise<void> stream_closed_promise;
auto stream_closed_future = stream_closed_promise.get_future();

bool is_finished = false;
auto is_finished = std::make_shared<bool>(false);

_camera.subscribe_capture_info([this, &writer, &stream_closed_promise, &is_finished](
_camera.subscribe_capture_info([this, &writer, &stream_closed_promise, is_finished](
const dronecode_sdk::Camera::CaptureInfo capture_info) {
rpc::camera::CaptureInfoResponse rpc_capture_info_response;
auto rpc_capture_info = translateCaptureInfo(capture_info);
rpc_capture_info_response.set_allocated_capture_info(rpc_capture_info.release());

std::lock_guard<std::mutex> lock(_subscribe_mutex);
if (!writer->Write(rpc_capture_info_response) && !is_finished) {
is_finished = true;
if (!*is_finished && !writer->Write(rpc_capture_info_response)) {
_camera.subscribe_capture_info(nullptr);
*is_finished = true;
stream_closed_promise.set_value();
}
});
Expand Down Expand Up @@ -460,17 +463,18 @@ class CameraServiceImpl final : public rpc::camera::CameraService::Service {
std::promise<void> stream_closed_promise;
auto stream_closed_future = stream_closed_promise.get_future();

bool is_finished = false;
auto is_finished = std::make_shared<bool>(false);

_camera.subscribe_status([this, &writer, &stream_closed_promise, &is_finished](
_camera.subscribe_status([this, &writer, &stream_closed_promise, is_finished](
const dronecode_sdk::Camera::Status camera_status) {
rpc::camera::CameraStatusResponse rpc_camera_status_response;
auto rpc_camera_status = translateCameraStatus(camera_status);
rpc_camera_status_response.set_allocated_camera_status(rpc_camera_status.release());

std::lock_guard<std::mutex> lock(_subscribe_mutex);
if (!writer->Write(rpc_camera_status_response) && !is_finished) {
is_finished = true;
if (!*is_finished && !writer->Write(rpc_camera_status_response)) {
_camera.subscribe_status(nullptr);
*is_finished = true;
stream_closed_promise.set_value();
}
});
Expand Down Expand Up @@ -548,10 +552,10 @@ class CameraServiceImpl final : public rpc::camera::CameraService::Service {
std::promise<void> stream_closed_promise;
auto stream_closed_future = stream_closed_promise.get_future();

bool is_finished = false;
auto is_finished = std::make_shared<bool>(false);

_camera.subscribe_current_settings(
[this, &writer, &stream_closed_promise, &is_finished](
[this, &writer, &stream_closed_promise, is_finished](
const std::vector<dronecode_sdk::Camera::Setting> current_settings) {
rpc::camera::CurrentSettingsResponse rpc_current_setting_response;

Expand All @@ -561,8 +565,9 @@ class CameraServiceImpl final : public rpc::camera::CameraService::Service {
}

std::lock_guard<std::mutex> lock(_subscribe_mutex);
if (!writer->Write(rpc_current_setting_response) && !is_finished) {
is_finished = true;
if (!*is_finished && !writer->Write(rpc_current_setting_response)) {
_camera.subscribe_current_settings(nullptr);
*is_finished = true;
stream_closed_promise.set_value();
}
});
Expand Down Expand Up @@ -618,10 +623,10 @@ class CameraServiceImpl final : public rpc::camera::CameraService::Service {
std::promise<void> stream_closed_promise;
auto stream_closed_future = stream_closed_promise.get_future();

bool is_finished = false;
auto is_finished = std::make_shared<bool>(false);

_camera.subscribe_possible_setting_options(
[this, &writer, &stream_closed_promise, &is_finished](
[this, &writer, &stream_closed_promise, is_finished](
const std::vector<dronecode_sdk::Camera::SettingOptions> setting_options) {
rpc::camera::PossibleSettingOptionsResponse rpc_setting_options_response;

Expand All @@ -631,8 +636,9 @@ class CameraServiceImpl final : public rpc::camera::CameraService::Service {
}

std::lock_guard<std::mutex> lock(_subscribe_mutex);
if (!writer->Write(rpc_setting_options_response) && !is_finished) {
is_finished = true;
if (!*is_finished && !writer->Write(rpc_setting_options_response)) {
_camera.subscribe_possible_setting_options(nullptr);
*is_finished = true;
stream_closed_promise.set_value();
}
});
Expand Down
9 changes: 7 additions & 2 deletions backend/test/camera_service_impl_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
namespace {

using testing::_;
using testing::DoDefault;
using testing::NiceMock;
using testing::Return;

Expand Down Expand Up @@ -814,7 +815,9 @@ TEST_F(CameraServiceImplTest, registersToCameraStatus)
false, true, ARBITRARY_CAMERA_STORAGE_STATUS, 3.4f, 12.6f, 16.0f, 0.4f, "100E90HD");
dronecode_sdk::Camera::subscribe_status_callback_t status_callback;
EXPECT_CALL(_camera, subscribe_status(_))
.WillOnce(SaveResult(&status_callback, &_callback_saved_promise));
.Times(2)
.WillOnce(SaveResult(&status_callback, &_callback_saved_promise))
.WillOnce(DoDefault());
std::vector<dronecode_sdk::Camera::Status> camera_status_events;
auto context = std::make_shared<grpc::ClientContext>();

Expand Down Expand Up @@ -896,7 +899,9 @@ void CameraServiceImplTest::checkSendsCameraStatus(
dronecode_sdk::Camera::subscribe_status_callback_t camera_status_callback;
auto context = std::make_shared<grpc::ClientContext>();
EXPECT_CALL(_camera, subscribe_status(_))
.WillOnce(SaveResult(&camera_status_callback, &subscription_promise));
.Times(2)
.WillOnce(SaveResult(&camera_status_callback, &subscription_promise))
.WillOnce(DoDefault());

std::vector<dronecode_sdk::Camera::Status> received_camera_status_events;
auto camera_status_events_future =
Expand Down
29 changes: 22 additions & 7 deletions plugins/camera/camera_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -782,10 +782,15 @@ CameraImpl::to_euler_angle_from_quaternion(Camera::CaptureInfo::Quaternion quate

void CameraImpl::notify_capture_info(Camera::CaptureInfo capture_info)
{
if (_capture_info.callback) {
_parent->call_user_callback(
[this, capture_info]() { _capture_info.callback(capture_info); });
// Make a copy because it is passed to the thread pool
const auto capture_info_callback = _capture_info.callback;

if (capture_info_callback == nullptr) {
return;
}

_parent->call_user_callback(
[capture_info, capture_info_callback]() { capture_info_callback(capture_info); });
}

void CameraImpl::process_camera_settings(const mavlink_message_t &message)
Expand Down Expand Up @@ -953,9 +958,14 @@ void CameraImpl::check_status()

void CameraImpl::notify_status(Camera::Status status)
{
if (_subscribe_status_callback) {
_parent->call_user_callback([this, status]() { _subscribe_status_callback(status); });
// Make a copy because it is passed to the thread pool
const auto status_callback = _subscribe_status_callback;

if (status_callback == nullptr) {
return;
}

_parent->call_user_callback([status, status_callback]() { status_callback(status); });
}

void CameraImpl::status_timeout_happened()
Expand Down Expand Up @@ -1030,9 +1040,14 @@ void CameraImpl::receive_set_mode_command_result(const MAVLinkCommands::Result c

void CameraImpl::notify_mode(const Camera::Mode mode)
{
if (_subscribe_mode_callback) {
_parent->call_user_callback([this, mode]() { _subscribe_mode_callback(mode); });
// Make a copy because it is passed to the thread pool
const auto mode_callback = _subscribe_mode_callback;

if (mode_callback == nullptr) {
return;
}

_parent->call_user_callback([mode, mode_callback]() { mode_callback(mode); });
}

void CameraImpl::receive_get_mode_command_result(MAVLinkCommands::Result command_result)
Expand Down
2 changes: 1 addition & 1 deletion version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.7.0
0.7.1

0 comments on commit 7dcc859

Please sign in to comment.