Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

systemtest: add tests for aggregation and sampling #4084

Merged
merged 6 commits into from
Sep 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions systemtest/aggregation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package systemtest_test

import (
"context"
"net/http"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/apm-server/systemtest"
"github.com/elastic/apm-server/systemtest/apmservertest"
"github.com/elastic/apm-server/systemtest/estest"
)

func TestTransactionAggregation(t *testing.T) {
systemtest.CleanupElasticsearch(t)
srv := apmservertest.NewUnstartedServer(t)
srv.Config.Aggregation = &apmservertest.AggregationConfig{
Transactions: &apmservertest.TransactionAggregationConfig{
Enabled: true,
Interval: time.Second,
},
}
srv.Config.Sampling = &apmservertest.SamplingConfig{
// Drop unsampled transaction events, to show
// that we aggregate before they are dropped.
KeepUnsampled: false,
}
err := srv.Start()
require.NoError(t, err)

// Send some transactions to the server to be aggregated.
//
// Mimic a RUM transaction by using the "page-load" transaction type,
// which causes user-agent to be parsed and included in the aggregation
// and added to the document fields.
tracer := srv.Tracer()
const chromeUserAgent = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.125 Safari/537.36"
for _, transactionType := range []string{"backend", "page-load"} {
tx := tracer.StartTransaction("name", transactionType)
req, _ := http.NewRequest("GET", "/", nil)
req.Header.Set("User-Agent", chromeUserAgent)
tx.Context.SetHTTPRequest(req)
tx.Duration = time.Second
tx.End()
}
tracer.Flush(nil)

var result estest.SearchResult
_, err = systemtest.Elasticsearch.Search("apm-*").WithQuery(estest.BoolQuery{
Filter: []interface{}{
estest.ExistsQuery{Field: "transaction.duration.histogram"},
},
}).Do(context.Background(), &result,
estest.WithCondition(result.Hits.NonEmptyCondition()),
)
require.NoError(t, err)
systemtest.ApproveEvents(t, t.Name(), result.Hits.Hits, "@timestamp")
}

func TestTransactionAggregationShutdown(t *testing.T) {
systemtest.CleanupElasticsearch(t)
srv := apmservertest.NewUnstartedServer(t)
srv.Config.Aggregation = &apmservertest.AggregationConfig{
Transactions: &apmservertest.TransactionAggregationConfig{
Enabled: true,
// Set aggregation_interval to something that would cause
// a timeout if we were to wait that long. The server
// should flush metrics on shutdown without waiting for
// the configured interval.
Interval: time.Minute,
},
}
err := srv.Start()
require.NoError(t, err)

// Send a transaction to the server to be aggregated.
tracer := srv.Tracer()
tx := tracer.StartTransaction("name", "type")
tx.Duration = time.Second
tx.End()
tracer.Flush(nil)

// Stop server to ensure metrics are flushed on shutdown.
assert.NoError(t, srv.Close())

var result estest.SearchResult
_, err = systemtest.Elasticsearch.Search("apm-*").WithQuery(estest.BoolQuery{
Filter: []interface{}{
estest.ExistsQuery{Field: "transaction.duration.histogram"},
},
}).Do(context.Background(), &result,
estest.WithCondition(result.Hits.NonEmptyCondition()),
)
require.NoError(t, err)
axw marked this conversation as resolved.
Show resolved Hide resolved
systemtest.ApproveEvents(t, t.Name(), result.Hits.Hits, "@timestamp")
}
37 changes: 34 additions & 3 deletions systemtest/apmservertest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@ const (

// Config holds APM Server configuration.
type Config struct {
SecretToken string `json:"apm-server.secret_token,omitempty"`
Jaeger *JaegerConfig `json:"apm-server.jaeger,omitempty"`
Kibana *KibanaConfig `json:"apm-server.kibana,omitempty"`
SecretToken string `json:"apm-server.secret_token,omitempty"`
Jaeger *JaegerConfig `json:"apm-server.jaeger,omitempty"`
Kibana *KibanaConfig `json:"apm-server.kibana,omitempty"`
Aggregation *AggregationConfig `json:"apm-server.aggregation,omitempty"`
Sampling *SamplingConfig `json:"apm-server.sampling,omitempty"`

// Instrumentation holds configuration for libbeat and apm-server instrumentation.
Instrumentation *InstrumentationConfig `json:"instrumentation,omitempty"`
Expand Down Expand Up @@ -99,6 +101,11 @@ type JaegerConfig struct {
HTTPHost string `json:"http.host,omitempty"`
}

// SamplingConfig holds APM Server trace sampling configuration.
type SamplingConfig struct {
KeepUnsampled bool `json:"keep_unsampled"`
}

// InstrumentationConfig holds APM Server instrumentation configuration.
type InstrumentationConfig struct {
Enabled bool `json:"enabled"`
Expand Down Expand Up @@ -180,6 +187,30 @@ func (m *MonitoringConfig) MarshalJSON() ([]byte, error) {
})
}

// AggregationConfig holds APM Server metrics aggregation configuration.
type AggregationConfig struct {
Transactions *TransactionAggregationConfig `json:"transactions,omitempty"`
}

// TransactionAggregationConfig holds APM Server transaction metrics aggregation configuration.
type TransactionAggregationConfig struct {
Enabled bool
Interval time.Duration
}

func (m *TransactionAggregationConfig) MarshalJSON() ([]byte, error) {
// time.Duration is encoded as int64.
// Convert time.Durations to durations, to encode as duration strings.
type config struct {
Enabled bool `json:"enabled"`
Interval duration `json:"interval,omitempty"`
}
return json.Marshal(config{
Enabled: m.Enabled,
Interval: duration(m.Interval),
})
}

type duration time.Duration

func (d duration) MarshalText() (text []byte, err error) {
Expand Down
116 changes: 116 additions & 0 deletions systemtest/apmservertest/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package apmservertest

import (
"bytes"
"compress/zlib"
"context"
"encoding/json"
"io"

"go.elastic.co/apm/model"
"go.elastic.co/apm/transport"
"go.elastic.co/fastjson"
)

// TODO(axw) move EventMetadata and filteringTransport to go.elastic.co/apmtest,
// generalising filteringTransport to work with arbitrary base transports. To do
// that we would need to dynamically check for optional interfaces supported by
// the base transport, and create passthrough methods.

// EventMetadata holds event metadata.
type EventMetadata struct {
System *model.System
Process *model.Process
Service *model.Service
Labels model.IfaceMap
}

// EventMetadata holds event metadata.
type EventMetadataFilter interface {
simitt marked this conversation as resolved.
Show resolved Hide resolved
FilterEventMetadata(*EventMetadata)
}

type filteringTransport struct {
*transport.HTTPTransport
filter EventMetadataFilter
}

// SendStream decodes metadata from reader, passes it through the filters,
// and then sends the modified stream to the underlying transport.
func (t *filteringTransport) SendStream(ctx context.Context, stream io.Reader) error {
zr, err := zlib.NewReader(stream)
if err != nil {
return err
}
decoder := json.NewDecoder(zr)

// The first object of any request must be a metadata struct.
var metadataPayload struct {
Metadata EventMetadata `json:"metadata"`
}
if err := decoder.Decode(&metadataPayload); err != nil {
return err
}
t.filter.FilterEventMetadata(&metadataPayload.Metadata)

// Re-encode metadata.
var json fastjson.Writer
json.RawString(`{"metadata":`)
json.RawString(`{"system":`)
metadataPayload.Metadata.System.MarshalFastJSON(&json)
json.RawString(`,"process":`)
metadataPayload.Metadata.Process.MarshalFastJSON(&json)
json.RawString(`,"service":`)
metadataPayload.Metadata.Service.MarshalFastJSON(&json)
if len(metadataPayload.Metadata.Labels) > 0 {
json.RawString(`,"labels":`)
metadataPayload.Metadata.Labels.MarshalFastJSON(&json)
}
json.RawString("}}\n")

// Copy everything to a new zlib-encoded stream and send.
var buf bytes.Buffer
zw := zlib.NewWriter(&buf)
zw.Write(json.Bytes())
if _, err := io.Copy(zw, io.MultiReader(decoder.Buffered(), zr)); err != nil {
return err
}
if err := zw.Close(); err != nil {
return err
}
return t.HTTPTransport.SendStream(ctx, &buf)
}

type defaultMetadataFilter struct{}

func (defaultMetadataFilter) FilterEventMetadata(m *EventMetadata) {
m.System.Platform = "minix"
m.System.Architecture = "i386"
m.System.Container = nil
m.System.Kubernetes = nil
m.System.Hostname = "beowulf"
m.Process.Pid = 1
m.Process.Ppid = nil
m.Service.Agent.Version = "0.0.0"
m.Service.Language.Version = "2.0"
m.Service.Runtime.Version = "2.0"
m.Service.Node = nil
m.Service.Name = "systemtest"
}
28 changes: 22 additions & 6 deletions systemtest/apmservertest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ type Server struct {
// JaegerHTTPURL holds the base URL for Jaeger HTTP, if enabled.
JaegerHTTPURL string

// EventMetadataFilter holds an optional EventMetadataFilter, which
// can modify event metadata before it is sent to the server.
//
// New(Unstarted)Server sets a default filter which removes or
// replaces environment-specific properties such as host name,
// container ID, etc., to enable repeatable tests across different
// test environments.
EventMetadataFilter EventMetadataFilter

tb testing.TB
args []string
cmd *ServerCmd
Expand All @@ -96,9 +105,10 @@ func NewServer(tb testing.TB, args ...string) *Server {
// apm-server command.
func NewUnstartedServer(tb testing.TB, args ...string) *Server {
return &Server{
Config: DefaultConfig(),
tb: tb,
args: args,
Config: DefaultConfig(),
EventMetadataFilter: defaultMetadataFilter{},
tb: tb,
args: args,
}
}

Expand Down Expand Up @@ -371,9 +381,15 @@ func (s *Server) Tracer() *apm.Tracer {
}
httpTransport.SetServerURL(serverURL)
httpTransport.SetSecretToken(s.Config.SecretToken)
tracer, err := apm.NewTracerOptions(apm.TracerOptions{
Transport: httpTransport,
})

var transport transport.Transport = httpTransport
if s.EventMetadataFilter != nil {
transport = &filteringTransport{
HTTPTransport: httpTransport,
filter: s.EventMetadataFilter,
}
}
tracer, err := apm.NewTracerOptions(apm.TracerOptions{Transport: transport})
if err != nil {
s.tb.Fatal(err)
}
Expand Down
Loading