Skip to content

Commit

Permalink
systemtest: add tests for aggregation and sampling
Browse files Browse the repository at this point in the history
Migrate aggregation and sampling system tests to Go.

As part of this we introduce "approval" test support.
The implementation is a little different to the one
in the Python system tests: the Python ones ignore
various dynamic fields when computing the diff; we
replace dynamic fields before comparing, so the
approved diffs don't change in unrelated ways every
time there is an expected change, such as a new field.
  • Loading branch information
axw committed Aug 19, 2020
1 parent 4bfd1be commit 8c14462
Show file tree
Hide file tree
Showing 13 changed files with 651 additions and 308 deletions.
109 changes: 109 additions & 0 deletions systemtest/aggregation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package systemtest_test

import (
"context"
"net/http"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/apm-server/systemtest"
"github.com/elastic/apm-server/systemtest/apmservertest"
"github.com/elastic/apm-server/systemtest/estest"
)

// TestTransactionAggregation starts a server with transaction aggregation
// enabled and unsampled events dropped, sends one "backend" and one
// "page-load" transaction, and approves the resulting documents that carry
// a transaction.duration.histogram field.
func TestTransactionAggregation(t *testing.T) {
	systemtest.CleanupElasticsearch(t)

	server := apmservertest.NewUnstartedServer(t)
	server.Config.Aggregation = &apmservertest.TransactionAggregationConfig{
		Enabled:  true,
		Interval: time.Second,
	}
	// Unsampled transaction events are dropped, which demonstrates that
	// aggregation takes place before the drop.
	server.Config.Sampling = &apmservertest.SamplingConfig{KeepUnsampled: false}
	require.NoError(t, server.Start())

	// Feed the server transactions to aggregate.
	//
	// The "page-load" transaction type mimics a RUM transaction; for that
	// type the user-agent is parsed, included in the aggregation, and added
	// to the document fields.
	const chromeUserAgent = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.125 Safari/537.36"
	transactionTypes := []string{"backend", "page-load"}
	tracer := server.Tracer()
	for i := range transactionTypes {
		httpReq, _ := http.NewRequest("GET", "/", nil)
		httpReq.Header.Set("User-Agent", chromeUserAgent)

		transaction := tracer.StartTransaction("name", transactionTypes[i])
		transaction.Context.SetHTTPRequest(httpReq)
		transaction.Duration = time.Second
		transaction.End()
	}
	tracer.Flush(nil)

	// Search until at least one document with a duration histogram exists.
	histogramExists := estest.BoolQuery{
		Filter: []interface{}{
			estest.ExistsQuery{Field: "transaction.duration.histogram"},
		},
	}
	var result estest.SearchResult
	_, err := systemtest.Elasticsearch.Search("apm-*").
		WithQuery(histogramExists).
		Do(context.Background(), &result, estest.WithCondition(result.Hits.NonEmptyCondition()))
	require.NoError(t, err)

	systemtest.ApproveEvents(t, t.Name(), result.Hits.Hits)
}

// TestTransactionAggregationShutdown checks that a document with a
// transaction.duration.histogram field can be found after the server has
// been closed, even though the configured aggregation interval has not
// elapsed.
func TestTransactionAggregationShutdown(t *testing.T) {
	systemtest.CleanupElasticsearch(t)

	// The interval is deliberately long: waiting for it to elapse would
	// cause a timeout. The server is expected to flush metrics on shutdown
	// without waiting for the configured interval.
	aggregation := apmservertest.TransactionAggregationConfig{
		Enabled:  true,
		Interval: time.Minute,
	}
	server := apmservertest.NewUnstartedServer(t)
	server.Config.Aggregation = &aggregation
	require.NoError(t, server.Start())

	// Give the server a single transaction to aggregate.
	tracer := server.Tracer()
	transaction := tracer.StartTransaction("name", "type")
	transaction.End()
	tracer.Flush(nil)

	// Closing the server is what should trigger the flush of metrics.
	assert.NoError(t, server.Close())

	histogramExists := estest.BoolQuery{
		Filter: []interface{}{
			estest.ExistsQuery{Field: "transaction.duration.histogram"},
		},
	}
	var result estest.SearchResult
	_, err := systemtest.Elasticsearch.Search("apm-*").
		WithQuery(histogramExists).
		Do(context.Background(), &result, estest.WithCondition(result.Hits.NonEmptyCondition()))
	require.NoError(t, err)
}
32 changes: 29 additions & 3 deletions systemtest/apmservertest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@ const (

// Config holds APM Server configuration.
type Config struct {
SecretToken string `json:"apm-server.secret_token,omitempty"`
Jaeger *JaegerConfig `json:"apm-server.jaeger,omitempty"`
Kibana *KibanaConfig `json:"apm-server.kibana,omitempty"`
SecretToken string `json:"apm-server.secret_token,omitempty"`
Jaeger *JaegerConfig `json:"apm-server.jaeger,omitempty"`
Kibana *KibanaConfig `json:"apm-server.kibana,omitempty"`
Aggregation *TransactionAggregationConfig `json:"apm-server.aggregation,omitempty"`
Sampling *SamplingConfig `json:"apm-server.sampling,omitempty"`

// Instrumentation holds configuration for libbeat and apm-server instrumentation.
Instrumentation *InstrumentationConfig `json:"instrumentation,omitempty"`
Expand Down Expand Up @@ -99,6 +101,11 @@ type JaegerConfig struct {
HTTPHost string `json:"http.host,omitempty"`
}

// SamplingConfig holds APM Server trace sampling configuration.
type SamplingConfig struct {
// KeepUnsampled is encoded under the "keep_unsampled" key. The tag has
// no omitempty option, so a false value is written out explicitly
// rather than being left out of the encoded configuration.
KeepUnsampled bool `json:"keep_unsampled"`
}

// InstrumentationConfig holds APM Server instrumentation configuration.
type InstrumentationConfig struct {
Enabled bool `json:"enabled"`
Expand Down Expand Up @@ -180,6 +187,25 @@ func (m *MonitoringConfig) MarshalJSON() ([]byte, error) {
})
}

// TransactionAggregationConfig holds APM Server transaction metrics aggregation configuration.
//
// The fields carry no struct tags; JSON encoding is performed by the
// MarshalJSON method, which chooses the key names.
type TransactionAggregationConfig struct {
// Enabled is encoded by MarshalJSON under the "enabled" key.
Enabled bool
// Interval is encoded by MarshalJSON under the "interval" key, after
// conversion to the package's duration type; the key is tagged
// omitempty there.
Interval time.Duration
}

// MarshalJSON encodes the configuration as a JSON object with "enabled"
// and "interval" keys.
func (c *TransactionAggregationConfig) MarshalJSON() ([]byte, error) {
	// encoding/json would write a time.Duration as an int64, so the
	// interval is converted to the duration type, which is intended to
	// encode as a duration string instead.
	type encoded struct {
		Enabled  bool     `json:"enabled"`
		Interval duration `json:"interval,omitempty"`
	}
	out := encoded{
		Enabled:  c.Enabled,
		Interval: duration(c.Interval),
	}
	return json.Marshal(out)
}

// duration is a time.Duration with its own MarshalText method. The
// configuration types convert their time.Duration fields to duration before
// encoding, so that they are not written as the int64 that encoding/json
// produces for a plain time.Duration.
type duration time.Duration

func (d duration) MarshalText() (text []byte, err error) {
Expand Down
116 changes: 116 additions & 0 deletions systemtest/apmservertest/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package apmservertest

import (
"bytes"
"compress/zlib"
"context"
"encoding/json"
"io"

"go.elastic.co/apm/model"
"go.elastic.co/apm/transport"
"go.elastic.co/fastjson"
)

// TODO(axw) move EventMetadata and filteringTransport to go.elastic.co/apmtest,
// generalising filteringTransport to work with arbitrary base transports. To do
// that we would need to dynamically check for optional interfaces supported by
// the base transport, and create passthrough methods.

// EventMetadata holds event metadata.
//
// filteringTransport.SendStream decodes a value of this type from the
// "metadata" object at the start of a stream, hands it to an
// EventMetadataFilter, and then re-encodes it.
type EventMetadata struct {
// System is re-encoded under the "system" key.
System *model.System
// Process is re-encoded under the "process" key.
Process *model.Process
// Service is re-encoded under the "service" key.
Service *model.Service
// Labels is re-encoded under the "labels" key, and only when non-empty.
Labels model.IfaceMap
}

// EventMetadataFilter is implemented by types that can modify event
// metadata before it is sent to the server.
type EventMetadataFilter interface {
// FilterEventMetadata receives a pointer to the metadata decoded from
// a stream. The metadata is re-encoded after the call returns, so any
// modification made through the pointer ends up in the sent stream.
FilterEventMetadata(*EventMetadata)
}

// filteringTransport is an HTTP transport whose SendStream method passes
// the metadata of each stream through an EventMetadataFilter before sending
// the stream on.
type filteringTransport struct {
// HTTPTransport is embedded, so its methods are promoted; SendStream is
// redefined on filteringTransport and delegates to it at the end.
*transport.HTTPTransport
// filter is called unconditionally by SendStream, so it must not be nil.
filter EventMetadataFilter
}

// SendStream decodes metadata from reader, passes it through the filter,
// and then sends the modified stream to the underlying transport.
//
// stream must be zlib-compressed, and its first JSON value must be an
// object with a "metadata" key. The metadata is re-encoded after filtering,
// the remainder of the stream is copied through unchanged, and the result
// is zlib-compressed again before being handed to the embedded
// HTTPTransport.
//
// An error is returned if the stream cannot be decompressed or decoded, if
// the new stream cannot be written, or if the underlying transport fails.
func (t *filteringTransport) SendStream(ctx context.Context, stream io.Reader) error {
	zr, err := zlib.NewReader(stream)
	if err != nil {
		return err
	}
	// Closing the zlib reader does not close the wrapped stream.
	defer zr.Close()
	decoder := json.NewDecoder(zr)

	// The first object of any request must be a metadata struct.
	var payload struct {
		Metadata EventMetadata `json:"metadata"`
	}
	if err := decoder.Decode(&payload); err != nil {
		return err
	}
	metadata := &payload.Metadata
	t.filter.FilterEventMetadata(metadata)

	// Re-encode metadata. Sections that are nil, either because they were
	// absent from the stream or because the filter cleared them, are left
	// out instead of being marshalled through a nil pointer. When system,
	// process and service are all present the output is the same as
	// writing them unconditionally.
	var w fastjson.Writer
	w.RawString(`{"metadata":{`)
	sep := ""
	if metadata.System != nil {
		w.RawString(sep + `"system":`)
		metadata.System.MarshalFastJSON(&w)
		sep = ","
	}
	if metadata.Process != nil {
		w.RawString(sep + `"process":`)
		metadata.Process.MarshalFastJSON(&w)
		sep = ","
	}
	if metadata.Service != nil {
		w.RawString(sep + `"service":`)
		metadata.Service.MarshalFastJSON(&w)
		sep = ","
	}
	if len(metadata.Labels) > 0 {
		w.RawString(sep + `"labels":`)
		metadata.Labels.MarshalFastJSON(&w)
	}
	w.RawString("}}\n")

	// Copy everything to a new zlib-encoded stream and send. The decoder
	// may have read past the metadata object, so its buffered bytes are
	// replayed ahead of whatever is still unread in zr.
	var buf bytes.Buffer
	zw := zlib.NewWriter(&buf)
	if _, err := zw.Write(w.Bytes()); err != nil {
		return err
	}
	if _, err := io.Copy(zw, io.MultiReader(decoder.Buffered(), zr)); err != nil {
		return err
	}
	if err := zw.Close(); err != nil {
		return err
	}
	return t.HTTPTransport.SendStream(ctx, &buf)
}

// defaultMetadataFilter is the EventMetadataFilter that overwrites or clears
// a fixed set of system, process and service properties with constant
// values.
type defaultMetadataFilter struct{}

// FilterEventMetadata replaces properties of m.System, m.Process and
// m.Service with fixed values, and clears the container, Kubernetes, parent
// PID and service node information. All three sections are dereferenced
// without a nil check.
func (defaultMetadataFilter) FilterEventMetadata(m *EventMetadata) {
	system := m.System
	system.Platform = "minix"
	system.Architecture = "i386"
	system.Container = nil
	system.Kubernetes = nil
	system.Hostname = "beowulf"

	process := m.Process
	process.Pid = 1
	process.Ppid = nil

	service := m.Service
	service.Agent.Version = "0.0.0"
	service.Language.Version = "2.0"
	service.Runtime.Version = "2.0"
	service.Node = nil
	service.Name = "systemtest"
}
28 changes: 22 additions & 6 deletions systemtest/apmservertest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ type Server struct {
// JaegerHTTPURL holds the base URL for Jaeger HTTP, if enabled.
JaegerHTTPURL string

// EventMetadataFilter holds an optional EventMetadataFilter, which
// can modify event metadata before it is sent to the server.
//
// New(Unstarted)Server sets a default filter which removes or
// replaces environment-specific properties such as host name,
// container ID, etc., to enable repeatable tests across different
// test environments.
EventMetadataFilter EventMetadataFilter

tb testing.TB
args []string
cmd *ServerCmd
Expand All @@ -96,9 +105,10 @@ func NewServer(tb testing.TB, args ...string) *Server {
// apm-server command.
func NewUnstartedServer(tb testing.TB, args ...string) *Server {
	// Each field is set exactly once: repeating a field name in a
	// composite literal is a compile error.
	return &Server{
		Config: DefaultConfig(),
		// A filter is installed by default; it can be replaced or set to
		// nil through the exported EventMetadataFilter field.
		EventMetadataFilter: defaultMetadataFilter{},
		tb:                  tb,
		args:                args,
	}
}

Expand Down Expand Up @@ -371,9 +381,15 @@ func (s *Server) Tracer() *apm.Tracer {
}
httpTransport.SetServerURL(serverURL)
httpTransport.SetSecretToken(s.Config.SecretToken)
tracer, err := apm.NewTracerOptions(apm.TracerOptions{
Transport: httpTransport,
})

var transport transport.Transport = httpTransport
if s.EventMetadataFilter != nil {
transport = &filteringTransport{
HTTPTransport: httpTransport,
filter: s.EventMetadataFilter,
}
}
tracer, err := apm.NewTracerOptions(apm.TracerOptions{Transport: transport})
if err != nil {
s.tb.Fatal(err)
}
Expand Down
Loading

0 comments on commit 8c14462

Please sign in to comment.