-
Notifications
You must be signed in to change notification settings - Fork 46
SO 5.8 ByExample Machine Control
Note. This example is one of the biggest and complex examples in SO-5.8 distribution. For understanding of it a good knowledge of SO-5.8 Basics, SO-5.5 InDepth - Message Delivery Filters, SO-5.8 InDepth - Priorities of Agents, SO-5.8 InDepth - Dispatchers is required.
Support of agent's priorities has been added in v.5.5.8. Three new priority-respected dispatches were introduced. They implement various priority handling policies. This example demostrates usage of one of these dispatchers -- prio_one_thread::strictly_ordered.
This example shows abitility of prio_one_thread::strictly_ordered to reorder events with respect to its priorities. This ability is used in imitation of managing some equipment with engines/coolers inside where some event (like enging overheating) must be handled as soon as possible, even if that leads to delaying handling of other events with lower priorities.
Because this example is long its full source code is not represented here. The full source can be found in the repository.
This example is a very simple imitiation of a task of monitoring and controlling work of some machines with engines and coolers inside. When an engine is working it is get warmer. When temperature becomes greater that some warn level a cooler must be turned on. If engine is still getting warmer and its temperature becomes greater that some critical level then engine must be turned off. If engine is getting colder and its temperature drops below some safe level then cooler could be turned off.
There are several kinds of agents in the example.
There are machines represented by agents of type a_machine_t
. Those agents imitate behaviour of some equipment with engines and coolers insides. They distributes information about its status and react to control commands like turning engine on or off.
There is status analyzer repesented by agent of type a_statuses_analyzer_t
. This agent reacts to machine statuses and detects cases when some machine needs various kinds of attention (like necessity of turning cooler/engine on/off).
There are machine controllers represented by agent of template type a_machine_controller_t
parametrized by corresponding controller implementation (like engine_starter_t
or cooler_stopper_t
). Those agents have different priorities. They react on notifications from status analyser and produce actions for machines (like signals turn_engine_off
or turn_cooler_on
).
There is a dashboard agent repesented by agent of type a_total_status_dashboard_t
. It receives statuses from machines, collects them and shows them to the console on periodic basics. This agent allows to visualize the states of machines.
Almost all interactions are done on one MPMC mbox. Machines send status messages to that mbox. Status analyzer receives statuses from that mbox and sends attention messages back to that mbox. Machine controllers receive attention messages from that mbox. But direct action for a specific machine is sent to direct mbox of that machine.
This working scheme means that there are many messages going from one MPMC mbox. Because of that machine controllers use message delivery filters to receive only those messages they are interested.
Agent of type a_machine_t
does very simple imitation of work of some equipment. There are two states of an agent: st_engine_on
and st_engine_off
. In state st_engine_on
an agent ignores control command turn_engine_on
, and in state st_engine_off
an agent ignores control command turn_engine_off
. Also the current parameters of a machine is calculated differently in every state. All other events are handled the same way in both states.
Periodically an agent recalculates engine temperature with use statuses of engine and cooler. Then the new status is distributed via machine_status
message.
The agent implementation is:
class a_machine_t final : public so_5::agent_t
{
// Periodic signal to update and distribute status of the machine.
struct update_status final : public so_5::signal_t {};
public :
a_machine_t(
context_t ctx,
std::string id,
so_5::mbox_t status_distrib_mbox,
float initial_temperature,
float engine_heating_step,
float cooler_impact_step )
: so_5::agent_t{ ctx }
, m_id( std::move( id ) )
, m_status_distrib_mbox{ std::move( status_distrib_mbox ) }
, m_initial_temperature{ initial_temperature }
, m_engine_heating_step{ engine_heating_step }
, m_cooler_impact_step{ cooler_impact_step }
, m_engine_temperature{ initial_temperature }
{}
void so_define_agent() override
{
this >>= st_engine_off;
st_engine_on
.event( &a_machine_t::evt_turn_engine_off )
.event( &a_machine_t::evt_turn_cooler_on )
.event( &a_machine_t::evt_turn_cooler_off )
.event( &a_machine_t::evt_update_status_when_engine_on );
st_engine_off
.event( &a_machine_t::evt_turn_engine_on )
.event( &a_machine_t::evt_turn_cooler_on )
.event( &a_machine_t::evt_turn_cooler_off )
.event( &a_machine_t::evt_update_status_when_engine_off );
}
void so_evt_start() override
{
// Periodic update_status signal must be initiated.
m_update_status_timer = so_5::send_periodic< update_status >(
*this,
std::chrono::milliseconds(0),
std::chrono::milliseconds(200) );
}
private :
const state_t st_engine_on{ this, "on" };
const state_t st_engine_off{ this, "off" };
const std::string m_id;
const so_5::mbox_t m_status_distrib_mbox;
const float m_initial_temperature;
const float m_engine_heating_step;
const float m_cooler_impact_step;
float m_engine_temperature;
engine_state_t m_engine_status = engine_state_t::off;
cooler_state_t m_cooler_status = cooler_state_t::off;
// Timer ID for periodic update_status.
so_5::timer_id_t m_update_status_timer;
void evt_turn_engine_off(mhood_t< turn_engine_off >)
{
this >>= st_engine_off;
m_engine_status = engine_state_t::off;
}
void evt_turn_engine_on(mhood_t< turn_engine_on >)
{
this >>= st_engine_on;
m_engine_status = engine_state_t::on;
}
void evt_turn_cooler_off(mhood_t< turn_cooler_off >)
{
m_cooler_status = cooler_state_t::off;
}
void evt_turn_cooler_on(mhood_t< turn_cooler_on >)
{
m_cooler_status = cooler_state_t::on;
}
void evt_update_status_when_engine_on(mhood_t< update_status >)
{
m_engine_temperature += m_engine_heating_step;
if( cooler_state_t::on == m_cooler_status )
m_engine_temperature -= m_cooler_impact_step;
distribute_status();
}
void evt_update_status_when_engine_off(mhood_t< update_status >)
{
if( cooler_state_t::on == m_cooler_status )
{
m_engine_temperature -= m_cooler_impact_step;
if( m_engine_temperature < m_initial_temperature )
m_engine_temperature = m_initial_temperature;
}
distribute_status();
}
void distribute_status()
{
so_5::send< machine_status >(
m_status_distrib_mbox,
m_id,
m_engine_status,
m_cooler_status,
m_engine_temperature );
}
};
Agent for monitoring statuses of machines handles just one message -- machine_status
. But it maintains the current statuses for all engines. It allows to detect moments when some attention to a machine must be taken. For example if engine temperature becomes higher than warn the level a message machine_needs_attention
with code engine_cooling_needed
will be sent.
The full source code of that agent is:
class a_statuses_analyzer_t final : public so_5::agent_t
{
public :
a_statuses_analyzer_t(
context_t ctx,
so_5::mbox_t status_distrib_mbox,
float safe_temperature,
float warn_temperature,
float high_temperature)
: so_5::agent_t{ ctx }
, m_status_distrib_mbox{ std::move( status_distrib_mbox ) }
, m_safe_temperature{ safe_temperature }
, m_warn_temperature{ warn_temperature }
, m_high_temperature{ high_temperature }
{}
void so_define_agent() override
{
so_subscribe( m_status_distrib_mbox ).event(
&a_statuses_analyzer_t::evt_machine_status );
}
private :
const so_5::mbox_t m_status_distrib_mbox;
const float m_safe_temperature;
const float m_warn_temperature;
const float m_high_temperature;
// Info about last known machine status.
struct last_machine_info_t
{
attention_t m_attention;
float m_engine_temperature;
};
// Map from machine ID to last known status.
using last_info_map_t = std::map< std::string, last_machine_info_t >;
last_info_map_t m_last_infos;
void evt_machine_status( const machine_status & status )
{
auto it = m_last_infos.find( status.m_id );
if( it == m_last_infos.end() )
// There is no information about this machine yet.
// It must be added.
it = m_last_infos.insert( last_info_map_t::value_type {
status.m_id,
last_machine_info_t {
attention_t::none,
status.m_engine_temperature
} } ).first;
handle_new_status( status, it->second );
}
void handle_new_status(
const machine_status & status,
last_machine_info_t & last_info ) const
{
const auto fresh_info = last_machine_info_t {
detect_attention( status, last_info ),
status.m_engine_temperature
};
if( last_info.m_attention != fresh_info.m_attention )
// Machine needs some new attention.
so_5::send< machine_needs_attention >(
m_status_distrib_mbox,
status.m_id,
fresh_info.m_attention,
status.m_engine_status,
status.m_cooler_status );
last_info = fresh_info;
}
attention_t detect_attention(
const machine_status & status,
const last_machine_info_t & last ) const
{
if( last.m_engine_temperature < status.m_engine_temperature )
{
// Engine is warming.
if( status.m_engine_temperature > m_high_temperature )
{
if( attention_t::engine_overheat_detected != last.m_attention )
return attention_t::engine_overheat_detected;
}
else if( status.m_engine_temperature > m_warn_temperature )
{
if( attention_t::engine_cooling_needed != last.m_attention )
return attention_t::engine_cooling_needed;
}
}
else
{
// Engine is cooling.
if( status.m_engine_temperature < m_safe_temperature )
if( attention_t::none != last.m_attention &&
attention_t::engine_cooling_done != last.m_attention )
return attention_t::engine_cooling_done;
}
// Attention need not to be changed.
return last.m_attention;
}
};
Machine controllers are implemented by a template class a_machine_controller_t
and several structs with action-specific logic.
Machine controller does very simple tasks: it sets up a message delivery filter to receive only messages appropriate for its action and then reacts to those messages. For example, for turning engine off because of engine overheating it is necessary to receive only messages with mark engine_overhead_detected
and produce a signal turn_engine_off
when this message arrived. Something like:
struct engine_stopper_t
{
bool filter( const machine_needs_attention & msg ) const
{
return msg.m_attention == attention_t::engine_overheat_detected;
}
void action(
const machine_dictionary_t & machines,
const machine_needs_attention & evt ) const
{
so_5::send< turn_engine_off >( machines.find_mbox( evt.m_id ) );
}
};
A template class a_machine_controller_t
needs to be parametrized by types like engine_stopper_t
shown above. The code of a_machine_controlle_t
template is short and simple:
template< class Logic >
class a_machine_controller_t final : public so_5::agent_t
{
public :
a_machine_controller_t(
context_t ctx,
so_5::priority_t priority,
so_5::mbox_t status_distrib_mbox,
const machine_dictionary_t & machines )
: so_5::agent_t( ctx + priority )
, m_status_distrib_mbox( std::move( status_distrib_mbox ) )
, m_machines( machines )
, m_logic()
{}
void so_define_agent() override
{
so_set_delivery_filter( m_status_distrib_mbox,
[this]( const machine_needs_attention & msg ) {
return m_logic.filter( msg );
} );
so_subscribe( m_status_distrib_mbox )
.event( [this]( const machine_needs_attention & evt ) {
m_logic.action( m_machines, evt );
} );
}
private :
const so_5::mbox_t m_status_distrib_mbox;
const machine_dictionary_t & m_machines;
const Logic m_logic;
};
The main part of machine controller is a priority. Priority is passed to a constructor of a_machine_controller_t
. Actual value of priority is dependent of the controller task: controller which is responsible for turning engine off has the highest priority in the sample, controller which is responsible for turning cooler off has the lowest priority.
Priorities are specified during agents creation (please note also binding to prio_one_thread::strictly_ordered dispatcher):
void create_machine_controllers(
so_5::coop_t & coop,
const so_5::mbox_t & status_distrib_mbox,
const machine_dictionary_t & machines )
{
// There must be a priority-respected dispatcher.
auto disp = so_5::disp::prio_one_thread::strictly_ordered::
make_dispatcher( coop.environment() );
coop.make_agent_with_binder< a_machine_controller_t< engine_stopper_t > >(
disp.binder(),
so_5::prio::p4,
status_distrib_mbox,
machines );
coop.make_agent_with_binder< a_machine_controller_t< cooler_starter_t > >(
disp.binder(),
so_5::prio::p3,
status_distrib_mbox,
machines );
coop.make_agent_with_binder< a_machine_controller_t< engine_starter_t > >(
disp.binder(),
so_5::prio::p2,
status_distrib_mbox,
machines );
coop.make_agent_with_binder< a_machine_controller_t< cooler_stopper_t > >(
disp.binder(),
so_5::prio::p1,
status_distrib_mbox,
machines );
}
Agent of type a_total_status_dashboard_t
simply collects statuses of all machines and periodically shows them to the console. Code of that agent is not show here because of its simplicity.
All engines must be started at the begining of example work. To do that a very simple agent is used:
void create_starter_agent(
so_5::coop_t & coop,
const machine_dictionary_t & dict )
{
// A very simple agent will be used as starter.
// It will work on the default dispatcher.
class starter_t final : public so_5::agent_t {
const machine_dictionary_t & m_dict;
public :
starter_t( context_t ctx, const machine_dictionary_t & dict )
: so_5::agent_t{ std::move(ctx) }
, m_dict{ dict }
{}
void so_evt_start() override {
m_dict.for_each(
[]( const std::string &, const so_5::mbox_t & mbox ) {
so_5::send< turn_engine_on >( mbox );
} );
}
};
coop.make_agent< starter_t >( std::ref(dict) );
}
All agents in the sample are belong to one coop. But they are bound to different dispatchers.
Machines are working on common working thread:
const machine_dictionary_t & create_machines(
so_5::coop_t & coop,
const so_5::mbox_t & status_distrib_mbox )
{
// Data for machine dictionary.
machine_dictionary_t::dictionary_type_t dict_data;
// All machines will work on dedicated working thread.
auto machine_disp = so_5::disp::one_thread::make_dispatcher(
coop.environment() );
// Helper for creation of machine agent and adding it info into
// machine dictionary.
auto make_machine = [&]( const std::string & name,
float initial, float warming_step, float cooling_step )
{
auto machine = coop.make_agent_with_binder< a_machine_t >(
machine_disp.binder(),
name, status_distrib_mbox,
initial, warming_step, cooling_step );
dict_data[ name ] = machine->so_direct_mbox();
};
make_machine( "Mch01", 20.0f, 0.3f, 0.2f );
make_machine( "Mch02", 20.0f, 0.45f, 0.2f );
make_machine( "Mch03", 20.0f, 0.25f, 0.3f );
make_machine( "Mch04", 20.0f, 0.26f, 0.27f );
// Machine dictionary could be created at that point.
return *( coop.take_under_control(
std::make_unique< machine_dictionary_t >( std::move( dict_data ) ) ) );
}
Status analyzer and total status dashboard work on different working threads. Machine controllers work on common working thread but under control of prio_one_thread::strictly_ordered dispatcher. Starter agent works on the default dispatcher:
void fill_coop( so_5::coop_t & coop )
{
// Common mbox for information distribution.
auto status_distrib_mbox = coop.environment().create_mbox();
// Create machines and form machines dictionary.
const auto & machine_dict = create_machines( coop, status_distrib_mbox );
// Machine dashboard will work on its own dedicated thread.
coop.make_agent_with_binder< a_total_status_dashboard_t >(
so_5::disp::one_thread::make_dispatcher(
coop.environment() ).binder(),
status_distrib_mbox );
// Status analyzer will work on its own dedicated thread.
coop.make_agent_with_binder< a_statuses_analyzer_t >(
so_5::disp::one_thread::make_dispatcher(
coop.environment() ).binder(),
status_distrib_mbox,
50.0f, // Safe temperature.
70.0f, // Warn temperature (cooler must be turned on)
95.0f // Critical temperature (engine must be turned off).
);
// Create machine controllers.
create_machine_controllers( coop, status_distrib_mbox, machine_dict );
// Special agent which will start machines.
create_starter_agent( coop, machine_dict );
}
Example works until it would be stopped by breaking (like Ctrl+C). During its work a statuses of machines will be shown to the standard output. Something like this: