diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index a1480a19c4d..75b21786701 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -85,6 +85,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Heartbeat* - Remove long deprecated `watch_poll` functionality. {pull}27166[27166] +- Fix inconsistency in `event.dataset` values between heartbeat and fleet by always setting this value to the monitor type / fleet dataset. {pull}27535[27535] *Journalbeat* diff --git a/heartbeat/config/config_test.go b/heartbeat/config/config_test.go index f4373cc528b..9b529728b45 100644 --- a/heartbeat/config/config_test.go +++ b/heartbeat/config/config_test.go @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +//go:build !integration // +build !integration package config diff --git a/heartbeat/magefile.go b/heartbeat/magefile.go index 0480c30a69e..40cc655ab1f 100644 --- a/heartbeat/magefile.go +++ b/heartbeat/magefile.go @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. 
+//go:build mage // +build mage package main diff --git a/heartbeat/monitors/factory.go b/heartbeat/monitors/factory.go index 5e82969fd18..d2b013d3c01 100644 --- a/heartbeat/monitors/factory.go +++ b/heartbeat/monitors/factory.go @@ -18,6 +18,8 @@ package monitors import ( + "fmt" + "github.com/elastic/beats/v7/heartbeat/monitors/plugin" "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" "github.com/elastic/beats/v7/heartbeat/scheduler" @@ -26,7 +28,8 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/fmtstr" "github.com/elastic/beats/v7/libbeat/processors" - "github.com/elastic/beats/v7/libbeat/processors/add_data_stream_index" + "github.com/elastic/beats/v7/libbeat/processors/actions" + "github.com/elastic/beats/v7/libbeat/processors/add_data_stream" "github.com/elastic/beats/v7/libbeat/processors/add_formatted_index" "github.com/elastic/beats/v7/libbeat/publisher/pipetool" ) @@ -51,10 +54,10 @@ type publishSettings struct { KeepNull bool `config:"keep_null"` // Output meta data settings - Pipeline string `config:"pipeline"` // ES Ingest pipeline name - Index fmtstr.EventFormatString `config:"index"` // ES output index pattern - DataStream *add_data_stream_index.DataStream `config:"data_stream"` - DataSet string `config:"dataset"` + Pipeline string `config:"pipeline"` // ES Ingest pipeline name + Index fmtstr.EventFormatString `config:"index"` // ES output index pattern + DataStream *add_data_stream.DataStream `config:"data_stream"` + DataSet string `config:"dataset"` } // NewFactory takes a scheduler and creates a RunnerFactory that can create cfgfile.Runner(Monitor) objects. 
@@ -90,12 +93,13 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi return nil, err } - stdFields, err := stdfields.ConfigToStdMonitorFields(cfg) + sf, err := stdfields.ConfigToStdMonitorFields(cfg) if err != nil { - return nil, err + return nil, fmt.Errorf("could not parse cfg for datastream %w", err) } - indexProcessor, err := setupIndexProcessor(info, settings, stdFields.Type) + // Early stage processors for setting data_stream, event.dataset, and index to write to + preProcs, err := preProcessors(info, settings, sf.Type) if err != nil { return nil, err } @@ -105,40 +109,20 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi return nil, err } - // TODO: Remove this logic in the 8.0/master branch, preserve only in 7.x - dataset := settings.DataSet - if dataset == "" { - if settings.DataStream != nil && settings.DataStream.Dataset != "" { - dataset = settings.DataStream.Dataset - } else { - dataset = "uptime" - } - } - return func(clientCfg beat.ClientConfig) (beat.ClientConfig, error) { fields := clientCfg.Processing.Fields.Clone() - fields.Put("event.dataset", dataset) - - if settings.DataStream != nil { - fields.Put("data_stream", settings.DataStream) - } meta := clientCfg.Processing.Meta.Clone() if settings.Pipeline != "" { meta.Put("pipeline", settings.Pipeline) } - // assemble the processors. Ordering is important. - // 1. add support for index configuration via processor - // 2. add processors added by the input that wants to connect - // 3. 
add locally configured processors from the 'processors' settings procs := processors.NewList(nil) - if indexProcessor != nil { - procs.AddProcessor(indexProcessor) - } + if lst := clientCfg.Processing.Processor; lst != nil { procs.AddProcessor(lst) } + procs.AddProcessors(*preProcs) if userProcessors != nil { procs.AddProcessors(*userProcessors) } @@ -154,28 +138,50 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi }, nil } -func setupIndexProcessor(info beat.Info, settings publishSettings, dataset string) (processors.Processor, error) { - var indexProcessor processors.Processor +// preProcessors sets up the required event.dataset, data_stream.*, and write index processors for future event publishes. +func preProcessors(info beat.Info, settings publishSettings, monitorType string) (procs *processors.Processors, err error) { + procs = processors.NewList(nil) + + var dataset string + if settings.DataStream != nil && settings.DataStream.Dataset != "" { + dataset = settings.DataStream.Dataset + } else { + dataset = monitorType + } + + // Always set event.dataset + procs.AddProcessor(actions.NewAddFields(common.MapStr{"event": common.MapStr{"dataset": dataset}}, true, true)) + if settings.DataStream != nil { - ds := settings.DataStream + ds := *settings.DataStream if ds.Type == "" { ds.Type = "synthetics" } if ds.Dataset == "" { ds.Dataset = dataset } - return add_data_stream_index.New(*ds), nil + + procs.AddProcessor(add_data_stream.New(ds)) } if !settings.Index.IsEmpty() { - staticFields := fmtstr.FieldsForBeat(info.Beat, info.Version) - - timestampFormat, err := - fmtstr.NewTimestampFormatString(&settings.Index, staticFields) + proc, err := indexProcessor(&settings.Index, info) if err != nil { return nil, err } - indexProcessor = add_formatted_index.New(timestampFormat) + procs.AddProcessor(proc) + } + + return procs, nil +} + +func indexProcessor(index *fmtstr.EventFormatString, info beat.Info) (beat.Processor, error) { + 
staticFields := fmtstr.FieldsForBeat(info.Beat, info.Version) + + timestampFormat, err := + fmtstr.NewTimestampFormatString(index, staticFields) + if err != nil { + return nil, err } - return indexProcessor, nil + return add_formatted_index.New(timestampFormat), nil } diff --git a/heartbeat/monitors/factory_test.go b/heartbeat/monitors/factory_test.go index 7c515885421..27b119392ee 100644 --- a/heartbeat/monitors/factory_test.go +++ b/heartbeat/monitors/factory_test.go @@ -18,6 +18,7 @@ package monitors import ( + "regexp" "testing" "github.com/stretchr/testify/require" @@ -26,25 +27,27 @@ import ( "github.com/elastic/beats/v7/libbeat/beat/events" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/fmtstr" - "github.com/elastic/beats/v7/libbeat/processors/add_data_stream_index" + "github.com/elastic/beats/v7/libbeat/processors/add_data_stream" ) -func TestSetupIndexProcessor(t *testing.T) { +func TestPreProcessors(t *testing.T) { binfo := beat.Info{ Beat: "heartbeat", IndexPrefix: "heartbeat", Version: "8.0.0", } tests := map[string]struct { - settings publishSettings - expectedIndex string - monitorType string - wantProc bool - wantErr bool + settings publishSettings + expectedIndex string + expectedDatastream *add_data_stream.DataStream + monitorType string + wantProc bool + wantErr bool }{ "no settings should yield no processor": { publishSettings{}, "", + nil, "browser", false, false, @@ -52,28 +55,39 @@ func TestSetupIndexProcessor(t *testing.T) { "exact index should be used exactly": { publishSettings{Index: *fmtstr.MustCompileEvent("test")}, "test", + nil, "browser", true, false, }, "data stream should be type-namespace-dataset": { publishSettings{ - DataStream: &add_data_stream_index.DataStream{ + DataStream: &add_data_stream.DataStream{ Namespace: "myNamespace", Dataset: "myDataset", Type: "myType", }, }, "myType-myDataset-myNamespace", + &add_data_stream.DataStream{ + Namespace: "myNamespace", + Dataset: "myDataset", + 
Type: "myType", + }, "myType", true, false, }, "data stream should use defaults": { publishSettings{ - DataStream: &add_data_stream_index.DataStream{}, + DataStream: &add_data_stream.DataStream{}, }, "synthetics-browser-default", + &add_data_stream.DataStream{ + Namespace: "default", + Dataset: "browser", + Type: "synthetics", + }, "browser", true, false, @@ -83,21 +97,49 @@ func TestSetupIndexProcessor(t *testing.T) { for name, tt := range tests { t.Run(name, func(t *testing.T) { e := beat.Event{Meta: common.MapStr{}, Fields: common.MapStr{}} - proc, err := setupIndexProcessor(binfo, tt.settings, tt.monitorType) + procs, err := preProcessors(binfo, tt.settings, tt.monitorType) if tt.wantErr == true { require.Error(t, err) return } require.NoError(t, err) + // If we're just setting event.dataset we only get the 1 + // otherwise we get a second add_data_stream processor if !tt.wantProc { - require.Nil(t, proc) + require.Len(t, procs.List, 1) return } + require.Len(t, procs.List, 2) + + _, err = procs.Run(&e) + + t.Run("index name should be set", func(t *testing.T) { + require.NoError(t, err) + require.Equal(t, tt.expectedIndex, e.Meta[events.FieldMetaRawIndex]) + }) + + eventDs, err := e.GetValue("event.dataset") + require.NoError(t, err) + + t.Run("event.dataset should always be present, preferring data_stream", func(t *testing.T) { + dataset := tt.monitorType + if tt.settings.DataStream != nil && tt.settings.DataStream.Dataset != "" { + dataset = tt.settings.DataStream.Dataset + } + require.Equal(t, dataset, eventDs, "event.dataset be computed correctly") + require.Regexp(t, regexp.MustCompile(`^.+`), eventDs, "should be a string > 1 char") + }) + + t.Run("event.data_stream", func(t *testing.T) { + dataStreamRaw, _ := e.GetValue("data_stream") + if tt.expectedDatastream != nil { + dataStream := dataStreamRaw.(add_data_stream.DataStream) + require.Equal(t, eventDs, dataStream.Dataset, "event.dataset be identical to data_stream.dataset") - require.NotNil(t, proc) - 
_, err = proc.Run(&e) - require.Equal(t, tt.expectedIndex, e.Meta[events.FieldMetaRawIndex]) + require.Equal(t, *tt.expectedDatastream, dataStream) + } + }) }) } } diff --git a/heartbeat/tests/system/test_base.py b/heartbeat/tests/system/test_base.py index 643f9f31bf7..e0361761e04 100644 --- a/heartbeat/tests/system/test_base.py +++ b/heartbeat/tests/system/test_base.py @@ -206,6 +206,6 @@ def test_dataset(self): for output in self.read_output(): self.assertEqual( output["event.dataset"], - "uptime", + output["monitor.type"], "Check for event.dataset in {} failed".format(output) ) diff --git a/libbeat/processors/add_data_stream_index/add_data_stream_index.go b/libbeat/processors/add_data_stream/add_data_stream.go similarity index 57% rename from libbeat/processors/add_data_stream_index/add_data_stream_index.go rename to libbeat/processors/add_data_stream/add_data_stream.go index 7d770c6f082..d27f9960b50 100644 --- a/libbeat/processors/add_data_stream_index/add_data_stream_index.go +++ b/libbeat/processors/add_data_stream/add_data_stream.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package add_data_stream_index +package add_data_stream import ( "fmt" @@ -37,50 +37,59 @@ func SetEventDataset(event *beat.Event, ds string) { } } -// AddDataStreamIndex is a Processor to set an event's "raw_index" metadata field -// based on the given type, dataset, and namespace fields. -// If the event's metadata contains an -type AddDataStreamIndex struct { +// AddDataStream is a Processor to set an event's "raw_index" metadata field +// based on the given type, dataset, and namespace fields, as well as its +// `data_stream` field dynamically. 
+type AddDataStream struct { DataStream DataStream // cached, compiled version of the index name derived from the data stream - dsCached string - customDsCache string + idxNameCache string + // cached, compiled version of the index name derived from the data stream, sans datastream + // which is dynamic + idxNamePartialCache string } // New returns a new AddDataStreamIndex processor. -func New(ds DataStream) *AddDataStreamIndex { +func New(ds DataStream) *AddDataStream { if ds.Namespace == "" { ds.Namespace = "default" } if ds.Dataset == "" { ds.Dataset = "generic" } - return &AddDataStreamIndex{ - DataStream: ds, - dsCached: ds.indexName(), - customDsCache: ds.datasetFmtString(), + return &AddDataStream{ + DataStream: ds, + idxNameCache: ds.indexName(), + idxNamePartialCache: ds.idxNamePartialCache(), } } // Run runs the processor. -func (p *AddDataStreamIndex) Run(event *beat.Event) (*beat.Event, error) { +func (p *AddDataStream) Run(event *beat.Event) (*beat.Event, error) { + eventDataStream := p.DataStream if event.Meta == nil { event.Meta = common.MapStr{ - events.FieldMetaRawIndex: p.dsCached, + events.FieldMetaRawIndex: p.idxNameCache, } } else { - customDs, hasCustom := event.Meta[FieldMetaCustomDataset] + customDataset, hasCustom := event.Meta[FieldMetaCustomDataset] if !hasCustom { - event.Meta[events.FieldMetaRawIndex] = p.dsCached + event.Meta[events.FieldMetaRawIndex] = p.idxNameCache } else { - event.Meta[events.FieldMetaRawIndex] = fmt.Sprintf(p.customDsCache, customDs) + event.Meta[events.FieldMetaRawIndex] = fmt.Sprintf(p.idxNamePartialCache, customDataset) + eventDataStream.Dataset = customDataset.(string) } } + if event.Fields == nil { + event.Fields = common.MapStr{} + } + event.PutValue("event.dataset", eventDataStream.Dataset) + event.PutValue("data_stream", eventDataStream) return event, nil } -func (p *AddDataStreamIndex) String() string { +func (p *AddDataStream) String() string { return fmt.Sprintf("add_data_stream_index=%v", 
p.DataStream.indexName()) } @@ -92,10 +101,12 @@ type DataStream struct { Type string `config:"type"` } -func (ds DataStream) datasetFmtString() string { +// idxNamePartialCache returns a format string that takes a single argument, the dataset. +// This is slightly more efficient than generating the entire string with all three vars every time. +func (ds DataStream) idxNamePartialCache() string { return fmt.Sprintf("%s-%%s-%s", ds.Type, ds.Namespace) } func (ds DataStream) indexName() string { - return fmt.Sprintf(ds.datasetFmtString(), ds.Dataset) + return fmt.Sprintf(ds.idxNamePartialCache(), ds.Dataset) } diff --git a/libbeat/processors/add_data_stream_index/add_data_stream_index_test.go b/libbeat/processors/add_data_stream/add_data_stream_test.go similarity index 76% rename from libbeat/processors/add_data_stream_index/add_data_stream_index_test.go rename to libbeat/processors/add_data_stream/add_data_stream_test.go index d71181e3127..71d981d6034 100644 --- a/libbeat/processors/add_data_stream_index/add_data_stream_index_test.go +++ b/libbeat/processors/add_data_stream/add_data_stream_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License.
-package add_data_stream_index +package add_data_stream import ( "testing" @@ -27,24 +27,26 @@ import ( "github.com/elastic/beats/v7/libbeat/common" ) -func TestAddDataStreamIndex(t *testing.T) { +func TestAddDataStream(t *testing.T) { simpleDs := DataStream{ "myns", "myds", "mytype", } tests := []struct { - name string - ds DataStream - event *beat.Event - want string - wantErr bool + name string + ds DataStream + event *beat.Event + wantIndex string + wantDataStream DataStream + wantErr bool }{ { "simple", simpleDs, &beat.Event{}, "mytype-myds-myns", + simpleDs, false, }, { @@ -52,6 +54,7 @@ func TestAddDataStreamIndex(t *testing.T) { simpleDs, &beat.Event{Meta: common.MapStr{}}, "mytype-myds-myns", + simpleDs, false, }, { @@ -61,6 +64,7 @@ func TestAddDataStreamIndex(t *testing.T) { FieldMetaCustomDataset: "custom-ds", }}, "mytype-custom-ds-myns", + DataStream{"myns", "custom-ds", "mytype"}, false, }, { @@ -70,6 +74,7 @@ func TestAddDataStreamIndex(t *testing.T) { }, &beat.Event{}, "mytype-generic-default", + DataStream{"default", "generic", "mytype"}, false, }, } @@ -81,7 +86,9 @@ func TestAddDataStreamIndex(t *testing.T) { t.Errorf("Run() error = %v, wantErr %v", err, tt.wantErr) return } - require.Equal(t, tt.want, got.Meta[events.FieldMetaRawIndex]) + require.Equal(t, tt.wantIndex, got.Meta[events.FieldMetaRawIndex]) + require.Equal(t, tt.wantDataStream, got.Fields["data_stream"]) + require.Equal(t, tt.wantDataStream.Dataset, got.Fields["event"].(common.MapStr)["dataset"]) }) } } diff --git a/libbeat/processors/add_data_stream_index/datastream.go b/libbeat/processors/add_data_stream_index/datastream.go deleted file mode 100644 index c2a69dc5214..00000000000 --- a/libbeat/processors/add_data_stream_index/datastream.go +++ /dev/null @@ -1,18 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. 
Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package add_data_stream_index diff --git a/x-pack/heartbeat/monitors/browser/synthexec/enrich.go b/x-pack/heartbeat/monitors/browser/synthexec/enrich.go index a842dd28794..c3ab6c76faa 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/enrich.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/enrich.go @@ -9,7 +9,7 @@ import ( "time" "github.com/elastic/beats/v7/libbeat/beat/events" - "github.com/elastic/beats/v7/libbeat/processors/add_data_stream_index" + "github.com/elastic/beats/v7/libbeat/processors/add_data_stream" "github.com/gofrs/uuid" @@ -122,9 +122,9 @@ func (je *journeyEnricher) enrichSynthEvent(event *beat.Event, se *SynthEvent) e case "step/screenshot_ref": fallthrough case "screenshot/block": - add_data_stream_index.SetEventDataset(event, "browser.screenshot") + add_data_stream.SetEventDataset(event, "browser.screenshot") case "journey/network_info": - add_data_stream_index.SetEventDataset(event, "browser.network") + add_data_stream.SetEventDataset(event, "browser.network") } if se.Id != "" { diff --git a/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go b/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go index 8addfece0f4..629454f34c0 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go @@ -16,7 +16,7 @@ import ( 
"github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/beat/events" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/processors/add_data_stream_index" + "github.com/elastic/beats/v7/libbeat/processors/add_data_stream" "github.com/elastic/go-lookslike" "github.com/elastic/go-lookslike/testslike" ) @@ -151,7 +151,7 @@ func TestEnrichSynthEvent(t *testing.T) { &SynthEvent{Type: "step/screenshot"}, false, func(t *testing.T, e *beat.Event, je *journeyEnricher) { - require.Equal(t, "browser.screenshot", e.Meta[add_data_stream_index.FieldMetaCustomDataset]) + require.Equal(t, "browser.screenshot", e.Meta[add_data_stream.FieldMetaCustomDataset]) }, }, { @@ -160,7 +160,7 @@ func TestEnrichSynthEvent(t *testing.T) { &SynthEvent{Type: "step/screenshot_ref"}, false, func(t *testing.T, e *beat.Event, je *journeyEnricher) { - require.Equal(t, "browser.screenshot", e.Meta[add_data_stream_index.FieldMetaCustomDataset]) + require.Equal(t, "browser.screenshot", e.Meta[add_data_stream.FieldMetaCustomDataset]) }, }, { @@ -171,7 +171,7 @@ func TestEnrichSynthEvent(t *testing.T) { func(t *testing.T, e *beat.Event, je *journeyEnricher) { require.Equal(t, "my_id", e.Meta["_id"]) require.Equal(t, events.OpTypeCreate, e.Meta[events.FieldMetaOpType]) - require.Equal(t, "browser.screenshot", e.Meta[add_data_stream_index.FieldMetaCustomDataset]) + require.Equal(t, "browser.screenshot", e.Meta[add_data_stream.FieldMetaCustomDataset]) }, }, { @@ -180,7 +180,7 @@ func TestEnrichSynthEvent(t *testing.T) { &SynthEvent{Type: "journey/network_info"}, false, func(t *testing.T, e *beat.Event, je *journeyEnricher) { - require.Equal(t, "browser.network", e.Meta[add_data_stream_index.FieldMetaCustomDataset]) + require.Equal(t, "browser.network", e.Meta[add_data_stream.FieldMetaCustomDataset]) }, }, }