Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

otlploggrpc exporter prototype #5522

Closed
wants to merge 15 commits into from
33 changes: 33 additions & 0 deletions exporters/otlp/otlplog/otlploggrpc/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package otlploggrpc_test

import (
"context"

"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"
"go.opentelemetry.io/otel/log/global"
"go.opentelemetry.io/otel/sdk/log"
)

func Example() {
ctx := context.Background()
exp, err := otlploggrpc.New(ctx)
if err != nil {
panic(err)
}

processor := log.NewBatchProcessor(exp)
provider := log.NewLoggerProvider(log.WithProcessor(processor))
defer func() {
if err := provider.Shutdown(ctx); err != nil {
panic(err)
}
}()

global.SetLoggerProvider(provider)

// From here, the provider can be used by instrumentation to collect
// telemetry.
}
86 changes: 86 additions & 0 deletions exporters/otlp/otlplog/otlploggrpc/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package otlploggrpc
import (
"context"
"errors"
"fmt"
"runtime"
"sync"
"sync/atomic"
Expand All @@ -14,6 +15,10 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/log"
sdklog "go.opentelemetry.io/otel/sdk/log"
collogpb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
"go.opentelemetry.io/otel/log"
sdklog "go.opentelemetry.io/otel/sdk/log"
logpb "go.opentelemetry.io/proto/otlp/logs/v1"
Expand Down Expand Up @@ -149,3 +154,84 @@ func TestExporterConcurrentSafe(t *testing.T) {
cancel()
wg.Wait()
}

// TestExporter runs integration test against the real OTLP collector.
func TestExporter(t *testing.T) {
t.Run("ExporterHonorsContextErrors", func(t *testing.T) {
t.Run("Export", testCtxErrs(func() func(context.Context) error {
c, _ := clientFactory(t, nil)
e := newExporter(c)
return func(ctx context.Context) error {
return e.Export(ctx, []sdklog.Record{{}})
}
}))

t.Run("Shutdown", testCtxErrs(func() func(context.Context) error {
c, _ := clientFactory(t, nil)
e := newExporter(c)
return e.Shutdown
}))
})

t.Run("Export", func(t *testing.T) {
ctx := context.Background()
c, coll := clientFactory(t, nil)
e := newExporter(c)

require.NoError(t, e.Export(ctx, records))
require.NoError(t, e.Shutdown(ctx))
got := coll.Collect().Dump()
require.Len(t, got, 1, "upload of one ResourceLogs")
require.Len(t, got[0].ScopeLogs, 1, "upload of one ScopeLogs")
require.Len(t, got[0].ScopeLogs[0].LogRecords, 2, "upload of two ScopeLogs")

// Check body
assert.Equal(t, "A", got[0].ScopeLogs[0].LogRecords[0].Body.GetStringValue())
assert.Equal(t, "B", got[0].ScopeLogs[0].LogRecords[1].Body.GetStringValue())
})

t.Run("PartialSuccess", func(t *testing.T) {
const n, msg = 2, "bad data"
rCh := make(chan exportResult, 3)
rCh <- exportResult{
Response: &collogpb.ExportLogsServiceResponse{
PartialSuccess: &collogpb.ExportLogsPartialSuccess{
RejectedLogRecords: n,
ErrorMessage: msg,
},
},
}
rCh <- exportResult{
Response: &collogpb.ExportLogsServiceResponse{
PartialSuccess: &collogpb.ExportLogsPartialSuccess{
// Should not be logged.
RejectedLogRecords: 0,
ErrorMessage: "",
},
},
}
rCh <- exportResult{
Response: &collogpb.ExportLogsServiceResponse{},
}

ctx := context.Background()
c, _ := clientFactory(t, rCh)
e := newExporter(c)

defer func(orig otel.ErrorHandler) {
otel.SetErrorHandler(orig)
}(otel.GetErrorHandler())

var errs []error
eh := otel.ErrorHandlerFunc(func(e error) { errs = append(errs, e) })
otel.SetErrorHandler(eh)

require.NoError(t, e.Export(ctx, records))
require.NoError(t, e.Export(ctx, records))
require.NoError(t, e.Export(ctx, records))

require.Equal(t, 1, len(errs))
want := fmt.Sprintf("%s (%d log records rejected)", msg, n)
assert.ErrorContains(t, errs[0], want)
})
}
Loading