diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index b5a940f..f99cec3 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -5,6 +5,7 @@ on: branches: - master - axon-server-api-*.*.x + - persisted-streams workflow_dispatch: jobs: diff --git a/pom.xml b/pom.xml index a0662cd..fcf841f 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ 4.0.0 io.axoniq axon-server-api - 2023.0.2-SNAPSHOT + 2024.0.0-SNAPSHOT Axon Server API Public API for communication with AxonServer diff --git a/src/main/proto/streams.proto b/src/main/proto/streams.proto new file mode 100644 index 0000000..c5c3504 --- /dev/null +++ b/src/main/proto/streams.proto @@ -0,0 +1,125 @@ +syntax = "proto3"; +package io.axoniq.axonserver.grpc.streams; +import "event.proto"; +import "google/protobuf/empty.proto"; +option java_multiple_files = true; + +/* Service providing operations for persisted event streams, event streams where Axon Server keeps + track of the progress. All operations require a header (AxonIQ-Context) to be passed with each request to define + the context. + */ +service EventStreamService { + /* Open a persisted event stream connection from a client. Creates the stream if it does not exist. */ + rpc OpenStream(stream StreamCommand) returns (stream StreamSignal) { + + } + + /* Deletes a persisted event stream. All existing connections to the stream are closed. */ + rpc DeleteStream(DeleteStreamRequest) returns (stream google.protobuf.Empty) { + + } + + /* Change properties of a persisted event stream. */ + rpc UpdateStream(UpdateStreamRequest) returns (stream google.protobuf.Empty) { + + } + + /* Returns a list of all persisted event streams defined (for the context). For each event stream it returns + the progress per segment. */ + rpc ListStreams( google.protobuf.Empty) returns (stream StreamStatus) { + + } + + /* Returns the clients connected to all persisted event streams defined (for the context). For each client it returns + the segments that are received by the client. */ + rpc ListConnections( google.protobuf.Empty) returns (stream StreamConnections) { + + } +} + +/* Contains the current status of a persisted stream */ +message StreamStatus { + string stream_id = 1; /* the unique identification of the stream */ + string stream_name = 2; /* a name for the stream */ + SequencingPolicy sequencing_policy = 3; /* the policy used to distribute events across segments. */ + string filter = 4; /* an expression to filter events */ + repeated SegmentPosition segments = 5; /* the last confirmed position per segment */ +} + +/* Contains the position per segment */ +message SegmentPosition { + int32 segment = 1; /* the segment number */ + int64 position = 2; /* the last confirmed position */ +} + +/* Contains the current connections for a persisted stream */ +message StreamConnections { + string stream_id = 1; /* the unique identification of the stream */ + string stream_name = 2; /* a name for the stream */ + repeated StreamConnection segments_per_connection = 3; /* the segments held per client */ +} + +/* Contains the segments per client */ +message StreamConnection { + string client_id = 1; /* the unique identification of the client */ + repeated int32 segments = 2; /* a list of segment numbers */ +} + +/* Request to set up a connection to a stream. Clients should first submit an OpenRequest on this connection + to connect to a stream and can then submit ProgressRequest messages to report progress. */ +message StreamCommand { + oneof request { + OpenRequest open = 1; /* the initial message to connect to a stream */ + ProgressRequest progress = 2; /* sends progress in processing events to Axon Server */ + } +} + +/* Request to delete a persisted stream */ +message DeleteStreamRequest { + string stream_id = 1; /* the unique identification of the stream */ +} + +/* Request to update the properties of a persisted stream */ +message UpdateStreamRequest { + string stream_id = 1; /* the unique identification of the stream */ + int32 segments = 2; /* Request to change the number of segments */ + string stream_name = 3; /* Request to change the name of the stream */ +} + + +/* Request to open a connection to a persisted stream */ +message OpenRequest { + string stream_id = 1; /* the unique identification of the stream */ + string client_id = 2; /* the unique identification of the client */ + InitializationProperties initialization_properties = 3; /* properties to create the stream if it does not exist */ +} + +/* Properties to create the stream if it does not exist */ +message InitializationProperties { + int32 segments = 1; /* the initial number of segments */ + int64 initial_position = 2; /* the position in the event store to start reading from */ + SequencingPolicy sequencing_policy = 3; /* the sequencing policy */ + string filter = 4; /* an expression to filter events */ + string stream_name = 5; /* a name for the stream */ +} + +/* Defines the policy used to distribute events across segments */ +message SequencingPolicy { + string policy_name = 1; /* the name of the sequencing policy */ + repeated string parameter = 2; /* optional list of parameters used by the sequencing policy */ +} + +/* Message to report progress of event processing for a specific segment in a stream */ +message ProgressRequest { + int32 segment = 1; /* the segment number */ + int64 position = 2; /* the position of the last processed event */ +} + +/* Message sent by Axon Server to the client stream connection */ +message StreamSignal { + int32 segment = 1; /* the segment number */ + oneof type { + io.axoniq.axonserver.grpc.event.EventWithToken event = 2; /* an event to process in the client */ + bool closed = 3; /* indicates that the segment is closed by Axon Server */ + } +} \ No newline at end of file