Possible other type of pipeline object required? #21

Open
jonathanstowe opened this issue Feb 26, 2021 · 0 comments

This isn't so much an issue as an RFC, or maybe it will reveal a grave error in my understanding of Cro or a gap in the documentation.

I'm looking at creating a service that encapsulates what should be considered an opaque implementation of a messaging service, where one object manages the connection state and the subscriptions to messages. I wind up with something like the following:

```raku
use Cro::Service;
use Cro::Message;
use Cro::Source;
use Cro::Types;
use Cro::Connection;
use Cro::Connector;
use Cro::Transform;
use Cro::Sink;
use Cro;

class Test::Client {

    has Supply $!supply;

    has @!topics = <one two three>;

    method connect() {
        $!supply = supply {
            whenever Supply.interval(0.5) {
                emit { topic => @!topics.pick, message => DateTime.now.Str };
            }
        }
        Promise.kept: True;
    }

    method subscribe(Str $topic) {
        $!supply.grep(*<topic> eq $topic)
    }
}

class Cro::Test does Cro::Service {

    class ClientConnection does Cro::Connection {
        has Test::Client $.client is required handles <connect subscribe>;

        method produces() { }

        method incoming() {
            supply {
                emit self;
            }
        }
    }

    class BrokerConnection does Cro::Source {

        method produces() { ClientConnection }

        method incoming() {
            my $client = Test::Client.new;
            supply {
                whenever $client.connect {
                    emit ClientConnection.new(:$client);
                }
            }
        }
    }

    class Message does Cro::Message {
        has Str $.topic;
        has Str $.message;
    }

    class Subscription does Cro::Transform {
        has Str $.topic is required;

        method consumes() { ClientConnection }
        method produces() { Message }

        method transformer( Supply:D $incoming --> Supply ) {
            supply {
                whenever $incoming -> $connection {
                    whenever $connection.subscribe($!topic) -> %message {
                        emit Message.new(|%message);
                    }
                }
            }
        }
    }

    class Consumer does Cro::Sink {

        has Callable $.consume is required;

        method consumes() { Message }

        method sinker( Supply:D $messages ) {
            supply {
                whenever $messages -> $message {
                    $!consume.($message);
                }
            }
        }
    }
}

my $service = Cro.compose(
    Cro::Test::BrokerConnection.new,
    Cro::Test::Subscription.new(topic => 'two'),
    Cro::Test::Consumer.new( consume => -> $m { say "GOT IT ", $m.raku }),
);

$service.start;

react {
    whenever signal(SIGINT) {
        $service.stop;
        done;
    }
}
```

Here the Test::Client is just a mock of the actual thing. The actual intent is to have multiple Subscriptions, each associated with possibly multiple Consumers (which would be handled by some SubscriptionSet and ConsumerSet transforms, in a similar way to how Cro::HTTP works).

This all works fine (the actual thing not so much, because of some scoping issue), but it seems to me that the ClientConnection is all wrong. It's necessary because the Source has to produce something for the other components to consume. (Of course, if one were aiming for a single Subscription, the BrokerConnection could deal with the subscription and emit the Message itself, omitting the ClientConnection and Subscription altogether.) But really it is a facade for the underlying connection provided by the Source, so it doesn't need produces or incoming: the next stages in the pipeline will simply receive the object and call the (possibly delegated) methods on it.

So I think there needs to be an additional role, basically like a Cro::Message but with less semantic baggage, which can be passed between a Source and, say, a Transform, and which just implements behaviour specific to the application.
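To make the shape of that proposal concrete, here is a minimal sketch in plain Raku, deliberately without Cro so that it runs on its own. The names Pipeline::Handle, Mock::Client and ClientHandle are hypothetical and do not exist in Cro. How Cro.compose would need to treat such a role is not addressed here.

```raku
# Hypothetical marker role: an object passed between pipeline stages
# that carries only application-specific behaviour. Not part of Cro.
role Pipeline::Handle { }

# Cut-down stand-in for the mock client above, driven by hand
# instead of by a timer so the result is deterministic.
class Mock::Client {
    has Supplier $!supplier = Supplier.new;

    method publish(Str $topic, Str $message) {
        $!supplier.emit: %( :$topic, :$message );
    }

    method subscribe(Str $topic --> Supply) {
        $!supplier.Supply.grep(*<topic> eq $topic)
    }
}

# The facade: no produces, no incoming, only delegation to the client.
class ClientHandle does Pipeline::Handle {
    has Mock::Client $.client is required handles <subscribe publish>;
}

my $handle = ClientHandle.new(client => Mock::Client.new);

# A downstream stage would just call the delegated methods.
my @seen;
$handle.subscribe('two').tap: -> %m { @seen.push: %m<message> };

$handle.publish('one', 'ignored');
$handle.publish('two', 'delivered');

say @seen;
```

The point of the sketch is only that the facade needs nothing beyond delegation; whether a marker role like this is the right fit for Cro's type checking during composition is an open question.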
