diff --git a/internal/eventstream/subscriber.go b/internal/eventstream/subscriber.go index 27258a2f..0cf36aef 100644 --- a/internal/eventstream/subscriber.go +++ b/internal/eventstream/subscriber.go @@ -51,7 +51,7 @@ type subscriber struct { // sem represents a lock sem sync.Mutex // messages of the subscriber - messages *queue.Unbounded[*Message] + messages *queue.Queue[*Message] // topics define the topic the subscriber subscribed to topics map[string]bool // states whether the given subscriber is active or not @@ -70,7 +70,7 @@ func newSubscriber() *subscriber { return &subscriber{ id: id, sem: sync.Mutex{}, - messages: queue.NewUnbounded[*Message](), + messages: queue.New[*Message](), topics: make(map[string]bool), active: true, } diff --git a/internal/queue/queue.go b/internal/queue/queue.go index a053f585..004e2a5e 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -30,10 +30,10 @@ import "sync" // Must be power of 2 for bitwise modulus: x % n == x & (n - 1). const minQueueLen = 16 -// Unbounded thread-safe Queue using ring-buffer +// Queue thread-safe Queue using ring-buffer // reference: https://blog.dubbelboer.com/2015/04/25/go-faster-queue.html // https://github.com/eapache/queue -type Unbounded[T any] struct { +type Queue[T any] struct { mu sync.RWMutex cond *sync.Cond nodes []*T @@ -44,9 +44,9 @@ type Unbounded[T any] struct { initCap int } -// NewUnbounded creates an instance of Unbounded -func NewUnbounded[T any]() *Unbounded[T] { - sq := &Unbounded[T]{ +// New creates an instance of Unbounded +func New[T any]() *Queue[T] { + sq := &Queue[T]{ initCap: minQueueLen, nodes: make([]*T, minQueueLen), } @@ -54,26 +54,11 @@ func NewUnbounded[T any]() *Unbounded[T] { return sq } -// resize the queue -func (q *Unbounded[T]) resize() { - nodes := make([]*T, q.count<<1) - if q.tail > q.head { - copy(nodes, q.nodes[q.head:q.tail]) - } else { - n := copy(nodes, q.nodes[q.head:]) - copy(nodes[n:], q.nodes[:q.tail]) - } - - q.tail = q.count - q.head = 0 - q.nodes = nodes -} - // Push adds an item to the back of the queue // It can be safely called from multiple goroutines // It will return false if the queue is closed. // In that case the Item is dropped. -func (q *Unbounded[T]) Push(i T) bool { +func (q *Queue[T]) Push(i T) bool { q.mu.Lock() if q.closed { q.mu.Unlock() @@ -93,7 +78,7 @@ func (q *Unbounded[T]) Push(i T) bool { // Close the queue and discard all entries in the queue // all goroutines in wait() will return -func (q *Unbounded[T]) Close() { +func (q *Queue[T]) Close() { q.mu.Lock() defer q.mu.Unlock() q.closed = true @@ -104,7 +89,7 @@ func (q *Unbounded[T]) Close() { // CloseRemaining will close the queue and return all entries in the queue. // All goroutines in wait() will return. -func (q *Unbounded[T]) CloseRemaining() []T { +func (q *Queue[T]) CloseRemaining() []T { q.mu.Lock() defer q.mu.Unlock() if q.closed { @@ -128,7 +113,7 @@ func (q *Unbounded[T]) CloseRemaining() []T { // IsClosed returns true if the queue has been closed // The call cannot guarantee that the queue hasn't been // closed while the function returns, so only "true" has a definite meaning. -func (q *Unbounded[T]) IsClosed() bool { +func (q *Queue[T]) IsClosed() bool { q.mu.RLock() c := q.closed q.mu.RUnlock() @@ -140,7 +125,7 @@ func (q *Unbounded[T]) IsClosed() bool { // be returned immediately. // Will return nil, false if the queue is closed. // Otherwise, the return value of "remove" is returned. -func (q *Unbounded[T]) Wait() (T, bool) { +func (q *Queue[T]) Wait() (T, bool) { q.mu.Lock() if q.closed { q.mu.Unlock() @@ -159,7 +144,7 @@ func (q *Unbounded[T]) Wait() (T, bool) { // Pop removes the item from the front of the queue // If false is returned, it either means 1) there were no items on the queue // or 2) the queue is closed. -func (q *Unbounded[T]) Pop() (T, bool) { +func (q *Queue[T]) Pop() (T, bool) { q.mu.Lock() defer q.mu.Unlock() if q.count == 0 { @@ -180,7 +165,7 @@ func (q *Unbounded[T]) Pop() (T, bool) { } // Cap return the capacity (without allocations) -func (q *Unbounded[T]) Cap() int { +func (q *Queue[T]) Cap() int { q.mu.RLock() c := cap(q.nodes) q.mu.RUnlock() @@ -188,7 +173,7 @@ func (q *Unbounded[T]) Cap() int { } // Len return the current length of the queue. -func (q *Unbounded[T]) Len() int { +func (q *Queue[T]) Len() int { q.mu.RLock() l := q.count q.mu.RUnlock() @@ -196,9 +181,24 @@ func (q *Unbounded[T]) Len() int { } // IsEmpty returns true when the queue is empty -func (q *Unbounded[T]) IsEmpty() bool { +func (q *Queue[T]) IsEmpty() bool { q.mu.Lock() cnt := q.count q.mu.Unlock() return cnt == 0 } + +// resize the queue +func (q *Queue[T]) resize() { + nodes := make([]*T, q.count<<1) + if q.tail > q.head { + copy(nodes, q.nodes[q.head:q.tail]) + } else { + n := copy(nodes, q.nodes[q.head:]) + copy(nodes[n:], q.nodes[:q.tail]) + } + + q.tail = q.count + q.head = 0 + q.nodes = nodes +} diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go index ab64251b..711dd283 100644 --- a/internal/queue/queue_test.go +++ b/internal/queue/queue_test.go @@ -31,9 +31,9 @@ import ( "github.com/stretchr/testify/require" ) -func TestUnboundedQueue(t *testing.T) { +func TestQueue(t *testing.T) { t.Run("With Push/Pop", func(t *testing.T) { - q := NewUnbounded[int]() + q := New[int]() require.True(t, q.IsEmpty()) for j := 0; j < 100; j++ { if q.Len() != 0 { @@ -88,7 +88,7 @@ func TestUnboundedQueue(t *testing.T) { assert.Zero(t, q.Cap()) }) t.Run("With Wait", func(t *testing.T) { - q := NewUnbounded[int]() + q := New[int]() assert.True(t, q.Push(1)) assert.True(t, q.Push(2)) assert.True(t, q.Push(3)) @@ -105,7 +105,7 @@ func TestUnboundedQueue(t *testing.T) { assert.Zero(t, q.Cap()) }) t.Run("With Close remaining", func(t *testing.T) { - q := NewUnbounded[int]() + q := New[int]() for j := 0; j < 100; j++ { q.Push(j) } @@ -119,8 +119,8 @@ func TestUnboundedQueue(t *testing.T) { }) } -func BenchmarkUnbounded_Push(b *testing.B) { - q := NewUnbounded[int]() +func BenchmarkQueue_Push(b *testing.B) { + q := New[int]() b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { @@ -128,8 +128,8 @@ func BenchmarkUnbounded_Push(b *testing.B) { } } -func BenchmarkUnbounded_Pop(b *testing.B) { - q := NewUnbounded[int]() +func BenchmarkQueue_Pop(b *testing.B) { + q := New[int]() b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ {