Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add otel tracing support #182

Merged
merged 24 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"go.elastic.co/apm/v2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -307,6 +308,7 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
defer a.addCount(1, &a.bulkRequests, a.metrics.bulkRequests)

logger := a.config.Logger
var span trace.Span
if a.tracingEnabled() {
tx := a.config.Tracer.StartTransaction("docappender.flush", "output")
tx.Context.SetLabel("documents", n)
Expand All @@ -316,6 +318,19 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
// Add trace IDs to logger, to associate any per-item errors
// below with the trace.
logger = logger.With(apmzap.TraceContext(ctx)...)
} else if a.otelTracingEnabled() {
// NOTE: this is missing transaction type information. How is this conveyed in otel?
ctx, span = a.config.OtelTracer.Start(ctx, "docappender.flush", trace.WithAttributes(
attribute.Int("documents", n),
))
defer span.End()

// Add trace IDs to logger, to associate any per-item errors
// below with the trace.
logger = logger.With(
zap.String("traceId", span.SpanContext().TraceID().String()),
zap.String("spanId", span.SpanContext().SpanID().String()),
)
Comment on lines +339 to +342
Copy link
Member Author

@endorama endorama Jun 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reviewers: An otel zap bridge exists at https://github.com/uptrace/opentelemetry-go-extra/tree/main/otelzap
This implementation is not compatible with a standard zap logger though, as it wraps the logger in a custom otelzap.Logger type. Passing the context also requires an additional .Ctx(ctx) call that is not supported by zap.Logger.
Due to this I chose not to use it and instead provide correlation information as standard otel attributes. I expect this to retain the same behaviour that we get with logger.With(apmzap.TraceContext(ctx)...) on line 320.

}

var flushCtx context.Context
Expand Down Expand Up @@ -346,6 +361,8 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
logger.Error("bulk indexing request failed", zap.Error(err))
if a.tracingEnabled() {
apm.CaptureError(ctx, err).Send()
} else if a.otelTracingEnabled() {
span.RecordError(err)
}

if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
Expand Down Expand Up @@ -397,6 +414,9 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
failedCount[info]++
if a.tracingEnabled() {
apm.CaptureError(ctx, errors.New(info.Error.Reason)).Send()
} else if a.otelTracingEnabled() {
// trace.SpanFromContext(ctx).RecordError(errors.New(info.Error.Reason))
span.RecordError(errors.New(info.Error.Reason))
endorama marked this conversation as resolved.
Show resolved Hide resolved
}
}
for key, count := range failedCount {
Expand Down Expand Up @@ -682,9 +702,17 @@ func (a *Appender) indexFailureRate() float64 {

// tracingEnabled reports whether Elastic APM tracing should be used:
// an APM tracer is configured and is actively recording.
func (a *Appender) tracingEnabled() bool {
	return a.config.Tracer != nil && a.config.Tracer.Recording()
}

// otelTracingEnabled reports whether OpenTelemetry tracing should be used,
// i.e. an OTel tracer has been configured. Note: the APM tracer takes
// precedence when both are set (callers check tracingEnabled first).
func (a *Appender) otelTracingEnabled() bool {
	return a.config.OtelTracer != nil
}

// activeLimit returns the value of GOMAXPROCS * cfg.ActiveRatio. Which limits
// the maximum number of active indexers to a % of GOMAXPROCS.
//
Expand Down
47 changes: 47 additions & 0 deletions appender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ import (
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
Expand Down Expand Up @@ -1647,6 +1649,51 @@ func TestAppenderTracing(t *testing.T) {
testAppenderTracing(t, 400, "failure")
}

func TestAppenderOtelTracing(t *testing.T) {
endorama marked this conversation as resolved.
Show resolved Hide resolved
client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
_, result := docappendertest.DecodeBulkRequest(r)
json.NewEncoder(w).Encode(result)
})

core, observed := observer.New(zap.NewAtomicLevelAt(zapcore.DebugLevel))

exp := tracetest.NewInMemoryExporter()
tp := sdktrace.NewTracerProvider(
sdktrace.WithSyncer(exp),
)
defer tp.Shutdown(context.Background())

tr := tp.Tracer("test")
indexer, err := docappender.New(client, docappender.Config{
FlushInterval: time.Minute,
Logger: zap.New(core),
// NOTE: Tracer must be nil to use otel tracing
Tracer: nil,
OtelTracer: tr,
})
require.NoError(t, err)

const N = 100
for i := 0; i < N; i++ {
addMinimalDoc(t, indexer, "logs-foo-testing")
}

// Closing the indexer flushes enqueued documents.
require.NoError(t, indexer.Close(context.Background()))

require.NoError(t, tp.Shutdown(context.Background()))
err = exp.Shutdown(context.Background())
require.NoError(t, err)

spans := exp.GetSpans()
fmt.Println(len(spans))
assert.NotEmpty(t, spans)

correlatedLogs := observed.FilterFieldKey("transaction.id").All()
assert.NotEmpty(t, correlatedLogs)
}

func testAppenderTracing(t *testing.T, statusCode int, expectedOutcome string) {
client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(statusCode)
Expand Down
3 changes: 3 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.elastic.co/apm/v2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

Expand All @@ -43,6 +44,8 @@ type Config struct {
// If Tracer is nil, requests will not be traced.
Tracer *apm.Tracer

OtelTracer trace.Tracer
endorama marked this conversation as resolved.
Show resolved Hide resolved

// CompressionLevel holds the gzip compression level, from 0 (gzip.NoCompression)
// to 9 (gzip.BestCompression). Higher values provide greater compression, at a
// greater cost of CPU. The special value -1 (gzip.DefaultCompression) selects the
Expand Down
Loading