Skip to content

Commit

Permalink
runtime: Add CallbackResult return value to all action operator()
Browse files Browse the repository at this point in the history
BREAKING CHANGE: If you have implemented any Actions yourself,
you will need to make sure the `operator()(const Sync& ...)`
returns `cloe::CallbackResult`. You can use `CallbackResult::Ok`
to achieve the same behavior.
  • Loading branch information
cassava committed Jul 5, 2024
1 parent 30b05f5 commit 51f3c0a
Show file tree
Hide file tree
Showing 13 changed files with 182 additions and 140 deletions.
58 changes: 35 additions & 23 deletions engine/src/coordinator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ class TriggerRegistrar : public cloe::TriggerRegistrar {
return manager_.make_trigger(source_, c);
}

// TODO: Should these queue_trigger becomes inserts? Because if they are coming from
// C++ then they should be from a single thread.
void insert_trigger(const Conf& c) override { manager_.queue_trigger(source_, c); }
void insert_trigger(TriggerPtr&& t) override { manager_.queue_trigger(std::move(t)); }

Expand Down Expand Up @@ -168,44 +170,54 @@ void Coordinator::register_event(const std::string& key, EventFactoryPtr&& ef,
std::bind(&Coordinator::execute_trigger, this, std::placeholders::_1, std::placeholders::_2));
}

void Coordinator::execute_trigger(TriggerPtr&& t, const Sync& sync) {
cloe::CallbackResult Coordinator::execute_trigger(TriggerPtr&& t, const Sync& sync) {
logger()->debug("Execute trigger {}", inline_json(*t));
(t->action())(sync, *executer_registrar_);
auto result = (t->action())(sync, *executer_registrar_);
if (!t->is_conceal()) {
history_.emplace_back(HistoryTrigger{sync.time(), std::move(t)});
history_.emplace_back(sync.time(), std::move(t));
}
return result;
}

Duration Coordinator::process(const Sync& sync) {
size_t Coordinator::process_pending_web_triggers(const Sync& sync) {
// The only thing we need to do here is distribute the triggers from the
// input queue into their respective storage locations. We are responsible
// for thread safety here!
auto now = sync.time();
size_t count = 0;
std::unique_lock guard(input_mutex_);
while (!input_queue_.empty()) {
auto tp = std::move(input_queue_.front());
store_trigger(std::move(input_queue_.front()), sync);
input_queue_.pop_front();
tp->set_since(now);
count++;
}
return count;
}

// Decide where to put the trigger
auto key = tp->event().name();
if (storage_.count(key) == 0) {
// This is a programming error, since we should not be able to come this
// far at all.
throw std::logic_error("cannot insert trigger with unregistered event");
}
try {
logger()->debug("Insert trigger {}", inline_json(*tp));
storage_[key]->emplace(std::move(tp), sync);
} catch (TriggerError& e) {
logger()->error("Error inserting trigger: {}", e.what());
if (!allow_errors_) {
throw;
}
void Coordinator::store_trigger(TriggerPtr&& tp, const Sync& sync) {
tp->set_since(sync.time());

// Decide where to put the trigger
auto key = tp->event().name();
if (storage_.count(key) == 0) {
// This is a programming error, since we should not be able to come this
// far at all.
throw std::logic_error("cannot insert trigger with unregistered event");
}
try {
logger()->debug("Insert trigger {}", inline_json(*tp));
storage_[key]->emplace(std::move(tp), sync);
} catch (TriggerError& e) {
logger()->error("Error inserting trigger: {}", e.what());
if (!allow_errors_) {
throw;
}
}
}

return now;
Duration Coordinator::process(const Sync& sync) {
auto now = sync.time();
process_pending_web_triggers(sync);
return sync.time();
}

namespace {
Expand Down
30 changes: 14 additions & 16 deletions engine/src/coordinator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,21 @@

#pragma once

#include <list> // for list<>
#include <map> // for map<>
#include <memory> // for unique_ptr<>, shared_ptr<>
#include <mutex> // for mutex
#include <queue> // for queue<>
#include <string> // for string
#include <vector> // for vector<>
#include <list> // for list<>
#include <map> // for map<>
#include <memory> // for unique_ptr<>, shared_ptr<>
#include <mutex> // for mutex
#include <queue> // for queue<>
#include <string> // for string
#include <vector> // for vector<>

#include <cloe/trigger.hpp> // for Trigger, Action, Event, ...

// Forward declaration:
namespace cloe {
class Registrar;
}
#include <cloe/cloe_fwd.hpp> // for Registrar
#include <cloe/trigger.hpp> // for Trigger, Action, Event, ...

namespace engine {

// Forward declarations:
class TriggerRegistrar; // from trigger_manager.cpp
class TriggerRegistrar; // from coordinator.cpp

/**
* TriggerUnknownAction is thrown when an Action cannot be created because the
Expand Down Expand Up @@ -120,13 +116,15 @@ class Coordinator {
*/
cloe::Duration process(const cloe::Sync&);

size_t process_pending_web_triggers(const cloe::Sync& sync);
protected:
cloe::ActionPtr make_action(const cloe::Conf& c) const;
cloe::EventPtr make_event(const cloe::Conf& c) const;
cloe::TriggerPtr make_trigger(cloe::Source s, const cloe::Conf& c) const;
void queue_trigger(cloe::Source s, const cloe::Conf& c) { queue_trigger(make_trigger(s, c)); }
void queue_trigger(cloe::TriggerPtr&& t);
void execute_trigger(cloe::TriggerPtr&& t, const cloe::Sync& s);
void queue_trigger(cloe::TriggerPtr&& tp);
void store_trigger(cloe::TriggerPtr&& tp, const cloe::Sync& sync);
cloe::CallbackResult execute_trigger(cloe::TriggerPtr&& tp, const cloe::Sync& sync);

// for access to protected methods
friend TriggerRegistrar;
Expand Down
2 changes: 1 addition & 1 deletion engine/src/utility/command.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ void CommandExecuter::wait_all() {

namespace actions {

void Command::operator()(const cloe::Sync&, cloe::TriggerRegistrar&) { executer_->run(command_); }
cloe::CallbackResult Command::operator()(const cloe::Sync&, cloe::TriggerRegistrar&) { executer_->run(command_); return cloe::CallbackResult::Ok; }

void Command::to_json(cloe::Json& j) const { command_.to_json(j); }

Expand Down
2 changes: 1 addition & 1 deletion engine/src/utility/command.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class Command : public cloe::Action {
return std::make_unique<Command>(name(), command_, executer_);
}

void operator()(const cloe::Sync&, cloe::TriggerRegistrar&) override;
cloe::CallbackResult operator()(const cloe::Sync&, cloe::TriggerRegistrar&) override;

protected:
void to_json(cloe::Json& j) const override;
Expand Down
3 changes: 2 additions & 1 deletion optional/vtd/src/scp_action.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ class ScpAction : public cloe::Action {
ScpAction(const std::string& name, std::shared_ptr<ScpTransceiver> scp_client, const std::string& msg)
: cloe::Action(name), client_(scp_client), xml_(msg) {}
cloe::ActionPtr clone() const override { return std::make_unique<ScpAction>(name(), client_, xml_); }
void operator()(const cloe::Sync&, cloe::TriggerRegistrar&) override {
cloe::CallbackResult operator()(const cloe::Sync&, cloe::TriggerRegistrar&) override {
logger()->info("Sending SCP message: {}", xml_);
client_->send(xml_);
return cloe::CallbackResult::Ok;
}
bool is_significant() const override { return false; }

Expand Down
2 changes: 1 addition & 1 deletion plugins/basic/src/hmi_contact.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ class UseContact : public Action {
UseContact(const std::string& name, ContactMap<D>* m, const Conf& data)
: Action(name), hmi_(m), data_(data) {}
ActionPtr clone() const override { return std::make_unique<UseContact<D>>(name(), hmi_, data_); }
void operator()(const Sync&, TriggerRegistrar&) override { from_json(*data_, *hmi_); }
CallbackResult operator()(const Sync&, TriggerRegistrar&) override { from_json(*data_, *hmi_); return CallbackResult::Ok; }

protected:
void to_json(Json& j) const override { j = *data_; }
Expand Down
5 changes: 4 additions & 1 deletion runtime/include/cloe/conf/action.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ class Configure : public Action {
conf_.erase("name");
}
ActionPtr clone() const override { return std::make_unique<Configure>(name(), ptr_, conf_); }
void operator()(const Sync&, TriggerRegistrar&) override { ptr_->from_conf(conf_); }
CallbackResult operator()(const Sync&, TriggerRegistrar&) override {
ptr_->from_conf(conf_);
return CallbackResult::Ok;
}

protected:
void to_json(Json& j) const override { j = *conf_; }
Expand Down
5 changes: 4 additions & 1 deletion runtime/include/cloe/registrar.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ class DirectCallback : public Callback {
auto& condition = dynamic_cast<E&>((*it)->event());
if (condition(sync, args...)) {
if ((*it)->is_sticky()) {
this->execute((*it)->clone(), sync);
auto result = this->execute((*it)->clone(), sync);
if (result == CallbackResult::Unpin) {
it = triggers_.erase(it);
}
} else {
// Remove from trigger list and advance.
this->execute(std::move(*it), sync);
Expand Down
14 changes: 11 additions & 3 deletions runtime/include/cloe/trigger.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -507,10 +507,18 @@ class Event : public Entity {
using EventFactory = TriggerFactory<Event>;
using EventFactoryPtr = std::unique_ptr<EventFactory>;

enum class CallbackResult {
/// Default, use standard behavior.
Ok,

/// Remove from callback if it was sticky.
Unpin,
};

/**
* Interface the trigger manager must provide for executing triggers.
*/
using CallbackExecuter = std::function<void(TriggerPtr&&, const Sync&)>;
using CallbackExecuter = std::function<CallbackResult(TriggerPtr&&, const Sync&)>;

/**
* Callback provides the interface with which the global trigger manager,
Expand Down Expand Up @@ -555,7 +563,7 @@ class Callback {
* Execute a trigger in the given sync context by passing it to the
* executer.
*/
void execute(TriggerPtr&& t, const Sync& s);
CallbackResult execute(TriggerPtr&& t, const Sync& s);

private:
CallbackExecuter executer_;
Expand Down Expand Up @@ -619,7 +627,7 @@ class Action : public Entity {
/**
* Execute the action.
*/
virtual void operator()(const Sync&, TriggerRegistrar&) = 0;
virtual CallbackResult operator()(const Sync&, TriggerRegistrar&) = 0;

/**
* Return whether this action is a significant action.
Expand Down
11 changes: 7 additions & 4 deletions runtime/include/cloe/trigger/example_actions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ class Log : public Action {
Log(const std::string& name, LogLevel level, const std::string& msg)
: Action(name), level_(level), msg_(msg) {}
ActionPtr clone() const override { return std::make_unique<Log>(name(), level_, msg_); }
void operator()(const Sync&, TriggerRegistrar&) override { logger()->log(level_, msg_.c_str()); }
CallbackResult operator()(const Sync&, TriggerRegistrar&) override {
logger()->log(level_, msg_.c_str());
return CallbackResult::Ok;
}
bool is_significant() const override { return false; }

protected:
Expand Down Expand Up @@ -71,7 +74,7 @@ class Bundle : public Action {
public:
Bundle(const std::string& name, std::vector<ActionPtr>&& actions);
ActionPtr clone() const override;
void operator()(const Sync& s, TriggerRegistrar& r) override;
CallbackResult operator()(const Sync& s, TriggerRegistrar& r) override;
bool is_significant() const override;

protected:
Expand Down Expand Up @@ -103,7 +106,7 @@ class Insert : public Action {
public:
Insert(const std::string& name, const Conf& triggers) : Action(name), triggers_(triggers) {}
ActionPtr clone() const override { return std::make_unique<Insert>(name(), triggers_); }
void operator()(const Sync& s, TriggerRegistrar& r) override;
CallbackResult operator()(const Sync& s, TriggerRegistrar& r) override;

protected:
void to_json(Json& j) const override;
Expand Down Expand Up @@ -138,7 +141,7 @@ class PushRelease : public Action {
return std::make_unique<PushRelease>(name(), duration_, push_->clone(), release_->clone(),
repr_);
}
void operator()(const Sync&, TriggerRegistrar&) override;
CallbackResult operator()(const Sync&, TriggerRegistrar&) override;

protected:
void to_json(Json& j) const override { j = repr_; }
Expand Down
Loading

0 comments on commit 51f3c0a

Please sign in to comment.