Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ApplyConfig for Tempo #424

Merged
merged 4 commits into from
Feb 22, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ require (
github.com/spf13/cobra v1.1.1
github.com/spf13/viper v1.7.1
github.com/stretchr/testify v1.7.0
github.com/uber/jaeger-client-go v2.25.0+incompatible
github.com/weaveworks/common v0.0.0-20210112142934-23c8d7fa6120
github.com/wrouesnel/postgres_exporter v0.0.0-00010101000000-000000000000
go.opencensus.io v0.22.5
Expand All @@ -47,6 +48,7 @@ require (
google.golang.org/grpc v1.34.0
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
)

// Needed for Cortex's dependencies to work properly.
Expand Down
84 changes: 72 additions & 12 deletions pkg/tempo/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package tempo
import (
"context"
"fmt"
"sync"
"time"

"github.com/google/go-cmp/cmp"
"github.com/grafana/agent/pkg/build"
"github.com/prometheus/client_golang/prometheus"
"go.opencensus.io/stats/view"
Expand All @@ -17,6 +19,8 @@ import (

// Instance wraps the OpenTelemetry collector to enable tracing pipelines
type Instance struct {
mut sync.Mutex
cfg InstanceConfig
logger *zap.Logger
metricViews []*view.View

Expand All @@ -36,33 +40,89 @@ func NewInstance(reg prometheus.Registerer, cfg InstanceConfig, logger *zap.Logg
return nil, fmt.Errorf("failed to create metric views: %w", err)
}

if err := instance.ApplyConfig(cfg); err != nil {
return nil, err
}
return instance, nil
}

// ApplyConfig switches the instance over to cfg while holding i.mut. If cfg
// compares equal (via cmp.Equal) to the config already stored on the instance,
// it returns nil without touching the running pipeline. Otherwise it stores
// cfg, shuts the current pipeline down through i.stop, and builds and starts a
// new pipeline, returning a wrapped error if that fails.
//
// NOTE(review): i.cfg is overwritten before the new pipeline is built. If the
// build fails, a retry with the same cfg would hit the "no config change"
// early return even though nothing is running — confirm this is intended.
func (i *Instance) ApplyConfig(cfg InstanceConfig) error {
i.mut.Lock()
defer i.mut.Unlock()

if cmp.Equal(cfg, i.cfg) {
// No config change
return nil
}
i.cfg = cfg

// Shut down any existing pipeline
i.stop()

createCtx := context.Background()
err = instance.buildAndStartPipeline(createCtx, cfg)
err := i.buildAndStartPipeline(createCtx, cfg)
if err != nil {
return nil, fmt.Errorf("failed to create exporter: %w", err)
return fmt.Errorf("failed to create pipeline: %w", err)
}

return instance, nil
return nil
}

// Stop stops the OpenTelemetry collector subsystem. It holds i.mut for the
// duration of the call and unregisters the instance's metric views.
//
// NOTE(review): as shown here Stop does not call i.stop(), so receivers,
// processors and exporters would keep running — confirm against the full file.
func (i *Instance) Stop() {
i.mut.Lock()
defer i.mut.Unlock()

view.Unregister(i.metricViews...)
}

// stop shuts down the components of the running pipeline under a shared
// 30-second timeout. The dependencies table lists receiver, processors and
// exporters in that order; each entry returns nil when its field is nil, so
// stop can be called before any pipeline has been built. Shutdown failures are
// logged rather than returned, and the three fields are reset to nil at the
// end.
//
// stop does not take i.mut itself; presumably callers are expected to hold it
// (ApplyConfig does) — verify for any other caller.
func (i *Instance) stop() {
shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

if err := i.receivers.ShutdownAll(shutdownCtx); err != nil {
i.logger.Error("failed to shutdown receiver", zap.Error(err))
}

if err := i.pipelines.ShutdownProcessors(shutdownCtx); err != nil {
i.logger.Error("failed to shutdown processors", zap.Error(err))
dependencies := []struct {
name string
shutdown func() error
}{
{
name: "receiver",
shutdown: func() error {
if i.receivers == nil {
return nil
}
return i.receivers.ShutdownAll(shutdownCtx)
},
},
{
name: "processors",
shutdown: func() error {
if i.pipelines == nil {
return nil
}
return i.pipelines.ShutdownProcessors(shutdownCtx)
},
},
{
name: "exporters",
shutdown: func() error {
if i.exporter == nil {
return nil
}
return i.exporter.ShutdownAll(shutdownCtx)
},
},
}

if err := i.receivers.ShutdownAll(shutdownCtx); err != nil {
i.logger.Error("failed to shutdown receivers", zap.Error(err))
for _, dep := range dependencies {
i.logger.Info(fmt.Sprintf("shutting down %s", dep.name))
if err := dep.shutdown(); err != nil {
i.logger.Error(fmt.Sprintf("failed to shutdown %s", dep.name), zap.Error(err))
}
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@joe-elliott heads up: you were shutting down receivers twice, but I think you meant to shutdown the exporter here instead.


view.Unregister(i.metricViews...)
i.receivers = nil
i.pipelines = nil
i.exporter = nil
}

func (i *Instance) buildAndStartPipeline(ctx context.Context, cfg InstanceConfig) error {
Expand Down
230 changes: 230 additions & 0 deletions pkg/tempo/internal/tempoutils/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
package tempoutils

import (
"context"
"fmt"
"math/rand"
"strings"
"testing"
"time"

"github.com/grafana/agent/pkg/util"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.opentelemetry.io/collector/receiver/otlpreceiver"
"go.opentelemetry.io/collector/service/builder"
"go.uber.org/zap"
"gopkg.in/yaml.v3"
)

// Server bundles the built OpenTelemetry collector components (receivers,
// processor pipelines and exporters) created by NewServer so that they can be
// shut down together by Stop.
type Server struct {
// receivers are the built receivers; Stop shuts these down first.
receivers builder.Receivers
// pipelines are the built processor pipelines; Stop shuts these down second.
pipelines builder.BuiltPipelines
// exporters are the built exporters; Stop shuts these down last.
exporters builder.Exporters
}

// NewTestServer starts a Server for use inside a test and returns the address
// on which it accepts traces over OTLP. Every batch of traces the server
// receives is passed to callback. The test is failed immediately if the server
// cannot be created, and the server is stopped automatically during test
// cleanup, where a shutdown error is reported as an assertion failure.
func NewTestServer(t *testing.T, callback func(pdata.Traces)) string {
	t.Helper()

	server, addr, err := NewServerWithRandomPort(callback)
	if err != nil {
		t.Fatalf("failed to create OTLP server: %s", err)
	}

	// Tie the server's lifetime to the test and surface any shutdown error.
	t.Cleanup(func() { assert.NoError(t, server.Stop()) })

	return addr
}

// NewServerWithRandomPort calls NewServer with a randomly chosen port on
// 127.0.0.1 in the range [49152, 65535) and returns the started server together
// with the address it is listening on. callback is forwarded to NewServer
// unchanged.
//
// A failed attempt (for example because the chosen port is already in use) is
// retried with a fresh random port, up to five attempts in total. If every
// attempt fails, the returned error wraps the error from the last attempt.
func NewServerWithRandomPort(callback func(pdata.Traces)) (srv *Server, addr string, err error) {
	const (
		maxAttempts = 5
		minPort     = 49152 // inclusive
		maxPort     = 65535 // exclusive
	)

	var lastError error

	for attempt := 0; attempt < maxAttempts; attempt++ {
		// rand.Intn returns a value in [0, n), so port lands in [minPort, maxPort).
		port := rand.Intn(maxPort-minPort) + minPort
		listenAddr := fmt.Sprintf("127.0.0.1:%d", port)

		srv, err = NewServer(listenAddr, callback)
		if err != nil {
			lastError = err
			continue
		}

		return srv, listenAddr, nil
	}

	return nil, "", fmt.Errorf("failed %d times to create a server. last error: %w", maxAttempts, lastError)
}

// NewServer creates an OTLP-accepting server that calls a function when a
// trace is received. This is primarily useful for testing.
//
// addr is the endpoint the OTLP gRPC receiver listens on. callback is wired
// into a single "func_processor" stage of a traces pipeline that has no
// exporters.
//
// Components are started in the order exporters, processors, receivers. If a
// later stage fails to build or start, every component that was already
// started is shut down again (in reverse order) before the error is returned,
// so a failed call does not leave anything running. All failures, including an
// undecodable generated config, are reported through the returned error.
func NewServer(addr string, callback func(pdata.Traces)) (*Server, error) {
	// The config is indented with tabs; util.Untab rewrites it before it is
	// parsed as YAML.
	conf := util.Untab(fmt.Sprintf(`
	processors:
		func_processor:
	receivers:
		otlp:
			protocols:
				grpc:
					endpoint: %s
	service:
		pipelines:
			traces:
				receivers: [otlp]
				processors: [func_processor]
				exporters: []
	`, addr))

	var cfg map[string]interface{}
	if err := yaml.NewDecoder(strings.NewReader(conf)).Decode(&cfg); err != nil {
		return nil, fmt.Errorf("could not decode config: %w", err)
	}

	v := viper.New()
	if err := v.MergeConfigMap(cfg); err != nil {
		return nil, fmt.Errorf("failed to merge in mapstructure config: %w", err)
	}

	extensionsFactory, err := component.MakeExtensionFactoryMap()
	if err != nil {
		return nil, fmt.Errorf("failed to make extension factory map: %w", err)
	}

	receiversFactory, err := component.MakeReceiverFactoryMap(otlpreceiver.NewFactory())
	if err != nil {
		return nil, fmt.Errorf("failed to make receiver factory map: %w", err)
	}

	exportersFactory, err := component.MakeExporterFactoryMap()
	if err != nil {
		return nil, fmt.Errorf("failed to make exporter factory map: %w", err)
	}

	processorsFactory, err := component.MakeProcessorFactoryMap(
		newFuncProcessorFactory(callback),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to make processor factory map: %w", err)
	}

	factories := component.Factories{
		Extensions: extensionsFactory,
		Receivers:  receiversFactory,
		Processors: processorsFactory,
		Exporters:  exportersFactory,
	}
	otelCfg, err := config.Load(v, factories)
	if err != nil {
		return nil, fmt.Errorf("failed to make otel config: %w", err)
	}

	var (
		logger    = zap.NewNop()
		startInfo component.ApplicationStartInfo

		// started holds the shutdown function of every stage that has been
		// started so far, in start order.
		started   []func(context.Context) error
		succeeded bool
	)

	// On any failure below, unwind whatever was already started so that a
	// failed attempt (callers may retry with another address) leaks nothing.
	defer func() {
		if succeeded {
			return
		}
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()
		for i := len(started) - 1; i >= 0; i-- {
			// Best effort: the error that aborted construction is the one
			// reported to the caller, so a cleanup error is dropped here.
			_ = started[i](shutdownCtx)
		}
	}()

	exporters, err := builder.NewExportersBuilder(logger, startInfo, otelCfg, factories.Exporters).Build()
	if err != nil {
		return nil, fmt.Errorf("failed to build exporters: %w", err)
	}
	if err := exporters.StartAll(context.Background(), nil); err != nil {
		return nil, fmt.Errorf("failed to start exporters: %w", err)
	}
	started = append(started, exporters.ShutdownAll)

	pipelines, err := builder.NewPipelinesBuilder(logger, startInfo, otelCfg, exporters, factories.Processors).Build()
	if err != nil {
		return nil, fmt.Errorf("failed to build pipelines: %w", err)
	}
	if err := pipelines.StartProcessors(context.Background(), nil); err != nil {
		return nil, fmt.Errorf("failed to start pipelines: %w", err)
	}
	started = append(started, pipelines.ShutdownProcessors)

	receivers, err := builder.NewReceiversBuilder(logger, startInfo, otelCfg, pipelines, factories.Receivers).Build()
	if err != nil {
		return nil, fmt.Errorf("failed to build receivers: %w", err)
	}
	if err := receivers.StartAll(context.Background(), nil); err != nil {
		return nil, fmt.Errorf("failed to start receivers: %w", err)
	}

	succeeded = true
	return &Server{
		receivers: receivers,
		pipelines: pipelines,
		exporters: exporters,
	}, nil
}

// Stop shuts down the server's receivers, then its processor pipelines, then
// its exporters, sharing a single 30-second timeout across all three. Every
// stage is shut down even if an earlier one fails; the first error encountered
// (if any) is returned.
func (s *Server) Stop() error {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	var result error
	// keepFirst remembers only the earliest non-nil error.
	keepFirst := func(err error) {
		if result == nil && err != nil {
			result = err
		}
	}

	keepFirst(s.receivers.ShutdownAll(ctx))
	keepFirst(s.pipelines.ShutdownProcessors(ctx))
	keepFirst(s.exporters.ShutdownAll(ctx))

	return result
}

// newFuncProcessorFactory returns a processor factory registered under the
// type "func_processor". Its default config carries that same string as both
// type and name, and the traces processor it creates is a funcProcessor that
// invokes callback and then passes the data on to the next consumer.
func newFuncProcessorFactory(callback func(pdata.Traces)) component.ProcessorFactory {
	const typeName = "func_processor"

	defaultConfig := func() configmodels.Processor {
		return &configmodels.ProcessorSettings{
			TypeVal: typeName,
			NameVal: typeName,
		}
	}

	// Only the downstream consumer is needed to build the processor; the
	// context, create params and config are ignored.
	createTraces := func(
		_ context.Context,
		_ component.ProcessorCreateParams,
		_ configmodels.Processor,
		next consumer.TracesConsumer,
	) (component.TracesProcessor, error) {
		proc := &funcProcessor{
			Callback: callback,
			Next:     next,
		}
		return proc, nil
	}

	return processorhelper.NewFactory(
		typeName,
		defaultConfig,
		processorhelper.WithTraces(createTraces),
	)
}

// funcProcessor is a traces processor that hands every batch it consumes to a
// callback and then forwards the batch to the next consumer.
type funcProcessor struct {
// Callback, when non-nil, is called with each consumed batch of traces.
Callback func(pdata.Traces)
// Next is the consumer that ConsumeTraces forwards every batch to.
Next consumer.TracesConsumer
}

// ConsumeTraces passes td to the callback, when one is set, and then forwards
// td to the next consumer, returning whatever error that consumer reports.
func (p *funcProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces) error {
	if notify := p.Callback; notify != nil {
		notify(td)
	}

	return p.Next.ConsumeTraces(ctx, td)
}

// GetCapabilities reports the processor's capabilities; it declares that the
// processor mutates the data it consumes.
func (p *funcProcessor) GetCapabilities() component.ProcessorCapabilities {
	var caps component.ProcessorCapabilities
	caps.MutatesConsumedData = true
	return caps
}

// Start does nothing and always returns nil.
func (p *funcProcessor) Start(_ context.Context, _ component.Host) error {
	return nil
}

// Shutdown does nothing and always returns nil.
func (p *funcProcessor) Shutdown(_ context.Context) error {
	return nil
}
Loading