Skip to content

Commit

Permalink
systemtest: add tests for aggregation and sampling (#4084) (#4171)
Browse files Browse the repository at this point in the history
* systemtest: add tests for aggregation and sampling

Migrate aggregation and sampling system tests to Go.

As part of this we introduce "approval" test support.
The implementation is a little different to the one
in the Python system tests: the Python ones ignore
various dynamic fields when computing the diff; we
replace dynamic fields before comparing, so the
approved diffs don't change in unrelated ways every
time there is an expected change, such as a new field.

* systemtest: update aggregation config

* Check result in TestTransactionAggregationShutdown

* systemtest: fix logging test

If we don't consume the response, the server
may (non-deterministically) log an error.

* Update approved files
  • Loading branch information
axw authored Sep 8, 2020
1 parent f42cabb commit 236108e
Show file tree
Hide file tree
Showing 12 changed files with 544 additions and 311 deletions.
116 changes: 116 additions & 0 deletions systemtest/aggregation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package systemtest_test

import (
"context"
"net/http"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/apm-server/systemtest"
"github.com/elastic/apm-server/systemtest/apmservertest"
"github.com/elastic/apm-server/systemtest/estest"
)

// TestTransactionAggregation sends a backend and a page-load transaction to
// a server with transaction aggregation enabled, waits for documents with a
// transaction duration histogram to be indexed, and compares them to the
// approved events.
func TestTransactionAggregation(t *testing.T) {
	systemtest.CleanupElasticsearch(t)

	// Aggregate over a short interval so the test does not have to wait
	// long for metrics to be published.
	server := apmservertest.NewUnstartedServer(t)
	server.Config.Aggregation = &apmservertest.AggregationConfig{
		Transactions: &apmservertest.TransactionAggregationConfig{
			Enabled:  true,
			Interval: time.Second,
		},
	}
	// Unsampled transaction events are dropped, to demonstrate that
	// aggregation happens before the drop.
	server.Config.Sampling = &apmservertest.SamplingConfig{KeepUnsampled: false}
	require.NoError(t, server.Start())

	// Send some transactions to the server to be aggregated.
	//
	// The "page-load" transaction type mimics a RUM transaction, which
	// causes the user-agent to be parsed, included in the aggregation,
	// and added to the document fields.
	const chromeUserAgent = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.125 Safari/537.36"
	tracer := server.Tracer()
	transactionTypes := []string{"backend", "page-load"}
	for i := range transactionTypes {
		tx := tracer.StartTransaction("name", transactionTypes[i])
		req, _ := http.NewRequest("GET", "/", nil)
		req.Header.Set("User-Agent", chromeUserAgent)
		tx.Context.SetHTTPRequest(req)
		tx.Duration = time.Second
		tx.End()
	}
	tracer.Flush(nil)

	// Only aggregated metrics documents carry a duration histogram.
	histogramQuery := estest.BoolQuery{
		Filter: []interface{}{
			estest.ExistsQuery{Field: "transaction.duration.histogram"},
		},
	}
	var result estest.SearchResult
	search := systemtest.Elasticsearch.Search("apm-*").WithQuery(histogramQuery)
	_, err := search.Do(
		context.Background(),
		&result,
		estest.WithCondition(result.Hits.NonEmptyCondition()),
	)
	require.NoError(t, err)
	systemtest.ApproveEvents(t, t.Name(), result.Hits.Hits, "@timestamp")
}

// TestTransactionAggregationShutdown checks that aggregated transaction
// metrics are flushed when the server is stopped, rather than only after
// the configured aggregation interval has elapsed.
func TestTransactionAggregationShutdown(t *testing.T) {
	systemtest.CleanupElasticsearch(t)

	server := apmservertest.NewUnstartedServer(t)
	server.Config.Aggregation = &apmservertest.AggregationConfig{
		Transactions: &apmservertest.TransactionAggregationConfig{
			Enabled: true,
			// The interval is long enough that waiting for it would
			// cause a timeout. The server should flush metrics on
			// shutdown without waiting for the configured interval.
			Interval: time.Minute,
		},
	}
	require.NoError(t, server.Start())

	// Send a single transaction to the server to be aggregated.
	tracer := server.Tracer()
	tx := tracer.StartTransaction("name", "type")
	tx.Duration = time.Second
	tx.End()
	tracer.Flush(nil)

	// Stopping the server should cause the metrics to be flushed.
	assert.NoError(t, server.Close())

	// Only aggregated metrics documents carry a duration histogram.
	histogramQuery := estest.BoolQuery{
		Filter: []interface{}{
			estest.ExistsQuery{Field: "transaction.duration.histogram"},
		},
	}
	var result estest.SearchResult
	search := systemtest.Elasticsearch.Search("apm-*").WithQuery(histogramQuery)
	_, err := search.Do(
		context.Background(),
		&result,
		estest.WithCondition(result.Hits.NonEmptyCondition()),
	)
	require.NoError(t, err)
	systemtest.ApproveEvents(t, t.Name(), result.Hits.Hits, "@timestamp")
}
37 changes: 34 additions & 3 deletions systemtest/apmservertest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@ const (

// Config holds APM Server configuration.
type Config struct {
SecretToken string `json:"apm-server.secret_token,omitempty"`
Jaeger *JaegerConfig `json:"apm-server.jaeger,omitempty"`
Kibana *KibanaConfig `json:"apm-server.kibana,omitempty"`
SecretToken string `json:"apm-server.secret_token,omitempty"`
Jaeger *JaegerConfig `json:"apm-server.jaeger,omitempty"`
Kibana *KibanaConfig `json:"apm-server.kibana,omitempty"`
Aggregation *AggregationConfig `json:"apm-server.aggregation,omitempty"`
Sampling *SamplingConfig `json:"apm-server.sampling,omitempty"`

// Instrumentation holds configuration for libbeat and apm-server instrumentation.
Instrumentation *InstrumentationConfig `json:"instrumentation,omitempty"`
Expand Down Expand Up @@ -99,6 +101,11 @@ type JaegerConfig struct {
HTTPHost string `json:"http.host,omitempty"`
}

// SamplingConfig holds APM Server trace sampling configuration.
type SamplingConfig struct {
// KeepUnsampled is encoded as "keep_unsampled". The tag has no
// omitempty option, so a false value is still written out.
KeepUnsampled bool `json:"keep_unsampled"`
}

// InstrumentationConfig holds APM Server instrumentation configuration.
type InstrumentationConfig struct {
Enabled bool `json:"enabled"`
Expand Down Expand Up @@ -180,6 +187,30 @@ func (m *MonitoringConfig) MarshalJSON() ([]byte, error) {
})
}

// AggregationConfig holds APM Server metrics aggregation configuration.
type AggregationConfig struct {
// Transactions holds the transaction metrics aggregation configuration.
// It is encoded as "transactions", and omitted when nil.
Transactions *TransactionAggregationConfig `json:"transactions,omitempty"`
}

// TransactionAggregationConfig holds APM Server transaction metrics aggregation configuration.
//
// The fields carry no JSON tags; encoding is handled by the type's
// MarshalJSON method.
type TransactionAggregationConfig struct {
// Enabled is encoded as "enabled".
Enabled bool
// Interval is encoded as "interval", and omitted when zero.
Interval time.Duration
}

// MarshalJSON implements json.Marshaler, encoding the configuration with
// "enabled" and "interval" keys.
func (c *TransactionAggregationConfig) MarshalJSON() ([]byte, error) {
	// encoding/json would write a time.Duration as an int64. The interval
	// is converted to the package's duration type instead, so that it is
	// encoded as a duration string.
	type wireConfig struct {
		Enabled  bool     `json:"enabled"`
		Interval duration `json:"interval,omitempty"`
	}
	out := wireConfig{
		Enabled:  c.Enabled,
		Interval: duration(c.Interval),
	}
	return json.Marshal(out)
}

// duration is a time.Duration with a MarshalText method, used in place of
// time.Duration in config structs so that the value is not JSON-encoded as
// an int64.
type duration time.Duration

func (d duration) MarshalText() (text []byte, err error) {
Expand Down
116 changes: 116 additions & 0 deletions systemtest/apmservertest/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package apmservertest

import (
"bytes"
"compress/zlib"
"context"
"encoding/json"
"io"

"go.elastic.co/apm/model"
"go.elastic.co/apm/transport"
"go.elastic.co/fastjson"
)

// TODO(axw) move EventMetadata and filteringTransport to go.elastic.co/apmtest,
// generalising filteringTransport to work with arbitrary base transports. To do
// that we would need to dynamically check for optional interfaces supported by
// the base transport, and create passthrough methods.

// EventMetadata holds event metadata.
//
// It is decoded from the "metadata" object at the start of a stream by
// filteringTransport.SendStream, passed to an EventMetadataFilter, and
// then re-encoded.
type EventMetadata struct {
// System, Process, and Service are always written back to the stream
// as "system", "process", and "service".
System *model.System
Process *model.Process
Service *model.Service
// Labels is written back as "labels" only when it is non-empty.
Labels model.IfaceMap
}

// EventMetadataFilter is implemented by types that can modify event
// metadata, in place, before it is sent to the server.
type EventMetadataFilter interface {
// FilterEventMetadata is called with the metadata decoded from a
// stream; changes made through the pointer are sent in its place.
FilterEventMetadata(*EventMetadata)
}

// filteringTransport wraps a transport.HTTPTransport, overriding SendStream
// to pass each stream's metadata through filter before it is sent.
type filteringTransport struct {
// HTTPTransport is the underlying transport used to send the
// re-encoded stream.
*transport.HTTPTransport
// filter is invoked by SendStream with the decoded metadata.
filter EventMetadataFilter
}

// SendStream decodes metadata from the zlib-compressed stream, passes it
// through the filter, and then sends the stream, with the modified metadata
// in place of the original, to the underlying transport.
//
// An error is returned if the stream cannot be decompressed, if its first
// JSON value cannot be decoded, if the stream cannot be re-compressed, or
// if the underlying transport fails to send it.
func (t *filteringTransport) SendStream(ctx context.Context, stream io.Reader) error {
	zr, err := zlib.NewReader(stream)
	if err != nil {
		return err
	}
	// The caller of zlib.NewReader is responsible for closing the reader.
	// Closing it does not close the wrapped stream.
	defer zr.Close()
	decoder := json.NewDecoder(zr)

	// The first object of any request must be a metadata struct.
	var metadataPayload struct {
		Metadata EventMetadata `json:"metadata"`
	}
	if err := decoder.Decode(&metadataPayload); err != nil {
		return err
	}
	t.filter.FilterEventMetadata(&metadataPayload.Metadata)

	// Re-encode metadata. The writer is deliberately not named "json",
	// so that the encoding/json package is not shadowed.
	metadata := &metadataPayload.Metadata
	var w fastjson.Writer
	w.RawString(`{"metadata":`)
	w.RawString(`{"system":`)
	metadata.System.MarshalFastJSON(&w)
	w.RawString(`,"process":`)
	metadata.Process.MarshalFastJSON(&w)
	w.RawString(`,"service":`)
	metadata.Service.MarshalFastJSON(&w)
	if len(metadata.Labels) > 0 {
		w.RawString(`,"labels":`)
		metadata.Labels.MarshalFastJSON(&w)
	}
	w.RawString("}}\n")

	// Copy everything to a new zlib-encoded stream and send.
	var buf bytes.Buffer
	zw := zlib.NewWriter(&buf)
	if _, err := zw.Write(w.Bytes()); err != nil {
		return err
	}
	// The decoder may have read past the metadata object, so whatever it
	// has buffered is replayed before the remainder of the stream.
	if _, err := io.Copy(zw, io.MultiReader(decoder.Buffered(), zr)); err != nil {
		return err
	}
	if err := zw.Close(); err != nil {
		return err
	}
	return t.HTTPTransport.SendStream(ctx, &buf)
}

// defaultMetadataFilter is an EventMetadataFilter that overwrites or clears
// a fixed set of system, process, and service metadata fields.
type defaultMetadataFilter struct{}

// FilterEventMetadata replaces the system, process, and service fields of m
// listed below with constant values, or clears them. m.System, m.Process,
// and m.Service are dereferenced without nil checks.
func (defaultMetadataFilter) FilterEventMetadata(m *EventMetadata) {
	system, process, service := m.System, m.Process, m.Service

	// System.
	system.Platform = "minix"
	system.Architecture = "i386"
	system.Container = nil
	system.Kubernetes = nil
	system.Hostname = "beowulf"

	// Process.
	process.Pid = 1
	process.Ppid = nil

	// Service.
	service.Agent.Version = "0.0.0"
	service.Language.Version = "2.0"
	service.Runtime.Version = "2.0"
	service.Node = nil
	service.Name = "systemtest"
}
28 changes: 22 additions & 6 deletions systemtest/apmservertest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ type Server struct {
// JaegerHTTPURL holds the base URL for Jaeger HTTP, if enabled.
JaegerHTTPURL string

// EventMetadataFilter holds an optional EventMetadataFilter, which
// can modify event metadata before it is sent to the server.
//
// New(Unstarted)Server sets a default filter which removes or
// replaces environment-specific properties such as host name,
// container ID, etc., to enable repeatable tests across different
// test environments.