Skip to content

Commit

Permalink
PR #9210 from Eran: Fix no composite frame on playback (from Nir)
Browse files Browse the repository at this point in the history
  • Loading branch information
maloel authored Jun 13, 2021
2 parents 5d1f9f3 + ac03d9c commit 522605c
Show file tree
Hide file tree
Showing 11 changed files with 498 additions and 113 deletions.
1 change: 1 addition & 0 deletions common/utilities/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ target_sources(${LRS_TARGET}
"${CMAKE_CURRENT_LIST_DIR}/time/stopwatch.h"
"${CMAKE_CURRENT_LIST_DIR}/time/timer.h"
"${CMAKE_CURRENT_LIST_DIR}/time/periodic_timer.h"
"${CMAKE_CURRENT_LIST_DIR}/time/waiting-on.h"
"${CMAKE_CURRENT_LIST_DIR}/time/work_week.h"
"${CMAKE_CURRENT_LIST_DIR}/time/work_week.cpp"
"${CMAKE_CURRENT_LIST_DIR}/time/l500/get-mfr-ww.h"
Expand Down
152 changes: 152 additions & 0 deletions common/utilities/time/waiting-on.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// License: Apache 2.0. See LICENSE file in root directory.
// Copyright(c) 2021 Intel Corporation. All Rights Reserved.

#pragma once

#include <mutex>
#include <condition_variable>
#include <thread>
#include <atomic>


namespace utilities {
namespace time {

// Helper class -- encapsulate a variable of type T that we want to wait on: another thread will set
// it and signal when we can continue...
//
// We use the least amount of synchronization mechanisms: no effort is made to synchronize (usually,
// it's not needed: only one thread will be writing to T, and the owner of T will be waiting on it)
// so it's the responsibility of the user to do so if needed.
//
template< class T >
class waiting_on
{
// We need to be careful with timeouts: if we time out then the local waiting_on can go out of
// scope and then the thread cannot set anything or signal anyone! We get around this by using a
// shared_ptr & weak_ptr to manage access:
//
public:
class wait_state_t
{
T _value;
std::condition_variable _cv;

friend class waiting_on;

public:
wait_state_t() = default; // allow default ctor
wait_state_t( T const & t )
: _value( t )
{
}

operator T &() { return _value; }
operator T const &() const { return _value; }

T* operator->() { return &_value; }
T const* operator->() const { return &_value; }

// Set a new value and signal
void signal( T const & t )
{
_value = t;
signal();
}
// Signal with the current value
void signal()
{
_cv.notify_one();
}
void signal_all()
{
_cv.notify_all();
}
};
private:
std::shared_ptr< wait_state_t > _ptr;

// When we declare the signalling lambda for the other thread, we need to pass it the weak_ptr.
// This class wraps it up nicely, so you can write:
// waiting_on< bool > invoked( false )
// auto thread_function = [invoked = invoked.in_thread()]() {
// invoked.signal( true );
// }
//
public:
class in_thread_
{
std::weak_ptr< class wait_state_t > const _ptr;

public:
in_thread_( waiting_on const& local )
: _ptr( local._ptr )
{
}
#if 0 // TODO this causes major slowdowns! left in here for Eran to break his head against...
~in_thread_()
{
// We get here when the lambda we're in is destroyed -- so either we've already run
// (and signalled once) or we've never run. We signal anyway -- if anything's waiting
// they'll get woken up; otherwise nothing'll happen...
if( auto wait_state = still_alive() )
wait_state->signal_all();
}
#endif

std::shared_ptr< wait_state_t > still_alive() const { return _ptr.lock(); }

// Wake up the local function (which is using wait_until(), presumable) with a new
// T value
void signal( T const& t ) const
{
if( auto wait_state = still_alive() )
wait_state->signal( t );
}
};

public:
waiting_on()
: _ptr( std::make_shared< wait_state_t >() )
{
}
waiting_on( T const & value )
: _ptr( std::make_shared< wait_state_t >( value ) )
{
}

// Convert to the in-thread representation
in_thread_ in_thread() const { return in_thread_( *this ); }

operator T const &() const { return *_ptr; }

// struct value_t { double x; int k; };
// waiting_on< value_t > output({ 1., -1 });
// output->x = 2.;
T * operator->() { return &_ptr->_value; }
T const * operator->() const { return &_ptr->_value; }

// Wait until either the timeout occurs, or the predicate evaluates to true.
// Equivalent to:
// while( ! pred() )
// {
// wait( timeout );
// if( timed-out )
// break;
// }
template < class U, class L >
void wait_until( U const& timeout, L const& pred )
{
// Note that the mutex is useless and used only for the wait -- we assume here that access
// to the T data does not need mutual exclusion
std::mutex m;
// Following will issue (from CppCheck):
// warning: The lock is ineffective because the mutex is locked at the same scope as the mutex itself. [localMutex]
std::unique_lock< std::mutex > locker( m );
_ptr->_cv.wait_for( locker, timeout, pred );
}
};


} // namespace time
} // namespace utilities
34 changes: 21 additions & 13 deletions src/core/streaming.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,31 @@ namespace librealsense
std::string frame_to_string(const frame_interface & f)
{
std::ostringstream s;
auto composite = dynamic_cast<const composite_frame *>(&f);
if (composite)

if (!&f)
{
s << "[";
for (int i = 0; i < composite->get_embedded_frames_count(); i++)
{
s << *composite->get_frame(i);
}
s << "]";
s << "[null]";
}
else
{
s << "[" << f.get_stream()->get_stream_type();
s << "/" << f.get_stream()->get_unique_id();
s << " #" << f.get_frame_number();
s << " @" << std::fixed << (double)f.get_frame_timestamp();
s << "]";
auto composite = dynamic_cast<const composite_frame*>(&f);
if (composite)
{
s << "[";
for (int i = 0; i < composite->get_embedded_frames_count(); i++)
{
s << *composite->get_frame(i);
}
s << "]";
}
else
{
s << "[" << f.get_stream()->get_stream_type();
s << "/" << f.get_stream()->get_unique_id();
s << " #" << f.get_frame_number();
s << " @" << std::fixed << (double)f.get_frame_timestamp();
s << "]";
}
}
return s.str();
}
Expand Down
40 changes: 20 additions & 20 deletions src/dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// Copyright(c) 2021 Intel Corporation. All Rights Reserved.

#include "concurrency.h"
#include "types.h"
#include "../common/utilities/time/waiting-on.h"


dispatcher::dispatcher( unsigned int cap, std::function< void( action ) > on_drop_callback )
Expand All @@ -27,8 +29,13 @@ dispatcher::dispatcher( unsigned int cap, std::function< void( action ) > on_dro
std::lock_guard< std::mutex > lock( _dispatch_mutex );
item( time );
}
catch( const std::exception & e )
{
LOG_ERROR( "Dispatcher [" << this << "] exception caught: " << e.what() );
}
catch( ... )
{
LOG_ERROR( "Dispatcher [" << this << "] unknown exception caught!" );
}
}
}
Expand Down Expand Up @@ -88,28 +95,21 @@ void dispatcher::stop()
}


// Return when all items in the queue are finished (within a timeout).
// Return when all current items in the queue are finished (within a timeout).
// If additional items are added while we're waiting, those will not be waited on!
// Returns false if a timeout occurred before we were done
//
bool dispatcher::flush()
{
std::mutex m;
std::condition_variable cv;
bool invoked = false;
auto wait_sucess = std::make_shared<std::atomic_bool>(true);
invoke([&, wait_sucess](cancellable_timer t)
{
///TODO: use _queue to flush, and implement properly
if (_was_stopped || !(*wait_sucess))
return;

{
std::lock_guard<std::mutex> locker(m);
invoked = true;
}
cv.notify_one();
});
std::unique_lock<std::mutex> locker(m);
*wait_sucess = cv.wait_for(locker, std::chrono::seconds(10), [&]() { return invoked || _was_stopped; });
return *wait_sucess;
if( _was_stopped )
return true; // Nothing to do - so success (no timeout)

utilities::time::waiting_on< bool > invoked( false );
invoke( [invoked = invoked.in_thread()]( cancellable_timer ) {
invoked.signal( true );
} );
invoked.wait_until( std::chrono::seconds( 10 ), [&]() {
return invoked || _was_stopped;
} );
return invoked;
}
Loading

0 comments on commit 522605c

Please sign in to comment.