Skip to content

Commit

Permalink
PR #8573 from Avishag: Syncer now produces frameset - always
Browse files Browse the repository at this point in the history
  • Loading branch information
maloel authored Mar 14, 2021
2 parents 8b6d3fa + 6280d01 commit 6e01478
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 99 deletions.
138 changes: 47 additions & 91 deletions src/proc/syncer-processing-block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,115 +10,71 @@

namespace librealsense
{
syncer_process_unit::syncer_process_unit( std::initializer_list< bool_option::ptr > enable_opts,
bool log )
: processing_block( "syncer" )
, _enable_opts( enable_opts.begin(), enable_opts.end() )
{
// This callback gets called by the previous processing block when it is done with a frame. We
// call the matchers with the frame and eventually call the next callback in the list using frame_ready().
// This callback can get called from multiple threads, one thread per stream -- but always in the correct
// frame order per stream.
auto f = [this, log]( frame_holder frame, synthetic_source_interface * source ) {
// if the syncer is disabled passthrough the frame
bool enabled = false;
size_t n_opts = 0;
for( auto & wopt : _enable_opts )
syncer_process_unit::syncer_process_unit(std::initializer_list< bool_option::ptr > enable_opts, bool log)
: processing_block("syncer"), _matcher((new composite_identity_matcher({})))
, _enable_opts(enable_opts.begin(), enable_opts.end())
{
_matcher->set_callback([this](frame_holder f, syncronization_environment env)
{
auto opt = wopt.lock();
if( opt )
if (env.log)
{
++n_opts;
if( opt->is_true() )
{
enabled = true;
break;
}
LOG_DEBUG("SYNCED: " << frame_holder_to_string(f));
}
}
if( n_opts && ! enabled )
{
get_source().frame_ready( std::move( frame ) );
return;
}

{
std::lock_guard< std::mutex > lock( _mutex );
// We get here from within a dispatch() call, already protected by a mutex -- so only one thread can enqueue!
env.matches.enqueue(std::move(f));
});

if( ! _matcher )
// This callback gets called by the previous processing block when it is done with a frame. We
// call the matchers with the frame and eventually call the next callback in the list using frame_ready().
// This callback can get called from multiple threads, one thread per stream -- but always in the correct
// frame order per stream.
auto f = [&, log](frame_holder frame, synthetic_source_interface* source)
{
// if the syncer is disabled passthrough the frame
bool enabled = false;
size_t n_opts = 0;
for (auto& wopt : _enable_opts)
{
if( ! create_matcher( frame ) )
auto opt = wopt.lock();
if (opt)
{
get_source().frame_ready( std::move( frame ) );
return;
++n_opts;
if (opt->is_true())
{
enabled = true;
break;
}
}
}

auto env = syncronization_environment{ source, _matches, log };
_matcher->dispatch( std::move( frame ), env );
}

frame_holder f;
{
// Another thread has the lock, meaning will get into the following loop and dequeue all
// the frames. So there's nothing for us to do...
std::unique_lock< std::mutex > lock(_callback_mutex, std::try_to_lock);
if (!lock.owns_lock())
if (n_opts && !enabled)
{
get_source().frame_ready(std::move(frame));
return;
}

while( _matches.try_dequeue( &f ) )
{
LOG_DEBUG("dequeue: " << *f.frame);
get_source().frame_ready( std::move( f ) );
std::lock_guard<std::mutex> lock(_mutex);
_matcher->dispatch(std::move(frame), { source, _matches, log });
}
}
};

set_processing_callback( std::shared_ptr< rs2_frame_processor_callback >(
new internal_frame_processor_callback< decltype( f ) >( f ) ) );
}

bool syncer_process_unit::create_matcher( const frame_holder & frame )
{
try
{
auto sensor = frame.frame->get_sensor().get();
if (!sensor)
{
LOG_DEBUG("Sensor is not exist any more cannot create matcher the frame will passthrough ");
return false;
}

const device_interface * dev = nullptr;
dev = sensor->get_device().shared_from_this().get();
if (dev)
{
_matcher = dev->create_matcher(frame);
frame_holder f;
{
// Another thread has the lock, meaning will get into the following loop and dequeue all
// the frames. So there's nothing for us to do...
std::unique_lock< std::mutex > lock(_callback_mutex, std::try_to_lock);
if (!lock.owns_lock())
return;

_matcher->set_callback([this](frame_holder f, const syncronization_environment& env) {
if (env.log)
while (_matches.try_dequeue(&f))
{
LOG_DEBUG("SYNCED: " << *f.frame);
get_source().frame_ready(std::move(f));
}
}

// We get here from within a dispatch() call, already protected by a mutex -- so only one thread can enqueue!
env.matches.enqueue(std::move(f));
});
}
else
{
LOG_DEBUG("Device is not exist any more cannot create matcher, the frame will passthrough ");
return false;
}
};

set_processing_callback(std::shared_ptr<rs2_frame_processor_callback>(
new internal_frame_processor_callback<decltype(f)>(f)));
}
catch( const std::bad_weak_ptr & )
{
LOG_DEBUG("Device was destroyed while trying get shared ptr of it, couldn't create matcher, the frame will passthrough ");
return false;
}

return true;

}
} // namespace librealsense
2 changes: 0 additions & 2 deletions src/proc/syncer-processing-block.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ namespace librealsense
_matcher.reset();
}
private:
bool create_matcher(const frame_holder& frame);

std::shared_ptr<matcher> _matcher;
std::vector< std::weak_ptr<bool_option> > _enable_opts;

Expand Down
113 changes: 107 additions & 6 deletions src/sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,96 @@ namespace librealsense

std::shared_ptr<matcher> composite_matcher::find_matcher(const frame_holder& frame)
{
std::shared_ptr<matcher> matcher;
auto stream_id = frame.frame->get_stream()->get_unique_id();
auto matcher = _matchers[stream_id];
auto stream_type = frame.frame->get_stream()->get_stream_type();

if (!matcher->get_active())
auto sensor = frame.frame->get_sensor().get(); //TODO: Potential deadlock if get_sensor() gets a hold of the last reference of that sensor

auto dev_exist = false;

if (sensor)
{
matcher->set_active(true);
_frames_queue[matcher.get()].start();

const device_interface* dev = nullptr;
try
{
dev = sensor->get_device().shared_from_this().get();
}
catch (const std::bad_weak_ptr&)
{
LOG_WARNING("Device destroyed");
}
if (dev)
{
dev_exist = true;
matcher = _matchers[stream_id];
if (!matcher)
{
std::ostringstream ss;
for (auto const & it : _matchers)
ss << ' ' << it.first;
LOG_DEBUG("stream id " << stream_id << " was not found; trying to create, existing streams=" << ss.str());
matcher = dev->create_matcher(frame);

matcher->set_callback(
[&](frame_holder f, syncronization_environment env)
{
sync(std::move(f), env);
});

for (auto stream : matcher->get_streams())
{
if (_matchers[stream])
{
_frames_queue.erase(_matchers[stream].get());
}
_matchers[stream] = matcher;
_streams_id.push_back(stream);
}
for (auto stream : matcher->get_streams_types())
{
_streams_type.push_back(stream);
}

if (std::find(_streams_type.begin(), _streams_type.end(), stream_type) == _streams_type.end())
{
LOG_ERROR("Stream matcher not found! stream=" << rs2_stream_to_string(stream_type));
}
}
else if (!matcher->get_active())
{
matcher->set_active(true);
_frames_queue[matcher.get()].start();
}
}
}
else
{
LOG_DEBUG("sensor does not exist");
}

if (!dev_exist)
{
matcher = _matchers[stream_id];
// We don't know what device this frame came from, so just store it under device NULL with ID matcher
if (!matcher)
{
if (_matchers[stream_id])
{
_frames_queue.erase(_matchers[stream_id].get());
}
_matchers[stream_id] = std::make_shared<identity_matcher>(stream_id, stream_type);
_streams_id.push_back(stream_id);
_streams_type.push_back(stream_type);
matcher = _matchers[stream_id];

matcher->set_callback([&](frame_holder f, syncronization_environment env)
{
sync(std::move(f), env);
});
}
}
return matcher;
}

Expand Down Expand Up @@ -530,7 +611,27 @@ namespace librealsense

void composite_identity_matcher::sync(frame_holder f, const syncronization_environment& env)
{
LOG_DEBUG("by_pass_composite_matcher: " << _name << " " << frame_holder_to_string(f));
_callback(std::move(f), env);
LOG_DEBUG("composite_identity_matcher: " << _name << " " << frame_holder_to_string(f));

auto composite = dynamic_cast<const composite_frame *>(f.frame);
// Syncer have to output composite frame
if (!composite)
{
std::vector<frame_holder> match;
match.push_back(std::move(f));
frame_holder composite = env.source->allocate_composite_frame(std::move(match));
if (composite.frame)
{
auto cb = begin_callback();
_callback(std::move(composite), env);
}
else
{
LOG_ERROR("composite_identity_matcher: " << _name << " " << frame_holder_to_string(f) << " faild to create composite_frame, user callback will not be called");
}
}
else
_callback(std::move(f), env);

}
} // namespace librealsense

0 comments on commit 6e01478

Please sign in to comment.