Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: asserting linked queue #425

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions actors/pid.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ type pid struct {
shutdownTimeout atomic.Duration

// specifies the actor mailbox
mailbox *queue.MpscQueue[ReceiveContext]
mailbox queue.Queue[ReceiveContext]

haltPassivationLnr chan types.Unit

Expand Down Expand Up @@ -264,7 +264,7 @@ type pid struct {
behaviorStack *behaviorStack

// stash settings
stashBuffer *queue.MpscQueue[ReceiveContext]
stashBuffer *queue.Mpsc[ReceiveContext]
stashLocker *sync.Mutex

// define an events stream
Expand Down Expand Up @@ -307,7 +307,7 @@ func newPID(ctx context.Context, actorPath *Path, actor Actor, opts ...pidOption
fieldsLocker: &sync.RWMutex{},
stopLocker: &sync.Mutex{},
httpClient: http.NewClient(),
mailbox: queue.NewMpscQueue[ReceiveContext](),
mailbox: queue.NewLinked[ReceiveContext](),
stashBuffer: nil,
stashLocker: &sync.Mutex{},
eventsStream: nil,
Expand Down
2 changes: 1 addition & 1 deletion actors/pid_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func withTelemetry(telemetry *telemetry.Telemetry) pidOption {
// withStash sets the actor's stash buffer
func withStash() pidOption {
return func(pid *pid) {
pid.stashBuffer = queue.NewMpscQueue[ReceiveContext]()
pid.stashBuffer = queue.NewMpsc[ReceiveContext]()
}
}

Expand Down
4 changes: 2 additions & 2 deletions internal/eventstream/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type subscriber struct {
// sem represents a lock
sem sync.Mutex
// messages of the subscriber
messages *queue.Queue[*Message]
messages *queue.Ring[*Message]
// topics define the topic the subscriber subscribed to
topics map[string]bool
// states whether the given subscriber is active or not
Expand All @@ -70,7 +70,7 @@ func newSubscriber() *subscriber {
return &subscriber{
id: id,
sem: sync.Mutex{},
messages: queue.New[*Message](),
messages: queue.NewRing[*Message](),
topics: make(map[string]bool),
active: true,
}
Expand Down
91 changes: 91 additions & 0 deletions internal/queue/linked.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* MIT License
*
* Copyright (c) 2022-2024 Tochemey
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package queue

import "sync/atomic"

type linkedNode[T any] struct {
value T
next atomic.Pointer[linkedNode[T]]
}

// Linked is a concurrent non-blocking queue
type Linked[T any] struct {
head, tail atomic.Pointer[linkedNode[T]]
}

// NewLinked creates an instance of Linked
func NewLinked[T any]() *Linked[T] {
empty := new(linkedNode[T])
lnk := new(Linked[T])
lnk.head.Store(empty)
lnk.tail.Store(empty)
return lnk
}

// Push place the given value in the queue head (FIFO).
func (q *Linked[T]) Push(value T) {
node := &linkedNode[T]{value, atomic.Pointer[linkedNode[T]]{}}
var currentTail *linkedNode[T]
for added := false; !added; {
currentTail = q.tail.Load()
currentNext := currentTail.next.Load()
if currentNext != nil {
q.tail.CompareAndSwap(currentTail, currentNext)
continue
}
added = q.tail.Load().next.CompareAndSwap(currentNext, node)
}
q.tail.CompareAndSwap(currentTail, node)
}

// Pop removes the QueueItem from the front of the queue
// If false is returned, it means there were no items on the queue
func (q *Linked[T]) Pop() (T, bool) {
var currentHead *linkedNode[T]
for removed := false; !removed; {
head, tail := q.head.Load(), q.tail.Load()
currentHead = head.next.Load()
if tail == head {
if currentHead != nil {
q.tail.CompareAndSwap(tail, currentHead)
continue
}
return *new(T), false
}
removed = q.head.CompareAndSwap(head, currentHead)
}
return currentHead.value, true
}

// Peek return the next element in the queue without removing it
func (q *Linked[T]) Peek() T {
return q.head.Load().next.Load().value
}

// IsEmpty returns true when the queue is empty
func (q *Linked[T]) IsEmpty() bool {
return q.head.Load().next.Load() == nil
}
18 changes: 9 additions & 9 deletions internal/queue/mpsc.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,19 @@ type node[T any] struct {
next *node[T]
}

// MpscQueue is a Multi-Producer-Single-Consumer Queue
// Mpsc is a Multi-Producer-Single-Consumer Queue
// reference: https://concurrencyfreaks.blogspot.com/2014/04/multi-producer-single-consumer-queue.html
type MpscQueue[T any] struct {
type Mpsc[T any] struct {
head *node[T]
tail *node[T]
length int64
lock sync.Mutex
}

// NewMpscQueue create an instance of MpscQueue
func NewMpscQueue[T any]() *MpscQueue[T] {
// NewMpsc create an instance of MPSC Queue
func NewMpsc[T any]() *Mpsc[T] {
item := new(node[T])
return &MpscQueue[T]{
return &Mpsc[T]{
head: item,
tail: item,
length: 0,
Expand All @@ -57,7 +57,7 @@ func NewMpscQueue[T any]() *MpscQueue[T] {
}

// Push place the given value in the queue head (FIFO).
func (q *MpscQueue[T]) Push(value T) {
func (q *Mpsc[T]) Push(value T) {
tnode := &node[T]{
value: value,
}
Expand All @@ -68,7 +68,7 @@ func (q *MpscQueue[T]) Push(value T) {

// Pop takes the QueueItem from the queue tail.
// Returns false if the queue is empty. Can be used in a single consumer (goroutine) only.
func (q *MpscQueue[T]) Pop() (T, bool) {
func (q *Mpsc[T]) Pop() (T, bool) {
var tnil T
next := (*node[T])(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&q.tail.next))))
if next == nil {
Expand All @@ -85,13 +85,13 @@ func (q *MpscQueue[T]) Pop() (T, bool) {
}

// Len returns queue length
func (q *MpscQueue[T]) Len() int64 {
func (q *Mpsc[T]) Len() int64 {
return atomic.LoadInt64(&q.length)
}

// IsEmpty returns true when the queue is empty
// must be called from a single, consumer goroutine
func (q *MpscQueue[T]) IsEmpty() bool {
func (q *Mpsc[T]) IsEmpty() bool {
q.lock.Lock()
tail := q.tail
q.lock.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion internal/queue/mpsc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
// TODO: add go routine-based tests
func TestMpscQueue(t *testing.T) {
t.Run("With Push/Pop", func(t *testing.T) {
q := NewMpscQueue[int]()
q := NewMpsc[int]()
require.True(t, q.IsEmpty())
for j := 0; j < 100; j++ {
if q.Len() != 0 {
Expand Down
Loading