From 7f7b8265c0099a6a449c320681ab16dde5011298 Mon Sep 17 00:00:00 2001 From: Dmitrii Anoshin Date: Tue, 2 Jan 2024 09:51:19 -0800 Subject: [PATCH] [chore] [exporterhelper] Move queue creation parameters to structs (#9197) Address https://github.com/open-telemetry/opentelemetry-collector/pull/8853#discussion_r1438362242 --- .../internal/bounded_memory_queue.go | 12 ++- .../internal/bounded_memory_queue_test.go | 8 +- .../internal/persistent_queue.go | 85 ++++++++++--------- .../internal/persistent_queue_test.go | 36 ++++++-- exporter/exporterhelper/queue_sender.go | 16 +++- 5 files changed, 98 insertions(+), 59 deletions(-) diff --git a/exporter/exporterhelper/internal/bounded_memory_queue.go b/exporter/exporterhelper/internal/bounded_memory_queue.go index 0390495c510..85435d2aa61 100644 --- a/exporter/exporterhelper/internal/bounded_memory_queue.go +++ b/exporter/exporterhelper/internal/bounded_memory_queue.go @@ -20,12 +20,18 @@ type boundedMemoryQueue[T any] struct { items chan queueRequest[T] } +// MemoryQueueSettings defines internal parameters for boundedMemoryQueue creation. +type MemoryQueueSettings[T any] struct { + Sizer Sizer[T] + Capacity int +} + // NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional // callback for dropped items (e.g. useful to emit metrics). -func NewBoundedMemoryQueue[T any](sizer Sizer[T], capacity int) Queue[T] { +func NewBoundedMemoryQueue[T any](set MemoryQueueSettings[T]) Queue[T] { return &boundedMemoryQueue[T]{ - queueCapacityLimiter: newQueueCapacityLimiter[T](sizer, capacity), - items: make(chan queueRequest[T], capacity), + queueCapacityLimiter: newQueueCapacityLimiter[T](set.Sizer, set.Capacity), + items: make(chan queueRequest[T], set.Capacity), } } diff --git a/exporter/exporterhelper/internal/bounded_memory_queue_test.go b/exporter/exporterhelper/internal/bounded_memory_queue_test.go index 436854dcfa4..e3431a3eac6 100644 --- a/exporter/exporterhelper/internal/bounded_memory_queue_test.go +++ b/exporter/exporterhelper/internal/bounded_memory_queue_test.go @@ -22,7 +22,7 @@ import ( // We want to test the overflow behavior, so we block the consumer // by holding a startLock before submitting items to the queue. func TestBoundedQueue(t *testing.T) { - q := NewBoundedMemoryQueue[string](&RequestSizer[string]{}, 1) + q := NewBoundedMemoryQueue[string](MemoryQueueSettings[string]{Sizer: &RequestSizer[string]{}, Capacity: 1}) assert.NoError(t, q.Offer(context.Background(), "a")) @@ -72,7 +72,7 @@ func TestBoundedQueue(t *testing.T) { // only after Stop will mean the consumers are still locked while // trying to perform the final consumptions. func TestShutdownWhileNotEmpty(t *testing.T) { - q := NewBoundedMemoryQueue[string](&RequestSizer[string]{}, 1000) + q := NewBoundedMemoryQueue[string](MemoryQueueSettings[string]{Sizer: &RequestSizer[string]{}, Capacity: 1000}) assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) for i := 0; i < 10; i++ { @@ -153,7 +153,7 @@ func benchmarkQueueUsage(b *testing.B, sizer Sizer[fakeReq], capacity int, numCo func queueUsage(tb testing.TB, sizer Sizer[fakeReq], capacity int, numConsumers int, numberOfItems int) { var wg sync.WaitGroup wg.Add(numberOfItems) - q := NewBoundedMemoryQueue[fakeReq](sizer, capacity) + q := NewBoundedMemoryQueue[fakeReq](MemoryQueueSettings[fakeReq]{Sizer: sizer, Capacity: capacity}) consumers := NewQueueConsumers(q, numConsumers, func(context.Context, fakeReq) error { wg.Done() return nil @@ -170,7 +170,7 @@ func queueUsage(tb testing.TB, sizer Sizer[fakeReq], capacity int, numConsumers } func TestZeroSizeNoConsumers(t *testing.T) { - q := NewBoundedMemoryQueue[string](&RequestSizer[string]{}, 0) + q := NewBoundedMemoryQueue[string](MemoryQueueSettings[string]{Sizer: &RequestSizer[string]{}, Capacity: 0}) err := q.Start(context.Background(), componenttest.NewNopHost()) assert.NoError(t, err) diff --git a/exporter/exporterhelper/internal/persistent_queue.go b/exporter/exporterhelper/internal/persistent_queue.go index f058a9177da..0a6629cb87e 100644 --- a/exporter/exporterhelper/internal/persistent_queue.go +++ b/exporter/exporterhelper/internal/persistent_queue.go @@ -45,12 +45,9 @@ import ( type persistentQueue[T any] struct { *queueCapacityLimiter[T] - set exporter.CreateSettings - storageID component.ID - dataType component.DataType - client storage.Client - unmarshaler func(data []byte) (T, error) - marshaler func(req T) ([]byte, error) + set PersistentQueueSettings[T] + logger *zap.Logger + client storage.Client // isRequestSized indicates whether the queue is sized by the number of requests. isRequestSized bool @@ -86,26 +83,32 @@ var ( errWrongExtensionType = errors.New("requested extension is not a storage extension") ) +type PersistentQueueSettings[T any] struct { + Sizer Sizer[T] + Capacity int + DataType component.DataType + StorageID component.ID + Marshaler func(req T) ([]byte, error) + Unmarshaler func([]byte) (T, error) + ExporterSettings exporter.CreateSettings +} + // NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage -func NewPersistentQueue[T any](sizer Sizer[T], capacity int, dataType component.DataType, storageID component.ID, - marshaler func(req T) ([]byte, error), unmarshaler func([]byte) (T, error), set exporter.CreateSettings) Queue[T] { - _, isRequestSized := sizer.(*RequestSizer[T]) +func NewPersistentQueue[T any](set PersistentQueueSettings[T]) Queue[T] { + _, isRequestSized := set.Sizer.(*RequestSizer[T]) return &persistentQueue[T]{ - queueCapacityLimiter: newQueueCapacityLimiter[T](sizer, capacity), + queueCapacityLimiter: newQueueCapacityLimiter[T](set.Sizer, set.Capacity), set: set, - storageID: storageID, - dataType: dataType, - unmarshaler: unmarshaler, - marshaler: marshaler, + logger: set.ExporterSettings.Logger, initQueueSize: &atomic.Uint64{}, isRequestSized: isRequestSized, - putChan: make(chan struct{}, capacity), + putChan: make(chan struct{}, set.Capacity), } } // Start starts the persistentQueue with the given number of consumers. func (pq *persistentQueue[T]) Start(ctx context.Context, host component.Host) error { - storageClient, err := toStorageClient(ctx, pq.storageID, host, pq.set.ID, pq.dataType) + storageClient, err := toStorageClient(ctx, pq.set.StorageID, host, pq.set.ExporterSettings.ID, pq.set.DataType) if err != nil { return err } @@ -137,9 +140,9 @@ func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Contex if err != nil { if errors.Is(err, errValueNotSet) { - pq.set.Logger.Info("Initializing new persistent queue") + pq.logger.Info("Initializing new persistent queue") } else { - pq.set.Logger.Error("Failed getting read/write index, starting with new ones", zap.Error(err)) + pq.logger.Error("Failed getting read/write index, starting with new ones", zap.Error(err)) } pq.readIndex = 0 pq.writeIndex = 0 @@ -168,11 +171,11 @@ func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Contex } if err != nil { if errors.Is(err, errValueNotSet) { - pq.set.Logger.Warn("Cannot read the queue size snapshot from storage. "+ + pq.logger.Warn("Cannot read the queue size snapshot from storage. "+ "The reported queue size will be inaccurate until the initial queue is drained. "+ "It's expected when the items sized queue enabled for the first time", zap.Error(err)) } else { - pq.set.Logger.Error("Failed to read the queue size snapshot from storage. "+ + pq.logger.Error("Failed to read the queue size snapshot from storage. "+ "The reported queue size will be inaccurate until the initial queue is drained.", zap.Error(err)) } } @@ -255,14 +258,14 @@ func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error { // putInternal is the internal version that requires caller to hold the mutex lock. func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error { if !pq.queueCapacityLimiter.claim(req) { - pq.set.Logger.Warn("Maximum queue capacity reached") + pq.logger.Warn("Maximum queue capacity reached") return ErrQueueIsFull } itemKey := getItemKey(pq.writeIndex) newIndex := pq.writeIndex + 1 - reqBuf, err := pq.marshaler(req) + reqBuf, err := pq.set.Marshaler(req) if err != nil { pq.queueCapacityLimiter.release(req) return err @@ -286,7 +289,7 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error { // in case if the collector is killed. The recovered queue size is allowed to be inaccurate. if (pq.writeIndex % 10) == 5 { if err := pq.backupQueueSize(ctx); err != nil { - pq.set.Logger.Error("Error writing queue size to storage", zap.Error(err)) + pq.logger.Error("Error writing queue size to storage", zap.Error(err)) } } @@ -320,14 +323,14 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error), getOp) if err == nil { - request, err = pq.unmarshaler(getOp.Value) + request, err = pq.set.Unmarshaler(getOp.Value) } if err != nil { - pq.set.Logger.Debug("Failed to dispatch item", zap.Error(err)) + pq.logger.Debug("Failed to dispatch item", zap.Error(err)) // We need to make sure that currently dispatched items list is cleaned if err = pq.itemDispatchingFinish(ctx, index); err != nil { - pq.set.Logger.Error("Error deleting item from queue", zap.Error(err)) + pq.logger.Error("Error deleting item from queue", zap.Error(err)) } return request, nil, false @@ -339,7 +342,7 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error), // in case if the collector is killed. The recovered queue size is allowed to be inaccurate. if (pq.writeIndex % 10) == 0 { if qsErr := pq.backupQueueSize(ctx); qsErr != nil { - pq.set.Logger.Error("Error writing queue size to storage", zap.Error(err)) + pq.logger.Error("Error writing queue size to storage", zap.Error(err)) } } @@ -352,7 +355,7 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error), // Always unref client even if the consumer is shutdown because we always ref it for every valid request. defer func() { if err = pq.unrefClient(ctx); err != nil { - pq.set.Logger.Error("Error closing the storage client", zap.Error(err)) + pq.logger.Error("Error closing the storage client", zap.Error(err)) } pq.mu.Unlock() }() @@ -364,7 +367,7 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error), } if err = pq.itemDispatchingFinish(ctx, index); err != nil { - pq.set.Logger.Error("Error deleting item from queue", zap.Error(err)) + pq.logger.Error("Error deleting item from queue", zap.Error(err)) } }, true @@ -399,22 +402,22 @@ func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Co pq.mu.Lock() defer pq.mu.Unlock() - pq.set.Logger.Debug("Checking if there are items left for dispatch by consumers") + pq.logger.Debug("Checking if there are items left for dispatch by consumers") itemKeysBuf, err := pq.client.Get(ctx, currentlyDispatchedItemsKey) if err == nil { dispatchedItems, err = bytesToItemIndexArray(itemKeysBuf) } if err != nil { - pq.set.Logger.Error("Could not fetch items left for dispatch by consumers", zap.Error(err)) + pq.logger.Error("Could not fetch items left for dispatch by consumers", zap.Error(err)) return } if len(dispatchedItems) == 0 { - pq.set.Logger.Debug("No items left for dispatch by consumers") + pq.logger.Debug("No items left for dispatch by consumers") return } - pq.set.Logger.Info("Fetching items left for dispatch by consumers", zap.Int(zapNumberOfItems, + pq.logger.Info("Fetching items left for dispatch by consumers", zap.Int(zapNumberOfItems, len(dispatchedItems))) retrieveBatch := make([]storage.Operation, len(dispatchedItems)) cleanupBatch := make([]storage.Operation, len(dispatchedItems)) @@ -427,24 +430,24 @@ func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Co cleanupErr := pq.client.Batch(ctx, cleanupBatch...) if cleanupErr != nil { - pq.set.Logger.Debug("Failed cleaning items left by consumers", zap.Error(cleanupErr)) + pq.logger.Debug("Failed cleaning items left by consumers", zap.Error(cleanupErr)) } if retrieveErr != nil { - pq.set.Logger.Warn("Failed retrieving items left by consumers", zap.Error(retrieveErr)) + pq.logger.Warn("Failed retrieving items left by consumers", zap.Error(retrieveErr)) return } errCount := 0 for _, op := range retrieveBatch { if op.Value == nil { - pq.set.Logger.Warn("Failed retrieving item", zap.String(zapKey, op.Key), zap.Error(errValueNotSet)) + pq.logger.Warn("Failed retrieving item", zap.String(zapKey, op.Key), zap.Error(errValueNotSet)) continue } - req, err := pq.unmarshaler(op.Value) + req, err := pq.set.Unmarshaler(op.Value) // If error happened or item is nil, it will be efficiently ignored if err != nil { - pq.set.Logger.Warn("Failed unmarshalling item", zap.String(zapKey, op.Key), zap.Error(err)) + pq.logger.Warn("Failed unmarshalling item", zap.String(zapKey, op.Key), zap.Error(err)) continue } if pq.putInternal(ctx, req) != nil { @@ -453,10 +456,10 @@ func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Co } if errCount > 0 { - pq.set.Logger.Error("Errors occurred while moving items for dispatching back to queue", + pq.logger.Error("Errors occurred while moving items for dispatching back to queue", zap.Int(zapNumberOfItems, len(retrieveBatch)), zap.Int(zapErrorCount, errCount)) } else { - pq.set.Logger.Info("Moved items for dispatching back to queue", + pq.logger.Info("Moved items for dispatching back to queue", zap.Int(zapNumberOfItems, len(retrieveBatch))) } } @@ -476,7 +479,7 @@ func (pq *persistentQueue[T]) itemDispatchingFinish(ctx context.Context, index u deleteOp := storage.DeleteOperation(getItemKey(index)) if err := pq.client.Batch(ctx, setOp, deleteOp); err != nil { // got an error, try to gracefully handle it - pq.set.Logger.Warn("Failed updating currently dispatched items, trying to delete the item first", + pq.logger.Warn("Failed updating currently dispatched items, trying to delete the item first", zap.Error(err)) } else { // Everything ok, exit diff --git a/exporter/exporterhelper/internal/persistent_queue_test.go b/exporter/exporterhelper/internal/persistent_queue_test.go index 1b4fccdeac7..10934c5be77 100644 --- a/exporter/exporterhelper/internal/persistent_queue_test.go +++ b/exporter/exporterhelper/internal/persistent_queue_test.go @@ -55,8 +55,15 @@ func (nh *mockHost) GetExtensions() map[component.ID]component.Component { // createAndStartTestPersistentQueue creates and starts a fake queue with the given capacity and number of consumers. func createAndStartTestPersistentQueue(t *testing.T, sizer Sizer[tracesRequest], capacity int, numConsumers int, consumeFunc func(_ context.Context, item tracesRequest) error) Queue[tracesRequest] { - pq := NewPersistentQueue[tracesRequest](sizer, capacity, component.DataTypeTraces, component.ID{}, - marshalTracesRequest, unmarshalTracesRequest, exportertest.NewNopCreateSettings()) + pq := NewPersistentQueue[tracesRequest](PersistentQueueSettings[tracesRequest]{ + Sizer: sizer, + Capacity: capacity, + DataType: component.DataTypeTraces, + StorageID: component.ID{}, + Marshaler: marshalTracesRequest, + Unmarshaler: unmarshalTracesRequest, + ExporterSettings: exportertest.NewNopCreateSettings(), + }) host := &mockHost{ext: map[component.ID]component.Component{ {}: NewMockStorageExtension(nil), }} @@ -69,8 +76,15 @@ func createAndStartTestPersistentQueue(t *testing.T, sizer Sizer[tracesRequest], } func createTestPersistentQueueWithClient(client storage.Client) *persistentQueue[tracesRequest] { - pq := NewPersistentQueue[tracesRequest](&RequestSizer[tracesRequest]{}, 1000, component.DataTypeTraces, - component.ID{}, marshalTracesRequest, unmarshalTracesRequest, exportertest.NewNopCreateSettings()).(*persistentQueue[tracesRequest]) + pq := NewPersistentQueue[tracesRequest](PersistentQueueSettings[tracesRequest]{ + Sizer: &RequestSizer[tracesRequest]{}, + Capacity: 1000, + DataType: component.DataTypeTraces, + StorageID: component.ID{}, + Marshaler: marshalTracesRequest, + Unmarshaler: unmarshalTracesRequest, + ExporterSettings: exportertest.NewNopCreateSettings(), + }).(*persistentQueue[tracesRequest]) pq.initClient(context.Background(), client) return pq } @@ -85,8 +99,15 @@ func createTestPersistentQueueWithItemsCapacity(t testing.TB, ext storage.Extens func createTestPersistentQueueWithCapacityLimiter(t testing.TB, ext storage.Extension, sizer Sizer[tracesRequest], capacity int) *persistentQueue[tracesRequest] { - pq := NewPersistentQueue[tracesRequest](sizer, capacity, component.DataTypeTraces, component.ID{}, - marshalTracesRequest, unmarshalTracesRequest, exportertest.NewNopCreateSettings()).(*persistentQueue[tracesRequest]) + pq := NewPersistentQueue[tracesRequest](PersistentQueueSettings[tracesRequest]{ + Sizer: sizer, + Capacity: capacity, + DataType: component.DataTypeTraces, + StorageID: component.ID{}, + Marshaler: marshalTracesRequest, + Unmarshaler: unmarshalTracesRequest, + ExporterSettings: exportertest.NewNopCreateSettings(), + }).(*persistentQueue[tracesRequest]) require.NoError(t, pq.Start(context.Background(), &mockHost{ext: map[component.ID]component.Component{{}: ext}})) return pq } @@ -320,8 +341,7 @@ func TestInvalidStorageExtensionType(t *testing.T) { } func TestPersistentQueue_StopAfterBadStart(t *testing.T) { - pq := NewPersistentQueue[tracesRequest](&RequestSizer[tracesRequest]{}, 1, component.DataTypeTraces, component.ID{}, - marshalTracesRequest, unmarshalTracesRequest, exportertest.NewNopCreateSettings()) + pq := NewPersistentQueue[tracesRequest](PersistentQueueSettings[tracesRequest]{}) // verify that stopping a un-start/started w/error queue does not panic assert.NoError(t, pq.Shutdown(context.Background())) } diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go index c6891825851..423b7657e10 100644 --- a/exporter/exporterhelper/queue_sender.go +++ b/exporter/exporterhelper/queue_sender.go @@ -92,10 +92,20 @@ func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal co var queue internal.Queue[Request] queueSizer := &internal.RequestSizer[Request]{} if isPersistent { - queue = internal.NewPersistentQueue[Request](queueSizer, config.QueueSize, signal, *config.StorageID, marshaler, - unmarshaler, set) + queue = internal.NewPersistentQueue[Request](internal.PersistentQueueSettings[Request]{ + Sizer: queueSizer, + Capacity: config.QueueSize, + DataType: signal, + StorageID: *config.StorageID, + Marshaler: marshaler, + Unmarshaler: unmarshaler, + ExporterSettings: set, + }) } else { - queue = internal.NewBoundedMemoryQueue[Request](queueSizer, config.QueueSize) + queue = internal.NewBoundedMemoryQueue[Request](internal.MemoryQueueSettings[Request]{ + Sizer: queueSizer, + Capacity: config.QueueSize, + }) } qs := &queueSender{ fullName: set.ID.String(),