diff --git a/exporters/otlp/otlplog/otlploggrpc/client.go b/exporters/otlp/otlplog/otlploggrpc/client.go index 98fc3c0a239..21aafcd98db 100644 --- a/exporters/otlp/otlplog/otlploggrpc/client.go +++ b/exporters/otlp/otlplog/otlploggrpc/client.go @@ -204,6 +204,16 @@ func (c *client) exportContext(parent context.Context) (context.Context, context return ctx, cancel } +type noopClient struct{} + +func newNoopClient() *noopClient { + return &noopClient{} +} + +func (c *noopClient) UploadLogs(context.Context, []*logpb.ResourceLogs) error { return nil } + +func (c *noopClient) Shutdown(context.Context) error { return nil } + // retryable returns if err identifies a request that can be retried and a // duration to wait for if an explicit throttle time is included in err. func retryable(err error) (bool, time.Duration) { diff --git a/exporters/otlp/otlplog/otlploggrpc/client_test.go b/exporters/otlp/otlplog/otlploggrpc/client_test.go index 1fa69b94fa4..004a6f4131d 100644 --- a/exporters/otlp/otlplog/otlploggrpc/client_test.go +++ b/exporters/otlp/otlplog/otlploggrpc/client_test.go @@ -450,113 +450,114 @@ func (c *grpcCollector) Collect() *storage { return c.storage } -func TestClient(t *testing.T) { - factory := func(rCh <-chan exportResult) (*client, *grpcCollector) { - coll, err := newGRPCCollector("", rCh) - require.NoError(t, err) - - addr := coll.listener.Addr().String() - opts := []Option{WithEndpoint(addr), WithInsecure()} - cfg := newConfig(opts) - client, err := newClient(cfg) - require.NoError(t, err) - return client, coll +func clientFactory(t *testing.T, rCh <-chan exportResult) (*client, *grpcCollector) { + t.Helper() + coll, err := newGRPCCollector("", rCh) + require.NoError(t, err) + + addr := coll.listener.Addr().String() + opts := []Option{WithEndpoint(addr), WithInsecure()} + cfg := newConfig(opts) + client, err := newClient(cfg) + require.NoError(t, err) + return client, coll +} + +func testCtxErrs(factory func() func(context.Context) error) func(t *testing.T) { + return func(t *testing.T) { + t.Helper() + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + t.Run("DeadlineExceeded", func(t *testing.T) { + innerCtx, innerCancel := context.WithTimeout(ctx, time.Nanosecond) + t.Cleanup(innerCancel) + <-innerCtx.Done() + + f := factory() + assert.ErrorIs(t, f(innerCtx), context.DeadlineExceeded) + }) + + t.Run("Canceled", func(t *testing.T) { + innerCtx, innerCancel := context.WithCancel(ctx) + innerCancel() + + f := factory() + assert.ErrorIs(t, f(innerCtx), context.Canceled) + }) } +} +func TestClient(t *testing.T) { t.Run("ClientHonorsContextErrors", func(t *testing.T) { - testCtxErrs := func(factory func() func(context.Context) error) func(t *testing.T) { - return func(t *testing.T) { - t.Helper() - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - t.Run("DeadlineExceeded", func(t *testing.T) { - innerCtx, innerCancel := context.WithTimeout(ctx, time.Nanosecond) - t.Cleanup(innerCancel) - <-innerCtx.Done() - - f := factory() - assert.ErrorIs(t, f(innerCtx), context.DeadlineExceeded) - }) - - t.Run("Canceled", func(t *testing.T) { - innerCtx, innerCancel := context.WithCancel(ctx) - innerCancel() - - f := factory() - assert.ErrorIs(t, f(innerCtx), context.Canceled) - }) - } - } - t.Run("Shutdown", testCtxErrs(func() func(context.Context) error { - c, _ := factory(nil) + c, _ := clientFactory(t, nil) return c.Shutdown })) t.Run("UploadLog", testCtxErrs(func() func(context.Context) error { - c, _ := factory(nil) + c, _ := clientFactory(t, nil) return func(ctx context.Context) error { return c.UploadLogs(ctx, nil) } })) + }) - t.Run("UploadLogs", func(t *testing.T) { - ctx := context.Background() - client, coll := factory(nil) - - require.NoError(t, client.UploadLogs(ctx, resourceLogs)) - require.NoError(t, client.Shutdown(ctx)) - got := coll.Collect().Dump() - require.Len(t, got, 1, "upload of one ResourceLogs") - diff := cmp.Diff(got[0], resourceLogs[0], cmp.Comparer(proto.Equal)) - if diff != "" { - t.Fatalf("unexpected ResourceLogs:\n%s", diff) - } - }) + t.Run("UploadLogs", func(t *testing.T) { + ctx := context.Background() + client, coll := clientFactory(t, nil) + + require.NoError(t, client.UploadLogs(ctx, resourceLogs)) + require.NoError(t, client.Shutdown(ctx)) + got := coll.Collect().Dump() + require.Len(t, got, 1, "upload of one ResourceLogs") + diff := cmp.Diff(got[0], resourceLogs[0], cmp.Comparer(proto.Equal)) + if diff != "" { + t.Fatalf("unexpected ResourceLogs:\n%s", diff) + } + }) - t.Run("PartialSuccess", func(t *testing.T) { - const n, msg = 2, "bad data" - rCh := make(chan exportResult, 3) - rCh <- exportResult{ - Response: &collogpb.ExportLogsServiceResponse{ - PartialSuccess: &collogpb.ExportLogsPartialSuccess{ - RejectedLogRecords: n, - ErrorMessage: msg, - }, + t.Run("PartialSuccess", func(t *testing.T) { + const n, msg = 2, "bad data" + rCh := make(chan exportResult, 3) + rCh <- exportResult{ + Response: &collogpb.ExportLogsServiceResponse{ + PartialSuccess: &collogpb.ExportLogsPartialSuccess{ + RejectedLogRecords: n, + ErrorMessage: msg, }, - } - rCh <- exportResult{ - Response: &collogpb.ExportLogsServiceResponse{ - PartialSuccess: &collogpb.ExportLogsPartialSuccess{ - // Should not be logged. - RejectedLogRecords: 0, - ErrorMessage: "", - }, + }, + } + rCh <- exportResult{ + Response: &collogpb.ExportLogsServiceResponse{ + PartialSuccess: &collogpb.ExportLogsPartialSuccess{ + // Should not be logged. + RejectedLogRecords: 0, + ErrorMessage: "", }, - } - rCh <- exportResult{ - Response: &collogpb.ExportLogsServiceResponse{}, - } + }, + } + rCh <- exportResult{ + Response: &collogpb.ExportLogsServiceResponse{}, + } - ctx := context.Background() - client, _ := factory(rCh) + ctx := context.Background() + client, _ := clientFactory(t, rCh) - defer func(orig otel.ErrorHandler) { - otel.SetErrorHandler(orig) - }(otel.GetErrorHandler()) + defer func(orig otel.ErrorHandler) { + otel.SetErrorHandler(orig) + }(otel.GetErrorHandler()) - var errs []error - eh := otel.ErrorHandlerFunc(func(e error) { errs = append(errs, e) }) - otel.SetErrorHandler(eh) + var errs []error + eh := otel.ErrorHandlerFunc(func(e error) { errs = append(errs, e) }) + otel.SetErrorHandler(eh) - require.NoError(t, client.UploadLogs(ctx, resourceLogs)) - require.NoError(t, client.UploadLogs(ctx, resourceLogs)) - require.NoError(t, client.UploadLogs(ctx, resourceLogs)) + require.NoError(t, client.UploadLogs(ctx, resourceLogs)) + require.NoError(t, client.UploadLogs(ctx, resourceLogs)) + require.NoError(t, client.UploadLogs(ctx, resourceLogs)) - require.Equal(t, 1, len(errs)) - want := fmt.Sprintf("%s (%d log records rejected)", msg, n) - assert.ErrorContains(t, errs[0], want) - }) + require.Equal(t, 1, len(errs)) + want := fmt.Sprintf("%s (%d log records rejected)", msg, n) + assert.ErrorContains(t, errs[0], want) }) } diff --git a/exporters/otlp/otlplog/otlploggrpc/exporter.go b/exporters/otlp/otlplog/otlploggrpc/exporter.go index dc4e0e75d74..9eaac5deea9 100644 --- a/exporters/otlp/otlplog/otlploggrpc/exporter.go +++ b/exporters/otlp/otlplog/otlploggrpc/exporter.go @@ -5,14 +5,27 @@ package otlploggrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/o import ( "context" + "sync" + "sync/atomic" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/transform" "go.opentelemetry.io/otel/sdk/log" + logpb "go.opentelemetry.io/proto/otlp/logs/v1" ) +type logClient interface { + UploadLogs(ctx context.Context, rl []*logpb.ResourceLogs) error + Shutdown(context.Context) error +} + // Exporter is a OpenTelemetry log Exporter. It transports log data encoded as // OTLP protobufs using gRPC. type Exporter struct { - // TODO: implement. + // Ensure synchronous access to the client across all functionality. + clientMu sync.Mutex + client logClient + + stopped atomic.Bool } // Compile-time check Exporter implements [log.Exporter]. @@ -25,29 +38,52 @@ func New(_ context.Context, options ...Option) (*Exporter, error) { if err != nil { return nil, err } - return newExporter(c, cfg) + return newExporter(c), nil } -func newExporter(*client, config) (*Exporter, error) { - // TODO: implement - return &Exporter{}, nil +func newExporter(c logClient) *Exporter { + var e Exporter + e.client = c + return &e } +var transformResourceLogs = transform.ResourceLogs + // Export transforms and transmits log records to an OTLP receiver. +// +// This method returns nil and drops records if called after Shutdown. +// This method returns an error if the method is canceled by the passed context. func (e *Exporter) Export(ctx context.Context, records []log.Record) error { - // TODO: implement. - return nil + if e.stopped.Load() { + return nil + } + + otlp := transformResourceLogs(records) + if otlp == nil { + return nil + } + + e.clientMu.Lock() + defer e.clientMu.Unlock() + return e.client.UploadLogs(ctx, otlp) } // Shutdown shuts down the Exporter. Calls to Export or ForceFlush will perform // no operation after this is called. func (e *Exporter) Shutdown(ctx context.Context) error { - // TODO: implement. - return nil + if e.stopped.Swap(true) { + return nil + } + + e.clientMu.Lock() + defer e.clientMu.Unlock() + + err := e.client.Shutdown(ctx) + e.client = newNoopClient() + return err } // ForceFlush does nothing. The Exporter holds no state. func (e *Exporter) ForceFlush(ctx context.Context) error { - // TODO: implement. return nil } diff --git a/exporters/otlp/otlplog/otlploggrpc/exporter_test.go b/exporters/otlp/otlplog/otlploggrpc/exporter_test.go new file mode 100644 index 00000000000..3ab9479e44d --- /dev/null +++ b/exporters/otlp/otlplog/otlploggrpc/exporter_test.go @@ -0,0 +1,151 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otlploggrpc + +import ( + "context" + "errors" + "runtime" + "sync" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/log" + sdklog "go.opentelemetry.io/otel/sdk/log" + logpb "go.opentelemetry.io/proto/otlp/logs/v1" +) + +var records []sdklog.Record + +func init() { + var r sdklog.Record + r.SetTimestamp(ts) + r.SetBody(log.StringValue("A")) + records = append(records, r) + + r.SetBody(log.StringValue("B")) + records = append(records, r) +} + +type mockClient struct { + err error + + uploads int +} + +func (m *mockClient) UploadLogs(context.Context, []*logpb.ResourceLogs) error { + m.uploads++ + return m.err +} + +func (m *mockClient) Shutdown(context.Context) error { + return m.err +} + +func TestExporterExport(t *testing.T) { + errClient := errors.New("client") + + testCases := []struct { + name string + logs []sdklog.Record + err error + + wantLogs []sdklog.Record + wantErr error + }{ + { + name: "NoError", + logs: make([]sdklog.Record, 2), + wantLogs: make([]sdklog.Record, 2), + }, + { + name: "Error", + logs: make([]sdklog.Record, 2), + err: errClient, + wantErr: errClient, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + orig := transformResourceLogs + var got []sdklog.Record + transformResourceLogs = func(r []sdklog.Record) []*logpb.ResourceLogs { + got = r + return make([]*logpb.ResourceLogs, len(r)) + } + t.Cleanup(func() { transformResourceLogs = orig }) + + mockCli := mockClient{err: tc.err} + + e := newExporter(&mockCli) + + err := e.Export(context.Background(), tc.logs) + assert.Equal(t, tc.wantErr, err) + assert.Equal(t, tc.logs, got) + assert.Equal(t, 1, mockCli.uploads) + }) + } +} + +func TestExporterShutdown(t *testing.T) { + ctx := context.Background() + e, err := New(ctx) + require.NoError(t, err, "New") + assert.NoError(t, e.Shutdown(ctx), "Shutdown Exporter") + + // After Shutdown is called, calls to Export, Shutdown, or ForceFlush + // should perform no operation and return nil error. + r := make([]sdklog.Record, 1) + assert.NoError(t, e.Export(ctx, r), "Export on Shutdown Exporter") + assert.NoError(t, e.ForceFlush(ctx), "ForceFlush on Shutdown Exporter") + assert.NoError(t, e.Shutdown(ctx), "Shutdown on Shutdown Exporter") +} + +func TestExporterForceFlush(t *testing.T) { + ctx := context.Background() + e, err := New(ctx) + require.NoError(t, err, "New") + + assert.NoError(t, e.ForceFlush(ctx), "ForceFlush") +} + +func TestExporterConcurrentSafe(t *testing.T) { + e := newExporter(&mockClient{}) + + const goroutines = 10 + + var wg sync.WaitGroup + ctx, cancel := context.WithCancel(context.Background()) + runs := new(uint64) + for i := 0; i < goroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + r := make([]sdklog.Record, 1) + for { + select { + case <-ctx.Done(): + return + default: + _ = e.Export(ctx, r) + _ = e.ForceFlush(ctx) + atomic.AddUint64(runs, 1) + } + } + }() + } + + for atomic.LoadUint64(runs) == 0 { + runtime.Gosched() + } + + _ = e.Shutdown(ctx) + cancel() + wg.Wait() +}