diff --git a/engine/src/coordinator.cpp b/engine/src/coordinator.cpp index 844261f23..10bc74bad 100644 --- a/engine/src/coordinator.cpp +++ b/engine/src/coordinator.cpp @@ -62,6 +62,8 @@ class TriggerRegistrar : public cloe::TriggerRegistrar { return manager_.make_trigger(source_, c); } + // TODO: Should these queue_trigger becomes inserts? Because if they are coming from + // C++ then they should be from a single thread. void insert_trigger(const Conf& c) override { manager_.queue_trigger(source_, c); } void insert_trigger(TriggerPtr&& t) override { manager_.queue_trigger(std::move(t)); } @@ -168,44 +170,54 @@ void Coordinator::register_event(const std::string& key, EventFactoryPtr&& ef, std::bind(&Coordinator::execute_trigger, this, std::placeholders::_1, std::placeholders::_2)); } -void Coordinator::execute_trigger(TriggerPtr&& t, const Sync& sync) { +cloe::CallbackResult Coordinator::execute_trigger(TriggerPtr&& t, const Sync& sync) { logger()->debug("Execute trigger {}", inline_json(*t)); - (t->action())(sync, *executer_registrar_); + auto result = (t->action())(sync, *executer_registrar_); if (!t->is_conceal()) { - history_.emplace_back(HistoryTrigger{sync.time(), std::move(t)}); + history_.emplace_back(sync.time(), std::move(t)); } + return result; } -Duration Coordinator::process(const Sync& sync) { +size_t Coordinator::process_pending_web_triggers(const Sync& sync) { // The only thing we need to do here is distribute the triggers from the // input queue into their respective storage locations. We are responsible // for thread safety here! - auto now = sync.time(); + size_t count = 0; std::unique_lock guard(input_mutex_); while (!input_queue_.empty()) { - auto tp = std::move(input_queue_.front()); + store_trigger(std::move(input_queue_.front()), sync); input_queue_.pop_front(); - tp->set_since(now); + count++; + } + return count; +} - // Decide where to put the trigger - auto key = tp->event().name(); - if (storage_.count(key) == 0) { - // This is a programming error, since we should not be able to come this - // far at all. - throw std::logic_error("cannot insert trigger with unregistered event"); - } - try { - logger()->debug("Insert trigger {}", inline_json(*tp)); - storage_[key]->emplace(std::move(tp), sync); - } catch (TriggerError& e) { - logger()->error("Error inserting trigger: {}", e.what()); - if (!allow_errors_) { - throw; - } +void Coordinator::store_trigger(TriggerPtr&& tp, const Sync& sync) { + tp->set_since(sync.time()); + + // Decide where to put the trigger + auto key = tp->event().name(); + if (storage_.count(key) == 0) { + // This is a programming error, since we should not be able to come this + // far at all. + throw std::logic_error("cannot insert trigger with unregistered event"); + } + try { + logger()->debug("Insert trigger {}", inline_json(*tp)); + storage_[key]->emplace(std::move(tp), sync); + } catch (TriggerError& e) { + logger()->error("Error inserting trigger: {}", e.what()); + if (!allow_errors_) { + throw; } } +} - return now; +Duration Coordinator::process(const Sync& sync) { + auto now = sync.time(); + process_pending_web_triggers(sync); + return sync.time(); } namespace { diff --git a/engine/src/coordinator.hpp b/engine/src/coordinator.hpp index 00a538d70..a52452fd2 100644 --- a/engine/src/coordinator.hpp +++ b/engine/src/coordinator.hpp @@ -22,25 +22,21 @@ #pragma once -#include // for list<> -#include // for map<> -#include // for unique_ptr<>, shared_ptr<> -#include // for mutex -#include // for queue<> -#include // for string -#include // for vector<> +#include // for list<> +#include // for map<> +#include // for unique_ptr<>, shared_ptr<> +#include // for mutex +#include // for queue<> +#include // for string +#include // for vector<> -#include // for Trigger, Action, Event, ... - -// Forward declaration: -namespace cloe { -class Registrar; -} +#include // for Registrar +#include // for Trigger, Action, Event, ... namespace engine { // Forward declarations: -class TriggerRegistrar; // from trigger_manager.cpp +class TriggerRegistrar; // from coordinator.cpp /** * TriggerUnknownAction is thrown when an Action cannot be created because the @@ -120,13 +116,15 @@ class Coordinator { */ cloe::Duration process(const cloe::Sync&); + size_t process_pending_web_triggers(const cloe::Sync& sync); protected: cloe::ActionPtr make_action(const cloe::Conf& c) const; cloe::EventPtr make_event(const cloe::Conf& c) const; cloe::TriggerPtr make_trigger(cloe::Source s, const cloe::Conf& c) const; void queue_trigger(cloe::Source s, const cloe::Conf& c) { queue_trigger(make_trigger(s, c)); } - void queue_trigger(cloe::TriggerPtr&& t); - void execute_trigger(cloe::TriggerPtr&& t, const cloe::Sync& s); + void queue_trigger(cloe::TriggerPtr&& tp); + void store_trigger(cloe::TriggerPtr&& tp, const cloe::Sync& sync); + cloe::CallbackResult execute_trigger(cloe::TriggerPtr&& tp, const cloe::Sync& sync); // for access to protected methods friend TriggerRegistrar; diff --git a/engine/src/utility/command.cpp b/engine/src/utility/command.cpp index 6de1a7586..89596e549 100644 --- a/engine/src/utility/command.cpp +++ b/engine/src/utility/command.cpp @@ -128,7 +128,7 @@ void CommandExecuter::wait_all() { namespace actions { -void Command::operator()(const cloe::Sync&, cloe::TriggerRegistrar&) { executer_->run(command_); } +cloe::CallbackResult Command::operator()(const cloe::Sync&, cloe::TriggerRegistrar&) { executer_->run(command_); return cloe::CallbackResult::Ok; } void Command::to_json(cloe::Json& j) const { command_.to_json(j); } diff --git a/engine/src/utility/command.hpp b/engine/src/utility/command.hpp index eb4b4c373..41789aca4 100644 --- a/engine/src/utility/command.hpp +++ b/engine/src/utility/command.hpp @@ -86,7 +86,7 @@ class Command : public cloe::Action { return std::make_unique(name(), command_, executer_); } - void operator()(const cloe::Sync&, cloe::TriggerRegistrar&) override; + cloe::CallbackResult operator()(const cloe::Sync&, cloe::TriggerRegistrar&) override; protected: void to_json(cloe::Json& j) const override; diff --git a/optional/vtd/src/scp_action.hpp b/optional/vtd/src/scp_action.hpp index 930a76320..1184f63e2 100644 --- a/optional/vtd/src/scp_action.hpp +++ b/optional/vtd/src/scp_action.hpp @@ -35,9 +35,10 @@ class ScpAction : public cloe::Action { ScpAction(const std::string& name, std::shared_ptr scp_client, const std::string& msg) : cloe::Action(name), client_(scp_client), xml_(msg) {} cloe::ActionPtr clone() const override { return std::make_unique(name(), client_, xml_); } - void operator()(const cloe::Sync&, cloe::TriggerRegistrar&) override { + cloe::CallbackResult operator()(const cloe::Sync&, cloe::TriggerRegistrar&) override { logger()->info("Sending SCP message: {}", xml_); client_->send(xml_); + return cloe::CallbackResult::Ok; } bool is_significant() const override { return false; } diff --git a/plugins/basic/src/hmi_contact.hpp b/plugins/basic/src/hmi_contact.hpp index 5a0b166a5..ef3f132ec 100644 --- a/plugins/basic/src/hmi_contact.hpp +++ b/plugins/basic/src/hmi_contact.hpp @@ -207,7 +207,7 @@ class UseContact : public Action { UseContact(const std::string& name, ContactMap* m, const Conf& data) : Action(name), hmi_(m), data_(data) {} ActionPtr clone() const override { return std::make_unique>(name(), hmi_, data_); } - void operator()(const Sync&, TriggerRegistrar&) override { from_json(*data_, *hmi_); } + CallbackResult operator()(const Sync&, TriggerRegistrar&) override { from_json(*data_, *hmi_); return CallbackResult::Ok; } protected: void to_json(Json& j) const override { j = *data_; } diff --git a/runtime/include/cloe/conf/action.hpp b/runtime/include/cloe/conf/action.hpp index 81c4eed3b..ee8966fc4 100644 --- a/runtime/include/cloe/conf/action.hpp +++ b/runtime/include/cloe/conf/action.hpp @@ -39,7 +39,10 @@ class Configure : public Action { conf_.erase("name"); } ActionPtr clone() const override { return std::make_unique(name(), ptr_, conf_); } - void operator()(const Sync&, TriggerRegistrar&) override { ptr_->from_conf(conf_); } + CallbackResult operator()(const Sync&, TriggerRegistrar&) override { + ptr_->from_conf(conf_); + return CallbackResult::Ok; + } protected: void to_json(Json& j) const override { j = *conf_; } diff --git a/runtime/include/cloe/registrar.hpp b/runtime/include/cloe/registrar.hpp index 2cf4f7bc1..f96065318 100644 --- a/runtime/include/cloe/registrar.hpp +++ b/runtime/include/cloe/registrar.hpp @@ -55,7 +55,10 @@ class DirectCallback : public Callback { auto& condition = dynamic_cast((*it)->event()); if (condition(sync, args...)) { if ((*it)->is_sticky()) { - this->execute((*it)->clone(), sync); + auto result = this->execute((*it)->clone(), sync); + if (result == CallbackResult::Unpin) { + it = triggers_.erase(it); + } } else { // Remove from trigger list and advance. this->execute(std::move(*it), sync); diff --git a/runtime/include/cloe/trigger.hpp b/runtime/include/cloe/trigger.hpp index 17ad22f13..af832fc72 100644 --- a/runtime/include/cloe/trigger.hpp +++ b/runtime/include/cloe/trigger.hpp @@ -507,10 +507,18 @@ class Event : public Entity { using EventFactory = TriggerFactory; using EventFactoryPtr = std::unique_ptr; +enum class CallbackResult { + /// Default, use standard behavior. + Ok, + + /// Remove from callback if it was sticky. + Unpin, +}; + /** * Interface the trigger manager must provide for executing triggers. */ -using CallbackExecuter = std::function; +using CallbackExecuter = std::function; /** * Callback provides the interface with which the global trigger manager, @@ -555,7 +563,7 @@ class Callback { * Execute a trigger in the given sync context by passing it to the * executer. */ - void execute(TriggerPtr&& t, const Sync& s); + CallbackResult execute(TriggerPtr&& t, const Sync& s); private: CallbackExecuter executer_; @@ -619,7 +627,7 @@ class Action : public Entity { /** * Execute the action. */ - virtual void operator()(const Sync&, TriggerRegistrar&) = 0; + virtual CallbackResult operator()(const Sync&, TriggerRegistrar&) = 0; /** * Return whether this action is a significant action. diff --git a/runtime/include/cloe/trigger/example_actions.hpp b/runtime/include/cloe/trigger/example_actions.hpp index 3161c26e4..cacded8d6 100644 --- a/runtime/include/cloe/trigger/example_actions.hpp +++ b/runtime/include/cloe/trigger/example_actions.hpp @@ -41,7 +41,10 @@ class Log : public Action { Log(const std::string& name, LogLevel level, const std::string& msg) : Action(name), level_(level), msg_(msg) {} ActionPtr clone() const override { return std::make_unique(name(), level_, msg_); } - void operator()(const Sync&, TriggerRegistrar&) override { logger()->log(level_, msg_.c_str()); } + CallbackResult operator()(const Sync&, TriggerRegistrar&) override { + logger()->log(level_, msg_.c_str()); + return CallbackResult::Ok; + } bool is_significant() const override { return false; } protected: @@ -71,7 +74,7 @@ class Bundle : public Action { public: Bundle(const std::string& name, std::vector&& actions); ActionPtr clone() const override; - void operator()(const Sync& s, TriggerRegistrar& r) override; + CallbackResult operator()(const Sync& s, TriggerRegistrar& r) override; bool is_significant() const override; protected: @@ -103,7 +106,7 @@ class Insert : public Action { public: Insert(const std::string& name, const Conf& triggers) : Action(name), triggers_(triggers) {} ActionPtr clone() const override { return std::make_unique(name(), triggers_); } - void operator()(const Sync& s, TriggerRegistrar& r) override; + CallbackResult operator()(const Sync& s, TriggerRegistrar& r) override; protected: void to_json(Json& j) const override; @@ -138,7 +141,7 @@ class PushRelease : public Action { return std::make_unique(name(), duration_, push_->clone(), release_->clone(), repr_); } - void operator()(const Sync&, TriggerRegistrar&) override; + CallbackResult operator()(const Sync&, TriggerRegistrar&) override; protected: void to_json(Json& j) const override { j = repr_; } diff --git a/runtime/include/cloe/trigger/set_action.hpp b/runtime/include/cloe/trigger/set_action.hpp index c65cd699f..4f07072e9 100644 --- a/runtime/include/cloe/trigger/set_action.hpp +++ b/runtime/include/cloe/trigger/set_action.hpp @@ -68,7 +68,10 @@ class SetVariableAction : public Action { ActionPtr clone() const override { return std::make_unique(name(), data_name_, data_ptr_, value_); } - void operator()(const Sync&, TriggerRegistrar&) override { *data_ptr_ = value_; } + CallbackResult operator()(const Sync&, TriggerRegistrar&) override { + *data_ptr_ = value_; + return CallbackResult::Ok; + } bool is_significant() const override { return false; } void to_json(Json& j) const override { j = Json{ @@ -138,34 +141,37 @@ class SetVariableActionFactory : public ActionFactory { * * This action can be registered with the `register_action` helper function. */ -#define DEFINE_SET_STATE_ACTION(xName, xname, xdescription, xState, xOperatorBlock) \ - class xName : public ::cloe::Action { \ - public: \ - xName(const std::string& name, xState* ptr) : ::cloe::Action(name), ptr_(ptr) {} \ - ::cloe::ActionPtr clone() const override { return std::make_unique(name(), ptr_); } \ - void operator()(const ::cloe::Sync&, ::cloe::TriggerRegistrar&) override { xOperatorBlock } \ - void to_json(::cloe::Json&) const override {} \ - \ - private: \ - xState* ptr_; \ - }; \ - \ - class _X_FACTORY(xName) : public ::cloe::ActionFactory { \ - public: \ - using ActionType = xName; \ - \ - _X_FACTORY(xName)(xState * ptr) : ::cloe::ActionFactory(xname, xdescription), ptr_(ptr) {} \ - \ - ::cloe::ActionPtr make(const ::cloe::Conf&) const override { \ - return std::make_unique(name(), ptr_); \ - } \ - \ - ::cloe::ActionPtr make(const std::string&) const override { \ - return std::make_unique(name(), ptr_); \ - } \ - \ - private: \ - xState* ptr_; \ +#define DEFINE_SET_STATE_ACTION(xName, xname, xdescription, xState, xOperatorBlock) \ + class xName : public ::cloe::Action { \ + public: \ + xName(const std::string& name, xState* ptr) : ::cloe::Action(name), ptr_(ptr) {} \ + ::cloe::ActionPtr clone() const override { return std::make_unique(name(), ptr_); } \ + ::cloe::CallbackResult operator()(const ::cloe::Sync&, ::cloe::TriggerRegistrar&) override { \ + xOperatorBlock; \ + return ::cloe::CallbackResult::Ok; \ + } \ + void to_json(::cloe::Json&) const override {} \ + \ + private: \ + xState* ptr_; \ + }; \ + \ + class _X_FACTORY(xName) : public ::cloe::ActionFactory { \ + public: \ + using ActionType = xName; \ + \ + _X_FACTORY(xName)(xState * ptr) : ::cloe::ActionFactory(xname, xdescription), ptr_(ptr) {} \ + \ + ::cloe::ActionPtr make(const ::cloe::Conf&) const override { \ + return std::make_unique(name(), ptr_); \ + } \ + \ + ::cloe::ActionPtr make(const std::string&) const override { \ + return std::make_unique(name(), ptr_); \ + } \ + \ + private: \ + xState* ptr_; \ }; /** @@ -203,51 +209,54 @@ class SetVariableActionFactory : public ActionFactory { * This action can be registered with the `register_action` helper function. * Refer to doc/reference/actions.rst for the configuration. */ -#define DEFINE_SET_DATA_ACTION(xName, xActionName, xActionDesc, xDataType, xAttributeName, \ - xAttributeType, xOperatorBlock) \ - class xName : public ::cloe::Action { \ - public: \ - xName(const std::string& action_name, xDataType* ptr, const std::string& attribute_name, \ - const xAttributeType attribute_value) \ - : ::cloe::Action(action_name) \ - , ptr_(ptr) \ - , name_(attribute_name) \ - , value_(attribute_value) {} \ - ::cloe::ActionPtr clone() const override { \ - return std::make_unique(name(), ptr_, name_, value_); \ - } \ - void operator()(const ::cloe::Sync&, ::cloe::TriggerRegistrar&) override { xOperatorBlock } \ - bool is_significant() const override { return false; } \ - void to_json(::cloe::Json& j) const override { \ - j = ::fable::Json{ \ - {name_, value_}, \ - }; \ - } \ - \ - private: \ - xDataType* ptr_; \ - std::string name_; \ - xAttributeType value_; \ - }; \ - \ - class _X_FACTORY(xName) : public ::cloe::ActionFactory { \ - public: \ - using ActionType = xName; \ - _X_FACTORY(xName) \ - (xDataType * ptr) : ::cloe::ActionFactory(xActionName, xActionDesc), ptr_(ptr) {} \ - \ - ::cloe::ActionPtr make(const ::cloe::Conf& c) const override { \ - auto value = c.get(xAttributeName); \ - return std::make_unique(name(), ptr_, xAttributeName, value); \ - } \ - \ - ::cloe::ActionPtr make(const std::string& s) const override { \ - auto value = ::cloe::actions::from_string(s); \ - return make(::fable::Conf{::fable::Json{ \ - {xAttributeName, value}, \ - }}); \ - } \ - \ - private: \ - xDataType* ptr_; \ +#define DEFINE_SET_DATA_ACTION(xName, xActionName, xActionDesc, xDataType, xAttributeName, \ + xAttributeType, xOperatorBlock) \ + class xName : public ::cloe::Action { \ + public: \ + xName(const std::string& action_name, xDataType* ptr, const std::string& attribute_name, \ + const xAttributeType attribute_value) \ + : ::cloe::Action(action_name) \ + , ptr_(ptr) \ + , name_(attribute_name) \ + , value_(attribute_value) {} \ + ::cloe::ActionPtr clone() const override { \ + return std::make_unique(name(), ptr_, name_, value_); \ + } \ + ::cloe::CallbackResult operator()(const ::cloe::Sync&, ::cloe::TriggerRegistrar&) override { \ + xOperatorBlock; \ + return ::cloe::CallbackResult::Ok; \ + } \ + bool is_significant() const override { return false; } \ + void to_json(::cloe::Json& j) const override { \ + j = ::fable::Json{ \ + {name_, value_}, \ + }; \ + } \ + \ + private: \ + xDataType* ptr_; \ + std::string name_; \ + xAttributeType value_; \ + }; \ + \ + class _X_FACTORY(xName) : public ::cloe::ActionFactory { \ + public: \ + using ActionType = xName; \ + _X_FACTORY(xName) \ + (xDataType * ptr) : ::cloe::ActionFactory(xActionName, xActionDesc), ptr_(ptr) {} \ + \ + ::cloe::ActionPtr make(const ::cloe::Conf& c) const override { \ + auto value = c.get(xAttributeName); \ + return std::make_unique(name(), ptr_, xAttributeName, value); \ + } \ + \ + ::cloe::ActionPtr make(const std::string& s) const override { \ + auto value = ::cloe::actions::from_string(s); \ + return make(::fable::Conf{::fable::Json{ \ + {xAttributeName, value}, \ + }}); \ + } \ + \ + private: \ + xDataType* ptr_; \ }; diff --git a/runtime/src/cloe/trigger.cpp b/runtime/src/cloe/trigger.cpp index e5677e4fd..244a46330 100644 --- a/runtime/src/cloe/trigger.cpp +++ b/runtime/src/cloe/trigger.cpp @@ -108,9 +108,9 @@ void TriggerRegistrar::insert_trigger(const std::string& label, EventPtr&& e, Ac insert_trigger(std::make_unique(label, source_, std::move(e), std::move(a))); } -void Callback::execute(TriggerPtr&& t, const Sync& sync) { +CallbackResult Callback::execute(TriggerPtr&& t, const Sync& sync) { assert(executer_); - executer_(std::move(t), sync); + return executer_(std::move(t), sync); } } // namespace cloe diff --git a/runtime/src/cloe/trigger/example_actions.cpp b/runtime/src/cloe/trigger/example_actions.cpp index 6c3e1cb60..011d31c73 100644 --- a/runtime/src/cloe/trigger/example_actions.cpp +++ b/runtime/src/cloe/trigger/example_actions.cpp @@ -31,8 +31,7 @@ #include // for Sync #include // for TriggerRegistrar -namespace cloe { -namespace actions { +namespace cloe::actions { // Log ----------------------------------------------------------------------- TriggerSchema LogFactory::schema() const { @@ -54,7 +53,7 @@ ActionPtr LogFactory::make(const Conf& c) const { ActionPtr LogFactory::make(const std::string& s) const { auto level = spdlog::level::info; - auto pos = s.find(":"); + auto pos = s.find(':'); std::string msg; if (pos != std::string::npos) { try { @@ -76,7 +75,7 @@ ActionPtr LogFactory::make(const std::string& s) const { {"level", logger::to_string(level)}, {"msg", msg}, }}; - if (msg.size() == 0) { + if (msg.empty()) { throw TriggerInvalid(c, "cannot log an empty message"); } return make(c); @@ -108,11 +107,16 @@ ActionPtr Bundle::clone() const { return std::make_unique(name(), std::move(actions)); } -void Bundle::operator()(const Sync& sync, TriggerRegistrar& r) { +CallbackResult Bundle::operator()(const Sync& sync, TriggerRegistrar& r) { logger()->trace("Run action bundle"); + auto result = CallbackResult::Ok; for (auto& a : actions_) { - (*a)(sync, r); + auto ar = (*a)(sync, r); + if (ar == CallbackResult::Unpin) { + result = ar; + } } + return result; } TriggerSchema BundleFactory::schema() const { @@ -141,11 +145,12 @@ void Insert::to_json(Json& j) const { }; } -void Insert::operator()(const Sync&, TriggerRegistrar& r) { +CallbackResult Insert::operator()(const Sync& /*unused*/, TriggerRegistrar& r) { for (const auto& tc : triggers_.to_array()) { auto local = r.make_trigger(tc); r.insert_trigger(std::move(local)); } + return CallbackResult::Ok; } TriggerSchema InsertFactory::schema() const { @@ -170,7 +175,7 @@ ActionPtr InsertFactory::make(const Conf& c) const { } // PushRelease --------------------------------------------------------------- -void PushRelease::operator()(const Sync&, TriggerRegistrar& r) { +CallbackResult PushRelease::operator()(const Sync& /*unused*/, TriggerRegistrar& r) { // clang-format off r.insert_trigger( "push down button(s)", @@ -187,6 +192,7 @@ void PushRelease::operator()(const Sync&, TriggerRegistrar& r) { }}), std::move(release_) ); + return CallbackResult::Ok; // clang-format on } @@ -234,5 +240,4 @@ ActionPtr PushReleaseFactory::make(const Conf& c) const { return std::make_unique(name(), dur, create(true), create(false), repr); } -} // namespace actions -} // namespace cloe +} // namespace cloe::actions