Realmar.Pipes is a library for composing modular, chainable pipes from small reusable units called processors. Processors are statically typed, so there is no need to cast objects to their actual type.
var pipe = new Pipe<double>();
pipe.FirstConnector
.Connect(x => x * x)
.Connect(x => 8.Equals(x))
.Finish(VerifyCallback);
This library currently targets netstandard2.0 (.NET Core 2.0 or .NET Framework 4.6.1), net461, net46, net452, net45 and net40.
Mono is not targeted yet.
Realmar.Pipes is available on nuget.org.
It can be installed using the following command from within Visual Studio:
PM> Install-Package Realmar.Pipes
This library is CLS compliant, which means that it is language independent (e.g. you may use it in C#, VB.NET, IronPython, etc.).
SemVer is used for versioning. Currently Realmar.Pipes has only prerelease versions (0.y.z).
This section describes the architecture of Realmar.Pipes.
The idea is that a pipe is constructed using small reusable components called processors. Data is given to the pipe which then processes it using the composed processors.
┌─Pipe─────────────────────────────────┐
Input Data ⇒ [Processor]→[Processor]→[Processor] ⇒ Transformed Data
└──────────────────────────────────────┘
A processor is the smallest composable unit and is designed to do exactly one transformation on the data. This results in processors being very modular.
Pipes are the next larger unit and can also be connected to each other, either by simply handing the transformed data to the next pipe or by using a more complex construct called a pipe connector. The 'ConditionalPipeConnector' is such a connector: based on a predicate, it decides which pipe receives the processed data.
┌─Pipe─────────────────────┐ ┌─Connector─┐ True: [ Pipe_True ]
Data ⇒ [Processor]→[Processor] ⇒ Predicate ⇒
└──────────────────────────┘ └───────────┘ False: [ Pipe_False ]
Pipes can be easily composed using delegates:
var pipe = new Pipe<double>();
pipe.FirstConnector
.Connect(x => x * 2)
.Connect(x => $"{x} is your number!")
.Connect(x => x.ToUpperInvariant())
.Finish(Console.WriteLine);
pipe.Process(2);
// prints:
// 4 IS YOUR NUMBER!
Use delegates if the computations are simple. For actions which are more complex and may require state, implement IPipeProcessor to create your own processor. This process is described in the following sections.
A processor is responsible for performing a single transformation on the input data and returning the result.
Let's create a new processor which converts a string to a primitive type. This is done by implementing the IPipeProcessor<TIn, TOut> interface:
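using System.ComponentModel; // TypeDescriptor

public class ParseStringProcessor<TOut> : IPipeProcessor<string, TOut>
{
    public TOut Process(string data)
    {
        // Look up the converter registered for TOut and parse culture-invariantly
        var tc = TypeDescriptor.GetConverter(typeof(TOut));
        var obj = tc.ConvertFromInvariantString(null, data);
        return (TOut)obj;
    }
}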
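As a hedged sketch of a processor that carries state, the block below numbers each item it sees. The IPipeProcessor<TIn, TOut> interface is redeclared locally as a stand-in so the snippet compiles on its own, and NumberingProcessor is a hypothetical name that is not part of Realmar.Pipes.

```csharp
using System.Threading;

// Stand-in for the library interface, redeclared so this sketch is self-contained.
public interface IPipeProcessor<in TIn, out TOut>
{
    TOut Process(TIn data);
}

// Hypothetical stateful processor: prefixes each item with a running index.
public class NumberingProcessor : IPipeProcessor<string, string>
{
    private int _count;

    public string Process(string data)
    {
        // Interlocked keeps the counter correct if several threads call Process.
        var index = Interlocked.Increment(ref _count);
        return $"{index}: {data}";
    }
}
```

Calling Process("a") and then Process("b") on the same instance yields "1: a" and "2: b". The counter is updated atomically because a processor instance may be shared across threads when a parallel strategy is used.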
A pipe is responsible for combining multiple processors into one processing pipeline. Additionally, a pipe uses a processing strategy which defines how the data is processed (described later).
The processor defined above is used for this example:
// Instantiate a pipe which takes data of type string as input
var pipe = new Pipe<string>();
pipe.FirstConnector
.Connect(new ParseStringProcessor<int>())
.Finish(Console.WriteLine); // print to console
pipe.Process(new List<string> { "1", "2" });
// it is also possible to just give one item to the pipe
pipe.Process("1");
Using pseudo processors it is possible to illustrate the usage of a pipe better:
// create a pipe which takes a list of objects of type TIn as input
var pipe = new Pipe<TIn>();
pipe.FirstConnector
    .Connect(new Processor<TIn, T1>())
    .Connect(new Processor<T1, T2>())
    .Finish(Action<IList<T2>>);
IList<TIn> data = ...;
pipe.Process(data);
The delegate which is given to 'Finish' is the callback invoked after the data has been processed by all processors in the specified pipe.
More examples can be found in the tests.
Processors are variant:
// TIn is contravariant
// TOut is covariant
IPipeProcessor<in TIn, out TOut>
This allows a processor to take input which is "less derived" than the output data of the previous processor, as the following example shows. (Note: TOut does not need to be covariant to achieve this behavior. It is covariant for convenience.)
class Base { }
class Derived : Base { }
// ---
// pipe takes in a list of objects of type `Derived`
var pipe = new Pipe<Derived>();
pipe.FirstConnector
// because TIn of IPipeProcessor is contravariant
// it is possible for this processor to take objects
// of type `Base` as input:
//
// Derived --> Base
.Connect(new Processor<Base, Base>())
// Base --> object
.Connect(new CastProcessor<object, Derived>()) // cast object to Derived
// Derived --> Base
.Connect(new Processor<Base, object>())
.Finish(results => { });
pipe.Process(new Derived());
Note that this is not possible with value types (e.g. int --> object) because generic variance in C# applies only to reference types. Converting a value type to object requires boxing, which is not a variance conversion.
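The variance rules above can be checked in isolation. The following is a minimal sketch that redeclares the interface as a stand-in; PassThrough and VarianceDemo are hypothetical names used only for illustration.

```csharp
using System;

// Stand-in for the library interface, with the variance annotations shown above.
public interface IPipeProcessor<in TIn, out TOut>
{
    TOut Process(TIn data);
}

public class Base { }
public class Derived : Base { }

// Hypothetical processor that accepts any Base and returns it unchanged.
public class PassThrough : IPipeProcessor<Base, Base>
{
    public Base Process(Base data) => data;
}

public static class VarianceDemo
{
    public static object Run()
    {
        IPipeProcessor<Base, Base> processor = new PassThrough();

        // Contravariant TIn: a processor of Base can stand in for a processor of Derived.
        // Covariant TOut: its Base output can be viewed as object.
        IPipeProcessor<Derived, object> viewed = processor;

        // The equivalent assignment with a value type is rejected by the compiler:
        // IPipeProcessor<int, string> q = (IPipeProcessor<object, string>)null;

        return viewed.Process(new Derived());
    }
}
```

The object returned by VarianceDemo.Run() is the same Derived instance that was passed in. No wrapper or cast is involved, only a different static view of the same processor.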
Process strategies define how the data is processed:
// SerialProcessStrategy is the default when using the parameterless constructor
// of Pipe<TIn>. This strategy processes the items in the list one after another.
var pipe = new Pipe<object>(new SerialProcessStrategy());
Serial
[data] ┌Strategy─┐
[data] ⇒ data ⇒ [data][data][data]
[data] └─────────┘
// Process data in parallel.
// Each element of the list will be processed using the
// Parallel facility of .NET. All threads are synchronized
// at the end, and further processing is done in the thread
// where the processing was started.
var pipe = new Pipe<object>(new ParallelProcessStrategy());
Parallel
┌─Strategy───┐
| data |
[data] | ╱ ╲ | [data]
[data] ⇒ ━ data ━ ⇒ [data]
[data] | ╲ ╱ | [data]
| data |
└────────────┘
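To make the blocking behavior of the parallel case concrete, here is a minimal sketch of the idea using Parallel.For from the .NET Task Parallel Library. It is not the library's implementation: ParallelSketch and ProcessAll are hypothetical names, and whether Realmar.Pipes preserves input order is not stated in this document (this sketch does).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ParallelSketch
{
    // Applies `transform` to every item in parallel and blocks until all are done.
    // Results are written by index, so output order matches input order.
    public static IList<TOut> ProcessAll<TIn, TOut>(IList<TIn> items, Func<TIn, TOut> transform)
    {
        var results = new TOut[items.Count];
        Parallel.For(0, items.Count, i => results[i] = transform(items[i]));
        return results; // execution continues on the calling thread here
    }
}
```

For example, ParallelSketch.ProcessAll(new List<int> { 1, 2, 3 }, x => x * x) returns 1, 4, 9 once every item has been processed.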
// Use a thread pool to process the data.
// It is advised to use a NonBlockingPipe when using the ThreadPoolProcessStrategy
// as this processing strategy will not block when processing the data. (in contrast
// to the ParallelProcessStrategy which blocks until all data is processed)
// A NonBlockingPipe is optimized to work with such a strategy.
var pipe = new NonBlockingPipe<object>(new ThreadPoolProcessStrategy());
ThreadPool
┌─Strategy───┐
| data ━━ ⇒ [data]
[data] | ╱ |
[data] ⇒ ━ data ━━ ⇒ [data]
[data] | ╲ |
| data ━━ ⇒ [data]
└────────────┘
Combining processors into a pipe resembles Unix pipes ('|'). In a Unix pipeline the chained programs produce output before all input is processed: they take a stream of data as input and emit the transformed stream as output.
Using 'NonBlockingPipe's, 'Realmar.Pipes' provides this functionality too!
It is possible to give a 'NonBlockingPipe' data continuously so that the caller is not blocked.
var pipe = new NonBlockingPipe<string>();
pipe.FirstConnector
.Connect(new ParseStringProcessor<int>())
.Finish(Console.WriteLine);
// will not block
pipe.Process(new List<string>{ "1", "2" });
// will not block
pipe.Process(new List<string>{ "3", "4"});
The default processing strategy of the 'NonBlockingPipe' is the 'ThreadPoolProcessStrategy'.
It is important to note that the 'NonBlockingPipe' uses a worker thread to manage the input data. This means that even when using the 'SerialProcessStrategy' the caller will not be blocked and the data is processed in a separate thread. However, the pipe will not be able to process additional data before it has finished processing previously given data, because it needs to wait for the 'SerialProcessStrategy' to finish before it can hand over more data. The same applies to the 'ParallelProcessStrategy'.
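The worker-thread behavior described above can be illustrated with a small queue-based sketch. This is an assumption about the general technique, not the library's code: WorkerQueueSketch is a hypothetical type built on BlockingCollection from the .NET base class library.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical miniature of a non-blocking pipe: one worker thread drains a queue of batches.
public sealed class WorkerQueueSketch<T> : IDisposable
{
    private readonly BlockingCollection<IList<T>> _queue = new BlockingCollection<IList<T>>();
    private readonly Thread _worker;

    public WorkerQueueSketch(Action<IList<T>> handleBatch)
    {
        _worker = new Thread(() =>
        {
            // Batches are handled one at a time, in the order they were enqueued.
            foreach (var batch in _queue.GetConsumingEnumerable())
                handleBatch(batch);
        }) { IsBackground = true };
        _worker.Start();
    }

    // Returns immediately; the batch is handled later on the worker thread.
    public void Process(IList<T> batch) => _queue.Add(batch);

    // Stops accepting input and waits until all queued batches are handled.
    public void Dispose()
    {
        _queue.CompleteAdding();
        _worker.Join();
        _queue.Dispose();
    }
}
```

If handleBatch blocks until a batch is finished, as a serial or parallel strategy would, the second batch waits in the queue while the caller continues. That matches the limitation described above: the caller is never blocked, but batches are not overlapped.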
Pipe Connectors are used to pass processed data from one pipe to other pipes.
This example is taken from the tests (test case: Process_ConditionalPipeConnector). The goal is to multiply a number until it is greater than or equal to 20, then append a string.
var mathPipe = new Pipe<double>();
var stringPipe = new Pipe<double>();
// false true predicate
var connector = new ConditionalPipeConnector<double>(mathPipe, stringPipe, x => x < 20);
mathPipe.FirstConnector
.Connect(x => x * 2)
.Finish(connector.Process);
stringPipe.FirstConnector
.Connect(x => $"{x} is your number!")
.Finish(Console.WriteLine);
mathPipe.Process(new List<double> { 1, 2, 3, 4, 5, 6 });
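To see which values this routing should produce, the same logic can be written as a plain loop. This is a sketch only: it handles one value at a time rather than lists, assumes that values below the limit are routed back to the math pipe, and ConditionalRoutingSketch is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;

public static class ConditionalRoutingSketch
{
    // Doubles each value until it is no longer below `limit`, then formats it.
    public static List<string> Run(IEnumerable<double> inputs, double limit)
    {
        var output = new List<string>();
        foreach (var input in inputs)
        {
            var x = input;
            do
            {
                x *= 2;          // the "math pipe" step
            } while (x < limit); // the connector's predicate: loop back while it holds
            output.Add($"{x} is your number!"); // the "string pipe" step
        }
        return output;
    }
}
```

For the inputs 1 to 6 with a limit of 20, the values leaving the loop are 32, 32, 24, 32, 20 and 24. Every value is doubled at least once because the math pipe runs before the predicate is evaluated.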
Change directory into the test project and restore dependencies:
$ cd Realmar.Pipes.Tests
$ dotnet restore
Run the tests:
$ dotnet xunit
- Add more processors which are useful!
- Distributed pipe connectors which allow sending the processed data to another computer for further processing
This project is licensed under the MIT License - see the LICENSE file for details.