The WaitSet is a set to which you can attach objects so that they can signal a wide variety of events to one single notifyable. The typical approach is to create a WaitSet, attach multiple subscribers, user triggers or other Triggerables to it and then wait until one or many of the attached entities signal an event. When that happens, one receives a list of EventInfos which corresponds to all occurred events.
WaitSet events can be state based, which means that the WaitSet will keep notifying you until you reset the state. The HAS_SAMPLES event of the subscriber, for instance, will notify you as long as there are samples. It is also possible to attach one-shot events. These are events which trigger the WaitSet only once.
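The difference between the two kinds of events can be illustrated with a small standalone sketch. The classes `StateBasedEvent` and `OneShotEvent` and the helper `notificationsAfterSingleTrigger` are hypothetical stand-ins invented for this illustration; they are not part of the iceoryx API and only model the behavior described above.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for a state-based event: it keeps reporting
// "triggered" until the state is explicitly reset.
class StateBasedEvent
{
  public:
    void trigger() noexcept { m_triggered = true; }
    void reset() noexcept { m_triggered = false; }
    // Polling does not change the state, repeated polls keep returning true.
    bool poll() const noexcept { return m_triggered; }

  private:
    bool m_triggered{false};
};

// Hypothetical stand-in for a one-shot event: the first poll after a
// trigger consumes the notification.
class OneShotEvent
{
  public:
    void trigger() noexcept { m_triggered = true; }
    bool poll() noexcept
    {
        const bool wasTriggered = m_triggered;
        m_triggered = false;
        return wasTriggered;
    }

  private:
    bool m_triggered{false};
};

// Triggers the event once and counts how often it reports itself
// within `rounds` consecutive polls.
template <typename Event>
uint64_t notificationsAfterSingleTrigger(Event& event, const uint64_t rounds)
{
    event.trigger();
    uint64_t notifications = 0U;
    for (uint64_t i = 0U; i < rounds; ++i)
    {
        if (event.poll())
        {
            ++notifications;
        }
    }
    return notifications;
}
```

In this model a state-based event polled three times after one trigger reports three notifications, while a one-shot event reports only one.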
The WaitSet is not threadsafe!
- It is not allowed to attach or detach Triggerable classes with attachEvent or detachEvent when another thread is currently waiting for events with wait.
The TriggerHandle on the other hand is threadsafe! Therefore you are allowed to attach/detach a TriggerHandle to a Triggerable while another thread may trigger the TriggerHandle.
- EventCallback a callback attached to an EventInfo. It must have the following signature void ( EventOrigin ). Any free function, static class method and non-capturing lambda is allowed. You have to ensure the lifetime of that callback. This can become important when you would like to use lambdas.
- EventId an id which is tagged to an event. It does not need to be unique or follow any restrictions. The user can choose any arbitrary uint64_t. Assigning the same EventId to multiple Events can be useful when you would like to group Events.
- EventInfo a class which corresponds with Triggers and is used to inform the user which Event occurred. You can use the EventInfo to acquire the EventId, call the EventCallback or acquire the EventOrigin.
- EventOrigin the pointer to the class where the Event originated from, in short a pointer to the Triggerable.
- Events a Triggerable will signal an event via a TriggerHandle to a Notifyable. For instance one can attach the subscriber event HAS_SAMPLES to a WaitSet. This will cause the subscriber to notify the WaitSet via the TriggerHandle every time a sample is received.
- Notifyable is a class which listens to events. A TriggerHandle which corresponds to a Trigger is used to notify the Notifyable that an event occurred. The WaitSet is a Notifyable.
- Trigger a class which is used by the Notifyable to acquire the information which events were signalled. It corresponds to a TriggerHandle. If the Notifyable goes out of scope the corresponding TriggerHandle will be invalidated and if the Triggerable goes out of scope the corresponding Trigger will be invalidated.
- Triggerable a class which has attached a TriggerHandle to itself to signal certain Events to a Notifyable.
- TriggerHandle a threadsafe class which can be used to trigger a Notifyable. If a TriggerHandle goes out of scope it will detach itself from the Notifyable. A TriggerHandle is logically equal to another Trigger if they:
  - are attached to the same Notifyable (or in other words they are using the same ConditionVariable)
  - have the same EventOrigin
  - have the same callback to verify that they were triggered (hasEventCallback)
  - have the same EventId
- WaitSet a Notifyable which manages a set of Triggers which correspond to Events. A user may attach or detach events. The WaitSet listens to the whole set of Triggers and if one or more Triggers are triggered by an event it will notify the user. If a WaitSet goes out of scope all attached Triggers will be invalidated.
Events can be attached to or detached from a Notifyable like the WaitSet. The WaitSet listens on Triggers for a signal that an Event has occurred and it hands out TriggerHandles to Triggerable objects. The TriggerHandle is used to inform the WaitSet about the occurrence of an Event. When returning from WaitSet::wait() the user is provided with a vector of EventInfos associated with the Events which have occurred. The EventOrigin, EventId and EventCallback are stored inside of the EventInfo and can be acquired by the user.
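The interplay described above can be modeled in a few lines of standard C++. The following is a strongly simplified sketch under stated assumptions: `MiniWaitSet`, `MiniEventInfo` and `incrementCounter` are hypothetical names invented for this illustration, the returned index merely plays the role of a TriggerHandle, and every event is treated as a one-shot event. It is not how iceoryx implements the WaitSet.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <vector>

// Hypothetical, simplified model; none of these types are iceoryx classes.
struct MiniEventInfo
{
    void* origin;            // plays the role of the EventOrigin
    uint64_t eventId;        // plays the role of the EventId
    void (*callback)(void*); // plays the role of the EventCallback

    // Calls the stored callback with the origin, if a callback was set.
    void operator()() const
    {
        if (callback != nullptr)
        {
            callback(origin);
        }
    }
};

class MiniWaitSet
{
  public:
    // Registers an event and returns an index which acts as trigger handle.
    std::size_t attach(void* origin, const uint64_t eventId, void (*callback)(void*))
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_events.push_back({origin, eventId, callback});
        m_triggered.push_back(false);
        return m_events.size() - 1U;
    }

    // Called by the event source to signal that the event occurred.
    void trigger(const std::size_t handle)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_triggered.at(handle) = true;
        }
        m_condition.notify_one();
    }

    // Blocks until at least one event was triggered and returns all of them.
    std::vector<MiniEventInfo> wait()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_condition.wait(lock, [this] { return anyTriggered(); });
        std::vector<MiniEventInfo> result;
        for (std::size_t i = 0U; i < m_events.size(); ++i)
        {
            if (m_triggered[i])
            {
                result.push_back(m_events[i]);
                m_triggered[i] = false; // one-shot behavior in this sketch
            }
        }
        return result;
    }

  private:
    // Must be called with m_mutex held.
    bool anyTriggered() const
    {
        for (const bool triggered : m_triggered)
        {
            if (triggered)
            {
                return true;
            }
        }
        return false;
    }

    std::mutex m_mutex;
    std::condition_variable m_condition;
    std::vector<MiniEventInfo> m_events;
    std::vector<bool> m_triggered;
};

// Example callback: interprets the origin as an int counter and increments it.
inline void incrementCounter(void* origin)
{
    ++(*static_cast<int*>(origin));
}
```

A caller would attach an origin together with an id and a callback, let some other code call `trigger` with the returned index and then iterate over the vector returned by `wait()`, invoking each element like `event()`.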
| task | call |
|---|---|
| attach subscriber to a WaitSet | waitset.attachEvent(subscriber, iox::popo::SubscriberEvent::HAS_SAMPLES, 123, mySubscriberCallback) |
| attach user trigger to a WaitSet | waitset.attachEvent(userTrigger, 456, myUserTriggerCallback) |
| wait for triggers | auto triggerVector = myWaitSet.wait(); |
| wait for triggers with timeout | auto triggerVector = myWaitSet.timedWait(1_s); |
| check if event originated from some object | event.doesOriginateFrom(ptrToSomeObject) |
| get id of the event | event.getEventId() |
| call eventCallback | event() |
| acquire EventOrigin | event.getOrigin<OriginType>(); |
This example consists of 5 use cases.
- ice_waitset_gateway.cpp: We build a gateway to forward data to another network. A list of subscribers is handled in a uniform way by defining a callback which is executed for every subscriber that has received data.
- ice_waitset_grouping: We would like to group multiple subscribers into 2 distinct groups and handle them according to their group membership.
- ice_waitset_individual: A list of subscribers where every subscriber is handled differently.
- ice_waitset_sync: We use the WaitSet to trigger a cyclic call which should execute an algorithm every second.
- ice_waitset_trigger: We create our own class which can be attached to a WaitSet to signal events.
All our examples require a running iox-roudi and some data to receive which will be sent by iox-ex-waitset-publisher. The publisher does not contain any WaitSet specific logic and is explained in detail in the icedelivery example.
We have a list of subscribers which can be subscribed to any arbitrary topic and every time we receive a sample we would like to send the bytestream to a socket, write it into a file or print it to the console. Whatever we choose to do, we perform the same task for all the subscribers.
Let's start by implementing our callback which prints the subscriber pointer, the payload size and the payload pointer to the console.
void subscriberCallback(iox::popo::UntypedSubscriber* const subscriber)
{
    subscriber->take().and_then([&](iox::popo::Sample<const void>& sample) {
        std::cout << "subscriber: " << std::hex << subscriber << " length: " << std::dec
                  << sample.getHeader()->payloadSize << " ptr: " << std::hex << sample.getHeader()->payload()
                  << std::endl;
    });
}
An Event always requires a callback which has the following signature void (EventOrigin). In our example the EventOrigin is an iox::popo::UntypedSubscriber pointer which we use to acquire the latest sample by calling take(). When take() was successful we print our message to the console inside of the and_then lambda.
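The restriction to free functions, static class methods and non-capturing lambdas follows from the callback being a plain function pointer. The sketch below demonstrates this with standard C++ only; `DummyOrigin`, `DummyCallback`, `Handler` and `callAllCallbackFlavors` are hypothetical names chosen for this illustration and do not exist in iceoryx.

```cpp
#include <cassert>

// Hypothetical origin type, used only for this sketch.
struct DummyOrigin
{
    int callCount{0};
};

// The callback type is a plain function pointer which takes the origin.
using DummyCallback = void (*)(DummyOrigin* const);

// 1) free function
void freeFunctionCallback(DummyOrigin* const origin)
{
    ++origin->callCount;
}

struct Handler
{
    // 2) static class method
    static void staticMethodCallback(DummyOrigin* const origin)
    {
        ++origin->callCount;
    }
};

// Invokes all three callback flavors once and returns how many calls
// the origin has seen.
int callAllCallbackFlavors()
{
    DummyOrigin origin;

    // 3) non-capturing lambda, implicitly convertible to a function pointer
    DummyCallback lambdaCallback = [](DummyOrigin* const o) { ++o->callCount; };

    DummyCallback callbacks[] = {freeFunctionCallback, &Handler::staticMethodCallback, lambdaCallback};
    for (DummyCallback callback : callbacks)
    {
        callback(&origin);
    }

    // A capturing lambda such as [&origin](DummyOrigin* const) { ... } would
    // not compile here since it cannot be converted to a function pointer.
    return origin.callCount;
}
```

Because only a pointer is stored, the function behind it has to stay valid for as long as the event is attached.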
In our main function we create a WaitSet which has storage capacity for 5 events, 4 subscribers and one shutdown trigger, after we have registered ourselves at our central broker RouDi. Then we attach our shutdownTrigger to handle CTRL+c events.
iox::popo::WaitSet<NUMBER_OF_SUBSCRIBERS + ONE_SHUTDOWN_TRIGGER> waitset;
waitset.attachEvent(shutdownTrigger);
After that we create a vector to hold our subscribers. We create and subscribe them and then attach them to the WaitSet with the HAS_SAMPLES event and the subscriberCallback. Every time one of the subscribers receives a new sample it will trigger the WaitSet.
iox::cxx::vector<iox::popo::UntypedSubscriber, NUMBER_OF_SUBSCRIBERS> subscriberVector;
for (auto i = 0; i < NUMBER_OF_SUBSCRIBERS; ++i)
{
    subscriberVector.emplace_back(iox::capro::ServiceDescription{"Radar", "FrontLeft", "Counter"});
    auto& subscriber = subscriberVector.back();
    subscriber.subscribe();
    waitset.attachEvent(subscriber, iox::popo::SubscriberEvent::HAS_SAMPLES, subscriberCallback);
}
Now our system is prepared and ready to work. We enter the event loop which starts with a call to our WaitSet (waitset.wait()). This call will block until one or more events have triggered the WaitSet. After the call has returned we get a vector filled with EventInfos which correspond to all the events which triggered the WaitSet.
We iterate through this vector. If an Event originated from the shutdownTrigger we exit the program, otherwise we just call the assigned callback by invoking the event. This will then call subscriberCallback with the EventOrigin (the pointer to the untyped subscriber) as parameter.
while (true)
{
    auto eventVector = waitset.wait();
    for (auto& event : eventVector)
    {
        if (event->doesOriginateFrom(&shutdownTrigger))
        {
            return (EXIT_SUCCESS);
        }
        else
        {
            (*event)();
        }
    }
}
In our next use case we would like to divide the subscribers into two groups and we do not want to attach a callback to them. Instead we perform the calls on the subscribers directly.
We again start by creating a WaitSet with a capacity of 5 (4 subscribers and 1 shutdownTrigger), and attach the shutdownTrigger to handle CTRL+c.
iox::popo::WaitSet<NUMBER_OF_SUBSCRIBERS + ONE_SHUTDOWN_TRIGGER> waitset;
waitset.attachEvent(shutdownTrigger);
Now we create a vector of 4 subscribers and subscribe them to our topic.
iox::cxx::vector<iox::popo::UntypedSubscriber, NUMBER_OF_SUBSCRIBERS> subscriberVector;
for (auto i = 0; i < NUMBER_OF_SUBSCRIBERS; ++i)
{
    subscriberVector.emplace_back(iox::capro::ServiceDescription{"Radar", "FrontLeft", "Counter"});
    auto& subscriber = subscriberVector.back();
    subscriber.subscribe();
}
After that we define our two groups with the ids FIRST_GROUP_ID and SECOND_GROUP_ID and attach the first two subscribers to the first group and the remaining subscribers to the second group.
for (auto i = 0; i < NUMBER_OF_SUBSCRIBERS / 2; ++i)
{
    waitset.attachEvent(subscriberVector[i], iox::popo::SubscriberEvent::HAS_SAMPLES, FIRST_GROUP_ID);
}
for (auto i = NUMBER_OF_SUBSCRIBERS / 2; i < NUMBER_OF_SUBSCRIBERS; ++i)
{
    waitset.attachEvent(subscriberVector[i], iox::popo::SubscriberEvent::HAS_SAMPLES, SECOND_GROUP_ID);
}
The event loop calls auto eventVector = waitset.wait() in a blocking call to receive a vector of all the EventInfos which correspond to the occurred events. If the Event originated from the shutdownTrigger we terminate the program.
while (true)
{
    auto eventVector = waitset.wait();
    for (auto& event : eventVector)
    {
        if (event->doesOriginateFrom(&shutdownTrigger))
        {
            return (EXIT_SUCCESS);
        }
The remaining part of the loop is handling the subscribers. In the first group we would like to print the received data to the console and in the second group we just dismiss the received data.
        else if (event->getEventId() == FIRST_GROUP_ID)
        {
            auto subscriber = event->getOrigin<iox::popo::UntypedSubscriber>();
            subscriber->take().and_then([&](iox::popo::Sample<const void>& sample) {
                const CounterTopic* data = reinterpret_cast<const CounterTopic*>(sample.get());
                std::cout << "received: " << std::dec << data->counter << std::endl;
            });
        }
        else if (event->getEventId() == SECOND_GROUP_ID)
        {
            std::cout << "dismiss data\n";
            auto subscriber = event->getOrigin<iox::popo::UntypedSubscriber>();
            subscriber->releaseQueuedSamples();
        }
    }
}
Important: The second group needs to release all queued samples, otherwise the WaitSet would notify the user again and again that the subscriber from the second group has new samples.
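The grouping logic itself can be tried out without RouDi by using stand-in types. In the sketch below `ToySubscriber`, `ToyEvent`, `GroupResult` and `handleEvents` are hypothetical names, and the numeric values of the two group ids are assumptions made for this illustration. Taking one element from the queue models take(), clearing the queue models releaseQueuedSamples().

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// The id values are assumptions made for this sketch.
constexpr uint64_t FIRST_GROUP_ID = 123U;
constexpr uint64_t SECOND_GROUP_ID = 456U;

// Hypothetical stand-ins; not the iceoryx subscriber or EventInfo.
struct ToySubscriber
{
    std::vector<int> queue;
};

struct ToyEvent
{
    uint64_t eventId;
    ToySubscriber* origin;
};

struct GroupResult
{
    std::size_t processed;
    std::size_t dismissed;
};

// Dispatches every event according to the group id it was attached with.
GroupResult handleEvents(const std::vector<ToyEvent>& events)
{
    GroupResult result{0U, 0U};
    for (const ToyEvent& event : events)
    {
        std::vector<int>& queue = event.origin->queue;
        if (event.eventId == FIRST_GROUP_ID && !queue.empty())
        {
            // first group: consume exactly one sample
            queue.erase(queue.begin());
            ++result.processed;
        }
        else if (event.eventId == SECOND_GROUP_ID)
        {
            // second group: drop everything so that the state-based
            // "has samples" condition no longer holds
            result.dismissed += queue.size();
            queue.clear();
        }
    }
    return result;
}
```

Since the first group only consumes one sample per notification, a state-based event would report that subscriber again on the next wait as long as its queue is not empty.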
When every Triggerable requires a different reaction we need to know the origin of an Event. We can call event.doesOriginateFrom(EventOrigin) which will return true if the event originated from EventOrigin and otherwise false.
We start this example by creating a WaitSet with the default capacity and attaching the shutdownTrigger to handle CTRL+c.
iox::popo::WaitSet<> waitset;
waitset.attachEvent(shutdownTrigger);
Additionally, we create two subscribers, subscribe them to our topic and attach them to the waitset to let them inform us whenever they receive a new sample.
iox::popo::TypedSubscriber<CounterTopic> subscriber1({"Radar", "FrontLeft", "Counter"});
iox::popo::TypedSubscriber<CounterTopic> subscriber2({"Radar", "FrontLeft", "Counter"});
subscriber1.subscribe();
subscriber2.subscribe();
waitset.attachEvent(subscriber1, iox::popo::SubscriberEvent::HAS_SAMPLES);
waitset.attachEvent(subscriber2, iox::popo::SubscriberEvent::HAS_SAMPLES);
With that set up we enter the event loop and handle the program termination first.
while (true)
{
    auto eventVector = waitset.wait();
    for (auto& event : eventVector)
    {
        if (event->doesOriginateFrom(&shutdownTrigger))
        {
            return (EXIT_SUCCESS);
        }
When the origin is subscriber1 we would like to state that subscriber 1 has received the following number X. For subscriber2 we just dismiss the received samples. We accomplish this by asking the event whether it originated from the corresponding subscriber. If so, we act.
        else if (event->doesOriginateFrom(&subscriber1))
        {
            subscriber1.take().and_then([&](iox::popo::Sample<const CounterTopic>& sample) {
                std::cout << " subscriber 1 received: " << sample->counter << std::endl;
            });
        }
        else if (event->doesOriginateFrom(&subscriber2))
        {
            subscriber2.releaseQueuedSamples();
            std::cout << "subscriber 2 received something - dont care\n";
        }
    }
}
Let's say we have SomeClass and would like to execute a cyclic static method cyclicRun every second. We could execute any arbitrary algorithm in there but for now we just print activation callback. The class could look like
class SomeClass
{
  public:
    static void cyclicRun(iox::popo::UserTrigger* trigger)
    {
        std::cout << "activation callback\n";
        trigger->resetTrigger();
    }
};
Important: We need to reset the user trigger, otherwise the WaitSet would notify us immediately again since the user trigger is state based.
We begin as always by creating a WaitSet with the default capacity and by attaching the shutdownTrigger to it. In this case we do not set an event id when calling attachEvent which means the default event id EventInfo::INVALID_ID is set.
iox::popo::WaitSet<> waitset;
// attach shutdownTrigger to handle CTRL+C
waitset.attachEvent(shutdownTrigger);
After that we require a cyclicTrigger to trigger our cyclicRun every second. Therefore, we attach it to the waitset with eventId 0 and the callback SomeClass::cyclicRun.
iox::popo::UserTrigger cyclicTrigger;
waitset.attachEvent(cyclicTrigger, 0U, SomeClass::cyclicRun);
The next thing we need is something which will trigger our cyclicTrigger every second. We use an infinite loop packed inside of a thread.
std::thread cyclicTriggerThread([&] {
    while (keepRunning.load())
    {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        cyclicTrigger.trigger();
    }
});
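The pattern of a background thread that periodically wakes up a waiting loop can also be reproduced with the standard library alone. The following sketch assumes a plain std::condition_variable in place of the WaitSet and a boolean flag in place of the user trigger; `runCyclicSketch` is a hypothetical helper written for this illustration.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Starts a thread that fires every `period` and returns how many wake-ups
// the waiting loop handled. The loop stops after `activations` wake-ups.
int runCyclicSketch(const int activations, const std::chrono::milliseconds period)
{
    std::mutex mutex;
    std::condition_variable condition;
    bool triggered = false; // state-based flag, reset by the waiting side
    std::atomic<bool> keepRunning{true};

    std::thread cyclicTriggerThread([&] {
        while (keepRunning.load())
        {
            std::this_thread::sleep_for(period);
            {
                std::lock_guard<std::mutex> lock(mutex);
                triggered = true;
            }
            condition.notify_one();
        }
    });

    int handled = 0;
    while (handled < activations)
    {
        std::unique_lock<std::mutex> lock(mutex);
        condition.wait(lock, [&] { return triggered; });
        // Reset the state, otherwise the next wait would return immediately.
        triggered = false;
        ++handled;
    }

    // Stop the trigger thread and wait until it has finished.
    keepRunning.store(false);
    cyclicTriggerThread.join();
    return handled;
}
```

Calling `runCyclicSketch(3, std::chrono::milliseconds(5))` handles three activations and then shuts the trigger thread down in an orderly way by clearing the flag before joining.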
Everything is set up and we can implement the event loop. As usual we handle CTRL+c which is indicated by the shutdownTrigger.
while (keepRunning.load())
{
    auto eventVector = waitset.wait();
    for (auto& event : eventVector)
    {
        if (event->doesOriginateFrom(&shutdownTrigger))
        {
            // CTRL+c was pressed, leave the event loop after this iteration
            keepRunning.store(false);
        }
The cyclicTrigger callback is called in the else part.
        else
        {
            (*event)();
        }
    }
}
In this example we describe how you would implement a Triggerable class which can be attached to a WaitSet. Our class in this example will be called MyTriggerClass and it signals two events to the WaitSet. The PERFORMED_ACTION event which is triggered whenever the method performAction is called and the ACTIVATE event which is triggered when activate is called with an activationCode.
At the moment the WaitSet does not support Triggerable classes which are movable or copyable. This is caused by the resetCallback and the hasEventCallback which are pointing to the Triggerable. After a move the callbacks inside of the WaitSet would point to the wrong memory location and a copy could lead to an unattached object if there is no more space left in the WaitSet. Therefore we have to delete the move and copy operations for now.
MyTriggerClass(const MyTriggerClass&) = delete;
MyTriggerClass(MyTriggerClass&&) = delete;
MyTriggerClass& operator=(const MyTriggerClass&) = delete;
MyTriggerClass& operator=(MyTriggerClass&&) = delete;
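Whether a class really is neither copyable nor movable can be verified at compile time with the standard type traits. The sketch below uses a hypothetical class `PinnedTriggerable` and a hypothetical helper `isPinned`; both were invented for this illustration and are not part of iceoryx.

```cpp
#include <cassert>
#include <type_traits>

// Hypothetical class with the same deleted operations as MyTriggerClass.
class PinnedTriggerable
{
  public:
    PinnedTriggerable() = default;
    PinnedTriggerable(const PinnedTriggerable&) = delete;
    PinnedTriggerable(PinnedTriggerable&&) = delete;
    PinnedTriggerable& operator=(const PinnedTriggerable&) = delete;
    PinnedTriggerable& operator=(PinnedTriggerable&&) = delete;
};

// Returns true when T can neither be copied nor moved, so a raw pointer
// to an instance stays valid for the whole lifetime of that instance.
template <typename T>
constexpr bool isPinned() noexcept
{
    return !std::is_copy_constructible<T>::value && !std::is_move_constructible<T>::value
           && !std::is_copy_assignable<T>::value && !std::is_move_assignable<T>::value;
}

// Fails to compile if someone re-enables copy or move by accident.
static_assert(isPinned<PinnedTriggerable>(), "PinnedTriggerable must not be copyable or movable");
```

A static_assert of this kind documents the restriction next to the class and turns an accidental `= default` into a compile error.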
The class implementation of these two methods could look like the following.
class MyTriggerClass
{
  public:
    void activate(const int activationCode) noexcept
    {
        m_activationCode = activationCode;
        m_isActivated = true;
        m_activateTrigger.trigger();
    }
    void performAction() noexcept
    {
        m_hasPerformedAction = true;
        m_actionTrigger.trigger();
    }
As you can see we perform some internal actions and when they are finished we signal the corresponding Trigger via our stored TriggerHandle that we performed the task. Internally we just set a boolean to signal that the method was called.
Every Trigger requires a corresponding class method which returns a boolean stating if the Trigger was actually triggered or not. In our case these are the two const methods hasPerformedAction and isActivated.
    bool hasPerformedAction() const noexcept
    {
        return m_hasPerformedAction;
    }
    bool isActivated() const noexcept
    {
        return m_isActivated;
    }
The method enableEvent attaches our class to a WaitSet but the user has to specify which event they would like to attach. Additionally, they can set an eventId and a callback.
If the parameter event was set to PERFORMED_ACTION we call acquireTriggerHandle on the waitset which will return a cxx::expected. The following parameters have to be provided.
- The origin of the trigger, e.g. this
- A method which can be called by the trigger to ask if it was triggered.
- A method which resets the trigger. Used when the WaitSet goes out of scope.
- The id of the event.
- A callback with the signature void (MyTriggerClass*).
    iox::cxx::expected<iox::popo::WaitSetError>
    enableEvent(iox::popo::WaitSet<>& waitset,
                const MyTriggerClassEvents event,
                const uint64_t eventId,
                const iox::popo::Trigger::Callback<MyTriggerClass> callback) noexcept
    {
        switch (event)
        {
        case MyTriggerClassEvents::PERFORMED_ACTION:
        {
            return waitset
                .acquireTriggerHandle(this,
                                      {*this, &MyTriggerClass::hasPerformedAction},
                                      {*this, &MyTriggerClass::disableEvent},
                                      eventId,
                                      callback)
                .and_then([this](iox::popo::TriggerHandle& trigger) {
                    m_actionTrigger = std::move(trigger); });
        }
When the parameter event has the value ACTIVATE we use the isActivated method for the trigger.
        case MyTriggerClassEvents::ACTIVATE:
        {
            return waitset
                .acquireTriggerHandle(this,
                                      {*this, &MyTriggerClass::isActivated},
                                      {*this, &MyTriggerClass::disableEvent},
                                      eventId,
                                      callback)
                .and_then([this](iox::popo::TriggerHandle& trigger) {
                    m_activateTrigger = std::move(trigger); });
        }
The next thing on our checklist is the disableEvent method used by the WaitSet to reset the Trigger when it goes out of scope. Therefore we look up the trigger with the matching unique id first and then invalidate it.
    void disableEvent(const uint64_t uniqueEventId)
    {
        if (m_actionTrigger.getUniqueId() == uniqueEventId)
        {
            m_actionTrigger.invalidate();
        }
        else if (m_activateTrigger.getUniqueId() == uniqueEventId)
        {
            m_activateTrigger.invalidate();
        }
    }
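The lookup by unique id can be tested in isolation with stand-in types. In the following sketch `ToyHandle` and `ToyTriggerable` are hypothetical classes which only mimic the getUniqueId and invalidate calls used above; the real TriggerHandle offers more than this.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for a trigger handle; not the iceoryx TriggerHandle.
class ToyHandle
{
  public:
    ToyHandle() = default;
    explicit ToyHandle(const uint64_t uniqueId) noexcept
        : m_uniqueId(uniqueId)
        , m_isValid(true)
    {
    }

    uint64_t getUniqueId() const noexcept { return m_uniqueId; }
    bool isValid() const noexcept { return m_isValid; }
    void invalidate() noexcept { m_isValid = false; }

  private:
    uint64_t m_uniqueId{0U};
    bool m_isValid{false};
};

// Owns two handles and invalidates only the one whose unique id matches.
class ToyTriggerable
{
  public:
    ToyTriggerable(const uint64_t actionId, const uint64_t activateId) noexcept
        : m_actionTrigger(actionId)
        , m_activateTrigger(activateId)
    {
    }

    void disableEvent(const uint64_t uniqueEventId) noexcept
    {
        if (m_actionTrigger.getUniqueId() == uniqueEventId)
        {
            m_actionTrigger.invalidate();
        }
        else if (m_activateTrigger.getUniqueId() == uniqueEventId)
        {
            m_activateTrigger.invalidate();
        }
    }

    bool isActionEnabled() const noexcept { return m_actionTrigger.isValid(); }
    bool isActivateEnabled() const noexcept { return m_activateTrigger.isValid(); }

  private:
    ToyHandle m_actionTrigger;
    ToyHandle m_activateTrigger;
};
```

An id which matches neither handle leaves both untouched, so a stale reset request is harmless in this model.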
The next thing we define is a free function, our eventLoop, which will handle all events of our waitset. The action is the same for every trigger: we reset the MyTriggerClass event and then call the callback which is attached to the trigger.
void eventLoop()
{
    while (true)
    {
        auto triggerStateVector = waitset->wait();
        for (auto& triggerState : triggerStateVector)
        {
            if (triggerState->getEventId() == ACTIVATE_ID)
            {
                triggerState->getOrigin<MyTriggerClass>()->reset(MyTriggerClassEvents::ACTIVATE);
                (*triggerState)();
            }
            else if (triggerState->getEventId() == ACTION_ID)
            {
                triggerState->getOrigin<MyTriggerClass>()->reset(MyTriggerClassEvents::PERFORMED_ACTION);
                (*triggerState)();
            }
        }
    }
}
We start like in every other example by creating the waitset first. In this case the waitset and the triggerClass are stored inside of two global optionals and have to be created with an emplace call.
waitset.emplace();
triggerClass.emplace();
After that we can attach both triggerClass events to the waitset and also provide a callback for them.
waitset->attachEvent(*triggerClass, MyTriggerClassEvents::ACTIVATE, ACTIVATE_ID, callOnActivate);
waitset->attachEvent(
*triggerClass, MyTriggerClassEvents::PERFORMED_ACTION, ACTION_ID, MyTriggerClass::callOnAction);
Now that everything is set up we can start our eventLoop in a new thread.
std::thread eventLoopThread(eventLoop);
A thread which will trigger an event every second is started with the following lines.
std::thread triggerThread([&] {
    int activationCode = 1;
    for (auto i = 0; i < 10; ++i)
    {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        triggerClass->activate(activationCode++);
        std::this_thread::sleep_for(std::chrono::seconds(1));
        triggerClass->performAction();
    }
});