diff --git a/.chloggen/exporter-helper-v2-move-request.yaml b/.chloggen/exporter-helper-v2-move-request.yaml new file mode 100644 index 00000000000..67e1781b874 --- /dev/null +++ b/.chloggen/exporter-helper-v2-move-request.yaml @@ -0,0 +1,26 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporter/exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Move the experimental request API to a separate package. + +# One or more tracking issues or pull requests related to the change +issues: [7874] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + The following experimental Request API is moved from exporter/exporterhelper to exporter/exporterhelper/request package: + - `Request` -> `request.Request` + - `RequestItemsCounter` -> `request.ItemsCounter` + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/.chloggen/exporter-helper-v2.yaml b/.chloggen/exporter-helper-v2.yaml new file mode 100644 index 00000000000..48a90660adf --- /dev/null +++ b/.chloggen/exporter-helper-v2.yaml @@ -0,0 +1,37 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporter/exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add API for enabling queue in the new exporter helpers. + +# One or more tracking issues or pull requests related to the change +issues: [7874] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + The following experimental API is introduced in exporter/exporterhelper package: + - `WithRequestQueue`: a new exporter helper option for using a queue. + - queue.Queue: an interface for queue implementations. + - queue.Factory: a queue factory interface, implementations of this interface are intended to be used with WithRequestQueue option. + - queue.Settings: queue factory settings. + - queue.Config: common configuration for queue implementations. + - queue.NewDefaultConfig: a function for creating a default queue configuration. + - queue/memoryqueue.NewFactory: a new factory for creating a memory queue. + - queue/memoryqueue.Config: a configuration for the memory queue factory. + - queue/memoryqueue.NewDefaultConfig: a function for creating a default memory queue configuration. + - request.ErrorHandler: an optional interface for handling errors that occur during request processing. + - request.Marshaler: a function that can marshal a Request into bytes. 
+ - request.Unmarshaler: a function that can unmarshal bytes into a Request + All the new APIs are intended to be used by exporters that operate over client-provided requests instead of pdata. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 06134aae2f5..67f6c339e42 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -12,46 +12,58 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" + "go.opentelemetry.io/collector/exporter/exporterhelper/queue" + "go.opentelemetry.io/collector/exporter/exporterhelper/queue/persistentqueue" + "go.opentelemetry.io/collector/exporter/exporterhelper/request" ) // requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs). type requestSender interface { - start(ctx context.Context, host component.Host, set exporter.CreateSettings) error - shutdown(ctx context.Context) error - send(req internal.Request) error - setNextSender(nextSender requestSender) + send(req *intrequest.Request) error } -type baseRequestSender struct { - nextSender requestSender +type starter interface { + start(context.Context, component.Host) error } -var _ requestSender = (*baseRequestSender)(nil) +type shutdowner interface { + shutdown(ctx context.Context) error +} -func (b *baseRequestSender) start(context.Context, component.Host, exporter.CreateSettings) error { - return nil +type senderWrapper struct { + sender requestSender + nextSender *senderWrapper } -func (b *baseRequestSender) shutdown(context.Context) error { +func (b *senderWrapper) start(ctx context.Context, host component.Host) error { + if s, ok := b.sender.(starter); ok { + return s.start(ctx, host) + } return nil } -func (b *baseRequestSender) send(req internal.Request) error { - return b.nextSender.send(req) +func (b *senderWrapper) shutdown(ctx context.Context) error { + if s, ok := b.sender.(shutdowner); ok { + return s.shutdown(ctx) + } + return nil } -func (b *baseRequestSender) setNextSender(nextSender requestSender) { - b.nextSender = nextSender +func (b *senderWrapper) send(req *intrequest.Request) error { + if b.sender == nil { + return b.nextSender.send(req) + } + return b.sender.send(req) } type errorLoggingRequestSender struct { - baseRequestSender - logger *zap.Logger + logger *zap.Logger + nextSender *senderWrapper } -func (l *errorLoggingRequestSender) send(req internal.Request) error { - err := l.baseRequestSender.send(req) +func (l *errorLoggingRequestSender) send(req *intrequest.Request) error { + err := l.nextSender.send(req) if err != nil { l.logger.Error( "Exporting failed", @@ -61,31 +73,7 @@ func (l *errorLoggingRequestSender) send(req internal.Request) error { return err } -type obsrepSenderFactory func(obsrep *ObsReport) requestSender - -// baseRequest is a base implementation for the internal.Request. 
-type baseRequest struct { - ctx context.Context - processingFinishedCallback func() -} - -func (req *baseRequest) Context() context.Context { - return req.ctx -} - -func (req *baseRequest) SetContext(ctx context.Context) { - req.ctx = ctx -} - -func (req *baseRequest) SetOnProcessingFinished(callback func()) { - req.processingFinishedCallback = callback -} - -func (req *baseRequest) OnProcessingFinished() { - if req.processingFinishedCallback != nil { - req.processingFinishedCallback() - } -} +type obsrepSenderFactory func(obsrep *ObsReport, nextSender *senderWrapper) requestSender // Option apply changes to baseExporter. type Option func(*baseExporter) @@ -110,7 +98,7 @@ func WithShutdown(shutdown component.ShutdownFunc) Option { // The default TimeoutSettings is 5 seconds. func WithTimeout(timeoutSettings TimeoutSettings) Option { return func(o *baseExporter) { - o.timeoutSender.cfg = timeoutSettings + o.timeoutSender.sender = &timeoutSender{cfg: timeoutSettings} } } @@ -118,29 +106,40 @@ func WithTimeout(timeoutSettings TimeoutSettings) Option { // The default RetrySettings is to disable retries. func WithRetry(retrySettings RetrySettings) Option { return func(o *baseExporter) { - o.retrySender = newRetrySender(o.set.ID, retrySettings, o.set.Logger, o.onTemporaryFailure) + o.retrySender.sender = newRetrySender(o.set.ID, retrySettings, o.set.Logger, o.onTemporaryFailure, o.retrySender.nextSender) } } // WithQueue overrides the default QueueSettings for an exporter. // The default QueueSettings is to disable queueing. // This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. -func WithQueue(config QueueSettings) Option { +func WithQueue(cfg QueueSettings) Option { return func(o *baseExporter) { if o.requestExporter { - panic("queueing is not available for the new request exporters yet") + panic("this option is not available for the new request exporters, " + + "use WithMemoryQueue or WithPersistentQueue instead") } - var queue internal.Queue - if config.Enabled { - if config.StorageID == nil { - queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers) - } else { - queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler) - } + qf := persistentqueue.NewFactory(persistentqueue.Config{StorageID: cfg.StorageID}, o.marshaler, o.unmarshaler) + queueCfg := queue.Config{ + Enabled: cfg.Enabled, + NumConsumers: cfg.NumConsumers, + QueueItemsSize: cfg.QueueSize, } - qs := newQueueSender(o.set.ID, o.signal, queue, o.set.Logger) - o.queueSender = qs + qs := newQueueSender(o.set, queueCfg, o.signal, qf, newRequestsCapacityLimiter(cfg.QueueSize), o.queueSender.nextSender) + o.setOnTemporaryFailure(qs.onTemporaryFailure) + o.queueSender.sender = qs + } +} + +// WithRequestQueue enables queueing for an exporter. +// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. 
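+//
+// For illustration only, a sketch of how a request-based exporter might enable a memory-backed
+// queue through this option, assuming memoryqueue.NewFactory accepts a memoryqueue.Config in the
+// same way persistentqueue.NewFactory accepts its Config:
+//
+//	opt := WithRequestQueue(
+//		queue.NewDefaultConfig(),
+//		memoryqueue.NewFactory(memoryqueue.NewDefaultConfig()),
+//	)
+//
+// The resulting option is then passed to New[Traces|Metrics|Logs]RequestExporter alongside any
+// other options.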
+func WithRequestQueue(cfg queue.Config, queueFactory queue.Factory) Option { + return func(o *baseExporter) { + qs := newQueueSender(o.set, cfg, o.signal, queueFactory, newItemsCapacityLimiter(cfg.QueueItemsSize), o.queueSender.nextSender) o.setOnTemporaryFailure(qs.onTemporaryFailure) + o.queueSender.sender = qs } } @@ -159,8 +158,8 @@ type baseExporter struct { component.ShutdownFunc requestExporter bool - marshaler internal.RequestMarshaler - unmarshaler internal.RequestUnmarshaler + marshaler request.Marshaler + unmarshaler request.Unmarshaler signal component.DataType set exporter.CreateSettings @@ -169,10 +168,10 @@ type baseExporter struct { // Chain of senders that the exporter helper applies before passing the data to the actual exporter. // The data is handled by each sender in the respective order starting from the queueSender. // Most of the senders are optional, and initialized with a no-op path-through sender. - queueSender requestSender - obsrepSender requestSender - retrySender requestSender - timeoutSender *timeoutSender // timeoutSender is always initialized. + queueSender *senderWrapper + obsrepSender *senderWrapper + retrySender *senderWrapper + timeoutSender *senderWrapper // onTemporaryFailure is a function that is called when the retrySender is unable to send data to the next consumer. onTemporaryFailure onRequestHandlingFinishedFunc @@ -181,8 +180,8 @@ type baseExporter struct { } // TODO: requestExporter, marshaler, and unmarshaler arguments can be removed when the old exporter helpers will be updated to call the new ones. -func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool, marshaler internal.RequestMarshaler, - unmarshaler internal.RequestUnmarshaler, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) { +func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool, marshaler request.Marshaler, + unmarshaler request.Unmarshaler, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) { obsReport, err := NewObsReport(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set}) if err != nil { @@ -195,35 +194,31 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req unmarshaler: unmarshaler, signal: signal, - queueSender: &baseRequestSender{}, - obsrepSender: osf(obsReport), - retrySender: &errorLoggingRequestSender{logger: set.Logger}, - timeoutSender: &timeoutSender{cfg: NewDefaultTimeoutSettings()}, - set: set, obsrep: obsReport, } + // Initialize the chain of senders in the reverse order. + be.timeoutSender = &senderWrapper{sender: &timeoutSender{cfg: NewDefaultTimeoutSettings()}} + be.retrySender = &senderWrapper{ + sender: &errorLoggingRequestSender{logger: set.Logger, nextSender: be.timeoutSender}, + nextSender: be.timeoutSender, + } + be.obsrepSender = &senderWrapper{sender: osf(obsReport, be.retrySender)} + be.queueSender = &senderWrapper{nextSender: be.obsrepSender} + for _, op := range options { op(be) } - be.connectSenders() return be, nil } // send sends the request using the first sender in the chain. -func (be *baseExporter) send(req internal.Request) error { +func (be *baseExporter) send(req *intrequest.Request) error { return be.queueSender.send(req) } -// connectSenders connects the senders in the predefined order. 
-func (be *baseExporter) connectSenders() { - be.queueSender.setNextSender(be.obsrepSender) - be.obsrepSender.setNextSender(be.retrySender) - be.retrySender.setNextSender(be.timeoutSender) -} - func (be *baseExporter) Start(ctx context.Context, host component.Host) error { // First start the wrapped exporter. if err := be.StartFunc.Start(ctx, host); err != nil { @@ -231,7 +226,7 @@ func (be *baseExporter) Start(ctx context.Context, host component.Host) error { } // If no error then start the queueSender. - return be.queueSender.start(ctx, host, be.set) + return be.queueSender.start(ctx, host) } func (be *baseExporter) Shutdown(ctx context.Context) error { @@ -247,7 +242,7 @@ func (be *baseExporter) Shutdown(ctx context.Context) error { func (be *baseExporter) setOnTemporaryFailure(onTemporaryFailure onRequestHandlingFinishedFunc) { be.onTemporaryFailure = onTemporaryFailure - if rs, ok := be.retrySender.(*retrySender); ok { + if rs, ok := be.retrySender.sender.(*retrySender); ok { rs.onTemporaryFailure = onTemporaryFailure } } diff --git a/exporter/exporterhelper/common_test.go b/exporter/exporterhelper/common_test.go index 10166c32974..e36b7a5c9d5 100644 --- a/exporter/exporterhelper/common_test.go +++ b/exporter/exporterhelper/common_test.go @@ -34,8 +34,8 @@ var ( } ) -func newNoopObsrepSender(_ *ObsReport) requestSender { - return &baseRequestSender{} +func newNoopObsrepSender(_ *ObsReport, nextSender *senderWrapper) requestSender { + return &senderWrapper{nextSender: nextSender} } func TestBaseExporter(t *testing.T) { diff --git a/exporter/exporterhelper/internal/bounded_memory_queue.go b/exporter/exporterhelper/internal/bounded_memory_queue.go index fa73dba3ff2..78efc05da47 100644 --- a/exporter/exporterhelper/internal/bounded_memory_queue.go +++ b/exporter/exporterhelper/internal/bounded_memory_queue.go @@ -7,81 +7,56 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe import ( "context" - "sync" + "errors" "sync/atomic" "go.opentelemetry.io/collector/component" + intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" + "go.opentelemetry.io/collector/exporter/exporterhelper/queue" ) // boundedMemoryQueue implements a producer-consumer exchange similar to a ring buffer queue, // where the queue is bounded and if it fills up due to slow consumers, the new items written by // the producer are dropped. type boundedMemoryQueue struct { - stopWG sync.WaitGroup - stopped *atomic.Bool - items chan Request - numConsumers int + component.StartFunc + stopped *atomic.Bool + items chan *intrequest.Request } // NewBoundedMemoryQueue constructs the new queue of specified capacity. Capacity cannot be 0. -func NewBoundedMemoryQueue(capacity int, numConsumers int) Queue { +func NewBoundedMemoryQueue(capacity int) queue.Queue { return &boundedMemoryQueue{ - items: make(chan Request, capacity), - stopped: &atomic.Bool{}, - numConsumers: numConsumers, + items: make(chan *intrequest.Request, capacity), + stopped: &atomic.Bool{}, } } -// Start starts a given number of goroutines consuming items from the queue -// and passing them into the consumer callback. 
-func (q *boundedMemoryQueue) Start(_ context.Context, _ component.Host, set QueueSettings) error { - var startWG sync.WaitGroup - for i := 0; i < q.numConsumers; i++ { - q.stopWG.Add(1) - startWG.Add(1) - go func() { - startWG.Done() - defer q.stopWG.Done() - for item := range q.items { - set.Callback(item) - } - }() - } - startWG.Wait() - return nil -} - -// Produce is used by the producer to submit new item to the queue. Returns false in case of queue overflow. -func (q *boundedMemoryQueue) Produce(item Request) bool { +func (q *boundedMemoryQueue) Produce(item *intrequest.Request) error { if q.stopped.Load() { - return false + return errors.New("memory queue is stopped") } select { case q.items <- item: - return true + return nil default: - return false + // should not happen, as overflows should have been captured earlier + return errors.New("memory queue is full") } } +func (q *boundedMemoryQueue) Chan() <-chan *intrequest.Request { + return q.items +} + // Shutdown stops accepting items, and stops all consumers. It blocks until all consumers have stopped. func (q *boundedMemoryQueue) Shutdown(context.Context) error { q.stopped.Store(true) // disable producer close(q.items) - q.stopWG.Wait() return nil } -// Size returns the current size of the queue -func (q *boundedMemoryQueue) Size() int { - return len(q.items) -} - -func (q *boundedMemoryQueue) Capacity() int { - return cap(q.items) -} - func (q *boundedMemoryQueue) IsPersistent() bool { return false } diff --git a/exporter/exporterhelper/internal/bounded_memory_queue_test.go b/exporter/exporterhelper/internal/bounded_memory_queue_test.go index 39c433658f1..d48dae3061c 100644 --- a/exporter/exporterhelper/internal/bounded_memory_queue_test.go +++ b/exporter/exporterhelper/internal/bounded_memory_queue_test.go @@ -19,64 +19,66 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" + "go.opentelemetry.io/collector/exporter/exporterhelper/queue" + "go.opentelemetry.io/collector/exporter/exporterhelper/request" "go.opentelemetry.io/collector/exporter/exportertest" ) -func newNopQueueSettings(callback func(item Request)) QueueSettings { - return QueueSettings{ +func newNopQueueSettings(capacity int) queue.Settings { + return queue.Settings{ CreateSettings: exportertest.NewNopCreateSettings(), DataType: component.DataTypeMetrics, - Callback: callback, + Capacity: capacity, } } type stringRequest struct { - Request + request.Request str string } -func newStringRequest(str string) Request { +func newStringRequest(str string) request.Request { return stringRequest{str: str} } -// In this test we run a queue with capacity 1 and a single consumer. -// We want to test the overflow behavior, so we block the consumer -// by holding a startLock before submitting items to the queue. 
func TestBoundedQueue(t *testing.T) { - q := NewBoundedMemoryQueue(1, 1) - var startLock sync.Mutex startLock.Lock() // block consumers consumerState := newConsumerState(t) - assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item Request) { - consumerState.record(item.(stringRequest).str) + stopWG := sync.WaitGroup{} + q := NewBoundedMemoryQueue(1) + startConsumers(q, 1, &stopWG, func(item *intrequest.Request) { + consumerState.record(item.Request.(stringRequest).str) // block further processing until startLock is released startLock.Lock() //nolint:staticcheck // SA2001 ignore this! startLock.Unlock() - }))) + }) + + assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) - assert.True(t, q.Produce(newStringRequest("a"))) + assert.Nil(t, q.Produce(intrequest.New(context.Background(), newStringRequest("a")))) // at this point "a" may or may not have been received by the consumer go-routine // so let's make sure it has been consumerState.waitToConsumeOnce() // at this point the item must have been read off the queue, but the consumer is blocked - assert.Equal(t, 0, q.Size()) + assert.Equal(t, 0, len(q.Chan())) consumerState.assertConsumed(map[string]bool{ "a": true, }) // produce two more items. The first one should be accepted, but not consumed. - assert.True(t, q.Produce(newStringRequest("b"))) - assert.Equal(t, 1, q.Size()) + assert.Nil(t, q.Produce(intrequest.New(context.Background(), newStringRequest("b")))) + assert.Equal(t, 1, len(q.Chan())) // the second should be rejected since the queue is full - assert.False(t, q.Produce(newStringRequest("c"))) - assert.Equal(t, 1, q.Size()) + assert.Error(t, q.Produce(intrequest.New(context.Background(), newStringRequest("c")))) + assert.Equal(t, 1, len(q.Chan())) startLock.Unlock() // unblock consumer @@ -91,13 +93,13 @@ func TestBoundedQueue(t *testing.T) { "b": true, } for _, item := range []string{"d", "e", "f"} { - assert.True(t, q.Produce(newStringRequest(item))) + assert.Nil(t, q.Produce(intrequest.New(context.Background(), newStringRequest(item)))) expected[item] = true consumerState.assertConsumed(expected) } assert.NoError(t, q.Shutdown(context.Background())) - assert.False(t, q.Produce(newStringRequest("x")), "cannot push to closed queue") + assert.Error(t, q.Produce(intrequest.New(context.Background(), newStringRequest("x")))) } // In this test we run a queue with many items and a slow consumer. @@ -107,29 +109,29 @@ func TestBoundedQueue(t *testing.T) { // only after Stop will mean the consumers are still locked while // trying to perform the final consumptions. 
func TestShutdownWhileNotEmpty(t *testing.T) { - q := NewBoundedMemoryQueue(10, 1) - consumerState := newConsumerState(t) + q := NewBoundedMemoryQueue(10) + stopWG := sync.WaitGroup{} + startConsumers(q, 1, &stopWG, func(req *intrequest.Request) { + consumerState.record(req.Request.(stringRequest).str) + time.Sleep(1 * time.Millisecond) + }) - assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item Request) { - consumerState.record(item.(stringRequest).str) - time.Sleep(1 * time.Second) - }))) - - q.Produce(newStringRequest("a")) - q.Produce(newStringRequest("b")) - q.Produce(newStringRequest("c")) - q.Produce(newStringRequest("d")) - q.Produce(newStringRequest("e")) - q.Produce(newStringRequest("f")) - q.Produce(newStringRequest("g")) - q.Produce(newStringRequest("h")) - q.Produce(newStringRequest("i")) - q.Produce(newStringRequest("j")) + assert.NoError(t, q.Produce(intrequest.New(context.Background(), newStringRequest("a")))) + assert.NoError(t, q.Produce(intrequest.New(context.Background(), newStringRequest("b")))) + assert.NoError(t, q.Produce(intrequest.New(context.Background(), newStringRequest("c")))) + assert.NoError(t, q.Produce(intrequest.New(context.Background(), newStringRequest("d")))) + assert.NoError(t, q.Produce(intrequest.New(context.Background(), newStringRequest("e")))) + assert.NoError(t, q.Produce(intrequest.New(context.Background(), newStringRequest("f")))) + assert.NoError(t, q.Produce(intrequest.New(context.Background(), newStringRequest("g")))) + assert.NoError(t, q.Produce(intrequest.New(context.Background(), newStringRequest("h")))) + assert.NoError(t, q.Produce(intrequest.New(context.Background(), newStringRequest("i")))) + assert.NoError(t, q.Produce(intrequest.New(context.Background(), newStringRequest("j")))) assert.NoError(t, q.Shutdown(context.Background())) + stopWG.Wait() - assert.False(t, q.Produce(newStringRequest("x")), "cannot push to closed queue") + assert.Error(t, q.Produce(intrequest.New(context.Background(), newStringRequest("x")))) consumerState.assertConsumed(map[string]bool{ "a": true, "b": true, @@ -142,7 +144,7 @@ func TestShutdownWhileNotEmpty(t *testing.T) { "i": true, "j": true, }) - assert.Equal(t, 0, q.Size()) + assert.Equal(t, 0, len(q.Chan())) } func Benchmark_QueueUsage_10000_1_50000(b *testing.B) { @@ -190,13 +192,13 @@ func Benchmark_QueueUsage_10000_10_250000(b *testing.B) { func queueUsage(b *testing.B, capacity int, numConsumers int, numberOfItems int) { b.ReportAllocs() for i := 0; i < b.N; i++ { - q := NewBoundedMemoryQueue(capacity, numConsumers) - err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item Request) { + q := NewBoundedMemoryQueue(capacity) + stopWG := sync.WaitGroup{} + startConsumers(q, numConsumers, &stopWG, func(request *intrequest.Request) { time.Sleep(1 * time.Millisecond) - })) - require.NoError(b, err) + }) for j := 0; j < numberOfItems; j++ { - q.Produce(newStringRequest(fmt.Sprintf("%d", j))) + assert.Nil(b, q.Produce(intrequest.New(context.Background(), newStringRequest(fmt.Sprintf("%d", j))))) } assert.NoError(b, q.Shutdown(context.Background())) } @@ -248,23 +250,39 @@ func (s *consumerState) assertConsumed(expected map[string]bool) { } func TestZeroSizeWithConsumers(t *testing.T) { - q := NewBoundedMemoryQueue(0, 1) + q := NewBoundedMemoryQueue(0) - err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item Request) {})) - assert.NoError(t, err) + stopWG := sync.WaitGroup{} + 
startConsumers(q, 1, &stopWG, func(request *intrequest.Request) {}) - assert.True(t, q.Produce(newStringRequest("a"))) // in process + assert.Nil(t, q.Produce(intrequest.New(context.Background(), newStringRequest("a")))) // in process assert.NoError(t, q.Shutdown(context.Background())) } func TestZeroSizeNoConsumers(t *testing.T) { - q := NewBoundedMemoryQueue(0, 0) + q := NewBoundedMemoryQueue(0) - err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item Request) {})) + err := q.Start(context.Background(), componenttest.NewNopHost()) assert.NoError(t, err) - assert.False(t, q.Produce(newStringRequest("a"))) // in process + assert.Error(t, q.Produce(intrequest.New(context.Background(), newStringRequest("a")))) // in process assert.NoError(t, q.Shutdown(context.Background())) } + +func startConsumers(queue queue.Queue, numConsumers int, stopWG *sync.WaitGroup, consume func(request *intrequest.Request)) { + var startWG sync.WaitGroup + for i := 0; i < numConsumers; i++ { + stopWG.Add(1) + startWG.Add(1) + go func() { + startWG.Done() + defer stopWG.Done() + for item := range queue.Chan() { + consume(item) + } + }() + } + startWG.Wait() +} diff --git a/exporter/exporterhelper/internal/persistent_queue.go b/exporter/exporterhelper/internal/persistent_queue.go index c40313f9c22..a4b1b8f144f 100644 --- a/exporter/exporterhelper/internal/persistent_queue.go +++ b/exporter/exporterhelper/internal/persistent_queue.go @@ -7,9 +7,14 @@ import ( "context" "errors" "fmt" - "sync" + + "go.uber.org/zap" + "sync/atomic" "go.opentelemetry.io/collector/component" + intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" + "go.opentelemetry.io/collector/exporter/exporterhelper/queue" + "go.opentelemetry.io/collector/exporter/exporterhelper/request" "go.opentelemetry.io/collector/extension/experimental/storage" ) @@ -27,14 +32,15 @@ var ( // persistentQueue holds the queue backed by file storage type persistentQueue struct { - stopWG sync.WaitGroup - stopChan chan struct{} - storageID component.ID - storage *persistentContiguousStorage - capacity uint64 - numConsumers int - marshaler RequestMarshaler - unmarshaler RequestUnmarshaler + logger *zap.Logger + compID component.ID + stopped *atomic.Bool + dataType component.DataType + storageID component.ID + storage *persistentContiguousStorage + capacity uint64 + marshaler request.Marshaler + unmarshaler request.Unmarshaler } // buildPersistentStorageName returns a name that is constructed out of queue name and signal type. 
This is done @@ -44,65 +50,50 @@ func buildPersistentStorageName(name string, signal component.DataType) string { } // NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage -func NewPersistentQueue(capacity int, numConsumers int, storageID component.ID, marshaler RequestMarshaler, - unmarshaler RequestUnmarshaler) Queue { +func NewPersistentQueue(set queue.Settings, storageID component.ID, marshaler request.Marshaler, unmarshaler request.Unmarshaler) queue.Queue { return &persistentQueue{ - capacity: uint64(capacity), - numConsumers: numConsumers, - storageID: storageID, - marshaler: marshaler, - unmarshaler: unmarshaler, - stopChan: make(chan struct{}), + logger: set.Logger, + capacity: uint64(set.Capacity), + stopped: &atomic.Bool{}, + compID: set.ID, + storageID: storageID, + marshaler: marshaler, + unmarshaler: unmarshaler, + dataType: set.DataType, } } // Start starts the persistentQueue with the given number of consumers. -func (pq *persistentQueue) Start(ctx context.Context, host component.Host, set QueueSettings) error { - storageClient, err := toStorageClient(ctx, pq.storageID, host, set.ID, set.DataType) +func (pq *persistentQueue) Start(ctx context.Context, host component.Host) error { + storageClient, err := toStorageClient(ctx, pq.storageID, host, pq.compID, pq.dataType) if err != nil { return err } - storageName := buildPersistentStorageName(set.ID.Name(), set.DataType) - pq.storage = newPersistentContiguousStorage(ctx, storageName, storageClient, set.Logger, pq.capacity, pq.marshaler, pq.unmarshaler) - for i := 0; i < pq.numConsumers; i++ { - pq.stopWG.Add(1) - go func() { - defer pq.stopWG.Done() - for { - select { - case req := <-pq.storage.get(): - set.Callback(req) - case <-pq.stopChan: - return - } - } - }() - } + storageName := buildPersistentStorageName(pq.compID.Name(), pq.dataType) + pq.storage = newPersistentContiguousStorage(ctx, storageName, storageClient, pq.logger, pq.capacity, pq.marshaler, pq.unmarshaler) return nil } // Produce adds an item to the queue and returns true if it was accepted -func (pq *persistentQueue) Produce(item Request) bool { - err := pq.storage.put(item) - return err == nil +func (pq *persistentQueue) Produce(req *intrequest.Request) error { + if pq.stopped.Load() { + return errors.New("persistent queue is stopped") + } + return pq.storage.put(req) +} + +// Chan returns a channel to pull the requests from. 
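+// Consumers are expected to drain the channel until it is closed on Shutdown. An illustrative
+// sketch, mirroring the consumer goroutines used in this change's tests (consume stands for any
+// per-request callback):
+//
+//	for req := range pq.Chan() {
+//		consume(req)
+//	}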
+func (pq *persistentQueue) Chan() <-chan *intrequest.Request {
+	return pq.storage.get()
+}

 // Shutdown stops accepting items, shuts down the queue and closes the persistent queue
 func (pq *persistentQueue) Shutdown(ctx context.Context) error {
-	close(pq.stopChan)
-	pq.stopWG.Wait()
+	// Stop accepting requests before stopping the storage; otherwise the result of a successfully
+	// processed request may fail to be written to the persistent storage.
+	pq.stopped.Store(true)
 	return stopStorage(pq.storage, ctx)
 }

-// Size returns the current depth of the queue, excluding the item already in the storage channel (if any)
-func (pq *persistentQueue) Size() int {
-	return int(pq.storage.size())
-}
-
-func (pq *persistentQueue) Capacity() int {
-	return int(pq.capacity)
-}
-
 func (pq *persistentQueue) IsPersistent() bool {
 	return true
 }
diff --git a/exporter/exporterhelper/internal/persistent_queue_test.go b/exporter/exporterhelper/internal/persistent_queue_test.go
index 25241c6a8f4..b098e8fb874 100644
--- a/exporter/exporterhelper/internal/persistent_queue_test.go
+++ b/exporter/exporterhelper/internal/persistent_queue_test.go
@@ -8,6 +8,7 @@ import (
 	"errors"
 	"fmt"
 	"strconv"
+	"sync"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -16,6 +17,8 @@ import (
 	"github.com/stretchr/testify/require"

 	"go.opentelemetry.io/collector/component"
+	intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
+	"go.opentelemetry.io/collector/exporter/exporterhelper/queue"
 	"go.opentelemetry.io/collector/extension/experimental/storage"
 	"go.opentelemetry.io/collector/extension/extensiontest"
 	"go.opentelemetry.io/collector/pdata/pcommon"
@@ -32,73 +35,61 @@ func (nh *mockHost) GetExtensions() map[component.ID]component.Component {
 }

 // createTestQueue creates and starts a fake queue with the given capacity and number of consumers.
-func createTestQueue(t *testing.T, capacity, numConsumers int, callback func(item Request)) Queue { - pq := NewPersistentQueue(capacity, numConsumers, component.ID{}, newFakeTracesRequestMarshalerFunc(), +func createTestQueue(t *testing.T, capacity int) queue.Queue { + pq := NewPersistentQueue(newNopQueueSettings(capacity), component.ID{}, newFakeTracesRequestMarshalerFunc(), newFakeTracesRequestUnmarshalerFunc()) host := &mockHost{ext: map[component.ID]component.Component{ {}: NewMockStorageExtension(nil), }} - err := pq.Start(context.Background(), host, newNopQueueSettings(callback)) + err := pq.Start(context.Background(), host) require.NoError(t, err) return pq } func TestPersistentQueue_Capacity(t *testing.T) { - pq := NewPersistentQueue(5, 1, component.ID{}, newFakeTracesRequestMarshalerFunc(), - newFakeTracesRequestUnmarshalerFunc()) - host := &mockHost{ext: map[component.ID]component.Component{ - {}: NewMockStorageExtension(nil), - }} - err := pq.Start(context.Background(), host, newNopQueueSettings(func(req Request) {})) - require.NoError(t, err) - - // Stop consumer to imitate queue overflow - close(pq.(*persistentQueue).stopChan) - pq.(*persistentQueue).stopWG.Wait() - - assert.Equal(t, 0, pq.Size()) + pq := createTestQueue(t, 5) + assert.Equal(t, 0, len(pq.Chan())) req := newFakeTracesRequest(newTraces(1, 10)) for i := 0; i < 10; i++ { - result := pq.Produce(req) + err := pq.Produce(intrequest.New(context.Background(), req)) if i < 6 { - assert.True(t, result) + assert.NoError(t, err) } else { - assert.False(t, result) + assert.Error(t, err) } // Let's make sure the loop picks the first element into the channel, // so the capacity could be used in full if i == 0 { assert.Eventually(t, func() bool { - return pq.Size() == 0 + return len(pq.Chan()) == 0 }, 5*time.Second, 10*time.Millisecond) } } - assert.Equal(t, 5, pq.Size()) assert.NoError(t, stopStorage(pq.(*persistentQueue).storage, context.Background())) } func TestPersistentQueueShutdown(t *testing.T) { - pq := createTestQueue(t, 1001, 100, func(item Request) {}) + pq := createTestQueue(t, 1001) req := newFakeTracesRequest(newTraces(1, 10)) for i := 0; i < 1000; i++ { - pq.Produce(req) + assert.NoError(t, pq.Produce(intrequest.New(context.Background(), req))) } assert.NoError(t, pq.Shutdown(context.Background())) } // Verify storage closes after queue consumers. 
If not in this order, successfully consumed items won't be updated in storage func TestPersistentQueue_Close_StorageCloseAfterConsumers(t *testing.T) { - pq := createTestQueue(t, 1001, 1, func(item Request) {}) + pq := createTestQueue(t, 1001) lastRequestProcessedTime := time.Now() - req := newFakeTracesRequest(newTraces(1, 10)) - req.processingFinishedCallback = func() { + req := intrequest.New(context.Background(), newFakeTracesRequest(newTraces(1, 10))) + req.SetOnProcessingFinished(func() { lastRequestProcessedTime = time.Now() - } + }) fnBefore := stopStorage stopStorageTime := time.Now() @@ -108,7 +99,7 @@ func TestPersistentQueue_Close_StorageCloseAfterConsumers(t *testing.T) { } for i := 0; i < 1000; i++ { - pq.Produce(req) + assert.NoError(t, pq.Produce(req)) } assert.NoError(t, pq.Shutdown(context.Background())) assert.True(t, stopStorageTime.After(lastRequestProcessedTime), "storage stop time should be after last request processed time") @@ -147,14 +138,16 @@ func TestPersistentQueue_ConsumersProducers(t *testing.T) { req := newFakeTracesRequest(newTraces(1, 10)) numMessagesConsumed := &atomic.Int32{} - pq := createTestQueue(t, 1000, c.numConsumers, func(item Request) { + stopWG := &sync.WaitGroup{} + pq := createTestQueue(t, 1000) + startConsumers(pq, c.numConsumers, stopWG, func(item *intrequest.Request) { if item != nil { numMessagesConsumed.Add(int32(1)) } }) for i := 0; i < c.numMessagesProduced; i++ { - pq.Produce(req) + assert.NoError(t, pq.Produce(intrequest.New(context.Background(), req))) } assert.Eventually(t, func() bool { @@ -279,7 +272,7 @@ func TestInvalidStorageExtensionType(t *testing.T) { } func TestPersistentQueue_StopAfterBadStart(t *testing.T) { - pq := NewPersistentQueue(1, 1, component.ID{}, newFakeTracesRequestMarshalerFunc(), + pq := NewPersistentQueue(newNopQueueSettings(1), component.ID{}, newFakeTracesRequestMarshalerFunc(), newFakeTracesRequestUnmarshalerFunc()) // verify that stopping a un-start/started w/error queue does not panic assert.NoError(t, pq.Shutdown(context.Background())) diff --git a/exporter/exporterhelper/internal/persistent_storage.go b/exporter/exporterhelper/internal/persistent_storage.go index 214e9f4d92f..12d346f09e1 100644 --- a/exporter/exporterhelper/internal/persistent_storage.go +++ b/exporter/exporterhelper/internal/persistent_storage.go @@ -15,6 +15,8 @@ import ( "go.uber.org/zap" + intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" + "go.opentelemetry.io/collector/exporter/exporterhelper/request" "go.opentelemetry.io/collector/extension/experimental/storage" ) @@ -43,14 +45,14 @@ import ( type persistentContiguousStorage struct { logger *zap.Logger client storage.Client - unmarshaler RequestUnmarshaler - marshaler RequestMarshaler + unmarshaler intrequest.Unmarshaler + marshaler intrequest.Marshaler putChan chan struct{} stopChan chan struct{} capacity uint64 - reqChan chan Request + reqChan chan *intrequest.Request mu sync.Mutex readIndex itemIndex @@ -81,17 +83,25 @@ var ( // newPersistentContiguousStorage creates a new file-storage extension backed queue; // queueName parameter must be a unique value that identifies the queue. 
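+// The exported request.Marshaler and request.Unmarshaler passed in below are adapted into
+// internal equivalents operating on *intrequest.Request: only the wrapped request.Request is
+// persisted, and a fresh background context is attached when an item is read back from storage.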
 func newPersistentContiguousStorage(ctx context.Context, queueName string, client storage.Client,
-	logger *zap.Logger, capacity uint64, marshaler RequestMarshaler, unmarshaler RequestUnmarshaler) *persistentContiguousStorage {
+	logger *zap.Logger, capacity uint64, marshaler request.Marshaler, unmarshaler request.Unmarshaler) *persistentContiguousStorage {
 	pcs := &persistentContiguousStorage{
-		logger:      logger.With(zap.String(zapQueueNameKey, queueName)),
-		client:      client,
-		unmarshaler: unmarshaler,
-		marshaler:   marshaler,
-		capacity:    capacity,
-		putChan:     make(chan struct{}, capacity),
-		reqChan:     make(chan Request),
-		stopChan:    make(chan struct{}),
-		itemsCount:  &atomic.Uint64{},
+		logger: logger.With(zap.String(zapQueueNameKey, queueName)),
+		client: client,
+		marshaler: func(req *intrequest.Request) ([]byte, error) {
+			return marshaler(req.Request)
+		},
+		unmarshaler: func(b []byte) (*intrequest.Request, error) {
+			req, err := unmarshaler(b)
+			if err != nil {
+				return nil, err
+			}
+			return intrequest.New(context.Background(), req), nil
+		},
+		capacity:   capacity,
+		putChan:    make(chan struct{}, capacity),
+		reqChan:    make(chan *intrequest.Request),
+		stopChan:   make(chan struct{}),
+		itemsCount: &atomic.Uint64{},
 	}

 	pcs.initPersistentContiguousStorage(ctx)
@@ -138,7 +148,7 @@ func (pcs *persistentContiguousStorage) initPersistentContiguousStorage(ctx cont
 	pcs.itemsCount.Store(uint64(pcs.writeIndex - pcs.readIndex))
 }

-func (pcs *persistentContiguousStorage) enqueueNotDispatchedReqs(reqs []Request) {
+func (pcs *persistentContiguousStorage) enqueueNotDispatchedReqs(reqs []*intrequest.Request) {
 	if len(reqs) > 0 {
 		errCount := 0
 		for _, req := range reqs {
@@ -163,6 +173,7 @@ func (pcs *persistentContiguousStorage) loop() {
 	for {
 		select {
 		case <-pcs.stopChan:
+			close(pcs.reqChan)
 			return
 		case <-pcs.putChan:
 			req, found := pcs.getNextItem(context.Background())
@@ -174,7 +185,7 @@
 }

 // get returns the request channel that all the requests will be send on
-func (pcs *persistentContiguousStorage) get() <-chan Request {
+func (pcs *persistentContiguousStorage) get() <-chan *intrequest.Request {
 	return pcs.reqChan
 }

@@ -190,7 +201,7 @@ func (pcs *persistentContiguousStorage) stop(ctx context.Context) error {
 }

 // put marshals the request and puts it into the persistent queue
-func (pcs *persistentContiguousStorage) put(req Request) error {
+func (pcs *persistentContiguousStorage) put(req *intrequest.Request) error {
 	// Nil requests are ignored
 	if req == nil {
 		return nil
 	}
@@ -200,7 +211,7 @@
 	defer pcs.mu.Unlock()

 	if pcs.size() >= pcs.capacity {
-		pcs.logger.Warn("Maximum queue capacity reached")
+		// Should not happen because the queue sender controls the capacity separately.
return errMaxCapacityReached } @@ -224,7 +235,7 @@ func (pcs *persistentContiguousStorage) put(req Request) error { } // getNextItem pulls the next available item from the persistent storage; if none is found, returns (nil, false) -func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) (Request, bool) { +func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) (*intrequest.Request, bool) { pcs.mu.Lock() defer pcs.mu.Unlock() @@ -237,7 +248,7 @@ func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) (Reques pcs.updateReadIndex(ctx) pcs.itemDispatchingStart(ctx, index) - var req Request + var req *intrequest.Request itemKey := getItemKey(index) buf, err := pcs.client.Get(ctx, itemKey) if err == nil { @@ -270,8 +281,8 @@ func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) (Reques // retrieveNotDispatchedReqs gets the items for which sending was not finished, cleans the storage // and moves the items back to the queue. The function returns an array which might contain nils // if unmarshalling of the value at a given index was not possible. -func (pcs *persistentContiguousStorage) retrieveNotDispatchedReqs(ctx context.Context) []Request { - var reqs []Request +func (pcs *persistentContiguousStorage) retrieveNotDispatchedReqs(ctx context.Context) []*intrequest.Request { + var reqs []*intrequest.Request var dispatchedItems []itemIndex pcs.mu.Lock() @@ -293,7 +304,7 @@ func (pcs *persistentContiguousStorage) retrieveNotDispatchedReqs(ctx context.Co pcs.logger.Debug("No items left for dispatch by consumers") } - reqs = make([]Request, len(dispatchedItems)) + reqs = make([]*intrequest.Request, len(dispatchedItems)) retrieveBatch := make([]storage.Operation, len(dispatchedItems)) cleanupBatch := make([]storage.Operation, len(dispatchedItems)) for i, it := range dispatchedItems { diff --git a/exporter/exporterhelper/internal/persistent_storage_test.go b/exporter/exporterhelper/internal/persistent_storage_test.go index 66ea4e06f45..e1692386646 100644 --- a/exporter/exporterhelper/internal/persistent_storage_test.go +++ b/exporter/exporterhelper/internal/persistent_storage_test.go @@ -18,6 +18,8 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" + intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" + "go.opentelemetry.io/collector/exporter/exporterhelper/request" "go.opentelemetry.io/collector/extension/experimental/storage" "go.opentelemetry.io/collector/pdata/ptrace" ) @@ -38,29 +40,25 @@ func createTestPersistentStorage(client storage.Client) *persistentContiguousSto } type fakeTracesRequest struct { - td ptrace.Traces - processingFinishedCallback func() - Request + td ptrace.Traces } -func newFakeTracesRequest(td ptrace.Traces) *fakeTracesRequest { - return &fakeTracesRequest{ - td: td, - } +func (ftr *fakeTracesRequest) Export(context.Context) error { + return nil } -func (fd *fakeTracesRequest) OnProcessingFinished() { - if fd.processingFinishedCallback != nil { - fd.processingFinishedCallback() - } +func (ftr *fakeTracesRequest) ItemsCount() int { + return ftr.td.SpanCount() } -func (fd *fakeTracesRequest) SetOnProcessingFinished(callback func()) { - fd.processingFinishedCallback = callback +func newFakeTracesRequest(td ptrace.Traces) *fakeTracesRequest { + return &fakeTracesRequest{ + td: td, + } } -func newFakeTracesRequestUnmarshalerFunc() RequestUnmarshaler { - return func(bytes []byte) (Request, error) { +func newFakeTracesRequestUnmarshalerFunc() request.Unmarshaler 
{ + return func(bytes []byte) (request.Request, error) { unmarshaler := ptrace.ProtoUnmarshaler{} traces, err := unmarshaler.UnmarshalTraces(bytes) if err != nil { @@ -70,8 +68,8 @@ func newFakeTracesRequestUnmarshalerFunc() RequestUnmarshaler { } } -func newFakeTracesRequestMarshalerFunc() RequestMarshaler { - return func(req Request) ([]byte, error) { +func newFakeTracesRequestMarshalerFunc() request.Marshaler { + return func(req request.Request) ([]byte, error) { marshaler := ptrace.ProtoMarshaler{} return marshaler.MarshalTraces(req.(*fakeTracesRequest).td) } @@ -149,7 +147,7 @@ func TestPersistentStorage_CorruptedData(t *testing.T) { // Put some items, make sure they are loaded and shutdown the storage... for i := 0; i < 3; i++ { - err := ps.put(req) + err := ps.put(intrequest.New(context.Background(), req)) require.NoError(t, err) } require.Eventually(t, func() bool { @@ -198,7 +196,7 @@ func TestPersistentStorage_CurrentlyProcessedItems(t *testing.T) { ps := createTestPersistentStorage(client) for i := 0; i < 5; i++ { - err := ps.put(req) + err := ps.put(intrequest.New(context.Background(), req)) require.NoError(t, err) } @@ -207,7 +205,7 @@ func TestPersistentStorage_CurrentlyProcessedItems(t *testing.T) { // Now, this will take item 0 and pull item 1 into the unbuffered channel readReq := <-ps.get() - assert.Equal(t, req.td, readReq.(*fakeTracesRequest).td) + assert.Equal(t, req.td, readReq.Request.(*fakeTracesRequest).td) requireCurrentlyDispatchedItemsEqual(t, ps, []itemIndex{0, 1}) // This takes item 1 from channel and pulls another one (item 2) into the unbuffered channel @@ -257,7 +255,7 @@ func TestPersistentStorage_StartWithNonDispatched(t *testing.T) { var capacity uint64 = 5 // arbitrary small number traces := newTraces(5, 10) - req := newFakeTracesRequest(traces) + req := intrequest.New(context.Background(), newFakeTracesRequest(traces)) ext := NewMockStorageExtension(nil) client := createTestClient(t, ext) @@ -290,7 +288,8 @@ func TestPersistentStorage_StartWithNonDispatched(t *testing.T) { } func TestPersistentStorage_PutCloseReadClose(t *testing.T) { - req := newFakeTracesRequest(newTraces(5, 10)) + td := newTraces(5, 10) + req := intrequest.New(context.Background(), newFakeTracesRequest(td)) ext := NewMockStorageExtension(nil) ps := createTestPersistentStorage(createTestClient(t, ext)) require.Equal(t, uint64(0), ps.size()) @@ -310,10 +309,10 @@ func TestPersistentStorage_PutCloseReadClose(t *testing.T) { // Lets read both of the elements we put readReq := <-ps.get() - require.Equal(t, req.td, readReq.(*fakeTracesRequest).td) + require.Equal(t, td, readReq.Request.(*fakeTracesRequest).td) readReq = <-ps.get() - require.Equal(t, req.td, readReq.(*fakeTracesRequest).td) + require.Equal(t, td, readReq.Request.(*fakeTracesRequest).td) require.Equal(t, uint64(0), ps.size()) assert.NoError(t, ps.stop(context.Background())) } @@ -352,7 +351,7 @@ func BenchmarkPersistentStorage_TraceSpans(b *testing.B) { client := createTestClient(b, ext) ps := createTestPersistentStorageWithCapacity(client, 10000000) - req := newFakeTracesRequest(newTraces(c.numTraces, c.numSpansPerTrace)) + req := intrequest.New(context.Background(), newFakeTracesRequest(newTraces(c.numTraces, c.numSpansPerTrace))) bb.ResetTimer() @@ -452,7 +451,7 @@ func TestPersistentStorage_StorageFull(t *testing.T) { // Put enough items in to fill the underlying storage reqCount := 0 for { - err = ps.put(req) + err = ps.put(intrequest.New(context.Background(), req)) if errors.Is(err, syscall.ENOSPC) { break } @@ -465,7 
+464,7 @@ func TestPersistentStorage_StorageFull(t *testing.T) { client.SetMaxSizeInBytes(newMaxSize) // Try to put an item in, should fail - require.Error(t, ps.put(req)) + require.Error(t, ps.put(intrequest.New(context.Background(), req))) // Take out all the items for i := reqCount; i > 0; i-- { @@ -475,7 +474,7 @@ func TestPersistentStorage_StorageFull(t *testing.T) { // We should be able to put a new item in // However, this will fail if deleting items fails with full storage - require.NoError(t, ps.put(req)) + require.NoError(t, ps.put(intrequest.New(context.Background(), req))) } func TestPersistentStorage_ItemDispatchingFinish_ErrorHandling(t *testing.T) { diff --git a/exporter/exporterhelper/internal/queue.go b/exporter/exporterhelper/internal/queue.go deleted file mode 100644 index 52dfdefc603..00000000000 --- a/exporter/exporterhelper/internal/queue.go +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright The OpenTelemetry Authors -// Copyright (c) 2019 The Jaeger Authors. -// Copyright (c) 2017 Uber Technologies, Inc. -// SPDX-License-Identifier: Apache-2.0 - -package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" - -import ( - "context" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/exporter" -) - -type QueueSettings struct { - exporter.CreateSettings - DataType component.DataType - Callback func(item Request) -} - -// Queue defines a producer-consumer exchange which can be backed by e.g. the memory-based ring buffer queue -// (boundedMemoryQueue) or via a disk-based queue (persistentQueue) -type Queue interface { - // Start starts the queue with a given number of goroutines consuming items from the queue - // and passing them into the consumer callback. - Start(ctx context.Context, host component.Host, set QueueSettings) error - // Produce is used by the producer to submit new item to the queue. Returns false if the item wasn't added - // to the queue due to queue overflow. - Produce(item Request) bool - // Size returns the current Size of the queue - Size() int - // Shutdown stops accepting items, and stops all consumers. It blocks until all consumers have stopped. - Shutdown(ctx context.Context) error - // Capacity returns the capacity of the queue. - Capacity() int - // IsPersistent returns true if the queue is persistent. - // TODO: Do not expose this method if the interface moves to a public package. - IsPersistent() bool -} diff --git a/exporter/exporterhelper/internal/request.go b/exporter/exporterhelper/internal/request.go deleted file mode 100644 index 454a42782ce..00000000000 --- a/exporter/exporterhelper/internal/request.go +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" - -import "context" - -// Request defines capabilities required for persistent storage of a request -type Request interface { - // Context returns the context.Context of the requests. - Context() context.Context - - // SetContext updates the context.Context of the requests. - SetContext(context.Context) - - Export(ctx context.Context) error - - // OnError returns a new Request may contain the items left to be sent if some items failed to process and can be retried. - // Otherwise, it should return the original Request. - OnError(error) Request - - // Count returns the count of spans/metric points or log records. 
- Count() int - - // OnProcessingFinished calls the optional callback function to handle cleanup after all processing is finished - OnProcessingFinished() - - // SetOnProcessingFinished allows to set an optional callback function to do the cleanup (e.g. remove the item from persistent queue) - SetOnProcessingFinished(callback func()) -} - -// RequestUnmarshaler defines a function which takes a byte slice and unmarshals it into a relevant request -type RequestUnmarshaler func([]byte) (Request, error) - -// RequestMarshaler defines a function which takes a request and marshals it into a byte slice -type RequestMarshaler func(Request) ([]byte, error) diff --git a/exporter/exporterhelper/internal/request/request.go b/exporter/exporterhelper/internal/request/request.go new file mode 100644 index 00000000000..23fd77ec156 --- /dev/null +++ b/exporter/exporterhelper/internal/request/request.go @@ -0,0 +1,69 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package request // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" + +import ( + "context" + + "go.opentelemetry.io/collector/exporter/exporterhelper/request" +) + +// Request is a wrapper around request.Request which adds context and a callback to be called when the request +// is finished processing. +type Request struct { + request.Request + ctx context.Context + onProcessingFinishedCallback func() +} + +func New(ctx context.Context, req request.Request) *Request { + return &Request{ + Request: req, + ctx: ctx, + } +} + +// Context returns the context.Context of the requests. +func (req *Request) Context() context.Context { + return req.ctx +} + +// SetContext updates the context.Context of the request. +func (req *Request) SetContext(ctx context.Context) { + req.ctx = ctx +} + +func (req *Request) OnError(err error) *Request { + if r, ok := req.Request.(request.ErrorHandler); ok { + return New(req.ctx, r.OnError(err)) + } + return req +} + +// Count returns a number of items in the request. If the request does not implement RequestItemsCounter +// then 0 is returned. +func (req *Request) Count() int { + if counter, ok := req.Request.(request.ItemsCounter); ok { + return counter.ItemsCount() + } + return 0 +} + +// OnProcessingFinished calls the optional callback function to handle cleanup after all processing is finished +func (req *Request) OnProcessingFinished() { + if req.onProcessingFinishedCallback != nil { + req.onProcessingFinishedCallback() + } +} + +// SetOnProcessingFinished allows to set an optional callback function to do the cleanup (e.g. 
remove the item from persistent queue) +func (req *Request) SetOnProcessingFinished(callback func()) { + req.onProcessingFinishedCallback = callback +} + +// Unmarshaler defines a function which takes a byte slice and unmarshals it into a relevant request +type Unmarshaler func([]byte) (*Request, error) + +// Marshaler defines a function which takes a request and marshals it into a byte slice +type Marshaler func(*Request) ([]byte, error) diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index d3ea32cea90..999afc1706f 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -13,7 +13,8 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" + "go.opentelemetry.io/collector/exporter/exporterhelper/request" "go.opentelemetry.io/collector/pdata/plog" ) @@ -21,37 +22,35 @@ var logsMarshaler = &plog.ProtoMarshaler{} var logsUnmarshaler = &plog.ProtoUnmarshaler{} type logsRequest struct { - baseRequest ld plog.Logs pusher consumer.ConsumeLogsFunc } -func newLogsRequest(ctx context.Context, ld plog.Logs, pusher consumer.ConsumeLogsFunc) internal.Request { +func newLogsRequest(ld plog.Logs, pusher consumer.ConsumeLogsFunc) request.Request { return &logsRequest{ - baseRequest: baseRequest{ctx: ctx}, - ld: ld, - pusher: pusher, + ld: ld, + pusher: pusher, } } -func newLogsRequestUnmarshalerFunc(pusher consumer.ConsumeLogsFunc) internal.RequestUnmarshaler { - return func(bytes []byte) (internal.Request, error) { +func newLogsRequestUnmarshalerFunc(pusher consumer.ConsumeLogsFunc) request.Unmarshaler { + return func(bytes []byte) (request.Request, error) { logs, err := logsUnmarshaler.UnmarshalLogs(bytes) if err != nil { return nil, err } - return newLogsRequest(context.Background(), logs, pusher), nil + return newLogsRequest(logs, pusher), nil } } -func logsRequestMarshaler(req internal.Request) ([]byte, error) { +func logsRequestMarshaler(req request.Request) ([]byte, error) { return logsMarshaler.MarshalLogs(req.(*logsRequest).ld) } -func (req *logsRequest) OnError(err error) internal.Request { +func (req *logsRequest) OnError(err error) request.Request { var logError consumererror.Logs if errors.As(err, &logError) { - return newLogsRequest(req.ctx, logError.Data(), req.pusher) + return newLogsRequest(logError.Data(), req.pusher) } return req } @@ -60,7 +59,7 @@ func (req *logsRequest) Export(ctx context.Context) error { return req.pusher(ctx, req.ld) } -func (req *logsRequest) Count() int { +func (req *logsRequest) ItemsCount() int { return req.ld.LogRecordCount() } @@ -96,7 +95,7 @@ func NewLogsExporter( } lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { - req := newLogsRequest(ctx, ld, pusher) + req := intrequest.New(ctx, newLogsRequest(ld, pusher)) serr := be.send(req) if errors.Is(serr, errSendingQueueIsFull) { be.obsrep.recordEnqueueFailure(req.Context(), component.DataTypeLogs, int64(req.Count())) @@ -115,7 +114,7 @@ func NewLogsExporter( // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. type LogsConverter interface { // RequestFromLogs converts plog.Logs data into a request. 
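+	// For illustration, an implementation typically wraps the incoming plog.Logs in a type that
+	// implements request.Request and, optionally, request.ItemsCounter, mirroring how logsRequest
+	// above implements Export and ItemsCount.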
- RequestFromLogs(context.Context, plog.Logs) (Request, error) + RequestFromLogs(context.Context, plog.Logs) (request.Request, error) } // NewLogsRequestExporter creates new logs exporter based on custom LogsConverter and RequestSender. @@ -148,7 +147,7 @@ func NewLogsRequestExporter( zap.Error(err)) return consumererror.NewPermanent(cErr) } - r := newRequest(ctx, req) + r := intrequest.New(ctx, req) sErr := be.send(r) if errors.Is(sErr, errSendingQueueIsFull) { be.obsrep.recordEnqueueFailure(r.Context(), component.DataTypeLogs, int64(r.Count())) @@ -163,15 +162,15 @@ func NewLogsRequestExporter( } type logsExporterWithObservability struct { - baseRequestSender - obsrep *ObsReport + obsrep *ObsReport + nextSender *senderWrapper } -func newLogsExporterWithObservability(obsrep *ObsReport) requestSender { - return &logsExporterWithObservability{obsrep: obsrep} +func newLogsExporterWithObservability(obsrep *ObsReport, nextSender *senderWrapper) requestSender { + return &logsExporterWithObservability{obsrep: obsrep, nextSender: nextSender} } -func (lewo *logsExporterWithObservability) send(req internal.Request) error { +func (lewo *logsExporterWithObservability) send(req *intrequest.Request) error { req.SetContext(lewo.obsrep.StartLogsOp(req.Context())) err := lewo.nextSender.send(req) lewo.obsrep.EndLogsOp(req.Context(), req.Count(), err) diff --git a/exporter/exporterhelper/logs_test.go b/exporter/exporterhelper/logs_test.go index a0ac7cfeabf..70d6d34ea4a 100644 --- a/exporter/exporterhelper/logs_test.go +++ b/exporter/exporterhelper/logs_test.go @@ -25,6 +25,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" "go.opentelemetry.io/collector/internal/testdata" @@ -42,12 +43,13 @@ var ( ) func TestLogsRequest(t *testing.T) { - lr := newLogsRequest(context.Background(), testdata.GenerateLogs(1), nil) + ctx := context.Background() + lr := intrequest.New(ctx, newLogsRequest(testdata.GenerateLogs(1), nil)) logErr := consumererror.NewLogs(errors.New("some error"), plog.NewLogs()) assert.EqualValues( t, - newLogsRequest(context.Background(), plog.NewLogs(), nil), + intrequest.New(ctx, newLogsRequest(plog.NewLogs(), nil)), lr.OnError(logErr), ) } diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index 4edd3997bb2..369220feb8b 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -13,7 +13,8 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" + "go.opentelemetry.io/collector/exporter/exporterhelper/request" "go.opentelemetry.io/collector/pdata/pmetric" ) @@ -21,37 +22,35 @@ var metricsMarshaler = &pmetric.ProtoMarshaler{} var metricsUnmarshaler = &pmetric.ProtoUnmarshaler{} type metricsRequest struct { - baseRequest md pmetric.Metrics pusher consumer.ConsumeMetricsFunc } -func newMetricsRequest(ctx context.Context, md pmetric.Metrics, pusher consumer.ConsumeMetricsFunc) internal.Request { +func newMetricsRequest(md pmetric.Metrics, pusher 
consumer.ConsumeMetricsFunc) request.Request { return &metricsRequest{ - baseRequest: baseRequest{ctx: ctx}, - md: md, - pusher: pusher, + md: md, + pusher: pusher, } } -func newMetricsRequestUnmarshalerFunc(pusher consumer.ConsumeMetricsFunc) internal.RequestUnmarshaler { - return func(bytes []byte) (internal.Request, error) { +func newMetricsRequestUnmarshalerFunc(pusher consumer.ConsumeMetricsFunc) request.Unmarshaler { + return func(bytes []byte) (request.Request, error) { metrics, err := metricsUnmarshaler.UnmarshalMetrics(bytes) if err != nil { return nil, err } - return newMetricsRequest(context.Background(), metrics, pusher), nil + return newMetricsRequest(metrics, pusher), nil } } -func metricsRequestMarshaler(req internal.Request) ([]byte, error) { +func metricsRequestMarshaler(req request.Request) ([]byte, error) { return metricsMarshaler.MarshalMetrics(req.(*metricsRequest).md) } -func (req *metricsRequest) OnError(err error) internal.Request { +func (req *metricsRequest) OnError(err error) request.Request { var metricsError consumererror.Metrics if errors.As(err, &metricsError) { - return newMetricsRequest(req.ctx, metricsError.Data(), req.pusher) + return newMetricsRequest(metricsError.Data(), req.pusher) } return req } @@ -60,7 +59,7 @@ func (req *metricsRequest) Export(ctx context.Context) error { return req.pusher(ctx, req.md) } -func (req *metricsRequest) Count() int { +func (req *metricsRequest) ItemsCount() int { return req.md.DataPointCount() } @@ -96,7 +95,7 @@ func NewMetricsExporter( } mc, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error { - req := newMetricsRequest(ctx, md, pusher) + req := intrequest.New(ctx, newMetricsRequest(md, pusher)) serr := be.send(req) if errors.Is(serr, errSendingQueueIsFull) { be.obsrep.recordEnqueueFailure(req.Context(), component.DataTypeMetrics, int64(req.Count())) @@ -115,7 +114,7 @@ func NewMetricsExporter( // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. type MetricsConverter interface { // RequestFromMetrics converts pdata.Metrics into a request. - RequestFromMetrics(context.Context, pmetric.Metrics) (Request, error) + RequestFromMetrics(context.Context, pmetric.Metrics) (request.Request, error) } // NewMetricsRequestExporter creates a new metrics exporter based on a custom MetricsConverter and RequestSender. 
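
Client requests can also opt into the request.ErrorHandler hook that the internal wrapper checks for. A hedged sketch, with a hypothetical myMetricsRequest, of a partial-failure handler in the same spirit as metricsRequest.OnError above:

package myexporter

import (
	"context"
	"errors"

	"go.opentelemetry.io/collector/consumer/consumererror"
	"go.opentelemetry.io/collector/exporter/exporterhelper/request"
	"go.opentelemetry.io/collector/pdata/pmetric"
)

// myMetricsRequest is a hypothetical client-defined request over pmetric.Metrics.
type myMetricsRequest struct {
	metrics pmetric.Metrics
}

func (r *myMetricsRequest) Export(ctx context.Context) error {
	// push r.metrics to the backend here
	return nil
}

func (r *myMetricsRequest) ItemsCount() int {
	return r.metrics.DataPointCount()
}

// OnError implements the optional request.ErrorHandler interface, mirroring
// metricsRequest.OnError above: when the error carries the undelivered data,
// return a smaller request containing only that data so a retry does not
// resend everything.
func (r *myMetricsRequest) OnError(err error) request.Request {
	var metricsErr consumererror.Metrics
	if errors.As(err, &metricsErr) {
		return &myMetricsRequest{metrics: metricsErr.Data()}
	}
	return r
}
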
@@ -148,7 +147,7 @@ func NewMetricsRequestExporter( zap.Error(err)) return consumererror.NewPermanent(cErr) } - r := newRequest(ctx, req) + r := intrequest.New(ctx, req) sErr := be.send(r) if errors.Is(sErr, errSendingQueueIsFull) { be.obsrep.recordEnqueueFailure(r.Context(), component.DataTypeMetrics, int64(r.Count())) @@ -163,15 +162,15 @@ func NewMetricsRequestExporter( } type metricsSenderWithObservability struct { - baseRequestSender - obsrep *ObsReport + obsrep *ObsReport + nextSender *senderWrapper } -func newMetricsSenderWithObservability(obsrep *ObsReport) requestSender { - return &metricsSenderWithObservability{obsrep: obsrep} +func newMetricsSenderWithObservability(obsrep *ObsReport, nextSender *senderWrapper) requestSender { + return &metricsSenderWithObservability{obsrep: obsrep, nextSender: nextSender} } -func (mewo *metricsSenderWithObservability) send(req internal.Request) error { +func (mewo *metricsSenderWithObservability) send(req *intrequest.Request) error { req.SetContext(mewo.obsrep.StartMetricsOp(req.Context())) err := mewo.nextSender.send(req) mewo.obsrep.EndMetricsOp(req.Context(), req.Count(), err) diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index 5d16908e452..1627762c082 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -25,6 +25,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" "go.opentelemetry.io/collector/internal/testdata" @@ -42,12 +43,13 @@ var ( ) func TestMetricsRequest(t *testing.T) { - mr := newMetricsRequest(context.Background(), testdata.GenerateMetrics(1), nil) + ctx := context.Background() + mr := intrequest.New(ctx, newMetricsRequest(testdata.GenerateMetrics(1), nil)) metricsErr := consumererror.NewMetrics(errors.New("some error"), pmetric.NewMetrics()) assert.EqualValues( t, - newMetricsRequest(context.Background(), pmetric.NewMetrics(), nil), + intrequest.New(ctx, newMetricsRequest(pmetric.NewMetrics(), nil)), mr.OnError(metricsErr), ) } diff --git a/exporter/exporterhelper/queue/config.go b/exporter/exporterhelper/queue/config.go new file mode 100644 index 00000000000..d27437d77f0 --- /dev/null +++ b/exporter/exporterhelper/queue/config.go @@ -0,0 +1,45 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package queue // import "go.opentelemetry.io/collector/exporter/exporterhelper/queue" + +import "errors" + +// Config defines configuration for queueing requests before exporting. +// It's supposed to be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type Config struct { + // Enabled indicates whether to not enqueue batches before exporting. + Enabled bool `mapstructure:"enabled"` + // NumConsumers is the number of consumers from the queue. + NumConsumers int `mapstructure:"num_consumers"` + // QueueItemsSize is the maximum number of items (spans, metric data points or log records) + // allowed in queue at any given time. 
+ QueueItemsSize int `mapstructure:"queue_size"` +} + +// NewDefaultConfig returns the default Config. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func NewDefaultConfig() Config { + return Config{ + Enabled: true, + NumConsumers: 10, + QueueItemsSize: 100_000, + } +} + +// Validate checks if the QueueSettings configuration is valid +func (qCfg *Config) Validate() error { + if !qCfg.Enabled { + return nil + } + if qCfg.NumConsumers <= 0 { + return errors.New("number of consumers must be positive") + } + if qCfg.QueueItemsSize <= 0 { + return errors.New("queue size must be positive") + } + return nil +} diff --git a/exporter/exporterhelper/queue/factory.go b/exporter/exporterhelper/queue/factory.go new file mode 100644 index 00000000000..17fa1f0dfb9 --- /dev/null +++ b/exporter/exporterhelper/queue/factory.go @@ -0,0 +1,25 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package queue // import "go.opentelemetry.io/collector/exporter/exporterhelper/queue" + +import ( + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter" +) + +// Settings defines parameters for creating a Queue. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type Settings struct { + exporter.CreateSettings + DataType component.DataType + // Capacity is the maximum possible number of requests the queue should handle. + // Queue sender has another limiter, so this capacity is never reached. + Capacity int +} + +// Factory defines a factory interface for creating a Queue. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type Factory func(Settings) Queue diff --git a/exporter/exporterhelper/queue/memoryqueue/factory.go b/exporter/exporterhelper/queue/memoryqueue/factory.go new file mode 100644 index 00000000000..2e74b41e507 --- /dev/null +++ b/exporter/exporterhelper/queue/memoryqueue/factory.go @@ -0,0 +1,18 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package memoryqueue // import "go.opentelemetry.io/collector/exporter/exporterhelper/queue/memoryqueue" + +import ( + "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/exporterhelper/queue" +) + +// NewFactory returns a factory to create a new memory queue. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. 
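
Taken together, queue.Config, queue.Factory and the memory queue factory are meant to be handed to the new WithRequestQueue option. A sketch of that wiring, assuming the WithRequestQueue option and exporterhelper.Option type introduced alongside this change; the package name and helper function are hypothetical:

package myexporter

import (
	"go.opentelemetry.io/collector/exporter/exporterhelper"
	"go.opentelemetry.io/collector/exporter/exporterhelper/queue"
	"go.opentelemetry.io/collector/exporter/exporterhelper/queue/memoryqueue"
)

// queueOptions builds the helper options that enable the in-memory queue.
// They would typically be passed, together with retry/timeout options, to
// NewLogsRequestExporter or the metrics/traces equivalents.
func queueOptions() []exporterhelper.Option {
	qCfg := queue.NewDefaultConfig() // enabled, 10 consumers, 100_000 items
	qCfg.NumConsumers = 4            // tune the number of draining goroutines
	return []exporterhelper.Option{
		exporterhelper.WithRequestQueue(qCfg, memoryqueue.NewFactory()),
	}
}
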
+func NewFactory() queue.Factory { + return func(set queue.Settings) queue.Queue { + return internal.NewBoundedMemoryQueue(set.Capacity) + } +} diff --git a/exporter/exporterhelper/queue/memoryqueue/factory_test.go b/exporter/exporterhelper/queue/memoryqueue/factory_test.go new file mode 100644 index 00000000000..ea3c1e49bb3 --- /dev/null +++ b/exporter/exporterhelper/queue/memoryqueue/factory_test.go @@ -0,0 +1,16 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package memoryqueue + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/collector/exporter/exporterhelper/queue" +) + +func TestNewFactory(t *testing.T) { + assert.NotNil(t, NewFactory()(queue.Settings{})) +} diff --git a/exporter/exporterhelper/queue/persistentqueue/config.go b/exporter/exporterhelper/queue/persistentqueue/config.go new file mode 100644 index 00000000000..8705556a073 --- /dev/null +++ b/exporter/exporterhelper/queue/persistentqueue/config.go @@ -0,0 +1,17 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package persistentqueue // import "go.opentelemetry.io/collector/exporter/exporterhelper/queue/persistentqueue" + +import ( + "go.opentelemetry.io/collector/component" +) + +// Config defines configuration part for persistent queue. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type Config struct { + // StorageID if not empty, enables the persistent storage and uses the component specified + // as a storage extension for the persistent queue + StorageID *component.ID `mapstructure:"storage"` +} diff --git a/exporter/exporterhelper/queue/persistentqueue/factory.go b/exporter/exporterhelper/queue/persistentqueue/factory.go new file mode 100644 index 00000000000..20f08873cfe --- /dev/null +++ b/exporter/exporterhelper/queue/persistentqueue/factory.go @@ -0,0 +1,24 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package persistentqueue // import "go.opentelemetry.io/collector/exporter/exporterhelper/queue/persistentqueue" + +import ( + "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/exporterhelper/queue" + "go.opentelemetry.io/collector/exporter/exporterhelper/queue/memoryqueue" + "go.opentelemetry.io/collector/exporter/exporterhelper/request" +) + +// NewFactory returns a factory of persistent queue. +// If cfg.StorageID is nil then it falls back to memory queue. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. 
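
For the disk-backed variant, persistentqueue.NewFactory additionally needs a request.Marshaler/Unmarshaler pair, much like the JSON-based fakes used in the tests further down. A hedged sketch with a hypothetical JSON-serializable request type:

package myexporter

import (
	"context"
	"encoding/json"
	"errors"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/exporter/exporterhelper"
	"go.opentelemetry.io/collector/exporter/exporterhelper/queue"
	"go.opentelemetry.io/collector/exporter/exporterhelper/queue/persistentqueue"
	"go.opentelemetry.io/collector/exporter/exporterhelper/request"
)

// myRequest is a hypothetical request that can round-trip through JSON.
type myRequest struct {
	Payload string `json:"payload"`
}

func (r *myRequest) Export(_ context.Context) error { return nil }

// marshalRequest and unmarshalRequest match the request.Marshaler and
// request.Unmarshaler signatures required by the persistent queue.
func marshalRequest(req request.Request) ([]byte, error) {
	r, ok := req.(*myRequest)
	if !ok {
		return nil, errors.New("unexpected request type")
	}
	return json.Marshal(r)
}

func unmarshalRequest(data []byte) (request.Request, error) {
	r := &myRequest{}
	if err := json.Unmarshal(data, r); err != nil {
		return nil, err
	}
	return r, nil
}

// persistentQueueOption enables a disk-backed queue using the storage
// extension identified by storageID; with a nil StorageID the factory
// falls back to the in-memory queue.
func persistentQueueOption(storageID component.ID) exporterhelper.Option {
	pqCfg := persistentqueue.Config{StorageID: &storageID}
	return exporterhelper.WithRequestQueue(
		queue.NewDefaultConfig(),
		persistentqueue.NewFactory(pqCfg, marshalRequest, unmarshalRequest),
	)
}
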
+func NewFactory(cfg Config, marshaler request.Marshaler, unmarshaler request.Unmarshaler) queue.Factory { + if cfg.StorageID == nil { + return memoryqueue.NewFactory() + } + return func(set queue.Settings) queue.Queue { + return internal.NewPersistentQueue(set, *cfg.StorageID, marshaler, unmarshaler) + } +} diff --git a/exporter/exporterhelper/queue/persistentqueue/factory_test.go b/exporter/exporterhelper/queue/persistentqueue/factory_test.go new file mode 100644 index 00000000000..c729bee98a9 --- /dev/null +++ b/exporter/exporterhelper/queue/persistentqueue/factory_test.go @@ -0,0 +1,22 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package persistentqueue + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter/exporterhelper/queue" +) + +func TestNewFactory(t *testing.T) { + f := NewFactory(Config{StorageID: nil}, nil, nil) + assert.NotNil(t, f(queue.Settings{})) + + storage := component.NewID("fake-storage") + f = NewFactory(Config{StorageID: &storage}, nil, nil) + assert.NotNil(t, f(queue.Settings{})) +} diff --git a/exporter/exporterhelper/queue/queue.go b/exporter/exporterhelper/queue/queue.go new file mode 100644 index 00000000000..b2f31f00c6a --- /dev/null +++ b/exporter/exporterhelper/queue/queue.go @@ -0,0 +1,25 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package queue // import "go.opentelemetry.io/collector/exporter/exporterhelper/queue" + +import ( + "go.opentelemetry.io/collector/component" + intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" +) + +// Queue defines a producer-consumer exchange which can be backed by e.g. the memory-based ring buffer queue +// (boundedMemoryQueue) or via a disk-based queue (persistentQueue) +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type Queue interface { + component.Component + // Chan retrieves a channel to pull requests from the queue. + // The channel must be closed when the queue is shutdown. + Chan() <-chan *intrequest.Request + // Produce is used by the producer to submit new item to the queue. + Produce(*intrequest.Request) error + // IsPersistent returns true if the queue is persistent. + // TODO: Remove this method once we move it to the config. 
+ IsPersistent() bool +} diff --git a/exporter/exporterhelper/queue/queue_test.go b/exporter/exporterhelper/queue/queue_test.go new file mode 100644 index 00000000000..e01f37d705f --- /dev/null +++ b/exporter/exporterhelper/queue/queue_test.go @@ -0,0 +1,26 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package queue + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestQueueConfig_Validate(t *testing.T) { + qCfg := NewDefaultConfig() + assert.NoError(t, qCfg.Validate()) + + qCfg.NumConsumers = 0 + assert.EqualError(t, qCfg.Validate(), "number of consumers must be positive") + + qCfg = NewDefaultConfig() + qCfg.QueueItemsSize = 0 + assert.EqualError(t, qCfg.Validate(), "queue size must be positive") + + // Confirm Validate doesn't return error with invalid config when feature is disabled + qCfg.Enabled = false + assert.NoError(t, qCfg.Validate()) +} diff --git a/exporter/exporterhelper/queue_capacity.go b/exporter/exporterhelper/queue_capacity.go new file mode 100644 index 00000000000..5a9eccbeb02 --- /dev/null +++ b/exporter/exporterhelper/queue_capacity.go @@ -0,0 +1,94 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" + +import ( + "sync/atomic" + + intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" +) + +type queueCapacityLimiter interface { + // Claim tries to claim capacity for the given item. If the capacity is not available, it returns false. + Claim(item *intrequest.Request) bool + + // Release releases capacity for the given item. + Release(item *intrequest.Request) + + // Size is current size of the queue. + Size() int + + // Capacity is the maximum capacity of the queue. + Capacity() int +} + +type baseQueueCapacityLimiter struct { + size *atomic.Uint32 + capacity int +} + +func newBaseQueueCapacityLimiter(capacity int) *baseQueueCapacityLimiter { + return &baseQueueCapacityLimiter{ + size: &atomic.Uint32{}, + capacity: capacity, + } +} + +func (bqcl *baseQueueCapacityLimiter) Size() int { + return int(bqcl.size.Load()) +} + +func (bqcl *baseQueueCapacityLimiter) Capacity() int { + return bqcl.capacity +} + +// requestCapacityLimiter is a capacity limiter that limits the queue based on the number of requests. +type requestsCapacityLimiter struct { + baseQueueCapacityLimiter +} + +var _ queueCapacityLimiter = (*requestsCapacityLimiter)(nil) + +func newRequestsCapacityLimiter(capacity int) *requestsCapacityLimiter { + return &requestsCapacityLimiter{ + baseQueueCapacityLimiter: *newBaseQueueCapacityLimiter(capacity), + } +} + +func (rcl *requestsCapacityLimiter) Claim(_ *intrequest.Request) bool { + if rcl.size.Load() >= uint32(rcl.capacity) { + return false + } + rcl.size.Add(1) + return true +} + +func (rcl *requestsCapacityLimiter) Release(_ *intrequest.Request) { + rcl.size.Add(^uint32(0)) +} + +// itemsCapacityLimiter is a capacity limiter that limits the queue based on the number of items (e.g. spans, log records). 
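
A side note on the Release implementations above and below: they decrement the atomic size counter with Add(^uint32(n-1)). Because unsigned addition wraps around, adding the two's complement of n subtracts n. A small standalone illustration (not part of the change):

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var size atomic.Uint32
	size.Add(10)

	// Adding ^uint32(n-1), the two's complement of n, wraps around and
	// effectively subtracts n; this is how itemsCapacityLimiter releases items.
	n := uint32(3)
	size.Add(^(n - 1))
	fmt.Println(size.Load()) // 7

	// Subtracting exactly 1 is the special case ^uint32(0), used by
	// requestsCapacityLimiter.Release.
	size.Add(^uint32(0))
	fmt.Println(size.Load()) // 6
}
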
+type itemsCapacityLimiter struct { + baseQueueCapacityLimiter +} + +func newItemsCapacityLimiter(capacity int) *itemsCapacityLimiter { + return &itemsCapacityLimiter{ + baseQueueCapacityLimiter: *newBaseQueueCapacityLimiter(capacity), + } +} + +func (icl *itemsCapacityLimiter) Claim(item *intrequest.Request) bool { + if icl.size.Load() > uint32(icl.capacity-item.Count()) { + return false + } + icl.size.Add(uint32(item.Count())) + return true +} + +func (icl *itemsCapacityLimiter) Release(item *intrequest.Request) { + icl.size.Add(^uint32(item.Count() - 1)) +} + +var _ queueCapacityLimiter = (*itemsCapacityLimiter)(nil) diff --git a/exporter/exporterhelper/queue_capacity_test.go b/exporter/exporterhelper/queue_capacity_test.go new file mode 100644 index 00000000000..0c731fe8e34 --- /dev/null +++ b/exporter/exporterhelper/queue_capacity_test.go @@ -0,0 +1,53 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" +) + +func TestRequestsCapacityLimiter(t *testing.T) { + rl := newRequestsCapacityLimiter(2) + assert.Equal(t, 0, rl.Size()) + assert.Equal(t, 2, rl.Capacity()) + + req := intrequest.New(context.Background(), newMockRequest(5, nil)) + + assert.True(t, rl.Claim(req)) + assert.Equal(t, 1, rl.Size()) + + assert.True(t, rl.Claim(req)) + assert.Equal(t, 2, rl.Size()) + + assert.False(t, rl.Claim(req)) + assert.Equal(t, 2, rl.Size()) + + rl.Release(req) + assert.Equal(t, 1, rl.Size()) +} + +func TestItemsCapacityLimiter(t *testing.T) { + rl := newItemsCapacityLimiter(7) + assert.Equal(t, 0, rl.Size()) + assert.Equal(t, 7, rl.Capacity()) + + req := intrequest.New(context.Background(), newMockRequest(3, nil)) + + assert.True(t, rl.Claim(req)) + assert.Equal(t, 3, rl.Size()) + + assert.True(t, rl.Claim(req)) + assert.Equal(t, 6, rl.Size()) + + assert.False(t, rl.Claim(req)) + assert.Equal(t, 6, rl.Size()) + + rl.Release(req) + assert.Equal(t, 3, rl.Size()) +} diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go index 0b11db6c4ec..91168a4324b 100644 --- a/exporter/exporterhelper/queue_sender.go +++ b/exporter/exporterhelper/queue_sender.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "sync" "time" "go.opencensus.io/metric/metricdata" @@ -18,7 +19,8 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" + "go.opentelemetry.io/collector/exporter/exporterhelper/queue" "go.opentelemetry.io/collector/internal/obsreportconfig" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" ) @@ -73,32 +75,47 @@ func (qCfg *QueueSettings) Validate() error { } type queueSender struct { - baseRequestSender + nextSender *senderWrapper + numConsumers int fullName string - signal component.DataType - queue internal.Queue + queue queue.Queue traceAttribute attribute.KeyValue - logger *zap.Logger + telSettings component.TelemetrySettings requeuingEnabled bool + capacityLimiter queueCapacityLimiter + + stopWG sync.WaitGroup metricCapacity otelmetric.Int64ObservableGauge metricSize otelmetric.Int64ObservableGauge } -func newQueueSender(id component.ID, signal component.DataType, 
queue internal.Queue, logger *zap.Logger) *queueSender { +func newQueueSender(set exporter.CreateSettings, cfg queue.Config, signal component.DataType, queueFactory queue.Factory, + capacityLimiter queueCapacityLimiter, nextSender *senderWrapper) *queueSender { + var q queue.Queue + if cfg.Enabled { + q = queueFactory(queue.Settings{ + CreateSettings: set, + DataType: signal, + Capacity: cfg.QueueItemsSize, + }) + } return &queueSender{ - fullName: id.String(), - signal: signal, - queue: queue, - traceAttribute: attribute.String(obsmetrics.ExporterKey, id.String()), - logger: logger, - // TODO: this can be further exposed as a config param rather than relying on a type of queue - requeuingEnabled: queue != nil && queue.IsPersistent(), + numConsumers: cfg.NumConsumers, + fullName: set.ID.String(), + queue: q, + traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()), + telSettings: set.TelemetrySettings, + // TODO: Move this to a configuration option. + requeuingEnabled: q != nil && q.IsPersistent(), + nextSender: nextSender, + capacityLimiter: capacityLimiter, + stopWG: sync.WaitGroup{}, } } -func (qs *queueSender) onTemporaryFailure(logger *zap.Logger, req internal.Request, err error) error { - if !qs.requeuingEnabled || qs.queue == nil { +func (qs *queueSender) onTemporaryFailure(logger *zap.Logger, req *intrequest.Request, err error) error { + if !qs.requeuingEnabled { logger.Error( "Exporting failed. No more retries left. Dropping data.", zap.Error(err), @@ -107,7 +124,7 @@ func (qs *queueSender) onTemporaryFailure(logger *zap.Logger, req internal.Reque return err } - if qs.queue.Produce(req) { + if qs.queue.Produce(req) == nil { logger.Error( "Exporting failed. Putting back to the end of the queue.", zap.Error(err), @@ -123,25 +140,33 @@ func (qs *queueSender) onTemporaryFailure(logger *zap.Logger, req internal.Reque } // start is invoked during service startup. 
-func (qs *queueSender) start(ctx context.Context, host component.Host, set exporter.CreateSettings) error { +func (qs *queueSender) start(ctx context.Context, host component.Host) error { if qs.queue == nil { return nil } - err := qs.queue.Start(ctx, host, internal.QueueSettings{ - CreateSettings: set, - DataType: qs.signal, - Callback: func(item internal.Request) { - _ = qs.nextSender.send(item) - item.OnProcessingFinished() - }, - }) - if err != nil { + if err := qs.queue.Start(ctx, host); err != nil { return err } + var startWG sync.WaitGroup + for i := 0; i < qs.numConsumers; i++ { + qs.stopWG.Add(1) + startWG.Add(1) + go func() { + startWG.Done() + defer qs.stopWG.Done() + for req := range qs.queue.Chan() { + _ = qs.nextSender.send(req) + qs.capacityLimiter.Release(req) + req.OnProcessingFinished() + } + }() + } + startWG.Wait() + if obsreportconfig.UseOtelForInternalMetricsfeatureGate.IsEnabled() { - return qs.recordWithOtel(set.MeterProvider.Meter(scopeName)) + return qs.recordWithOtel(qs.telSettings.MeterProvider.Meter(scopeName)) } return qs.recordWithOC() } @@ -156,7 +181,7 @@ func (qs *queueSender) recordWithOtel(meter otelmetric.Meter) error { otelmetric.WithDescription("Current size of the retry queue (in batches)"), otelmetric.WithUnit("1"), otelmetric.WithInt64Callback(func(_ context.Context, o otelmetric.Int64Observer) error { - o.Observe(int64(qs.queue.Size()), attrs) + o.Observe(int64(qs.capacityLimiter.Size()), attrs) return nil }), ) @@ -167,7 +192,7 @@ func (qs *queueSender) recordWithOtel(meter otelmetric.Meter) error { otelmetric.WithDescription("Fixed capacity of the retry queue (in batches)"), otelmetric.WithUnit("1"), otelmetric.WithInt64Callback(func(_ context.Context, o otelmetric.Int64Observer) error { - o.Observe(int64(qs.queue.Capacity()), attrs) + o.Observe(int64(qs.capacityLimiter.Capacity()), attrs) return nil })) @@ -178,13 +203,13 @@ func (qs *queueSender) recordWithOtel(meter otelmetric.Meter) error { func (qs *queueSender) recordWithOC() error { // Start reporting queue length metric err := globalInstruments.queueSize.UpsertEntry(func() int64 { - return int64(qs.queue.Size()) + return int64(qs.capacityLimiter.Size()) }, metricdata.NewLabelValue(qs.fullName)) if err != nil { return fmt.Errorf("failed to create retry queue size metric: %w", err) } err = globalInstruments.queueCapacity.UpsertEntry(func() int64 { - return int64(qs.queue.Capacity()) + return int64(qs.capacityLimiter.Capacity()) }, metricdata.NewLabelValue(qs.fullName)) if err != nil { return fmt.Errorf("failed to create retry queue capacity metric: %w", err) @@ -203,17 +228,20 @@ func (qs *queueSender) shutdown(ctx context.Context) error { // Stop the queued sender, this will drain the queue and will call the retry (which is stopped) that will only // try once every request. - return qs.queue.Shutdown(ctx) + if err := qs.queue.Shutdown(ctx); err != nil { + return err + } + qs.stopWG.Wait() } return nil } // send implements the requestSender interface -func (qs *queueSender) send(req internal.Request) error { +func (qs *queueSender) send(req *intrequest.Request) error { if qs.queue == nil { err := qs.nextSender.send(req) if err != nil { - qs.logger.Error( + qs.telSettings.Logger.Error( "Exporting failed. Dropping data. 
Try enabling sending_queue to survive temporary failures.", zap.Int("dropped_items", req.Count()), ) @@ -221,13 +249,9 @@ func (qs *queueSender) send(req internal.Request) error { return err } - // Prevent cancellation and deadline to propagate to the context stored in the queue. - // The grpc/http based receivers will cancel the request context after this function returns. - req.SetContext(noCancellationContext{Context: req.Context()}) - span := trace.SpanFromContext(req.Context()) - if !qs.queue.Produce(req) { - qs.logger.Error( + if !qs.capacityLimiter.Claim(req) { + qs.telSettings.Logger.Error( "Dropping data because sending_queue is full. Try increasing queue_size.", zap.Int("dropped_items", req.Count()), ) @@ -235,6 +259,14 @@ func (qs *queueSender) send(req internal.Request) error { return errSendingQueueIsFull } + // Prevent cancellation and deadline to propagate to the context stored in the queue. + // The grpc/http based receivers will cancel the request context after this function returns. + req.SetContext(noCancellationContext{Context: req.Context()}) + + if err := qs.queue.Produce(req); err != nil { + return err + } + span.AddEvent("Enqueued item.", trace.WithAttributes(qs.traceAttribute)) return nil } diff --git a/exporter/exporterhelper/queue_sender_test.go b/exporter/exporterhelper/queue_sender_test.go index c0d46db2dfc..2476c21f821 100644 --- a/exporter/exporterhelper/queue_sender_test.go +++ b/exporter/exporterhelper/queue_sender_test.go @@ -18,6 +18,9 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" + "go.opentelemetry.io/collector/exporter/exporterhelper/queue" + "go.opentelemetry.io/collector/exporter/exporterhelper/queue/persistentqueue" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/internal/obsreportconfig" @@ -31,7 +34,7 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) { rCfg := NewDefaultRetrySettings() be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := be.obsrepSender.(*observabilityConsumerSender) + ocs := be.obsrepSender.sender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) firstMockR := newErrorRequest(context.Background()) @@ -41,20 +44,20 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) { }) // Enqueue another request to ensure when calling shutdown we drain the queue. - secondMockR := newMockRequest(context.Background(), 3, nil) + secondMockR := newMockRequest(3, nil) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. 
- require.NoError(t, be.send(secondMockR)) + require.NoError(t, be.send(intrequest.New(context.Background(), secondMockR))) }) - require.LessOrEqual(t, 1, be.queueSender.(*queueSender).queue.Size()) + require.LessOrEqual(t, 1, be.queueSender.sender.(*queueSender).capacityLimiter.Size()) assert.NoError(t, be.Shutdown(context.Background())) secondMockR.checkNumRequests(t, 1) ocs.checkSendItemsCount(t, 3) ocs.checkDroppedItemsCount(t, 7) - require.Zero(t, be.queueSender.(*queueSender).queue.Size()) + require.Zero(t, be.queueSender.sender.(*queueSender).capacityLimiter.Size()) } func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) { @@ -63,7 +66,7 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) { rCfg := NewDefaultRetrySettings() be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := be.obsrepSender.(*observabilityConsumerSender) + ocs := be.obsrepSender.sender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -71,17 +74,17 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) { ctx, cancelFunc := context.WithCancel(context.Background()) cancelFunc() - mockR := newMockRequest(ctx, 2, nil) + mockR := newMockRequest(2, nil) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.send(mockR)) + require.NoError(t, be.send(intrequest.New(ctx, mockR))) }) ocs.awaitAsyncProcessing() mockR.checkNumRequests(t, 1) ocs.checkSendItemsCount(t, 2) ocs.checkDroppedItemsCount(t, 0) - require.Zero(t, be.queueSender.(*queueSender).queue.Size()) + require.Zero(t, be.queueSender.sender.(*queueSender).capacityLimiter.Size()) } func TestQueuedRetry_DropOnFull(t *testing.T) { @@ -94,7 +97,7 @@ func TestQueuedRetry_DropOnFull(t *testing.T) { t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) }) - require.Error(t, be.send(newMockRequest(context.Background(), 2, nil))) + require.Error(t, be.send(intrequest.New(context.Background(), newMockRequest(2, nil)))) } func TestQueuedRetryHappyPath(t *testing.T) { @@ -107,7 +110,7 @@ func TestQueuedRetryHappyPath(t *testing.T) { set := exporter.CreateSettings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings, BuildInfo: component.NewDefaultBuildInfo()} be, err := newBaseExporter(set, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := be.obsrepSender.(*observabilityConsumerSender) + ocs := be.obsrepSender.sender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -117,9 +120,9 @@ func TestQueuedRetryHappyPath(t *testing.T) { reqs := make([]*mockRequest, 0, 10) for i := 0; i < wantRequests; i++ { ocs.run(func() { - req := newMockRequest(context.Background(), 2, nil) + req := newMockRequest(2, nil) reqs = append(reqs, req) - require.NoError(t, be.send(req)) + require.NoError(t, be.send(intrequest.New(context.Background(), req))) }) } @@ -232,18 +235,18 @@ func TestQueuedRetry_RequeuingEnabled(t *testing.T) { rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) 
require.NoError(t, err) - ocs := be.obsrepSender.(*observabilityConsumerSender) - be.queueSender.(*queueSender).requeuingEnabled = true + ocs := be.obsrepSender.sender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + be.queueSender.sender.(*queueSender).requeuingEnabled = true t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) }) traceErr := consumererror.NewTraces(errors.New("some error"), testdata.GenerateTraces(1)) - mockR := newMockRequest(context.Background(), 1, traceErr) + mockR := newMockRequest(1, traceErr) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.send(mockR)) + require.NoError(t, be.send(intrequest.New(context.Background(), mockR))) ocs.waitGroup.Add(1) // necessary because we'll call send() again after requeueing }) ocs.awaitAsyncProcessing() @@ -263,18 +266,18 @@ func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) { rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - be.queueSender.(*queueSender).requeuingEnabled = true + be.queueSender.sender.(*queueSender).requeuingEnabled = true require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) }) traceErr := consumererror.NewTraces(errors.New("some error"), testdata.GenerateTraces(1)) - mockR := newMockRequest(context.Background(), 1, traceErr) + mockR := newMockRequest(1, traceErr) - ocs := be.obsrepSender.(*observabilityConsumerSender) + ocs := be.obsrepSender.sender.(*observabilityConsumerSender) ocs.run(func() { - require.Error(t, be.retrySender.send(mockR), "sending_queue is full") + require.Error(t, be.retrySender.send(intrequest.New(context.Background(), mockR)), "sending_queue is full") }) mockR.checkNumRequests(t, 1) } @@ -283,14 +286,14 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) { qs := NewDefaultQueueSettings() qs.Enabled = false be, err := newBaseExporter(exportertest.NewNopCreateSettings(), component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, WithQueue(qs)) - require.Nil(t, be.queueSender.(*queueSender).queue) + require.Nil(t, be.queueSender.sender.(*queueSender).queue) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - ocs := be.obsrepSender.(*observabilityConsumerSender) - mockR := newMockRequest(context.Background(), 2, errors.New("some error")) + ocs := be.obsrepSender.sender.(*observabilityConsumerSender) + mockR := newMockRequest(2, errors.New("some error")) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. 
- require.Error(t, be.send(mockR)) + require.Error(t, be.send(intrequest.New(context.Background(), mockR))) }) ocs.awaitAsyncProcessing() mockR.checkNumRequests(t, 1) @@ -346,7 +349,6 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) { } func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) { - produceCounter := &atomic.Uint32{} qCfg := NewDefaultQueueSettings() @@ -361,11 +363,11 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) { require.NoError(t, be.Start(context.Background(), &mockHost{})) // wraps original queue so we can count operations - be.queueSender.(*queueSender).queue = &producerConsumerQueueWithCounter{ - Queue: be.queueSender.(*queueSender).queue, + be.queueSender.sender.(*queueSender).queue = &producerConsumerQueueWithCounter{ + Queue: be.queueSender.sender.(*queueSender).queue, produceCounter: produceCounter, } - be.queueSender.(*queueSender).requeuingEnabled = true + be.queueSender.sender.(*queueSender).requeuingEnabled = true // Invoke queuedRetrySender so the producer will put the item for consumer to poll require.NoError(t, be.send(newErrorRequest(context.Background()))) @@ -382,6 +384,30 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) { }, time.Second, 1*time.Millisecond) } +func TestPersistentQueueRetryPersistenceEnabledStorageError(t *testing.T) { + storageError := errors.New("could not get storage client") + tt, err := obsreporttest.SetupTelemetry(defaultID) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + qCfg := queue.NewDefaultConfig() + storageID := component.NewIDWithName("file_storage", "storage") + pqCfg := persistentqueue.Config{StorageID: &storageID} + rCfg := NewDefaultRetrySettings() + set := exporter.CreateSettings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings, BuildInfo: component.NewDefaultBuildInfo()} + be, err := newBaseExporter(set, "", true, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), + WithRequestQueue(qCfg, persistentqueue.NewFactory(pqCfg, fakeRequestMarshaler, fakeRequestUnmarshaler))) + require.NoError(t, err) + + var extensions = map[component.ID]component.Component{ + storageID: internal.NewMockStorageExtension(storageError), + } + host := &mockHost{ext: extensions} + + // we fail to start if we get an error creating the storage client + require.Error(t, be.Start(context.Background(), host), "could not get storage client") +} + type mockHost struct { component.Host ext map[component.ID]component.Component diff --git a/exporter/exporterhelper/request.go b/exporter/exporterhelper/request.go deleted file mode 100644 index ef05aa6395d..00000000000 --- a/exporter/exporterhelper/request.go +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" - -import ( - "context" - - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" -) - -// Request represents a single request that can be sent to an external endpoint. -// This API is at the early stage of development and may change without backward compatibility -// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -type Request interface { - // Export exports the request to an external endpoint. 
- Export(ctx context.Context) error -} - -// RequestItemsCounter is an optional interface that can be implemented by Request to provide a number of items -// in the request. This is a recommended interface to implement for exporters. It is required for batching and queueing -// based on number of items. Also, it's used for reporting number of items in collector's logs, metrics and traces. -// If not implemented, collector's logs, metrics and traces will report 0 items. -// This API is at the early stage of development and may change without backward compatibility -// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -type RequestItemsCounter interface { - // ItemsCount returns a number of basic items in the request where item is the smallest piece of data that can be - // sent. For example, for OTLP exporter, this value represents the number of spans, - // metric data points or log records. - ItemsCount() int -} - -type request struct { - Request - baseRequest -} - -var _ internal.Request = (*request)(nil) - -func newRequest(ctx context.Context, req Request) *request { - return &request{ - Request: req, - baseRequest: baseRequest{ctx: ctx}, - } -} - -func (req *request) OnError(_ error) internal.Request { - // Potentially we could introduce a new RequestError type that would represent partially succeeded request. - // In that case we should consider returning them back to the pipeline converted back to pdata in case if - // sending queue is disabled. We leave it as a future improvement if decided that it's needed. - return req -} - -// Count returns a number of items in the request. If the request does not implement RequestItemsCounter -// then 0 is returned. -func (req *request) Count() int { - if counter, ok := req.Request.(RequestItemsCounter); ok { - return counter.ItemsCount() - } - return 0 -} diff --git a/exporter/exporterhelper/request/request.go b/exporter/exporterhelper/request/request.go new file mode 100644 index 00000000000..32499c9fbcc --- /dev/null +++ b/exporter/exporterhelper/request/request.go @@ -0,0 +1,51 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package request // import "go.opentelemetry.io/collector/exporter/exporterhelper/request" + +import ( + "context" +) + +// Request represents a single request that can be sent to an external endpoint. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type Request interface { + // Export exports the request to an external endpoint. + Export(ctx context.Context) error +} + +// ItemsCounter is an optional interface that can be implemented by Request to provide a number of items +// in the request. This is a recommended interface to implement for exporters. It is required for batching and queueing +// based on number of items. Also, it's used for reporting number of items in collector's logs, metrics and traces. +// If not implemented, collector's logs, metrics and traces will report 0 items. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type ItemsCounter interface { + // ItemsCount returns a number of basic items in the request where item is the smallest piece of data that can be + // sent. 
For example, for OTLP exporter, this value represents the number of spans, + // metric data points or log records. + ItemsCount() int +} + +// ErrorHandler is an optional interface that can be implemented by Request to provide a way handle partial +// temporary failures. For example, if some items failed to process and can be retried, this interface allows to +// return a new Request that contains the items left to be sent. Otherwise, the original Request should be returned. +// If not implemented, the original Request will be returned assuming the error is applied to the whole Request. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type ErrorHandler interface { + // OnError returns a new Request may contain the items left to be sent if some items failed to process and can be retried. + // Otherwise, it should return the original Request. + OnError(error) Request +} + +// Marshaler is a function that can marshal a Request into bytes. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type Marshaler func(req Request) ([]byte, error) + +// Unmarshaler is a function that can unmarshal bytes into a Request. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type Unmarshaler func(data []byte) (Request, error) diff --git a/exporter/exporterhelper/request_test.go b/exporter/exporterhelper/request_test.go index 6dd3f67800a..460f7b6aa78 100644 --- a/exporter/exporterhelper/request_test.go +++ b/exporter/exporterhelper/request_test.go @@ -5,23 +5,26 @@ package exporterhelper import ( "context" + "encoding/json" + "errors" + "go.opentelemetry.io/collector/exporter/exporterhelper/request" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" ) type fakeRequest struct { - items int - err error + Items int + Err error } func (r fakeRequest) Export(_ context.Context) error { - return r.err + return r.Err } func (r fakeRequest) ItemsCount() int { - return r.items + return r.Items } type fakeRequestConverter struct { @@ -31,14 +34,30 @@ type fakeRequestConverter struct { requestError error } -func (c fakeRequestConverter) RequestFromMetrics(_ context.Context, md pmetric.Metrics) (Request, error) { - return fakeRequest{items: md.DataPointCount(), err: c.requestError}, c.metricsError +func (c fakeRequestConverter) RequestFromMetrics(_ context.Context, md pmetric.Metrics) (request.Request, error) { + return fakeRequest{Items: md.DataPointCount(), Err: c.requestError}, c.metricsError } -func (c fakeRequestConverter) RequestFromTraces(_ context.Context, td ptrace.Traces) (Request, error) { - return fakeRequest{items: td.SpanCount(), err: c.requestError}, c.tracesError +func (c fakeRequestConverter) RequestFromTraces(_ context.Context, td ptrace.Traces) (request.Request, error) { + return fakeRequest{Items: td.SpanCount(), Err: c.requestError}, c.tracesError } -func (c fakeRequestConverter) RequestFromLogs(_ context.Context, ld plog.Logs) (Request, error) { - return fakeRequest{items: ld.LogRecordCount(), err: c.requestError}, c.logsError +func (c fakeRequestConverter) RequestFromLogs(_ context.Context, ld plog.Logs) 
(request.Request, error) { + return fakeRequest{Items: ld.LogRecordCount(), Err: c.requestError}, c.logsError +} + +func fakeRequestMarshaler(req request.Request) ([]byte, error) { + r, ok := req.(fakeRequest) + if !ok { + return nil, errors.New("invalid request type") + } + return json.Marshal(r) +} + +func fakeRequestUnmarshaler(bytes []byte) (request.Request, error) { + var r fakeRequest + if err := json.Unmarshal(bytes, &r); err != nil { + return nil, err + } + return r, nil } diff --git a/exporter/exporterhelper/retry_sender.go b/exporter/exporterhelper/retry_sender.go index 51828ba1818..a071a8cd19f 100644 --- a/exporter/exporterhelper/retry_sender.go +++ b/exporter/exporterhelper/retry_sender.go @@ -16,7 +16,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/consumererror" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" ) @@ -74,20 +74,21 @@ func NewThrottleRetry(err error, delay time.Duration) error { } } -type onRequestHandlingFinishedFunc func(*zap.Logger, internal.Request, error) error +type onRequestHandlingFinishedFunc func(*zap.Logger, *intrequest.Request, error) error type retrySender struct { - baseRequestSender traceAttribute attribute.KeyValue cfg RetrySettings stopCh chan struct{} logger *zap.Logger onTemporaryFailure onRequestHandlingFinishedFunc + nextSender *senderWrapper } -func newRetrySender(id component.ID, rCfg RetrySettings, logger *zap.Logger, onTemporaryFailure onRequestHandlingFinishedFunc) *retrySender { +func newRetrySender(id component.ID, rCfg RetrySettings, logger *zap.Logger, + onTemporaryFailure onRequestHandlingFinishedFunc, nextSender *senderWrapper) *retrySender { if onTemporaryFailure == nil { - onTemporaryFailure = func(logger *zap.Logger, req internal.Request, err error) error { + onTemporaryFailure = func(logger *zap.Logger, req *intrequest.Request, err error) error { return err } } @@ -97,16 +98,17 @@ func newRetrySender(id component.ID, rCfg RetrySettings, logger *zap.Logger, onT stopCh: make(chan struct{}), logger: logger, onTemporaryFailure: onTemporaryFailure, + nextSender: nextSender, } } -func (rs *retrySender) shutdown(context.Context) error { +func (rs *retrySender) shutdown(context.Context) error { // nolint: unparam close(rs.stopCh) return nil } // send implements the requestSender interface -func (rs *retrySender) send(req internal.Request) error { +func (rs *retrySender) send(req *intrequest.Request) error { if !rs.cfg.Enabled { err := rs.nextSender.send(req) if err != nil { diff --git a/exporter/exporterhelper/retry_sender_test.go b/exporter/exporterhelper/retry_sender_test.go index 72a21bc1311..ebc547ac5e6 100644 --- a/exporter/exporterhelper/retry_sender_test.go +++ b/exporter/exporterhelper/retry_sender_test.go @@ -21,28 +21,30 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumererror" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" + "go.opentelemetry.io/collector/exporter/exporterhelper/queue" + "go.opentelemetry.io/collector/exporter/exporterhelper/request" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/internal/testdata" ) -func mockRequestUnmarshaler(mr *mockRequest) 
internal.RequestUnmarshaler { - return func(bytes []byte) (internal.Request, error) { +func mockRequestUnmarshaler(mr *mockRequest) request.Unmarshaler { + return func(bytes []byte) (request.Request, error) { return mr, nil } } -func mockRequestMarshaler(_ internal.Request) ([]byte, error) { +func mockRequestMarshaler(_ request.Request) ([]byte, error) { return nil, nil } func TestQueuedRetry_DropOnPermanentError(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := NewDefaultRetrySettings() - mockR := newMockRequest(context.Background(), 2, consumererror.NewPermanent(errors.New("bad data"))) + mockR := newMockRequest(2, consumererror.NewPermanent(errors.New("bad data"))) be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := be.obsrepSender.(*observabilityConsumerSender) + ocs := be.obsrepSender.sender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -50,7 +52,7 @@ func TestQueuedRetry_DropOnPermanentError(t *testing.T) { ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.send(mockR)) + require.NoError(t, be.send(intrequest.New(context.Background(), mockR))) }) ocs.awaitAsyncProcessing() // In the newMockConcurrentExporter we count requests and items even for failed requests @@ -64,19 +66,19 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) { rCfg := NewDefaultRetrySettings() rCfg.Enabled = false be, err := newBaseExporter(defaultSettings, "", false, mockRequestMarshaler, - mockRequestUnmarshaler(newMockRequest(context.Background(), 2, errors.New("transient error"))), + mockRequestUnmarshaler(newMockRequest(2, errors.New("transient error"))), newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := be.obsrepSender.(*observabilityConsumerSender) + ocs := be.obsrepSender.sender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) }) - mockR := newMockRequest(context.Background(), 2, errors.New("transient error")) + mockR := newMockRequest(2, errors.New("transient error")) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.send(mockR)) + require.NoError(t, be.send(intrequest.New(context.Background(), mockR))) }) ocs.awaitAsyncProcessing() // In the newMockConcurrentExporter we count requests and items even for failed requests @@ -98,11 +100,11 @@ func TestQueuedRetry_OnError(t *testing.T) { }) traceErr := consumererror.NewTraces(errors.New("some error"), testdata.GenerateTraces(1)) - mockR := newMockRequest(context.Background(), 2, traceErr) - ocs := be.obsrepSender.(*observabilityConsumerSender) + mockR := newMockRequest(2, traceErr) + ocs := be.obsrepSender.sender.(*observabilityConsumerSender) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. 
- require.NoError(t, be.send(mockR)) + require.NoError(t, be.send(intrequest.New(context.Background(), mockR))) }) ocs.awaitAsyncProcessing() @@ -120,7 +122,7 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) { rCfg.MaxElapsedTime = 100 * time.Millisecond be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := be.obsrepSender.(*observabilityConsumerSender) + ocs := be.obsrepSender.sender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -131,11 +133,11 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) { require.NoError(t, be.send(newErrorRequest(context.Background()))) }) - mockR := newMockRequest(context.Background(), 2, nil) + mockR := newMockRequest(2, nil) start := time.Now() ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.send(mockR)) + require.NoError(t, be.send(intrequest.New(context.Background(), mockR))) }) ocs.awaitAsyncProcessing() @@ -148,7 +150,7 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) { mockR.checkNumRequests(t, 1) ocs.checkSendItemsCount(t, 2) ocs.checkDroppedItemsCount(t, 7) - require.Zero(t, be.queueSender.(*queueSender).queue.Size()) + require.Zero(t, be.queueSender.sender.(*queueSender).capacityLimiter.Size()) } type wrappedError struct { @@ -166,18 +168,18 @@ func TestQueuedRetry_ThrottleError(t *testing.T) { rCfg.InitialInterval = 10 * time.Millisecond be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := be.obsrepSender.(*observabilityConsumerSender) + ocs := be.obsrepSender.sender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) }) retry := NewThrottleRetry(errors.New("throttle error"), 100*time.Millisecond) - mockR := newMockRequest(context.Background(), 2, wrappedError{retry}) + mockR := newMockRequest(2, wrappedError{retry}) start := time.Now() ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. 
- require.NoError(t, be.send(mockR)) + require.NoError(t, be.send(intrequest.New(context.Background(), mockR))) }) ocs.awaitAsyncProcessing() @@ -187,7 +189,7 @@ func TestQueuedRetry_ThrottleError(t *testing.T) { mockR.checkNumRequests(t, 2) ocs.checkSendItemsCount(t, 2) ocs.checkDroppedItemsCount(t, 0) - require.Zero(t, be.queueSender.(*queueSender).queue.Size()) + require.Zero(t, be.queueSender.sender.(*queueSender).capacityLimiter.Size()) } func TestQueuedRetry_RetryOnError(t *testing.T) { @@ -198,16 +200,16 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { rCfg.InitialInterval = 0 be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := be.obsrepSender.(*observabilityConsumerSender) + ocs := be.obsrepSender.sender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) }) - mockR := newMockRequest(context.Background(), 2, errors.New("transient error")) + mockR := newMockRequest(2, errors.New("transient error")) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.send(mockR)) + require.NoError(t, be.send(intrequest.New(context.Background(), mockR))) }) ocs.awaitAsyncProcessing() @@ -215,7 +217,7 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { mockR.checkNumRequests(t, 2) ocs.checkSendItemsCount(t, 2) ocs.checkDroppedItemsCount(t, 0) - require.Zero(t, be.queueSender.(*queueSender).queue.Size()) + require.Zero(t, be.queueSender.sender.(*queueSender).capacityLimiter.Size()) } func TestQueueRetryWithNoQueue(t *testing.T) { @@ -224,11 +226,11 @@ func TestQueueRetryWithNoQueue(t *testing.T) { be, err := newBaseExporter(exportertest.NewNopCreateSettings(), component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg)) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - ocs := be.obsrepSender.(*observabilityConsumerSender) - mockR := newMockRequest(context.Background(), 2, errors.New("some error")) + ocs := be.obsrepSender.sender.(*observabilityConsumerSender) + mockR := newMockRequest(2, errors.New("some error")) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. 
- require.Error(t, be.send(mockR)) + require.Error(t, be.send(intrequest.New(context.Background(), mockR))) }) ocs.awaitAsyncProcessing() mockR.checkNumRequests(t, 1) @@ -237,30 +239,21 @@ func TestQueueRetryWithNoQueue(t *testing.T) { require.NoError(t, be.Shutdown(context.Background())) } -type mockErrorRequest struct { - baseRequest -} +type mockErrorRequest struct{} func (mer *mockErrorRequest) Export(_ context.Context) error { return errors.New("transient error") } -func (mer *mockErrorRequest) OnError(error) internal.Request { - return mer -} - -func (mer *mockErrorRequest) Count() int { +func (mer *mockErrorRequest) ItemsCount() int { return 7 } -func newErrorRequest(ctx context.Context) internal.Request { - return &mockErrorRequest{ - baseRequest: baseRequest{ctx: ctx}, - } +func newErrorRequest(ctx context.Context) *intrequest.Request { + return intrequest.New(ctx, &mockErrorRequest{}) } type mockRequest struct { - baseRequest cnt int mu sync.Mutex consumeError error @@ -280,9 +273,8 @@ func (m *mockRequest) Export(ctx context.Context) error { return ctx.Err() } -func (m *mockRequest) OnError(error) internal.Request { +func (m *mockRequest) OnError(error) request.Request { return &mockRequest{ - baseRequest: m.baseRequest, cnt: 1, consumeError: nil, requestCount: m.requestCount, @@ -295,13 +287,12 @@ func (m *mockRequest) checkNumRequests(t *testing.T, want int) { }, time.Second, 1*time.Millisecond) } -func (m *mockRequest) Count() int { +func (m *mockRequest) ItemsCount() int { return m.cnt } -func newMockRequest(ctx context.Context, cnt int, consumeError error) *mockRequest { +func newMockRequest(cnt int, consumeError error) *mockRequest { return &mockRequest{ - baseRequest: baseRequest{ctx: ctx}, cnt: cnt, consumeError: consumeError, requestCount: &atomic.Int64{}, @@ -309,21 +300,22 @@ func newMockRequest(ctx context.Context, cnt int, consumeError error) *mockReque } type observabilityConsumerSender struct { - baseRequestSender + nextSender *senderWrapper waitGroup *sync.WaitGroup sentItemsCount *atomic.Int64 droppedItemsCount *atomic.Int64 } -func newObservabilityConsumerSender(_ *ObsReport) requestSender { +func newObservabilityConsumerSender(_ *ObsReport, nextSender *senderWrapper) requestSender { return &observabilityConsumerSender{ + nextSender: nextSender, waitGroup: new(sync.WaitGroup), droppedItemsCount: &atomic.Int64{}, sentItemsCount: &atomic.Int64{}, } } -func (ocs *observabilityConsumerSender) send(req internal.Request) error { +func (ocs *observabilityConsumerSender) send(req *intrequest.Request) error { err := ocs.nextSender.send(req) if err != nil { ocs.droppedItemsCount.Add(int64(req.Count())) @@ -396,11 +388,11 @@ func tagsMatchLabelKeys(tags []tag.Tag, keys []metricdata.LabelKey, labels []met } type producerConsumerQueueWithCounter struct { - internal.Queue + queue.Queue produceCounter *atomic.Uint32 } -func (pcq *producerConsumerQueueWithCounter) Produce(item internal.Request) bool { +func (pcq *producerConsumerQueueWithCounter) Produce(req *intrequest.Request) error { pcq.produceCounter.Add(1) - return pcq.Queue.Produce(item) + return pcq.Queue.Produce(req) } diff --git a/exporter/exporterhelper/timeout_sender.go b/exporter/exporterhelper/timeout_sender.go index 11b85cf08be..f73ccea78ed 100644 --- a/exporter/exporterhelper/timeout_sender.go +++ b/exporter/exporterhelper/timeout_sender.go @@ -7,7 +7,7 @@ import ( "context" "time" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + intrequest 
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" ) // TimeoutSettings for timeout. The timeout applies to individual attempts to send data to the backend. @@ -25,17 +25,16 @@ func NewDefaultTimeoutSettings() TimeoutSettings { // timeoutSender is a requestSender that adds a `timeout` to every request that passes this sender. type timeoutSender struct { - baseRequestSender cfg TimeoutSettings } -func (ts *timeoutSender) send(req internal.Request) error { +func (ts *timeoutSender) send(req *intrequest.Request) error { // Intentionally don't overwrite the context inside the request, because in case of retries deadline will not be // updated because this deadline most likely is before the next one. ctx := req.Context() if ts.cfg.Timeout > 0 { var cancelFunc func() - ctx, cancelFunc = context.WithTimeout(req.Context(), ts.cfg.Timeout) + ctx, cancelFunc = context.WithTimeout(ctx, ts.cfg.Timeout) defer cancelFunc() } return req.Export(ctx) diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index bd9b6eb6ac8..7d27ff97ded 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -13,7 +13,8 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" + "go.opentelemetry.io/collector/exporter/exporterhelper/request" "go.opentelemetry.io/collector/pdata/ptrace" ) @@ -21,37 +22,35 @@ var tracesMarshaler = &ptrace.ProtoMarshaler{} var tracesUnmarshaler = &ptrace.ProtoUnmarshaler{} type tracesRequest struct { - baseRequest td ptrace.Traces pusher consumer.ConsumeTracesFunc } -func newTracesRequest(ctx context.Context, td ptrace.Traces, pusher consumer.ConsumeTracesFunc) internal.Request { +func newTracesRequest(td ptrace.Traces, pusher consumer.ConsumeTracesFunc) request.Request { return &tracesRequest{ - baseRequest: baseRequest{ctx: ctx}, - td: td, - pusher: pusher, + td: td, + pusher: pusher, } } -func newTraceRequestUnmarshalerFunc(pusher consumer.ConsumeTracesFunc) internal.RequestUnmarshaler { - return func(bytes []byte) (internal.Request, error) { +func newTraceRequestUnmarshalerFunc(pusher consumer.ConsumeTracesFunc) request.Unmarshaler { + return func(bytes []byte) (request.Request, error) { traces, err := tracesUnmarshaler.UnmarshalTraces(bytes) if err != nil { return nil, err } - return newTracesRequest(context.Background(), traces, pusher), nil + return newTracesRequest(traces, pusher), nil } } -func tracesRequestMarshaler(req internal.Request) ([]byte, error) { +func tracesRequestMarshaler(req request.Request) ([]byte, error) { return tracesMarshaler.MarshalTraces(req.(*tracesRequest).td) } -func (req *tracesRequest) OnError(err error) internal.Request { +func (req *tracesRequest) OnError(err error) request.Request { var traceError consumererror.Traces if errors.As(err, &traceError) { - return newTracesRequest(req.ctx, traceError.Data(), req.pusher) + return newTracesRequest(traceError.Data(), req.pusher) } return req } @@ -60,7 +59,7 @@ func (req *tracesRequest) Export(ctx context.Context) error { return req.pusher(ctx, req.td) } -func (req *tracesRequest) Count() int { +func (req *tracesRequest) ItemsCount() int { return req.td.SpanCount() } @@ -96,7 +95,7 @@ func NewTracesExporter( } tc, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error { - 
req := newTracesRequest(ctx, td, pusher) + req := intrequest.New(ctx, newTracesRequest(td, pusher)) serr := be.send(req) if errors.Is(serr, errSendingQueueIsFull) { be.obsrep.recordEnqueueFailure(req.Context(), component.DataTypeTraces, int64(req.Count())) @@ -115,7 +114,7 @@ func NewTracesExporter( // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. type TracesConverter interface { // RequestFromTraces converts ptrace.Traces into a Request. - RequestFromTraces(context.Context, ptrace.Traces) (Request, error) + RequestFromTraces(context.Context, ptrace.Traces) (request.Request, error) } // NewTracesRequestExporter creates a new traces exporter based on a custom TracesConverter and RequestSender. @@ -148,7 +147,7 @@ func NewTracesRequestExporter( zap.Error(err)) return consumererror.NewPermanent(cErr) } - r := newRequest(ctx, req) + r := intrequest.New(ctx, req) sErr := be.send(r) if errors.Is(sErr, errSendingQueueIsFull) { be.obsrep.recordEnqueueFailure(r.Context(), component.DataTypeTraces, int64(r.Count())) @@ -163,15 +162,15 @@ func NewTracesRequestExporter( } type tracesExporterWithObservability struct { - baseRequestSender - obsrep *ObsReport + obsrep *ObsReport + nextSender *senderWrapper } -func newTracesExporterWithObservability(obsrep *ObsReport) requestSender { - return &tracesExporterWithObservability{obsrep: obsrep} +func newTracesExporterWithObservability(obsrep *ObsReport, nextSender *senderWrapper) requestSender { + return &tracesExporterWithObservability{obsrep: obsrep, nextSender: nextSender} } -func (tewo *tracesExporterWithObservability) send(req internal.Request) error { +func (tewo *tracesExporterWithObservability) send(req *intrequest.Request) error { req.SetContext(tewo.obsrep.StartTracesOp(req.Context())) // Forward the data to the next consumer (this pusher is the next). err := tewo.nextSender.send(req) diff --git a/exporter/exporterhelper/traces_test.go b/exporter/exporterhelper/traces_test.go index 1daf4e8e3c8..8cbf98e0eb4 100644 --- a/exporter/exporterhelper/traces_test.go +++ b/exporter/exporterhelper/traces_test.go @@ -25,6 +25,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" "go.opentelemetry.io/collector/internal/testdata" @@ -42,10 +43,12 @@ var ( ) func TestTracesRequest(t *testing.T) { - mr := newTracesRequest(context.Background(), testdata.GenerateTraces(1), nil) + ctx := context.Background() + mr := intrequest.New(ctx, newTracesRequest(testdata.GenerateTraces(1), nil)) traceErr := consumererror.NewTraces(errors.New("some error"), ptrace.NewTraces()) - assert.EqualValues(t, newTracesRequest(context.Background(), ptrace.NewTraces(), nil), mr.OnError(traceErr)) + assert.EqualValues(t, intrequest.New(ctx, newTracesRequest(ptrace.NewTraces(), nil)), + mr.OnError(traceErr)) } func TestTracesExporter_InvalidName(t *testing.T) {
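
Taken together, the traces changes above show the shape of the new request-based API: a client-provided type implements `request.Request` (an `Export` method, an `ItemsCount` items counter, and an optional `OnError` hook), and a `TracesConverter` builds such a request from `ptrace.Traces`. The sketch below illustrates that shape only. The type names (`spanBatch`, `converter`) are hypothetical, and the exact set of methods `request.Request` requires beyond `Export`, as well as the full `NewTracesRequestExporter` signature, are not fully visible in this diff, so treat it as a minimal sketch under those assumptions rather than a definitive implementation.

```go
package exporterexample

import (
	"context"

	"go.opentelemetry.io/collector/exporter/exporterhelper/request"
	"go.opentelemetry.io/collector/pdata/ptrace"
)

// spanBatch is a hypothetical client-defined request wrapping the spans a
// backend client would send. It mirrors the methods tracesRequest implements
// in the diff above.
type spanBatch struct {
	td ptrace.Traces
}

// Export sends the batch. The context carries the deadline added by the
// timeout sender, so the backend call should honor ctx cancellation.
func (r *spanBatch) Export(ctx context.Context) error {
	// Send r.td to the backend here; returning ctx.Err() on cancellation is
	// the pattern the mock request in the tests follows.
	return ctx.Err()
}

// ItemsCount reports how many items the request carries (spans here),
// matching the Count -> ItemsCount rename seen in the diff.
func (r *spanBatch) ItemsCount() int {
	return r.td.SpanCount()
}

// OnError returns the request to retry after a failure. It appears optional
// (mockErrorRequest drops it); returning the request unchanged is the
// simplest behavior.
func (r *spanBatch) OnError(error) request.Request {
	return r
}

// converter implements the TracesConverter interface from this diff:
// RequestFromTraces(context.Context, ptrace.Traces) (request.Request, error).
type converter struct{}

func (converter) RequestFromTraces(_ context.Context, td ptrace.Traces) (request.Request, error) {
	return &spanBatch{td: td}, nil
}
```

A converter like this would be passed to `NewTracesRequestExporter`, which then wraps each returned request in the internal `intrequest.Request` (as the `intrequest.New(ctx, req)` calls above do) before handing it to the sender chain.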