Skip to content

Commit

Permalink
Merge pull request #58 from AxonIQ/persisted-streams
Browse files Browse the repository at this point in the history
Persistent streams
  • Loading branch information
MGathier authored Feb 27, 2024
2 parents ddd0588 + d902dae commit 4ba7b4f
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>io.axoniq</groupId>
<artifactId>axon-server-api</artifactId>
<version>2023.0.2-SNAPSHOT</version>
<version>2024.0.0-SNAPSHOT</version>
<name>Axon Server API</name>
<description>Public API for communication with AxonServer</description>

Expand Down
125 changes: 125 additions & 0 deletions src/main/proto/persistent-streams.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
syntax = "proto3";
package io.axoniq.axonserver.grpc.streams;
import "event.proto";
import "google/protobuf/empty.proto";
option java_multiple_files = true;

/* Service providing operations for persistent event streams, event streams where Axon Server keeps
track of the progress. All operations require a header (AxonIQ-Context) to be passed with each request to define
the context.
*/
service PersistentStreamService {
/* Open a persistent event stream connection from a client. Creates the stream if it does not exist. */
rpc OpenStream(stream StreamRequest) returns (stream StreamSignal) {

}

/* Deletes a persistent event stream. All existing connections to the stream are closed. */
rpc DeleteStream(DeleteStreamRequest) returns (stream google.protobuf.Empty) {

}

/* Change properties of a persistent event stream. */
rpc UpdateStream(UpdateStreamRequest) returns (stream google.protobuf.Empty) {

}

/* Returns a list of all persistent event streams defined (for the context). For each event stream it returns
the progress per segment. */
rpc ListStreams(google.protobuf.Empty) returns (stream StreamStatus) {

}

/* Returns the clients connected to all persistent event streams defined (for the context). For each client it returns
the segments that are received by the client. */
rpc ListConnections(google.protobuf.Empty) returns (stream StreamConnections) {

}
}

/* Contains the current status of a persistent stream */
message StreamStatus {
string stream_id = 1; /* the unique identification of the persistent stream */
string stream_name = 2; /* a name for the persistent stream */
SequencingPolicy sequencing_policy = 3; /* the policy used to distribute events across segments. */
string filter = 4; /* an expression to filter events, same syntax as used for ad-hoc queries on the event store */
repeated SegmentPosition segments = 5; /* the last confirmed position per segment */
}

/* Contains the position per segment */
message SegmentPosition {
int32 segment = 1; /* the segment number */
int64 position = 2; /* the last confirmed position */
}

/* Contains the current connections for a persistent stream */
message StreamConnections {
string stream_id = 1; /* the unique identification of the stream */
string stream_name = 2; /* a name for the stream */
repeated StreamConnection segments_per_connection = 3; /* the segments held per client */
}

/* Contains the segments per client */
message StreamConnection {
string client_id = 1; /* the unique identification of the client */
repeated int32 segments = 2; /* a list of segment numbers */
}

/* Request to set up a connection to a stream. Clients should first submit an OpenRequest on this connection
to connect to a stream and can then submit Acknowledgement messages to report progress. */
message StreamRequest {
oneof request {
Open open = 1; /* the initial message to connect to a stream */
ProgressAcknowledgement acknowledgeProgress = 2; /* sends progress in processing events to Axon Server */
}
}

/* Request to delete a persistent stream */
message DeleteStreamRequest {
string stream_id = 1; /* the unique identification of the stream */
}

/* Request to update the properties of a persistent stream */
message UpdateStreamRequest {
string stream_id = 1; /* the unique identification of the stream */
int32 segments = 2; /* Request to change the number of segments */
string stream_name = 3; /* Request to change the name of the stream */
}


/* Request to open a connection to a persistent stream */
message Open {
string stream_id = 1; /* the unique identification of the stream */
string client_id = 2; /* the unique identification of the client */
InitializationProperties initialization_properties = 3; /* properties to create the stream if it does not exist */
}

/* Properties to create the stream if it does not exist */
message InitializationProperties {
int32 segments = 1; /* the initial number of segments */
int64 initial_position = 2; /* the position in the event store to start reading from */
SequencingPolicy sequencing_policy = 3; /* the sequencing policy */
string filter = 4; /* an expression to filter events, same syntax as used for ad-hoc queries on the event store */
string stream_name = 5; /* a name for the persistent stream */
}

/* Defines the policy used to distribute events across segments. The policy name must be known on the server. */
message SequencingPolicy {
string policy_name = 1; /* the name of the sequencing policy */
repeated string parameter = 2; /* optional list of parameters used by the sequencing policy */
}

/* Message to report progress of event processing for a specific segment in a stream */
message ProgressAcknowledgement {
int32 segment = 1; /* the segment number */
int64 position = 2; /* the position of the last processed event */
}

/* Message sent by Axon Server to the client stream connection */
message StreamSignal {
int32 segment = 1; /* the segment number */
oneof type {
io.axoniq.axonserver.grpc.event.EventWithToken event = 2; /* an event to process in the client */
bool closed = 3; /* indicates that the segment is closed by Axon Server */
}
}

0 comments on commit 4ba7b4f

Please sign in to comment.