From 19a131b7640a6eba325327654d5a05a385a97f67 Mon Sep 17 00:00:00 2001 From: Sam Xie Date: Fri, 12 Jul 2024 13:13:56 -0700 Subject: [PATCH] Add integration tests for otlploggrpc exporter (#5614) Part of https://github.com/open-telemetry/opentelemetry-go/issues/5056 --- .../otlp/otlplog/otlploggrpc/exporter_test.go | 84 +++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/exporters/otlp/otlplog/otlploggrpc/exporter_test.go b/exporters/otlp/otlplog/otlploggrpc/exporter_test.go index 3ab9479e44d..6153ce58316 100644 --- a/exporters/otlp/otlplog/otlploggrpc/exporter_test.go +++ b/exporters/otlp/otlplog/otlploggrpc/exporter_test.go @@ -6,6 +6,7 @@ package otlploggrpc import ( "context" "errors" + "fmt" "runtime" "sync" "sync/atomic" @@ -14,8 +15,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/log" sdklog "go.opentelemetry.io/otel/sdk/log" + collogpb "go.opentelemetry.io/proto/otlp/collector/logs/v1" logpb "go.opentelemetry.io/proto/otlp/logs/v1" ) @@ -149,3 +152,84 @@ func TestExporterConcurrentSafe(t *testing.T) { cancel() wg.Wait() } + +// TestExporter runs integration test against the real OTLP collector. +func TestExporter(t *testing.T) { + t.Run("ExporterHonorsContextErrors", func(t *testing.T) { + t.Run("Export", testCtxErrs(func() func(context.Context) error { + c, _ := clientFactory(t, nil) + e := newExporter(c) + return func(ctx context.Context) error { + return e.Export(ctx, []sdklog.Record{{}}) + } + })) + + t.Run("Shutdown", testCtxErrs(func() func(context.Context) error { + c, _ := clientFactory(t, nil) + e := newExporter(c) + return e.Shutdown + })) + }) + + t.Run("Export", func(t *testing.T) { + ctx := context.Background() + c, coll := clientFactory(t, nil) + e := newExporter(c) + + require.NoError(t, e.Export(ctx, records)) + require.NoError(t, e.Shutdown(ctx)) + got := coll.Collect().Dump() + require.Len(t, got, 1, "upload of one ResourceLogs") + require.Len(t, got[0].ScopeLogs, 1, "upload of one ScopeLogs") + require.Len(t, got[0].ScopeLogs[0].LogRecords, 2, "upload of two ScopeLogs") + + // Check body + assert.Equal(t, "A", got[0].ScopeLogs[0].LogRecords[0].Body.GetStringValue()) + assert.Equal(t, "B", got[0].ScopeLogs[0].LogRecords[1].Body.GetStringValue()) + }) + + t.Run("PartialSuccess", func(t *testing.T) { + const n, msg = 2, "bad data" + rCh := make(chan exportResult, 3) + rCh <- exportResult{ + Response: &collogpb.ExportLogsServiceResponse{ + PartialSuccess: &collogpb.ExportLogsPartialSuccess{ + RejectedLogRecords: n, + ErrorMessage: msg, + }, + }, + } + rCh <- exportResult{ + Response: &collogpb.ExportLogsServiceResponse{ + PartialSuccess: &collogpb.ExportLogsPartialSuccess{ + // Should not be logged. + RejectedLogRecords: 0, + ErrorMessage: "", + }, + }, + } + rCh <- exportResult{ + Response: &collogpb.ExportLogsServiceResponse{}, + } + + ctx := context.Background() + c, _ := clientFactory(t, rCh) + e := newExporter(c) + + defer func(orig otel.ErrorHandler) { + otel.SetErrorHandler(orig) + }(otel.GetErrorHandler()) + + var errs []error + eh := otel.ErrorHandlerFunc(func(e error) { errs = append(errs, e) }) + otel.SetErrorHandler(eh) + + require.NoError(t, e.Export(ctx, records)) + require.NoError(t, e.Export(ctx, records)) + require.NoError(t, e.Export(ctx, records)) + + require.Equal(t, 1, len(errs)) + want := fmt.Sprintf("%s (%d log records rejected)", msg, n) + assert.ErrorContains(t, errs[0], want) + }) +}