Skip to content

Commit

Permalink
feat: defines and implements pull streams as provider data returning …
Browse files Browse the repository at this point in the history
…only
  • Loading branch information
rafamel committed Jul 29, 2020
1 parent 6eff803 commit 889b444
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 256 deletions.
128 changes: 51 additions & 77 deletions assets/Consume.puml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
@startuml consume

partition "Initialize (Synchronous)" {
partition "Initialize" {
start
: Stream.consume();
: Call **Provider** Executor;
Expand All @@ -9,103 +9,90 @@ partition "Initialize (Synchronous)" {
stop
else (no)
: Provider.open(): Primer;
if (Error?) then (yes)
: Provider.close();
: Throw **Error**;
stop
else (no)
if (Error?) then (no)
: Call **Consumer** Executor;
if (Error?) then (yes)
: Provider.close();
: Throw **Error**;
stop
else (no)
: Consumer.open(primer: Primer): I;
: Consumer.open(primer: Primer): void;
if (Error?) then (yes)
: Consumer.close();
: Provider.close();
: Throw **Error**;
stop
else (no)
: Return Broker;
: Return **StreamBroker**;
: Start **Asynchronous** Process;
(P)
detach
endif
endif
else (yes)
: Provider.close();
: Throw **Error**;
stop
endif
endif
}

partition "Provider (Asynchronous)" {
partition "Process" {
(P)
if (Externally Cancelled?) then (yes)
if (Externally cancelled?) then (yes)
stop
else (no)
if (Provider.data exists?) then (no)
(F)
detach
else (yes)
: Provider.data(value: I): StreamResult<O>;
: Provider.data(): StreamResponse<T>;
if (Error?) then (yes)
' Externally Cancelled?
: Consumer.error(error: Error): StreamResult<I>;
if (Error?) then (yes)
(T)
detach
else (no)
if (Done?) then (yes)
(F)
detach
if (Consumer.close exists?) then (no)
if (Externally cancelled?) then (yes)
stop
else (no)
(P)
detach
: Provider.close();
: Throw **Uncaught Error**;
stop
endif
endif
else (no)
if (Done?) then (yes)
else (yes)
(F)
detach
else (no)
(C)
detach
endif
endif
endif
endif
}

partition "Consumer (Asynchronous)" {
(C)
if (Externally Cancelled?) then (yes)
stop
else (no)
if (Consumer.data exists?) then (no)
(F)
detach
else (yes)
: Consumer.data(value: O): StreamResult<I>;
if (Error?) then (yes)
' Externally Cancelled?
: Provider.error(error: Error): StreamResult<O>;
if (Error?) then (yes)
(T)
detach
else (no)
if (Done?) then (yes)
(F)
detach
else (no)
(C)
detach
endif
endif
else (no)
if (Done?) then (yes)
if (Done, returns **{ done: true }**?) then (yes)
(F)
detach
else (no)
(P)
detach
if (Externally cancelled?) then (yes)
stop
else (no)
if (Consumer.data exists?) then (no)
(F)
detach
else (yes)
: Consumer.data(value: T): Resolve<void | boolean>;
if (Error?) then (yes)
if (Externally cancelled?) then (yes)
stop
else (no)
: Consumer.close();
: Provider.close();
: Throw **Uncaught Error**;
stop
endif
else (no)
if (Done, returns **true**?) then (yes)
(F)
detach
else (no)
(P)
detach
endif
endif
endif
endif
endif
endif
endif
Expand All @@ -114,10 +101,10 @@ partition "Consumer (Asynchronous)" {

partition Finalization {
(F)
if (Externally Cancelled?) then (yes)
if (Externally cancelled?) then (yes)
stop
else (no)
: Consumer.close();
: Consumer.close(error?: Error): void;
if (Error?) then (yes)
: Provider.close();
: Throw **Uncaught Error**;
Expand All @@ -134,19 +121,7 @@ partition Finalization {
endif
}

partition Termination {
(T)
if (Externally Cancelled?) then (yes)
stop
else (no)
: Consumer.close();
: Provider.close();
: Throw **Uncaught Error**;
stop
endif
}

partition "External Cancellation (Synchronous)" {
partition "External Cancellation" {
start
: Broker.cancel();
if (Done?) then (yes)
Expand All @@ -163,7 +138,6 @@ partition "External Cancellation (Synchronous)" {
: Throw **Error**;
stop
else (no)
: Return;
stop
endif
endif
Expand Down
65 changes: 28 additions & 37 deletions assets/Definitions.puml
Original file line number Diff line number Diff line change
@@ -1,65 +1,56 @@
@startuml Classes

interface StreamProvider<O, I, Primer> {
first(value: I): Primer
data(value: I): StreamResult<O>
error(error: Error): StreamResult<O>
interface StreamProvider<T, Primer> {
open(): Primer
data(): StreamResult<T>
close(): void
}

interface StreamConsumer<O, I> {
first(): I
data(value: O): StreamResult<I>
error(error: Error): StreamResult<I>
close(): void
interface StreamConsumer<T, Primer> {
open(primer: Primer): void
data(value: T): boolean | void
close(error?: Error): void
}

interface StreamResult<T> {
done: boolean
value: T | Promise<T>
value: T
}

interface StreamBroker {
done: boolean
cancel(): void
}

interface ProcedureStream<O, I, Primer> {
interface BroadStream<T, Primer> {
primer(): Primer;
engage(): StreamProvider<O, I, Primer>
consume(executor: (): StreamConsumer<O, I, Primer>): Broker
}

PureStream <|-- ProcedureStream
interface PureStream<O, Primer> {
primer(): Primer
engage(): StreamProvider<O, void, Primer>
consume(executor: (): StreamConsumer<O, void, Primer>): Broker
execute(): Provider<T, Primer>
consume(executor: (): StreamConsumer<T, Primer>): StreamBroker
}

ForeStream <|-- PureStream
interface ForeStream<O> {
primer(): O
engage(): StreamProvider<O, void, O>
consume(executor: (): StreamConsumer<O, void, O>): Broker
PureStream <|-- BroadStream
interface PureStream<T> {
primer(): void
engage(): Provider<T, void>
consume(executor: (): StreamConsumer<T, void>): StreamBroker
}

Stream <|.. ProcedureStream
class Stream<O, I, Primer> {
+constructor(executor: (): StreamProvider<O, I, Primer>): Stream<O, I, Primer>
ForeStream <|-- BroadStream
interface ForeStream<T> {
primer(): T
engage(): Provider<T, T>
consume(executor: (): StreamConsumer<T, T>): StreamBroker
}

PushStream <|.. PureStream
PushStream <|-- Stream
class PushStream<O, Primer> {
+subscribe(): Broker
Stream <|-- BroadStream
class Stream<T, Primer> {
+constructor(executor: (): StreamProvider<T, Primer>): Stream<T, Primer>
}

SubjectStream <|-- PushStream
class SubjectStream<O, Primer> {
+data(value: O): void
+error(error: Error): void
+done(): void
SubjectStream <|.. Stream
class SubjectStream<T, Primer> {
+data(value: T): void
+close(error?: Error): void
}

@enduml
71 changes: 28 additions & 43 deletions src/definitions/streams.ts
Original file line number Diff line number Diff line change
@@ -1,55 +1,42 @@
import { MultiPipe } from 'pipettes/dist/types/multi';

/* Stream */
export interface StreamConstructor {
new <O, I = void, Primer = void>(
executor: () => Provider<O, I, Primer>
): Stream<O, I, Primer>;
}

export interface Stream<O, I, Primer> {
probe(): Primer;
engage(): Provider<O, I, Primer>;
consume(executor: () => Consumer<O, I, Primer>): Broker;
pipe: MultiPipe<Stream<O, I, Primer>, Stream<O, I, Primer>, false, true>;
new <T, Primer extends T | void = void>(
executor: ProviderExecutor<T, Primer>
): Stream<T, Primer>;
}

export interface PushStream<O, Primer> extends Stream<O, void, Primer> {
subscribe(): Broker;
export interface Stream<T, Primer extends T | void> {
primer(): Primer;
execute(): Provider<T, Primer>;
consume(executor: ConsumerExecutor<T, Primer>): Broker;
}

export interface SubjectStream<O, Primer = any> extends PushStream<O, Primer> {
data(value: O): void;
error(error: Error): void;
done(): void;
export interface SubjectStream<T, Primer extends T | void = any>
extends Stream<T, Primer> {
data(value: T): void;
close(error?: Error): void;
}

/* Provider */
export interface Provider<O, I, Primer> {
open?(): Primer;
data?(value: I): Resolve<Response<O>>;
error?(error: Error): Resolve<Response<O>>;
close?(): void;
export type ProviderExecutor<T, Primer extends T | void> = () => Partial<
Provider<T, Primer>
>;

export interface Provider<T, Primer extends T | void> {
open(): Primer;
data(): Response<T>;
close(): void;
}

/* Consumer */
export type Consumer<O, I, Primer> =
| ProcedureConsumer<O, I, Primer>
| (I extends void | undefined
? PartialConsumer<O, I, Primer>
: ProcedureConsumer<O, I, Primer>);
export type ConsumerExecutor<T, Primer extends T | void> = () => Partial<
Consumer<T, Primer>
>;

export interface PartialConsumer<O, I, Primer> {
open?(primer: Primer): I;
data?(value: O): Resolve<Response<I>>;
error?(error: Error): Resolve<Response<I>>;
close?(): void;
}

export interface ProcedureConsumer<O, I, Primer>
extends PartialConsumer<O, I, Primer> {
open(primer: Primer): I;
data(value: O): Resolve<Response<I>>;
export interface Consumer<T, Primer extends T | void> {
open(primer: Primer): void;
data(value: T): Resolve<void | boolean>;
close(error?: Error): void;
}

/* Broker */
Expand All @@ -59,11 +46,9 @@ export interface Broker {
}

/* Response */
export type Resolve<T> = T | Promise<T>;
export type Response<T> = Resolve<Result<T>>;

export type Response<T> =
| Result<T>
| (T extends void | undefined ? Result<T> | void : Result<T>);
export type Resolve<T> = T | Promise<T>;

export type Result<T> =
| { done: true; value?: void }
Expand Down
Loading

0 comments on commit 889b444

Please sign in to comment.