Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Spike] sdk/log: Add Logger.Enabled and EnabledParameters #5816

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,18 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

### Added

- Add `Enabled` method to `Processor` interface in `go.opentelemetry.io/otel/sdk/log` that accepts a newly introduced `EnabledParameters` type. (#5816)

### Changed

- Enable exemplars by default in `go.opentelemetry.io/otel/sdk/metric`. Exemplars can be disabled by setting `OTEL_METRICS_EXEMPLAR_FILTER=always_off` (#5778)
- `Logger.Enabled` in `go.opentelemetry.io/otel/log` now accepts a newly introduced `EnabledParameters` type instead of `Record`. (#5791)
- `FilterProcessor.Enabled` in `go.opentelemetry.io/otel/sdk/log/internal/x` now accepts `EnabledParameters` instead of `Record`. (#5791)

### Removed

- Remove `go.opentelemetry.io/otel/sdk/log/internal/x` package. Filtering is now part of `Processor` interface in `go.opentelemetry.io/otel/sdk/log`. (#5816)

<!-- Released section -->
<!-- Don't change this section unless doing release -->
Expand Down
5 changes: 5 additions & 0 deletions sdk/log/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ func (b *BatchProcessor) OnEmit(_ context.Context, r *Record) error {
return nil
}

// Enabled returns if b is enabled.
func (b *BatchProcessor) Enabled(context.Context, EnabledParameters) bool {
return !b.stopped.Load() && b.q != nil
}

// Shutdown flushes queued log records and shuts down the decorated exporter.
func (b *BatchProcessor) Shutdown(ctx context.Context) error {
if b.stopped.Swap(true) || b.q == nil {
Expand Down
9 changes: 9 additions & 0 deletions sdk/log/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func TestEmptyBatchConfig(t *testing.T) {
ctx := context.Background()
record := new(Record)
assert.NoError(t, bp.OnEmit(ctx, record), "OnEmit")
assert.False(t, bp.Enabled(ctx, EnabledParameters{}), "Enabled")
assert.NoError(t, bp.ForceFlush(ctx), "ForceFlush")
assert.NoError(t, bp.Shutdown(ctx), "Shutdown")
})
Expand Down Expand Up @@ -269,6 +270,14 @@ func TestBatchProcessor(t *testing.T) {
assert.Equal(t, 3, e.ExportN())
})

t.Run("Enabled", func(t *testing.T) {
b := NewBatchProcessor(defaultNoopExporter)
assert.True(t, b.Enabled(ctx, EnabledParameters{}))

_ = b.Shutdown(ctx)
assert.False(t, b.Enabled(ctx, EnabledParameters{}))
})

t.Run("Shutdown", func(t *testing.T) {
t.Run("Error", func(t *testing.T) {
e := newTestExporter(assert.AnError)
Expand Down
6 changes: 3 additions & 3 deletions sdk/log/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (p timestampProcessor) OnEmit(ctx context.Context, r *Record) error {
return nil
}

func (p timestampProcessor) Enabled(context.Context, Record) bool {
func (p timestampProcessor) Enabled(context.Context, EnabledParameters) bool {
return true
}

Expand All @@ -135,7 +135,7 @@ func (p attrAddProcessor) OnEmit(ctx context.Context, r *Record) error {
return nil
}

func (p attrAddProcessor) Enabled(context.Context, Record) bool {
func (p attrAddProcessor) Enabled(context.Context, EnabledParameters) bool {
return true
}

Expand All @@ -154,7 +154,7 @@ func (p attrSetDecorator) OnEmit(ctx context.Context, r *Record) error {
return nil
}

func (p attrSetDecorator) Enabled(context.Context, Record) bool {
func (p attrSetDecorator) Enabled(context.Context, EnabledParameters) bool {
return true
}

Expand Down
3 changes: 0 additions & 3 deletions sdk/log/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,5 @@ at a single endpoint their origin is decipherable.

See [go.opentelemetry.io/otel/log] for more information about
the OpenTelemetry Logs Bridge API.

See [go.opentelemetry.io/otel/sdk/log/internal/x] for information about the
experimental features.
*/
package log // import "go.opentelemetry.io/otel/sdk/log"
25 changes: 7 additions & 18 deletions sdk/log/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"fmt"
"strings"
"sync"

logapi "go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/log/global"
Expand Down Expand Up @@ -59,7 +58,7 @@ func ExampleProcessor_filtering() {
// Wrap the processor so that it ignores processing log records
// when a context deriving from WithIgnoreLogs is passed
// to the logging methods.
processor = &ContextFilterProcessor{Processor: processor}
processor = &ContextFilterProcessor{processor}

// The created processor can then be registered with
// the OpenTelemetry Logs SDK using the WithProcessor option.
Expand All @@ -82,15 +81,6 @@ func WithIgnoreLogs(ctx context.Context) context.Context {
// [WithIgnoreLogs] is passed to its methods.
type ContextFilterProcessor struct {
log.Processor

lazyFilter sync.Once
// Use the experimental FilterProcessor interface
// (go.opentelemetry.io/otel/sdk/log/internal/x).
filter filter
}

type filter interface {
Enabled(ctx context.Context, param logapi.EnabledParameters) bool
}

func (p *ContextFilterProcessor) OnEmit(ctx context.Context, record *log.Record) error {
Expand All @@ -100,13 +90,8 @@ func (p *ContextFilterProcessor) OnEmit(ctx context.Context, record *log.Record)
return p.Processor.OnEmit(ctx, record)
}

func (p *ContextFilterProcessor) Enabled(ctx context.Context, param logapi.EnabledParameters) bool {
p.lazyFilter.Do(func() {
if f, ok := p.Processor.(filter); ok {
p.filter = f
}
})
return !ignoreLogs(ctx) && (p.filter == nil || p.filter.Enabled(ctx, param))
func (p *ContextFilterProcessor) Enabled(ctx context.Context, param log.EnabledParameters) bool {
return !ignoreLogs(ctx) && p.Processor.Enabled(ctx, param)
}

func ignoreLogs(ctx context.Context) bool {
Expand Down Expand Up @@ -135,6 +120,10 @@ func ExampleProcessor_redact() {
// from attributes containing "token" in the key.
type RedactTokensProcessor struct{}

func (p *RedactTokensProcessor) Enabled(context.Context, log.EnabledParameters) bool {
return true
}

// OnEmit redacts values from attributes containing "token" in the key
// by replacing them with a REDACTED value.
func (p *RedactTokensProcessor) OnEmit(ctx context.Context, record *log.Record) error {
Expand Down
35 changes: 0 additions & 35 deletions sdk/log/internal/x/README.md

This file was deleted.

47 changes: 0 additions & 47 deletions sdk/log/internal/x/x.go

This file was deleted.

106 changes: 85 additions & 21 deletions sdk/log/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/log/embedded"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/log/internal/x"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/trace"
)

Expand Down Expand Up @@ -43,29 +43,13 @@ func (l *logger) Emit(ctx context.Context, r log.Record) {
}
}

// Enabled returns true if at least one Processor held by the LoggerProvider
// that created the logger will process param for the provided context and param.
//
// If it is not possible to definitively determine the param will be
// processed, true will be returned by default. A value of false will only be
// returned if it can be positively verified that no Processor will process.
func (l *logger) Enabled(ctx context.Context, param log.EnabledParameters) bool {
fltrs := l.provider.filterProcessors()
// If there are more Processors than FilterProcessors we cannot be sure
// that all Processors will drop the record. Therefore, return true.
//
// If all Processors are FilterProcessors, check if any is enabled.
return len(l.provider.processors) > len(fltrs) || anyEnabled(ctx, param, fltrs)
}

func anyEnabled(ctx context.Context, param log.EnabledParameters, fltrs []x.FilterProcessor) bool {
for _, f := range fltrs {
if f.Enabled(ctx, param) {
// At least one Processor will process the Record.
func (l *logger) Enabled(ctx context.Context, r log.EnabledParameters) bool {
newParam := l.newEnabledParameters(ctx, r)
for _, p := range l.provider.processors {
if enabled := p.Enabled(ctx, newParam); enabled {
return true
}
}
// No Processor will process the record
return false
}

Expand Down Expand Up @@ -101,3 +85,83 @@ func (l *logger) newRecord(ctx context.Context, r log.Record) Record {

return newRecord
}

func (l *logger) newEnabledParameters(ctx context.Context, param log.EnabledParameters) EnabledParameters {
sc := trace.SpanContextFromContext(ctx)

newParam := EnabledParameters{
traceID: sc.TraceID(),
spanID: sc.SpanID(),
traceFlags: sc.TraceFlags(),

resource: l.provider.resource,
scope: &l.instrumentationScope,
}

if v, ok := param.Severity(); ok {
newParam.setSeverity(v)
}

return newParam
}

// EnabledParameters represent Enabled parameters.
type EnabledParameters struct {
severity log.Severity
severitySet bool

traceID trace.TraceID
spanID trace.SpanID
traceFlags trace.TraceFlags

// resource represents the entity that collected the log.
resource *resource.Resource

// scope is the Scope that the Logger was created with.
scope *instrumentation.Scope

noCmp [0]func() //nolint: unused // This is indeed used.
}

// Severity returns the [Severity] level value, or [SeverityUndefined] if no value was set.
// The ok result indicates whether the value was set.
func (r *EnabledParameters) Severity() (value log.Severity, ok bool) {
return r.severity, r.severitySet
}

// setSeverity sets the [Severity] level.
func (r *EnabledParameters) setSeverity(level log.Severity) {
r.severity = level
r.severitySet = true
}

// TraceID returns the trace ID or empty array.
func (r *EnabledParameters) TraceID() trace.TraceID {
return r.traceID
}

// SpanID returns the span ID or empty array.
func (r *EnabledParameters) SpanID() trace.SpanID {
return r.spanID
}

// TraceFlags returns the trace flags.
func (r *EnabledParameters) TraceFlags() trace.TraceFlags {
return r.traceFlags
}

// Resource returns the entity that collected the log.
func (r *EnabledParameters) Resource() resource.Resource {
if r.resource == nil {
return *resource.Empty()
}
return *r.resource
}

// InstrumentationScope returns the scope that the Logger was created with.
func (r *EnabledParameters) InstrumentationScope() instrumentation.Scope {
if r.scope == nil {
return instrumentation.Scope{}
}
return *r.scope
}
Loading
Loading