Skip to content

Commit

Permalink
refactor: rename queue appropriately
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Jul 30, 2024
1 parent 7b5237b commit 7b4778d
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 39 deletions.
4 changes: 2 additions & 2 deletions internal/eventstream/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type subscriber struct {
// sem represents a lock
sem sync.Mutex
// messages of the subscriber
messages *queue.Unbounded[*Message]
messages *queue.Queue[*Message]
// topics define the topic the subscriber subscribed to
topics map[string]bool
// states whether the given subscriber is active or not
Expand All @@ -70,7 +70,7 @@ func newSubscriber() *subscriber {
return &subscriber{
id: id,
sem: sync.Mutex{},
messages: queue.NewUnbounded[*Message](),
messages: queue.New[*Message](),
topics: make(map[string]bool),
active: true,
}
Expand Down
58 changes: 29 additions & 29 deletions internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ import "sync"
// Must be power of 2 for bitwise modulus: x % n == x & (n - 1).
const minQueueLen = 16

// Unbounded thread-safe Queue using ring-buffer
// Queue thread-safe Queue using ring-buffer
// reference: https://blog.dubbelboer.com/2015/04/25/go-faster-queue.html
// https://github.com/eapache/queue
type Unbounded[T any] struct {
type Queue[T any] struct {
mu sync.RWMutex
cond *sync.Cond
nodes []*T
Expand All @@ -44,36 +44,21 @@ type Unbounded[T any] struct {
initCap int
}

// NewUnbounded creates an instance of Unbounded
func NewUnbounded[T any]() *Unbounded[T] {
sq := &Unbounded[T]{
// New creates an instance of Unbounded
func New[T any]() *Queue[T] {
sq := &Queue[T]{
initCap: minQueueLen,
nodes: make([]*T, minQueueLen),
}
sq.cond = sync.NewCond(&sq.mu)
return sq
}

// resize the queue
func (q *Unbounded[T]) resize() {
nodes := make([]*T, q.count<<1)
if q.tail > q.head {
copy(nodes, q.nodes[q.head:q.tail])
} else {
n := copy(nodes, q.nodes[q.head:])
copy(nodes[n:], q.nodes[:q.tail])
}

q.tail = q.count
q.head = 0
q.nodes = nodes
}

// Push adds an item to the back of the queue
// It can be safely called from multiple goroutines
// It will return false if the queue is closed.
// In that case the Item is dropped.
func (q *Unbounded[T]) Push(i T) bool {
func (q *Queue[T]) Push(i T) bool {
q.mu.Lock()
if q.closed {
q.mu.Unlock()
Expand All @@ -93,7 +78,7 @@ func (q *Unbounded[T]) Push(i T) bool {

// Close the queue and discard all entries in the queue
// all goroutines in wait() will return
func (q *Unbounded[T]) Close() {
func (q *Queue[T]) Close() {
q.mu.Lock()
defer q.mu.Unlock()
q.closed = true
Expand All @@ -104,7 +89,7 @@ func (q *Unbounded[T]) Close() {

// CloseRemaining will close the queue and return all entries in the queue.
// All goroutines in wait() will return.
func (q *Unbounded[T]) CloseRemaining() []T {
func (q *Queue[T]) CloseRemaining() []T {
q.mu.Lock()
defer q.mu.Unlock()
if q.closed {
Expand All @@ -128,7 +113,7 @@ func (q *Unbounded[T]) CloseRemaining() []T {
// IsClosed returns true if the queue has been closed
// The call cannot guarantee that the queue hasn't been
// closed while the function returns, so only "true" has a definite meaning.
func (q *Unbounded[T]) IsClosed() bool {
func (q *Queue[T]) IsClosed() bool {
q.mu.RLock()
c := q.closed
q.mu.RUnlock()
Expand All @@ -140,7 +125,7 @@ func (q *Unbounded[T]) IsClosed() bool {
// be returned immediately.
// Will return nil, false if the queue is closed.
// Otherwise, the return value of "remove" is returned.
func (q *Unbounded[T]) Wait() (T, bool) {
func (q *Queue[T]) Wait() (T, bool) {
q.mu.Lock()
if q.closed {
q.mu.Unlock()
Expand All @@ -159,7 +144,7 @@ func (q *Unbounded[T]) Wait() (T, bool) {
// Pop removes the item from the front of the queue
// If false is returned, it either means 1) there were no items on the queue
// or 2) the queue is closed.
func (q *Unbounded[T]) Pop() (T, bool) {
func (q *Queue[T]) Pop() (T, bool) {
q.mu.Lock()
defer q.mu.Unlock()
if q.count == 0 {
Expand All @@ -180,25 +165,40 @@ func (q *Unbounded[T]) Pop() (T, bool) {
}

// Cap return the capacity (without allocations)
func (q *Unbounded[T]) Cap() int {
func (q *Queue[T]) Cap() int {
q.mu.RLock()
c := cap(q.nodes)
q.mu.RUnlock()
return c
}

// Len return the current length of the queue.
func (q *Unbounded[T]) Len() int {
func (q *Queue[T]) Len() int {
q.mu.RLock()
l := q.count
q.mu.RUnlock()
return l
}

// IsEmpty returns true when the queue is empty
func (q *Unbounded[T]) IsEmpty() bool {
func (q *Queue[T]) IsEmpty() bool {
q.mu.Lock()
cnt := q.count
q.mu.Unlock()
return cnt == 0
}

// resize the queue
func (q *Queue[T]) resize() {
nodes := make([]*T, q.count<<1)
if q.tail > q.head {
copy(nodes, q.nodes[q.head:q.tail])
} else {
n := copy(nodes, q.nodes[q.head:])
copy(nodes[n:], q.nodes[:q.tail])
}

q.tail = q.count
q.head = 0
q.nodes = nodes
}
16 changes: 8 additions & 8 deletions internal/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ import (
"github.com/stretchr/testify/require"
)

func TestUnboundedQueue(t *testing.T) {
func TestQueue(t *testing.T) {
t.Run("With Push/Pop", func(t *testing.T) {
q := NewUnbounded[int]()
q := New[int]()
require.True(t, q.IsEmpty())
for j := 0; j < 100; j++ {
if q.Len() != 0 {
Expand Down Expand Up @@ -88,7 +88,7 @@ func TestUnboundedQueue(t *testing.T) {
assert.Zero(t, q.Cap())
})
t.Run("With Wait", func(t *testing.T) {
q := NewUnbounded[int]()
q := New[int]()
assert.True(t, q.Push(1))
assert.True(t, q.Push(2))
assert.True(t, q.Push(3))
Expand All @@ -105,7 +105,7 @@ func TestUnboundedQueue(t *testing.T) {
assert.Zero(t, q.Cap())
})
t.Run("With Close remaining", func(t *testing.T) {
q := NewUnbounded[int]()
q := New[int]()
for j := 0; j < 100; j++ {
q.Push(j)
}
Expand All @@ -119,17 +119,17 @@ func TestUnboundedQueue(t *testing.T) {
})
}

func BenchmarkUnbounded_Push(b *testing.B) {
q := NewUnbounded[int]()
func BenchmarkQueue_Push(b *testing.B) {
q := New[int]()
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
q.Push(i)
}
}

func BenchmarkUnbounded_Pop(b *testing.B) {
q := NewUnbounded[int]()
func BenchmarkQueue_Pop(b *testing.B) {
q := New[int]()
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand Down

0 comments on commit 7b4778d

Please sign in to comment.