diff --git a/_meta/fields.common.yml b/_meta/fields.common.yml index 807961a35bb..d4d24ad0317 100644 --- a/_meta/fields.common.yml +++ b/_meta/fields.common.yml @@ -3,6 +3,21 @@ description: > Fields common to various APM events. fields: + - name: data_stream.type + type: keyword + description: "Data stream type: logs, metrics, or traces." + example: traces + + - name: data_stream.dataset + type: keyword + description: Data stream dataset name. + example: backend_service + + - name: data_stream.namespace + type: keyword + description: User-defined data stream namespace. + example: production + - name: processor.name type: keyword description: Processor name. diff --git a/beater/beater.go b/beater/beater.go index 80b38c07019..9e703796a39 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -36,8 +36,10 @@ import ( "github.com/elastic/beats/v7/libbeat/instrumentation" "github.com/elastic/beats/v7/libbeat/logp" esoutput "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" + "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/apm-server/beater/config" + "github.com/elastic/apm-server/datastreams" "github.com/elastic/apm-server/elasticsearch" "github.com/elastic/apm-server/ingest/pipeline" logs "github.com/elastic/apm-server/log" @@ -117,7 +119,6 @@ type beater struct { // Run runs the APM Server, blocking until the beater's Stop method is called, // or a fatal error occurs. func (bt *beater) Run(b *beat.Beat) error { - done := make(chan struct{}) var reloadOnce sync.Once @@ -127,6 +128,7 @@ func (bt *beater) Run(b *beat.Beat) error { // during startup. This might change when APM Server is included in Fleet reloadOnce.Do(func() { defer close(done) + // TODO(axw) config received from Fleet should be modified to set data_streams.enabled. var cfg *config.Config cfg, err = config.NewConfig(ucfg.Config, elasticsearchOutputConfig(b)) if err != nil { @@ -365,11 +367,30 @@ func newPublisher(b *beat.Beat, cfg *config.Config, tracer *apm.Tracer) (*publis if err != nil { return nil, err } - return publish.NewPublisher(b.Publisher, tracer, &publish.PublisherConfig{ + publisherConfig := &publish.PublisherConfig{ Info: b.Info, Pipeline: cfg.Pipeline, TransformConfig: transformConfig, - }) + } + if !cfg.DataStreams.Enabled { + // Remove data_stream.* fields during publishing when data streams are disabled. + processors, err := processors.New(processors.PluginConfig{common.MustNewConfigFrom( + map[string]interface{}{ + "drop_fields": map[string]interface{}{ + "fields": []interface{}{ + datastreams.TypeField, + datastreams.DatasetField, + datastreams.NamespaceField, + }, + }, + }, + )}) + if err != nil { + return nil, err + } + publisherConfig.Processor = processors + } + return publish.NewPublisher(b.Publisher, tracer, publisherConfig) } func newTransformConfig(beatInfo beat.Info, cfg *config.Config) (*transform.Config, error) { diff --git a/beater/config/config.go b/beater/config/config.go index b40f00d6a62..0a086bf4c69 100644 --- a/beater/config/config.go +++ b/beater/config/config.go @@ -85,6 +85,7 @@ type Config struct { JaegerConfig JaegerConfig `config:"jaeger"` Aggregation AggregationConfig `config:"aggregation"` Sampling SamplingConfig `config:"sampling"` + DataStreams DataStreamsConfig `config:"data_streams"` Pipeline string } @@ -189,5 +190,6 @@ func DefaultConfig() *Config { JaegerConfig: defaultJaeger(), Aggregation: defaultAggregationConfig(), Sampling: defaultSamplingConfig(), + DataStreams: defaultDataStreamsConfig(), } } diff --git a/beater/config/data_streams.go b/beater/config/data_streams.go new file mode 100644 index 00000000000..1faf6144617 --- /dev/null +++ b/beater/config/data_streams.go @@ -0,0 +1,27 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package config + +// DataStreamsConfig holds data streams configuration. +type DataStreamsConfig struct { + Enabled bool `config:"enabled"` +} + +func defaultDataStreamsConfig() DataStreamsConfig { + return DataStreamsConfig{Enabled: false} +} diff --git a/beater/http.go b/beater/http.go index 6ef8fcaa444..e07c2734363 100644 --- a/beater/http.go +++ b/beater/http.go @@ -100,8 +100,13 @@ func (h *httpServer) start() error { h.logger.Infof("Connection limit set to: %d", h.cfg.MaxConnections) } - // Create the "onboarding" document, which contains the server's listening address. - notifyListening(context.Background(), addr, h.reporter) + if !h.cfg.DataStreams.Enabled { + // Create the "onboarding" document, which contains the server's + // listening address. We only do this if data streams are not enabled, + // as onboarding documents are incompatible with data streams. + // Onboarding documents should be replaced by Fleet status later. + notifyListening(context.Background(), addr, h.reporter) + } if h.TLSConfig != nil { h.logger.Info("SSL enabled.") diff --git a/beater/telemetry.go b/beater/telemetry.go index 34d65c2eed4..8d6bbdff7a2 100644 --- a/beater/telemetry.go +++ b/beater/telemetry.go @@ -29,6 +29,7 @@ import ( var apmRegistry = monitoring.GetNamespace("state").GetRegistry().NewRegistry("apm-server") type configTelemetry struct { + dataStreamsEnabled *monitoring.Bool rumEnabled *monitoring.Bool apiKeysEnabled *monitoring.Bool kibanaEnabled *monitoring.Bool @@ -49,6 +50,7 @@ type configTelemetry struct { } var configMonitors = &configTelemetry{ + dataStreamsEnabled: monitoring.NewBool(apmRegistry, "data_streams.enabled"), rumEnabled: monitoring.NewBool(apmRegistry, "rum.enabled"), apiKeysEnabled: monitoring.NewBool(apmRegistry, "api_key.enabled"), kibanaEnabled: monitoring.NewBool(apmRegistry, "kibana.enabled"), @@ -73,6 +75,7 @@ func recordConfigs(info beat.Info, apmCfg *config.Config, rootCfg *common.Config if err != nil { return err } + configMonitors.dataStreamsEnabled.Set(apmCfg.DataStreams.Enabled) configMonitors.rumEnabled.Set(apmCfg.RumConfig.IsEnabled()) configMonitors.apiKeysEnabled.Set(apmCfg.APIKeyConfig.IsEnabled()) configMonitors.kibanaEnabled.Set(apmCfg.Kibana.Enabled) diff --git a/datastreams/constants.go b/datastreams/constants.go new file mode 100644 index 00000000000..2ab6f6f7f5f --- /dev/null +++ b/datastreams/constants.go @@ -0,0 +1,37 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package datastreams + +// Constants for data stream types. +const ( + LogsType = "logs" + MetricsType = "metrics" + TracesType = "traces" +) + +// Cosntants for data stream event metadata fields. +const ( + TypeField = "data_stream.type" + DatasetField = "data_stream.dataset" + NamespaceField = "data_stream.namespace" +) + +// IndexFormat holds the variable "index" format to use for the libbeat Elasticsearch output. +// Each event the server publishes is expected to contain data_stream.* fields, which will be +// added to the documents as well as be used for routing documents to the correct data stream. +const IndexFormat = "%{[data_stream.type]}-%{[data_stream.dataset]}-%{[data_stream.namespace]}" diff --git a/datastreams/servicename.go b/datastreams/servicename.go new file mode 100644 index 00000000000..ac49422727b --- /dev/null +++ b/datastreams/servicename.go @@ -0,0 +1,45 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package datastreams + +import "strings" + +// NormalizeServiceName translates serviceName into a string suitable +// for inclusion in a data stream name. +// +// Concretely, this function will lowercase the string and replace any +// reserved characters with "_". +func NormalizeServiceName(s string) string { + s = strings.ToLower(s) + s = strings.Map(replaceReservedRune, s) + return s +} + +func replaceReservedRune(r rune) rune { + switch r { + case '\\', '/', '*', '?', '"', '<', '>', '|', ' ', ',', '#', ':': + // These characters are not permitted in data stream names + // by Elasticsearch. + return '_' + case '-': + // Hyphens are used to separate the data stream type, dataset, + // and namespace. + return '_' + } + return r +} diff --git a/datastreams/servicename_test.go b/datastreams/servicename_test.go new file mode 100644 index 00000000000..e71e1532c03 --- /dev/null +++ b/datastreams/servicename_test.go @@ -0,0 +1,35 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package datastreams_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/apm-server/datastreams" +) + +func TestNormalizeServiceName(t *testing.T) { + testNormalizeServiceName := func(expected, input string) { + t.Helper() + assert.Equal(t, expected, datastreams.NormalizeServiceName(input)) + } + testNormalizeServiceName("upper_case", "UPPER-CASE") + testNormalizeServiceName("____________", "\\/*?\"<>| ,#:") +} diff --git a/docs/fields.asciidoc b/docs/fields.asciidoc index b18c4201e69..a675e611972 100644 --- a/docs/fields.asciidoc +++ b/docs/fields.asciidoc @@ -38,6 +38,39 @@ Fields common to various APM events. +*`data_stream.type`*:: ++ +-- +Data stream type: logs, metrics, or traces. + +type: keyword + +example: traces + +-- + +*`data_stream.dataset`*:: ++ +-- +Data stream dataset name. + +type: keyword + +example: backend_service + +-- + +*`data_stream.namespace`*:: ++ +-- +User-defined data stream namespace. + +type: keyword + +example: production + +-- + *`processor.name`*:: + -- diff --git a/idxmgmt/supporter.go b/idxmgmt/supporter.go index 77543186223..e8a8f676f31 100644 --- a/idxmgmt/supporter.go +++ b/idxmgmt/supporter.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/fmtstr" libidxmgmt "github.com/elastic/beats/v7/libbeat/idxmgmt" libilm "github.com/elastic/beats/v7/libbeat/idxmgmt/ilm" "github.com/elastic/beats/v7/libbeat/logp" @@ -33,6 +34,7 @@ import ( "github.com/elastic/beats/v7/libbeat/outputs/outil" "github.com/elastic/beats/v7/libbeat/template" + "github.com/elastic/apm-server/datastreams" "github.com/elastic/apm-server/idxmgmt/ilm" "github.com/elastic/apm-server/idxmgmt/unmanaged" ) @@ -49,6 +51,7 @@ const esKey = "elasticsearch" type supporter struct { log *logp.Logger info beat.Info + dataStreams bool templateConfig template.TemplateConfig ilmConfig ilm.Config unmanagedIdxConfig *unmanaged.Config @@ -63,6 +66,20 @@ type indexState struct { isSet atomic.Bool } +// newDataStreamSelector returns an outil.Selector which routes events to +// a data stream based on well-defined data_stream.* fields in events. +func newDataStreamSelector() (outputs.IndexSelector, error) { + fmtstr, err := fmtstr.CompileEvent(datastreams.IndexFormat) + if err != nil { + return nil, err + } + expr, err := outil.FmtSelectorExpr(fmtstr, "", outil.SelectorLowerCase) + if err != nil { + return nil, err + } + return outil.MakeSelector(expr), nil +} + type unmanagedIndexSelector outil.Selector type ilmIndexSelector struct { @@ -71,39 +88,41 @@ type ilmIndexSelector struct { st *indexState } -func newSupporter( - log *logp.Logger, - info beat.Info, - templateConfig template.TemplateConfig, - ilmConfig ilm.Config, - outConfig common.ConfigNamespace, -) (*supporter, error) { +func newSupporter(log *logp.Logger, info beat.Info, cfg *IndexManagementConfig) (*supporter, error) { var ( unmanagedIdxCfg unmanaged.Config - mode = ilmConfig.Mode + mode = cfg.ILM.Mode st = indexState{} ) - if outConfig.Name() == esKey { - if err := outConfig.Config().Unpack(&unmanagedIdxCfg); err != nil { + if cfg.Output.Name() == esKey { + if err := cfg.Output.Config().Unpack(&unmanagedIdxCfg); err != nil { return nil, fmt.Errorf("unpacking output elasticsearch index config fails: %+v", err) } - if err := checkTemplateESSettings(templateConfig, &unmanagedIdxCfg); err != nil { + if err := checkTemplateESSettings(cfg.Template, &unmanagedIdxCfg); err != nil { return nil, err } } - if outConfig.Name() != esKey || - ilmConfig.Mode == libilm.ModeDisabled || - ilmConfig.Mode == libilm.ModeAuto && unmanagedIdxCfg.Customized() { - + var disableILM bool + if cfg.Output.Name() != esKey || cfg.ILM.Mode == libilm.ModeDisabled { + disableILM = true + } else if cfg.ILM.Mode == libilm.ModeAuto { + // ILM is set to "auto": disable if we're using data streams, + // or if we're not using data streams but we're using customised, + // unmanaged indices. + if cfg.DataStreams || unmanagedIdxCfg.Customized() { + disableILM = true + } + } + if disableILM { mode = libilm.ModeDisabled st.isSet.CAS(false, true) } - ilmSupporters, err := ilm.MakeDefaultSupporter(log, mode, ilmConfig) + ilmSupporters, err := ilm.MakeDefaultSupporter(log, mode, cfg.ILM) if err != nil { return nil, err } @@ -111,8 +130,9 @@ func newSupporter( return &supporter{ log: log, info: info, - templateConfig: templateConfig, - ilmConfig: ilmConfig, + dataStreams: cfg.DataStreams, + templateConfig: cfg.Template, + ilmConfig: cfg.ILM, unmanagedIdxConfig: &unmanagedIdxCfg, migration: false, st: st, @@ -145,6 +165,10 @@ func (s *supporter) Manager( // depending on the supporter's config an ILM instance or an unmanaged index selector instance is returned. // The ILM instance decides on every Select call whether or not to return ILM indices or regular ones. func (s *supporter) BuildSelector(_ *common.Config) (outputs.IndexSelector, error) { + if s.dataStreams { + return newDataStreamSelector() + } + sel, err := s.buildSelector(s.unmanagedIdxConfig.SelectorConfig()) if err != nil { return nil, err diff --git a/idxmgmt/supporter_factory.go b/idxmgmt/supporter_factory.go index bb22010140d..9c8c801f6c4 100644 --- a/idxmgmt/supporter_factory.go +++ b/idxmgmt/supporter_factory.go @@ -33,31 +33,43 @@ import ( // functionality largely copied from libbeat type IndexManagementConfig struct { - Template template.TemplateConfig - ILM ilm.Config - Output common.ConfigNamespace + DataStreams bool + Template template.TemplateConfig + ILM ilm.Config + Output common.ConfigNamespace } -// MakeDefaultSupporter creates the index management supporter for APM that is passed to libbeat. +// MakeDefaultSupporter creates a new idxmgmt.Supporter, using the given root config. +// +// The Supporter will operate in one of three modes: data streams, legacy +// managed, and legacy unmanaged. The legacy modes exist purely to run +// apm-server without data streams or Fleet integration. +// +// If (Fleet) management is enabled, then no index, template, or ILM config +// should be set. Index (data stream) names will be well defined, based on +// the data type, service name, and user-defined namespace. +// +// If management is disabled, then the Supporter will operate in one of the +// legacy modes based on configuration. func MakeDefaultSupporter(log *logp.Logger, info beat.Info, configRoot *common.Config) (idxmgmt.Supporter, error) { cfg, err := NewIndexManagementConfig(info, configRoot) if err != nil { return nil, err } - if log == nil { log = logp.NewLogger(logs.IndexManagement) } else { log = log.Named(logs.IndexManagement) } - return newSupporter(log, info, cfg.Template, cfg.ILM, cfg.Output) + return newSupporter(log, info, cfg) } func NewIndexManagementConfig(info beat.Info, configRoot *common.Config) (*IndexManagementConfig, error) { cfg := struct { - ILM *common.Config `config:"apm-server.ilm"` - Template *common.Config `config:"setup.template"` - Output common.ConfigNamespace `config:"output"` + DataStreams *common.Config `config:"apm-server.data_streams"` + ILM *common.Config `config:"apm-server.ilm"` + Template *common.Config `config:"setup.template"` + Output common.ConfigNamespace `config:"output"` }{} if configRoot != nil { if err := configRoot.Unpack(&cfg); err != nil { @@ -76,9 +88,10 @@ func NewIndexManagementConfig(info beat.Info, configRoot *common.Config) (*Index } return &IndexManagementConfig{ - Template: tmplConfig, - ILM: ilmConfig, - Output: cfg.Output, + DataStreams: cfg.DataStreams.Enabled(), + Template: tmplConfig, + ILM: ilmConfig, + Output: cfg.Output, }, nil } diff --git a/include/fields.go b/include/fields.go index b223ce8442a..488e242db46 100644 --- a/include/fields.go +++ b/include/fields.go @@ -32,5 +32,5 @@ func init() { // AssetBuildFieldsFieldsYml returns asset data. // This is the base64 encoded gzipped contents of build/fields/fields.yml. func AssetBuildFieldsFieldsYml() string { - return "" + return "" } diff --git a/model/error.go b/model/error.go index e9d85fc63b3..e6ef6bf3c62 100644 --- a/model/error.go +++ b/model/error.go @@ -32,6 +32,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/monitoring" + "github.com/elastic/apm-server/datastreams" "github.com/elastic/apm-server/transform" "github.com/elastic/apm-server/utility" ) @@ -109,7 +110,15 @@ func (e *Error) Transform(ctx context.Context, cfg *transform.Config) []beat.Eve addStacktraceCounter(e.Log.Stacktrace) } + // Errors are stored in an APM errors-specific "logs" data stream, per service. + // By storing errors in a "logs" data stream, they can be viewed in the Logs app + // in Kibana. + dataset := fmt.Sprintf("apm.error.%s", datastreams.NormalizeServiceName(e.Metadata.Service.Name)) + fields := common.MapStr{ + datastreams.TypeField: datastreams.LogsType, + datastreams.DatasetField: dataset, + "error": e.fields(ctx, cfg), "processor": errorProcessorEntry, } diff --git a/model/error_test.go b/model/error_test.go index 02bc5262228..0ad89f04bf1 100644 --- a/model/error_test.go +++ b/model/error_test.go @@ -307,8 +307,10 @@ func TestEvents(t *testing.T) { "valid": { Transformable: &Error{Timestamp: timestamp, Metadata: md}, Output: common.MapStr{ - "agent": common.MapStr{"name": "go", "version": "1.0"}, - "service": common.MapStr{"name": "myservice", "version": "1.0"}, + "data_stream.type": "logs", + "data_stream.dataset": "apm.error.myservice", + "agent": common.MapStr{"name": "go", "version": "1.0"}, + "service": common.MapStr{"name": "myservice", "version": "1.0"}, "error": common.MapStr{ "grouping_key": "d41d8cd98f00b204e9800998ecf8427e", }, @@ -320,9 +322,11 @@ func TestEvents(t *testing.T) { "notSampled": { Transformable: &Error{Timestamp: timestamp, Metadata: md, TransactionSampled: &sampledFalse}, Output: common.MapStr{ - "transaction": common.MapStr{"sampled": false}, - "agent": common.MapStr{"name": "go", "version": "1.0"}, - "service": common.MapStr{"name": "myservice", "version": "1.0"}, + "data_stream.type": "logs", + "data_stream.dataset": "apm.error.myservice", + "transaction": common.MapStr{"sampled": false}, + "agent": common.MapStr{"name": "go", "version": "1.0"}, + "service": common.MapStr{"name": "myservice", "version": "1.0"}, "error": common.MapStr{ "grouping_key": "d41d8cd98f00b204e9800998ecf8427e", }, @@ -334,7 +338,9 @@ func TestEvents(t *testing.T) { "withMeta": { Transformable: &Error{Timestamp: timestamp, Metadata: md, TransactionType: &transactionType}, Output: common.MapStr{ - "transaction": common.MapStr{"type": "request"}, + "data_stream.type": "logs", + "data_stream.dataset": "apm.error.myservice", + "transaction": common.MapStr{"type": "request"}, "error": common.MapStr{ "grouping_key": "d41d8cd98f00b204e9800998ecf8427e", }, @@ -363,13 +369,15 @@ func TestEvents(t *testing.T) { }, Output: common.MapStr{ - "labels": common.MapStr{"key": true, "label": 101}, - "service": common.MapStr{"name": "myservice", "version": "1.0"}, - "agent": common.MapStr{"name": "go", "version": "1.0"}, - "user": common.MapStr{"id": uid, "email": email}, - "client": common.MapStr{"ip": userIP}, - "source": common.MapStr{"ip": userIP}, - "user_agent": common.MapStr{"original": userAgent}, + "data_stream.type": "logs", + "data_stream.dataset": "apm.error.myservice", + "labels": common.MapStr{"key": true, "label": 101}, + "service": common.MapStr{"name": "myservice", "version": "1.0"}, + "agent": common.MapStr{"name": "go", "version": "1.0"}, + "user": common.MapStr{"id": uid, "email": email}, + "client": common.MapStr{"ip": userIP}, + "source": common.MapStr{"ip": userIP}, + "user_agent": common.MapStr{"original": userAgent}, "error": common.MapStr{ "custom": common.MapStr{ "foo": "bar", diff --git a/model/metricset.go b/model/metricset.go index 4b1a7f57de8..f3afda7d5a3 100644 --- a/model/metricset.go +++ b/model/metricset.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/monitoring" + "github.com/elastic/apm-server/datastreams" logs "github.com/elastic/apm-server/log" "github.com/elastic/apm-server/transform" "github.com/elastic/apm-server/utility" @@ -161,15 +162,19 @@ func (me *Metricset) Transform(ctx context.Context, _ *transform.Config) []beat. } } - fields["processor"] = metricsetProcessorEntry me.Metadata.Set(fields) + + var isInternal bool if eventFields := me.Event.fields(); eventFields != nil { + isInternal = true utility.DeepUpdate(fields, metricsetEventKey, eventFields) } if transactionFields := me.Transaction.fields(); transactionFields != nil { + isInternal = true utility.DeepUpdate(fields, metricsetTransactionKey, transactionFields) } if spanFields := me.Span.fields(); spanFields != nil { + isInternal = true utility.DeepUpdate(fields, metricsetSpanKey, spanFields) } @@ -180,6 +185,20 @@ func (me *Metricset) Transform(ctx context.Context, _ *transform.Config) []beat. fields["timeseries"] = common.MapStr{"instance": me.TimeseriesInstanceID} } + // Metrics are stored in "metrics" data streams. + dataset := "apm." + if isInternal { + // Metrics that include well-defined transaction/span fields + // (i.e. breakdown metrics, transaction and span metrics) will + // be stored separately from application and runtime metrics. + dataset += "internal." + } + dataset += datastreams.NormalizeServiceName(me.Metadata.Service.Name) + + fields["processor"] = metricsetProcessorEntry + fields[datastreams.TypeField] = datastreams.MetricsType + fields[datastreams.DatasetField] = dataset + return []beat.Event{{ Fields: fields, Timestamp: me.Timestamp, diff --git a/model/metricset_test.go b/model/metricset_test.go index fb6975b1899..29d5e658de4 100644 --- a/model/metricset_test.go +++ b/model/metricset_test.go @@ -62,7 +62,9 @@ func TestTransform(t *testing.T) { Metricset: &Metricset{Timestamp: timestamp, Metadata: metadata}, Output: []common.MapStr{ { - "processor": common.MapStr{"event": "metric", "name": "metric"}, + "data_stream.type": "metrics", + "data_stream.dataset": "apm.myservice", + "processor": common.MapStr{"event": "metric", "name": "metric"}, "service": common.MapStr{ "name": "myservice", }, @@ -73,106 +75,105 @@ func TestTransform(t *testing.T) { { Metricset: &Metricset{ Metadata: metadata, + Labels: common.MapStr{"a.b": "a.b.value"}, Timestamp: timestamp, - Samples: []Sample{{ - Name: "transaction.duration.histogram", - Counts: []int64{1}, - Values: []float64{42}, - }}, - Transaction: MetricsetTransaction{ - Type: trType, - Name: trName, - Result: trResult, - Root: true, + Samples: []Sample{ + { + Name: "a.counter", + Value: 612, + }, + { + Name: "some.gauge", + Value: 9.16, + }, }, - TimeseriesInstanceID: "foo", }, Output: []common.MapStr{ { - "processor": common.MapStr{"event": "metric", "name": "metric"}, - "service": common.MapStr{"name": "myservice"}, - "timeseries": common.MapStr{"instance": "foo"}, - "transaction": common.MapStr{ - "name": trName, - "type": trType, - "result": trResult, - "root": true, - "duration": common.MapStr{ - "histogram": common.MapStr{ - "counts": []int64{1}, - "values": []float64{42}, - }, - }, - }, + "data_stream.type": "metrics", + "data_stream.dataset": "apm.myservice", + "processor": common.MapStr{"event": "metric", "name": "metric"}, + "service": common.MapStr{"name": "myservice"}, + "labels": common.MapStr{"a.b": "a.b.value"}, + + "a": common.MapStr{"counter": float64(612)}, + "some": common.MapStr{"gauge": float64(9.16)}, }, }, - Msg: "Payload with extended transaction metadata.", + Msg: "Payload with valid metric.", }, { Metricset: &Metricset{ - Metadata: metadata, - Timestamp: timestamp, + Timestamp: timestamp, + Metadata: metadata, + Span: MetricsetSpan{Type: spType, Subtype: spSubtype}, + Transaction: MetricsetTransaction{Type: trType, Name: trName}, Samples: []Sample{{ - Name: "metric_field", + Name: "span.self_time.count", Value: 123, }}, - Event: MetricsetEventCategorization{ - Outcome: eventOutcome, - }, }, Output: []common.MapStr{ { - "processor": common.MapStr{"event": "metric", "name": "metric"}, - "service": common.MapStr{"name": "myservice"}, - "event": common.MapStr{"outcome": eventOutcome}, - "metric_field": 123.0, + "data_stream.type": "metrics", + "data_stream.dataset": "apm.internal.myservice", + "processor": common.MapStr{"event": "metric", "name": "metric"}, + "service": common.MapStr{"name": "myservice"}, + "transaction": common.MapStr{"type": trType, "name": trName}, + "span": common.MapStr{ + "type": spType, "subtype": spSubtype, + "self_time": common.MapStr{ + "count": 123.0, + }, + }, }, }, - Msg: "Payload with event categorization metadata.", + Msg: "Payload with breakdown metrics.", }, { Metricset: &Metricset{ - Metadata: metadata, - Labels: common.MapStr{"a.b": "a.b.value"}, Timestamp: timestamp, + Metadata: metadata, + Event: MetricsetEventCategorization{Outcome: eventOutcome}, + Transaction: MetricsetTransaction{ + Type: trType, + Name: trName, + Result: trResult, + Root: true, + }, + TimeseriesInstanceID: "foo", Samples: []Sample{ { - Name: "a.counter", - Value: 612, - }, - { - Name: "some.gauge", - Value: 9.16, - }, - { - Name: "histo.gram", + Name: "transaction.duration.histogram", Value: 666, // Value is ignored when Counts/Values are specified Counts: []int64{1, 2, 3}, Values: []float64{4.5, 6.0, 9.0}, }, }, - Span: MetricsetSpan{Type: spType, Subtype: spSubtype}, - Transaction: MetricsetTransaction{Type: trType, Name: trName}, }, Output: []common.MapStr{ { - "processor": common.MapStr{"event": "metric", "name": "metric"}, - "service": common.MapStr{"name": "myservice"}, - "transaction": common.MapStr{"name": trName, "type": trType}, - "span": common.MapStr{"type": spType, "subtype": spSubtype}, - "labels": common.MapStr{"a.b": "a.b.value"}, - - "a": common.MapStr{"counter": float64(612)}, - "some": common.MapStr{"gauge": float64(9.16)}, - "histo": common.MapStr{ - "gram": common.MapStr{ - "counts": []int64{1, 2, 3}, - "values": []float64{4.5, 6.0, 9.0}, + "data_stream.type": "metrics", + "data_stream.dataset": "apm.internal.myservice", + "processor": common.MapStr{"event": "metric", "name": "metric"}, + "service": common.MapStr{"name": "myservice"}, + "event": common.MapStr{"outcome": eventOutcome}, + "timeseries": common.MapStr{"instance": "foo"}, + "transaction": common.MapStr{ + "type": trType, + "name": trName, + "result": trResult, + "root": true, + "duration": common.MapStr{ + "histogram": common.MapStr{ + "counts": []int64{1, 2, 3}, + "values": []float64{4.5, 6.0, 9.0}, + }, }, }, }, }, - Msg: "Payload with valid metric.", + Msg: "Payload with transaction duration.", }, { Metricset: &Metricset{ @@ -194,8 +195,10 @@ func TestTransform(t *testing.T) { }, Output: []common.MapStr{ { - "processor": common.MapStr{"event": "metric", "name": "metric"}, - "service": common.MapStr{"name": "myservice"}, + "data_stream.type": "metrics", + "data_stream.dataset": "apm.internal.myservice", + "processor": common.MapStr{"event": "metric", "name": "metric"}, + "service": common.MapStr{"name": "myservice"}, "span": common.MapStr{"type": spType, "subtype": spSubtype, "destination": common.MapStr{"service": common.MapStr{"resource": resource}}}, "destination": common.MapStr{"service": common.MapStr{"response_time": common.MapStr{ diff --git a/model/modeldecoder/rumv3/metricset_test.go b/model/modeldecoder/rumv3/metricset_test.go index 809827b538f..a2c80b991ee 100644 --- a/model/modeldecoder/rumv3/metricset_test.go +++ b/model/modeldecoder/rumv3/metricset_test.go @@ -22,12 +22,13 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/elastic/apm-server/decoder" "github.com/elastic/apm-server/model" "github.com/elastic/apm-server/model/modeldecoder" "github.com/elastic/apm-server/model/modeldecoder/modeldecodertest" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestResetMetricsetOnRelease(t *testing.T) { diff --git a/model/profile.go b/model/profile.go index 666ca477976..566938453c2 100644 --- a/model/profile.go +++ b/model/profile.go @@ -26,6 +26,7 @@ import ( "github.com/gofrs/uuid" "github.com/google/pprof/profile" + "github.com/elastic/apm-server/datastreams" "github.com/elastic/apm-server/transform" "github.com/elastic/apm-server/utility" "github.com/elastic/beats/v7/libbeat/beat" @@ -66,6 +67,11 @@ func (pp PprofProfile) Transform(ctx context.Context, _ *transform.Config) []bea profileID = fmt.Sprintf("%x", uuid) } + // Profiles are stored in their own "metrics" data stream, with a data + // set per service. This enables managing retention of profiling data + // per-service, and indepedently of lower volume metrics. + dataset := fmt.Sprintf("apm.profiling.%s", datastreams.NormalizeServiceName(pp.Metadata.Service.Name)) + samples := make([]beat.Event, len(pp.Profile.Sample)) for i, sample := range pp.Profile.Sample { profileFields := common.MapStr{} @@ -116,8 +122,10 @@ func (pp PprofProfile) Transform(ctx context.Context, _ *transform.Config) []bea event := beat.Event{ Timestamp: profileTimestamp, Fields: common.MapStr{ - "processor": profileProcessorEntry, - profileDocType: profileFields, + datastreams.TypeField: datastreams.MetricsType, + datastreams.DatasetField: dataset, + "processor": profileProcessorEntry, + profileDocType: profileFields, }, } pp.Metadata.Set(event.Fields) diff --git a/model/profile_test.go b/model/profile_test.go index 0a021193e84..8bfd75b9017 100644 --- a/model/profile_test.go +++ b/model/profile_test.go @@ -97,7 +97,9 @@ func TestPprofProfileTransform(t *testing.T) { assert.Equal(t, beat.Event{ Timestamp: timestamp, Fields: common.MapStr{ - "processor": common.MapStr{"event": "profile", "name": "profile"}, + "data_stream.type": "metrics", + "data_stream.dataset": "apm.profiling.myservice", + "processor": common.MapStr{"event": "profile", "name": "profile"}, "service": common.MapStr{ "name": "myService", "environment": "staging", diff --git a/model/span.go b/model/span.go index fd767cf56cf..f14c6174b22 100644 --- a/model/span.go +++ b/model/span.go @@ -19,6 +19,7 @@ package model import ( "context" + "fmt" "net" "time" @@ -26,6 +27,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/monitoring" + "github.com/elastic/apm-server/datastreams" "github.com/elastic/apm-server/transform" "github.com/elastic/apm-server/utility" ) @@ -189,7 +191,13 @@ func (e *Span) Transform(ctx context.Context, cfg *transform.Config) []beat.Even spanFrameCounter.Add(int64(frames)) } + // Spans are stored in a "traces" data stream along with transactions. + dataset := fmt.Sprintf("apm.%s", datastreams.NormalizeServiceName(e.Metadata.Service.Name)) + fields := common.MapStr{ + datastreams.TypeField: datastreams.TracesType, + datastreams.DatasetField: dataset, + "processor": spanProcessorEntry, spanDocType: e.fields(ctx, cfg), } diff --git a/model/span_test.go b/model/span_test.go index d1e2a85ced6..2d1f8e26165 100644 --- a/model/span_test.go +++ b/model/span_test.go @@ -58,8 +58,10 @@ func TestSpanTransform(t *testing.T) { Msg: "Span without a Stacktrace", Span: Span{Timestamp: timestamp, Metadata: metadata}, Output: common.MapStr{ - "processor": common.MapStr{"event": "span", "name": "transaction"}, - "service": common.MapStr{"name": serviceName, "environment": env, "version": serviceVersion}, + "data_stream.type": "traces", + "data_stream.dataset": "apm.myservice", + "processor": common.MapStr{"event": "span", "name": "transaction"}, + "service": common.MapStr{"name": serviceName, "environment": env, "version": serviceVersion}, "span": common.MapStr{ "duration": common.MapStr{"us": 0}, "name": "", @@ -74,8 +76,10 @@ func TestSpanTransform(t *testing.T) { Msg: "Span with outcome", Span: Span{Timestamp: timestamp, Metadata: metadata, Outcome: "success"}, Output: common.MapStr{ - "processor": common.MapStr{"event": "span", "name": "transaction"}, - "service": common.MapStr{"name": serviceName, "environment": env, "version": serviceVersion}, + "data_stream.type": "traces", + "data_stream.dataset": "apm.myservice", + "processor": common.MapStr{"event": "span", "name": "transaction"}, + "service": common.MapStr{"name": serviceName, "environment": env, "version": serviceVersion}, "span": common.MapStr{ "duration": common.MapStr{"us": 0}, "name": "", @@ -121,6 +125,8 @@ func TestSpanTransform(t *testing.T) { Message: &Message{QueueName: tests.StringPtr("users")}, }, Output: common.MapStr{ + "data_stream.type": "traces", + "data_stream.dataset": "apm.myservice", "span": common.MapStr{ "id": hexID, "duration": common.MapStr{"us": 1200}, diff --git a/model/transaction.go b/model/transaction.go index 4ae0815ae53..586aef85a41 100644 --- a/model/transaction.go +++ b/model/transaction.go @@ -19,12 +19,14 @@ package model import ( "context" + "fmt" "time" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/monitoring" + "github.com/elastic/apm-server/datastreams" "github.com/elastic/apm-server/transform" "github.com/elastic/apm-server/utility" ) @@ -111,7 +113,13 @@ func (e *Transaction) fields() common.MapStr { func (e *Transaction) Transform(_ context.Context, _ *transform.Config) []beat.Event { transactionTransformations.Inc() + // Transactions are stored in a "traces" data stream along with spans. + dataset := fmt.Sprintf("apm.%s", datastreams.NormalizeServiceName(e.Metadata.Service.Name)) + fields := common.MapStr{ + datastreams.TypeField: datastreams.TracesType, + datastreams.DatasetField: dataset, + "processor": transactionProcessorEntry, transactionDocType: e.fields(), } diff --git a/model/transaction_test.go b/model/transaction_test.go index 294b4fb236f..4fd04bacd41 100644 --- a/model/transaction_test.go +++ b/model/transaction_test.go @@ -181,10 +181,12 @@ func TestEventsTransformWithMetadata(t *testing.T) { events := txWithContext.Transform(context.Background(), &transform.Config{}) require.Len(t, events, 1) assert.Equal(t, events[0].Fields, common.MapStr{ - "user": common.MapStr{"id": "123", "name": "jane"}, - "client": common.MapStr{"ip": ip}, - "source": common.MapStr{"ip": ip}, - "user_agent": common.MapStr{"original": userAgent}, + "data_stream.type": "traces", + "data_stream.dataset": "apm." + serviceName, + "user": common.MapStr{"id": "123", "name": "jane"}, + "client": common.MapStr{"ip": ip}, + "source": common.MapStr{"ip": ip}, + "user_agent": common.MapStr{"original": userAgent}, "host": common.MapStr{ "architecture": "darwin", "hostname": "a.b.c", diff --git a/processor/otel/test_approved/consume_span.approved.json b/processor/otel/test_approved/consume_span.approved.json index 27532f1aa4c..38d72b66e0b 100644 --- a/processor/otel/test_approved/consume_span.approved.json +++ b/processor/otel/test_approved/consume_span.approved.json @@ -6,6 +6,8 @@ "name": "Jaeger", "version": "unknown" }, + "data_stream.dataset": "apm.unknown", + "data_stream.type": "traces", "event": { "outcome": "success" }, @@ -38,6 +40,8 @@ "name": "Jaeger", "version": "unknown" }, + "data_stream.dataset": "apm.unknown", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/otel/test_approved/jaeger_sampling_rate.approved.json b/processor/otel/test_approved/jaeger_sampling_rate.approved.json index 62dd8bc9954..cb858f95734 100644 --- a/processor/otel/test_approved/jaeger_sampling_rate.approved.json +++ b/processor/otel/test_approved/jaeger_sampling_rate.approved.json @@ -6,6 +6,8 @@ "name": "Jaeger", "version": "unknown" }, + "data_stream.dataset": "apm.unknown", + "data_stream.type": "traces", "event": { "outcome": "success" }, @@ -45,6 +47,8 @@ "name": "Jaeger", "version": "unknown" }, + "data_stream.dataset": "apm.unknown", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/otel/test_approved/metadata_jaeger-no-language.approved.json b/processor/otel/test_approved/metadata_jaeger-no-language.approved.json index c2b0e86f0ee..7947c0590d6 100644 --- a/processor/otel/test_approved/metadata_jaeger-no-language.approved.json +++ b/processor/otel/test_approved/metadata_jaeger-no-language.approved.json @@ -6,6 +6,8 @@ "name": "Jaeger", "version": "3.4.12" }, + "data_stream.dataset": "apm.unknown", + "data_stream.type": "traces", "event": { "outcome": "success" }, diff --git a/processor/otel/test_approved/metadata_jaeger-version.approved.json b/processor/otel/test_approved/metadata_jaeger-version.approved.json index d3254dc9a11..1968c9e3e6c 100644 --- a/processor/otel/test_approved/metadata_jaeger-version.approved.json +++ b/processor/otel/test_approved/metadata_jaeger-version.approved.json @@ -6,6 +6,8 @@ "name": "Jaeger/PHP", "version": "3.4.12" }, + "data_stream.dataset": "apm.unknown", + "data_stream.type": "traces", "event": { "outcome": "success" }, diff --git a/processor/otel/test_approved/metadata_jaeger.approved.json b/processor/otel/test_approved/metadata_jaeger.approved.json index 878f9244434..12045a12e69 100644 --- a/processor/otel/test_approved/metadata_jaeger.approved.json +++ b/processor/otel/test_approved/metadata_jaeger.approved.json @@ -7,6 +7,8 @@ "name": "Jaeger/C++", "version": "3.2.1" }, + "data_stream.dataset": "apm.foo", + "data_stream.type": "traces", "event": { "outcome": "success" }, diff --git a/processor/otel/test_approved/metadata_jaeger_full-traceid.approved.json b/processor/otel/test_approved/metadata_jaeger_full-traceid.approved.json index c5c53999fe4..f08c8b749b5 100644 --- a/processor/otel/test_approved/metadata_jaeger_full-traceid.approved.json +++ b/processor/otel/test_approved/metadata_jaeger_full-traceid.approved.json @@ -6,6 +6,8 @@ "name": "Jaeger", "version": "unknown" }, + "data_stream.dataset": "apm.unknown", + "data_stream.type": "traces", "event": { "outcome": "success" }, diff --git a/processor/otel/test_approved/metadata_jaeger_minimal.approved.json b/processor/otel/test_approved/metadata_jaeger_minimal.approved.json index 23443e1c37e..2ea014e11d3 100644 --- a/processor/otel/test_approved/metadata_jaeger_minimal.approved.json +++ b/processor/otel/test_approved/metadata_jaeger_minimal.approved.json @@ -6,6 +6,8 @@ "name": "Jaeger", "version": "unknown" }, + "data_stream.dataset": "apm.unknown", + "data_stream.type": "traces", "event": { "outcome": "success" }, diff --git a/processor/otel/test_approved/metadata_minimal.approved.json b/processor/otel/test_approved/metadata_minimal.approved.json index 34faf081ee8..34008f72ff6 100644 --- a/processor/otel/test_approved/metadata_minimal.approved.json +++ b/processor/otel/test_approved/metadata_minimal.approved.json @@ -6,6 +6,8 @@ "name": "Foo", "version": "unknown" }, + "data_stream.dataset": "apm.unknown", + "data_stream.type": "traces", "event": { "outcome": "success" }, diff --git a/processor/otel/test_approved/span_jaeger_custom.approved.json b/processor/otel/test_approved/span_jaeger_custom.approved.json index 52caa6e6884..1ac4cb50267 100644 --- a/processor/otel/test_approved/span_jaeger_custom.approved.json +++ b/processor/otel/test_approved/span_jaeger_custom.approved.json @@ -6,6 +6,8 @@ "name": "Jaeger", "version": "unknown" }, + "data_stream.dataset": "apm.unknown", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/otel/test_approved/span_jaeger_db.approved.json b/processor/otel/test_approved/span_jaeger_db.approved.json index d09666b5ab8..af989813dc9 100644 --- a/processor/otel/test_approved/span_jaeger_db.approved.json +++ b/processor/otel/test_approved/span_jaeger_db.approved.json @@ -6,6 +6,8 @@ "name": "Jaeger", "version": "unknown" }, + "data_stream.dataset": "apm.unknown", + "data_stream.type": "traces", "destination": { "address": "db", "port": 3306 diff --git a/processor/otel/test_approved/span_jaeger_http.approved.json b/processor/otel/test_approved/span_jaeger_http.approved.json index f0730302c8d..8bc36446961 100644 --- a/processor/otel/test_approved/span_jaeger_http.approved.json +++ b/processor/otel/test_approved/span_jaeger_http.approved.json @@ -6,6 +6,8 @@ "name": "Jaeger", "version": "unknown" }, + "data_stream.dataset": "apm.unknown", + "data_stream.type": "traces", "destination": { "address": "foo.bar.com", "port": 80 @@ -79,6 +81,8 @@ "name": "Jaeger", "version": "unknown" }, + "data_stream.dataset": "apm.error.unknown", + "data_stream.type": "logs", "error": { "exception": [ { @@ -138,6 +142,8 @@ "name": "Jaeger", "version": "unknown" }, + "data_stream.dataset": "apm.error.unknown", + "data_stream.type": "logs", "error": { "grouping_key": "23b7ac1bdf1ca957f9f581cfadee467c", "log": { @@ -192,6 +198,8 @@ "name": "Jaeger", "version": "unknown" }, + "data_stream.dataset": "apm.error.unknown", + "data_stream.type": "logs", "error": { "exception": [ { @@ -248,6 +256,8 @@ "name": "Jaeger", "version": "unknown" }, + "data_stream.dataset": "apm.error.unknown", + "data_stream.type": "logs", "error": { "exception": [ { @@ -304,6 +314,8 @@ "name": "Jaeger", "version": "unknown" }, + "data_stream.dataset": "apm.error.unknown", + "data_stream.type": "logs", "error": { "exception": [ { @@ -360,6 +372,8 @@ "name": "Jaeger", "version": "unknown" }, + "data_stream.dataset": "apm.error.unknown", + "data_stream.type": "logs", "error": { "grouping_key": "c9221918248f05433f6b81c46a666aee", "log": { diff --git a/processor/otel/test_approved/span_jaeger_http_status_code.approved.json b/processor/otel/test_approved/span_jaeger_http_status_code.approved.json index b0eaa635f99..7ee56b57b25 100644 --- a/processor/otel/test_approved/span_jaeger_http_status_code.approved.json +++ b/processor/otel/test_approved/span_jaeger_http_status_code.approved.json @@ -6,6 +6,8 @@ "name": "Jaeger", "version": "unknown" }, + "data_stream.dataset": "apm.unknown", + "data_stream.type": "traces", "destination": { "address": "foo.bar.com", "port": 80 diff --git a/processor/otel/test_approved/span_jaeger_https_default_port.approved.json b/processor/otel/test_approved/span_jaeger_https_default_port.approved.json index 118982f1120..af475472371 100644 --- a/processor/otel/test_approved/span_jaeger_https_default_port.approved.json +++ b/processor/otel/test_approved/span_jaeger_https_default_port.approved.json @@ -6,6 +6,8 @@ "name": "Jaeger", "version": "unknown" }, + "data_stream.dataset": "apm.unknown", + "data_stream.type": "traces", "destination": { "address": "foo.bar.com", "port": 443 diff --git a/processor/otel/test_approved/span_jaeger_messaging.approved.json b/processor/otel/test_approved/span_jaeger_messaging.approved.json index 09358952df0..032f4516de9 100644 --- a/processor/otel/test_approved/span_jaeger_messaging.approved.json +++ b/processor/otel/test_approved/span_jaeger_messaging.approved.json @@ -6,6 +6,8 @@ "name": "Jaeger", "version": "unknown" }, + "data_stream.dataset": "apm.unknown", + "data_stream.type": "traces", "destination": { "address": "mq", "port": 1234 diff --git a/processor/otel/test_approved/transaction_jaeger_custom.approved.json b/processor/otel/test_approved/transaction_jaeger_custom.approved.json index 7146dcf5257..cf301f83203 100644 --- a/processor/otel/test_approved/transaction_jaeger_custom.approved.json +++ b/processor/otel/test_approved/transaction_jaeger_custom.approved.json @@ -6,6 +6,8 @@ "name": "Jaeger", "version": "unknown" }, + "data_stream.dataset": "apm.unknown", + "data_stream.type": "traces", "event": { "outcome": "success" }, diff --git a/processor/otel/test_approved/transaction_jaeger_full.approved.json b/processor/otel/test_approved/transaction_jaeger_full.approved.json index d99eeeb0ba5..7ffece71652 100644 --- a/processor/otel/test_approved/transaction_jaeger_full.approved.json +++ b/processor/otel/test_approved/transaction_jaeger_full.approved.json @@ -6,6 +6,8 @@ "name": "Jaeger", "version": "unknown" }, + "data_stream.dataset": "apm.unknown", + "data_stream.type": "traces", "event": { "outcome": "success" }, @@ -74,6 +76,8 @@ "name": "Jaeger", "version": "unknown" }, + "data_stream.dataset": "apm.error.unknown", + "data_stream.type": "logs", "error": { "exception": [ { @@ -139,6 +143,8 @@ "name": "Jaeger", "version": "unknown" }, + "data_stream.dataset": "apm.error.unknown", + "data_stream.type": "logs", "error": { "grouping_key": "23b7ac1bdf1ca957f9f581cfadee467c", "log": { @@ -199,6 +205,8 @@ "name": "Jaeger", "version": "unknown" }, + "data_stream.dataset": "apm.error.unknown", + "data_stream.type": "logs", "error": { "exception": [ { @@ -261,6 +269,8 @@ "name": "Jaeger", "version": "unknown" }, + "data_stream.dataset": "apm.error.unknown", + "data_stream.type": "logs", "error": { "exception": [ { @@ -323,6 +333,8 @@ "name": "Jaeger", "version": "unknown" }, + "data_stream.dataset": "apm.error.unknown", + "data_stream.type": "logs", "error": { "exception": [ { @@ -385,6 +397,8 @@ "name": "Jaeger", "version": "unknown" }, + "data_stream.dataset": "apm.error.unknown", + "data_stream.type": "logs", "error": { "grouping_key": "c9221918248f05433f6b81c46a666aee", "log": { diff --git a/processor/otel/test_approved/transaction_jaeger_no_attrs.approved.json b/processor/otel/test_approved/transaction_jaeger_no_attrs.approved.json index b25646f7724..81eb60c050b 100644 --- a/processor/otel/test_approved/transaction_jaeger_no_attrs.approved.json +++ b/processor/otel/test_approved/transaction_jaeger_no_attrs.approved.json @@ -6,6 +6,8 @@ "name": "Jaeger", "version": "unknown" }, + "data_stream.dataset": "apm.unknown", + "data_stream.type": "traces", "event": { "outcome": "failure" }, diff --git a/processor/otel/test_approved/transaction_jaeger_type_component.approved.json b/processor/otel/test_approved/transaction_jaeger_type_component.approved.json index 1a54e23acd3..b414c1171e4 100644 --- a/processor/otel/test_approved/transaction_jaeger_type_component.approved.json +++ b/processor/otel/test_approved/transaction_jaeger_type_component.approved.json @@ -6,6 +6,8 @@ "name": "Jaeger", "version": "unknown" }, + "data_stream.dataset": "apm.unknown", + "data_stream.type": "traces", "event": { "outcome": "success" }, diff --git a/processor/otel/test_approved/transaction_jaeger_type_messaging.approved.json b/processor/otel/test_approved/transaction_jaeger_type_messaging.approved.json index bcb21fb6051..3ca0d234b8f 100644 --- a/processor/otel/test_approved/transaction_jaeger_type_messaging.approved.json +++ b/processor/otel/test_approved/transaction_jaeger_type_messaging.approved.json @@ -6,6 +6,8 @@ "name": "Jaeger", "version": "unknown" }, + "data_stream.dataset": "apm.unknown", + "data_stream.type": "traces", "event": { "outcome": "success" }, diff --git a/processor/otel/test_approved/transaction_jaeger_type_request.approved.json b/processor/otel/test_approved/transaction_jaeger_type_request.approved.json index 649a4bbc419..9f10e0ec06f 100644 --- a/processor/otel/test_approved/transaction_jaeger_type_request.approved.json +++ b/processor/otel/test_approved/transaction_jaeger_type_request.approved.json @@ -6,6 +6,8 @@ "name": "Jaeger", "version": "unknown" }, + "data_stream.dataset": "apm.unknown", + "data_stream.type": "traces", "event": { "outcome": "failure" }, diff --git a/processor/otel/test_approved/transaction_jaeger_type_request_result.approved.json b/processor/otel/test_approved/transaction_jaeger_type_request_result.approved.json index 965c35a5587..679228c4af0 100644 --- a/processor/otel/test_approved/transaction_jaeger_type_request_result.approved.json +++ b/processor/otel/test_approved/transaction_jaeger_type_request_result.approved.json @@ -6,6 +6,8 @@ "name": "Jaeger", "version": "unknown" }, + "data_stream.dataset": "apm.unknown", + "data_stream.type": "traces", "event": { "outcome": "success" }, diff --git a/processor/stream/package_tests/error_attrs_test.go b/processor/stream/package_tests/error_attrs_test.go index bc234c4747d..5a9c7eb5b82 100644 --- a/processor/stream/package_tests/error_attrs_test.go +++ b/processor/stream/package_tests/error_attrs_test.go @@ -110,6 +110,7 @@ func errorCondRequiredKeys() map[string]tests.Condition { func errorKeywordExceptionKeys() *tests.Set { return tests.NewSet( + "data_stream.type", "data_stream.dataset", "data_stream.namespace", "processor.event", "processor.name", "error.grouping_key", "context.tags", "transaction.name", "event.outcome", // not relevant diff --git a/processor/stream/package_tests/intake_test_processor.go b/processor/stream/package_tests/intake_test_processor.go index c85fb7c4438..e5efab0e89e 100644 --- a/processor/stream/package_tests/intake_test_processor.go +++ b/processor/stream/package_tests/intake_test_processor.go @@ -27,6 +27,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/apm-server/datastreams" "github.com/elastic/apm-server/decoder" "github.com/elastic/apm-server/model" "github.com/elastic/apm-server/model/modeldecoder" @@ -142,6 +143,17 @@ func (p *intakeTestProcessor) Process(buf []byte) ([]beat.Event, error) { } } + for _, event := range events { + // TODO(axw) migrate all of these tests to systemtest, + // so we can use the proper event publishing pipeline. + // https://github.com/elastic/apm-server/issues/4408 + // + // We need to set the data_stream.namespace field manually; + // it would normally be set by the libbeat pipeline by a + // processor. + event.Fields.Put(datastreams.NamespaceField, "default") + } + if len(result.Errors) > 0 { return events, errors.New(result.Error()) } diff --git a/processor/stream/package_tests/metadata_attrs_test.go b/processor/stream/package_tests/metadata_attrs_test.go index 67883ef8992..de05c682361 100644 --- a/processor/stream/package_tests/metadata_attrs_test.go +++ b/processor/stream/package_tests/metadata_attrs_test.go @@ -124,7 +124,9 @@ func TestMetadataPayloadAttrsMatchFields(t *testing.T) { func TestKeywordLimitationOnMetadataAttrs(t *testing.T) { metadataProcSetup().KeywordLimitation( t, - tests.NewSet("processor.event", "processor.name", + tests.NewSet( + "data_stream.type", "data_stream.dataset", "data_stream.namespace", + "processor.event", "processor.name", "process.args", tests.Group("observer"), tests.Group("event"), diff --git a/processor/stream/package_tests/span_attrs_test.go b/processor/stream/package_tests/span_attrs_test.go index 5d83945bc91..4eb0c252689 100644 --- a/processor/stream/package_tests/span_attrs_test.go +++ b/processor/stream/package_tests/span_attrs_test.go @@ -129,6 +129,7 @@ func transactionContext() *tests.Set { func spanKeywordExceptionKeys() *tests.Set { return tests.Union(tests.NewSet( + "data_stream.type", "data_stream.dataset", "data_stream.namespace", "processor.event", "processor.name", "context.tags", "transaction.type", "transaction.name", "event.outcome", diff --git a/processor/stream/package_tests/transaction_attrs_test.go b/processor/stream/package_tests/transaction_attrs_test.go index bee0a593cdd..4da0d241ccb 100644 --- a/processor/stream/package_tests/transaction_attrs_test.go +++ b/processor/stream/package_tests/transaction_attrs_test.go @@ -92,6 +92,7 @@ func transactionRequiredKeys() *tests.Set { func transactionKeywordExceptionKeys() *tests.Set { return tests.NewSet( + "data_stream.type", "data_stream.dataset", "data_stream.namespace", "processor.event", "processor.name", "transaction.marks", "context.tags", diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationErrors.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationErrors.approved.json index 54f576a7064..0389ee8613c 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationErrors.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationErrors.approved.json @@ -33,6 +33,8 @@ "container": { "id": "container-id" }, + "data_stream.dataset": "apm.error.service1", + "data_stream.type": "logs", "error": { "culprit": "my.module.function_name", "custom": { @@ -375,6 +377,8 @@ "container": { "id": "container-id" }, + "data_stream.dataset": "apm.error.1234_service_12a3", + "data_stream.type": "logs", "error": { "grouping_key": "dc8dd667f7036ec5f0bae87bf2188243", "id": "xFoaabb123FFFFFF", @@ -479,6 +483,8 @@ "container": { "id": "container-id" }, + "data_stream.dataset": "apm.error.1234_service_12a3", + "data_stream.type": "logs", "error": { "exception": [ { @@ -579,6 +585,8 @@ "container": { "id": "container-id" }, + "data_stream.dataset": "apm.error.service1", + "data_stream.type": "logs", "error": { "exception": [ { @@ -685,6 +693,8 @@ "container": { "id": "container-id" }, + "data_stream.dataset": "apm.error.1234_service_12a3", + "data_stream.type": "logs", "error": { "grouping_key": "d6b3f958dfea98dc9ed2b57d5f0c48bb", "id": "abcdef0123456789", diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationEvents.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationEvents.approved.json index 599c731aad3..d44e28bd1f4 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationEvents.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationEvents.approved.json @@ -13,6 +13,8 @@ "container": { "id": "8ec7ceb990749e79b37f6dc6cd3628633618d6ce412553a552a0fa6b69419ad4" }, + "data_stream.dataset": "apm.experimental_java", + "data_stream.type": "traces", "event": { "outcome": "success" }, @@ -194,6 +196,8 @@ "container": { "id": "8ec7ceb990749e79b37f6dc6cd3628633618d6ce412553a552a0fa6b69419ad4" }, + "data_stream.dataset": "apm.1234_service_12a3", + "data_stream.type": "traces", "event": { "outcome": "success" }, @@ -339,6 +343,8 @@ "container": { "id": "8ec7ceb990749e79b37f6dc6cd3628633618d6ce412553a552a0fa6b69419ad4" }, + "data_stream.dataset": "apm.internal.1234_service_12a3", + "data_stream.type": "metrics", "dotted": { "float": { "gauge": 6.12 @@ -463,6 +469,8 @@ "container": { "id": "8ec7ceb990749e79b37f6dc6cd3628633618d6ce412553a552a0fa6b69419ad4" }, + "data_stream.dataset": "apm.error.service1", + "data_stream.type": "logs", "error": { "culprit": "opbeans.controllers.DTInterceptor.preHandle(DTInterceptor.java:73)", "custom": { diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidEvent.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidEvent.approved.json index bb7538be513..f659fa936a9 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidEvent.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidEvent.approved.json @@ -6,6 +6,8 @@ "name": "elastic-node", "version": "3.14.0" }, + "data_stream.dataset": "apm.1234_service_12a3", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidJSONEvent.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidJSONEvent.approved.json index bb7538be513..f659fa936a9 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidJSONEvent.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationInvalidJSONEvent.approved.json @@ -6,6 +6,8 @@ "name": "elastic-node", "version": "3.14.0" }, + "data_stream.dataset": "apm.1234_service_12a3", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationMetadataNullValues.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationMetadataNullValues.approved.json index a287923824a..28a23968ebf 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationMetadataNullValues.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationMetadataNullValues.approved.json @@ -6,6 +6,8 @@ "name": "elastic-node", "version": "3.14.0" }, + "data_stream.dataset": "apm.error.1234_service_12a3", + "data_stream.type": "logs", "error": { "grouping_key": "d6b3f958dfea98dc9ed2b57d5f0c48bb", "id": "abcdef0123456789", diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationMetricsets.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationMetricsets.approved.json index 19762863b35..7290188947c 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationMetricsets.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationMetricsets.approved.json @@ -7,6 +7,8 @@ "version": "3.14.0" }, "byte_counter": 1, + "data_stream.dataset": "apm.internal.1234_service_12a3", + "data_stream.type": "metrics", "dotted": { "float": { "gauge": 6.12 @@ -97,6 +99,8 @@ "name": "elastic-node", "version": "3.14.0" }, + "data_stream.dataset": "apm.1234_service_12a3", + "data_stream.type": "metrics", "go": { "memstats": { "heap": { @@ -141,6 +145,8 @@ "name": "elastic-node", "version": "3.14.0" }, + "data_stream.dataset": "apm.1234_service_12a3", + "data_stream.type": "metrics", "host": { "ip": "192.0.0.1" }, diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationMinimalService.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationMinimalService.approved.json index bc593968dfd..640713b5f48 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationMinimalService.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationMinimalService.approved.json @@ -6,6 +6,8 @@ "name": "elastic-node", "version": "3.14.0" }, + "data_stream.dataset": "apm.1234_service_12a3", + "data_stream.type": "metrics", "go": { "memstats": { "heap": { @@ -35,6 +37,8 @@ "name": "elastic-node", "version": "3.14.0" }, + "data_stream.dataset": "apm.error.1234_service_12a3", + "data_stream.type": "logs", "error": { "grouping_key": "d6b3f958dfea98dc9ed2b57d5f0c48bb", "id": "abcdef0123456789", diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationOptionalTimestamps.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationOptionalTimestamps.approved.json index e4c1afa17f6..00525a34146 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationOptionalTimestamps.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationOptionalTimestamps.approved.json @@ -6,6 +6,8 @@ "name": "elastic-node", "version": "3.14.0" }, + "data_stream.dataset": "apm.backendspans", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -81,6 +83,8 @@ "name": "elastic-node", "version": "3.14.0" }, + "data_stream.dataset": "apm.backendspans", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -161,6 +165,8 @@ "name": "elastic-node", "version": "3.14.0" }, + "data_stream.dataset": "apm.backendspans", + "data_stream.type": "metrics", "host": { "architecture": "x64", "hostname": "prod1.example.com", diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationRumErrors.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationRumErrors.approved.json index 75223646be4..943b273fc39 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationRumErrors.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationRumErrors.approved.json @@ -9,6 +9,8 @@ "client": { "ip": "192.0.0.1" }, + "data_stream.dataset": "apm.error.apm_agent_js", + "data_stream.type": "logs", "error": { "culprit": "test/e2e/general-usecase/bundle.js.map", "exception": [ diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationRumTransactions.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationRumTransactions.approved.json index 8fc3791df8c..79051b7e7da 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationRumTransactions.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationRumTransactions.approved.json @@ -9,6 +9,8 @@ "client": { "ip": "192.0.0.1" }, + "data_stream.dataset": "apm.apm_agent_js", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -75,6 +77,8 @@ "client": { "ip": "192.0.0.1" }, + "data_stream.dataset": "apm.apm_agent_js", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationSpans.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationSpans.approved.json index 0d46024b789..f822471e299 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationSpans.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationSpans.approved.json @@ -34,6 +34,8 @@ "container": { "id": "container-id" }, + "data_stream.dataset": "apm.backendspans", + "data_stream.type": "traces", "event": { "outcome": "success" }, @@ -149,6 +151,8 @@ "container": { "id": "container-id" }, + "data_stream.dataset": "apm.backendspans", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -265,6 +269,8 @@ "container": { "id": "container-id" }, + "data_stream.dataset": "apm.backendspans", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -385,6 +391,8 @@ "container": { "id": "container-id" }, + "data_stream.dataset": "apm.backendspans", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -502,6 +510,8 @@ "container": { "id": "container-id" }, + "data_stream.dataset": "apm.backendspans", + "data_stream.type": "traces", "destination": { "address": "0:0::0:1", "ip": "0:0::0:1", @@ -701,6 +711,8 @@ "container": { "id": "container-id" }, + "data_stream.dataset": "apm.backendspans", + "data_stream.type": "traces", "destination": { "address": "0:0::0:1", "ip": "0:0::0:1" diff --git a/processor/stream/test_approved_es_documents/testIntakeIntegrationTransactions.approved.json b/processor/stream/test_approved_es_documents/testIntakeIntegrationTransactions.approved.json index 7ff6a5f5ca1..40942881c59 100644 --- a/processor/stream/test_approved_es_documents/testIntakeIntegrationTransactions.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeIntegrationTransactions.approved.json @@ -29,6 +29,8 @@ "container": { "id": "container-id" }, + "data_stream.dataset": "apm.1234_service_12a3", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -146,6 +148,8 @@ "container": { "id": "container-id" }, + "data_stream.dataset": "apm.1234_service_12a3", + "data_stream.type": "traces", "event": { "outcome": "success" }, @@ -354,6 +358,8 @@ "container": { "id": "container-id" }, + "data_stream.dataset": "apm.service1", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -494,6 +500,8 @@ "container": { "id": "container-id" }, + "data_stream.dataset": "apm.1234_service_12a3", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, diff --git a/processor/stream/test_approved_es_documents/testIntakeRUMV3Errors.approved.json b/processor/stream/test_approved_es_documents/testIntakeRUMV3Errors.approved.json index c35eb2463e9..9d4b28ce273 100644 --- a/processor/stream/test_approved_es_documents/testIntakeRUMV3Errors.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeRUMV3Errors.approved.json @@ -9,6 +9,8 @@ "client": { "ip": "192.0.0.1" }, + "data_stream.dataset": "apm.error.apm_a_rum_test_e2e_general_usecase", + "data_stream.type": "logs", "error": { "culprit": "test/e2e/general-usecase/app.e2e-bundle.min.js?token=secret", "custom": { diff --git a/processor/stream/test_approved_es_documents/testIntakeRUMV3Events.approved.json b/processor/stream/test_approved_es_documents/testIntakeRUMV3Events.approved.json index e5c85d57ebe..6cf3af3b8c7 100644 --- a/processor/stream/test_approved_es_documents/testIntakeRUMV3Events.approved.json +++ b/processor/stream/test_approved_es_documents/testIntakeRUMV3Events.approved.json @@ -9,6 +9,8 @@ "client": { "ip": "192.0.0.1" }, + "data_stream.dataset": "apm.apm_a_rum_test_e2e_general_usecase", + "data_stream.type": "traces", "event": { "outcome": "success" }, @@ -155,6 +157,8 @@ "client": { "ip": "192.0.0.1" }, + "data_stream.dataset": "apm.apm_a_rum_test_e2e_general_usecase", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -224,6 +228,8 @@ "client": { "ip": "192.0.0.1" }, + "data_stream.dataset": "apm.apm_a_rum_test_e2e_general_usecase", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -293,6 +299,8 @@ "client": { "ip": "192.0.0.1" }, + "data_stream.dataset": "apm.apm_a_rum_test_e2e_general_usecase", + "data_stream.type": "traces", "destination": { "address": "localhost", "port": 8000 @@ -383,6 +391,8 @@ "client": { "ip": "192.0.0.1" }, + "data_stream.dataset": "apm.apm_a_rum_test_e2e_general_usecase", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -451,6 +461,8 @@ "client": { "ip": "192.0.0.1" }, + "data_stream.dataset": "apm.apm_a_rum_test_e2e_general_usecase", + "data_stream.type": "traces", "destination": { "address": "localhost", "port": 8000 @@ -541,6 +553,8 @@ "client": { "ip": "192.0.0.1" }, + "data_stream.dataset": "apm.apm_a_rum_test_e2e_general_usecase", + "data_stream.type": "traces", "destination": { "address": "localhost", "port": 8003 @@ -631,6 +645,8 @@ "client": { "ip": "192.0.0.1" }, + "data_stream.dataset": "apm.apm_a_rum_test_e2e_general_usecase", + "data_stream.type": "traces", "destination": { "address": "localhost", "port": 8003 @@ -722,6 +738,8 @@ "client": { "ip": "192.0.0.1" }, + "data_stream.dataset": "apm.apm_a_rum_test_e2e_general_usecase", + "data_stream.type": "traces", "event": { "outcome": "success" }, @@ -813,6 +831,8 @@ "client": { "ip": "192.0.0.1" }, + "data_stream.dataset": "apm.internal.apm_a_rum_test_e2e_general_usecase", + "data_stream.type": "metrics", "labels": { "testTagKey": "testTagValue" }, @@ -868,6 +888,8 @@ "client": { "ip": "192.0.0.1" }, + "data_stream.dataset": "apm.internal.apm_a_rum_test_e2e_general_usecase", + "data_stream.type": "metrics", "labels": { "testTagKey": "testTagValue" }, @@ -923,6 +945,8 @@ "client": { "ip": "192.0.0.1" }, + "data_stream.dataset": "apm.internal.apm_a_rum_test_e2e_general_usecase", + "data_stream.type": "metrics", "labels": { "testTagKey": "testTagValue" }, @@ -978,6 +1002,8 @@ "client": { "ip": "192.0.0.1" }, + "data_stream.dataset": "apm.internal.apm_a_rum_test_e2e_general_usecase", + "data_stream.type": "metrics", "labels": { "tag1": "value1", "testTagKey": "testTagValue" diff --git a/publish/pub.go b/publish/pub.go index 97c3fb5256c..2df520ffc76 100644 --- a/publish/pub.go +++ b/publish/pub.go @@ -26,6 +26,7 @@ import ( "github.com/pkg/errors" "go.elastic.co/apm" + "github.com/elastic/apm-server/datastreams" "github.com/elastic/apm-server/transform" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" @@ -61,6 +62,7 @@ type PendingReq struct { type PublisherConfig struct { Info beat.Info Pipeline string + Processor beat.ProcessorList TransformConfig *transform.Config } @@ -86,6 +88,7 @@ func NewPublisher(pipeline beat.Pipeline, tracer *apm.Tracer, cfg *PublisherConf processingCfg := beat.ProcessingConfig{ Fields: common.MapStr{ + datastreams.NamespaceField: "default", "observer": common.MapStr{ "type": cfg.Info.Beat, "hostname": cfg.Info.Hostname, @@ -95,6 +98,7 @@ func NewPublisher(pipeline beat.Pipeline, tracer *apm.Tracer, cfg *PublisherConf "ephemeral_id": cfg.Info.EphemeralID.String(), }, }, + Processor: cfg.Processor, } if cfg.Pipeline != "" { processingCfg.Meta = map[string]interface{}{"pipeline": cfg.Pipeline} diff --git a/testdata/jaeger/batch_0.approved.json b/testdata/jaeger/batch_0.approved.json index e30bc33d761..f84030bb217 100644 --- a/testdata/jaeger/batch_0.approved.json +++ b/testdata/jaeger/batch_0.approved.json @@ -7,6 +7,8 @@ "name": "Jaeger/Go", "version": "2.20.1" }, + "data_stream.dataset": "apm.driver", + "data_stream.type": "traces", "event": { "outcome": "success" }, @@ -60,6 +62,8 @@ "name": "Jaeger/Go", "version": "2.20.1" }, + "data_stream.dataset": "apm.error.driver", + "data_stream.type": "logs", "error": { "exception": [ { @@ -110,6 +114,8 @@ "name": "Jaeger/Go", "version": "2.20.1" }, + "data_stream.dataset": "apm.error.driver", + "data_stream.type": "logs", "error": { "exception": [ { @@ -160,6 +166,8 @@ "name": "Jaeger/Go", "version": "2.20.1" }, + "data_stream.dataset": "apm.error.driver", + "data_stream.type": "logs", "error": { "exception": [ { diff --git a/testdata/jaeger/batch_1.approved.json b/testdata/jaeger/batch_1.approved.json index 336fc199743..5a6e6669a20 100644 --- a/testdata/jaeger/batch_1.approved.json +++ b/testdata/jaeger/batch_1.approved.json @@ -7,6 +7,8 @@ "name": "Jaeger/Go", "version": "2.20.1" }, + "data_stream.dataset": "apm.redis", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -56,6 +58,8 @@ "name": "Jaeger/Go", "version": "2.20.1" }, + "data_stream.dataset": "apm.redis", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -106,6 +110,8 @@ "name": "Jaeger/Go", "version": "2.20.1" }, + "data_stream.dataset": "apm.redis", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -155,6 +161,8 @@ "name": "Jaeger/Go", "version": "2.20.1" }, + "data_stream.dataset": "apm.redis", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -204,6 +212,8 @@ "name": "Jaeger/Go", "version": "2.20.1" }, + "data_stream.dataset": "apm.redis", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -253,6 +263,8 @@ "name": "Jaeger/Go", "version": "2.20.1" }, + "data_stream.dataset": "apm.redis", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -302,6 +314,8 @@ "name": "Jaeger/Go", "version": "2.20.1" }, + "data_stream.dataset": "apm.redis", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -352,6 +366,8 @@ "name": "Jaeger/Go", "version": "2.20.1" }, + "data_stream.dataset": "apm.redis", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -401,6 +417,8 @@ "name": "Jaeger/Go", "version": "2.20.1" }, + "data_stream.dataset": "apm.redis", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -450,6 +468,8 @@ "name": "Jaeger/Go", "version": "2.20.1" }, + "data_stream.dataset": "apm.redis", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -499,6 +519,8 @@ "name": "Jaeger/Go", "version": "2.20.1" }, + "data_stream.dataset": "apm.redis", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -548,6 +570,8 @@ "name": "Jaeger/Go", "version": "2.20.1" }, + "data_stream.dataset": "apm.redis", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -598,6 +622,8 @@ "name": "Jaeger/Go", "version": "2.20.1" }, + "data_stream.dataset": "apm.redis", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -647,6 +673,8 @@ "name": "Jaeger/Go", "version": "2.20.1" }, + "data_stream.dataset": "apm.redis", + "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -696,6 +724,8 @@ "name": "Jaeger/Go", "version": "2.20.1" }, + "data_stream.dataset": "apm.error.redis", + "data_stream.type": "logs", "error": { "exception": [ { @@ -742,6 +772,8 @@ "name": "Jaeger/Go", "version": "2.20.1" }, + "data_stream.dataset": "apm.error.redis", + "data_stream.type": "logs", "error": { "exception": [ { @@ -788,6 +820,8 @@ "name": "Jaeger/Go", "version": "2.20.1" }, + "data_stream.dataset": "apm.error.redis", + "data_stream.type": "logs", "error": { "exception": [ { diff --git a/tests/fields.go b/tests/fields.go index a33c49d4d4e..03693012f9a 100644 --- a/tests/fields.go +++ b/tests/fields.go @@ -44,6 +44,7 @@ import ( // not part of the payload, e.g. Kibana visualisation attributes. func (ps *ProcessorSetup) PayloadAttrsMatchFields(t *testing.T, payloadAttrsNotInFields, fieldsNotInPayload *Set) { notInFields := Union(payloadAttrsNotInFields, NewSet( + Group("data_stream"), Group("processor"), //dynamically indexed: Group("labels"),