diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 904ea7f199a..9b1244c465f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -163,6 +163,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add ingress nginx controller fileset {pull}16197[16197] - move create-[module,fileset,fields] to mage and enable in x-pack/filebeat {pull}15836[15836] - Add ECS tls and categorization fields to apache module. {issue}16032[16032] {pull}16121[16121] +- Work on e2e ACK's for the azure-eventhub input {issue}15671[15671] {pull}16215[16215] - Add MQTT input. {issue}15602[15602] {pull}16204[16204] - Add ECS categorization fields to activemq module. {issue}16151[16151] {pull}16201[16201] - Add a TLS test and more debug output to httpjson input {pull}16315[16315] diff --git a/NOTICE.txt b/NOTICE.txt index 19fd97b1fdf..208cf82f637 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -518,7 +518,7 @@ License type (autodetected): MIT -------------------------------------------------------------------- Dependency: github.com/Azure/azure-event-hubs-go/v3 -Version: v3.1.0 +Version: v3.1.2 License type (autodetected): MIT ./vendor/github.com/Azure/azure-event-hubs-go/v3/LICENSE: -------------------------------------------------------------------- diff --git a/go.mod b/go.mod index f44be30d7fc..280fc6a2675 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( code.cloudfoundry.org/go-diodes v0.0.0-20190809170250-f77fb823c7ee // indirect code.cloudfoundry.org/go-loggregator v7.4.0+incompatible code.cloudfoundry.org/rfc5424 v0.0.0-20180905210152-236a6d29298a // indirect - github.com/Azure/azure-event-hubs-go/v3 v3.1.0 + github.com/Azure/azure-event-hubs-go/v3 v3.1.2 github.com/Azure/azure-sdk-for-go v37.1.0+incompatible github.com/Azure/azure-storage-blob-go v0.8.0 github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 // indirect diff --git a/go.sum b/go.sum index fbce282b594..26dcb9910c1 100644 --- a/go.sum +++ b/go.sum @@ -31,8 +31,8 @@ code.cloudfoundry.org/rfc5424 v0.0.0-20180905210152-236a6d29298a/go.mod h1:tkZo8 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/Azure/azure-amqp-common-go/v3 v3.0.0 h1:j9tjcwhypb/jek3raNrwlCIl7iKQYOug7CLpSyBBodc= github.com/Azure/azure-amqp-common-go/v3 v3.0.0/go.mod h1:SY08giD/XbhTz07tJdpw1SoxQXHPN30+DI3Z04SYqyg= -github.com/Azure/azure-event-hubs-go/v3 v3.1.0 h1:j+/WXzke3PTRu5gAgSpWgWJVfpwIyaedIqqgdgkjAe0= -github.com/Azure/azure-event-hubs-go/v3 v3.1.0/go.mod h1:hR40byNJjKkS74+3RhloPQ8sJ8zFQeJ920Uk3oYY0+k= +github.com/Azure/azure-event-hubs-go/v3 v3.1.2 h1:S/NjCZ1Z2R4rHJd2Hbbad6rIhxJ4lZZebKTsKHweX4A= +github.com/Azure/azure-event-hubs-go/v3 v3.1.2/go.mod h1:hR40byNJjKkS74+3RhloPQ8sJ8zFQeJ920Uk3oYY0+k= github.com/Azure/azure-pipeline-go v0.1.8/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg= github.com/Azure/azure-pipeline-go v0.1.9/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg= github.com/Azure/azure-pipeline-go v0.2.1 h1:OLBdZJ3yvOn2MezlWvbrBMTEUQC72zAftRZOMdj5HYo= @@ -107,8 +107,8 @@ github.com/andrewkroh/goja v0.0.0-20190128172624-dd2ac4456e20 h1:7rj9qZ63knnVo2Z github.com/andrewkroh/goja v0.0.0-20190128172624-dd2ac4456e20/go.mod h1:cI59GRkC2FRaFYtgbYEqMlgnnfvAwXzjojyZKXwklNg= github.com/andrewkroh/sys v0.0.0-20151128191922-287798fe3e43 h1:WFwa9pqou0Nb4DdfBOyaBTH0GqLE74Qwdf61E7ITHwQ= github.com/andrewkroh/sys v0.0.0-20151128191922-287798fe3e43/go.mod h1:tJPYQG4mnMeUtQvQKNkbsFrnmZOg59Qnf8CcctFv5v4= -github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= +github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q= github.com/antlr/antlr4 v0.0.0-20200225173536-225249fdaef5 h1:nkZ9axP+MvUFCu8JRN/MCY+DmTfs6lY7hE0QnJbxSdI= github.com/antlr/antlr4 v0.0.0-20200225173536-225249fdaef5/go.mod h1:T7PbCXFs94rrTttyxjbyT5+/1V8T2TYDejxUfHJjw1Y= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= @@ -730,8 +730,8 @@ golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20191002035440-2ec189313ef0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20191002035440-2ec189313ef0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191021144547-ec77196f6094/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= diff --git a/vendor/github.com/Azure/azure-event-hubs-go/v3/changelog.md b/vendor/github.com/Azure/azure-event-hubs-go/v3/changelog.md index 2b901924a56..8e14f9d4236 100644 --- a/vendor/github.com/Azure/azure-event-hubs-go/v3/changelog.md +++ b/vendor/github.com/Azure/azure-event-hubs-go/v3/changelog.md @@ -1,9 +1,15 @@ # Change Log -## `head` +## `v3.1.2` +- fix errors in message handling being ignored [#155](https://github.com/Azure/azure-event-hubs-go/issues/155) +## `v3.1.1` +- Azure storage SAS token regeneration fix [#157](https://github.com/Azure/azure-event-hubs-go/issues/157) + +## `v3.1.0` - add support for websocket connections with eph with `eph.WithWebSocketConnection()` + ## `v2.0.4` - add comment on the `PartitionID` field in `SystemProperties` to clarify that it will always return a nil value [#131](https://github.com/Azure/azure-event-hubs-go/issues/131) diff --git a/vendor/github.com/Azure/azure-event-hubs-go/v3/eph/eph.go b/vendor/github.com/Azure/azure-event-hubs-go/v3/eph/eph.go index 38d05a146e5..fda6965e579 100644 --- a/vendor/github.com/Azure/azure-event-hubs-go/v3/eph/eph.go +++ b/vendor/github.com/Azure/azure-event-hubs-go/v3/eph/eph.go @@ -60,22 +60,22 @@ const ( type ( // EventProcessorHost provides functionality for coordinating and balancing load across multiple Event Hub partitions EventProcessorHost struct { - namespace string - hubName string - name string - consumerGroup string - tokenProvider auth.TokenProvider - client *eventhub.Hub - leaser Leaser - checkpointer Checkpointer - scheduler *scheduler - handlers map[string]eventhub.Handler - hostMu sync.Mutex - handlersMu sync.Mutex - partitionIDs []string - noBanner bool + namespace string + hubName string + name string + consumerGroup string + tokenProvider auth.TokenProvider + client *eventhub.Hub + leaser Leaser + checkpointer Checkpointer + scheduler *scheduler + handlers map[string]eventhub.Handler + hostMu sync.Mutex + handlersMu sync.Mutex + partitionIDs []string + noBanner bool webSocketConnection bool - env *azure.Environment + env *azure.Environment } // EventProcessorHostOption provides configuration options for an EventProcessorHost @@ -428,18 +428,24 @@ func (h *EventProcessorHost) compositeHandlers() eventhub.Handler { h.handlersMu.Lock() defer h.handlersMu.Unlock() - var wg sync.WaitGroup - for _, handle := range h.handlers { + // we accept that this will contain any of the possible len(h.handlers) errors + // as it will be used to later decide of delivery is considered a failure + // and NOT further inspected + var lastError error + + wg := &sync.WaitGroup{} + for _, handler := range h.handlers { wg.Add(1) - go func(boundHandle eventhub.Handler) { - if err := boundHandle(ctx, event); err != nil { + go func(boundHandler eventhub.Handler) { + defer wg.Done() // consider if panics should be cought here, too. Currently would crash process + if err := boundHandler(ctx, event); err != nil { + lastError = err tab.For(ctx).Error(err) } - wg.Done() - }(handle) + }(handler) } wg.Wait() - return nil + return lastError } } diff --git a/vendor/github.com/Azure/azure-event-hubs-go/v3/go.sum b/vendor/github.com/Azure/azure-event-hubs-go/v3/go.sum index aa9f7c7d383..7156f037963 100644 --- a/vendor/github.com/Azure/azure-event-hubs-go/v3/go.sum +++ b/vendor/github.com/Azure/azure-event-hubs-go/v3/go.sum @@ -67,6 +67,7 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= diff --git a/vendor/github.com/Azure/azure-event-hubs-go/v3/storage/credential.go b/vendor/github.com/Azure/azure-event-hubs-go/v3/storage/credential.go index 3344a74de34..999287e0df5 100644 --- a/vendor/github.com/Azure/azure-event-hubs-go/v3/storage/credential.go +++ b/vendor/github.com/Azure/azure-event-hubs-go/v3/storage/credential.go @@ -153,7 +153,7 @@ func (cred *AADSASCredential) getToken(ctx context.Context) (SASToken, error) { defer span.End() if cred.token != nil { - if cred.token.expiry.Before(time.Now().Add(-5 * time.Minute)) { + if !cred.token.expiry.Before(time.Now().Add(5 * time.Minute)) { return *cred.token, nil } } diff --git a/vendor/github.com/Azure/azure-event-hubs-go/v3/version.go b/vendor/github.com/Azure/azure-event-hubs-go/v3/version.go index 3c676a70e3f..536932abe86 100644 --- a/vendor/github.com/Azure/azure-event-hubs-go/v3/version.go +++ b/vendor/github.com/Azure/azure-event-hubs-go/v3/version.go @@ -2,5 +2,5 @@ package eventhub const ( // Version is the semantic version number - Version = "3.1.0" + Version = "3.1.2" ) diff --git a/vendor/modules.txt b/vendor/modules.txt index 4f3775dfa22..12f9460241e 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -44,7 +44,7 @@ github.com/Azure/azure-amqp-common-go/v3/internal/tracing github.com/Azure/azure-amqp-common-go/v3/rpc github.com/Azure/azure-amqp-common-go/v3/sas github.com/Azure/azure-amqp-common-go/v3/uuid -# github.com/Azure/azure-event-hubs-go/v3 v3.1.0 +# github.com/Azure/azure-event-hubs-go/v3 v3.1.2 github.com/Azure/azure-event-hubs-go/v3 github.com/Azure/azure-event-hubs-go/v3/atom github.com/Azure/azure-event-hubs-go/v3/eph diff --git a/x-pack/filebeat/input/azureeventhub/eph.go b/x-pack/filebeat/input/azureeventhub/eph.go index afd26264529..8848483c8be 100644 --- a/x-pack/filebeat/input/azureeventhub/eph.go +++ b/x-pack/filebeat/input/azureeventhub/eph.go @@ -6,6 +6,7 @@ package azureeventhub import ( "context" + "errors" "fmt" eventhub "github.com/Azure/azure-event-hubs-go/v3" @@ -48,8 +49,15 @@ func (a *azureInput) runWithEPH() error { // register a message handler -- many can be registered handlerID, err := a.processor.RegisterHandler(a.workerCtx, func(c context.Context, e *eventhub.Event) error { + var onEventErr error // partitionID is not yet mapped in the azure-eventhub sdk - return a.processEvents(e, "") + ok := a.processEvents(e, "") + if !ok { + onEventErr = errors.New("OnEvent function returned false. Stopping input worker") + a.log.Debug(onEventErr.Error()) + a.Stop() + } + return onEventErr }) if err != nil { return err diff --git a/x-pack/filebeat/input/azureeventhub/input.go b/x-pack/filebeat/input/azureeventhub/input.go index 8f5068e8c96..1435801893d 100644 --- a/x-pack/filebeat/input/azureeventhub/input.go +++ b/x-pack/filebeat/input/azureeventhub/input.go @@ -42,6 +42,7 @@ type azureInput struct { workerWg sync.WaitGroup // waits on worker goroutine. processor *eph.EventProcessorHost // eph will be assigned if users have enabled the option hub *eventhub.Hub // hub will be assigned + ackChannel chan int } const ( @@ -66,14 +67,6 @@ func NewInput( if err := cfg.Unpack(&config); err != nil { return nil, errors.Wrapf(err, "reading %s input config", inputName) } - out, err := connector.ConnectWith(cfg, beat.ClientConfig{ - Processing: beat.ProcessingConfig{ - DynamicFields: inputContext.DynamicFields, - }, - }) - if err != nil { - return nil, err - } inputCtx, cancelInputCtx := context.WithCancel(context.Background()) go func() { @@ -88,17 +81,24 @@ func NewInput( // to be recreated with each restart. workerCtx, workerCancel := context.WithCancel(inputCtx) - input := &azureInput{ + in := &azureInput{ config: config, log: logp.NewLogger(fmt.Sprintf("%s input", inputName)).With("connection string", config.ConnectionString), - outlet: out, context: inputContext, workerCtx: workerCtx, workerCancel: workerCancel, } - - input.log.Infof("Initialized %s input.", inputName) - return input, nil + out, err := connector.ConnectWith(cfg, beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + DynamicFields: inputContext.DynamicFields, + }, + }) + if err != nil { + return nil, err + } + in.outlet = out + in.log.Infof("Initialized %s input.", inputName) + return in, nil } // Run starts the input worker then returns. Only the first invocation @@ -176,7 +176,7 @@ func (a *azureInput) Wait() { a.Stop() } -func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) error { +func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) bool { timestamp := time.Now() azure := common.MapStr{ // partitionID is only mapped in the non-eph option which is not available yet, this field will be temporary unavailable @@ -195,12 +195,13 @@ func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) er "message": msg, "azure": azure, }, + Private: event.Data, }) if !ok { - return errors.New("event has not been sent") + return ok } } - return nil + return true } // parseMultipleMessages will try to split the message into multiple ones based on the group field provided by the configuration @@ -209,7 +210,6 @@ func (a *azureInput) parseMultipleMessages(bMessage []byte) []string { err := json.Unmarshal(bMessage, &obj) if err != nil { a.log.Errorw(fmt.Sprintf("deserializing multiple messages using the group object `records`"), "error", err) - return []string{string(bMessage)} } var messages []string if len(obj[expandEventListFromField]) > 0 { diff --git a/x-pack/filebeat/input/azureeventhub/input_test.go b/x-pack/filebeat/input/azureeventhub/input_test.go index 367d5cae1e2..6e6cd47484c 100644 --- a/x-pack/filebeat/input/azureeventhub/input_test.go +++ b/x-pack/filebeat/input/azureeventhub/input_test.go @@ -57,9 +57,9 @@ func TestProcessEvents(t *testing.T) { Data: []byte(msg), SystemProperties: &properties, } - err = input.processEvents(&ev, "0") - if err != nil { - t.Fatal(err) + ok := input.processEvents(&ev, "0") + if !ok { + t.Fatal("OnEvent function returned false") } assert.Equal(t, len(o.Events), 1) message, err := o.Events[0].Fields.GetValue("message")