diff --git a/src/proc/syncer-processing-block.cpp b/src/proc/syncer-processing-block.cpp index 8f4ff4b543..78b898860b 100644 --- a/src/proc/syncer-processing-block.cpp +++ b/src/proc/syncer-processing-block.cpp @@ -10,115 +10,71 @@ namespace librealsense { -syncer_process_unit::syncer_process_unit( std::initializer_list< bool_option::ptr > enable_opts, - bool log ) - : processing_block( "syncer" ) - , _enable_opts( enable_opts.begin(), enable_opts.end() ) -{ - // This callback gets called by the previous processing block when it is done with a frame. We - // call the matchers with the frame and eventually call the next callback in the list using frame_ready(). - // This callback can get called from multiple threads, one thread per stream -- but always in the correct - // frame order per stream. - auto f = [this, log]( frame_holder frame, synthetic_source_interface * source ) { - // if the syncer is disabled passthrough the frame - bool enabled = false; - size_t n_opts = 0; - for( auto & wopt : _enable_opts ) + syncer_process_unit::syncer_process_unit(std::initializer_list< bool_option::ptr > enable_opts, bool log) + : processing_block("syncer"), _matcher((new composite_identity_matcher({}))) + , _enable_opts(enable_opts.begin(), enable_opts.end()) + { + _matcher->set_callback([this](frame_holder f, syncronization_environment env) { - auto opt = wopt.lock(); - if( opt ) + if (env.log) { - ++n_opts; - if( opt->is_true() ) - { - enabled = true; - break; - } + LOG_DEBUG("SYNCED: " << frame_holder_to_string(f)); } - } - if( n_opts && ! enabled ) - { - get_source().frame_ready( std::move( frame ) ); - return; - } - { - std::lock_guard< std::mutex > lock( _mutex ); + // We get here from within a dispatch() call, already protected by a mutex -- so only one thread can enqueue! + env.matches.enqueue(std::move(f)); + }); - if( ! _matcher ) + // This callback gets called by the previous processing block when it is done with a frame. We + // call the matchers with the frame and eventually call the next callback in the list using frame_ready(). + // This callback can get called from multiple threads, one thread per stream -- but always in the correct + // frame order per stream. + auto f = [&, log](frame_holder frame, synthetic_source_interface* source) + { + // if the syncer is disabled passthrough the frame + bool enabled = false; + size_t n_opts = 0; + for (auto& wopt : _enable_opts) { - if( ! create_matcher( frame ) ) + auto opt = wopt.lock(); + if (opt) { - get_source().frame_ready( std::move( frame ) ); - return; + ++n_opts; + if (opt->is_true()) + { + enabled = true; + break; + } } } - - auto env = syncronization_environment{ source, _matches, log }; - _matcher->dispatch( std::move( frame ), env ); - } - - frame_holder f; - { - // Another thread has the lock, meaning will get into the following loop and dequeue all - // the frames. So there's nothing for us to do... - std::unique_lock< std::mutex > lock(_callback_mutex, std::try_to_lock); - if (!lock.owns_lock()) + if (n_opts && !enabled) + { + get_source().frame_ready(std::move(frame)); return; + } - while( _matches.try_dequeue( &f ) ) { - LOG_DEBUG("dequeue: " << *f.frame); - get_source().frame_ready( std::move( f ) ); + std::lock_guard lock(_mutex); + _matcher->dispatch(std::move(frame), { source, _matches, log }); } - } - }; - set_processing_callback( std::shared_ptr< rs2_frame_processor_callback >( - new internal_frame_processor_callback< decltype( f ) >( f ) ) ); -} - -bool syncer_process_unit::create_matcher( const frame_holder & frame ) -{ - try - { - auto sensor = frame.frame->get_sensor().get(); - if (!sensor) - { - LOG_DEBUG("Sensor is not exist any more cannot create matcher the frame will passthrough "); - return false; - } - - const device_interface * dev = nullptr; - dev = sensor->get_device().shared_from_this().get(); - if (dev) - { - _matcher = dev->create_matcher(frame); + frame_holder f; + { + // Another thread has the lock, meaning will get into the following loop and dequeue all + // the frames. So there's nothing for us to do... + std::unique_lock< std::mutex > lock(_callback_mutex, std::try_to_lock); + if (!lock.owns_lock()) + return; - _matcher->set_callback([this](frame_holder f, const syncronization_environment& env) { - if (env.log) + while (_matches.try_dequeue(&f)) { - LOG_DEBUG("SYNCED: " << *f.frame); + get_source().frame_ready(std::move(f)); } + } - // We get here from within a dispatch() call, already protected by a mutex -- so only one thread can enqueue! - env.matches.enqueue(std::move(f)); - }); - } - else - { - LOG_DEBUG("Device is not exist any more cannot create matcher, the frame will passthrough "); - return false; - } + }; + set_processing_callback(std::shared_ptr( + new internal_frame_processor_callback(f))); } - catch( const std::bad_weak_ptr & ) - { - LOG_DEBUG("Device was destroyed while trying get shared ptr of it, couldn't create matcher, the frame will passthrough "); - return false; - } - - return true; - } -} // namespace librealsense diff --git a/src/proc/syncer-processing-block.h b/src/proc/syncer-processing-block.h index 85aaecfec9..9d7a052385 100644 --- a/src/proc/syncer-processing-block.h +++ b/src/proc/syncer-processing-block.h @@ -34,8 +34,6 @@ namespace librealsense _matcher.reset(); } private: - bool create_matcher(const frame_holder& frame); - std::shared_ptr _matcher; std::vector< std::weak_ptr > _enable_opts; diff --git a/src/sync.cpp b/src/sync.cpp index 052a4cefa0..cafd65561a 100644 --- a/src/sync.cpp +++ b/src/sync.cpp @@ -145,15 +145,96 @@ namespace librealsense std::shared_ptr composite_matcher::find_matcher(const frame_holder& frame) { + std::shared_ptr matcher; auto stream_id = frame.frame->get_stream()->get_unique_id(); - auto matcher = _matchers[stream_id]; + auto stream_type = frame.frame->get_stream()->get_stream_type(); - if (!matcher->get_active()) + auto sensor = frame.frame->get_sensor().get(); //TODO: Potential deadlock if get_sensor() gets a hold of the last reference of that sensor + + auto dev_exist = false; + + if (sensor) { - matcher->set_active(true); - _frames_queue[matcher.get()].start(); + + const device_interface* dev = nullptr; + try + { + dev = sensor->get_device().shared_from_this().get(); + } + catch (const std::bad_weak_ptr&) + { + LOG_WARNING("Device destroyed"); + } + if (dev) + { + dev_exist = true; + matcher = _matchers[stream_id]; + if (!matcher) + { + std::ostringstream ss; + for (auto const & it : _matchers) + ss << ' ' << it.first; + LOG_DEBUG("stream id " << stream_id << " was not found; trying to create, existing streams=" << ss.str()); + matcher = dev->create_matcher(frame); + + matcher->set_callback( + [&](frame_holder f, syncronization_environment env) + { + sync(std::move(f), env); + }); + + for (auto stream : matcher->get_streams()) + { + if (_matchers[stream]) + { + _frames_queue.erase(_matchers[stream].get()); + } + _matchers[stream] = matcher; + _streams_id.push_back(stream); + } + for (auto stream : matcher->get_streams_types()) + { + _streams_type.push_back(stream); + } + + if (std::find(_streams_type.begin(), _streams_type.end(), stream_type) == _streams_type.end()) + { + LOG_ERROR("Stream matcher not found! stream=" << rs2_stream_to_string(stream_type)); + } + } + else if (!matcher->get_active()) + { + matcher->set_active(true); + _frames_queue[matcher.get()].start(); + } + } } + else + { + LOG_DEBUG("sensor does not exist"); + } + + if (!dev_exist) + { + matcher = _matchers[stream_id]; + // We don't know what device this frame came from, so just store it under device NULL with ID matcher + if (!matcher) + { + if (_matchers[stream_id]) + { + _frames_queue.erase(_matchers[stream_id].get()); + } + _matchers[stream_id] = std::make_shared(stream_id, stream_type); + _streams_id.push_back(stream_id); + _streams_type.push_back(stream_type); + matcher = _matchers[stream_id]; + matcher->set_callback([&](frame_holder f, syncronization_environment env) + { + sync(std::move(f), env); + }); + } + } return matcher; } @@ -530,7 +611,27 @@ namespace librealsense void composite_identity_matcher::sync(frame_holder f, const syncronization_environment& env) { - LOG_DEBUG("by_pass_composite_matcher: " << _name << " " << frame_holder_to_string(f)); - _callback(std::move(f), env); + LOG_DEBUG("composite_identity_matcher: " << _name << " " << frame_holder_to_string(f)); + + auto composite = dynamic_cast(f.frame); + // Syncer have to output composite frame + if (!composite) + { + std::vector match; + match.push_back(std::move(f)); + frame_holder composite = env.source->allocate_composite_frame(std::move(match)); + if (composite.frame) + { + auto cb = begin_callback(); + _callback(std::move(composite), env); + } + else + { + LOG_ERROR("composite_identity_matcher: " << _name << " " << frame_holder_to_string(f) << " faild to create composite_frame, user callback will not be called"); + } + } + else + _callback(std::move(f), env); + } } // namespace librealsense