Skip to content

Commit

Permalink
[chore] [exporterhelper] Integrate capacity limiting into a helper queue
Browse files Browse the repository at this point in the history
Integrate capacity limiting into internal channels used by both memory and persistent queues. Otherwise, with the independent capacity limiter, it's hard to ensure that queue size is always accurate going forward.
  • Loading branch information
dmitryax committed May 4, 2024
1 parent 67d3718 commit 999718d
Show file tree
Hide file tree
Showing 10 changed files with 395 additions and 273 deletions.
4 changes: 2 additions & 2 deletions exporter/exporterqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func sizerFromConfig[T itemsCounter](Config) queue.Sizer[T] {
return &queue.RequestSizer[T]{}
}

func capacityFromConfig(cfg Config) int {
func capacityFromConfig(cfg Config) int64 {
// TODO: Handle other ways to measure the queue size once they are added.
return cfg.QueueSize
return int64(cfg.QueueSize)
}
23 changes: 9 additions & 14 deletions exporter/internal/queue/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,55 +16,50 @@ import (
// the producer are dropped.
type boundedMemoryQueue[T any] struct {
component.StartFunc
*queueCapacityLimiter[T]
items chan queueRequest[T]
*sizedChannel[memQueueEl[T]]
sizer Sizer[T]
}

// MemoryQueueSettings defines internal parameters for boundedMemoryQueue creation.
type MemoryQueueSettings[T any] struct {
Sizer Sizer[T]
Capacity int
Capacity int64
}

// NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional
// callback for dropped items (e.g. useful to emit metrics).
func NewBoundedMemoryQueue[T any](set MemoryQueueSettings[T]) Queue[T] {
return &boundedMemoryQueue[T]{
queueCapacityLimiter: newQueueCapacityLimiter[T](set.Sizer, set.Capacity),
items: make(chan queueRequest[T], set.Capacity),
sizedChannel: newSizedChannel[memQueueEl[T]](set.Capacity, nil, 0),
sizer: set.Sizer,
}
}

// Offer is used by the producer to submit new item to the queue. Calling this method on a stopped queue will panic.
func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {
if !q.queueCapacityLimiter.claim(req) {
return ErrQueueIsFull
}
q.items <- queueRequest[T]{ctx: ctx, req: req}
return nil
return q.sizedChannel.push(memQueueEl[T]{ctx: ctx, req: req}, q.sizer.Sizeof(req), nil)
}

// Consume applies the provided function on the head of queue.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped and emptied.
func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
item, ok := <-q.items
item, ok := q.sizedChannel.pop(func(el memQueueEl[T]) int64 { return q.sizer.Sizeof(el.req) })
if !ok {
return false
}
q.queueCapacityLimiter.release(item.req)
// the memory queue doesn't handle consume errors
_ = consumeFunc(item.ctx, item.req)
return true
}

// Shutdown closes the queue channel to initiate draining of the queue.
func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error {
close(q.items)
q.sizedChannel.shutdown()
return nil
}

type queueRequest[T any] struct {
type memQueueEl[T any] struct {
req T
ctx context.Context
}
10 changes: 9 additions & 1 deletion exporter/internal/queue/bounded_memory_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func benchmarkQueueUsage(b *testing.B, sizer Sizer[fakeReq], requestsCount int)
func queueUsage(tb testing.TB, sizer Sizer[fakeReq], requestsCount int) {
var wg sync.WaitGroup
wg.Add(requestsCount)
q := NewBoundedMemoryQueue[fakeReq](MemoryQueueSettings[fakeReq]{Sizer: sizer, Capacity: 10 * requestsCount})
q := NewBoundedMemoryQueue[fakeReq](MemoryQueueSettings[fakeReq]{Sizer: sizer, Capacity: int64(10 * requestsCount)})
consumers := NewQueueConsumers(q, 1, func(context.Context, fakeReq) error {
wg.Done()
return nil
Expand All @@ -156,3 +156,11 @@ func TestZeroSizeNoConsumers(t *testing.T) {

assert.NoError(t, q.Shutdown(context.Background()))
}

type fakeReq struct {
itemsCount int
}

func (r fakeReq) ItemsCount() int {
return r.itemsCount
}
Loading

0 comments on commit 999718d

Please sign in to comment.