diff --git a/exporter/exporterhelper/internal/bounded_memory_queue.go b/exporter/exporterhelper/internal/bounded_memory_queue.go
index 85435d2aa61..c9dad90191e 100644
--- a/exporter/exporterhelper/internal/bounded_memory_queue.go
+++ b/exporter/exporterhelper/internal/bounded_memory_queue.go
@@ -16,8 +16,8 @@ import (
 // the producer are dropped.
 type boundedMemoryQueue[T any] struct {
     component.StartFunc
-    *queueCapacityLimiter[T]
-    items chan queueRequest[T]
+    *sizedElementsChannel[memQueueEl[T]]
+    sizer Sizer[T]
 }
 
 // MemoryQueueSettings defines internal parameters for boundedMemoryQueue creation.
@@ -30,29 +30,24 @@ type MemoryQueueSettings[T any] struct {
 // callback for dropped items (e.g. useful to emit metrics).
 func NewBoundedMemoryQueue[T any](set MemoryQueueSettings[T]) Queue[T] {
     return &boundedMemoryQueue[T]{
-        queueCapacityLimiter: newQueueCapacityLimiter[T](set.Sizer, set.Capacity),
-        items:                make(chan queueRequest[T], set.Capacity),
+        sizedElementsChannel: newSizedElementsChannel[memQueueEl[T]](set.Capacity),
+        sizer:                set.Sizer,
     }
 }
 
 // Offer is used by the producer to submit new item to the queue. Calling this method on a stopped queue will panic.
 func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {
-    if !q.queueCapacityLimiter.claim(req) {
-        return ErrQueueIsFull
-    }
-    q.items <- queueRequest[T]{ctx: ctx, req: req}
-    return nil
+    return q.sizedElementsChannel.enqueue(memQueueEl[T]{ctx: ctx, req: req}, q.sizer.SizeOf(req), nil)
 }
 
 // Consume applies the provided function on the head of queue.
 // The call blocks until there is an item available or the queue is stopped.
 // The function returns true when an item is consumed or false if the queue is stopped and emptied.
 func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
-    item, ok := <-q.items
+    item, ok := q.sizedElementsChannel.dequeue(func(el memQueueEl[T]) uint64 { return q.sizer.SizeOf(el.req) })
     if !ok {
         return false
     }
-    q.queueCapacityLimiter.release(item.req)
     // the memory queue doesn't handle consume errors
     _ = consumeFunc(item.ctx, item.req)
     return true
@@ -60,11 +55,11 @@ func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) err
 // Shutdown closes the queue channel to initiate draining of the queue.
 func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error {
-    close(q.items)
+    q.sizedElementsChannel.shutdown()
     return nil
 }
 
-type queueRequest[T any] struct {
+type memQueueEl[T any] struct {
     req T
     ctx context.Context
 }
diff --git a/exporter/exporterhelper/internal/bounded_memory_queue_test.go b/exporter/exporterhelper/internal/bounded_memory_queue_test.go
index e3431a3eac6..0ea3f1ae8f3 100644
--- a/exporter/exporterhelper/internal/bounded_memory_queue_test.go
+++ b/exporter/exporterhelper/internal/bounded_memory_queue_test.go
@@ -179,3 +179,11 @@ func TestZeroSizeNoConsumers(t *testing.T) {
 
     assert.NoError(t, q.Shutdown(context.Background()))
 }
+
+type fakeReq struct {
+    itemsCount int
+}
+
+func (r fakeReq) ItemsCount() int {
+    return r.itemsCount
+}
diff --git a/exporter/exporterhelper/internal/persistent_queue.go b/exporter/exporterhelper/internal/persistent_queue.go
index 0a6629cb87e..18236f1ed14 100644
--- a/exporter/exporterhelper/internal/persistent_queue.go
+++ b/exporter/exporterhelper/internal/persistent_queue.go
@@ -10,7 +10,6 @@ import (
     "fmt"
     "strconv"
     "sync"
-    "sync/atomic"
 
     "go.uber.org/multierr"
     "go.uber.org/zap"
@@ -43,7 +42,10 @@ import (
 // index index x
 // xxxx deleted
 type persistentQueue[T any] struct {
-    *queueCapacityLimiter[T]
+    // sizedElementsChannel is used by the persistent queue for two purposes:
+    // 1. as a communication channel notifying the consumer that a new item is available.
+    // 2. as capacity control based on the size of the items.
+    *sizedElementsChannel[permanentQueueEl]
 
     set    PersistentQueueSettings[T]
     logger *zap.Logger
@@ -52,14 +54,10 @@ type persistentQueue[T any] struct {
     // isRequestSized indicates whether the queue is sized by the number of requests.
     isRequestSized bool
 
-    putChan chan struct{}
-
     // mu guards everything declared below.
     mu                       sync.Mutex
     readIndex                uint64
     writeIndex               uint64
-    initIndexSize            uint64
-    initQueueSize            *atomic.Uint64
     currentlyDispatchedItems []uint64
     refClient                int64
     stopped                  bool
@@ -97,12 +95,10 @@ type PersistentQueueSettings[T any] struct {
 func NewPersistentQueue[T any](set PersistentQueueSettings[T]) Queue[T] {
     _, isRequestSized := set.Sizer.(*RequestSizer[T])
     return &persistentQueue[T]{
-        queueCapacityLimiter: newQueueCapacityLimiter[T](set.Sizer, set.Capacity),
+        sizedElementsChannel: newSizedElementsChannel[permanentQueueEl](set.Capacity),
         set:                  set,
         logger:               set.ExporterSettings.Logger,
-        initQueueSize:        &atomic.Uint64{},
         isRequestSized:       isRequestSized,
-        putChan:              make(chan struct{}, set.Capacity),
     }
 }
 
@@ -147,39 +143,46 @@ func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Contex
         pq.readIndex = 0
         pq.writeIndex = 0
     }
-    pq.initIndexSize = pq.writeIndex - pq.readIndex
-    // Ensure the communication channel has the same size as the queue
-    for i := 0; i < int(pq.initIndexSize); i++ {
-        pq.putChan <- struct{}{}
-    }
+    initIndexSize := pq.writeIndex - pq.readIndex
 
-    // Read snapshot of the queue size from storage. It's not a problem if the value cannot be fetched,
-    // or it's not accurate. The queue size will be corrected once the recovered queue is drained.
-    if pq.initIndexSize > 0 {
+    // Pre-allocate the communication channel with the size of the restored queue.
+    if initIndexSize > 0 {
+        initQueueSize := initIndexSize
         // If the queue is sized by the number of requests, no need to read the queue size from storage.
-        if pq.isRequestSized {
-            pq.initQueueSize.Store(pq.initIndexSize)
-            return
+        if !pq.isRequestSized {
+            if restoredQueueSize, err := pq.restoreQueueSizeFromStorage(ctx); err == nil {
+                initQueueSize = restoredQueueSize
+            }
         }
-        res, err := pq.client.Get(ctx, queueSizeKey)
-        if err == nil {
-            var restoredQueueSize uint64
-            restoredQueueSize, err = bytesToItemIndex(res)
-            pq.initQueueSize.Store(restoredQueueSize)
-        }
-        if err != nil {
-            if errors.Is(err, errValueNotSet) {
-                pq.logger.Warn("Cannot read the queue size snapshot from storage. "+
-                    "The reported queue size will be inaccurate until the initial queue is drained. "+
-                    "It's expected when the items sized queue enabled for the first time", zap.Error(err))
-            } else {
-                pq.logger.Error("Failed to read the queue size snapshot from storage. "+
-                    "The reported queue size will be inaccurate until the initial queue is drained.", zap.Error(err))
-            }
+        // Ensure the communication channel is filled with evenly sized elements up to the total restored queue size.
+        pq.sizedElementsChannel.initBulkEnqueue(make([]permanentQueueEl, initIndexSize), initQueueSize)
+    }
+}
+
+// permanentQueueEl is the type of the elements passed to the sizedElementsChannel by the persistentQueue.
+type permanentQueueEl struct{}
+
+// restoreQueueSizeFromStorage restores the queue size from storage.
+func (pq *persistentQueue[T]) restoreQueueSizeFromStorage(ctx context.Context) (uint64, error) {
+    var queueSize uint64
+    val, err := pq.client.Get(ctx, queueSizeKey)
+    if err == nil {
+        queueSize, err = bytesToItemIndex(val)
+    }
+    if err != nil {
+        if errors.Is(err, errValueNotSet) {
+            pq.logger.Warn("Cannot read the queue size snapshot from storage. "+
+                "The reported queue size will be inaccurate until the initial queue is drained. "+
+                "This is expected when an items-sized queue is enabled for the first time", zap.Error(err))
+        } else {
+            pq.logger.Error("Failed to read the queue size snapshot from storage. "+
+                "The reported queue size will be inaccurate until the initial queue is drained.", zap.Error(err))
         }
+        return 0, err
     }
+    return queueSize, nil
 }
 
 // Consume applies the provided function on the head of queue.
@@ -187,14 +190,24 @@ func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Contex
 // The function returns true when an item is consumed or false if the queue is stopped.
 func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
     for {
+        var (
+            req                  T
+            onProcessingFinished func(error)
+            consumed             bool
+        )
+
         // If we are stopped we still process all the other events in the channel before, but we
         // return fast in the `getNextItem`, so we will free the channel fast and get to the stop.
-        _, ok := <-pq.putChan
+        _, ok := pq.sizedElementsChannel.dequeue(func(permanentQueueEl) uint64 {
+            req, onProcessingFinished, consumed = pq.getNextItem(context.Background())
+            if !consumed {
+                return 0
+            }
+            return pq.set.Sizer.SizeOf(req)
+        })
         if !ok {
             return false
         }
-
-        req, onProcessingFinished, consumed := pq.getNextItem(context.Background())
         if consumed {
             onProcessingFinished(consumeFunc(context.Background(), req))
             return true
@@ -202,31 +215,24 @@ func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error
     }
 }
 
-// Size returns the current size of the queue.
-func (pq *persistentQueue[T]) Size() int {
-    return int(pq.initQueueSize.Load()) + pq.queueCapacityLimiter.Size()
-}
-
 func (pq *persistentQueue[T]) Shutdown(ctx context.Context) error {
-    close(pq.putChan)
+    // If the queue is not initialized, there is nothing to shut down.
+    if pq.client == nil {
+        return nil
+    }
+
     pq.mu.Lock()
     defer pq.mu.Unlock()
+    backupErr := pq.backupQueueSize(ctx)
+    pq.sizedElementsChannel.shutdown()
     // Mark this queue as stopped, so consumer don't start any more work.
     pq.stopped = true
-    return multierr.Combine(
-        pq.backupQueueSize(ctx),
-        pq.unrefClient(ctx),
-    )
+    return multierr.Combine(backupErr, pq.unrefClient(ctx))
 }
 
 // backupQueueSize writes the current queue size to storage. The value is used to recover the queue size
 // in case if the collector is killed.
 func (pq *persistentQueue[T]) backupQueueSize(ctx context.Context) error {
-    // Client can be nil if the queue is not initialized yet.
-    if pq.client == nil {
-        return nil
-    }
-
     // No need to write the queue size if the queue is sized by the number of requests.
     // That information is already stored as difference between read and write indexes.
     if pq.isRequestSized {
@@ -257,34 +263,31 @@ func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error {
 
 // putInternal is the internal version that requires caller to hold the mutex lock.
 func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
-    if !pq.queueCapacityLimiter.claim(req) {
-        pq.logger.Warn("Maximum queue capacity reached")
-        return ErrQueueIsFull
-    }
+    err := pq.sizedElementsChannel.enqueue(permanentQueueEl{}, pq.set.Sizer.SizeOf(req), func() error {
+        itemKey := getItemKey(pq.writeIndex)
+        newIndex := pq.writeIndex + 1
 
-    itemKey := getItemKey(pq.writeIndex)
-    newIndex := pq.writeIndex + 1
+        reqBuf, err := pq.set.Marshaler(req)
+        if err != nil {
+            return err
+        }
 
-    reqBuf, err := pq.set.Marshaler(req)
+        // Carry out a transaction where we both add the item and update the write index
+        ops := []storage.Operation{
+            storage.SetOperation(writeIndexKey, itemIndexToBytes(newIndex)),
+            storage.SetOperation(itemKey, reqBuf),
+        }
+        if storageErr := pq.client.Batch(ctx, ops...); storageErr != nil {
+            return storageErr
+        }
+
+        pq.writeIndex = newIndex
+        return nil
+    })
     if err != nil {
-        pq.queueCapacityLimiter.release(req)
         return err
     }
 
-    // Carry out a transaction where we both add the item and update the write index
-    ops := []storage.Operation{
-        storage.SetOperation(writeIndexKey, itemIndexToBytes(newIndex)),
-        storage.SetOperation(itemKey, reqBuf),
-    }
-    if storageErr := pq.client.Batch(ctx, ops...); storageErr != nil {
-        pq.queueCapacityLimiter.release(req)
-        return storageErr
-    }
-
-    pq.writeIndex = newIndex
-    // Inform the loop that there's some data to process
-    pq.putChan <- struct{}{}
-
     // Back up the queue size to storage every 10 writes. The stored value is used to recover the queue size
     // in case if the collector is killed. The recovered queue size is allowed to be inaccurate.
     if (pq.writeIndex % 10) == 5 {
@@ -336,8 +339,6 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error),
         return request, nil, false
     }
 
-    pq.releaseCapacity(request)
-
     // Back up the queue size to storage on every 10 reads. The stored value is used to recover the queue size
     // in case if the collector is killed. The recovered queue size is allowed to be inaccurate.
     if (pq.writeIndex % 10) == 0 {
@@ -373,28 +374,6 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error),
     }, true
 }
 
-// releaseCapacity releases the capacity of the queue. The caller must hold the mutex.
-func (pq *persistentQueue[T]) releaseCapacity(req T) {
-    // If the recovered queue size is not emptied yet, decrease it first.
-    if pq.initIndexSize > 0 {
-        pq.initIndexSize--
-        if pq.initIndexSize == 0 {
-            pq.initQueueSize.Store(0)
-            return
-        }
-        reqSize := pq.queueCapacityLimiter.sizeOf(req)
-        if pq.initQueueSize.Load() < reqSize {
-            pq.initQueueSize.Store(0)
-            return
-        }
-        pq.initQueueSize.Add(^(reqSize - 1))
-        return
-    }
-
-    // Otherwise, decrease the current queue size.
-    pq.queueCapacityLimiter.release(req)
-}
-
 // retrieveAndEnqueueNotDispatchedReqs gets the items for which sending was not finished, cleans the storage
 // and moves the items at the back of the queue.
 func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Context) {
diff --git a/exporter/exporterhelper/internal/persistent_queue_test.go b/exporter/exporterhelper/internal/persistent_queue_test.go
index 10934c5be77..d058950e296 100644
--- a/exporter/exporterhelper/internal/persistent_queue_test.go
+++ b/exporter/exporterhelper/internal/persistent_queue_test.go
@@ -517,8 +517,6 @@ func TestPersistentQueueStartWithNonDispatched(t *testing.T) {
         require.NoError(t, err)
     }
 
-    // get one item out, but don't mark it as processed
-    <-ps.putChan
     require.True(t, ps.Consume(func(ctx context.Context, traces tracesRequest) error {
         // put one more item in
         require.NoError(t, ps.Offer(context.Background(), req))
@@ -845,27 +843,29 @@ func TestPersistentQueue_ItemsCapacityUsageIsNotPreserved(t *testing.T) {
 
     newPQ := createTestPersistentQueueWithItemsCapacity(t, ext, 100)
 
-    // The queue items size cannot be restored to the previous size. Falls back to 0.
-    assert.Equal(t, 0, newPQ.Size())
+    // The queue items size cannot be restored, so it falls back to the request-based size.
+    assert.Equal(t, 2, newPQ.Size())
 
     assert.NoError(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5)))
 
-    // Only new items are reflected
-    assert.Equal(t, 10, newPQ.Size())
+    // Only new items are correctly reflected
+    assert.Equal(t, 12, newPQ.Size())
 
-    // Consuming old items should does not affect the size.
+    // Consuming a restored request drops the restored size to 0 because the request size (20 items)
+    // exceeds the remaining restored size of 2.
     assert.True(t, newPQ.Consume(func(ctx context.Context, traces tracesRequest) error {
         assert.Equal(t, 20, traces.traces.SpanCount())
         return nil
     }))
     assert.Equal(t, 10, newPQ.Size())
 
+    // Consuming another restored request should not affect the restored size since it has already dropped to 0.
     assert.True(t, newPQ.Consume(func(ctx context.Context, traces tracesRequest) error {
         assert.Equal(t, 25, traces.traces.SpanCount())
         return nil
     }))
     assert.Equal(t, 10, newPQ.Size())
 
+    // Once the restored queue size has dropped to 0, consumed requests are correctly reflected in the queue size.
     assert.True(t, newPQ.Consume(func(ctx context.Context, traces tracesRequest) error {
         assert.Equal(t, 10, traces.traces.SpanCount())
         return nil
@@ -875,6 +875,60 @@ func TestPersistentQueue_ItemsCapacityUsageIsNotPreserved(t *testing.T) {
     assert.NoError(t, newPQ.Shutdown(context.Background()))
 }
 
+// This test covers the case when the queue is restarted with less capacity than needed to restore the queued items.
+// In that case, the queue has to be restored anyway even if it exceeds the capacity limit.
+func TestPersistentQueue_RequestCapacityLessAfterRestart(t *testing.T) {
+    ext := NewMockStorageExtension(nil)
+    pq := createTestPersistentQueueWithRequestsCapacity(t, ext, 100)
+
+    assert.Equal(t, 0, pq.Size())
+
+    assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(4, 10)))
+    assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(2, 10)))
+    assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(5, 5)))
+    assert.NoError(t, pq.Offer(context.Background(), newTracesRequest(1, 5)))
+
+    // Read the first request just to populate the read index in the storage.
+    // Otherwise, the write index won't be restored either.
+    assert.True(t, pq.Consume(func(ctx context.Context, traces tracesRequest) error {
+        assert.Equal(t, 40, traces.traces.SpanCount())
+        return nil
+    }))
+    assert.Equal(t, 3, pq.Size())
+
+    assert.NoError(t, pq.Shutdown(context.Background()))
+
+    // The queue is restarted with less capacity than needed to restore the queued items, but with the same
+    // underlying storage. No need to drop requests that are over capacity since they are already in the storage.
+    newPQ := createTestPersistentQueueWithRequestsCapacity(t, ext, 2)
+
+    // The request-based queue size is restored from the difference between the persisted read and write indexes.
+    assert.Equal(t, 3, newPQ.Size())
+
+    // Queue is full
+    assert.Error(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5)))
+
+    assert.True(t, newPQ.Consume(func(ctx context.Context, traces tracesRequest) error {
+        assert.Equal(t, 20, traces.traces.SpanCount())
+        return nil
+    }))
+    assert.Equal(t, 2, newPQ.Size())
+
+    // Still full
+    assert.Error(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5)))
+
+    assert.True(t, newPQ.Consume(func(ctx context.Context, traces tracesRequest) error {
+        assert.Equal(t, 25, traces.traces.SpanCount())
+        return nil
+    }))
+    assert.Equal(t, 1, newPQ.Size())
+
+    // Now it can accept new items
+    assert.NoError(t, newPQ.Offer(context.Background(), newTracesRequest(2, 5)))
+
+    assert.NoError(t, newPQ.Shutdown(context.Background()))
+}
+
 func requireCurrentlyDispatchedItemsEqual(t *testing.T, pq *persistentQueue[tracesRequest], compare []uint64) {
     pq.mu.Lock()
     defer pq.mu.Unlock()
diff --git a/exporter/exporterhelper/internal/queue.go b/exporter/exporterhelper/internal/queue.go
index 8bd8879a940..807416bb182 100644
--- a/exporter/exporterhelper/internal/queue.go
+++ b/exporter/exporterhelper/internal/queue.go
@@ -34,3 +34,26 @@ type Queue[T any] interface {
     // Capacity returns the capacity of the queue.
     Capacity() int
 }
+
+type itemsCounter interface {
+    ItemsCount() int
+}
+
+// Sizer is an interface that returns the size of the given element.
+type Sizer[T any] interface {
+    SizeOf(T) uint64
+}
+
+// ItemsSizer is a Sizer implementation that returns the size of a queue element as the number of items it contains.
+type ItemsSizer[T itemsCounter] struct{}
+
+func (is *ItemsSizer[T]) SizeOf(el T) uint64 {
+    return uint64(el.ItemsCount())
+}
+
+// RequestSizer is a Sizer implementation that returns the size of a queue element as one request.
+type RequestSizer[T any] struct{}
+
+func (rs *RequestSizer[T]) SizeOf(T) uint64 {
+    return 1
+}
diff --git a/exporter/exporterhelper/internal/queue_capacity.go b/exporter/exporterhelper/internal/queue_capacity.go
deleted file mode 100644
index 0466de0d8b2..00000000000
--- a/exporter/exporterhelper/internal/queue_capacity.go
+++ /dev/null
@@ -1,74 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal"
-
-import (
-    "sync/atomic"
-)
-
-type itemsCounter interface {
-    ItemsCount() int
-}
-
-// Sizer is an interface that returns the size of the given element.
-type Sizer[T any] interface {
-    SizeOf(T) uint64
-}
-
-// ItemsSizer is a Sizer implementation that returns the size of a queue element as the number of items it contains.
-type ItemsSizer[T itemsCounter] struct{}
-
-func (is *ItemsSizer[T]) SizeOf(el T) uint64 {
-    return uint64(el.ItemsCount())
-}
-
-// RequestSizer is a Sizer implementation that returns the size of a queue element as one request.
-type RequestSizer[T any] struct{}
-
-func (rs *RequestSizer[T]) SizeOf(T) uint64 {
-    return 1
-}
-
-type queueCapacityLimiter[T any] struct {
-    used *atomic.Uint64
-    cap  uint64
-    sz   Sizer[T]
-}
-
-func (bcl queueCapacityLimiter[T]) Capacity() int {
-    return int(bcl.cap)
-}
-
-func (bcl queueCapacityLimiter[T]) Size() int {
-    return int(bcl.used.Load())
-}
-
-func (bcl queueCapacityLimiter[T]) claim(el T) bool {
-    size := bcl.sizeOf(el)
-    if bcl.used.Add(size) > bcl.cap {
-        bcl.releaseSize(size)
-        return false
-    }
-    return true
-}
-
-func (bcl queueCapacityLimiter[T]) release(el T) {
-    bcl.releaseSize(bcl.sizeOf(el))
-}
-
-func (bcl queueCapacityLimiter[T]) releaseSize(size uint64) {
-    bcl.used.Add(^(size - 1))
-}
-
-func (bcl queueCapacityLimiter[T]) sizeOf(el T) uint64 {
-    return bcl.sz.SizeOf(el)
-}
-
-func newQueueCapacityLimiter[T any](sizer Sizer[T], capacity int) *queueCapacityLimiter[T] {
-    return &queueCapacityLimiter[T]{
-        used: &atomic.Uint64{},
-        cap:  uint64(capacity),
-        sz:   sizer,
-    }
-}
diff --git a/exporter/exporterhelper/internal/queue_capacity_test.go b/exporter/exporterhelper/internal/queue_capacity_test.go
deleted file mode 100644
index 7a3b3ad41f2..00000000000
--- a/exporter/exporterhelper/internal/queue_capacity_test.go
+++ /dev/null
@@ -1,58 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package internal
-
-import (
-    "testing"
-
-    "github.com/stretchr/testify/assert"
-)
-
-func TestRequestsCapacityLimiter(t *testing.T) {
-    rl := newQueueCapacityLimiter[fakeReq](&RequestSizer[fakeReq]{}, 2)
-    assert.Equal(t, 0, rl.Size())
-    assert.Equal(t, 2, rl.Capacity())
-
-    req := fakeReq{itemsCount: 5}
-
-    assert.True(t, rl.claim(req))
-    assert.Equal(t, 1, rl.Size())
-
-    assert.True(t, rl.claim(req))
-    assert.Equal(t, 2, rl.Size())
-
-    assert.False(t, rl.claim(req))
-    assert.Equal(t, 2, rl.Size())
-
-    rl.release(req)
-    assert.Equal(t, 1, rl.Size())
-}
-
-func TestItemsCapacityLimiter(t *testing.T) {
-    rl := newQueueCapacityLimiter[fakeReq](&ItemsSizer[fakeReq]{}, 7)
-    assert.Equal(t, 0, rl.Size())
-    assert.Equal(t, 7, rl.Capacity())
-
-    req := fakeReq{itemsCount: 3}
-
-    assert.True(t, rl.claim(req))
-    assert.Equal(t, 3, rl.Size())
-
-    assert.True(t, rl.claim(req))
-    assert.Equal(t, 6, rl.Size())
-
-    assert.False(t, rl.claim(req))
-    assert.Equal(t, 6, rl.Size())
-
-    rl.release(req)
-    assert.Equal(t, 3, rl.Size())
-}
-
-type fakeReq struct {
-    itemsCount int
-}
-
-func (r fakeReq) ItemsCount() int {
-    return r.itemsCount
-}
diff --git a/exporter/exporterhelper/internal/sized_elements_channel.go b/exporter/exporterhelper/internal/sized_elements_channel.go
new file mode 100644
index 00000000000..1e5abbd1f6c
--- /dev/null
+++ b/exporter/exporterhelper/internal/sized_elements_channel.go
@@ -0,0 +1,101 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal"
+
+import "sync/atomic"
+
+// sizedElementsChannel is a channel wrapper for sized elements with a capacity set to the total size of all the
+// elements. The channel will accept elements until the total size of the elements reaches the capacity.
+type sizedElementsChannel[T any] struct {
+    used *atomic.Uint64
+    cap  uint64
+    ch   chan T
+
+    // The following fields are used by the persistent queue to initialize the queue with the elements recovered from
+    // the disk. Synchronization is not required because the persistent queue doesn't access them concurrently.
+    // dequeue() decreases initElementsCount and initTotalSize before it starts decreasing the used field.
+    initTotalSize     uint64
+    initElementsCount uint64
+}
+
+// newSizedElementsChannel creates a sized elements channel. The capacity limits the total size of the enqueued
+// elements and also sets the capacity of the underlying channel, which usually should be equal to the capacity of
+// the queue to avoid blocking the producer.
+func newSizedElementsChannel[T any](capacity int) *sizedElementsChannel[T] {
+    return &sizedElementsChannel[T]{
+        used: &atomic.Uint64{},
+        cap:  uint64(capacity),
+        ch:   make(chan T, capacity),
+    }
+}
+
+// enqueue puts the element with the given size into the queue if there is enough capacity.
+// Returns an error if the queue is full. The callback is called before the element is committed to the queue.
+// If the callback returns an error, the element is not put into the queue and the error is returned.
+func (vcq *sizedElementsChannel[T]) enqueue(el T, size uint64, callback func() error) error {
+    if vcq.initTotalSize+vcq.used.Add(size) > vcq.cap {
+        vcq.used.Add(^(size - 1))
+        return ErrQueueIsFull
+    }
+    if callback != nil {
+        if err := callback(); err != nil {
+            vcq.used.Add(^(size - 1))
+            return err
+        }
+    }
+    vcq.ch <- el
+    return nil
+}
+
+// initBulkEnqueue puts the elements into the queue with the given total size. It's used by the persistent queue to
+// initialize the queue with the elements recovered from the disk.
+func (vcq *sizedElementsChannel[T]) initBulkEnqueue(els []T, totalSize uint64) {
+    vcq.initElementsCount = uint64(len(els))
+    vcq.initTotalSize = totalSize
+    if cap(vcq.ch) < len(els) {
+        vcq.ch = make(chan T, len(els))
+    }
+    for _, el := range els {
+        vcq.ch <- el
+    }
+}
+
+// dequeue removes the element from the queue and returns it.
+// The call blocks until there is an item available or the queue is stopped.
+// The function returns true when an item is consumed or false if the queue is stopped and emptied.
+// The callback is called after the element is read from the channel but before the queue size is updated.
+// It must return the size of the element.
+func (vcq *sizedElementsChannel[T]) dequeue(callback func(T) (size uint64)) (T, bool) {
+    el, ok := <-vcq.ch
+    if !ok {
+        return el, false
+    }
+
+    size := callback(el)
+
+    if vcq.initElementsCount > 0 {
+        vcq.initElementsCount--
+        if vcq.initElementsCount == 0 || vcq.initTotalSize < size {
+            vcq.initTotalSize = 0
+        } else {
+            vcq.initTotalSize -= size
+        }
+        return el, true
+    }
+
+    vcq.used.Add(^(size - 1))
+    return el, true
+}
+
+// shutdown closes the queue channel to initiate draining of the queue.
+func (vcq *sizedElementsChannel[T]) shutdown() {
+    close(vcq.ch)
+}
+
+func (vcq *sizedElementsChannel[T]) Size() int {
+    return int(vcq.initTotalSize + vcq.used.Load())
+}
+
+func (vcq *sizedElementsChannel[T]) Capacity() int {
+    return int(vcq.cap)
+}
diff --git a/exporter/exporterhelper/internal/sized_elements_channel_test.go b/exporter/exporterhelper/internal/sized_elements_channel_test.go
new file mode 100644
index 00000000000..0862f77411b
--- /dev/null
+++ b/exporter/exporterhelper/internal/sized_elements_channel_test.go
@@ -0,0 +1,44 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package internal
+
+import (
+    "errors"
+    "testing"
+
+    "github.com/stretchr/testify/assert"
+)
+
+func TestSizedElementsChannel(t *testing.T) {
+    q := newSizedElementsChannel[int](7)
+    assert.NoError(t, q.enqueue(1, 1, nil))
+    assert.Equal(t, 1, q.Size())
+    assert.Equal(t, 7, q.Capacity())
+
+    // failed callback should not allow the element to be added
+    assert.Error(t, q.enqueue(2, 2, func() error { return errors.New("failed") }))
+    assert.Equal(t, 1, q.Size())
+
+    assert.NoError(t, q.enqueue(3, 3, nil))
+    assert.Equal(t, 4, q.Size())
+
+    // should not be able to send to the full queue
+    assert.Error(t, q.enqueue(4, 4, nil))
+    assert.Equal(t, 4, q.Size())
+
+    el, ok := q.dequeue(func(el int) uint64 { return uint64(el) })
+    assert.Equal(t, 1, el)
+    assert.True(t, ok)
+    assert.Equal(t, 3, q.Size())
+
+    el, ok = q.dequeue(func(el int) uint64 { return uint64(el) })
+    assert.Equal(t, 3, el)
+    assert.True(t, ok)
+    assert.Equal(t, 0, q.Size())
+
+    q.shutdown()
+    el, ok = q.dequeue(func(el int) uint64 { return uint64(el) })
+    assert.False(t, ok)
+    assert.Equal(t, 0, el)
+}
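For context, a minimal sketch of the enqueue/dequeue contract that the new sizedElementsChannel provides. It assumes the sizedElementsChannel and ErrQueueIsFull declarations from this diff are in scope (i.e. it lives in the same internal package with "errors" imported); the function name sketchEnqueueDequeueContract and the element values are hypothetical, for illustration only.

func sketchEnqueueDequeueContract() {
    // Capacity is measured in size units (items or requests), not in element count.
    ch := newSizedElementsChannel[string](10)

    _ = ch.enqueue("a", 6, nil) // nil; Size() == 6
    _ = ch.enqueue("b", 6, nil) // ErrQueueIsFull: 6+6 > 10; Size() stays 6

    // The optional callback runs before the element is committed; a failure
    // rolls the capacity charge back, so a failed persistence step (as in
    // putInternal above) never leaks capacity.
    _ = ch.enqueue("c", 2, func() error { return errors.New("storage write failed") })
    // Size() is still 6.

    // dequeue asks the callback for the element's size and releases that much capacity.
    el, ok := ch.dequeue(func(string) uint64 { return 6 })
    _, _ = el, ok // "a", true; Size() == 0

    ch.shutdown() // dequeue returns ok == false once the channel is drained
}

This is why the claim/release pair of the deleted queueCapacityLimiter is no longer needed: the charge-then-commit-or-rollback sequence lives inside enqueue itself.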
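And a sketch of the restored-queue bookkeeping that replaces the removed releaseCapacity: initBulkEnqueue pre-fills the channel with placeholder elements carrying one approximate total size, and dequeue drains that restored size (clamping at zero) before it starts decrementing the regular counter. All names come from this diff; the sketch function is hypothetical and the numbers mirror TestPersistentQueue_ItemsCapacityUsageIsNotPreserved.

func sketchRestoredSizeBookkeeping() {
    ch := newSizedElementsChannel[permanentQueueEl](100)

    // Two requests were recovered from storage, but their exact item sizes are
    // unknown, so the restored total falls back to the request count (2).
    ch.initBulkEnqueue(make([]permanentQueueEl, 2), 2)
    // ch.Size() == 2

    // Consuming a restored element whose real size (20) exceeds the remaining
    // restored total clamps initTotalSize to 0 instead of underflowing.
    ch.dequeue(func(permanentQueueEl) uint64 { return 20 })
    // ch.Size() == 0

    // The second restored element is also accounted against the (already
    // drained) restored total, leaving the regular used counter untouched.
    ch.dequeue(func(permanentQueueEl) uint64 { return 25 })
    // ch.Size() == 0; only elements enqueued after this point affect Size().
}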