Skip to content

Commit

Permalink
handle cancellations
Browse files Browse the repository at this point in the history
  • Loading branch information
prasad-shirodkar committed Apr 17, 2024
1 parent 1cf07b6 commit 326e7f6
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 24 deletions.
7 changes: 5 additions & 2 deletions exporters/stdout/stdoutmetric/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ func (e *exporter) Aggregation(k metric.InstrumentKind) metric.Aggregation {
}

func (e *exporter) Export(ctx context.Context, data *metricdata.ResourceMetrics) error {
if err := ctx.Err(); err != nil {
return err
}
if e.redactTimestamps {
redactTimestamps(data)
}
Expand All @@ -60,12 +63,12 @@ func (e *exporter) Export(ctx context.Context, data *metricdata.ResourceMetrics)
return e.encVal.Load().(encoderHolder).Encode(data)
}

func (e *exporter) ForceFlush(ctx context.Context) error {
func (e *exporter) ForceFlush(context.Context) error {
// exporter holds no state, nothing to flush.
return nil
}

func (e *exporter) Shutdown(ctx context.Context) error {
func (e *exporter) Shutdown(context.Context) error {
e.shutdownOnce.Do(func() {
e.encVal.Store(encoderHolder{
encoder: shutdownEncoder{},
Expand Down
31 changes: 16 additions & 15 deletions exporters/stdout/stdoutmetric/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,17 @@ func testCtxErrHonored(factory func(*testing.T) func(context.Context) error) fun
t.Run("DeadlineExceeded", func(t *testing.T) {
innerCtx, innerCancel := context.WithTimeout(ctx, time.Nanosecond)
t.Cleanup(innerCancel)
<-innerCtx.Done()

f := factory(t)
assert.NoError(t, f(innerCtx))
assert.ErrorIs(t, f(innerCtx), context.DeadlineExceeded)
})

t.Run("Canceled", func(t *testing.T) {
innerCtx, innerCancel := context.WithCancel(ctx)
innerCancel()

f := factory(t)
assert.NoError(t, f(innerCtx))
assert.ErrorIs(t, f(innerCtx), context.Canceled)
})

t.Run("NoError", func(t *testing.T) {
Expand All @@ -56,18 +55,6 @@ func testCtxErrHonored(factory func(*testing.T) func(context.Context) error) fun
}

func TestExporterHonorsContextErrors(t *testing.T) {
t.Run("Shutdown", testCtxErrHonored(func(t *testing.T) func(context.Context) error {
exp, err := stdoutmetric.New(testEncoderOption())
require.NoError(t, err)
return exp.Shutdown
}))

t.Run("ForceFlush", testCtxErrHonored(func(t *testing.T) func(context.Context) error {
exp, err := stdoutmetric.New(testEncoderOption())
require.NoError(t, err)
return exp.ForceFlush
}))

t.Run("Export", testCtxErrHonored(func(t *testing.T) func(context.Context) error {
exp, err := stdoutmetric.New(testEncoderOption())
require.NoError(t, err)
Expand All @@ -78,6 +65,20 @@ func TestExporterHonorsContextErrors(t *testing.T) {
}))
}

func TestExporterShutdown(t *testing.T) {
exporter, err := stdoutmetric.New(testEncoderOption())
assert.NoError(t, err)

assert.NoError(t, exporter.Shutdown(context.Background()))
}

func TestExporterForceFlush(t *testing.T) {
exporter, err := stdoutmetric.New(testEncoderOption())
assert.NoError(t, err)

assert.NoError(t, exporter.ForceFlush(context.Background()))
}

func TestShutdownExporterReturnsShutdownErrorOnExport(t *testing.T) {
var (
data = new(metricdata.ResourceMetrics)
Expand Down
3 changes: 3 additions & 0 deletions exporters/stdout/stdouttrace/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type Exporter struct {

// ExportSpans writes spans in json format to stdout.
func (e *Exporter) ExportSpans(ctx context.Context, spans []trace.ReadOnlySpan) error {
if err := ctx.Err(); err != nil {
return err
}
e.stoppedMu.RLock()
stopped := e.stopped
e.stoppedMu.RUnlock()
Expand Down
36 changes: 29 additions & 7 deletions exporters/stdout/stdouttrace/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,30 +60,53 @@ func TestExporterExportSpan(t *testing.T) {
tests := []struct {
opts []stdouttrace.Option
expectNow time.Time
ctx context.Context
wantError error
}{
{
opts: []stdouttrace.Option{stdouttrace.WithPrettyPrint()},
expectNow: now,
ctx: context.Background(),
},
{
opts: []stdouttrace.Option{stdouttrace.WithPrettyPrint(), stdouttrace.WithoutTimestamps()},
// expectNow is an empty time.Time
ctx: context.Background(),
},
{
opts: []stdouttrace.Option{},
ctx: func() context.Context {
ctx, cancel := context.WithCancel(context.Background())
cancel()
return ctx
}(),
wantError: context.Canceled,
},
{
opts: []stdouttrace.Option{},
ctx: func() context.Context {
ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
t.Cleanup(cancel)
return ctx
}(),
wantError: context.DeadlineExceeded,
},
}

ctx := context.Background()
for _, tt := range tests {
// write to buffer for testing
var b bytes.Buffer
ex, err := stdouttrace.New(append(tt.opts, stdouttrace.WithWriter(&b))...)
require.Nil(t, err)

err = ex.ExportSpans(ctx, tracetest.SpanStubs{ss, ss}.Snapshots())
require.Nil(t, err)
err = ex.ExportSpans(tt.ctx, tracetest.SpanStubs{ss, ss}.Snapshots())
assert.Equal(t, tt.wantError, err)

got := b.String()
wantone := expectedJSON(tt.expectNow)
assert.Equal(t, wantone+wantone, got)
if tt.wantError == nil {
got := b.String()
wantone := expectedJSON(tt.expectNow)
assert.Equal(t, wantone+wantone, got)
}
}
}

Expand Down Expand Up @@ -192,7 +215,6 @@ func TestExporterShutdownHonorsTimeout(t *testing.T) {

innerCtx, innerCancel := context.WithTimeout(ctx, time.Nanosecond)
defer innerCancel()
<-innerCtx.Done()
err = e.Shutdown(innerCtx)
assert.NoError(t, err)
}
Expand Down

0 comments on commit 326e7f6

Please sign in to comment.