# Non-Linear pipelines and Non-Runnable Nodes (#261)
## The Goal

The goal of this PR was to enable non-linear pipelines from Python code and to allow the user to create nodes that do not have a progress engine. For example:

```python
# Create a C++ source
source = m.SourceDerived(seg, "source")

# Make a broadcast node
broadcast = mrc.core.node.Broadcast(seg, "broadcast")

# Create a C++ sink with a progress engine
sink = m.SinkBase(seg, "sink")

def on_next(x):
    print("Got: {}".format(type(x)))

# Create a Python sink without a progress engine
sink_component = seg.make_sink_component("sink_component", on_next, None, None)

# Connect source -> broadcast
seg.make_edge(source, broadcast)

# Connect broadcast -> sink
seg.make_edge(broadcast, sink)

# Connect broadcast -> sink_component
seg.make_edge(broadcast, sink_component)
```

The example above shows creating a pipeline with a single source and multiple readers and is interesting for a few reasons:

- The `sink_component` object will have its `on_next` function run immediately, using the progress engine from the `source` node
- The `sink` object will run its body asynchronously on a different thread from any other node
- The C++(Source) -> Broadcast -> C++(Sink) path should not need the Python GIL

While it was possible to do something similar to this in C++ code, it was not possible in Python and presented a few challenges.

## The Problem

Currently, there are several node types in C++ that do not have a progress engine (`Queue`, `Muxer`, `Broadcast`, etc.). These objects were referred to as "Operators" in the code, but for the remainder of this PR they will be referred to as "Components".

### Problem 1 - C++ Broadcast Node Types

Simply wrapping the existing Component nodes in Python bindings doesn't really solve our goal. If you wanted to connect two C++ nodes together through a broadcast from Python, you would need to derive from `Broadcast<T>` for your particular C++ type, create Python bindings for this single broadcast object, and then create that object in Python.
You would need to repeat this for every C++ type you plan to use in your pipeline. Since the C++ broadcast does very little with the C++ object, this is incredibly tedious for the amount of work it accomplishes.

### Problem 2 - Implementing Source/Node/Sink Components

The above example used a simple Sink Component, but what about a Source Component? A Sink Component is easy to reason about since it's a push-driven design: the upstream progress engine calls `await_write` when it is done with a value, and we can easily have this call the Sink Component's `on_next` function. But a Source Component would need to be a pull-driven system similar to the existing `Queue`. How would this work? Can you connect a pull-driven node to a push-driven one? Our current node/edge system didn't have any way to distinguish between the two.

### Problem 3 - Coupling of Channels in the Node Inheritance Tree

The number of base node types has grown significantly, and it can be difficult to know which one to derive from when making a new node type. For example, this is the full inheritance for an `RxSink`:

- `RxSink<T>`
- `RxSinkBase<T>`
- `SinkChannel<T>`
- `SinkChannelBase<T>`
- `SinkProperties<T>`
- `SinkPropertiesBase`
- `ChannelAcceptor<T>`
- `Watchable`
- `RxRunnable<ContextT>`
- `runnable::RunnableWithContext<ContextT>`
- `Runnable`
- `RxSubscribable`
- `RxPrologueTap<T>`

It can be difficult to know which ones to inherit from because they are very similar in functionality and naming. For example, what's the difference between a `SinkChannel<T>`, `SinkChannelBase<T>`, `SinkProperties<T>`, and `ChannelAcceptor<T>`? Additionally, many of these classes are tightly coupled with `Channel<T>` objects, such as `SinkChannel<T>`, `SinkChannelBase<T>`, and `ChannelAcceptor<T>`. Early iterations of this PR that attempted to use the existing classes were very awkward because Components do not often use channels.
Even the previous `Operator` class duplicated bits of functionality from `SinkChannelBase` to get around this issue.

## The Solution

To solve these problems, the clever `ChannelProvider`/`ChannelAcceptor` system was expanded on. In this system, nodes can declare themselves as a `ChannelProvider` if they give a reference to their channel to another node, and a `ChannelAcceptor` if they can accept a reference to the channel of another node. But instead of accepting and providing channel objects, we accept and provide ingress/egress objects.

- `Ingress` - An object that you can write to, i.e. it has an `await_write(T)` method
- `Egress` - An object that you can read from, i.e. it has an `await_read(T&)` method
- `Provider` - A node that provides objects to other nodes to hold onto
- `Acceptor` - A node that accepts and holds onto objects provided by other nodes

This leaves us with the 4 possible node object types:

- `IngressProvider` - I give other nodes an Ingress object that they can write to
- `IngressAcceptor` - I accept an Ingress object from other nodes that I can write to
- `EgressProvider` - I give other nodes an Egress object that they can read from
- `EgressAcceptor` - I accept an Egress object from other nodes that I can read from

With these 4 classes, we can describe any node as a combination of 1 or more of the above objects. For example, if a node is a push-based source with a progress engine, it should derive from `IngressAcceptor`. We can apply this logic to all properties of nodes to come up with the handy table below, which maps node concepts to these 4 classes.

| | `IngressProvider` | `IngressAcceptor` | `EgressProvider` | `EgressAcceptor` |
| --- | :-: | :-: | :-: | :-: |
| Is a Source? | | `X` | `X` | |
| Is a Sink? | `X` | | | `X` |
| Push-Based? | `X` | `X` | | |
| Pull-Based? | | | `X` | `X` |
| Requires Progress Engine? | | `X` | | `X` |

Finally, if all nodes derive from these objects, then it's pretty simple to determine valid connections with the following overloads:

- `make_edge(IngressAcceptor<T> source, IngressProvider<T> sink)` - Push-based edge
- `make_edge(EgressProvider<T> source, EgressAcceptor<T> sink)` - Pull-based edge

How does this help with the problems above? These classes help in the following ways:

1. It makes it much easier to distinguish nodes that have progress engines from those that do not, i.e. a Sink vs. a Sink Component
2. It establishes a simple system for determining whether edges should be push- or pull-based and eliminates invalid connections between incompatible nodes
3. It moves most of the edge connection logic from the nodes to the Ingress/Egress objects that are shared between nodes (allowing us to do things like deferring edge conversion for C++ nodes)
4. It separates the node inheritance tree from Channels

### An Example

It's easier to understand this concept with some examples. Let's look at some of the properties of the `RxSink`. Anything we can answer "No" to eliminates options from the above chart:

- Does it have a progress engine? Yes
  - If it did not have a progress engine, we could eliminate both `Acceptor` classes
- Is it a Sink? Yes
  - If it wasn't a sink, we could have eliminated `IngressProvider` and `EgressAcceptor`
- Is it a Source? No
  - This eliminates `IngressAcceptor` and `EgressProvider`

This leaves only two options:

- `class RxSink: public IngressProvider, public EgressAcceptor`
  - `IngressProvider`
    - The `RxSink` provides an Ingress object to upstream nodes.
    - When upstream nodes call `await_write` on the Ingress object, the value is written into the `RxSink`'s channel.
    - When the `RxSink`'s progress engine is ready for a new value, it calls `await_read` on the channel object to get the next value for processing
  - `EgressAcceptor`
    - The `RxSink` can also run in pull-mode and receives an Egress object from upstream nodes.
    - In this situation, there is no channel on the `RxSink`
    - When the `RxSink`'s progress engine is ready for a new value, it calls `await_read` on the Egress object. This method will then "pull" the next value from the upstream node for processing.

Let's consider a harder example: the `Queue` object, which converts a push-based upstream node to a pull-based downstream node.

- Does it have a progress engine? No
  - The `Queue` only wraps a channel object and does not have a progress engine
  - Eliminates `EgressAcceptor` and `IngressAcceptor`
- Is it a Sink? Yes
  - Upstream nodes can write directly into the channel
- Is it a Source? Yes
  - Downstream nodes can read from the channel

This leaves the following options:

- `class Queue: public IngressProvider, public EgressProvider`
  - `IngressProvider`
    - The `Queue` provides an Ingress object to upstream nodes.
    - When upstream nodes call `await_write` on the Ingress object, the value is written into the `Queue`'s channel.
  - `EgressProvider`
    - The `Queue` provides an Egress object to downstream nodes.
    - When downstream nodes are ready for their next value, `await_read` is called, which pulls the next value from the `Queue`'s channel.

Below is a listing of all node types and what type of node they are:

| | `IngressProvider` | `IngressAcceptor` | `EgressProvider` | `EgressAcceptor` |
| --- | :-: | :-: | :-: | :-: |
| `RxSource` | | `X` | `X` | |
| `GenericSourceComponent`* | | | `X` | |
| `RxNode` | `X` | `X` | `X` | `X` |
| `RxNodeComponent` | `X` | `X` | | |
| `RxSink` | `X` | | | `X` |
| `RxSinkComponent` | `X` | | | |
| `Broadcast` | `X` | `X` | | |
| `Queue` | `X` | | `X` | |
| `CombineLatest` | `X**` | `X` | | |
| `Conditional` | `X` | `X**` | | |
| `Muxer` | `X` | `X` | | |
| `Router` | `X` | `X**` | | |
| `WritableSubject` | | `X` | | |
| `ReadableSubject` | | | | `X` |

`*` - An `RxSourceComponent` isn't possible since Reactive is push-based by design.
`**` - These are `Multi-` variants that accept multiple edge connections specified by a key

### Making an Edge Connection

Edges can only be made between the following two pairs:

- `IngressAcceptor <- IngressProvider`
- `EgressProvider -> EgressAcceptor`

Because nodes can derive from multiple different classes at the same time, it's possible for both overloads to be valid. In this case, the `Ingress` overload will be preferred to maintain backwards compatibility. If no overload is valid, then an edge cannot be made. Again, this is best illustrated with some examples:

- `RxSource -> RxSink`
  - `IngressAcceptor <- IngressProvider` is valid (will be preferred)
  - `EgressProvider -> EgressAcceptor` is valid
- `Queue -> RxSink`
  - `EgressProvider -> EgressAcceptor` is valid
- `WritableSubject -> RxSink`
  - `IngressAcceptor <- IngressProvider` is valid
- `Queue -> RxSinkComponent`
  - No overload is valid. Cannot connect.
- `RxNodeComponent -> RxSinkComponent`
  - `IngressAcceptor <- IngressProvider` is valid

Which overload is used is completely hidden from the node implementation and does not alter the node's functionality in any way.

## Implementation

In the current system, all edges are formed by the sink providing an `Ingress<T>` to the source. This only supports push-based edge connections; pull-based edges are simulated by loaning out a channel to another progress engine. So how do we support pull-based edges? What are these Ingress/Egress objects? And how does everything shut down properly with both push- and pull-based edges?

### The `EdgeHolder` and `EdgeHandle` Classes

The two classes that make everything work are `EdgeHolder` and `EdgeHandle`. The `EdgeHolder` class functions very similarly to the existing `SinkChannel` class, but it is separated from the concept of a Channel. It is responsible for maintaining the lifetime of an `EdgeHandle` with a shared pointer.
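The shared-pointer handoff this lifetime scheme relies on can be approximated in Python with `weakref`. This is a minimal sketch, not the actual MRC implementation: the class and method names (`EdgeHandle`, `EdgeHolder`, `make_connection`) are hypothetical stand-ins, and Python reference counting plays the role of `std::shared_ptr`/`std::weak_ptr`.

```python
import weakref


class EdgeHandle:
    """Stand-in for the C++ EdgeHandle; the real handle wraps an ingress/egress."""

    def __init__(self, on_destroyed):
        self._on_destroyed = on_destroyed

    def __del__(self):
        # Mirrors the described behavior: when the last strong reference is
        # released, the handle is destroyed and the owning node can stop.
        self._on_destroyed()


class EdgeHolder:
    """Keeps a strong ref until the first connection, then only a weak ref."""

    def __init__(self):
        self.stopped = False
        self._strong = EdgeHandle(on_destroyed=self._mark_stopped)
        self._weak = weakref.ref(self._strong)

    def _mark_stopped(self):
        self.stopped = True

    def make_connection(self):
        # Additional connections reuse the handle held by the weak reference.
        handle = self._weak()
        if handle is None:
            raise RuntimeError("edge already fully released")
        # First connection: release our shared ownership, keep only the weak ref.
        self._strong = None
        return handle


holder = EdgeHolder()
conn_a = holder.make_connection()  # first connection releases the strong ref
conn_b = holder.make_connection()  # later connections reuse the weak ref
assert not holder.stopped          # handle stays alive while connections exist
del conn_a
assert not holder.stopped
del conn_b                         # last reference released -> handle destroyed
assert holder.stopped
```

The key property is that the holder itself no longer keeps the handle alive after the first connection; only the connected peers do, so the node stops exactly when the last peer lets go.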
When a dependent edge connection is formed, the shared pointer to the `EdgeHandle` is released and a `weak_ptr` is kept. Any additional connections will try to reuse the `EdgeHandle` held by the `weak_ptr`. Once all dependent `shared_ptr`s to the `EdgeHandle` are released, the `EdgeHandle` is destroyed and the node is stopped.

This introduces the concept of "dependent" nodes. "Dependent" nodes are nodes that derive from `IngressProvider` or `EgressProvider` and have made an edge connection using that base class. "Dependent" nodes are not allowed to shut down until all of their independent nodes have released their edge connection, because a dependent node is not the driver of when it will be called. For example, if you have a SourceComponent connected to an `RxSink`, the SourceComponent is the dependent node and the `RxSink` is the independent one. Put another way, the `RxSink` is the independent node because it is free to call `await_read` on the SourceComponent's Egress object whenever it wants. So even though data flows from the SourceComponent to the `RxSink`, the SourceComponent must remain alive as long as the `RxSink` can pull values from it. To signal to the downstream node that it should shut down, the `closed` return value of `await_read` should be used.

This was tricky to get correct because it is important to shut down nodes in the right order: if any `Acceptor` class is still alive, the corresponding `Provider` class must stay alive as well. To guarantee this, on destruction the `EdgeHolder` will generate an error if any outstanding Ingress/Egress objects are still pending.

### Connected and Disconnected Events

Two types of events can be registered on the `EdgeHandle` class: on connection and on disconnection. These events help simplify the edge connection process and make it easier to guarantee the lifetime of objects.
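The once-only firing rules for these events can be sketched as follows. This is a hypothetical Python approximation of the semantics, not the real API: `add_connector`/`connect` and the counting scheme are assumptions for illustration (the PR only shows `add_disconnector`).

```python
class EdgeHandle:
    """Sketch: "connected" fires only on the first complete connection,
    "disconnected" fires only when the last connection is released."""

    def __init__(self):
        self._count = 0
        self._ever_connected = False
        self._connectors = []
        self._disconnectors = []

    def add_connector(self, fn):
        self._connectors.append(fn)

    def add_disconnector(self, fn):
        self._disconnectors.append(fn)

    def connect(self):
        self._count += 1
        if self._count == 1 and not self._ever_connected:
            self._ever_connected = True
            for fn in self._connectors:
                fn()  # fires once, when the first complete edge is formed

    def disconnect(self):
        self._count -= 1
        if self._count == 0:
            for fn in self._disconnectors:
                fn()  # fires once, when the last edge is released


events = []
handle = EdgeHandle()
handle.add_connector(lambda: events.append("connected"))
handle.add_disconnector(lambda: events.append("disconnected"))

handle.connect()     # first connection -> "connected" fires
handle.connect()     # second connection -> nothing fires
handle.disconnect()  # one edge still held -> nothing fires
handle.disconnect()  # last edge released -> "disconnected" fires

assert events == ["connected", "disconnected"]
```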
For example, to call `on_complete` when upstream connections have all been dropped, the `ForwardingIngressProvider` uses this code:

```cpp
auto inner_edge = std::make_shared<ForwardingEdge>(*this);

inner_edge->add_disconnector([this]() {
    // Only call on_complete if we have been connected
    this->on_complete();
});

IngressProvider<T>::init_owned_edge(inner_edge);
```

The "connected" event fires when an edge connection is made for the first time. It only fires once a complete edge has been formed, and only the first time. The "disconnected" event is the opposite and only fires when the last edge has been released. These events are very useful for guaranteeing that specific actions occur at the right time, such as calling `on_complete` or releasing downstream edge connections.

## Differences

This section lists the differences between the old system and the new system to make it easy to upgrade.

### Concepts

- Pushing functionality from classes into Ingress/Egress objects
  - One major difference from before is that much more of a node's logic is contained in the Ingress/Egress objects themselves, instead of inside the nodes.
  - TBD more info on this.

### Class Name Changes

This is a table of the old class names and what should be used in their place:

| Old Class | New Class | Notes |
| --- | --- | :-- |
| `Operator<T>` | `NodeComponent<T>` | `Operator` was too overloaded a term and functionally identical to a Node Component |
| `SourceChannelWriteable<T>` | `WritableSubject<T>` | "Subjects" are bookends to an edge chain. If you need to manually call `await_write`, use the `WritableSubject` |
| `UniqueOperator<T>` | N/A | This object was a temporary fix and has been replaced |
| `SourceProperties` | `IIngressAcceptor`/`IEgressProvider` | Which type of source needs to be specified |
| `SinkProperties` | `IIngressProvider`/`IEgressAcceptor` | Which type of sink needs to be specified |
| `SourceChannel`/`SinkChannel` | N/A | These classes still exist but should only be derived from and not used as function arguments or with `shared_ptr` |
| `OperatorComponent<T>` | `LambdaNodeComponent<T>` | Functionally identical. Consistent naming |

### Remaining Items

- [ ] Clean up the file organization
- [ ] Move the classes in `channel_holder.hpp` into separate files
- [ ] Move specific node types into their own namespaces, i.e. the Rx classes, Components, etc.
- [ ] Rename `IEdgeWritable` and `IEdgeReadable` after Ingress/Egress to better match the other names
- [ ] Finish improving the gRPC, subscription, and control plane pieces to use more of the new nodes

Authors:
  - Michael Demoret (https://github.com/mdemoret-nv)

Approvers:
  - Ryan Olson (https://github.com/ryanolson)
  - Devin Robison (https://github.com/drobison00)

URL: #261