Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🌱 Add distributed tracing framework #1211

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,22 @@ require (
github.com/fsnotify/fsnotify v1.4.9
github.com/go-logr/logr v0.2.1
github.com/go-logr/zapr v0.2.0
github.com/google/go-cmp v0.5.2 // indirect
github.com/googleapis/gnostic v0.5.1 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/imdario/mergo v0.3.10 // indirect
github.com/onsi/ginkgo v1.14.1
github.com/onsi/gomega v1.10.2
github.com/prometheus/client_golang v1.7.1
github.com/prometheus/client_model v0.2.0
github.com/stretchr/testify v1.6.1
go.opentelemetry.io/otel v0.13.0
go.opentelemetry.io/otel/exporters/otlp v0.13.0
go.opentelemetry.io/otel/exporters/trace/jaeger v0.13.0
go.opentelemetry.io/otel/sdk v0.13.0
go.uber.org/goleak v1.1.10
go.uber.org/zap v1.15.0
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e
gomodules.xyz/jsonpatch/v2 v2.1.0
google.golang.org/appengine v1.6.6 // indirect
k8s.io/api v0.19.2
k8s.io/apiextensions-apiserver v0.19.2
k8s.io/apimachinery v0.19.2
Expand Down
187 changes: 187 additions & 0 deletions go.sum

Large diffs are not rendered by default.

84 changes: 84 additions & 0 deletions pkg/tracing/annotation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package tracing

import (
"context"

"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/trace"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
)

// TraceAnnotationPrefix is where we store span contexts in Kubernetes annotations
const TraceAnnotationPrefix string = "trace.kubernetes.io/"

// Store tracing propagation inside Kubernetes annotations
type annotationsCarrier map[string]string

// Get implements otel.TextMapCarrier
func (a annotationsCarrier) Get(key string) string {
return a[TraceAnnotationPrefix+key]
}

// Set implements otel.TextMapCarrier
func (a annotationsCarrier) Set(key string, value string) {
a[TraceAnnotationPrefix+key] = value
}

// SpanFromAnnotations takes a map as found in Kubernetes objects and
// makes a new Span parented on the context found there, or nil if not found.
func SpanFromAnnotations(ctx context.Context, name string, annotations map[string]string) (context.Context, trace.Span) {
innerCtx := spanContextFromAnnotations(ctx, annotations)
if innerCtx == ctx {
return ctx, nil
}
return global.Tracer(libName).Start(innerCtx, name)
}

func spanContextFromAnnotations(ctx context.Context, annotations map[string]string) context.Context {
return global.TextMapPropagator().Extract(ctx, annotationsCarrier(annotations))
}

// AddTraceAnnotation adds an annotation encoding current span ID
func AddTraceAnnotation(ctx context.Context, annotations map[string]string) {
global.TextMapPropagator().Inject(ctx, annotationsCarrier(annotations))
}

// AddTraceAnnotationToUnstructured adds an annotation encoding current span ID to all objects
// Objects are modified in-place.
func AddTraceAnnotationToUnstructured(ctx context.Context, objs []unstructured.Unstructured) error {
for _, o := range objs {
a := o.GetAnnotations()
if a == nil {
a = make(map[string]string)
}
AddTraceAnnotation(ctx, a)
o.SetAnnotations(a)
}

return nil
}

// AddTraceAnnotationToObject - if there is a span for the current context, and
// the object doesn't already have one set, adds it as an annotation
func AddTraceAnnotationToObject(ctx context.Context, obj runtime.Object) error {
m, err := meta.Accessor(obj)
if err != nil {
return err
}
annotations := m.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
} else {
// Check if the object already has some context set.
for _, key := range global.TextMapPropagator().Fields() {
if annotationsCarrier(annotations).Get(key) != "" {
return nil // Don't override
}
}
}
AddTraceAnnotation(ctx, annotations)
m.SetAnnotations(annotations)
return nil
}
45 changes: 45 additions & 0 deletions pkg/tracing/annotation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package tracing

import (
"context"
"log"
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/trace"
corev1 "k8s.io/api/core/v1"
)

func TestInjectExtract(t *testing.T) {
tracingCloser, err := SetupOTLP("some-controller")
if err != nil {
log.Fatalf("failed to set up Jaeger: %v", err)
}
defer tracingCloser.Close()

var testNode corev1.Node
ctx, sp := global.Tracer("test").Start(context.Background(), "foo")

err = AddTraceAnnotationToObject(ctx, &testNode)
assert.NoError(t, err)
{
ctx := spanContextFromAnnotations(context.Background(), testNode.Annotations)
assert.NoError(t, err)
sc := trace.RemoteSpanContextFromContext(ctx)
assert.Equal(t, sp.SpanContext(), sc)
}

// Check that adding a different span leaves the original in place
ctx, sp2 := global.Tracer("test").Start(ctx, "bar")

err = AddTraceAnnotationToObject(ctx, &testNode)
assert.NoError(t, err)
{
ctx := spanContextFromAnnotations(context.Background(), testNode.Annotations)
assert.NoError(t, err)
sc := trace.RemoteSpanContextFromContext(ctx)
assert.Equal(t, sp.SpanContext(), sc)
assert.NotEqual(t, sp2.SpanContext(), sc)
}
}
39 changes: 39 additions & 0 deletions pkg/tracing/jaeger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package tracing

import (
"io"

"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/exporters/trace/jaeger"
"go.opentelemetry.io/otel/propagators"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

// SetupJaeger sets up Jaeger with some defaults
func SetupJaeger(serviceName string) (io.Closer, error) {
// Create and install Jaeger export pipeline
flush, err := jaeger.InstallNewPipeline(
jaeger.WithCollectorEndpoint("http://jaeger-agent.default:14268/api/traces"), // FIXME name?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we allow to specify the endpoint? (parameter/env variable/other?)

jaeger.WithProcess(jaeger.Process{
ServiceName: serviceName,
}),
jaeger.WithSDK(&sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
)
if err != nil {
return nil, err
}

// set global propagator to tracecontext (the default is no-op).
global.SetTextMapPropagator(propagators.TraceContext{})

return funcCloser{f: flush}, nil
}

type funcCloser struct {
f func()
}

func (c funcCloser) Close() error {
c.f()
return nil
}
56 changes: 56 additions & 0 deletions pkg/tracing/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package tracing

import (
"context"

"github.com/go-logr/logr"
"go.opentelemetry.io/otel/api/trace"
"go.opentelemetry.io/otel/label"
)

type tracingLogger struct {
logr.Logger
trace.Span
}

func (t tracingLogger) Enabled() bool {
return t.Logger.Enabled()
}

func (t tracingLogger) Info(msg string, keysAndValues ...interface{}) {
t.Logger.Info(msg, keysAndValues...)
t.Span.AddEvent(context.Background(), "info", keyValues(keysAndValues...)...)
}

func (t tracingLogger) Error(err error, msg string, keysAndValues ...interface{}) {
t.Logger.Error(err, msg, keysAndValues...)
kvs := append([]label.KeyValue{label.String("message", msg)}, keyValues(keysAndValues...)...)
t.Span.AddEvent(context.Background(), "error", kvs...)
t.Span.RecordError(context.Background(), err)
}

func (t tracingLogger) V(level int) logr.Logger {
return tracingLogger{Logger: t.Logger.V(level), Span: t.Span}
}

func keyValues(keysAndValues ...interface{}) []label.KeyValue {
attrs := make([]label.KeyValue, 0, len(keysAndValues)/2)
for i := 0; i+1 < len(keysAndValues); i += 2 {
key, ok := keysAndValues[i].(string)
if !ok {
key = "non-string"
}
attrs = append(attrs, label.Any(key, keysAndValues[i+1]))
}
return attrs
}

func (t tracingLogger) WithValues(keysAndValues ...interface{}) logr.Logger {
t.Span.SetAttributes(keyValues(keysAndValues...)...)
return tracingLogger{Logger: t.Logger.WithValues(keysAndValues...), Span: t.Span}
}

func (t tracingLogger) WithName(name string) logr.Logger {
t.Span.SetAttributes(label.String("name", name))
return tracingLogger{Logger: t.Logger.WithName(name), Span: t.Span}
}
46 changes: 46 additions & 0 deletions pkg/tracing/otlp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package tracing

import (
"context"
"io"

"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/exporters/otlp"
"go.opentelemetry.io/otel/propagators"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/semconv"
)

// SetupOTLP sets up a global trace provider sending to OpenTelemetry with some defaults
func SetupOTLP(serviceName string) (io.Closer, error) {
exp, err := otlp.NewExporter(
otlp.WithInsecure(),
otlp.WithAddress("otlp-collector.default:55680"),
)
if err != nil {
return nil, err
}

bsp := sdktrace.NewBatchSpanProcessor(exp)
tracerProvider := sdktrace.NewTracerProvider(
sdktrace.WithResource(resource.New(semconv.ServiceNameKey.String(serviceName))),
sdktrace.WithSpanProcessor(bsp),
)

// set global propagator to tracecontext (the default is no-op).
global.SetTextMapPropagator(propagators.TraceContext{})
global.SetTracerProvider(tracerProvider)

return otlpCloser{exp: exp, bsp: bsp}, nil
}

type otlpCloser struct {
exp *otlp.Exporter
bsp *sdktrace.BatchSpanProcessor
}

func (s otlpCloser) Close() error {
s.bsp.Shutdown() // shutdown the processor
return s.exp.Shutdown(context.Background())
}
Loading