From dff88f75bbab2c75ea93817cdb9139923933d61f Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Wed, 6 Jul 2022 09:04:17 +0200 Subject: [PATCH] Migrate to elastic-agent-shipper-client (#32185) So, we stop copying the package to Beats and use a shared package instead. --- NOTICE.txt | 103 ++ dev-tools/notice/overrides.json | 1 + go.mod | 1 + go.sum | 3 + libbeat/api/routes.go.orig | 2 +- libbeat/outputs/shipper/api/shipper.pb.go | 912 ------------------ .../outputs/shipper/api/shipper_grpc.pb.go | 211 ---- libbeat/outputs/shipper/api/shipper_mock.go | 22 +- libbeat/outputs/shipper/shipper.go | 45 +- libbeat/outputs/shipper/shipper_test.go | 45 +- 10 files changed, 165 insertions(+), 1180 deletions(-) delete mode 100644 libbeat/outputs/shipper/api/shipper.pb.go delete mode 100644 libbeat/outputs/shipper/api/shipper_grpc.pb.go diff --git a/NOTICE.txt b/NOTICE.txt index 5e41ad12669f..91e379481bbb 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -6490,6 +6490,109 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-l limitations under the License. +-------------------------------------------------------------------------------- +Dependency : github.com/elastic/elastic-agent-shipper-client +Version: v0.2.0 +Licence type (autodetected): Elastic +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-shipper-client@v0.2.0/LICENSE.txt: + +Elastic License 2.0 + +URL: https://www.elastic.co/licensing/elastic-license + +## Acceptance + +By using the software, you agree to all of the terms and conditions below. + +## Copyright License + +The licensor grants you a non-exclusive, royalty-free, worldwide, +non-sublicensable, non-transferable license to use, copy, distribute, make +available, and prepare derivative works of the software, in each case subject to +the limitations and conditions below. + +## Limitations + +You may not provide the software to third parties as a hosted or managed +service, where the service provides users with access to any substantial set of +the features or functionality of the software. + +You may not move, change, disable, or circumvent the license key functionality +in the software, and you may not remove or obscure any functionality in the +software that is protected by the license key. + +You may not alter, remove, or obscure any licensing, copyright, or other notices +of the licensor in the software. Any use of the licensor’s trademarks is subject +to applicable law. + +## Patents + +The licensor grants you a license, under any patent claims the licensor can +license, or becomes able to license, to make, have made, use, sell, offer for +sale, import and have imported the software, in each case subject to the +limitations and conditions in this license. This license does not cover any +patent claims that you cause to be infringed by modifications or additions to +the software. If you or your company make any written claim that the software +infringes or contributes to infringement of any patent, your patent license for +the software granted under these terms ends immediately. If your company makes +such a claim, your patent license ends immediately for work on behalf of your +company. + +## Notices + +You must ensure that anyone who gets a copy of any part of the software from you +also gets a copy of these terms. + +If you modify the software, you must include in any modified copies of the +software prominent notices stating that you have modified the software. + +## No Other Rights + +These terms do not imply any licenses other than those expressly granted in +these terms. + +## Termination + +If you use the software in violation of these terms, such use is not licensed, +and your licenses will automatically terminate. If the licensor provides you +with a notice of your violation, and you cease all violation of this license no +later than 30 days after you receive that notice, your licenses will be +reinstated retroactively. However, if you violate these terms after such +reinstatement, any additional violation of these terms will cause your licenses +to terminate automatically and permanently. + +## No Liability + +*As far as the law allows, the software comes as is, without any warranty or +condition, and the licensor will not be liable to you for any damages arising +out of these terms or the use or nature of the software, under any kind of +legal claim.* + +## Definitions + +The **licensor** is the entity offering these terms, and the **software** is the +software the licensor makes available under these terms, including any portion +of it. + +**you** refers to the individual or entity agreeing to these terms. + +**your company** is any legal entity, sole proprietorship, or other kind of +organization that you work for, plus all organizations that have control over, +are under the control of, or are under common control with that +organization. **control** means ownership of substantially all the assets of an +entity, or the power to direct its management and policies by vote, contract, or +otherwise. Control can be direct or indirect. + +**your licenses** are all the licenses granted to you for the software under +these terms. + +**use** means anything you do with the software requiring one of your licenses. + +**trademark** means trademarks, service marks, and similar rights. + + -------------------------------------------------------------------------------- Dependency : github.com/elastic/elastic-agent-system-metrics Version: v0.4.1 diff --git a/dev-tools/notice/overrides.json b/dev-tools/notice/overrides.json index a4e8a78e568f..2d9a3315e947 100644 --- a/dev-tools/notice/overrides.json +++ b/dev-tools/notice/overrides.json @@ -1,4 +1,5 @@ {"name": "github.com/elastic/elastic-agent-client/v7", "licenceType": "Elastic"} +{"name": "github.com/elastic/elastic-agent-shipper-client", "licenceType": "Elastic"} {"name": "github.com/gorhill/cronexpr", "licenceType": "Apache-2.0", "licenceFile":"APLv2"} {"name": "github.com/hashicorp/cronexpr", "licenceType": "Apache-2.0", "licenceFile":"APLv2"} {"name": "github.com/miekg/dns", "licenceType": "BSD"} diff --git a/go.mod b/go.mod index 093748c35af6..0b490630807f 100644 --- a/go.mod +++ b/go.mod @@ -162,6 +162,7 @@ require ( github.com/elastic/bayeux v1.0.5 github.com/elastic/elastic-agent-autodiscover v0.2.1 github.com/elastic/elastic-agent-libs v0.2.9 + github.com/elastic/elastic-agent-shipper-client v0.2.0 github.com/elastic/elastic-agent-system-metrics v0.4.1 github.com/elastic/go-elasticsearch/v8 v8.2.0 github.com/pierrec/lz4/v4 v4.1.15 diff --git a/go.sum b/go.sum index 5c76ec598bf5..71a34c205cb5 100644 --- a/go.sum +++ b/go.sum @@ -538,8 +538,11 @@ github.com/elastic/elastic-agent-client/v7 v7.0.0-20210727140539-f0905d9377f6 h1 github.com/elastic/elastic-agent-client/v7 v7.0.0-20210727140539-f0905d9377f6/go.mod h1:uh/Gj9a0XEbYoM4NYz4LvaBVARz3QXLmlNjsrKY9fTc= github.com/elastic/elastic-agent-libs v0.2.2/go.mod h1:1xDLBhIqBIjhJ7lr2s+xRFFkQHpitSp8q2zzv1Dqg+s= github.com/elastic/elastic-agent-libs v0.2.5/go.mod h1:chO3rtcLyGlKi9S0iGVZhYCzDfdDsAQYBc+ui588AFE= +github.com/elastic/elastic-agent-libs v0.2.7/go.mod h1:chO3rtcLyGlKi9S0iGVZhYCzDfdDsAQYBc+ui588AFE= github.com/elastic/elastic-agent-libs v0.2.9 h1:7jOCqNqEWG0kJb3fa8/SC6beSiys1TmAylH9+hWTnrM= github.com/elastic/elastic-agent-libs v0.2.9/go.mod h1:chO3rtcLyGlKi9S0iGVZhYCzDfdDsAQYBc+ui588AFE= +github.com/elastic/elastic-agent-shipper-client v0.2.0 h1:p+5ep48YCOe+3nICeWmiLwQV11yDLad2n4NunI66Shg= +github.com/elastic/elastic-agent-shipper-client v0.2.0/go.mod h1:OyI2W+Mv3JxlkEF3OeT7K0dbuxvwew8ke2Cf4HpLa9Q= github.com/elastic/elastic-agent-system-metrics v0.4.1 h1:1bKgU0Y2F4PBLSCX2LmJbRd4wWoq5DOvXc9ysuXBVpI= github.com/elastic/elastic-agent-system-metrics v0.4.1/go.mod h1:tF/f9Off38nfzTZHIVQ++FkXrDm9keFhFpJ+3pQ00iI= github.com/elastic/elastic-transport-go/v8 v8.1.0 h1:NeqEz1ty4RQz+TVbUrpSU7pZ48XkzGWQj02k5koahIE= diff --git a/libbeat/api/routes.go.orig b/libbeat/api/routes.go.orig index 3908eb2979bc..ed27379907b3 100644 --- a/libbeat/api/routes.go.orig +++ b/libbeat/api/routes.go.orig @@ -23,7 +23,7 @@ import ( _ "net/http/pprof" "net/url" - "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/mapstr" diff --git a/libbeat/outputs/shipper/api/shipper.pb.go b/libbeat/outputs/shipper/api/shipper.pb.go deleted file mode 100644 index 8dc82e4f661a..000000000000 --- a/libbeat/outputs/shipper/api/shipper.pb.go +++ /dev/null @@ -1,912 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package api - -import ( - reflect "reflect" - sync "sync" - - status "google.golang.org/genproto/googleapis/rpc/status" - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - structpb "google.golang.org/protobuf/types/known/structpb" - timestamppb "google.golang.org/protobuf/types/known/timestamppb" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type PublishRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Events []*Event `protobuf:"bytes,1,rep,name=events,proto3" json:"events,omitempty"` -} - -func (x *PublishRequest) Reset() { - *x = PublishRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_shipper_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *PublishRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*PublishRequest) ProtoMessage() {} - -func (x *PublishRequest) ProtoReflect() protoreflect.Message { - mi := &file_shipper_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use PublishRequest.ProtoReflect.Descriptor instead. -func (*PublishRequest) Descriptor() ([]byte, []int) { - return file_shipper_proto_rawDescGZIP(), []int{0} -} - -func (x *PublishRequest) GetEvents() []*Event { - if x != nil { - return x.Events - } - return nil -} - -// Event is a translation of beat.Event into protobuf. -type Event struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - // Creation timestamp of the event. - Timestamp *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"` - // Source of the generated event. - Source *Source `protobuf:"bytes,2,opt,name=source,proto3" json:"source,omitempty"` - // Optional. Producers may specify a unique event ID if the event has a natural unique - // identifier to simplify acknowledgement tracking. If no such identifier exists, the queue - // offset can be used for acknowledgement tracking. - EventId string `protobuf:"bytes,3,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"` - // Data stream for the event. - DataStream *DataStream `protobuf:"bytes,4,opt,name=data_stream,json=dataStream,proto3" json:"data_stream,omitempty"` - // Metadata JSON object (map[string]google.protobuf.Value) - Metadata *structpb.Struct `protobuf:"bytes,5,opt,name=metadata,proto3" json:"metadata,omitempty"` - // Field JSON object (map[string]google.protobuf.Value) - Fields *structpb.Struct `protobuf:"bytes,6,opt,name=fields,proto3" json:"fields,omitempty"` -} - -func (x *Event) Reset() { - *x = Event{} - if protoimpl.UnsafeEnabled { - mi := &file_shipper_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *Event) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Event) ProtoMessage() {} - -func (x *Event) ProtoReflect() protoreflect.Message { - mi := &file_shipper_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Event.ProtoReflect.Descriptor instead. -func (*Event) Descriptor() ([]byte, []int) { - return file_shipper_proto_rawDescGZIP(), []int{1} -} - -func (x *Event) GetTimestamp() *timestamppb.Timestamp { - if x != nil { - return x.Timestamp - } - return nil -} - -func (x *Event) GetSource() *Source { - if x != nil { - return x.Source - } - return nil -} - -func (x *Event) GetEventId() string { - if x != nil { - return x.EventId - } - return "" -} - -func (x *Event) GetDataStream() *DataStream { - if x != nil { - return x.DataStream - } - return nil -} - -func (x *Event) GetMetadata() *structpb.Struct { - if x != nil { - return x.Metadata - } - return nil -} - -func (x *Event) GetFields() *structpb.Struct { - if x != nil { - return x.Fields - } - return nil -} - -// Source information required for proper event tracking, processing and routing -type Source struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - // Input ID in the agent policy. - InputId string `protobuf:"bytes,1,opt,name=input_id,json=inputId,proto3" json:"input_id,omitempty"` - // Stream ID in the agent policy (Optional, some inputs don't use streams). - // Not to be confused with data streams in Elasticsearch. - StreamId string `protobuf:"bytes,2,opt,name=stream_id,json=streamId,proto3" json:"stream_id,omitempty"` -} - -func (x *Source) Reset() { - *x = Source{} - if protoimpl.UnsafeEnabled { - mi := &file_shipper_proto_msgTypes[2] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *Source) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Source) ProtoMessage() {} - -func (x *Source) ProtoReflect() protoreflect.Message { - mi := &file_shipper_proto_msgTypes[2] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Source.ProtoReflect.Descriptor instead. -func (*Source) Descriptor() ([]byte, []int) { - return file_shipper_proto_rawDescGZIP(), []int{2} -} - -func (x *Source) GetInputId() string { - if x != nil { - return x.InputId - } - return "" -} - -func (x *Source) GetStreamId() string { - if x != nil { - return x.StreamId - } - return "" -} - -// Elastic data stream -// See https://www.elastic.co/blog/an-introduction-to-the-elastic-data-stream-naming-scheme -type DataStream struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - // Generic type describing the data - Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"` - // Describes the data ingested and its structure - Dataset string `protobuf:"bytes,2,opt,name=dataset,proto3" json:"dataset,omitempty"` - // User-configurable arbitrary grouping - Namespace string `protobuf:"bytes,3,opt,name=namespace,proto3" json:"namespace,omitempty"` -} - -func (x *DataStream) Reset() { - *x = DataStream{} - if protoimpl.UnsafeEnabled { - mi := &file_shipper_proto_msgTypes[3] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *DataStream) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*DataStream) ProtoMessage() {} - -func (x *DataStream) ProtoReflect() protoreflect.Message { - mi := &file_shipper_proto_msgTypes[3] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use DataStream.ProtoReflect.Descriptor instead. -func (*DataStream) Descriptor() ([]byte, []int) { - return file_shipper_proto_rawDescGZIP(), []int{3} -} - -func (x *DataStream) GetType() string { - if x != nil { - return x.Type - } - return "" -} - -func (x *DataStream) GetDataset() string { - if x != nil { - return x.Dataset - } - return "" -} - -func (x *DataStream) GetNamespace() string { - if x != nil { - return x.Namespace - } - return "" -} - -type PublishReply struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - // Results describing messages successfully published to the queue. - // - // The output order matches the input order e.g. `results[N]` maps to `events[N]`, - // however, `results` may contain only the first K results for `events`, when - // not all the events are accepted by the queue, so the list sizes may not match. - Results []*EventResult `protobuf:"bytes,1,rep,name=results,proto3" json:"results,omitempty"` -} - -func (x *PublishReply) Reset() { - *x = PublishReply{} - if protoimpl.UnsafeEnabled { - mi := &file_shipper_proto_msgTypes[4] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *PublishReply) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*PublishReply) ProtoMessage() {} - -func (x *PublishReply) ProtoReflect() protoreflect.Message { - mi := &file_shipper_proto_msgTypes[4] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use PublishReply.ProtoReflect.Descriptor instead. -func (*PublishReply) Descriptor() ([]byte, []int) { - return file_shipper_proto_rawDescGZIP(), []int{4} -} - -func (x *PublishReply) GetResults() []*EventResult { - if x != nil { - return x.Results - } - return nil -} - -type EventResult struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - // Timestamp of the event that was published. - Timestamp *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"` - // ID derived from the queue offset that uniquely identifies the event within the shipper - // system. Used to track the progress of events through the queue system. - QueueId string `protobuf:"bytes,2,opt,name=queue_id,json=queueId,proto3" json:"queue_id,omitempty"` - // Optional. Event ID provided when the event was published, if it exists. - EventId string `protobuf:"bytes,3,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"` - // Optional. List of errors encountered by processors, if any. Processor - // errors do not indicate the message failed to send. - ProcessorErrors []*status.Status `protobuf:"bytes,4,rep,name=processor_errors,json=processorErrors,proto3" json:"processor_errors,omitempty"` -} - -func (x *EventResult) Reset() { - *x = EventResult{} - if protoimpl.UnsafeEnabled { - mi := &file_shipper_proto_msgTypes[5] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *EventResult) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*EventResult) ProtoMessage() {} - -func (x *EventResult) ProtoReflect() protoreflect.Message { - mi := &file_shipper_proto_msgTypes[5] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use EventResult.ProtoReflect.Descriptor instead. -func (*EventResult) Descriptor() ([]byte, []int) { - return file_shipper_proto_rawDescGZIP(), []int{5} -} - -func (x *EventResult) GetTimestamp() *timestamppb.Timestamp { - if x != nil { - return x.Timestamp - } - return nil -} - -func (x *EventResult) GetQueueId() string { - if x != nil { - return x.QueueId - } - return "" -} - -func (x *EventResult) GetEventId() string { - if x != nil { - return x.EventId - } - return "" -} - -func (x *EventResult) GetProcessorErrors() []*status.Status { - if x != nil { - return x.ProcessorErrors - } - return nil -} - -type StreamAcksRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - // Optional. Requests acknowledgements originating only from this source. - Source *Source `protobuf:"bytes,1,opt,name=source,proto3" json:"source,omitempty"` - // Optional. Requests acknowledgements for events for this data stream only. - DataStream *DataStream `protobuf:"bytes,2,opt,name=data_stream,json=dataStream,proto3" json:"data_stream,omitempty"` -} - -func (x *StreamAcksRequest) Reset() { - *x = StreamAcksRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_shipper_proto_msgTypes[6] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *StreamAcksRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*StreamAcksRequest) ProtoMessage() {} - -func (x *StreamAcksRequest) ProtoReflect() protoreflect.Message { - mi := &file_shipper_proto_msgTypes[6] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use StreamAcksRequest.ProtoReflect.Descriptor instead. -func (*StreamAcksRequest) Descriptor() ([]byte, []int) { - return file_shipper_proto_rawDescGZIP(), []int{6} -} - -func (x *StreamAcksRequest) GetSource() *Source { - if x != nil { - return x.Source - } - return nil -} - -func (x *StreamAcksRequest) GetDataStream() *DataStream { - if x != nil { - return x.DataStream - } - return nil -} - -type StreamAcksReply struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Acks []*Acknowledgement `protobuf:"bytes,1,rep,name=acks,proto3" json:"acks,omitempty"` -} - -func (x *StreamAcksReply) Reset() { - *x = StreamAcksReply{} - if protoimpl.UnsafeEnabled { - mi := &file_shipper_proto_msgTypes[7] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *StreamAcksReply) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*StreamAcksReply) ProtoMessage() {} - -func (x *StreamAcksReply) ProtoReflect() protoreflect.Message { - mi := &file_shipper_proto_msgTypes[7] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use StreamAcksReply.ProtoReflect.Descriptor instead. -func (*StreamAcksReply) Descriptor() ([]byte, []int) { - return file_shipper_proto_rawDescGZIP(), []int{7} -} - -func (x *StreamAcksReply) GetAcks() []*Acknowledgement { - if x != nil { - return x.Acks - } - return nil -} - -type Acknowledgement struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - // Timestamp of the event being acknowledged. - Timestamp *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"` - // ID derived from the queue offset that uniquely identifies the event within the shipper - // system. Used to track the progress of events through the queue system. - QueueId string `protobuf:"bytes,2,opt,name=queue_id,json=queueId,proto3" json:"queue_id,omitempty"` - // Optional. Event ID provided when the event was published, if it exists. - EventId string `protobuf:"bytes,3,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"` - // Optional. Error status indicating the message failed to send. - Error *status.Status `protobuf:"bytes,4,opt,name=error,proto3" json:"error,omitempty"` -} - -func (x *Acknowledgement) Reset() { - *x = Acknowledgement{} - if protoimpl.UnsafeEnabled { - mi := &file_shipper_proto_msgTypes[8] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *Acknowledgement) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Acknowledgement) ProtoMessage() {} - -func (x *Acknowledgement) ProtoReflect() protoreflect.Message { - mi := &file_shipper_proto_msgTypes[8] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Acknowledgement.ProtoReflect.Descriptor instead. -func (*Acknowledgement) Descriptor() ([]byte, []int) { - return file_shipper_proto_rawDescGZIP(), []int{8} -} - -func (x *Acknowledgement) GetTimestamp() *timestamppb.Timestamp { - if x != nil { - return x.Timestamp - } - return nil -} - -func (x *Acknowledgement) GetQueueId() string { - if x != nil { - return x.QueueId - } - return "" -} - -func (x *Acknowledgement) GetEventId() string { - if x != nil { - return x.EventId - } - return "" -} - -func (x *Acknowledgement) GetError() *status.Status { - if x != nil { - return x.Error - } - return nil -} - -var File_shipper_proto protoreflect.FileDescriptor - -var file_shipper_proto_rawDesc = []byte{ - 0x0a, 0x0d, 0x73, 0x68, 0x69, 0x70, 0x70, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, - 0x18, 0x65, 0x6c, 0x61, 0x73, 0x74, 0x69, 0x63, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x73, - 0x68, 0x69, 0x70, 0x70, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, - 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, - 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, - 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x73, 0x74, 0x72, 0x75, - 0x63, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x17, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, - 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x22, 0x49, 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x12, 0x37, 0x0a, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x01, 0x20, - 0x03, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x65, 0x6c, 0x61, 0x73, 0x74, 0x69, 0x63, 0x2e, 0x61, 0x67, - 0x65, 0x6e, 0x74, 0x2e, 0x73, 0x68, 0x69, 0x70, 0x70, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x45, - 0x76, 0x65, 0x6e, 0x74, 0x52, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x22, 0xc3, 0x02, 0x0a, - 0x05, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, - 0x61, 0x6d, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, - 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, - 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, - 0x12, 0x38, 0x0a, 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x20, 0x2e, 0x65, 0x6c, 0x61, 0x73, 0x74, 0x69, 0x63, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, - 0x2e, 0x73, 0x68, 0x69, 0x70, 0x70, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x6f, 0x75, 0x72, - 0x63, 0x65, 0x52, 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x65, 0x76, - 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x65, 0x76, - 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x45, 0x0a, 0x0b, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x73, 0x74, - 0x72, 0x65, 0x61, 0x6d, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x65, 0x6c, 0x61, - 0x73, 0x74, 0x69, 0x63, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x73, 0x68, 0x69, 0x70, 0x70, - 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, - 0x52, 0x0a, 0x64, 0x61, 0x74, 0x61, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x33, 0x0a, 0x08, - 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, - 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, - 0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, - 0x61, 0x12, 0x2f, 0x0a, 0x06, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x18, 0x06, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x17, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x06, 0x66, 0x69, 0x65, 0x6c, - 0x64, 0x73, 0x22, 0x40, 0x0a, 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x19, 0x0a, 0x08, - 0x69, 0x6e, 0x70, 0x75, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, - 0x69, 0x6e, 0x70, 0x75, 0x74, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x74, 0x72, 0x65, 0x61, - 0x6d, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x74, 0x72, 0x65, - 0x61, 0x6d, 0x49, 0x64, 0x22, 0x58, 0x0a, 0x0a, 0x44, 0x61, 0x74, 0x61, 0x53, 0x74, 0x72, 0x65, - 0x61, 0x6d, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x64, 0x61, 0x74, 0x61, 0x73, 0x65, - 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x64, 0x61, 0x74, 0x61, 0x73, 0x65, 0x74, - 0x12, 0x1c, 0x0a, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, 0x03, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x22, 0x4f, - 0x0a, 0x0c, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x3f, - 0x0a, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, - 0x25, 0x2e, 0x65, 0x6c, 0x61, 0x73, 0x74, 0x69, 0x63, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, - 0x73, 0x68, 0x69, 0x70, 0x70, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, - 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x22, - 0xbc, 0x01, 0x0a, 0x0b, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, - 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, - 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x19, 0x0a, 0x08, 0x71, 0x75, 0x65, - 0x75, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x71, 0x75, 0x65, - 0x75, 0x65, 0x49, 0x64, 0x12, 0x19, 0x0a, 0x08, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, - 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, - 0x3d, 0x0a, 0x10, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x5f, 0x65, 0x72, 0x72, - 0x6f, 0x72, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x6f, 0x6f, 0x67, - 0x6c, 0x65, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x0f, 0x70, - 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x6f, 0x72, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x73, 0x22, 0x94, - 0x01, 0x0a, 0x11, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x41, 0x63, 0x6b, 0x73, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x65, 0x6c, 0x61, 0x73, 0x74, 0x69, 0x63, 0x2e, 0x61, - 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x73, 0x68, 0x69, 0x70, 0x70, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, - 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x52, 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x45, - 0x0a, 0x0b, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x65, 0x6c, 0x61, 0x73, 0x74, 0x69, 0x63, 0x2e, 0x61, 0x67, - 0x65, 0x6e, 0x74, 0x2e, 0x73, 0x68, 0x69, 0x70, 0x70, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x44, - 0x61, 0x74, 0x61, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x0a, 0x64, 0x61, 0x74, 0x61, 0x53, - 0x74, 0x72, 0x65, 0x61, 0x6d, 0x22, 0x50, 0x0a, 0x0f, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x41, - 0x63, 0x6b, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x3d, 0x0a, 0x04, 0x61, 0x63, 0x6b, 0x73, - 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x29, 0x2e, 0x65, 0x6c, 0x61, 0x73, 0x74, 0x69, 0x63, - 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x73, 0x68, 0x69, 0x70, 0x70, 0x65, 0x72, 0x2e, 0x76, - 0x31, 0x2e, 0x41, 0x63, 0x6b, 0x6e, 0x6f, 0x77, 0x6c, 0x65, 0x64, 0x67, 0x65, 0x6d, 0x65, 0x6e, - 0x74, 0x52, 0x04, 0x61, 0x63, 0x6b, 0x73, 0x22, 0xab, 0x01, 0x0a, 0x0f, 0x41, 0x63, 0x6b, 0x6e, - 0x6f, 0x77, 0x6c, 0x65, 0x64, 0x67, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x38, 0x0a, 0x09, 0x74, - 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, - 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, - 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, - 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x19, 0x0a, 0x08, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x69, - 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x71, 0x75, 0x65, 0x75, 0x65, 0x49, 0x64, - 0x12, 0x19, 0x0a, 0x08, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x07, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x28, 0x0a, 0x05, 0x65, - 0x72, 0x72, 0x6f, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x6f, 0x6f, - 0x67, 0x6c, 0x65, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x05, - 0x65, 0x72, 0x72, 0x6f, 0x72, 0x32, 0xe1, 0x01, 0x0a, 0x08, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, - 0x65, 0x72, 0x12, 0x61, 0x0a, 0x0d, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x45, 0x76, 0x65, - 0x6e, 0x74, 0x73, 0x12, 0x28, 0x2e, 0x65, 0x6c, 0x61, 0x73, 0x74, 0x69, 0x63, 0x2e, 0x61, 0x67, - 0x65, 0x6e, 0x74, 0x2e, 0x73, 0x68, 0x69, 0x70, 0x70, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x50, - 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, - 0x65, 0x6c, 0x61, 0x73, 0x74, 0x69, 0x63, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x73, 0x68, - 0x69, 0x70, 0x70, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, - 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x72, 0x0a, 0x16, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x41, - 0x63, 0x6b, 0x6e, 0x6f, 0x77, 0x6c, 0x65, 0x64, 0x67, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x73, 0x12, - 0x2b, 0x2e, 0x65, 0x6c, 0x61, 0x73, 0x74, 0x69, 0x63, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, - 0x73, 0x68, 0x69, 0x70, 0x70, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, - 0x6d, 0x41, 0x63, 0x6b, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x29, 0x2e, 0x65, - 0x6c, 0x61, 0x73, 0x74, 0x69, 0x63, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x73, 0x68, 0x69, - 0x70, 0x70, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x41, 0x63, - 0x6b, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x30, 0x01, 0x42, 0x2e, 0x5a, 0x2c, 0x67, 0x69, 0x74, - 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x65, 0x6c, 0x61, 0x73, 0x74, 0x69, 0x63, 0x2f, - 0x65, 0x6c, 0x61, 0x73, 0x74, 0x69, 0x63, 0x2d, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2d, 0x73, 0x68, - 0x69, 0x70, 0x70, 0x65, 0x72, 0x2f, 0x61, 0x70, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x33, -} - -var ( - file_shipper_proto_rawDescOnce sync.Once - file_shipper_proto_rawDescData = file_shipper_proto_rawDesc -) - -func file_shipper_proto_rawDescGZIP() []byte { - file_shipper_proto_rawDescOnce.Do(func() { - file_shipper_proto_rawDescData = protoimpl.X.CompressGZIP(file_shipper_proto_rawDescData) - }) - return file_shipper_proto_rawDescData -} - -var file_shipper_proto_msgTypes = make([]protoimpl.MessageInfo, 9) -var file_shipper_proto_goTypes = []interface{}{ - (*PublishRequest)(nil), // 0: elastic.agent.shipper.v1.PublishRequest - (*Event)(nil), // 1: elastic.agent.shipper.v1.Event - (*Source)(nil), // 2: elastic.agent.shipper.v1.Source - (*DataStream)(nil), // 3: elastic.agent.shipper.v1.DataStream - (*PublishReply)(nil), // 4: elastic.agent.shipper.v1.PublishReply - (*EventResult)(nil), // 5: elastic.agent.shipper.v1.EventResult - (*StreamAcksRequest)(nil), // 6: elastic.agent.shipper.v1.StreamAcksRequest - (*StreamAcksReply)(nil), // 7: elastic.agent.shipper.v1.StreamAcksReply - (*Acknowledgement)(nil), // 8: elastic.agent.shipper.v1.Acknowledgement - (*timestamppb.Timestamp)(nil), // 9: google.protobuf.Timestamp - (*structpb.Struct)(nil), // 10: google.protobuf.Struct - (*status.Status)(nil), // 11: google.rpc.Status -} -var file_shipper_proto_depIdxs = []int32{ - 1, // 0: elastic.agent.shipper.v1.PublishRequest.events:type_name -> elastic.agent.shipper.v1.Event - 9, // 1: elastic.agent.shipper.v1.Event.timestamp:type_name -> google.protobuf.Timestamp - 2, // 2: elastic.agent.shipper.v1.Event.source:type_name -> elastic.agent.shipper.v1.Source - 3, // 3: elastic.agent.shipper.v1.Event.data_stream:type_name -> elastic.agent.shipper.v1.DataStream - 10, // 4: elastic.agent.shipper.v1.Event.metadata:type_name -> google.protobuf.Struct - 10, // 5: elastic.agent.shipper.v1.Event.fields:type_name -> google.protobuf.Struct - 5, // 6: elastic.agent.shipper.v1.PublishReply.results:type_name -> elastic.agent.shipper.v1.EventResult - 9, // 7: elastic.agent.shipper.v1.EventResult.timestamp:type_name -> google.protobuf.Timestamp - 11, // 8: elastic.agent.shipper.v1.EventResult.processor_errors:type_name -> google.rpc.Status - 2, // 9: elastic.agent.shipper.v1.StreamAcksRequest.source:type_name -> elastic.agent.shipper.v1.Source - 3, // 10: elastic.agent.shipper.v1.StreamAcksRequest.data_stream:type_name -> elastic.agent.shipper.v1.DataStream - 8, // 11: elastic.agent.shipper.v1.StreamAcksReply.acks:type_name -> elastic.agent.shipper.v1.Acknowledgement - 9, // 12: elastic.agent.shipper.v1.Acknowledgement.timestamp:type_name -> google.protobuf.Timestamp - 11, // 13: elastic.agent.shipper.v1.Acknowledgement.error:type_name -> google.rpc.Status - 0, // 14: elastic.agent.shipper.v1.Producer.PublishEvents:input_type -> elastic.agent.shipper.v1.PublishRequest - 6, // 15: elastic.agent.shipper.v1.Producer.StreamAcknowledgements:input_type -> elastic.agent.shipper.v1.StreamAcksRequest - 4, // 16: elastic.agent.shipper.v1.Producer.PublishEvents:output_type -> elastic.agent.shipper.v1.PublishReply - 7, // 17: elastic.agent.shipper.v1.Producer.StreamAcknowledgements:output_type -> elastic.agent.shipper.v1.StreamAcksReply - 16, // [16:18] is the sub-list for method output_type - 14, // [14:16] is the sub-list for method input_type - 14, // [14:14] is the sub-list for extension type_name - 14, // [14:14] is the sub-list for extension extendee - 0, // [0:14] is the sub-list for field type_name -} - -func init() { file_shipper_proto_init() } -func file_shipper_proto_init() { - if File_shipper_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_shipper_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*PublishRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_shipper_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Event); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_shipper_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Source); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_shipper_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*DataStream); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_shipper_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*PublishReply); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_shipper_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*EventResult); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_shipper_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*StreamAcksRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_shipper_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*StreamAcksReply); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_shipper_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Acknowledgement); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_shipper_proto_rawDesc, - NumEnums: 0, - NumMessages: 9, - NumExtensions: 0, - NumServices: 1, - }, - GoTypes: file_shipper_proto_goTypes, - DependencyIndexes: file_shipper_proto_depIdxs, - MessageInfos: file_shipper_proto_msgTypes, - }.Build() - File_shipper_proto = out.File - file_shipper_proto_rawDesc = nil - file_shipper_proto_goTypes = nil - file_shipper_proto_depIdxs = nil -} diff --git a/libbeat/outputs/shipper/api/shipper_grpc.pb.go b/libbeat/outputs/shipper/api/shipper_grpc.pb.go deleted file mode 100644 index d6919871c024..000000000000 --- a/libbeat/outputs/shipper/api/shipper_grpc.pb.go +++ /dev/null @@ -1,211 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package api - -import ( - context "context" - - grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" -) - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.32.0 or later. -const _ = grpc.SupportPackageIsVersion7 - -// ProducerClient is the client API for Producer service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. -type ProducerClient interface { - // Publishes a list of events via the Elastic agent shipper. - // Blocks until all processing steps complete and data is written to the queue. - // The order of `PublishRequest.events` always matches `PublishReply.results`. - // - // Returns the `codes.ResourceExhausted` gRPC status code if the queue is full and none of the events - // can be accepted at the moment. - // - // If the queue could accept some events from the request, this returns a successful response - // containing results for the first K events that were accepted by the queue. - // The client is expected to retry sending the rest of the events in a separate request later. - // - // Inputs may execute multiple concurrent Produce requests for independent data streams. - // The order in which concurrent requests complete is not guaranteed. Use sequential requests to - // control ordering. - PublishEvents(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*PublishReply, error) - // Returns a stream of acknowledgements from outputs. - StreamAcknowledgements(ctx context.Context, in *StreamAcksRequest, opts ...grpc.CallOption) (Producer_StreamAcknowledgementsClient, error) -} - -type producerClient struct { - cc grpc.ClientConnInterface -} - -func NewProducerClient(cc grpc.ClientConnInterface) ProducerClient { - return &producerClient{cc} -} - -func (c *producerClient) PublishEvents(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*PublishReply, error) { - out := new(PublishReply) - err := c.cc.Invoke(ctx, "/elastic.agent.shipper.v1.Producer/PublishEvents", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *producerClient) StreamAcknowledgements(ctx context.Context, in *StreamAcksRequest, opts ...grpc.CallOption) (Producer_StreamAcknowledgementsClient, error) { - stream, err := c.cc.NewStream(ctx, &Producer_ServiceDesc.Streams[0], "/elastic.agent.shipper.v1.Producer/StreamAcknowledgements", opts...) - if err != nil { - return nil, err - } - x := &producerStreamAcknowledgementsClient{stream} - if err := x.ClientStream.SendMsg(in); err != nil { - return nil, err - } - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } - return x, nil -} - -type Producer_StreamAcknowledgementsClient interface { - Recv() (*StreamAcksReply, error) - grpc.ClientStream -} - -type producerStreamAcknowledgementsClient struct { - grpc.ClientStream -} - -func (x *producerStreamAcknowledgementsClient) Recv() (*StreamAcksReply, error) { - m := new(StreamAcksReply) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -// ProducerServer is the server API for Producer service. -// All implementations must embed UnimplementedProducerServer -// for forward compatibility -type ProducerServer interface { - // Publishes a list of events via the Elastic agent shipper. - // Blocks until all processing steps complete and data is written to the queue. - // The order of `PublishRequest.events` always matches `PublishReply.results`. - // - // Returns the `codes.ResourceExhausted` gRPC status code if the queue is full and none of the events - // can be accepted at the moment. - // - // If the queue could accept some events from the request, this returns a successful response - // containing results for the first K events that were accepted by the queue. - // The client is expected to retry sending the rest of the events in a separate request later. - // - // Inputs may execute multiple concurrent Produce requests for independent data streams. - // The order in which concurrent requests complete is not guaranteed. Use sequential requests to - // control ordering. - PublishEvents(context.Context, *PublishRequest) (*PublishReply, error) - // Returns a stream of acknowledgements from outputs. - StreamAcknowledgements(*StreamAcksRequest, Producer_StreamAcknowledgementsServer) error - mustEmbedUnimplementedProducerServer() -} - -// UnimplementedProducerServer must be embedded to have forward compatible implementations. -type UnimplementedProducerServer struct { -} - -func (UnimplementedProducerServer) PublishEvents(context.Context, *PublishRequest) (*PublishReply, error) { - return nil, status.Errorf(codes.Unimplemented, "method PublishEvents not implemented") -} -func (UnimplementedProducerServer) StreamAcknowledgements(*StreamAcksRequest, Producer_StreamAcknowledgementsServer) error { - return status.Errorf(codes.Unimplemented, "method StreamAcknowledgements not implemented") -} -func (UnimplementedProducerServer) mustEmbedUnimplementedProducerServer() {} - -// UnsafeProducerServer may be embedded to opt out of forward compatibility for this service. -// Use of this interface is not recommended, as added methods to ProducerServer will -// result in compilation errors. -type UnsafeProducerServer interface { - mustEmbedUnimplementedProducerServer() -} - -func RegisterProducerServer(s grpc.ServiceRegistrar, srv ProducerServer) { - s.RegisterService(&Producer_ServiceDesc, srv) -} - -func _Producer_PublishEvents_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(PublishRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ProducerServer).PublishEvents(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/elastic.agent.shipper.v1.Producer/PublishEvents", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ProducerServer).PublishEvents(ctx, req.(*PublishRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Producer_StreamAcknowledgements_Handler(srv interface{}, stream grpc.ServerStream) error { - m := new(StreamAcksRequest) - if err := stream.RecvMsg(m); err != nil { - return err - } - return srv.(ProducerServer).StreamAcknowledgements(m, &producerStreamAcknowledgementsServer{stream}) -} - -type Producer_StreamAcknowledgementsServer interface { - Send(*StreamAcksReply) error - grpc.ServerStream -} - -type producerStreamAcknowledgementsServer struct { - grpc.ServerStream -} - -func (x *producerStreamAcknowledgementsServer) Send(m *StreamAcksReply) error { - return x.ServerStream.SendMsg(m) -} - -// Producer_ServiceDesc is the grpc.ServiceDesc for Producer service. -// It's only intended for direct use with grpc.RegisterService, -// and not to be introspected or modified (even as a copy) -var Producer_ServiceDesc = grpc.ServiceDesc{ - ServiceName: "elastic.agent.shipper.v1.Producer", - HandlerType: (*ProducerServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "PublishEvents", - Handler: _Producer_PublishEvents_Handler, - }, - }, - Streams: []grpc.StreamDesc{ - { - StreamName: "StreamAcknowledgements", - Handler: _Producer_StreamAcknowledgements_Handler, - ServerStreams: true, - }, - }, - Metadata: "shipper.proto", -} diff --git a/libbeat/outputs/shipper/api/shipper_mock.go b/libbeat/outputs/shipper/api/shipper_mock.go index e7d3f5f7054b..fce9db750cd9 100644 --- a/libbeat/outputs/shipper/api/shipper_mock.go +++ b/libbeat/outputs/shipper/api/shipper_mock.go @@ -19,28 +19,29 @@ package api import ( context "context" + + pb "github.com/elastic/elastic-agent-shipper-client/pkg/proto" + "github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages" ) func NewProducerMock(cap int) *ProducerMock { return &ProducerMock{ - Q: make([]*Event, 0, cap), + Q: make([]*messages.Event, 0, cap), } } type ProducerMock struct { - UnimplementedProducerServer - Q []*Event + pb.UnimplementedProducerServer + Q []*messages.Event Error error } -func (p *ProducerMock) PublishEvents(ctx context.Context, r *PublishRequest) (*PublishReply, error) { +func (p *ProducerMock) PublishEvents(ctx context.Context, r *messages.PublishRequest) (*messages.PublishReply, error) { if p.Error != nil { return nil, p.Error } - resp := &PublishReply{ - Results: make([]*EventResult, 0, len(r.Events)), - } + resp := &messages.PublishReply{} for _, e := range r.Events { if len(p.Q) == cap(p.Q) { @@ -48,12 +49,7 @@ func (p *ProducerMock) PublishEvents(ctx context.Context, r *PublishRequest) (*P } p.Q = append(p.Q, e) - - resp.Results = append(resp.Results, &EventResult{ - Timestamp: e.GetTimestamp(), - QueueId: "queue", - EventId: e.GetEventId(), - }) + resp.AcceptedCount++ } return resp, nil diff --git a/libbeat/outputs/shipper/shipper.go b/libbeat/outputs/shipper/shipper.go index 7755d18a256c..eceedff9cd79 100644 --- a/libbeat/outputs/shipper/shipper.go +++ b/libbeat/outputs/shipper/shipper.go @@ -24,8 +24,10 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/outputs" - sc "github.com/elastic/beats/v7/libbeat/outputs/shipper/api" "github.com/elastic/beats/v7/libbeat/publisher" + "github.com/elastic/elastic-agent-shipper-client/pkg/helpers" + sc "github.com/elastic/elastic-agent-shipper-client/pkg/proto" + "github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -37,7 +39,6 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" - "google.golang.org/protobuf/types/known/structpb" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -128,7 +129,7 @@ func (c *shipper) Publish(ctx context.Context, batch publisher.Batch) error { st.NewBatch(len(events)) nonDroppedEvents := make([]publisher.Event, 0, len(events)) - convertedEvents := make([]*sc.Event, 0, len(events)) + convertedEvents := make([]*messages.Event, 0, len(events)) c.log.Debugf("converting %d events to protobuf...", len(events)) @@ -152,7 +153,7 @@ func (c *shipper) Publish(ctx context.Context, batch publisher.Batch) error { ctx, cancel := context.WithTimeout(ctx, c.timeout) defer cancel() - resp, err := c.client.PublishEvents(ctx, &sc.PublishRequest{ + resp, err := c.client.PublishEvents(ctx, &messages.PublishRequest{ Events: convertedEvents, }) @@ -163,26 +164,26 @@ func (c *shipper) Publish(ctx context.Context, batch publisher.Batch) error { } // with a correct server implementation should never happen, this error is not recoverable - if len(resp.Results) > len(nonDroppedEvents) { + if int(resp.AcceptedCount) > len(nonDroppedEvents) { return fmt.Errorf( - "server returned unexpected results, expected maximum %d items, got %d", + "server returned unexpected results, expected maximum accepted items %d, got %d", len(nonDroppedEvents), - len(resp.Results), + resp.AcceptedCount, ) } // the server is supposed to retain the order of the initial events in the response // judging by the size of the result list we can determine what part of the initial // list was accepted and we can send the rest of the list for a retry - retries := nonDroppedEvents[len(resp.Results):] + retries := nonDroppedEvents[resp.AcceptedCount:] if len(retries) == 0 { batch.ACK() st.Acked(len(nonDroppedEvents)) - c.log.Debugf("%d events have been acknowledged, %d dropped", len(nonDroppedEvents), droppedCount) + c.log.Debugf("%d events have been accepted, %d dropped", len(nonDroppedEvents), droppedCount) } else { batch.RetryEvents(retries) // decreases TTL unless guaranteed delivery st.Failed(len(retries)) - c.log.Debugf("%d events have been acknowledged, %d sent for retry, %d dropped", len(resp.Results), len(retries), droppedCount) + c.log.Debugf("%d events have been accepted, %d sent for retry, %d dropped", resp.AcceptedCount, len(retries), droppedCount) } return nil @@ -206,23 +207,23 @@ func (c *shipper) String() string { return "shipper" } -func convertMapStr(m mapstr.M) (*structpb.Value, error) { +func convertMapStr(m mapstr.M) (*messages.Value, error) { if m == nil { - return structpb.NewNullValue(), nil + return helpers.NewNullValue(), nil } - fields := make(map[string]*structpb.Value, len(m)) + fields := make(map[string]*messages.Value, len(m)) for key, value := range m { var ( - protoValue *structpb.Value + protoValue *messages.Value err error ) switch v := value.(type) { case mapstr.M: protoValue, err = convertMapStr(v) default: - protoValue, err = structpb.NewValue(v) + protoValue, err = helpers.NewValue(v) } if err != nil { return nil, err @@ -230,14 +231,14 @@ func convertMapStr(m mapstr.M) (*structpb.Value, error) { fields[key] = protoValue } - s := &structpb.Struct{ - Fields: fields, + s := &messages.Struct{ + Data: fields, } - return structpb.NewStructValue(s), nil + return helpers.NewStructValue(s), nil } -func toShipperEvent(e publisher.Event) (*sc.Event, error) { +func toShipperEvent(e publisher.Event) (*messages.Event, error) { meta, err := convertMapStr(e.Content.Meta) if err != nil { return nil, fmt.Errorf("failed to convert event metadata to protobuf: %w", err) @@ -248,8 +249,8 @@ func toShipperEvent(e publisher.Event) (*sc.Event, error) { return nil, fmt.Errorf("failed to convert event fields to protobuf: %w", err) } - source := &sc.Source{} - ds := &sc.DataStream{} + source := &messages.Source{} + ds := &messages.DataStream{} inputIDVal, err := e.Content.Meta.GetValue("input_id") if err == nil { @@ -274,7 +275,7 @@ func toShipperEvent(e publisher.Event) (*sc.Event, error) { ds.Dataset, _ = dsDataset.(string) } - return &sc.Event{ + return &messages.Event{ Timestamp: timestamppb.New(e.Content.Timestamp), Metadata: meta.GetStructValue(), Fields: fields.GetStructValue(), diff --git a/libbeat/outputs/shipper/shipper_test.go b/libbeat/outputs/shipper/shipper_test.go index bac33d6490a1..f7c2b2cdf904 100644 --- a/libbeat/outputs/shipper/shipper_test.go +++ b/libbeat/outputs/shipper/shipper_test.go @@ -28,25 +28,28 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/structpb" "google.golang.org/protobuf/types/known/timestamppb" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/outputs/outest" - sc "github.com/elastic/beats/v7/libbeat/outputs/shipper/api" + "github.com/elastic/beats/v7/libbeat/outputs/shipper/api" "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-shipper-client/pkg/helpers" + pb "github.com/elastic/elastic-agent-shipper-client/pkg/proto" + "github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages" ) func TestToShipperEvent(t *testing.T) { + wrong := struct{}{} ts := time.Now().Truncate(time.Second) cases := []struct { name string value publisher.Event - exp *sc.Event + exp *messages.Event expErr string }{ { @@ -62,10 +65,10 @@ func TestToShipperEvent(t *testing.T) { }, }, }, - exp: &sc.Event{ + exp: &messages.Event{ Timestamp: timestamppb.New(ts), - Source: &sc.Source{}, - DataStream: &sc.DataStream{}, + Source: &messages.Source{}, + DataStream: &messages.DataStream{}, Metadata: protoStruct(t, map[string]interface{}{ "metafield": 42, }), @@ -94,13 +97,13 @@ func TestToShipperEvent(t *testing.T) { }, }, }, - exp: &sc.Event{ + exp: &messages.Event{ Timestamp: timestamppb.New(ts), - Source: &sc.Source{ + Source: &messages.Source{ InputId: "input", StreamId: "stream", }, - DataStream: &sc.DataStream{ + DataStream: &messages.DataStream{ Type: "ds-type", Namespace: "ds-namespace", Dataset: "ds-dataset", @@ -126,7 +129,7 @@ func TestToShipperEvent(t *testing.T) { Content: beat.Event{ Timestamp: ts, Meta: mapstr.M{ - "metafield": ts, // timestamp is a wrong type + "metafield": wrong, }, }, }, @@ -138,7 +141,7 @@ func TestToShipperEvent(t *testing.T) { Content: beat.Event{ Timestamp: ts, Fields: mapstr.M{ - "field": ts, // timestamp is a wrong type + "field": wrong, }, }, }, @@ -163,12 +166,12 @@ func TestConvertMapStr(t *testing.T) { cases := []struct { name string value mapstr.M - exp *structpb.Value + exp *messages.Value expErr string }{ { name: "nil returns nil", - exp: structpb.NewNullValue(), + exp: helpers.NewNullValue(), }, { name: "empty map returns empty struct", @@ -178,9 +181,9 @@ func TestConvertMapStr(t *testing.T) { { name: "returns error when type is not supported", value: mapstr.M{ - "key": time.Now(), + "key": struct{}{}, }, - expErr: "invalid type: time.Time", + expErr: "invalid type: struct {}", }, { name: "values are preserved", @@ -411,10 +414,10 @@ func TestPublish(t *testing.T) { // `listenAddr` is the address for the server to listen // returns `actualAddr` where the listener actually is and the `stop` function to stop the server func runServer(t *testing.T, qSize int, err error, listenAddr string) (actualAddr string, stop func()) { - producer := sc.NewProducerMock(qSize) + producer := api.NewProducerMock(qSize) producer.Error = err grpcServer := grpc.NewServer() - sc.RegisterProducerServer(grpcServer, producer) + pb.RegisterProducerServer(grpcServer, producer) listener, err := net.Listen("tcp", listenAddr) require.NoError(t, err) @@ -431,14 +434,14 @@ func runServer(t *testing.T, qSize int, err error, listenAddr string) (actualAdd return actualAddr, stop } -func protoStruct(t *testing.T, values map[string]interface{}) *structpb.Struct { - s, err := structpb.NewStruct(values) +func protoStruct(t *testing.T, values map[string]interface{}) *messages.Struct { + s, err := helpers.NewStruct(values) require.NoError(t, err) return s } -func protoStructValue(t *testing.T, values map[string]interface{}) *structpb.Value { +func protoStructValue(t *testing.T, values map[string]interface{}) *messages.Value { s := protoStruct(t, values) - return structpb.NewStructValue(s) + return helpers.NewStructValue(s) } func requireEqualProto(t *testing.T, expected, actual proto.Message) {