Skip to content

Commit

Permalink
Split large jaeger span batch to respect the udp packet size limit (op…
Browse files Browse the repository at this point in the history
…en-telemetry#1853)

* Split large jaeger span batch to respect the udp packet size

* Refactor EmitBatch and produce a descriptive error msg when serialization fails

* Add tests for large jaeger spans
Update CHANGELOG.md

* Update CHANGELOG.md

* Fix compatibility-test on windows.

* Add test case for exporting spans with multiple errors.
  • Loading branch information
tianyaqu committed May 4, 2021
1 parent 42a8450 commit c99d5e9
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 9 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE`
- `OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE`
- Adds `otlpgrpc.WithTimeout` option for configuring timeout to the otlp/gRPC exporter. (#1821)
- Adds `jaeger.WithMaxPacketSize` option for configuring maximum UDP packet size used when connecting to the Jaeger agent. (#1853)

### Fixed

Expand All @@ -104,6 +105,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Zipkin Exporter: Ensure mapping between OTel and Zipkin span data complies with the specification. (#1688)
- Fixed typo for default service name in Jaeger Exporter. (#1797)
- Fix flaky OTLP for the reconnection of the client connection. (#1527, #1814)
- Fix Jaeger exporter dropping of span batches that exceed the UDP packet size limit.
Instead, the exporter now splits the batch into smaller sendable batches. (#1828)

### Changed

Expand Down
91 changes: 82 additions & 9 deletions exporters/trace/jaeger/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"io"
"log"
"net"
"strings"
"time"

"go.opentelemetry.io/otel/exporters/trace/jaeger/internal/third_party/thrift/lib/go/thrift"
Expand All @@ -36,10 +37,11 @@ type agentClientUDP struct {
genAgent.Agent
io.Closer

connUDP udpConn
client *genAgent.AgentClient
maxPacketSize int // max size of datagram in bytes
thriftBuffer *thrift.TMemoryBuffer // buffer used to calculate byte size of a span
connUDP udpConn
client *genAgent.AgentClient
maxPacketSize int // max size of datagram in bytes
thriftBuffer *thrift.TMemoryBuffer // buffer used to calculate byte size of a span
thriftProtocol thrift.TProtocol
}

type udpConn interface {
Expand Down Expand Up @@ -75,6 +77,7 @@ func newAgentClientUDP(params agentClientUDPParams) (*agentClientUDP, error) {

thriftBuffer := thrift.NewTMemoryBufferLen(params.MaxPacketSize)
protocolFactory := thrift.NewTCompactProtocolFactoryConf(&thrift.TConfiguration{})
thriftProtocol := protocolFactory.GetProtocol(thriftBuffer)
client := genAgent.NewAgentClientFactory(thriftBuffer, protocolFactory)

var connUDP udpConn
Expand Down Expand Up @@ -103,15 +106,78 @@ func newAgentClientUDP(params agentClientUDPParams) (*agentClientUDP, error) {
}

return &agentClientUDP{
connUDP: connUDP,
client: client,
maxPacketSize: params.MaxPacketSize,
thriftBuffer: thriftBuffer,
connUDP: connUDP,
client: client,
maxPacketSize: params.MaxPacketSize,
thriftBuffer: thriftBuffer,
thriftProtocol: thriftProtocol,
}, nil
}

// EmitBatch implements EmitBatch() of the Agent interface.
//
// It measures the serialized size of batch.Process once and of every span
// individually, then groups the spans into sub-batches whose accumulated size
// (process + spans) stays below a.maxPacketSize. Each sub-batch is sent to the
// agent through flush, always together with batch.Process.
//
// Error handling:
//   - if the process cannot be serialized, nothing is sent and that error is
//     returned as-is;
//   - a span that cannot be serialized, or that is too large to fit in a
//     packet even on its own, is dropped and recorded as an error;
//   - a failing flush is recorded and the remaining spans are still processed.
//
// A single recorded error is returned unchanged; several are combined into
// one "multiple errors during transform" error. nil means everything was sent.
//
// NOTE(review): only the process and the spans are counted towards the limit;
// any bytes a.client.EmitBatch adds around them are not. Presumably that
// overhead is small, but confirm a sub-batch close to the limit still fits.
func (a *agentClientUDP) EmitBatch(ctx context.Context, batch *gen.Batch) error {
	var errs []error
	processSize, err := a.calcSizeOfSerializedThrift(ctx, batch.Process)
	if err != nil {
		// Drop the whole batch if serialization of the process fails.
		return err
	}

	totalSize := processSize
	var spans []*gen.Span
	for _, span := range batch.Spans {
		spanSize, err := a.calcSizeOfSerializedThrift(ctx, span)
		if err != nil {
			// Keep the underlying cause in the chain instead of discarding it.
			errs = append(errs, fmt.Errorf("thrift serialization failed: %v: %w", span, err))
			continue
		}
		if spanSize+processSize >= a.maxPacketSize {
			// Drop the span that exceeds the limit even when sent alone.
			errs = append(errs, fmt.Errorf("span too large to send: %v", span))
			continue
		}
		if totalSize+spanSize >= a.maxPacketSize {
			// Adding this span would overflow the packet: send what has been
			// collected so far and start a new sub-batch.
			if err := a.flush(ctx, &gen.Batch{
				Process: batch.Process,
				Spans:   spans,
			}); err != nil {
				errs = append(errs, err)
			}
			// Reuse the backing array; flush has returned, so the previous
			// sub-batch is no longer being read here.
			spans = spans[:0]
			totalSize = processSize
		}
		totalSize += spanSize
		spans = append(spans, span)
	}

	// Send whatever is left after the last span.
	if len(spans) > 0 {
		if err := a.flush(ctx, &gen.Batch{
			Process: batch.Process,
			Spans:   spans,
		}); err != nil {
			errs = append(errs, err)
		}
	}

	switch len(errs) {
	case 0:
		return nil
	case 1:
		return errs[0]
	default:
		joined := a.makeJoinedErrorString(errs)
		return fmt.Errorf("multiple errors during transform: %s", joined)
	}
}

// makeJoinedErrorString concatenates the messages of all errs into a single
// string, separating consecutive messages with ", ". It returns the empty
// string when errs is empty.
func (a *agentClientUDP) makeJoinedErrorString(errs []error) string {
	var joined strings.Builder
	for i, e := range errs {
		if i > 0 {
			joined.WriteString(", ")
		}
		joined.WriteString(e.Error())
	}
	return joined.String()
}

// flush will send the batch of spans to the agent.
func (a *agentClientUDP) flush(ctx context.Context, batch *gen.Batch) error {
a.thriftBuffer.Reset()
if err := a.client.EmitBatch(ctx, batch); err != nil {
return err
Expand All @@ -124,6 +190,13 @@ func (a *agentClientUDP) EmitBatch(ctx context.Context, batch *gen.Batch) error
return err
}

// calcSizeOfSerializedThrift calculates the serialized size of thriftStruct.
//
// It resets the shared thrift buffer, writes thriftStruct into it through
// a.thriftProtocol and returns the resulting buffer length together with the
// write error, if any. The previous buffer content is discarded.
func (a *agentClientUDP) calcSizeOfSerializedThrift(ctx context.Context, thriftStruct thrift.TStruct) (int, error) {
	buf := a.thriftBuffer
	buf.Reset()
	writeErr := thriftStruct.Write(ctx, a.thriftProtocol)
	// The length is reported even when the write failed; callers are expected
	// to check the error first.
	return buf.Len(), writeErr
}

// Close implements Close() of io.Closer and closes the underlying UDP connection.
func (a *agentClientUDP) Close() error {
return a.connUDP.Close()
Expand Down
75 changes: 75 additions & 0 deletions exporters/trace/jaeger/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@
package jaeger

import (
"context"
"log"
"net"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
)

func TestNewAgentClientUDPWithParamsBadHostport(t *testing.T) {
Expand Down Expand Up @@ -99,3 +103,74 @@ func TestNewAgentClientUDPWithParamsReconnectingDisabled(t *testing.T) {

assert.NoError(t, agentClient.Close())
}

// errorHandler is an error handler for tests; it carries the *testing.T
// that errors are reported against.
type errorHandler struct {
	t *testing.T
}

// Handle asserts that err is nil, reporting a test failure on h.t otherwise.
func (h errorHandler) Handle(err error) {
	assert.NoError(h.t, err)
}

// TestJaegerAgentUDPLimitBatching exports 1500 empty span snapshots through a
// raw exporter configured with an agent endpoint on localhost:6831 and
// asserts that both ExportSpans and Shutdown succeed.
func TestJaegerAgentUDPLimitBatching(t *testing.T) {
	otel.SetErrorHandler(errorHandler{t})

	// 1500 spans, size 79559, does not fit within one UDP packet with the default size of 65000.
	const spanCount = 1500
	snapshots := make([]*tracesdk.SpanSnapshot, 0, spanCount)
	for len(snapshots) < spanCount {
		snapshots = append(snapshots, &tracesdk.SpanSnapshot{})
	}

	endpoint := WithAgentEndpoint(WithAgentHost("localhost"), WithAgentPort("6831"))
	exporter, err := NewRawExporter(endpoint)
	require.NoError(t, err)

	ctx := context.Background()
	assert.NoError(t, exporter.ExportSpans(ctx, snapshots))
	assert.NoError(t, exporter.Shutdown(ctx))
}

// generateALargeSpan returns a new span snapshot whose only populated field
// is a long Name.
func generateALargeSpan() *tracesdk.SpanSnapshot {
	return &tracesdk.SpanSnapshot{Name: "a-longer-name-that-makes-it-exceeds-limit"}
}

// TestSpanExceedsMaxPacketLimit configures the exporter with a maximum packet
// size of 107 bytes and checks that exporting a batch containing a span with
// a long name returns an error, while a batch of default spans is exported
// without error.
func TestSpanExceedsMaxPacketLimit(t *testing.T) {
	otel.SetErrorHandler(errorHandler{t})

	// 106 is the serialized size of a span with default values.
	const defaultSpanSize = 106

	oversized := []*tracesdk.SpanSnapshot{generateALargeSpan(), {}}
	regular := []*tracesdk.SpanSnapshot{{}, {}}

	endpoint := WithAgentEndpoint(
		WithAgentHost("localhost"),
		WithAgentPort("6831"),
		WithMaxPacketSize(defaultSpanSize+1),
	)
	exporter, err := NewRawExporter(endpoint)
	require.NoError(t, err)

	ctx := context.Background()
	assert.Error(t, exporter.ExportSpans(ctx, oversized))
	assert.NoError(t, exporter.ExportSpans(ctx, regular))
	assert.NoError(t, exporter.Shutdown(ctx))
}

// TestEmitBatchWithMultipleErrors sets the maximum packet size to the length
// of the large span's name, exports that span twice in one batch, and
// expects an error whose message contains "multiple errors".
func TestEmitBatchWithMultipleErrors(t *testing.T) {
	otel.SetErrorHandler(errorHandler{t})

	large := generateALargeSpan()
	batch := []*tracesdk.SpanSnapshot{large, large}

	// make max packet size smaller than span
	limit := len(large.Name)
	endpoint := WithAgentEndpoint(
		WithAgentHost("localhost"),
		WithAgentPort("6831"),
		WithMaxPacketSize(limit),
	)
	exporter, err := NewRawExporter(endpoint)
	require.NoError(t, err)

	exportErr := exporter.ExportSpans(context.Background(), batch)
	assert.Error(t, exportErr)
	require.Contains(t, exportErr.Error(), "multiple errors")
}
7 changes: 7 additions & 0 deletions exporters/trace/jaeger/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ func WithAttemptReconnectingInterval(interval time.Duration) AgentEndpointOption
}
}

// WithMaxPacketSize returns an AgentEndpointOption that sets MaxPacketSize,
// the maximum UDP packet size used for transport to the Jaeger agent, to size.
func WithMaxPacketSize(size int) AgentEndpointOption {
	setMaxPacketSize := func(opts *AgentEndpointOptions) {
		opts.MaxPacketSize = size
	}
	return setMaxPacketSize
}

// WithCollectorEndpoint defines the full url to the Jaeger HTTP Thrift collector. This will
// use the following environment variables for configuration if no explicit option is provided:
//
Expand Down

0 comments on commit c99d5e9

Please sign in to comment.