From fa553d558a8c1a0becf85eaf6dda5755ce5294ca Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Fri, 20 Aug 2021 10:24:24 -0500 Subject: [PATCH 01/12] [Heartbeat] Fix broken data_stream assignment Fixes https://github.com/elastic/beats/issues/27478 This PR fixes the logic behind assigning the data_steam.dataset field. Previously this was static per monitor type, only the index would change, but not the field. This makes the processor more comprehensive handling not just index naming but field generation. --- heartbeat/monitors/factory.go | 16 ++++------ .../add_data_stream.go} | 31 ++++++++++++------- .../add_data_stream_test.go} | 2 +- .../add_data_stream_index/datastream.go | 18 ----------- .../monitors/browser/synthexec/enrich.go | 6 ++-- 5 files changed, 29 insertions(+), 44 deletions(-) rename libbeat/processors/{add_data_stream_index/add_data_stream_index.go => add_data_stream/add_data_stream.go} (71%) rename libbeat/processors/{add_data_stream_index/add_data_stream_index_test.go => add_data_stream/add_data_stream_test.go} (98%) delete mode 100644 libbeat/processors/add_data_stream_index/datastream.go diff --git a/heartbeat/monitors/factory.go b/heartbeat/monitors/factory.go index 5e82969fd18..ea3da4a8c75 100644 --- a/heartbeat/monitors/factory.go +++ b/heartbeat/monitors/factory.go @@ -26,7 +26,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/fmtstr" "github.com/elastic/beats/v7/libbeat/processors" - "github.com/elastic/beats/v7/libbeat/processors/add_data_stream_index" + "github.com/elastic/beats/v7/libbeat/processors/add_data_stream" "github.com/elastic/beats/v7/libbeat/processors/add_formatted_index" "github.com/elastic/beats/v7/libbeat/publisher/pipetool" ) @@ -51,10 +51,10 @@ type publishSettings struct { KeepNull bool `config:"keep_null"` // Output meta data settings - Pipeline string `config:"pipeline"` // ES Ingest pipeline name - Index fmtstr.EventFormatString `config:"index"` // ES output index pattern - DataStream *add_data_stream_index.DataStream `config:"data_stream"` - DataSet string `config:"dataset"` + Pipeline string `config:"pipeline"` // ES Ingest pipeline name + Index fmtstr.EventFormatString `config:"index"` // ES output index pattern + DataStream *add_data_stream.DataStream `config:"data_stream"` + DataSet string `config:"dataset"` } // NewFactory takes a scheduler and creates a RunnerFactory that can create cfgfile.Runner(Monitor) objects. @@ -119,10 +119,6 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi fields := clientCfg.Processing.Fields.Clone() fields.Put("event.dataset", dataset) - if settings.DataStream != nil { - fields.Put("data_stream", settings.DataStream) - } - meta := clientCfg.Processing.Meta.Clone() if settings.Pipeline != "" { meta.Put("pipeline", settings.Pipeline) @@ -164,7 +160,7 @@ func setupIndexProcessor(info beat.Info, settings publishSettings, dataset strin if ds.Dataset == "" { ds.Dataset = dataset } - return add_data_stream_index.New(*ds), nil + return add_data_stream.New(*ds), nil } if !settings.Index.IsEmpty() { diff --git a/libbeat/processors/add_data_stream_index/add_data_stream_index.go b/libbeat/processors/add_data_stream/add_data_stream.go similarity index 71% rename from libbeat/processors/add_data_stream_index/add_data_stream_index.go rename to libbeat/processors/add_data_stream/add_data_stream.go index 7d770c6f082..bc27e5c5ed2 100644 --- a/libbeat/processors/add_data_stream_index/add_data_stream_index.go +++ b/libbeat/processors/add_data_stream/add_data_stream.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package add_data_stream_index +package add_data_stream import ( "fmt" @@ -43,8 +43,10 @@ func SetEventDataset(event *beat.Event, ds string) { type AddDataStreamIndex struct { DataStream DataStream // cached, compiled version of the index name derived from the data stream - dsCached string - customDsCache string + idxNameCache string + // cached, compiled version of the index name derived from the data stream, sans datastream + // which is dynamic + idxNamePartialCache string } // New returns a new AddDataStreamIndex processor. @@ -56,26 +58,29 @@ func New(ds DataStream) *AddDataStreamIndex { ds.Dataset = "generic" } return &AddDataStreamIndex{ - DataStream: ds, - dsCached: ds.indexName(), - customDsCache: ds.datasetFmtString(), + DataStream: ds, + idxNameCache: ds.indexName(), + idxNamePartialCache: ds.idxNamePartialCache(), } } // Run runs the processor. func (p *AddDataStreamIndex) Run(event *beat.Event) (*beat.Event, error) { + eventDataStream := p.DataStream if event.Meta == nil { event.Meta = common.MapStr{ - events.FieldMetaRawIndex: p.dsCached, + events.FieldMetaRawIndex: p.idxNameCache, } } else { - customDs, hasCustom := event.Meta[FieldMetaCustomDataset] + customDataset, hasCustom := event.Meta[FieldMetaCustomDataset] if !hasCustom { - event.Meta[events.FieldMetaRawIndex] = p.dsCached + event.Meta[events.FieldMetaRawIndex] = p.idxNameCache } else { - event.Meta[events.FieldMetaRawIndex] = fmt.Sprintf(p.customDsCache, customDs) + event.Meta[events.FieldMetaRawIndex] = fmt.Sprintf(p.idxNamePartialCache, customDataset) + eventDataStream.Dataset = customDataset.(string) } } + event.PutValue("data_stream", eventDataStream) return event, nil } @@ -92,10 +97,12 @@ type DataStream struct { Type string `config:"type"` } -func (ds DataStream) datasetFmtString() string { +// genIdxFmtStringForDataset returns a format string that takes a single argument, the dataset +// this is slightly more efficient than generating the entire string with all three vars every time +func (ds DataStream) idxNamePartialCache() string { return fmt.Sprintf("%s-%%s-%s", ds.Type, ds.Namespace) } func (ds DataStream) indexName() string { - return fmt.Sprintf(ds.datasetFmtString(), ds.Dataset) + return fmt.Sprintf(ds.idxNamePartialCache(), ds.Dataset) } diff --git a/libbeat/processors/add_data_stream_index/add_data_stream_index_test.go b/libbeat/processors/add_data_stream/add_data_stream_test.go similarity index 98% rename from libbeat/processors/add_data_stream_index/add_data_stream_index_test.go rename to libbeat/processors/add_data_stream/add_data_stream_test.go index d71181e3127..f6201114fd3 100644 --- a/libbeat/processors/add_data_stream_index/add_data_stream_index_test.go +++ b/libbeat/processors/add_data_stream/add_data_stream_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package add_data_stream_index +package add_data_stream import ( "testing" diff --git a/libbeat/processors/add_data_stream_index/datastream.go b/libbeat/processors/add_data_stream_index/datastream.go deleted file mode 100644 index c2a69dc5214..00000000000 --- a/libbeat/processors/add_data_stream_index/datastream.go +++ /dev/null @@ -1,18 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package add_data_stream_index diff --git a/x-pack/heartbeat/monitors/browser/synthexec/enrich.go b/x-pack/heartbeat/monitors/browser/synthexec/enrich.go index a842dd28794..c3ab6c76faa 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/enrich.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/enrich.go @@ -9,7 +9,7 @@ import ( "time" "github.com/elastic/beats/v7/libbeat/beat/events" - "github.com/elastic/beats/v7/libbeat/processors/add_data_stream_index" + "github.com/elastic/beats/v7/libbeat/processors/add_data_stream" "github.com/gofrs/uuid" @@ -122,9 +122,9 @@ func (je *journeyEnricher) enrichSynthEvent(event *beat.Event, se *SynthEvent) e case "step/screenshot_ref": fallthrough case "screenshot/block": - add_data_stream_index.SetEventDataset(event, "browser.screenshot") + add_data_stream.SetEventDataset(event, "browser.screenshot") case "journey/network_info": - add_data_stream_index.SetEventDataset(event, "browser.network") + add_data_stream.SetEventDataset(event, "browser.network") } if se.Id != "" { From 0069de40b722e1a3f5211d7b0696924f1c7001f0 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Fri, 20 Aug 2021 10:41:45 -0500 Subject: [PATCH 02/12] Fmt --- .../add_data_stream/add_data_stream.go | 19 ++++++++++-------- .../add_data_stream/add_data_stream_test.go | 20 ++++++++++++------- 2 files changed, 24 insertions(+), 15 deletions(-) diff --git a/libbeat/processors/add_data_stream/add_data_stream.go b/libbeat/processors/add_data_stream/add_data_stream.go index bc27e5c5ed2..31a4565a0af 100644 --- a/libbeat/processors/add_data_stream/add_data_stream.go +++ b/libbeat/processors/add_data_stream/add_data_stream.go @@ -37,10 +37,10 @@ func SetEventDataset(event *beat.Event, ds string) { } } -// AddDataStreamIndex is a Processor to set an event's "raw_index" metadata field -// based on the given type, dataset, and namespace fields. -// If the event's metadata contains an -type AddDataStreamIndex struct { +// AddDataStream is a Processor to set an event's "raw_index" metadata field +// based on the given type, dataset, and namespace fields, as well as its +// `data_stream` field dynamically. +type AddDataStream struct { DataStream DataStream // cached, compiled version of the index name derived from the data stream idxNameCache string @@ -50,14 +50,14 @@ type AddDataStreamIndex struct { } // New returns a new AddDataStreamIndex processor. -func New(ds DataStream) *AddDataStreamIndex { +func New(ds DataStream) *AddDataStream { if ds.Namespace == "" { ds.Namespace = "default" } if ds.Dataset == "" { ds.Dataset = "generic" } - return &AddDataStreamIndex{ + return &AddDataStream{ DataStream: ds, idxNameCache: ds.indexName(), idxNamePartialCache: ds.idxNamePartialCache(), @@ -65,7 +65,7 @@ func New(ds DataStream) *AddDataStreamIndex { } // Run runs the processor. -func (p *AddDataStreamIndex) Run(event *beat.Event) (*beat.Event, error) { +func (p *AddDataStream) Run(event *beat.Event) (*beat.Event, error) { eventDataStream := p.DataStream if event.Meta == nil { event.Meta = common.MapStr{ @@ -80,12 +80,15 @@ func (p *AddDataStreamIndex) Run(event *beat.Event) (*beat.Event, error) { eventDataStream.Dataset = customDataset.(string) } } + if event.Fields == nil { + event.Fields = common.MapStr{} + } event.PutValue("data_stream", eventDataStream) return event, nil } -func (p *AddDataStreamIndex) String() string { +func (p *AddDataStream) String() string { return fmt.Sprintf("add_data_stream_index=%v", p.DataStream.indexName()) } diff --git a/libbeat/processors/add_data_stream/add_data_stream_test.go b/libbeat/processors/add_data_stream/add_data_stream_test.go index f6201114fd3..8fd3e812d2b 100644 --- a/libbeat/processors/add_data_stream/add_data_stream_test.go +++ b/libbeat/processors/add_data_stream/add_data_stream_test.go @@ -27,24 +27,26 @@ import ( "github.com/elastic/beats/v7/libbeat/common" ) -func TestAddDataStreamIndex(t *testing.T) { +func TestAddDataStream(t *testing.T) { simpleDs := DataStream{ "myns", "myds", "mytype", } tests := []struct { - name string - ds DataStream - event *beat.Event - want string - wantErr bool + name string + ds DataStream + event *beat.Event + wantIndex string + wantDataStream DataStream + wantErr bool }{ { "simple", simpleDs, &beat.Event{}, "mytype-myds-myns", + simpleDs, false, }, { @@ -52,6 +54,7 @@ func TestAddDataStreamIndex(t *testing.T) { simpleDs, &beat.Event{Meta: common.MapStr{}}, "mytype-myds-myns", + simpleDs, false, }, { @@ -61,6 +64,7 @@ func TestAddDataStreamIndex(t *testing.T) { FieldMetaCustomDataset: "custom-ds", }}, "mytype-custom-ds-myns", + DataStream{"myns", "custom-ds", "mytype"}, false, }, { @@ -70,6 +74,7 @@ func TestAddDataStreamIndex(t *testing.T) { }, &beat.Event{}, "mytype-generic-default", + DataStream{"default", "generic", "mytype"}, false, }, } @@ -81,7 +86,8 @@ func TestAddDataStreamIndex(t *testing.T) { t.Errorf("Run() error = %v, wantErr %v", err, tt.wantErr) return } - require.Equal(t, tt.want, got.Meta[events.FieldMetaRawIndex]) + require.Equal(t, tt.wantIndex, got.Meta[events.FieldMetaRawIndex]) + require.Equal(t, tt.wantDataStream, got.Fields["data_stream"]) }) } } From 591a9cc44652301ec373d85fe5c0dd1c771db432 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Fri, 20 Aug 2021 14:40:35 -0500 Subject: [PATCH 03/12] handle event dataset --- heartbeat/config/config_test.go | 1 + heartbeat/magefile.go | 1 + heartbeat/monitors/factory.go | 1 - heartbeat/monitors/factory_test.go | 6 +++--- libbeat/processors/add_data_stream/add_data_stream.go | 1 + libbeat/processors/add_data_stream/add_data_stream_test.go | 1 + 6 files changed, 7 insertions(+), 4 deletions(-) diff --git a/heartbeat/config/config_test.go b/heartbeat/config/config_test.go index f4373cc528b..9b529728b45 100644 --- a/heartbeat/config/config_test.go +++ b/heartbeat/config/config_test.go @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +//go:build !integration // +build !integration package config diff --git a/heartbeat/magefile.go b/heartbeat/magefile.go index 0480c30a69e..40cc655ab1f 100644 --- a/heartbeat/magefile.go +++ b/heartbeat/magefile.go @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +//go:build mage // +build mage package main diff --git a/heartbeat/monitors/factory.go b/heartbeat/monitors/factory.go index ea3da4a8c75..2c756dbea37 100644 --- a/heartbeat/monitors/factory.go +++ b/heartbeat/monitors/factory.go @@ -117,7 +117,6 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi return func(clientCfg beat.ClientConfig) (beat.ClientConfig, error) { fields := clientCfg.Processing.Fields.Clone() - fields.Put("event.dataset", dataset) meta := clientCfg.Processing.Meta.Clone() if settings.Pipeline != "" { diff --git a/heartbeat/monitors/factory_test.go b/heartbeat/monitors/factory_test.go index 7c515885421..9ef916057df 100644 --- a/heartbeat/monitors/factory_test.go +++ b/heartbeat/monitors/factory_test.go @@ -26,7 +26,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat/events" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/fmtstr" - "github.com/elastic/beats/v7/libbeat/processors/add_data_stream_index" + "github.com/elastic/beats/v7/libbeat/processors/add_data_stream" ) func TestSetupIndexProcessor(t *testing.T) { @@ -58,7 +58,7 @@ func TestSetupIndexProcessor(t *testing.T) { }, "data stream should be type-namespace-dataset": { publishSettings{ - DataStream: &add_data_stream_index.DataStream{ + DataStream: &add_data_stream.DataStream{ Namespace: "myNamespace", Dataset: "myDataset", Type: "myType", @@ -71,7 +71,7 @@ func TestSetupIndexProcessor(t *testing.T) { }, "data stream should use defaults": { publishSettings{ - DataStream: &add_data_stream_index.DataStream{}, + DataStream: &add_data_stream.DataStream{}, }, "synthetics-browser-default", "browser", diff --git a/libbeat/processors/add_data_stream/add_data_stream.go b/libbeat/processors/add_data_stream/add_data_stream.go index 31a4565a0af..d27f9960b50 100644 --- a/libbeat/processors/add_data_stream/add_data_stream.go +++ b/libbeat/processors/add_data_stream/add_data_stream.go @@ -83,6 +83,7 @@ func (p *AddDataStream) Run(event *beat.Event) (*beat.Event, error) { if event.Fields == nil { event.Fields = common.MapStr{} } + event.PutValue("event.dataset", eventDataStream.Dataset) event.PutValue("data_stream", eventDataStream) return event, nil diff --git a/libbeat/processors/add_data_stream/add_data_stream_test.go b/libbeat/processors/add_data_stream/add_data_stream_test.go index 8fd3e812d2b..71d981d6034 100644 --- a/libbeat/processors/add_data_stream/add_data_stream_test.go +++ b/libbeat/processors/add_data_stream/add_data_stream_test.go @@ -88,6 +88,7 @@ func TestAddDataStream(t *testing.T) { } require.Equal(t, tt.wantIndex, got.Meta[events.FieldMetaRawIndex]) require.Equal(t, tt.wantDataStream, got.Fields["data_stream"]) + require.Equal(t, tt.wantDataStream.Dataset, got.Fields["event"].(common.MapStr)["dataset"]) }) } } From 757553af27a2508ce881055673e0e71a6b7fd814 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Fri, 20 Aug 2021 14:46:38 -0500 Subject: [PATCH 04/12] Update enrich --- .../monitors/browser/synthexec/enrich_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go b/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go index 8addfece0f4..629454f34c0 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go @@ -16,7 +16,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/beat/events" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/processors/add_data_stream_index" + "github.com/elastic/beats/v7/libbeat/processors/add_data_stream" "github.com/elastic/go-lookslike" "github.com/elastic/go-lookslike/testslike" ) @@ -151,7 +151,7 @@ func TestEnrichSynthEvent(t *testing.T) { &SynthEvent{Type: "step/screenshot"}, false, func(t *testing.T, e *beat.Event, je *journeyEnricher) { - require.Equal(t, "browser.screenshot", e.Meta[add_data_stream_index.FieldMetaCustomDataset]) + require.Equal(t, "browser.screenshot", e.Meta[add_data_stream.FieldMetaCustomDataset]) }, }, { @@ -160,7 +160,7 @@ func TestEnrichSynthEvent(t *testing.T) { &SynthEvent{Type: "step/screenshot_ref"}, false, func(t *testing.T, e *beat.Event, je *journeyEnricher) { - require.Equal(t, "browser.screenshot", e.Meta[add_data_stream_index.FieldMetaCustomDataset]) + require.Equal(t, "browser.screenshot", e.Meta[add_data_stream.FieldMetaCustomDataset]) }, }, { @@ -171,7 +171,7 @@ func TestEnrichSynthEvent(t *testing.T) { func(t *testing.T, e *beat.Event, je *journeyEnricher) { require.Equal(t, "my_id", e.Meta["_id"]) require.Equal(t, events.OpTypeCreate, e.Meta[events.FieldMetaOpType]) - require.Equal(t, "browser.screenshot", e.Meta[add_data_stream_index.FieldMetaCustomDataset]) + require.Equal(t, "browser.screenshot", e.Meta[add_data_stream.FieldMetaCustomDataset]) }, }, { @@ -180,7 +180,7 @@ func TestEnrichSynthEvent(t *testing.T) { &SynthEvent{Type: "journey/network_info"}, false, func(t *testing.T, e *beat.Event, je *journeyEnricher) { - require.Equal(t, "browser.network", e.Meta[add_data_stream_index.FieldMetaCustomDataset]) + require.Equal(t, "browser.network", e.Meta[add_data_stream.FieldMetaCustomDataset]) }, }, } From bcb059245dedd42a2de269fa106e95e3e326c629 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Fri, 20 Aug 2021 16:57:44 -0500 Subject: [PATCH 05/12] Update dataset behavior to match expectations --- heartbeat/monitors/factory.go | 37 +++++++++++++++++++---------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/heartbeat/monitors/factory.go b/heartbeat/monitors/factory.go index 2c756dbea37..c6b92d89eef 100644 --- a/heartbeat/monitors/factory.go +++ b/heartbeat/monitors/factory.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/fmtstr" "github.com/elastic/beats/v7/libbeat/processors" + "github.com/elastic/beats/v7/libbeat/processors/actions" "github.com/elastic/beats/v7/libbeat/processors/add_data_stream" "github.com/elastic/beats/v7/libbeat/processors/add_formatted_index" "github.com/elastic/beats/v7/libbeat/publisher/pipetool" @@ -95,17 +96,6 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi return nil, err } - indexProcessor, err := setupIndexProcessor(info, settings, stdFields.Type) - if err != nil { - return nil, err - } - - userProcessors, err := processors.New(settings.Processors) - if err != nil { - return nil, err - } - - // TODO: Remove this logic in the 8.0/master branch, preserve only in 7.x dataset := settings.DataSet if dataset == "" { if settings.DataStream != nil && settings.DataStream.Dataset != "" { @@ -114,6 +104,15 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi dataset = "uptime" } } + datastreamProcessor, err := setupDataStream(info, settings, stdFields.Type) + if err != nil { + return nil, err + } + + userProcessors, err := processors.New(settings.Processors) + if err != nil { + return nil, err + } return func(clientCfg beat.ClientConfig) (beat.ClientConfig, error) { fields := clientCfg.Processing.Fields.Clone() @@ -128,8 +127,8 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi // 2. add processors added by the input that wants to connect // 3. add locally configured processors from the 'processors' settings procs := processors.NewList(nil) - if indexProcessor != nil { - procs.AddProcessor(indexProcessor) + if datastreamProcessor != nil { + procs.AddProcessor(datastreamProcessor) } if lst := clientCfg.Processing.Processor; lst != nil { procs.AddProcessor(lst) @@ -149,8 +148,8 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi }, nil } -func setupIndexProcessor(info beat.Info, settings publishSettings, dataset string) (processors.Processor, error) { - var indexProcessor processors.Processor +func setupDataStream(info beat.Info, settings publishSettings, dataset string) (processors.Processor, error) { + var processor processors.Processor if settings.DataStream != nil { ds := settings.DataStream if ds.Type == "" { @@ -160,6 +159,10 @@ func setupIndexProcessor(info beat.Info, settings publishSettings, dataset strin ds.Dataset = dataset } return add_data_stream.New(*ds), nil + } else { + // for monitors writing to traditional indices... + // TODO: Consider removing in 8.0.0 + processor = actions.NewAddFields(common.MapStr{"event.dataset": "uptime"}, true, true) } if !settings.Index.IsEmpty() { @@ -170,7 +173,7 @@ func setupIndexProcessor(info beat.Info, settings publishSettings, dataset strin if err != nil { return nil, err } - indexProcessor = add_formatted_index.New(timestampFormat) + processor = add_formatted_index.New(timestampFormat) } - return indexProcessor, nil + return processor, nil } From 782f2d0d7e3e1a75968770fe2b6cf35ca1b63bea Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Mon, 23 Aug 2021 11:02:01 -0500 Subject: [PATCH 06/12] Fix var name --- heartbeat/monitors/factory_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/heartbeat/monitors/factory_test.go b/heartbeat/monitors/factory_test.go index 9ef916057df..2984ed51fad 100644 --- a/heartbeat/monitors/factory_test.go +++ b/heartbeat/monitors/factory_test.go @@ -83,7 +83,7 @@ func TestSetupIndexProcessor(t *testing.T) { for name, tt := range tests { t.Run(name, func(t *testing.T) { e := beat.Event{Meta: common.MapStr{}, Fields: common.MapStr{}} - proc, err := setupIndexProcessor(binfo, tt.settings, tt.monitorType) + proc, err := setupDataStream(binfo, tt.settings, tt.monitorType) if tt.wantErr == true { require.Error(t, err) return From e36f0cf7656b783576f5899a0db9ba4a90b3a2f3 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Mon, 23 Aug 2021 16:11:07 -0500 Subject: [PATCH 07/12] Fix up logic --- heartbeat/monitors/factory.go | 33 ++++++++----------- .../add_data_stream/add_data_stream.go | 1 - 2 files changed, 13 insertions(+), 21 deletions(-) diff --git a/heartbeat/monitors/factory.go b/heartbeat/monitors/factory.go index c6b92d89eef..682994e6043 100644 --- a/heartbeat/monitors/factory.go +++ b/heartbeat/monitors/factory.go @@ -91,22 +91,17 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi return nil, err } - stdFields, err := stdfields.ConfigToStdMonitorFields(cfg) - if err != nil { - return nil, err - } - - dataset := settings.DataSet - if dataset == "" { - if settings.DataStream != nil && settings.DataStream.Dataset != "" { - dataset = settings.DataStream.Dataset - } else { - dataset = "uptime" + var preprocessor processors.Processor + if settings.DataStream != nil && settings.DataStream.Dataset != "" { + var err error + preprocessor, err = setupDataStream(info, settings, settings.DataStream.Dataset) + if err != nil { + return nil, err } - } - datastreamProcessor, err := setupDataStream(info, settings, stdFields.Type) - if err != nil { - return nil, err + } else { + // for monitors writing to traditional indices... + // TODO: Consider removing in 8.0.0 + preprocessor = actions.NewAddFields(common.MapStr{"event.dataset": "uptime"}, true, true) } userProcessors, err := processors.New(settings.Processors) @@ -127,8 +122,8 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi // 2. add processors added by the input that wants to connect // 3. add locally configured processors from the 'processors' settings procs := processors.NewList(nil) - if datastreamProcessor != nil { - procs.AddProcessor(datastreamProcessor) + if preprocessor != nil { + procs.AddProcessor(preprocessor) } if lst := clientCfg.Processing.Processor; lst != nil { procs.AddProcessor(lst) @@ -160,9 +155,7 @@ func setupDataStream(info beat.Info, settings publishSettings, dataset string) ( } return add_data_stream.New(*ds), nil } else { - // for monitors writing to traditional indices... - // TODO: Consider removing in 8.0.0 - processor = actions.NewAddFields(common.MapStr{"event.dataset": "uptime"}, true, true) + } if !settings.Index.IsEmpty() { diff --git a/libbeat/processors/add_data_stream/add_data_stream.go b/libbeat/processors/add_data_stream/add_data_stream.go index d27f9960b50..31a4565a0af 100644 --- a/libbeat/processors/add_data_stream/add_data_stream.go +++ b/libbeat/processors/add_data_stream/add_data_stream.go @@ -83,7 +83,6 @@ func (p *AddDataStream) Run(event *beat.Event) (*beat.Event, error) { if event.Fields == nil { event.Fields = common.MapStr{} } - event.PutValue("event.dataset", eventDataStream.Dataset) event.PutValue("data_stream", eventDataStream) return event, nil From f06c43582803864aca6920460dfa524dc630bcec Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Mon, 23 Aug 2021 19:12:08 -0500 Subject: [PATCH 08/12] Scratch --- heartbeat/heartbeat.yml | 7 +-- heartbeat/monitors/factory.go | 48 +++++++++---------- .../add_data_stream/add_data_stream.go | 1 + 3 files changed, 28 insertions(+), 28 deletions(-) diff --git a/heartbeat/heartbeat.yml b/heartbeat/heartbeat.yml index fdbb29c9661..0fcd76718f6 100644 --- a/heartbeat/heartbeat.yml +++ b/heartbeat/heartbeat.yml @@ -23,7 +23,7 @@ heartbeat.config.monitors: heartbeat.monitors: - type: http # Set enabled to true (or delete the following line) to enable this example monitor - enabled: false + enabled: true # ID used to uniquely identify this monitor in elasticsearch even if the config changes id: my-monitor # Human readable display name for this service in Uptime UI and elsewhere @@ -94,10 +94,11 @@ setup.kibana: # Configure what output to use when sending the data collected by the beat. +output.console: ~ # ---------------------------- Elasticsearch Output ---------------------------- -output.elasticsearch: +#output.elasticsearch: # Array of hosts to connect to. - hosts: ["localhost:9200"] + #hosts: ["localhost:9200"] # Protocol - either `http` (default) or `https`. #protocol: "https" diff --git a/heartbeat/monitors/factory.go b/heartbeat/monitors/factory.go index 682994e6043..9b56fe63a75 100644 --- a/heartbeat/monitors/factory.go +++ b/heartbeat/monitors/factory.go @@ -18,6 +18,8 @@ package monitors import ( + "fmt" + "github.com/elastic/beats/v7/heartbeat/monitors/plugin" "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" "github.com/elastic/beats/v7/heartbeat/scheduler" @@ -26,7 +28,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/fmtstr" "github.com/elastic/beats/v7/libbeat/processors" - "github.com/elastic/beats/v7/libbeat/processors/actions" "github.com/elastic/beats/v7/libbeat/processors/add_data_stream" "github.com/elastic/beats/v7/libbeat/processors/add_formatted_index" "github.com/elastic/beats/v7/libbeat/publisher/pipetool" @@ -91,17 +92,13 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi return nil, err } - var preprocessor processors.Processor - if settings.DataStream != nil && settings.DataStream.Dataset != "" { - var err error - preprocessor, err = setupDataStream(info, settings, settings.DataStream.Dataset) - if err != nil { - return nil, err - } - } else { - // for monitors writing to traditional indices... - // TODO: Consider removing in 8.0.0 - preprocessor = actions.NewAddFields(common.MapStr{"event.dataset": "uptime"}, true, true) + sf, err := stdfields.ConfigToStdMonitorFields(cfg) + if err != nil { + return nil, fmt.Errorf("could not parse cfg for datastream %w", err) + } + datastreamProcessor, err := setupDataStream(info, settings, sf.Type) + if err != nil { + return nil, err } userProcessors, err := processors.New(settings.Processors) @@ -122,8 +119,8 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi // 2. add processors added by the input that wants to connect // 3. add locally configured processors from the 'processors' settings procs := processors.NewList(nil) - if preprocessor != nil { - procs.AddProcessor(preprocessor) + if datastreamProcessor != nil { + procs.AddProcessor(datastreamProcessor) } if lst := clientCfg.Processing.Processor; lst != nil { procs.AddProcessor(lst) @@ -143,19 +140,20 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi }, nil } -func setupDataStream(info beat.Info, settings publishSettings, dataset string) (processors.Processor, error) { - var processor processors.Processor - if settings.DataStream != nil { - ds := settings.DataStream - if ds.Type == "" { - ds.Type = "synthetics" - } - if ds.Dataset == "" { - ds.Dataset = dataset - } - return add_data_stream.New(*ds), nil +func setupDataStream(info beat.Info, settings publishSettings, monitorType string) (processors.Processor, error) { + var dataset string + if settings.DataStream != nil && settings.DataStream.Dataset != "" { + dataset = settings.DataStream.Dataset } else { + dataset = monitorType + } + ds := settings.DataStream + if ds.Type == "" { + ds.Type = "synthetics" + } + if ds.Dataset == "" { + ds.Dataset = dataset } if !settings.Index.IsEmpty() { diff --git a/libbeat/processors/add_data_stream/add_data_stream.go b/libbeat/processors/add_data_stream/add_data_stream.go index 31a4565a0af..d27f9960b50 100644 --- a/libbeat/processors/add_data_stream/add_data_stream.go +++ b/libbeat/processors/add_data_stream/add_data_stream.go @@ -83,6 +83,7 @@ func (p *AddDataStream) Run(event *beat.Event) (*beat.Event, error) { if event.Fields == nil { event.Fields = common.MapStr{} } + event.PutValue("event.dataset", eventDataStream.Dataset) event.PutValue("data_stream", eventDataStream) return event, nil From 2ae7154d4e33fe4aa6e6e1fcaae848b25cb1814d Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Tue, 24 Aug 2021 21:55:41 -0500 Subject: [PATCH 09/12] Refactor monitor dataset assignment to be consistent and make sense --- heartbeat/monitors/factory.go | 59 +++++++++++++++++---------- heartbeat/monitors/factory_test.go | 64 +++++++++++++++++++++++++----- 2 files changed, 91 insertions(+), 32 deletions(-) diff --git a/heartbeat/monitors/factory.go b/heartbeat/monitors/factory.go index 9b56fe63a75..d2b013d3c01 100644 --- a/heartbeat/monitors/factory.go +++ b/heartbeat/monitors/factory.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/fmtstr" "github.com/elastic/beats/v7/libbeat/processors" + "github.com/elastic/beats/v7/libbeat/processors/actions" "github.com/elastic/beats/v7/libbeat/processors/add_data_stream" "github.com/elastic/beats/v7/libbeat/processors/add_formatted_index" "github.com/elastic/beats/v7/libbeat/publisher/pipetool" @@ -96,7 +97,9 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi if err != nil { return nil, fmt.Errorf("could not parse cfg for datastream %w", err) } - datastreamProcessor, err := setupDataStream(info, settings, sf.Type) + + // Early stage processors for setting data_stream, event.dataset, and index to write to + preProcs, err := preProcessors(info, settings, sf.Type) if err != nil { return nil, err } @@ -114,17 +117,12 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi meta.Put("pipeline", settings.Pipeline) } - // assemble the processors. Ordering is important. - // 1. add support for index configuration via processor - // 2. add processors added by the input that wants to connect - // 3. add locally configured processors from the 'processors' settings procs := processors.NewList(nil) - if datastreamProcessor != nil { - procs.AddProcessor(datastreamProcessor) - } + if lst := clientCfg.Processing.Processor; lst != nil { procs.AddProcessor(lst) } + procs.AddProcessors(*preProcs) if userProcessors != nil { procs.AddProcessors(*userProcessors) } @@ -140,7 +138,10 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi }, nil } -func setupDataStream(info beat.Info, settings publishSettings, monitorType string) (processors.Processor, error) { +// preProcessors sets up the required event.dataset, data_stream.*, and write index processors for future event publishes. +func preProcessors(info beat.Info, settings publishSettings, monitorType string) (procs *processors.Processors, err error) { + procs = processors.NewList(nil) + var dataset string if settings.DataStream != nil && settings.DataStream.Dataset != "" { dataset = settings.DataStream.Dataset @@ -148,23 +149,39 @@ func setupDataStream(info beat.Info, settings publishSettings, monitorType strin dataset = monitorType } - ds := settings.DataStream - if ds.Type == "" { - ds.Type = "synthetics" - } - if ds.Dataset == "" { - ds.Dataset = dataset + // Always set event.dataset + procs.AddProcessor(actions.NewAddFields(common.MapStr{"event": common.MapStr{"dataset": dataset}}, true, true)) + + if settings.DataStream != nil { + ds := *settings.DataStream + if ds.Type == "" { + ds.Type = "synthetics" + } + if ds.Dataset == "" { + ds.Dataset = dataset + } + + procs.AddProcessor(add_data_stream.New(ds)) } if !settings.Index.IsEmpty() { - staticFields := fmtstr.FieldsForBeat(info.Beat, info.Version) - - timestampFormat, err := - fmtstr.NewTimestampFormatString(&settings.Index, staticFields) + proc, err := indexProcessor(&settings.Index, info) if err != nil { return nil, err } - processor = add_formatted_index.New(timestampFormat) + procs.AddProcessor(proc) + } + + return procs, nil +} + +func indexProcessor(index *fmtstr.EventFormatString, info beat.Info) (beat.Processor, error) { + staticFields := fmtstr.FieldsForBeat(info.Beat, info.Version) + + timestampFormat, err := + fmtstr.NewTimestampFormatString(index, staticFields) + if err != nil { + return nil, err } - return processor, nil + return add_formatted_index.New(timestampFormat), nil } diff --git a/heartbeat/monitors/factory_test.go b/heartbeat/monitors/factory_test.go index 2984ed51fad..27b119392ee 100644 --- a/heartbeat/monitors/factory_test.go +++ b/heartbeat/monitors/factory_test.go @@ -18,6 +18,7 @@ package monitors import ( + "regexp" "testing" "github.com/stretchr/testify/require" @@ -29,22 +30,24 @@ import ( "github.com/elastic/beats/v7/libbeat/processors/add_data_stream" ) -func TestSetupIndexProcessor(t *testing.T) { +func TestPreProcessors(t *testing.T) { binfo := beat.Info{ Beat: "heartbeat", IndexPrefix: "heartbeat", Version: "8.0.0", } tests := map[string]struct { - settings publishSettings - expectedIndex string - monitorType string - wantProc bool - wantErr bool + settings publishSettings + expectedIndex string + expectedDatastream *add_data_stream.DataStream + monitorType string + wantProc bool + wantErr bool }{ "no settings should yield no processor": { publishSettings{}, "", + nil, "browser", false, false, @@ -52,6 +55,7 @@ func TestSetupIndexProcessor(t *testing.T) { "exact index should be used exactly": { publishSettings{Index: *fmtstr.MustCompileEvent("test")}, "test", + nil, "browser", true, false, @@ -65,6 +69,11 @@ func TestSetupIndexProcessor(t *testing.T) { }, }, "myType-myDataset-myNamespace", + &add_data_stream.DataStream{ + Namespace: "myNamespace", + Dataset: "myDataset", + Type: "myType", + }, "myType", true, false, @@ -74,6 +83,11 @@ func TestSetupIndexProcessor(t *testing.T) { DataStream: &add_data_stream.DataStream{}, }, "synthetics-browser-default", + &add_data_stream.DataStream{ + Namespace: "default", + Dataset: "browser", + Type: "synthetics", + }, "browser", true, false, @@ -83,21 +97,49 @@ func TestSetupIndexProcessor(t *testing.T) { for name, tt := range tests { t.Run(name, func(t *testing.T) { e := beat.Event{Meta: common.MapStr{}, Fields: common.MapStr{}} - proc, err := setupDataStream(binfo, tt.settings, tt.monitorType) + procs, err := preProcessors(binfo, tt.settings, tt.monitorType) if tt.wantErr == true { require.Error(t, err) return } require.NoError(t, err) + // If we're just setting event.dataset we only get the 1 + // otherwise we get a second add_data_stream processor if !tt.wantProc { - require.Nil(t, proc) + require.Len(t, procs.List, 1) return } + require.Len(t, procs.List, 2) + + _, err = procs.Run(&e) + + t.Run("index name should be set", func(t *testing.T) { + require.NoError(t, err) + require.Equal(t, tt.expectedIndex, e.Meta[events.FieldMetaRawIndex]) + }) + + eventDs, err := e.GetValue("event.dataset") + require.NoError(t, err) + + t.Run("event.dataset should always be present, preferring data_stream", func(t *testing.T) { + dataset := tt.monitorType + if tt.settings.DataStream != nil && tt.settings.DataStream.Dataset != "" { + dataset = tt.settings.DataStream.Dataset + } + require.Equal(t, dataset, eventDs, "event.dataset be computed correctly") + require.Regexp(t, regexp.MustCompile(`^.+`), eventDs, "should be a string > 1 char") + }) + + t.Run("event.data_stream", func(t *testing.T) { + dataStreamRaw, _ := e.GetValue("data_stream") + if tt.expectedDatastream != nil { + dataStream := dataStreamRaw.(add_data_stream.DataStream) + require.Equal(t, eventDs, dataStream.Dataset, "event.dataset be identical to data_stream.dataset") - require.NotNil(t, proc) - _, err = proc.Run(&e) - require.Equal(t, tt.expectedIndex, e.Meta[events.FieldMetaRawIndex]) + require.Equal(t, *tt.expectedDatastream, dataStream) + } + }) }) } } From d8f9258de7f37a348717776904fdce0005c59a74 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Tue, 24 Aug 2021 22:01:35 -0500 Subject: [PATCH 10/12] Update python tests --- heartbeat/tests/system/test_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/heartbeat/tests/system/test_base.py b/heartbeat/tests/system/test_base.py index 643f9f31bf7..e0361761e04 100644 --- a/heartbeat/tests/system/test_base.py +++ b/heartbeat/tests/system/test_base.py @@ -206,6 +206,6 @@ def test_dataset(self): for output in self.read_output(): self.assertEqual( output["event.dataset"], - "uptime", + output["monitor.type"], "Check for event.dataset in {} failed".format(output) ) From a255573b6df23ab46ec55dc2b5d56328093ac0e1 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Tue, 24 Aug 2021 22:02:48 -0500 Subject: [PATCH 11/12] Add changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 2b87556b364..2f603d8fad2 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -85,6 +85,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Heartbeat* - Remove long deprecated `watch_poll` functionality. {pull}27166[27166] +- Fix inconsistency in `event.dataset` values between heartbeat and fleet by always setting this value to the monitor type / fleet dataset. {pull}27535[27535] *Journalbeat* From 7dc2b811f658fb33620da293a4a850296b9dbbbc Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Wed, 25 Aug 2021 08:30:13 -0500 Subject: [PATCH 12/12] Fix id invocation --- heartbeat/heartbeat.yml | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/heartbeat/heartbeat.yml b/heartbeat/heartbeat.yml index 0fcd76718f6..fdbb29c9661 100644 --- a/heartbeat/heartbeat.yml +++ b/heartbeat/heartbeat.yml @@ -23,7 +23,7 @@ heartbeat.config.monitors: heartbeat.monitors: - type: http # Set enabled to true (or delete the following line) to enable this example monitor - enabled: true + enabled: false # ID used to uniquely identify this monitor in elasticsearch even if the config changes id: my-monitor # Human readable display name for this service in Uptime UI and elsewhere @@ -94,11 +94,10 @@ setup.kibana: # Configure what output to use when sending the data collected by the beat. -output.console: ~ # ---------------------------- Elasticsearch Output ---------------------------- -#output.elasticsearch: +output.elasticsearch: # Array of hosts to connect to. - #hosts: ["localhost:9200"] + hosts: ["localhost:9200"] # Protocol - either `http` (default) or `https`. #protocol: "https"