Skip to content

Commit

Permalink
add protobuf definition for persisted streams
Browse files Browse the repository at this point in the history
  • Loading branch information
MGathier committed Feb 6, 2024
1 parent ddd0588 commit 732da54
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 1 deletion.
1 change: 1 addition & 0 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ on:
branches:
- master
- axon-server-api-*.*.x
- persisted-streams
workflow_dispatch:

jobs:
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>io.axoniq</groupId>
<artifactId>axon-server-api</artifactId>
<version>2023.0.2-SNAPSHOT</version>
<version>2024.0.0-SNAPSHOT</version>
<name>Axon Server API</name>
<description>Public API for communication with AxonServer</description>

Expand Down
125 changes: 125 additions & 0 deletions src/main/proto/streams.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
syntax = "proto3";
package io.axoniq.axonserver.grpc.streams;
import "event.proto";
import "google/protobuf/empty.proto";
option java_multiple_files = true;

/* Service providing operations for persisted event streams, event streams where Axon Server keeps
track of the progress. All operations require a header (AxonIQ-Context) to be passed with each request to define
the context.
*/
service EventStreamService {
/* Open a persisted event stream connection from a client. Creates the stream if it does not exist. */
rpc OpenStream(stream StreamCommand) returns (stream StreamSignal) {

}

/* Deletes a persisted event stream. All existing connections to the stream are closed. */
rpc DeleteStream(DeleteStreamRequest) returns (stream google.protobuf.Empty) {

}

/* Change properties of a persisted event stream. */
rpc UpdateStream(UpdateStreamRequest) returns (stream google.protobuf.Empty) {

}

/* Returns a list of all persisted event streams defined (for the context). For each event stream it returns
the progress per segment. */
rpc ListStreams( google.protobuf.Empty) returns (stream StreamStatus) {

}

/* Returns the clients connected to all persisted event streams defined (for the context). For each client it returns
the segments that are received by the client. */
rpc ListConnections( google.protobuf.Empty) returns (stream StreamConnections) {

}
}

/* Contains the current status of a persisted stream */
message StreamStatus {
string stream_id = 1; /* the unique identification of the stream */
string stream_name = 2; /* a name for the stream */
SequencingPolicy sequencing_policy = 3; /* the policy used to distribute events across segments. */
string filter = 4; /* an expression to filter events */
repeated SegmentPosition segments = 5; /* the last confirmed position per segment */
}

/* Contains the position per segment */
message SegmentPosition {
int32 segment = 1; /* the segment number */
int64 position = 2; /* the last confirmed position */
}

/* Contains the current connections for a persisted stream */
message StreamConnections {
string stream_id = 1; /* the unique identification of the stream */
string stream_name = 2; /* a name for the stream */
repeated StreamConnection segments_per_connection = 3; /* the segments held per client */
}

/* Contains the segments per client */
message StreamConnection {
string client_id = 1; /* the unique identification of the client */
repeated int32 segments = 2; /* a list of segment numbers */
}

/* Request to set up a connection to a stream. Clients should first submit an OpenRequest on this connection
to connect to a stream and can then submit ProgressRequest messages to report progress. */
message StreamCommand {
oneof request {
OpenRequest open = 1; /* the initial message to connect to a stream */
ProgressRequest progress = 2; /* sends progress in processing events to Axon Server */
}
}

/* Request to delete a persisted stream */
message DeleteStreamRequest {
string stream_id = 1; /* the unique identification of the stream */
}

/* Request to update the properties of a persisted stream */
message UpdateStreamRequest {
string stream_id = 1; /* the unique identification of the stream */
int32 segments = 2; /* Request to change the number of segments */
string stream_name = 3; /* Request to change the name of the stream */
}


/* Request to open a connection to a persisted stream */
message OpenRequest {
string stream_id = 1; /* the unique identification of the stream */
string client_id = 2; /* the unique identification of the client */
InitializationProperties initialization_properties = 3; /* properties to create the stream if it does not exist */
}

/* Properties to create the stream if it does not exist */
message InitializationProperties {
int32 segments = 1; /* the initial number of segments */
int64 initial_position = 2; /* the position in the event store to start reading from */
SequencingPolicy sequencing_policy = 3; /* the sequencing policy */
string filter = 4; /* an expression to filter events */
string stream_name = 5; /* a name for the stream */
}

/* Defines the policy used to distribute events across segments */
message SequencingPolicy {
string policy_name = 1; /* the name of the sequencing policy */
repeated string parameter = 2; /* optional list of parameters used by the sequencing policy */
}

/* Message to report progress of event processing for a specific segment in a stream */
message ProgressRequest {
int32 segment = 1; /* the segment number */
int64 position = 2; /* the position of the last processed event */
}

/* Message sent by Axon Server to the client stream connection */
message StreamSignal {
int32 segment = 1; /* the segment number */
oneof type {
io.axoniq.axonserver.grpc.event.EventWithToken event = 2; /* an event to process in the client */
bool closed = 3; /* indicates that the segment is closed by Axon Server */
}
}

0 comments on commit 732da54

Please sign in to comment.