[chore] [exporterhelper] Move queue creation parameters to structs (#…
dmitryax committed Jan 2, 2024
1 parent e7e73ab commit 7f7b826
Showing 5 changed files with 98 additions and 59 deletions.
12 changes: 9 additions & 3 deletions exporter/exporterhelper/internal/bounded_memory_queue.go
@@ -20,12 +20,18 @@ type boundedMemoryQueue[T any] struct {
items chan queueRequest[T]
}

// MemoryQueueSettings defines internal parameters for boundedMemoryQueue creation.
type MemoryQueueSettings[T any] struct {
Sizer Sizer[T]
Capacity int
}

// NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional
// callback for dropped items (e.g. useful to emit metrics).
func NewBoundedMemoryQueue[T any](sizer Sizer[T], capacity int) Queue[T] {
func NewBoundedMemoryQueue[T any](set MemoryQueueSettings[T]) Queue[T] {
return &boundedMemoryQueue[T]{
queueCapacityLimiter: newQueueCapacityLimiter[T](sizer, capacity),
items: make(chan queueRequest[T], capacity),
queueCapacityLimiter: newQueueCapacityLimiter[T](set.Sizer, set.Capacity),
items: make(chan queueRequest[T], set.Capacity),
}
}

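For reference, a minimal sketch of a call site migrating to the new constructor, based on the signatures in this diff; the string element type and the capacity value are illustrative only:

    // Before this commit, the parameters were positional:
    //   q := NewBoundedMemoryQueue[string](&RequestSizer[string]{}, 100)
    // After it, the same parameters are named fields of MemoryQueueSettings:
    q := NewBoundedMemoryQueue[string](MemoryQueueSettings[string]{
        Sizer:    &RequestSizer[string]{}, // size the queue by number of requests
        Capacity: 100,                     // illustrative capacity
    })

A practical benefit of the struct is that new creation parameters can later be added without breaking the constructor signature or touching every call site.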
8 changes: 4 additions & 4 deletions exporter/exporterhelper/internal/bounded_memory_queue_test.go
@@ -22,7 +22,7 @@ import (
// We want to test the overflow behavior, so we block the consumer
// by holding a startLock before submitting items to the queue.
func TestBoundedQueue(t *testing.T) {
q := NewBoundedMemoryQueue[string](&RequestSizer[string]{}, 1)
q := NewBoundedMemoryQueue[string](MemoryQueueSettings[string]{Sizer: &RequestSizer[string]{}, Capacity: 1})

assert.NoError(t, q.Offer(context.Background(), "a"))

@@ -72,7 +72,7 @@ func TestBoundedQueue(t *testing.T) {
// only after Stop will mean the consumers are still locked while
// trying to perform the final consumptions.
func TestShutdownWhileNotEmpty(t *testing.T) {
q := NewBoundedMemoryQueue[string](&RequestSizer[string]{}, 1000)
q := NewBoundedMemoryQueue[string](MemoryQueueSettings[string]{Sizer: &RequestSizer[string]{}, Capacity: 1000})

assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
for i := 0; i < 10; i++ {
@@ -153,7 +153,7 @@ func benchmarkQueueUsage(b *testing.B, sizer Sizer[fakeReq], capacity int, numCo
func queueUsage(tb testing.TB, sizer Sizer[fakeReq], capacity int, numConsumers int, numberOfItems int) {
var wg sync.WaitGroup
wg.Add(numberOfItems)
q := NewBoundedMemoryQueue[fakeReq](sizer, capacity)
q := NewBoundedMemoryQueue[fakeReq](MemoryQueueSettings[fakeReq]{Sizer: sizer, Capacity: capacity})
consumers := NewQueueConsumers(q, numConsumers, func(context.Context, fakeReq) error {
wg.Done()
return nil
@@ -170,7 +170,7 @@ func queueUsage(tb testing.TB, sizer Sizer[fakeReq], capacity int, numConsumers
}

func TestZeroSizeNoConsumers(t *testing.T) {
q := NewBoundedMemoryQueue[string](&RequestSizer[string]{}, 0)
q := NewBoundedMemoryQueue[string](MemoryQueueSettings[string]{Sizer: &RequestSizer[string]{}, Capacity: 0})

err := q.Start(context.Background(), componenttest.NewNopHost())
assert.NoError(t, err)
85 changes: 44 additions & 41 deletions exporter/exporterhelper/internal/persistent_queue.go
@@ -45,12 +45,9 @@ import (
type persistentQueue[T any] struct {
*queueCapacityLimiter[T]

set exporter.CreateSettings
storageID component.ID
dataType component.DataType
client storage.Client
unmarshaler func(data []byte) (T, error)
marshaler func(req T) ([]byte, error)
set PersistentQueueSettings[T]
logger *zap.Logger
client storage.Client

// isRequestSized indicates whether the queue is sized by the number of requests.
isRequestSized bool
@@ -86,26 +83,32 @@ var (
errWrongExtensionType = errors.New("requested extension is not a storage extension")
)

type PersistentQueueSettings[T any] struct {
Sizer Sizer[T]
Capacity int
DataType component.DataType
StorageID component.ID
Marshaler func(req T) ([]byte, error)
Unmarshaler func([]byte) (T, error)
ExporterSettings exporter.CreateSettings
}

// NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage
func NewPersistentQueue[T any](sizer Sizer[T], capacity int, dataType component.DataType, storageID component.ID,
marshaler func(req T) ([]byte, error), unmarshaler func([]byte) (T, error), set exporter.CreateSettings) Queue[T] {
_, isRequestSized := sizer.(*RequestSizer[T])
func NewPersistentQueue[T any](set PersistentQueueSettings[T]) Queue[T] {
_, isRequestSized := set.Sizer.(*RequestSizer[T])
return &persistentQueue[T]{
queueCapacityLimiter: newQueueCapacityLimiter[T](sizer, capacity),
queueCapacityLimiter: newQueueCapacityLimiter[T](set.Sizer, set.Capacity),
set: set,
storageID: storageID,
dataType: dataType,
unmarshaler: unmarshaler,
marshaler: marshaler,
logger: set.ExporterSettings.Logger,
initQueueSize: &atomic.Uint64{},
isRequestSized: isRequestSized,
putChan: make(chan struct{}, capacity),
putChan: make(chan struct{}, set.Capacity),
}
}

// Start starts the persistentQueue with the given number of consumers.
func (pq *persistentQueue[T]) Start(ctx context.Context, host component.Host) error {
storageClient, err := toStorageClient(ctx, pq.storageID, host, pq.set.ID, pq.dataType)
storageClient, err := toStorageClient(ctx, pq.set.StorageID, host, pq.set.ExporterSettings.ID, pq.set.DataType)
if err != nil {
return err
}
@@ -137,9 +140,9 @@ func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Contex

if err != nil {
if errors.Is(err, errValueNotSet) {
pq.set.Logger.Info("Initializing new persistent queue")
pq.logger.Info("Initializing new persistent queue")
} else {
pq.set.Logger.Error("Failed getting read/write index, starting with new ones", zap.Error(err))
pq.logger.Error("Failed getting read/write index, starting with new ones", zap.Error(err))
}
pq.readIndex = 0
pq.writeIndex = 0
@@ -168,11 +171,11 @@ func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Contex
}
if err != nil {
if errors.Is(err, errValueNotSet) {
pq.set.Logger.Warn("Cannot read the queue size snapshot from storage. "+
pq.logger.Warn("Cannot read the queue size snapshot from storage. "+
"The reported queue size will be inaccurate until the initial queue is drained. "+
"It's expected when the items sized queue enabled for the first time", zap.Error(err))
} else {
pq.set.Logger.Error("Failed to read the queue size snapshot from storage. "+
pq.logger.Error("Failed to read the queue size snapshot from storage. "+
"The reported queue size will be inaccurate until the initial queue is drained.", zap.Error(err))
}
}
@@ -255,14 +258,14 @@ func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error {
// putInternal is the internal version that requires caller to hold the mutex lock.
func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
if !pq.queueCapacityLimiter.claim(req) {
pq.set.Logger.Warn("Maximum queue capacity reached")
pq.logger.Warn("Maximum queue capacity reached")
return ErrQueueIsFull
}

itemKey := getItemKey(pq.writeIndex)
newIndex := pq.writeIndex + 1

reqBuf, err := pq.marshaler(req)
reqBuf, err := pq.set.Marshaler(req)
if err != nil {
pq.queueCapacityLimiter.release(req)
return err
@@ -286,7 +289,7 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
// in case if the collector is killed. The recovered queue size is allowed to be inaccurate.
if (pq.writeIndex % 10) == 5 {
if err := pq.backupQueueSize(ctx); err != nil {
pq.set.Logger.Error("Error writing queue size to storage", zap.Error(err))
pq.logger.Error("Error writing queue size to storage", zap.Error(err))
}
}

@@ -320,14 +323,14 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error),
getOp)

if err == nil {
request, err = pq.unmarshaler(getOp.Value)
request, err = pq.set.Unmarshaler(getOp.Value)
}

if err != nil {
pq.set.Logger.Debug("Failed to dispatch item", zap.Error(err))
pq.logger.Debug("Failed to dispatch item", zap.Error(err))
// We need to make sure that currently dispatched items list is cleaned
if err = pq.itemDispatchingFinish(ctx, index); err != nil {
pq.set.Logger.Error("Error deleting item from queue", zap.Error(err))
pq.logger.Error("Error deleting item from queue", zap.Error(err))
}

return request, nil, false
@@ -339,7 +342,7 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error),
// in case if the collector is killed. The recovered queue size is allowed to be inaccurate.
if (pq.writeIndex % 10) == 0 {
if qsErr := pq.backupQueueSize(ctx); qsErr != nil {
pq.set.Logger.Error("Error writing queue size to storage", zap.Error(err))
pq.logger.Error("Error writing queue size to storage", zap.Error(err))
}
}

@@ -352,7 +355,7 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error),
// Always unref client even if the consumer is shutdown because we always ref it for every valid request.
defer func() {
if err = pq.unrefClient(ctx); err != nil {
pq.set.Logger.Error("Error closing the storage client", zap.Error(err))
pq.logger.Error("Error closing the storage client", zap.Error(err))
}
pq.mu.Unlock()
}()
@@ -364,7 +367,7 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error),
}

if err = pq.itemDispatchingFinish(ctx, index); err != nil {
pq.set.Logger.Error("Error deleting item from queue", zap.Error(err))
pq.logger.Error("Error deleting item from queue", zap.Error(err))
}

}, true
@@ -399,22 +402,22 @@ func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Co

pq.mu.Lock()
defer pq.mu.Unlock()
pq.set.Logger.Debug("Checking if there are items left for dispatch by consumers")
pq.logger.Debug("Checking if there are items left for dispatch by consumers")
itemKeysBuf, err := pq.client.Get(ctx, currentlyDispatchedItemsKey)
if err == nil {
dispatchedItems, err = bytesToItemIndexArray(itemKeysBuf)
}
if err != nil {
pq.set.Logger.Error("Could not fetch items left for dispatch by consumers", zap.Error(err))
pq.logger.Error("Could not fetch items left for dispatch by consumers", zap.Error(err))
return
}

if len(dispatchedItems) == 0 {
pq.set.Logger.Debug("No items left for dispatch by consumers")
pq.logger.Debug("No items left for dispatch by consumers")
return
}

pq.set.Logger.Info("Fetching items left for dispatch by consumers", zap.Int(zapNumberOfItems,
pq.logger.Info("Fetching items left for dispatch by consumers", zap.Int(zapNumberOfItems,
len(dispatchedItems)))
retrieveBatch := make([]storage.Operation, len(dispatchedItems))
cleanupBatch := make([]storage.Operation, len(dispatchedItems))
@@ -427,24 +430,24 @@ func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Co
cleanupErr := pq.client.Batch(ctx, cleanupBatch...)

if cleanupErr != nil {
pq.set.Logger.Debug("Failed cleaning items left by consumers", zap.Error(cleanupErr))
pq.logger.Debug("Failed cleaning items left by consumers", zap.Error(cleanupErr))
}

if retrieveErr != nil {
pq.set.Logger.Warn("Failed retrieving items left by consumers", zap.Error(retrieveErr))
pq.logger.Warn("Failed retrieving items left by consumers", zap.Error(retrieveErr))
return
}

errCount := 0
for _, op := range retrieveBatch {
if op.Value == nil {
pq.set.Logger.Warn("Failed retrieving item", zap.String(zapKey, op.Key), zap.Error(errValueNotSet))
pq.logger.Warn("Failed retrieving item", zap.String(zapKey, op.Key), zap.Error(errValueNotSet))
continue
}
req, err := pq.unmarshaler(op.Value)
req, err := pq.set.Unmarshaler(op.Value)
// If error happened or item is nil, it will be efficiently ignored
if err != nil {
pq.set.Logger.Warn("Failed unmarshalling item", zap.String(zapKey, op.Key), zap.Error(err))
pq.logger.Warn("Failed unmarshalling item", zap.String(zapKey, op.Key), zap.Error(err))
continue
}
if pq.putInternal(ctx, req) != nil {
Expand All @@ -453,10 +456,10 @@ func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Co
}

if errCount > 0 {
pq.set.Logger.Error("Errors occurred while moving items for dispatching back to queue",
pq.logger.Error("Errors occurred while moving items for dispatching back to queue",
zap.Int(zapNumberOfItems, len(retrieveBatch)), zap.Int(zapErrorCount, errCount))
} else {
pq.set.Logger.Info("Moved items for dispatching back to queue",
pq.logger.Info("Moved items for dispatching back to queue",
zap.Int(zapNumberOfItems, len(retrieveBatch)))
}
}
@@ -476,7 +479,7 @@ func (pq *persistentQueue[T]) itemDispatchingFinish(ctx context.Context, index u
deleteOp := storage.DeleteOperation(getItemKey(index))
if err := pq.client.Batch(ctx, setOp, deleteOp); err != nil {
// got an error, try to gracefully handle it
pq.set.Logger.Warn("Failed updating currently dispatched items, trying to delete the item first",
pq.logger.Warn("Failed updating currently dispatched items, trying to delete the item first",
zap.Error(err))
} else {
// Everything ok, exit
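The same migration applies to the persistent queue, whose constructor previously took seven positional arguments. A sketch of a caller after this change, where myReq is a placeholder request type and storageID, marshalReq, unmarshalReq, and createSettings stand in for caller-provided values:

    // Previously: NewPersistentQueue[myReq](&RequestSizer[myReq]{}, 1000, component.DataTypeTraces,
    //     storageID, marshalReq, unmarshalReq, createSettings)
    // With this change the positional arguments become named fields:
    q := NewPersistentQueue[myReq](PersistentQueueSettings[myReq]{
        Sizer:            &RequestSizer[myReq]{},
        Capacity:         1000,
        DataType:         component.DataTypeTraces,
        StorageID:        storageID,      // ID of the storage extension backing the queue
        Marshaler:        marshalReq,     // func(myReq) ([]byte, error)
        Unmarshaler:      unmarshalReq,   // func([]byte) (myReq, error)
        ExporterSettings: createSettings, // exporter.CreateSettings; its Logger is cached as pq.logger
    })

Unset fields keep their zero values, which is what TestPersistentQueue_StopAfterBadStart relies on below when it builds a queue from an empty PersistentQueueSettings.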
36 changes: 28 additions & 8 deletions exporter/exporterhelper/internal/persistent_queue_test.go
@@ -55,8 +55,15 @@ func (nh *mockHost) GetExtensions() map[component.ID]component.Component {
// createAndStartTestPersistentQueue creates and starts a fake queue with the given capacity and number of consumers.
func createAndStartTestPersistentQueue(t *testing.T, sizer Sizer[tracesRequest], capacity int, numConsumers int,
consumeFunc func(_ context.Context, item tracesRequest) error) Queue[tracesRequest] {
pq := NewPersistentQueue[tracesRequest](sizer, capacity, component.DataTypeTraces, component.ID{},
marshalTracesRequest, unmarshalTracesRequest, exportertest.NewNopCreateSettings())
pq := NewPersistentQueue[tracesRequest](PersistentQueueSettings[tracesRequest]{
Sizer: sizer,
Capacity: capacity,
DataType: component.DataTypeTraces,
StorageID: component.ID{},
Marshaler: marshalTracesRequest,
Unmarshaler: unmarshalTracesRequest,
ExporterSettings: exportertest.NewNopCreateSettings(),
})
host := &mockHost{ext: map[component.ID]component.Component{
{}: NewMockStorageExtension(nil),
}}
@@ -69,8 +76,15 @@ func createAndStartTestPersistentQueue(t *testing.T, sizer Sizer[tracesRequest],
}

func createTestPersistentQueueWithClient(client storage.Client) *persistentQueue[tracesRequest] {
pq := NewPersistentQueue[tracesRequest](&RequestSizer[tracesRequest]{}, 1000, component.DataTypeTraces,
component.ID{}, marshalTracesRequest, unmarshalTracesRequest, exportertest.NewNopCreateSettings()).(*persistentQueue[tracesRequest])
pq := NewPersistentQueue[tracesRequest](PersistentQueueSettings[tracesRequest]{
Sizer: &RequestSizer[tracesRequest]{},
Capacity: 1000,
DataType: component.DataTypeTraces,
StorageID: component.ID{},
Marshaler: marshalTracesRequest,
Unmarshaler: unmarshalTracesRequest,
ExporterSettings: exportertest.NewNopCreateSettings(),
}).(*persistentQueue[tracesRequest])
pq.initClient(context.Background(), client)
return pq
}
@@ -85,8 +99,15 @@ func createTestPersistentQueueWithItemsCapacity(t testing.TB, ext storage.Extens

func createTestPersistentQueueWithCapacityLimiter(t testing.TB, ext storage.Extension, sizer Sizer[tracesRequest],
capacity int) *persistentQueue[tracesRequest] {
pq := NewPersistentQueue[tracesRequest](sizer, capacity, component.DataTypeTraces, component.ID{},
marshalTracesRequest, unmarshalTracesRequest, exportertest.NewNopCreateSettings()).(*persistentQueue[tracesRequest])
pq := NewPersistentQueue[tracesRequest](PersistentQueueSettings[tracesRequest]{
Sizer: sizer,
Capacity: capacity,
DataType: component.DataTypeTraces,
StorageID: component.ID{},
Marshaler: marshalTracesRequest,
Unmarshaler: unmarshalTracesRequest,
ExporterSettings: exportertest.NewNopCreateSettings(),
}).(*persistentQueue[tracesRequest])
require.NoError(t, pq.Start(context.Background(), &mockHost{ext: map[component.ID]component.Component{{}: ext}}))
return pq
}
@@ -320,8 +341,7 @@ func TestInvalidStorageExtensionType(t *testing.T) {
}

func TestPersistentQueue_StopAfterBadStart(t *testing.T) {
pq := NewPersistentQueue[tracesRequest](&RequestSizer[tracesRequest]{}, 1, component.DataTypeTraces, component.ID{},
marshalTracesRequest, unmarshalTracesRequest, exportertest.NewNopCreateSettings())
pq := NewPersistentQueue[tracesRequest](PersistentQueueSettings[tracesRequest]{})
// verify that stopping a un-start/started w/error queue does not panic
assert.NoError(t, pq.Shutdown(context.Background()))
}
16 changes: 13 additions & 3 deletions exporter/exporterhelper/queue_sender.go
@@ -92,10 +92,20 @@ func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal co
var queue internal.Queue[Request]
queueSizer := &internal.RequestSizer[Request]{}
if isPersistent {
queue = internal.NewPersistentQueue[Request](queueSizer, config.QueueSize, signal, *config.StorageID, marshaler,
unmarshaler, set)
queue = internal.NewPersistentQueue[Request](internal.PersistentQueueSettings[Request]{
Sizer: queueSizer,
Capacity: config.QueueSize,
DataType: signal,
StorageID: *config.StorageID,
Marshaler: marshaler,
Unmarshaler: unmarshaler,
ExporterSettings: set,
})
} else {
queue = internal.NewBoundedMemoryQueue[Request](queueSizer, config.QueueSize)
queue = internal.NewBoundedMemoryQueue[Request](internal.MemoryQueueSettings[Request]{
Sizer: queueSizer,
Capacity: config.QueueSize,
})
}
qs := &queueSender{
fullName: set.ID.String(),
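Both constructors live in the internal package, so this refactor does not change the public surface for exporter authors: queueing is still configured through exporterhelper options, and newQueueSender translates that configuration into one of the two queues above. A hedged sketch of that unchanged usage, with field names as they appear in exporterhelper.QueueSettings around the time of this commit and ctx, set, cfg, and pushTraces standing in for the caller's own values:

    // Enabling the sending queue on a traces exporter; values are illustrative.
    exp, err := exporterhelper.NewTracesExporter(ctx, set, cfg, pushTraces,
        exporterhelper.WithQueue(exporterhelper.QueueSettings{
            Enabled:      true,
            NumConsumers: 10,
            QueueSize:    1000,
        }),
    )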
