From f38cf0c49dfbf257e8b18b606b6472d066ab3b2d Mon Sep 17 00:00:00 2001 From: w-h-a Date: Tue, 2 Jul 2024 07:14:57 -0700 Subject: [PATCH] feat: get event streaming working --- proto/streams/streams.pb.go | 508 ++++++++++++++++++++++++------- proto/streams/streams.proto | 40 ++- server/grpcserver/grpc_stream.go | 2 +- server/options.go | 4 +- streams/domain.go | 24 +- streams/options.go | 107 ++++++- streams/streams.go | 18 +- 7 files changed, 569 insertions(+), 134 deletions(-) diff --git a/proto/streams/streams.pb.go b/proto/streams/streams.pb.go index c16820a..9630c04 100644 --- a/proto/streams/streams.pb.go +++ b/proto/streams/streams.pb.go @@ -100,20 +100,22 @@ func (x *Event) GetMetadata() map[string]string { return nil } -// produce request/response -type ProduceRequest struct { +// subscribe request/response +type SubscribeRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` - Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` - Timestampt int64 `protobuf:"varint,3,opt,name=timestampt,proto3" json:"timestampt,omitempty"` - Metadata map[string]string `protobuf:"bytes,4,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Group string `protobuf:"bytes,2,opt,name=group,proto3" json:"group,omitempty"` + Topic string `protobuf:"bytes,3,opt,name=topic,proto3" json:"topic,omitempty"` + AutoAck bool `protobuf:"varint,4,opt,name=auto_ack,json=autoAck,proto3" json:"auto_ack,omitempty"` + AckWait int64 `protobuf:"varint,5,opt,name=ack_wait,json=ackWait,proto3" json:"ack_wait,omitempty"` + RetryLimit int64 `protobuf:"varint,6,opt,name=retry_limit,json=retryLimit,proto3" json:"retry_limit,omitempty"` } -func (x *ProduceRequest) Reset() { - *x = ProduceRequest{} +func (x *SubscribeRequest) Reset() { + *x = SubscribeRequest{} if protoimpl.UnsafeEnabled { mi := &file_proto_streams_streams_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -121,13 +123,13 @@ func (x *ProduceRequest) Reset() { } } -func (x *ProduceRequest) String() string { +func (x *SubscribeRequest) String() string { return protoimpl.X.MessageStringOf(x) } -func (*ProduceRequest) ProtoMessage() {} +func (*SubscribeRequest) ProtoMessage() {} -func (x *ProduceRequest) ProtoReflect() protoreflect.Message { +func (x *SubscribeRequest) ProtoReflect() protoreflect.Message { mi := &file_proto_streams_streams_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -139,47 +141,61 @@ func (x *ProduceRequest) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use ProduceRequest.ProtoReflect.Descriptor instead. -func (*ProduceRequest) Descriptor() ([]byte, []int) { +// Deprecated: Use SubscribeRequest.ProtoReflect.Descriptor instead. +func (*SubscribeRequest) Descriptor() ([]byte, []int) { return file_proto_streams_streams_proto_rawDescGZIP(), []int{1} } -func (x *ProduceRequest) GetTopic() string { +func (x *SubscribeRequest) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *SubscribeRequest) GetGroup() string { + if x != nil { + return x.Group + } + return "" +} + +func (x *SubscribeRequest) GetTopic() string { if x != nil { return x.Topic } return "" } -func (x *ProduceRequest) GetPayload() []byte { +func (x *SubscribeRequest) GetAutoAck() bool { if x != nil { - return x.Payload + return x.AutoAck } - return nil + return false } -func (x *ProduceRequest) GetTimestampt() int64 { +func (x *SubscribeRequest) GetAckWait() int64 { if x != nil { - return x.Timestampt + return x.AckWait } return 0 } -func (x *ProduceRequest) GetMetadata() map[string]string { +func (x *SubscribeRequest) GetRetryLimit() int64 { if x != nil { - return x.Metadata + return x.RetryLimit } - return nil + return 0 } -type ProduceResponse struct { +type SubscribeResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields } -func (x *ProduceResponse) Reset() { - *x = ProduceResponse{} +func (x *SubscribeResponse) Reset() { + *x = SubscribeResponse{} if protoimpl.UnsafeEnabled { mi := &file_proto_streams_streams_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -187,13 +203,13 @@ func (x *ProduceResponse) Reset() { } } -func (x *ProduceResponse) String() string { +func (x *SubscribeResponse) String() string { return protoimpl.X.MessageStringOf(x) } -func (*ProduceResponse) ProtoMessage() {} +func (*SubscribeResponse) ProtoMessage() {} -func (x *ProduceResponse) ProtoReflect() protoreflect.Message { +func (x *SubscribeResponse) ProtoReflect() protoreflect.Message { mi := &file_proto_streams_streams_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -205,29 +221,110 @@ func (x *ProduceResponse) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use ProduceResponse.ProtoReflect.Descriptor instead. -func (*ProduceResponse) Descriptor() ([]byte, []int) { +// Deprecated: Use SubscribeResponse.ProtoReflect.Descriptor instead. +func (*SubscribeResponse) Descriptor() ([]byte, []int) { return file_proto_streams_streams_proto_rawDescGZIP(), []int{2} } +// unsubscribe request/response +type UnsubscribeRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` +} + +func (x *UnsubscribeRequest) Reset() { + *x = UnsubscribeRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_streams_streams_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *UnsubscribeRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*UnsubscribeRequest) ProtoMessage() {} + +func (x *UnsubscribeRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_streams_streams_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use UnsubscribeRequest.ProtoReflect.Descriptor instead. +func (*UnsubscribeRequest) Descriptor() ([]byte, []int) { + return file_proto_streams_streams_proto_rawDescGZIP(), []int{3} +} + +func (x *UnsubscribeRequest) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +type UnsubscribeResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *UnsubscribeResponse) Reset() { + *x = UnsubscribeResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_streams_streams_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *UnsubscribeResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*UnsubscribeResponse) ProtoMessage() {} + +func (x *UnsubscribeResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_streams_streams_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use UnsubscribeResponse.ProtoReflect.Descriptor instead. +func (*UnsubscribeResponse) Descriptor() ([]byte, []int) { + return file_proto_streams_streams_proto_rawDescGZIP(), []int{4} +} + // consume request (stream Event response) type ConsumeRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` - Group string `protobuf:"bytes,2,opt,name=group,proto3" json:"group,omitempty"` - Offset int64 `protobuf:"varint,3,opt,name=offset,proto3" json:"offset,omitempty"` - AutoAck bool `protobuf:"varint,4,opt,name=auto_ack,json=autoAck,proto3" json:"auto_ack,omitempty"` - AckWait int64 `protobuf:"varint,5,opt,name=ack_wait,json=ackWait,proto3" json:"ack_wait,omitempty"` - RetryLimit int64 `protobuf:"varint,6,opt,name=retry_limit,json=retryLimit,proto3" json:"retry_limit,omitempty"` + Offset int64 `protobuf:"varint,1,opt,name=offset,proto3" json:"offset,omitempty"` } func (x *ConsumeRequest) Reset() { *x = ConsumeRequest{} if protoimpl.UnsafeEnabled { - mi := &file_proto_streams_streams_proto_msgTypes[3] + mi := &file_proto_streams_streams_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -240,7 +337,7 @@ func (x *ConsumeRequest) String() string { func (*ConsumeRequest) ProtoMessage() {} func (x *ConsumeRequest) ProtoReflect() protoreflect.Message { - mi := &file_proto_streams_streams_proto_msgTypes[3] + mi := &file_proto_streams_streams_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -253,51 +350,182 @@ func (x *ConsumeRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use ConsumeRequest.ProtoReflect.Descriptor instead. func (*ConsumeRequest) Descriptor() ([]byte, []int) { - return file_proto_streams_streams_proto_rawDescGZIP(), []int{3} + return file_proto_streams_streams_proto_rawDescGZIP(), []int{5} } -func (x *ConsumeRequest) GetTopic() string { +func (x *ConsumeRequest) GetOffset() int64 { if x != nil { - return x.Topic + return x.Offset } - return "" + return 0 } -func (x *ConsumeRequest) GetGroup() string { +// ack request (no response) +type AckRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Success bool `protobuf:"varint,2,opt,name=success,proto3" json:"success,omitempty"` +} + +func (x *AckRequest) Reset() { + *x = AckRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_streams_streams_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *AckRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AckRequest) ProtoMessage() {} + +func (x *AckRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_streams_streams_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AckRequest.ProtoReflect.Descriptor instead. +func (*AckRequest) Descriptor() ([]byte, []int) { + return file_proto_streams_streams_proto_rawDescGZIP(), []int{6} +} + +func (x *AckRequest) GetId() string { if x != nil { - return x.Group + return x.Id } return "" } -func (x *ConsumeRequest) GetOffset() int64 { +func (x *AckRequest) GetSuccess() bool { if x != nil { - return x.Offset + return x.Success } - return 0 + return false +} + +// produce request/response +type ProduceRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` + Timestampt int64 `protobuf:"varint,3,opt,name=timestampt,proto3" json:"timestampt,omitempty"` + Metadata map[string]string `protobuf:"bytes,4,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` +} + +func (x *ProduceRequest) Reset() { + *x = ProduceRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_streams_streams_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ProduceRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ProduceRequest) ProtoMessage() {} + +func (x *ProduceRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_streams_streams_proto_msgTypes[7] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ProduceRequest.ProtoReflect.Descriptor instead. +func (*ProduceRequest) Descriptor() ([]byte, []int) { + return file_proto_streams_streams_proto_rawDescGZIP(), []int{7} } -func (x *ConsumeRequest) GetAutoAck() bool { +func (x *ProduceRequest) GetTopic() string { if x != nil { - return x.AutoAck + return x.Topic } - return false + return "" } -func (x *ConsumeRequest) GetAckWait() int64 { +func (x *ProduceRequest) GetPayload() []byte { if x != nil { - return x.AckWait + return x.Payload } - return 0 + return nil } -func (x *ConsumeRequest) GetRetryLimit() int64 { +func (x *ProduceRequest) GetTimestampt() int64 { if x != nil { - return x.RetryLimit + return x.Timestampt } return 0 } +func (x *ProduceRequest) GetMetadata() map[string]string { + if x != nil { + return x.Metadata + } + return nil +} + +type ProduceResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *ProduceResponse) Reset() { + *x = ProduceResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_streams_streams_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ProduceResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ProduceResponse) ProtoMessage() {} + +func (x *ProduceResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_streams_streams_proto_msgTypes[8] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ProduceResponse.ProtoReflect.Descriptor instead. +func (*ProduceResponse) Descriptor() ([]byte, []int) { + return file_proto_streams_streams_proto_rawDescGZIP(), []int{8} +} + var File_proto_streams_streams_proto protoreflect.FileDescriptor var file_proto_streams_streams_proto_rawDesc = []byte{ @@ -317,36 +545,47 @@ var file_proto_streams_streams_proto_rawDesc = []byte{ 0x64, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, - 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xe0, 0x01, 0x0a, 0x0e, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, - 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, - 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x18, - 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, - 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x74, 0x69, 0x6d, 0x65, - 0x73, 0x74, 0x61, 0x6d, 0x70, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x74, 0x69, - 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x74, 0x12, 0x41, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, - 0x64, 0x61, 0x74, 0x61, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x73, 0x74, 0x72, - 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, - 0x79, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x1a, 0x3b, 0x0a, 0x0d, 0x4d, - 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, - 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, - 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, - 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x11, 0x0a, 0x0f, 0x50, 0x72, 0x6f, 0x64, - 0x75, 0x63, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0xab, 0x01, 0x0a, 0x0e, - 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, - 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, - 0x6f, 0x70, 0x69, 0x63, 0x12, 0x14, 0x0a, 0x05, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x05, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x66, - 0x66, 0x73, 0x65, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x6f, 0x66, 0x66, 0x73, - 0x65, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x61, 0x75, 0x74, 0x6f, 0x5f, 0x61, 0x63, 0x6b, 0x18, 0x04, - 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x61, 0x75, 0x74, 0x6f, 0x41, 0x63, 0x6b, 0x12, 0x19, 0x0a, - 0x08, 0x61, 0x63, 0x6b, 0x5f, 0x77, 0x61, 0x69, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, - 0x07, 0x61, 0x63, 0x6b, 0x57, 0x61, 0x69, 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x74, 0x72, - 0x79, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x72, - 0x65, 0x74, 0x72, 0x79, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x42, 0x24, 0x5a, 0x22, 0x67, 0x69, 0x74, - 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x77, 0x2d, 0x68, 0x2d, 0x61, 0x2f, 0x70, 0x6b, - 0x67, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x62, - 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xa5, 0x01, 0x0a, 0x10, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, + 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x67, 0x72, + 0x6f, 0x75, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x67, 0x72, 0x6f, 0x75, 0x70, + 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x19, 0x0a, 0x08, 0x61, 0x75, 0x74, 0x6f, 0x5f, 0x61, + 0x63, 0x6b, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x61, 0x75, 0x74, 0x6f, 0x41, 0x63, + 0x6b, 0x12, 0x19, 0x0a, 0x08, 0x61, 0x63, 0x6b, 0x5f, 0x77, 0x61, 0x69, 0x74, 0x18, 0x05, 0x20, + 0x01, 0x28, 0x03, 0x52, 0x07, 0x61, 0x63, 0x6b, 0x57, 0x61, 0x69, 0x74, 0x12, 0x1f, 0x0a, 0x0b, + 0x72, 0x65, 0x74, 0x72, 0x79, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, + 0x03, 0x52, 0x0a, 0x72, 0x65, 0x74, 0x72, 0x79, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x22, 0x13, 0x0a, + 0x11, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x24, 0x0a, 0x12, 0x55, 0x6e, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, + 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0x15, 0x0a, 0x13, 0x55, 0x6e, 0x73, 0x75, + 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, + 0x28, 0x0a, 0x0e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x03, 0x52, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x22, 0x36, 0x0a, 0x0a, 0x41, 0x63, 0x6b, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, + 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, + 0x73, 0x22, 0xe0, 0x01, 0x0a, 0x0e, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, + 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, + 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, + 0x70, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, + 0x61, 0x6d, 0x70, 0x74, 0x12, 0x41, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, + 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, + 0x2e, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, + 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x6d, + 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x1a, 0x3b, 0x0a, 0x0d, 0x4d, 0x65, 0x74, 0x61, 0x64, + 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, + 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, + 0x3a, 0x02, 0x38, 0x01, 0x22, 0x11, 0x0a, 0x0f, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x65, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x24, 0x5a, 0x22, 0x67, 0x69, 0x74, 0x68, 0x75, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x77, 0x2d, 0x68, 0x2d, 0x61, 0x2f, 0x70, 0x6b, 0x67, 0x2f, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -361,23 +600,28 @@ func file_proto_streams_streams_proto_rawDescGZIP() []byte { return file_proto_streams_streams_proto_rawDescData } -var file_proto_streams_streams_proto_msgTypes = make([]protoimpl.MessageInfo, 6) +var file_proto_streams_streams_proto_msgTypes = make([]protoimpl.MessageInfo, 11) var file_proto_streams_streams_proto_goTypes = []interface{}{ - (*Event)(nil), // 0: streams.Event - (*ProduceRequest)(nil), // 1: streams.ProduceRequest - (*ProduceResponse)(nil), // 2: streams.ProduceResponse - (*ConsumeRequest)(nil), // 3: streams.ConsumeRequest - nil, // 4: streams.Event.MetadataEntry - nil, // 5: streams.ProduceRequest.MetadataEntry + (*Event)(nil), // 0: streams.Event + (*SubscribeRequest)(nil), // 1: streams.SubscribeRequest + (*SubscribeResponse)(nil), // 2: streams.SubscribeResponse + (*UnsubscribeRequest)(nil), // 3: streams.UnsubscribeRequest + (*UnsubscribeResponse)(nil), // 4: streams.UnsubscribeResponse + (*ConsumeRequest)(nil), // 5: streams.ConsumeRequest + (*AckRequest)(nil), // 6: streams.AckRequest + (*ProduceRequest)(nil), // 7: streams.ProduceRequest + (*ProduceResponse)(nil), // 8: streams.ProduceResponse + nil, // 9: streams.Event.MetadataEntry + nil, // 10: streams.ProduceRequest.MetadataEntry } var file_proto_streams_streams_proto_depIdxs = []int32{ - 4, // 0: streams.Event.metadata:type_name -> streams.Event.MetadataEntry - 5, // 1: streams.ProduceRequest.metadata:type_name -> streams.ProduceRequest.MetadataEntry - 2, // [2:2] is the sub-list for method output_type - 2, // [2:2] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name + 9, // 0: streams.Event.metadata:type_name -> streams.Event.MetadataEntry + 10, // 1: streams.ProduceRequest.metadata:type_name -> streams.ProduceRequest.MetadataEntry + 2, // [2:2] is the sub-list for method output_type + 2, // [2:2] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name } func init() { file_proto_streams_streams_proto_init() } @@ -399,7 +643,7 @@ func file_proto_streams_streams_proto_init() { } } file_proto_streams_streams_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ProduceRequest); i { + switch v := v.(*SubscribeRequest); i { case 0: return &v.state case 1: @@ -411,7 +655,7 @@ func file_proto_streams_streams_proto_init() { } } file_proto_streams_streams_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ProduceResponse); i { + switch v := v.(*SubscribeResponse); i { case 0: return &v.state case 1: @@ -423,6 +667,30 @@ func file_proto_streams_streams_proto_init() { } } file_proto_streams_streams_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*UnsubscribeRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_streams_streams_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*UnsubscribeResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_streams_streams_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ConsumeRequest); i { case 0: return &v.state @@ -434,6 +702,42 @@ func file_proto_streams_streams_proto_init() { return nil } } + file_proto_streams_streams_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*AckRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_streams_streams_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ProduceRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_streams_streams_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ProduceResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -441,7 +745,7 @@ func file_proto_streams_streams_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_proto_streams_streams_proto_rawDesc, NumEnums: 0, - NumMessages: 6, + NumMessages: 11, NumExtensions: 0, NumServices: 0, }, diff --git a/proto/streams/streams.proto b/proto/streams/streams.proto index cfa3824..0c34b62 100644 --- a/proto/streams/streams.proto +++ b/proto/streams/streams.proto @@ -13,6 +13,36 @@ message Event { map metadata = 5; } +// subscribe request/response +message SubscribeRequest { + string id = 1; + string group = 2; + string topic = 3; + bool auto_ack = 4; + int64 ack_wait = 5; + int64 retry_limit = 6; +} + +message SubscribeResponse {} + +// unsubscribe request/response +message UnsubscribeRequest { + string id = 1; +} + +message UnsubscribeResponse {} + +// consume request (stream Event response) +message ConsumeRequest { + int64 offset = 1; +} + +// ack request (no response) +message AckRequest { + string id = 1; + bool success = 2; +} + // produce request/response message ProduceRequest { string topic = 1; @@ -22,13 +52,3 @@ message ProduceRequest { } message ProduceResponse {} - -// consume request (stream Event response) -message ConsumeRequest { - string topic = 1; - string group = 2; - int64 offset = 3; - bool auto_ack = 4; - int64 ack_wait = 5; - int64 retry_limit = 6; -} diff --git a/server/grpcserver/grpc_stream.go b/server/grpcserver/grpc_stream.go index cfbced2..6162709 100644 --- a/server/grpcserver/grpc_stream.go +++ b/server/grpcserver/grpc_stream.go @@ -34,4 +34,4 @@ func (s *grpcStream) Error() error { func (s *grpcStream) Close() error { return nil -} \ No newline at end of file +} diff --git a/server/options.go b/server/options.go index cdee400..3ac9d47 100644 --- a/server/options.go +++ b/server/options.go @@ -1,6 +1,8 @@ package server -import "context" +import ( + "context" +) type ServerOption func(o *ServerOptions) diff --git a/streams/domain.go b/streams/domain.go index a7c72b4..8d599f0 100644 --- a/streams/domain.go +++ b/streams/domain.go @@ -6,9 +6,9 @@ import ( "time" ) -type AckFunc func() error +type Ack func() error -type NackFunc func() error +type Nack func() error type Event struct { Id string @@ -16,32 +16,32 @@ type Event struct { Payload []byte Timestamp time.Time Metadata map[string]string - ackFunc AckFunc - nackFunc NackFunc + ack Ack + nack Nack } func (e *Event) Unmarshal(v interface{}) error { return json.Unmarshal(e.Payload, v) } -func (e *Event) SetAck(f AckFunc) { - e.ackFunc = f +func (e *Event) SetAck(f Ack) { + e.ack = f } func (e *Event) Ack() error { - if e.ackFunc == nil { + if e.ack == nil { return errors.New("no ack function set") } - return e.ackFunc() + return e.ack() } -func (e *Event) SetNack(f NackFunc) { - e.nackFunc = f +func (e *Event) SetNack(f Nack) { + e.nack = f } func (e *Event) Nack() error { - if e.nackFunc == nil { + if e.nack == nil { return errors.New("no nack function set") } - return e.nackFunc() + return e.nack() } diff --git a/streams/options.go b/streams/options.go index 9e505ae..d41c2b0 100644 --- a/streams/options.go +++ b/streams/options.go @@ -1,12 +1,72 @@ package streams -type ProduceOption func(o *ProduceOptions) +import ( + "context" + "time" -type ProduceOptions struct { + "github.com/google/uuid" +) + +type StreamsOption func(o *StreamsOptions) + +type StreamsOptions struct { + Context context.Context } -func NewProduceOptions(opts ...ProduceOption) ProduceOptions { - options := ProduceOptions{} +func NewStreamsOptions(opts ...StreamsOption) StreamsOptions { + options := StreamsOptions{ + Context: context.Background(), + } + + for _, fn := range opts { + fn(&options) + } + + return options +} + +type SubscribeOption func(o *SubscribeOptions) + +type SubscribeOptions struct { + Group string + Topic string + AutoAck bool + AckWait time.Duration + RetryLimit int +} + +func SubscribeWithGroup(n string) SubscribeOption { + return func(o *SubscribeOptions) { + o.Group = n + } +} + +func SubscribeWithTopic(t string) SubscribeOption { + return func(o *SubscribeOptions) { + o.Topic = t + } +} + +func SubscribeWithAck(ack bool, ackWait time.Duration) SubscribeOption { + return func(o *SubscribeOptions) { + o.AutoAck = ack + o.AckWait = ackWait + } +} + +func SubscribeWithRetryLimit(retries int) SubscribeOption { + return func(o *SubscribeOptions) { + o.RetryLimit = retries + } +} + +func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions { + options := SubscribeOptions{ + Group: uuid.New().String(), + AutoAck: false, + AckWait: 4 * time.Second, + RetryLimit: 4, + } for _, fn := range opts { fn(&options) @@ -18,6 +78,13 @@ func NewProduceOptions(opts ...ProduceOption) ProduceOptions { type ConsumeOption func(o *ConsumeOptions) type ConsumeOptions struct { + Offset time.Time +} + +func ConsumeWithOffset(t time.Time) ConsumeOption { + return func(o *ConsumeOptions) { + o.Offset = t + } } func NewConsumeOptions(opts ...ConsumeOption) ConsumeOptions { @@ -29,3 +96,35 @@ func NewConsumeOptions(opts ...ConsumeOption) ConsumeOptions { return options } + +type ProduceOption func(o *ProduceOptions) + +type ProduceOptions struct { + Timestamp time.Time + Metadata map[string]string +} + +func ProduceWithTimestamp(t time.Time) ProduceOption { + return func(o *ProduceOptions) { + o.Timestamp = t + } +} + +func ProduceWithMetadata(md map[string]string) ProduceOption { + return func(o *ProduceOptions) { + o.Metadata = md + } +} + +func NewProduceOptions(opts ...ProduceOption) ProduceOptions { + options := ProduceOptions{ + Timestamp: time.Now(), + Metadata: map[string]string{}, + } + + for _, fn := range opts { + fn(&options) + } + + return options +} diff --git a/streams/streams.go b/streams/streams.go index fc290e4..84a7fe1 100644 --- a/streams/streams.go +++ b/streams/streams.go @@ -1,8 +1,18 @@ package streams -var () +import "errors" -type Stream interface { - Produce(topic string, payload interface{}, opts ...ProduceOption) error - Consume(topic string, opts ...ConsumeOption) (<-chan Event, error) +var ( + ErrSubscriberNotFound = errors.New("failed to find subscriber") + ErrEncodingData = errors.New("failed to encode incoming data") + ErrEncodingEvent = errors.New("failed to encode outgoing event") +) + +type Streams interface { + Options() StreamsOptions + Subscribe(id string, opts ...SubscribeOption) error + Unsubscribe(id string) error + Consume(id string, opts ...ConsumeOption) (<-chan Event, error) + Produce(topic string, data interface{}, opts ...ProduceOption) error + String() string }