Skip to content

Commit

Permalink
Add initial support for indexing to data streams (elastic#4409)
Browse files Browse the repository at this point in the history
* idxmgmt: add support for data streams

Introduce `apm-server.data_streams.enabled` config,
which will be used by idxmgmt to route events to
data streams based on data_stream.* fields that are
expected to be in each published event.

* _meta/fields.common.yml: add data_stream.* fields

* model: add data_stream.{type,dataset} fields

When transforming model objects into beat.Events,
set the data_stream.{type,dataset} fields. We add
data_stream.namespace elsewhere, using an event
processor.

* beater: handle apm-server.data_streams.enabled

Handle the new apm-server.data_streams.enabled
config:
 - when enabled, we add data_stream.namespace to
   all published events
 - when disabled, we remove data_stream.* fields
   from all published events

For now we just set data_stream.namespace to "default".
Later this will be based on the config received from Fleet.

* processor/stream/package_tests: update tests

There is a hack in here to inject data_stream.namespace
in all published events, since the tests do not use the
standard libbeat pipeline code.

* Update approvals

* Add changelog entry

* datastreams: address review comments

* changelogs: clarify feature is experimental

* _meta: specify allowed values for data_stream.type

* model: update datasets

Use a common "apm." prefix, and place the service name last.
  • Loading branch information
axw committed Dec 10, 2020
1 parent 3fa3800 commit 5b48118
Show file tree
Hide file tree
Showing 68 changed files with 693 additions and 131 deletions.
15 changes: 15 additions & 0 deletions _meta/fields.common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,21 @@
description: >
Fields common to various APM events.
fields:
- name: data_stream.type
type: keyword
description: "Data stream type: logs, metrics, or traces."
example: traces

- name: data_stream.dataset
type: keyword
description: Data stream dataset name.
example: backend_service

- name: data_stream.namespace
type: keyword
description: User-defined data stream namespace.
example: production

- name: processor.name
type: keyword
description: Processor name.
Expand Down
27 changes: 24 additions & 3 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ import (
"github.com/elastic/beats/v7/libbeat/instrumentation"
"github.com/elastic/beats/v7/libbeat/logp"
esoutput "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/libbeat/processors"

"github.com/elastic/apm-server/beater/config"
"github.com/elastic/apm-server/datastreams"
"github.com/elastic/apm-server/elasticsearch"
"github.com/elastic/apm-server/ingest/pipeline"
logs "github.com/elastic/apm-server/log"
Expand Down Expand Up @@ -117,7 +119,6 @@ type beater struct {
// Run runs the APM Server, blocking until the beater's Stop method is called,
// or a fatal error occurs.
func (bt *beater) Run(b *beat.Beat) error {

done := make(chan struct{})

var reloadOnce sync.Once
Expand All @@ -127,6 +128,7 @@ func (bt *beater) Run(b *beat.Beat) error {
// during startup. This might change when APM Server is included in Fleet
reloadOnce.Do(func() {
defer close(done)
// TODO(axw) config received from Fleet should be modified to set data_streams.enabled.
var cfg *config.Config
cfg, err = config.NewConfig(ucfg.Config, elasticsearchOutputConfig(b))
if err != nil {
Expand Down Expand Up @@ -365,11 +367,30 @@ func newPublisher(b *beat.Beat, cfg *config.Config, tracer *apm.Tracer) (*publis
if err != nil {
return nil, err
}
return publish.NewPublisher(b.Publisher, tracer, &publish.PublisherConfig{
publisherConfig := &publish.PublisherConfig{
Info: b.Info,
Pipeline: cfg.Pipeline,
TransformConfig: transformConfig,
})
}
if !cfg.DataStreams.Enabled {
// Remove data_stream.* fields during publishing when data streams are disabled.
processors, err := processors.New(processors.PluginConfig{common.MustNewConfigFrom(
map[string]interface{}{
"drop_fields": map[string]interface{}{
"fields": []interface{}{
datastreams.TypeField,
datastreams.DatasetField,
datastreams.NamespaceField,
},
},
},
)})
if err != nil {
return nil, err
}
publisherConfig.Processor = processors
}
return publish.NewPublisher(b.Publisher, tracer, publisherConfig)
}

func newTransformConfig(beatInfo beat.Info, cfg *config.Config) (*transform.Config, error) {
Expand Down
2 changes: 2 additions & 0 deletions beater/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type Config struct {
JaegerConfig JaegerConfig `config:"jaeger"`
Aggregation AggregationConfig `config:"aggregation"`
Sampling SamplingConfig `config:"sampling"`
DataStreams DataStreamsConfig `config:"data_streams"`

Pipeline string
}
Expand Down Expand Up @@ -189,5 +190,6 @@ func DefaultConfig() *Config {
JaegerConfig: defaultJaeger(),
Aggregation: defaultAggregationConfig(),
Sampling: defaultSamplingConfig(),
DataStreams: defaultDataStreamsConfig(),
}
}
27 changes: 27 additions & 0 deletions beater/config/data_streams.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package config

// DataStreamsConfig holds data streams configuration.
type DataStreamsConfig struct {
Enabled bool `config:"enabled"`
}

func defaultDataStreamsConfig() DataStreamsConfig {
return DataStreamsConfig{Enabled: false}
}
9 changes: 7 additions & 2 deletions beater/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,13 @@ func (h *httpServer) start() error {
h.logger.Infof("Connection limit set to: %d", h.cfg.MaxConnections)
}

// Create the "onboarding" document, which contains the server's listening address.
notifyListening(context.Background(), addr, h.reporter)
if !h.cfg.DataStreams.Enabled {
// Create the "onboarding" document, which contains the server's
// listening address. We only do this if data streams are not enabled,
// as onboarding documents are incompatible with data streams.
// Onboarding documents should be replaced by Fleet status later.
notifyListening(context.Background(), addr, h.reporter)
}

if h.TLSConfig != nil {
h.logger.Info("SSL enabled.")
Expand Down
3 changes: 3 additions & 0 deletions beater/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
var apmRegistry = monitoring.GetNamespace("state").GetRegistry().NewRegistry("apm-server")

type configTelemetry struct {
dataStreamsEnabled *monitoring.Bool
rumEnabled *monitoring.Bool
apiKeysEnabled *monitoring.Bool
kibanaEnabled *monitoring.Bool
Expand All @@ -49,6 +50,7 @@ type configTelemetry struct {
}

var configMonitors = &configTelemetry{
dataStreamsEnabled: monitoring.NewBool(apmRegistry, "data_streams.enabled"),
rumEnabled: monitoring.NewBool(apmRegistry, "rum.enabled"),
apiKeysEnabled: monitoring.NewBool(apmRegistry, "api_key.enabled"),
kibanaEnabled: monitoring.NewBool(apmRegistry, "kibana.enabled"),
Expand All @@ -73,6 +75,7 @@ func recordConfigs(info beat.Info, apmCfg *config.Config, rootCfg *common.Config
if err != nil {
return err
}
configMonitors.dataStreamsEnabled.Set(apmCfg.DataStreams.Enabled)
configMonitors.rumEnabled.Set(apmCfg.RumConfig.IsEnabled())
configMonitors.apiKeysEnabled.Set(apmCfg.APIKeyConfig.IsEnabled())
configMonitors.kibanaEnabled.Set(apmCfg.Kibana.Enabled)
Expand Down
37 changes: 37 additions & 0 deletions datastreams/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package datastreams

// Constants for data stream types.
const (
LogsType = "logs"
MetricsType = "metrics"
TracesType = "traces"
)

// Cosntants for data stream event metadata fields.
const (
TypeField = "data_stream.type"
DatasetField = "data_stream.dataset"
NamespaceField = "data_stream.namespace"
)

// IndexFormat holds the variable "index" format to use for the libbeat Elasticsearch output.
// Each event the server publishes is expected to contain data_stream.* fields, which will be
// added to the documents as well as be used for routing documents to the correct data stream.
const IndexFormat = "%{[data_stream.type]}-%{[data_stream.dataset]}-%{[data_stream.namespace]}"
45 changes: 45 additions & 0 deletions datastreams/servicename.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package datastreams

import "strings"

// NormalizeServiceName translates serviceName into a string suitable
// for inclusion in a data stream name.
//
// Concretely, this function will lowercase the string and replace any
// reserved characters with "_".
func NormalizeServiceName(s string) string {
s = strings.ToLower(s)
s = strings.Map(replaceReservedRune, s)
return s
}

func replaceReservedRune(r rune) rune {
switch r {
case '\\', '/', '*', '?', '"', '<', '>', '|', ' ', ',', '#', ':':
// These characters are not permitted in data stream names
// by Elasticsearch.
return '_'
case '-':
// Hyphens are used to separate the data stream type, dataset,
// and namespace.
return '_'
}
return r
}
35 changes: 35 additions & 0 deletions datastreams/servicename_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package datastreams_test

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/apm-server/datastreams"
)

func TestNormalizeServiceName(t *testing.T) {
testNormalizeServiceName := func(expected, input string) {
t.Helper()
assert.Equal(t, expected, datastreams.NormalizeServiceName(input))
}
testNormalizeServiceName("upper_case", "UPPER-CASE")
testNormalizeServiceName("____________", "\\/*?\"<>| ,#:")
}
33 changes: 33 additions & 0 deletions docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,39 @@ Fields common to various APM events.
*`data_stream.type`*::
+
--
Data stream type: logs, metrics, or traces.
type: keyword
example: traces
--
*`data_stream.dataset`*::
+
--
Data stream dataset name.
type: keyword
example: backend_service
--
*`data_stream.namespace`*::
+
--
User-defined data stream namespace.
type: keyword
example: production
--
*`processor.name`*::
+
--
Expand Down
Loading

0 comments on commit 5b48118

Please sign in to comment.