Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add otel tracing support #182

Merged
merged 24 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ import (
"go.elastic.co/apm/module/apmzap/v2"
"go.elastic.co/apm/v2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -85,6 +87,10 @@ type Appender struct {
metrics metrics
mu sync.Mutex
closed chan struct{}

// tracer is an OTel tracer, and should not be confused with `a.config.Tracer`
// which is an Elastic APM Tracer.
tracer trace.Tracer
carsonip marked this conversation as resolved.
Show resolved Hide resolved
endorama marked this conversation as resolved.
Show resolved Hide resolved
}

// New returns a new Appender that indexes documents into Elasticsearch.
Expand Down Expand Up @@ -183,6 +189,11 @@ func New(client esapi.Transport, cfg Config) (*Appender, error) {
indexer.runActiveIndexer()
return nil
})

if cfg.Tracer == nil && cfg.TracerProvider != nil {
indexer.tracer = cfg.TracerProvider.Tracer("github.com/elastic/go-docappender.appender")
}

return indexer, nil
}

Expand Down Expand Up @@ -307,6 +318,7 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
defer a.addCount(1, &a.bulkRequests, a.metrics.bulkRequests)

logger := a.config.Logger
var span trace.Span
if a.tracingEnabled() {
tx := a.config.Tracer.StartTransaction("docappender.flush", "output")
tx.Context.SetLabel("documents", n)
Expand All @@ -316,6 +328,18 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
// Add trace IDs to logger, to associate any per-item errors
// below with the trace.
logger = logger.With(apmzap.TraceContext(ctx)...)
} else if a.otelTracingEnabled() {
ctx, span = a.tracer.Start(ctx, "docappender.flush", trace.WithAttributes(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reviewers: compared to APM this is missing "span type" information, but to my understanding this is delegated to an eventual otel<>Elastic APM bridge so is not a concern here.

attribute.Int("documents", n),
))
defer span.End()

// Add trace IDs to logger, to associate any per-item errors
// below with the trace.
logger = logger.With(
zap.String("traceId", span.SpanContext().TraceID().String()),
zap.String("spanId", span.SpanContext().SpanID().String()),
)
Comment on lines +339 to +342
Copy link
Member Author

@endorama endorama Jun 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reviewers: A otel zap bridge exists at https://github.com/uptrace/opentelemetry-go-extra/tree/main/otelzap
This implementation is not compatible with a standard zap logger though, as it wraps the logger in a custom otelzap.Logger type. Passing the context also requires to use an additional .Ctx(ctx) call that is not supported by zap.Logger.
Due to this I chose not to use it an instead provide correlation information as standard otel attributes. I expect this to retain the same behaviour that we get with logger.With(apmzap.TraceContext(ctx)...) on line 320.

}

var flushCtx context.Context
Expand Down Expand Up @@ -346,6 +370,9 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
logger.Error("bulk indexing request failed", zap.Error(err))
if a.tracingEnabled() {
apm.CaptureError(ctx, err).Send()
} else if a.otelTracingEnabled() && span.IsRecording() {
span.RecordError(err)
span.SetStatus(codes.Error, "bulk indexing request failed")
}

if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
Expand Down Expand Up @@ -406,6 +433,10 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
failedCount[info]++
if a.tracingEnabled() {
apm.CaptureError(ctx, errors.New(info.Error.Reason)).Send()
} else if a.otelTracingEnabled() && span.IsRecording() {
e := errors.New(info.Error.Reason)
span.RecordError(e)
span.SetStatus(codes.Error, e.Error())
}
}
for key, count := range failedCount {
Expand Down Expand Up @@ -450,6 +481,9 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
zap.Int64("docs_failed", docsFailed),
zap.Int64("docs_rate_limited", tooManyRequests),
)
if a.otelTracingEnabled() && span.IsRecording() {
span.SetStatus(codes.Ok, "")
}
return nil
}

Expand Down Expand Up @@ -687,6 +721,12 @@ func (a *Appender) tracingEnabled() bool {
return a.config.Tracer != nil && a.config.Tracer.Recording()
}

// otelTracingEnabled checks whether we should be doing tracing
// using otel tracer.
func (a *Appender) otelTracingEnabled() bool {
return a.tracer != nil
}

// activeLimit returns the value of GOMAXPROCS * cfg.ActiveRatio. Which limits
// the maximum number of active indexers to a % of GOMAXPROCS.
//
Expand Down
74 changes: 74 additions & 0 deletions appender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,12 @@ import (
"go.elastic.co/apm/v2/apmtest"
"go.elastic.co/apm/v2/model"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
Expand Down Expand Up @@ -1723,3 +1726,74 @@ func newJSONReader(v any) *bytes.Reader {
}
return bytes.NewReader(data)
}

func TestAppenderOtelTracing(t *testing.T) {
t.Run("success", func(t *testing.T) {
testTracedAppend(t, 200, sdktrace.Status{
Code: codes.Ok,
Description: "",
})
})
t.Run("failure", func(t *testing.T) {
testTracedAppend(t, 400, sdktrace.Status{
Code: codes.Error,
Description: "bulk indexing request failed",
})
})
}

func testTracedAppend(t *testing.T, responseCode int, status sdktrace.Status) {
client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(responseCode)
_, result := docappendertest.DecodeBulkRequest(r)
json.NewEncoder(w).Encode(result)
})

core, observed := observer.New(zap.NewAtomicLevelAt(zapcore.DebugLevel))

exp := tracetest.NewInMemoryExporter()
tp := sdktrace.NewTracerProvider(
sdktrace.WithSyncer(exp),
)
defer tp.Shutdown(context.Background())

indexer, err := docappender.New(client, docappender.Config{
FlushInterval: time.Minute,
Logger: zap.New(core),
// NOTE: Tracer must be nil to use otel tracing
Tracer: nil,
TracerProvider: tp,
})
require.NoError(t, err)
defer indexer.Close(context.Background())

const N = 100
for i := 0; i < N; i++ {
addMinimalDoc(t, indexer, "logs-foo-testing")
}

// Closing the indexer flushes enqueued documents.
_ = indexer.Close(context.Background())

spans := exp.GetSpans()
assert.NotEmpty(t, spans)

gotSpan := spans[0]
assert.Equal(t, "docappender.flush", gotSpan.Name)
assert.Equal(t, status, gotSpan.Status)

for _, a := range gotSpan.Attributes {
if a.Key == "documents" {
assert.Equal(t, int64(N), a.Value.AsInt64())
}
}

correlatedLogs := observed.FilterFieldKey("traceId").All()
assert.NotEmpty(t, correlatedLogs)

log := correlatedLogs[0]
expectedTraceID := gotSpan.SpanContext.TraceID().String()
assert.Equal(t, expectedTraceID, log.ContextMap()["traceId"])
expectedSpanID := gotSpan.SpanContext.SpanID().String()
assert.Equal(t, expectedSpanID, log.ContextMap()["spanId"])
}
15 changes: 14 additions & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.elastic.co/apm/v2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

Expand All @@ -40,9 +41,21 @@ type Config struct {
// Tracer holds an optional apm.Tracer to use for tracing bulk requests
// to Elasticsearch. Each bulk request is traced as a transaction.
//
// If Tracer is nil, requests will not be traced.
// If Tracer is nil, requests will not be traced. Note however that
// OtelTracerProvider may not be nil, in which case the request will
// be traced by a different tracer.
Comment on lines +44 to +46
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm tempted to add a deprecation notice to this, WDYT?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, done in a5fa1ad (#182)

//
// Deprecated: Tracer is replaced by TracerProvider in a shift towards
// OpenTelemetry. Please use TracerProvider.
Tracer *apm.Tracer

// TracerProvider holds an optional otel TracerProvider for tracing
// flush requests.
//
// If TracerProvider is nil, requests will not be traced.
// To use this provider Tracer must be nil.
TracerProvider trace.TracerProvider

// CompressionLevel holds the gzip compression level, from 0 (gzip.NoCompression)
// to 9 (gzip.BestCompression). Higher values provide greater compression, at a
// greater cost of CPU. The special value -1 (gzip.DefaultCompression) selects the
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ require (
go.elastic.co/fastjson v1.3.0
go.opentelemetry.io/otel v1.27.0
go.opentelemetry.io/otel/metric v1.27.0
go.opentelemetry.io/otel/sdk v1.27.0
go.opentelemetry.io/otel/sdk/metric v1.27.0
go.opentelemetry.io/otel/trace v1.27.0
go.uber.org/zap v1.27.0
golang.org/x/sync v0.7.0
)
Expand All @@ -34,8 +36,6 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
go.elastic.co/apm/module/apmhttp/v2 v2.6.0 // indirect
go.opentelemetry.io/otel/sdk v1.27.0 // indirect
go.opentelemetry.io/otel/trace v1.27.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/sys v0.20.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down