diff --git a/beater/beater.go b/beater/beater.go index 9e703796a39..3bb2fd6d4eb 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -36,10 +36,8 @@ import ( "github.com/elastic/beats/v7/libbeat/instrumentation" "github.com/elastic/beats/v7/libbeat/logp" esoutput "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" - "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/apm-server/beater/config" - "github.com/elastic/apm-server/datastreams" "github.com/elastic/apm-server/elasticsearch" "github.com/elastic/apm-server/ingest/pipeline" logs "github.com/elastic/apm-server/log" @@ -372,29 +370,12 @@ func newPublisher(b *beat.Beat, cfg *config.Config, tracer *apm.Tracer) (*publis Pipeline: cfg.Pipeline, TransformConfig: transformConfig, } - if !cfg.DataStreams.Enabled { - // Remove data_stream.* fields during publishing when data streams are disabled. - processors, err := processors.New(processors.PluginConfig{common.MustNewConfigFrom( - map[string]interface{}{ - "drop_fields": map[string]interface{}{ - "fields": []interface{}{ - datastreams.TypeField, - datastreams.DatasetField, - datastreams.NamespaceField, - }, - }, - }, - )}) - if err != nil { - return nil, err - } - publisherConfig.Processor = processors - } return publish.NewPublisher(b.Publisher, tracer, publisherConfig) } func newTransformConfig(beatInfo beat.Info, cfg *config.Config) (*transform.Config, error) { transformConfig := &transform.Config{ + DataStreams: cfg.DataStreams.Enabled, RUM: transform.RUMConfig{ LibraryPattern: regexp.MustCompile(cfg.RumConfig.LibraryPattern), ExcludeFromGrouping: regexp.MustCompile(cfg.RumConfig.ExcludeFromGrouping), diff --git a/model/error.go b/model/error.go index e6ef6bf3c62..0c247c1a3bf 100644 --- a/model/error.go +++ b/model/error.go @@ -110,19 +110,20 @@ func (e *Error) Transform(ctx context.Context, cfg *transform.Config) []beat.Eve addStacktraceCounter(e.Log.Stacktrace) } - // Errors are stored in an APM errors-specific "logs" data stream, per service. - // By storing errors in a "logs" data stream, they can be viewed in the Logs app - // in Kibana. - dataset := fmt.Sprintf("apm.error.%s", datastreams.NormalizeServiceName(e.Metadata.Service.Name)) - fields := common.MapStr{ - datastreams.TypeField: datastreams.LogsType, - datastreams.DatasetField: dataset, - "error": e.fields(ctx, cfg), "processor": errorProcessorEntry, } + if cfg.DataStreams { + // Errors are stored in an APM errors-specific "logs" data stream, per service. + // By storing errors in a "logs" data stream, they can be viewed in the Logs app + // in Kibana. + dataset := fmt.Sprintf("apm.error.%s", datastreams.NormalizeServiceName(e.Metadata.Service.Name)) + fields[datastreams.TypeField] = datastreams.LogsType + fields[datastreams.DatasetField] = dataset + } + // first set the generic metadata (order is relevant) e.Metadata.Set(fields) utility.Set(fields, "source", fields["client"]) diff --git a/model/error_test.go b/model/error_test.go index 0ad89f04bf1..441c87a5f20 100644 --- a/model/error_test.go +++ b/model/error_test.go @@ -411,7 +411,8 @@ func TestEvents(t *testing.T) { } { t.Run(name, func(t *testing.T) { outputEvents := tc.Transformable.Transform(context.Background(), &transform.Config{ - RUM: transform.RUMConfig{SourcemapStore: &sourcemap.Store{}}, + DataStreams: true, + RUM: transform.RUMConfig{SourcemapStore: &sourcemap.Store{}}, }) require.Len(t, outputEvents, 1) outputEvent := outputEvents[0] diff --git a/model/metricset.go b/model/metricset.go index f3afda7d5a3..099e987d686 100644 --- a/model/metricset.go +++ b/model/metricset.go @@ -148,13 +148,13 @@ type MetricsetSpan struct { DestinationService DestinationService } -func (me *Metricset) Transform(ctx context.Context, _ *transform.Config) []beat.Event { +func (me *Metricset) Transform(ctx context.Context, cfg *transform.Config) []beat.Event { metricsetTransformations.Inc() if me == nil { return nil } - fields := common.MapStr{} + fields := make(common.MapStr, 5) for _, sample := range me.Samples { if err := sample.set(fields); err != nil { logp.NewLogger(logs.Transform).Warnf("failed to transform sample %#v", sample) @@ -185,19 +185,21 @@ func (me *Metricset) Transform(ctx context.Context, _ *transform.Config) []beat. fields["timeseries"] = common.MapStr{"instance": me.TimeseriesInstanceID} } - // Metrics are stored in "metrics" data streams. - dataset := "apm." - if isInternal { - // Metrics that include well-defined transaction/span fields - // (i.e. breakdown metrics, transaction and span metrics) will - // be stored separately from application and runtime metrics. - dataset += "internal." - } - dataset += datastreams.NormalizeServiceName(me.Metadata.Service.Name) - fields["processor"] = metricsetProcessorEntry - fields[datastreams.TypeField] = datastreams.MetricsType - fields[datastreams.DatasetField] = dataset + + if cfg.DataStreams { + // Metrics are stored in "metrics" data streams. + dataset := "apm." + if isInternal { + // Metrics that include well-defined transaction/span fields + // (i.e. breakdown metrics, transaction and span metrics) will + // be stored separately from application and runtime metrics. + dataset = "apm.internal." + } + dataset += datastreams.NormalizeServiceName(me.Metadata.Service.Name) + fields[datastreams.TypeField] = datastreams.MetricsType + fields[datastreams.DatasetField] = dataset + } return []beat.Event{{ Fields: fields, diff --git a/model/metricset_test.go b/model/metricset_test.go index 29d5e658de4..33f55abe7a6 100644 --- a/model/metricset_test.go +++ b/model/metricset_test.go @@ -214,7 +214,7 @@ func TestTransform(t *testing.T) { } for idx, test := range tests { - outputEvents := test.Metricset.Transform(context.Background(), &transform.Config{}) + outputEvents := test.Metricset.Transform(context.Background(), &transform.Config{DataStreams: true}) for j, outputEvent := range outputEvents { assert.Equal(t, test.Output[j], outputEvent.Fields, fmt.Sprintf("Failed at idx %v; %s", idx, test.Msg)) diff --git a/model/profile.go b/model/profile.go index 566938453c2..e60ae24e6a5 100644 --- a/model/profile.go +++ b/model/profile.go @@ -50,7 +50,7 @@ type PprofProfile struct { } // Transform transforms a Profile into a sequence of beat.Events: one per profile sample. -func (pp PprofProfile) Transform(ctx context.Context, _ *transform.Config) []beat.Event { +func (pp PprofProfile) Transform(ctx context.Context, cfg *transform.Config) []beat.Event { // Precompute value field names for use in each event. // TODO(axw) limit to well-known value names? profileTimestamp := time.Unix(0, pp.Profile.TimeNanos) @@ -70,7 +70,10 @@ func (pp PprofProfile) Transform(ctx context.Context, _ *transform.Config) []bea // Profiles are stored in their own "metrics" data stream, with a data // set per service. This enables managing retention of profiling data // per-service, and indepedently of lower volume metrics. - dataset := fmt.Sprintf("apm.profiling.%s", datastreams.NormalizeServiceName(pp.Metadata.Service.Name)) + var dataset string + if cfg.DataStreams { + dataset = fmt.Sprintf("apm.profiling.%s", datastreams.NormalizeServiceName(pp.Metadata.Service.Name)) + } samples := make([]beat.Event, len(pp.Profile.Sample)) for i, sample := range pp.Profile.Sample { @@ -122,12 +125,14 @@ func (pp PprofProfile) Transform(ctx context.Context, _ *transform.Config) []bea event := beat.Event{ Timestamp: profileTimestamp, Fields: common.MapStr{ - datastreams.TypeField: datastreams.MetricsType, - datastreams.DatasetField: dataset, - "processor": profileProcessorEntry, - profileDocType: profileFields, + "processor": profileProcessorEntry, + profileDocType: profileFields, }, } + if cfg.DataStreams { + event.Fields[datastreams.TypeField] = datastreams.MetricsType + event.Fields[datastreams.DatasetField] = dataset + } pp.Metadata.Set(event.Fields) if len(sample.Label) > 0 { labels := make(common.MapStr) diff --git a/model/span.go b/model/span.go index f14c6174b22..7c6ca7230e6 100644 --- a/model/span.go +++ b/model/span.go @@ -191,17 +191,18 @@ func (e *Span) Transform(ctx context.Context, cfg *transform.Config) []beat.Even spanFrameCounter.Add(int64(frames)) } - // Spans are stored in a "traces" data stream along with transactions. - dataset := fmt.Sprintf("apm.%s", datastreams.NormalizeServiceName(e.Metadata.Service.Name)) - fields := common.MapStr{ - datastreams.TypeField: datastreams.TracesType, - datastreams.DatasetField: dataset, - "processor": spanProcessorEntry, spanDocType: e.fields(ctx, cfg), } + if cfg.DataStreams { + // Spans are stored in a "traces" data stream along with transactions. + dataset := fmt.Sprintf("apm.%s", datastreams.NormalizeServiceName(e.Metadata.Service.Name)) + fields[datastreams.TypeField] = datastreams.TracesType + fields[datastreams.DatasetField] = dataset + } + // first set the generic metadata e.Metadata.Set(fields) diff --git a/model/span_test.go b/model/span_test.go index 2d1f8e26165..3a803ca4bbf 100644 --- a/model/span_test.go +++ b/model/span_test.go @@ -177,7 +177,8 @@ func TestSpanTransform(t *testing.T) { for _, test := range tests { output := test.Span.Transform(context.Background(), &transform.Config{ - RUM: transform.RUMConfig{SourcemapStore: &sourcemap.Store{}}, + DataStreams: true, + RUM: transform.RUMConfig{SourcemapStore: &sourcemap.Store{}}, }) fields := output[0].Fields assert.Equal(t, test.Output, fields, test.Msg) diff --git a/model/transaction.go b/model/transaction.go index e0c1959243c..c1e57793fd6 100644 --- a/model/transaction.go +++ b/model/transaction.go @@ -110,20 +110,21 @@ func (e *Transaction) fields() common.MapStr { return common.MapStr(fields) } -func (e *Transaction) Transform(_ context.Context, _ *transform.Config) []beat.Event { +func (e *Transaction) Transform(_ context.Context, cfg *transform.Config) []beat.Event { transactionTransformations.Inc() - // Transactions are stored in a "traces" data stream along with spans. - dataset := fmt.Sprintf("apm.%s", datastreams.NormalizeServiceName(e.Metadata.Service.Name)) - fields := common.MapStr{ - datastreams.TypeField: datastreams.TracesType, - datastreams.DatasetField: dataset, - "processor": transactionProcessorEntry, transactionDocType: e.fields(), } + if cfg.DataStreams { + // Transactions are stored in a "traces" data stream along with spans. + dataset := fmt.Sprintf("apm.%s", datastreams.NormalizeServiceName(e.Metadata.Service.Name)) + fields[datastreams.TypeField] = datastreams.TracesType + fields[datastreams.DatasetField] = dataset + } + // first set generic metadata (order is relevant) e.Metadata.Set(fields) utility.Set(fields, "source", fields["client"]) diff --git a/model/transaction_test.go b/model/transaction_test.go index 4fd04bacd41..981b57b3bb5 100644 --- a/model/transaction_test.go +++ b/model/transaction_test.go @@ -178,7 +178,7 @@ func TestEventsTransformWithMetadata(t *testing.T) { Custom: &Custom{"foo": "bar"}, Message: &Message{QueueName: tests.StringPtr("routeUser")}, } - events := txWithContext.Transform(context.Background(), &transform.Config{}) + events := txWithContext.Transform(context.Background(), &transform.Config{DataStreams: true}) require.Len(t, events, 1) assert.Equal(t, events[0].Fields, common.MapStr{ "data_stream.type": "traces", diff --git a/processor/otel/consumer_test.go b/processor/otel/consumer_test.go index d0abf94a4a4..09796037861 100644 --- a/processor/otel/consumer_test.go +++ b/processor/otel/consumer_test.go @@ -504,7 +504,7 @@ func testAttributeStringValue(s string) *tracepb.AttributeValue { func transformAll(ctx context.Context, p publish.PendingReq) []beat.Event { var events []beat.Event for _, transformable := range p.Transformables { - events = append(events, transformable.Transform(ctx, &transform.Config{})...) + events = append(events, transformable.Transform(ctx, &transform.Config{DataStreams: true})...) } return events } diff --git a/processor/stream/processor_test.go b/processor/stream/processor_test.go index 9ca9ac3d5da..16cf380995f 100644 --- a/processor/stream/processor_test.go +++ b/processor/stream/processor_test.go @@ -227,7 +227,7 @@ func makeApproveEventsReporter(t *testing.T, name string) publish.Reporter { return func(ctx context.Context, p publish.PendingReq) error { var events []beat.Event for _, transformable := range p.Transformables { - events = append(events, transformable.Transform(ctx, &transform.Config{})...) + events = append(events, transformable.Transform(ctx, &transform.Config{DataStreams: true})...) } docs := beatertest.EncodeEventDocs(events...) approvaltest.ApproveEventDocs(t, name, docs) diff --git a/publish/pub.go b/publish/pub.go index 5a06f2c064e..b6e918df970 100644 --- a/publish/pub.go +++ b/publish/pub.go @@ -88,7 +88,6 @@ func NewPublisher(pipeline beat.Pipeline, tracer *apm.Tracer, cfg *PublisherConf processingCfg := beat.ProcessingConfig{ Fields: common.MapStr{ - datastreams.NamespaceField: "default", "observer": common.MapStr{ "type": cfg.Info.Beat, "hostname": cfg.Info.Hostname, @@ -100,6 +99,9 @@ func NewPublisher(pipeline beat.Pipeline, tracer *apm.Tracer, cfg *PublisherConf }, Processor: cfg.Processor, } + if cfg.TransformConfig.DataStreams { + processingCfg.Fields[datastreams.NamespaceField] = "default" + } if cfg.Pipeline != "" { processingCfg.Meta = map[string]interface{}{"pipeline": cfg.Pipeline} } diff --git a/testdata/jaeger/batch_0.approved.json b/testdata/jaeger/batch_0.approved.json index f84030bb217..e30bc33d761 100644 --- a/testdata/jaeger/batch_0.approved.json +++ b/testdata/jaeger/batch_0.approved.json @@ -7,8 +7,6 @@ "name": "Jaeger/Go", "version": "2.20.1" }, - "data_stream.dataset": "apm.driver", - "data_stream.type": "traces", "event": { "outcome": "success" }, @@ -62,8 +60,6 @@ "name": "Jaeger/Go", "version": "2.20.1" }, - "data_stream.dataset": "apm.error.driver", - "data_stream.type": "logs", "error": { "exception": [ { @@ -114,8 +110,6 @@ "name": "Jaeger/Go", "version": "2.20.1" }, - "data_stream.dataset": "apm.error.driver", - "data_stream.type": "logs", "error": { "exception": [ { @@ -166,8 +160,6 @@ "name": "Jaeger/Go", "version": "2.20.1" }, - "data_stream.dataset": "apm.error.driver", - "data_stream.type": "logs", "error": { "exception": [ { diff --git a/testdata/jaeger/batch_1.approved.json b/testdata/jaeger/batch_1.approved.json index 5a6e6669a20..336fc199743 100644 --- a/testdata/jaeger/batch_1.approved.json +++ b/testdata/jaeger/batch_1.approved.json @@ -7,8 +7,6 @@ "name": "Jaeger/Go", "version": "2.20.1" }, - "data_stream.dataset": "apm.redis", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -58,8 +56,6 @@ "name": "Jaeger/Go", "version": "2.20.1" }, - "data_stream.dataset": "apm.redis", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -110,8 +106,6 @@ "name": "Jaeger/Go", "version": "2.20.1" }, - "data_stream.dataset": "apm.redis", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -161,8 +155,6 @@ "name": "Jaeger/Go", "version": "2.20.1" }, - "data_stream.dataset": "apm.redis", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -212,8 +204,6 @@ "name": "Jaeger/Go", "version": "2.20.1" }, - "data_stream.dataset": "apm.redis", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -263,8 +253,6 @@ "name": "Jaeger/Go", "version": "2.20.1" }, - "data_stream.dataset": "apm.redis", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -314,8 +302,6 @@ "name": "Jaeger/Go", "version": "2.20.1" }, - "data_stream.dataset": "apm.redis", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -366,8 +352,6 @@ "name": "Jaeger/Go", "version": "2.20.1" }, - "data_stream.dataset": "apm.redis", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -417,8 +401,6 @@ "name": "Jaeger/Go", "version": "2.20.1" }, - "data_stream.dataset": "apm.redis", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -468,8 +450,6 @@ "name": "Jaeger/Go", "version": "2.20.1" }, - "data_stream.dataset": "apm.redis", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -519,8 +499,6 @@ "name": "Jaeger/Go", "version": "2.20.1" }, - "data_stream.dataset": "apm.redis", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -570,8 +548,6 @@ "name": "Jaeger/Go", "version": "2.20.1" }, - "data_stream.dataset": "apm.redis", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -622,8 +598,6 @@ "name": "Jaeger/Go", "version": "2.20.1" }, - "data_stream.dataset": "apm.redis", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -673,8 +647,6 @@ "name": "Jaeger/Go", "version": "2.20.1" }, - "data_stream.dataset": "apm.redis", - "data_stream.type": "traces", "event": { "outcome": "unknown" }, @@ -724,8 +696,6 @@ "name": "Jaeger/Go", "version": "2.20.1" }, - "data_stream.dataset": "apm.error.redis", - "data_stream.type": "logs", "error": { "exception": [ { @@ -772,8 +742,6 @@ "name": "Jaeger/Go", "version": "2.20.1" }, - "data_stream.dataset": "apm.error.redis", - "data_stream.type": "logs", "error": { "exception": [ { @@ -820,8 +788,6 @@ "name": "Jaeger/Go", "version": "2.20.1" }, - "data_stream.dataset": "apm.error.redis", - "data_stream.type": "logs", "error": { "exception": [ { diff --git a/transform/transform.go b/transform/transform.go index d44eea774ec..459bc171985 100644 --- a/transform/transform.go +++ b/transform/transform.go @@ -32,6 +32,10 @@ type Transformable interface { // Config holds general transformation configuration. type Config struct { + // DataStreams records whether or not data streams are enabled. + // If true, then data_stream fields should be added to all events. + DataStreams bool + RUM RUMConfig }