-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Persistent streams #58
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me. I just have a few suggestions for renaming some properties.
src/main/proto/streams.proto
Outdated
track of the progress. All operations require a header (AxonIQ-Context) to be passed with each request to define | ||
the context. | ||
*/ | ||
service EventStreamService { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion: rename to PersistentStreamsService
I also suggest to change the name of the proto file to persistent-streams.proto
src/main/proto/streams.proto
Outdated
|
||
/* Request to set up a connection to a stream. Clients should first submit an OpenRequest on this connection | ||
to connect to a stream and can then submit ProgressRequest messages to report progress. */ | ||
message StreamCommand { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To avoid confusion with Command (Messages), I suggest renaming to StreamInstruction or StreamRequest
src/main/proto/streams.proto
Outdated
to connect to a stream and can then submit ProgressRequest messages to report progress. */ | ||
message StreamCommand { | ||
oneof request { | ||
OpenRequest open = 1; /* the initial message to connect to a stream */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the outer message is StreamRequest, this one could just be called "Open" or "OpenStream". Or since the description says "the initial message to connect to a stream", perhaps "Connect"?
src/main/proto/streams.proto
Outdated
} | ||
|
||
/* Message to report progress of event processing for a specific segment in a stream */ | ||
message ProgressRequest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Other messaging systems typically call these message Acknowledgements or ACKs. Might be easier for people to understand if we use the same terminology. Wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bunch of remarks, with a majority towards documentation.
.github/workflows/maven.yml
Outdated
@@ -5,6 +5,7 @@ on: | |||
branches: | |||
- master | |||
- axon-server-api-*.*.x | |||
- persisted-streams |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need this in the PR, right? Or will the persisted-streams
branch live forever?
string stream_id = 1; /* the unique identification of the stream */ | ||
string stream_name = 2; /* a name for the stream */ | ||
SequencingPolicy sequencing_policy = 3; /* the policy used to distribute events across segments. */ | ||
string filter = 4; /* an expression to filter events */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it make sense for the filter
documentation to refer, one way or another, to the fact it uses the same query language as the QueryEventsRequest#query
field does?
|
||
/* Returns a list of all persistent event streams defined (for the context). For each event stream it returns | ||
the progress per segment. */ | ||
rpc ListStreams( google.protobuf.Empty) returns (stream StreamStatus) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess the indentation is slightly off:
rpc ListStreams( google.protobuf.Empty) returns (stream StreamStatus) { | |
rpc ListStreams(google.protobuf.Empty) returns (stream StreamStatus) { |
|
||
/* Returns the clients connected to all persistent event streams defined (for the context). For each client it returns | ||
the segments that are received by the client. */ | ||
rpc ListConnections( google.protobuf.Empty) returns (stream StreamConnections) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess the indentation is slightly off:
rpc ListConnections( google.protobuf.Empty) returns (stream StreamConnections) { | |
rpc ListConnections(google.protobuf.Empty) returns (stream StreamConnections) { |
string stream_id = 1; /* the unique identification of the stream */ | ||
string stream_name = 2; /* a name for the stream */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
string stream_id = 1; /* the unique identification of the stream */ | |
string stream_name = 2; /* a name for the stream */ | |
string stream_id = 1; /* the unique identification of the persistent stream */ | |
string stream_name = 2; /* a name for the persistent stream */ |
int32 segments = 1; /* the initial number of segments */ | ||
int64 initial_position = 2; /* the position in the event store to start reading from */ | ||
SequencingPolicy sequencing_policy = 3; /* the sequencing policy */ | ||
string filter = 4; /* an expression to filter events */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it make sense for the filter
documentation to refer, one way or another, to the fact it uses the same query language as the QueryEventsRequest#query
field does?
int64 initial_position = 2; /* the position in the event store to start reading from */ | ||
SequencingPolicy sequencing_policy = 3; /* the sequencing policy */ | ||
string filter = 4; /* an expression to filter events */ | ||
string stream_name = 5; /* a name for the stream */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
string stream_name = 5; /* a name for the stream */ | |
string stream_name = 5; /* a name for the persistent stream */ |
message StreamRequest { | ||
oneof request { | ||
Open open = 1; /* the initial message to connect to a stream */ | ||
Acknowledgement acknowledgment = 2; /* sends progress in processing events to Axon Server */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From one part I like the name acknowledgement to acknowledge the progress, but from the other end it seems a bit out of the persistent stream context to me.
I think I'd expect something like UpdateProgress
or StreamPosition
, or SegmentPosition
(which already exists in this file), as that's what it updates/acknowledges about.
} | ||
|
||
/* Defines the policy used to distribute events across segments */ | ||
message SequencingPolicy { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would the stream request fail if the policy_name
doesn't exist on the implementers side of this API?
/* Defines the policy used to distribute events across segments */ | ||
message SequencingPolicy { | ||
string policy_name = 1; /* the name of the sequencing policy */ | ||
repeated string parameter = 2; /* optional list of parameters used by the sequencing policy */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would the stream request fail if any of the parameters don't exist or are mistyped?
No description provided.