Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT] Declarative Port Graph #46

Closed
wants to merge 58 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
e5a5a6a
working ports example
tanneberger May 15, 2023
3ba636a
letting reactor-cpp compile again
tanneberger May 31, 2023
92e788b
new graph output schema
tanneberger May 31, 2023
f67c4ab
start initializing connections
tanneberger May 31, 2023
1f31b20
adding pull connection methods
tanneberger May 31, 2023
e8204c2
Reactor::register_connection
tanneberger May 31, 2023
7d3fa58
implementing full assembly functionality
tanneberger May 31, 2023
4e94252
clang format
tanneberger May 31, 2023
9b15b87
merging with main
tanneberger May 31, 2023
6ec65e3
refactored reactor_element out
tanneberger Jun 4, 2023
e7829ae
connection now registering at partent container
tanneberger Jun 4, 2023
3fd668e
implementing other connection types
tanneberger Jun 4, 2023
b490231
clang-format
tanneberger Jun 4, 2023
c12eb0a
fixing file header
tanneberger Jun 4, 2023
bdc7764
declaring delayed connection
tanneberger Jun 4, 2023
29ea85a
connection merging
tanneberger Jun 5, 2023
1cf7a0d
clang-tidy clean up
tanneberger Jun 5, 2023
6696d79
move consturctor for PropertyGraph
tanneberger Jun 5, 2023
0d287e1
formatting
tanneberger Jun 5, 2023
43d2885
adding some NOLINTS
tanneberger Jun 5, 2023
6096b72
more clean up
tanneberger Jun 5, 2023
87900b5
adding operators to PropertyGraph
tanneberger Jun 5, 2023
202c7b4
operators
tanneberger Jun 5, 2023
46c9aff
fixing self assing
tanneberger Jun 5, 2023
00adab5
ownership of pointers
tanneberger Jun 5, 2023
162e8a6
reactor formatting
tanneberger Jun 5, 2023
ceb709e
clean up & making the the compilers happy
tanneberger Jun 8, 2023
90497a8
remove useless macro
tanneberger Jun 8, 2023
800dea1
working in some more comments
tanneberger Jun 8, 2023
5b6253c
make current_workers variable inline
cmnrd Jun 8, 2023
d5c0407
working in the comments and reviews
tanneberger Jun 12, 2023
8965c6f
making optimizations toggleable
tanneberger Jun 12, 2023
63b0d72
optimization as part of assemble
tanneberger Jun 12, 2023
2d36ce8
clang-format
tanneberger Jun 12, 2023
59d777c
added draw_connection which takes ports
tanneberger Jun 15, 2023
f6551c8
self registering ports
tanneberger Jun 15, 2023
a0a04be
draw connection now only on top level reactor
tanneberger Jun 21, 2023
5801fdb
merging with master
tanneberger Jun 24, 2023
909a322
fixing a lot of bugs
tanneberger Jun 29, 2023
4d1d18f
adding NOLINTs
tanneberger Jun 29, 2023
8b6334f
only in top level environment
tanneberger Jul 5, 2023
251299a
fixed very suble bug
tanneberger Jul 5, 2023
fc1c23d
clang-tidy
tanneberger Jul 5, 2023
098b064
adding debug methods back
tanneberger Jul 5, 2023
7b34c68
remove optimizations for now
tanneberger Jul 5, 2023
decbf3e
renamed ProperyGraph -> Graph
tanneberger Jul 5, 2023
90819b6
renamed tracing.hpp -> tracing.hh
tanneberger Jul 5, 2023
304ee3c
worked in more review
tanneberger Jul 5, 2023
f0e00cc
fixed header guard
tanneberger Jul 5, 2023
5775399
fix trace include
tanneberger Jul 5, 2023
54ca1c0
squashed another bug
tanneberger Jul 5, 2023
d609070
bug that callbacks were registered twice
tanneberger Jul 5, 2023
11a5427
clang-tidy
tanneberger Jul 5, 2023
36900a0
wrong assert
tanneberger Jul 5, 2023
aaa7249
remove false assert
tanneberger Jul 5, 2023
9043251
making clang-tidy happy
tanneberger Jul 5, 2023
d651240
unused variable
tanneberger Jul 5, 2023
a6bb591
last dump
tanneberger Jul 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 42 additions & 11 deletions examples/ports/main.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include <iostream>

#include "reactor-cpp/reactor-cpp.hh"
#include <reactor-cpp/reactor-cpp.hh>

using namespace reactor;
using namespace std::chrono_literals;
Expand Down Expand Up @@ -38,6 +38,7 @@ class Counter : public Reactor {
Output<int> count{"count", this}; // NOLINT

void assemble() override {
std::cout << "assemble Counter" << std::endl << std::flush;
r_trigger.declare_trigger(&trigger);
r_trigger.declare_antidependency(&count);
}
Expand All @@ -53,12 +54,15 @@ class Printer : public Reactor {
Reaction r_value{"r_value", 1, this, [this]() { on_value(); }};

public:
Input<int> value{"value", this}; // NOLINT

Input<int> value{"value", this}; // NOLINT
Input<int> forward{"forward", this}; // NOLINT
Printer(const std::string& name, Environment* env)
: Reactor(name, env) {}

void assemble() override { r_value.declare_trigger(&value); }
void assemble() override {
std::cout << "assemble Printer" << std::endl << std::flush;
r_value.declare_trigger(&value);
}

void on_value() { std::cout << this->name() << ": " << *value.get() << std::endl; }
};
Expand All @@ -84,6 +88,7 @@ class Adder : public Reactor {
void add() {
if (i1.is_present() && i2.is_present()) {
sum.set(*i1.get() + *i2.get());
std::cout << "setting sum" << std::endl;
}
}
};
Expand All @@ -94,23 +99,49 @@ auto main() -> int {
Trigger trigger1{"t1", &env, 1s};
Counter counter1{"c1", &env};
Printer printer1{"p1", &env};
trigger1.trigger.bind_to(&counter1.trigger);
counter1.count.bind_to(&printer1.value);

auto trigger1_trigger = env.register_port(&trigger1.trigger);
auto counter1_trigger = env.register_port(&counter1.trigger);
auto counter1_count = env.register_port(&counter1.count);
auto print1_value = env.register_port(&printer1.value);
// trigger1.trigger.set_inward_binding(&counter1.trigger);
// counter1.count.set_inward_binding(&printer1.value);

env.draw_connection(trigger1_trigger, counter1_trigger, ConnectionProperties{});
env.draw_connection(counter1_count, print1_value, ConnectionProperties{});

Trigger trigger2{"t2", &env, 2s};
Counter counter2{"c2", &env};
Printer printer2{"p2", &env};
trigger2.trigger.bind_to(&counter2.trigger);
counter2.count.bind_to(&printer2.value);

auto trigger2_trigger = env.register_port(&trigger2.trigger);
auto counter2_trigger = env.register_port(&counter2.trigger);
auto counter2_count = env.register_port(&counter2.count);
auto printer2_value = env.register_port(&printer2.value);

// trigger2.trigger.set_inward_binding(&counter2.trigger);
// counter2.count.set_inward_binding(&printer2.value);
env.draw_connection(trigger2_trigger, counter2_trigger, ConnectionProperties{});
env.draw_connection(counter2_count, printer2_value, ConnectionProperties{});

Adder add{"add", &env};
Printer p_add{"p_add", &env};
counter1.count.bind_to(&add.i1);
counter2.count.bind_to(&add.i2);
add.sum.bind_to(&p_add.value);

auto add_i1 = env.register_port(&add.i1);
auto add_i2 = env.register_port(&add.i2);
auto add_sum = env.register_port(&add.sum);
auto p_add_forward = env.register_port(&p_add.forward);
auto p_add_value = env.register_port(&p_add.value);

env.draw_connection(counter1_count, add_i1, ConnectionProperties{});
env.draw_connection(counter2_count, add_i2, ConnectionProperties{});
env.draw_connection(add_sum, p_add_forward, ConnectionProperties{ConnectionType::Delayed, 10s, nullptr});
env.draw_connection(p_add_forward, p_add_value, ConnectionProperties{ConnectionType::Delayed, 5s, nullptr});

std::cout << "assemble" << std::endl << std::flush;
env.assemble();

std::cout << "optimize" << std::endl << std::flush;
auto thread = env.startup();
thread.join();

Expand Down
16 changes: 8 additions & 8 deletions examples/power_train/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,14 @@ auto main() -> int {
Engine engine{&env};

env.assemble();
left_pedal.angle.bind_to(&brake_control.angle);
left_pedal.on_off.bind_to(&engine_control.on_off);
brake_control.force.bind_to(&brakes.force);
right_pedal.angle.bind_to(&engine_control.angle);
engine_control.check.bind_to(&right_pedal.check);
engine_control.torque.bind_to(&engine.torque);

env.export_dependency_graph("graph.dot");
left_pedal.angle.set_inward_binding(&brake_control.angle);
left_pedal.on_off.set_inward_binding(&engine_control.on_off);
brake_control.force.set_inward_binding(&brakes.force);
right_pedal.angle.set_inward_binding(&engine_control.angle);
engine_control.check.set_inward_binding(&right_pedal.check);
engine_control.torque.set_inward_binding(&engine.torque);

// TODO: env.export_dependency_graph("graph.dot");

auto thread = env.startup();
thread.join();
Expand Down
27 changes: 7 additions & 20 deletions include/reactor-cpp/action.hh
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "fwd.hh"
#include "logical_time.hh"
#include "reactor.hh"
#include "reactor_element.hh"
#include "time_barrier.hh"
#include "value_ptr.hh"

Expand All @@ -27,7 +28,7 @@ class BaseAction : public ReactorElement {
private:
std::set<Reaction*> triggers_{};
std::set<Reaction*> schedulers_{};
const Duration min_delay_{};
const Duration min_delay_{0};
const bool logical_{true};
bool present_{false};

Expand All @@ -48,22 +49,13 @@ protected:
* indicates that the tag is safe to process.
*/
virtual auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock,
const std::function<bool(void)>& abort_waiting) -> bool {
reactor_assert(!logical_);
reactor_assert(lock.owns_lock());
return PhysicalTimeBarrier::acquire_tag(tag, lock, environment()->scheduler(), abort_waiting);
}
const std::function<bool(void)>& abort_waiting) -> bool;

BaseAction(const std::string& name, Reactor* container, bool logical, Duration min_delay)
: ReactorElement(name, ReactorElement::Type::Action, container)
, min_delay_(min_delay)
, logical_(logical) {}
BaseAction(const std::string& name, Environment* environment, bool logical, Duration min_delay)
: ReactorElement(name, ReactorElement::Type::Action, environment)
, min_delay_(min_delay)
, logical_(logical) {
environment->register_input_action(this);
}
BaseAction(const std::string& name, Environment* environment, bool logical, Duration min_delay);

public:
[[nodiscard]] auto inline triggers() const noexcept -> const auto& { return triggers_; }
Expand Down Expand Up @@ -150,12 +142,7 @@ public:

template <class T> class PhysicalAction : public Action<T> {
public:
PhysicalAction(const std::string& name, Reactor* container)
: Action<T>(name, container, false, Duration::zero()) {
// all physical actions act as input actions to the program as they can be
// scheduled from external threads
container->environment()->register_input_action(this);
}
PhysicalAction(const std::string& name, Reactor* container);
};

template <class T> class LogicalAction : public Action<T> {
Expand All @@ -166,8 +153,8 @@ public:

class Timer : public BaseAction {
private:
const Duration offset_{};
const Duration period_{};
const Duration offset_{0};
const Duration period_{0};

void cleanup() noexcept final;

Expand Down
1 change: 1 addition & 0 deletions include/reactor-cpp/assert.hh
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ template <typename E> constexpr auto extract_value(E enum_value) -> typename std
}

void assert_phase([[maybe_unused]] const ReactorElement* ptr, [[maybe_unused]] Phase phase);
void assert_phase([[maybe_unused]] const Environment* ptr, [[maybe_unused]] Phase phase);

} // namespace reactor

Expand Down
51 changes: 26 additions & 25 deletions include/reactor-cpp/connection.hh
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,19 @@ protected:
[[nodiscard]] auto upstream_port() -> auto* { return upstream_port_; }
[[nodiscard]] auto upstream_port() const -> const auto* { return upstream_port_; }

virtual auto upstream_set_callback() noexcept -> PortCallback = 0;

public:
virtual auto upstream_set_callback() noexcept -> PortCallback = 0;
virtual void bind_upstream_port(Port<T>* port) {
reactor_assert(upstream_port_ == nullptr);
upstream_port_ = port;
port->register_set_callback(upstream_set_callback());
}

virtual void bind_downstream_ports(const std::vector<BasePort*>& ports) {
// with C++23 we can use insert_rage here
for ([[maybe_unused]] auto* port : ports) { // NOLINT
tanneberger marked this conversation as resolved.
Show resolved Hide resolved
this->downstream_ports_.insert(static_cast<Port<T>*>(port));
}
}
virtual void bind_downstream_port(Port<T>* port) {
[[maybe_unused]] bool result = this->downstream_ports_.insert(port).second;
reactor_assert(result);
Expand Down Expand Up @@ -110,31 +114,28 @@ protected:

EnclaveConnection(const std::string& name, Environment* enclave, const Duration& delay)
: BaseDelayedConnection<T>(name, enclave, false, delay)
, log_{this->fqn()}
, logical_time_barrier_(enclave->scheduler()) {}
, logical_time_barrier_(enclave->scheduler())
, log_{this->fqn()} {}

public:
EnclaveConnection(const std::string& name, Environment* enclave)
: BaseDelayedConnection<T>(name, enclave, false, Duration::zero())
, log_{this->fqn()}
, logical_time_barrier_(enclave->scheduler()) {}
, logical_time_barrier_(enclave->scheduler())
, log_{this->fqn()} {};

inline auto upstream_set_callback() noexcept -> PortCallback override {
return [this](const BasePort& port) {
// We know that port must be of type Port<T>
auto& typed_port = reinterpret_cast<const Port<T>&>(port); // NOLINT
const auto* scheduler = port.environment()->scheduler();
return [&](const BasePort& port) { // NOLINT unused this
// This callback will be called from a reaction executing in the context
// of the upstream port. Hence, we can retrieve the current tag directly
// without locking.
auto tag = Tag::from_logical_time(scheduler->logical_time());
bool result{false};
if constexpr (std::is_same<T, void>::value) {
result = this->schedule_at(tag);
this->schedule_at(Tag::from_logical_time(port.environment()->scheduler()->logical_time()));
} else {
result = this->schedule_at(std::move(typed_port.get()), tag);
// We know that port must be of type Port<T>
auto& typed_port = reinterpret_cast<const Port<T>&>(port); // NOLINT
this->schedule_at(std::move(typed_port.get()),
Tag::from_logical_time(port.environment()->scheduler()->logical_time()));
}
reactor_assert(result);
};
}

Expand Down Expand Up @@ -186,21 +187,21 @@ public:
: EnclaveConnection<T>(name, enclave, delay) {}

inline auto upstream_set_callback() noexcept -> PortCallback override {
return [this](const BasePort& port) {
// We know that port must be of type Port<T>
auto& typed_port = reinterpret_cast<const Port<T>&>(port); // NOLINT
const auto* scheduler = port.environment()->scheduler();
return [&](const BasePort& port) { // NOLINT unused this
// This callback will be called from a reaction executing in the context
// of the upstream port. Hence, we can retrieve the current tag directly
// without locking.
auto tag = Tag::from_logical_time(scheduler->logical_time()).delay(this->min_delay());
bool result{false};

if constexpr (std::is_same<T, void>::value) {
result = this->schedule_at(tag);
this->schedule_at(
Tag::from_logical_time(port.environment()->scheduler()->logical_time()).delay(this->min_delay()));
} else {
result = this->schedule_at(std::move(typed_port.get()), tag);
// We know that port must be of type Port<T>
auto& typed_port = reinterpret_cast<const Port<T>&>(port); // NOLINT
this->schedule_at(
std::move(typed_port.get()),
Tag::from_logical_time(port.environment()->scheduler()->logical_time()).delay(this->min_delay()));
}
reactor_assert(result);
};
}

Expand Down
26 changes: 26 additions & 0 deletions include/reactor-cpp/connection_properties.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (C) 2023 TU Dresden
* All rights reserved.
*
* Authors:
* Tassilo Tanneberger
*/

#ifndef REACTOR_CPP_CONNECTION_PROPERTIES_HH
#define REACTOR_CPP_CONNECTION_PROPERTIES_HH

#include "fwd.hh"
#include "logical_time.hh"

namespace reactor {

enum ConnectionType { Normal, Delayed, Enclaved, Physical, DelayedEnclaved, PhysicalEnclaved, Plugin, Invalid };
struct ConnectionProperties {
ConnectionType type_ = ConnectionType::Normal;
Duration delay_{0};
Environment* enclave_{nullptr};
};

} // namespace reactor

#endif // REACTOR_CPP_CONNECTION_PROPERTIES_HH
Loading
Loading