Skip to content

Commit

Permalink
perf: performance tuning
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Sep 23, 2024
1 parent 33a4194 commit 3521805
Show file tree
Hide file tree
Showing 9 changed files with 51 additions and 565 deletions.
80 changes: 38 additions & 42 deletions actors/behavior_stack.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,80 +24,78 @@

package actors

import "sync"
import (
"sync/atomic"
"unsafe"
)

// Behavior defines an actor behavior
type Behavior func(ctx *ReceiveContext)

type bnode struct {
value Behavior
previous *bnode
value Behavior
next unsafe.Pointer
}

// behaviorStack defines a stack of Behavior
type behaviorStack struct {
top *bnode
length int
mutex *sync.RWMutex
top unsafe.Pointer
length uint64
}

// newBehaviorStack creates an instance of behaviorStack
func newBehaviorStack() *behaviorStack {
return &behaviorStack{
top: nil,
length: 0,
mutex: &sync.RWMutex{},
}
}

// Len returns the length of the stack.
func (bs *behaviorStack) Len() int {
bs.mutex.RLock()
length := bs.length
bs.mutex.RUnlock()
return length
return int(atomic.LoadUint64(&bs.length))
}

// Peek helps view the top item on the stack
func (bs *behaviorStack) Peek() Behavior {
bs.mutex.RLock()
length := bs.length
bs.mutex.RUnlock()
if length == 0 {
top := atomic.LoadPointer(&bs.top)
if top == nil {
return nil
}

bs.mutex.RLock()
value := bs.top.value
bs.mutex.RUnlock()
return value
item := (*bnode)(top)
return item.value
}

// Pop removes and return top element of stack
func (bs *behaviorStack) Pop() Behavior {
bs.mutex.RLock()
length := bs.length
bs.mutex.RUnlock()
if length == 0 {
return nil
var top, next unsafe.Pointer
var item *bnode
for {
top = atomic.LoadPointer(&bs.top)
if top == nil {
return nil
}
item = (*bnode)(top)
next = atomic.LoadPointer(&item.next)
if atomic.CompareAndSwapPointer(&bs.top, top, next) {
atomic.AddUint64(&bs.length, ^uint64(0))
return item.value
}
}

bs.mutex.RLock()
n := bs.top
bs.top = n.previous
bs.length--
value := n.value
bs.mutex.RUnlock()
return value
}

// Push a new value onto the stack
func (bs *behaviorStack) Push(behavior Behavior) {
bs.mutex.Lock()
n := &bnode{behavior, bs.top}
bs.top = n
bs.length++
bs.mutex.Unlock()
node := bnode{value: behavior}
var top unsafe.Pointer
for {
top = atomic.LoadPointer(&bs.top)
node.next = top
if atomic.CompareAndSwapPointer(&bs.top, top, unsafe.Pointer(&node)) {
atomic.AddUint64(&bs.length, 1)
return
}
}
}

// IsEmpty checks if stack is empty
Expand All @@ -107,8 +105,6 @@ func (bs *behaviorStack) IsEmpty() bool {

// Reset empty the stack
func (bs *behaviorStack) Reset() {
bs.mutex.Lock()
bs.top = nil
bs.length = 0
bs.mutex.Unlock()
atomic.StorePointer(&bs.top, nil)
atomic.StoreUint64(&bs.length, 0)
}
2 changes: 1 addition & 1 deletion actors/pid.go
Original file line number Diff line number Diff line change
Expand Up @@ -1250,7 +1250,7 @@ func (pid *PID) setBehaviorStacked(behavior Behavior) {
pid.fieldsLocker.Unlock()
}

// unsetBehaviorStacked sets the actor's behavior to the previous behavior
// unsetBehaviorStacked sets the actor's behavior to the next behavior
// prior to setBehaviorStacked is called
func (pid *PID) unsetBehaviorStacked() {
pid.fieldsLocker.Lock()
Expand Down
4 changes: 2 additions & 2 deletions actors/receive_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,13 @@ func (c *ReceiveContext) Message() proto.Message {
// BecomeStacked sets a new behavior to the actor.
// The current message in process during the transition will still be processed with the current
// behavior before the transition. However, subsequent messages will be processed with the new behavior.
// One needs to call UnBecomeStacked to go the previous the actor's behavior.
// One needs to call UnBecomeStacked to go the next the actor's behavior.
// which is the default behavior.
func (c *ReceiveContext) BecomeStacked(behavior Behavior) {
c.self.setBehaviorStacked(behavior)
}

// UnBecomeStacked sets the actor behavior to the previous behavior before BecomeStacked was called
// UnBecomeStacked sets the actor behavior to the next behavior before BecomeStacked was called
func (c *ReceiveContext) UnBecomeStacked() {
c.self.unsetBehaviorStacked()
}
Expand Down
2 changes: 1 addition & 1 deletion bench/async-comm/main.go → bench/pingpong/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func main() {
lib.Pause(1 * time.Second)

duration := time.Minute
if err := pingActor.SendAsync(ctx, pongActor.Name(), new(benchmarkpb.Ping)); err != nil {
if err := pingActor.Tell(ctx, pongActor, new(benchmarkpb.Ping)); err != nil {
panic(err)
}

Expand Down
18 changes: 9 additions & 9 deletions internal/collection/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,25 +41,25 @@ type Queue struct {

// NewQueue creates a new lock-free queue.
func NewQueue() *Queue {
head := directItem{next: nil, v: nil} // allocate a free item
head := item{next: nil, v: nil} // allocate a free item
return &Queue{
tail: unsafe.Pointer(&head), // both head and tail points
head: unsafe.Pointer(&head), // to the free item
pool: sync.Pool{
New: func() interface{} {
return &directItem{}
return &item{}
},
},
}
}

// Enqueue puts the given value v at the tail of the queue.
func (q *Queue) Enqueue(v interface{}) {
i := q.pool.Get().(*directItem)
i := q.pool.Get().(*item)
i.next = nil
i.v = v

var last, lastnext *directItem
var last, lastnext *item
for {
last = loaditem(&q.tail)
lastnext = loaditem(&last.next)
Expand All @@ -80,7 +80,7 @@ func (q *Queue) Enqueue(v interface{}) {
// Dequeue removes and returns the value at the head of the queue.
// It returns nil if the queue is empty.
func (q *Queue) Dequeue() interface{} {
var first, last, firstnext *directItem
var first, last, firstnext *item
for {
first = loaditem(&q.head)
last = loaditem(&q.tail)
Expand Down Expand Up @@ -108,14 +108,14 @@ func (q *Queue) Length() uint64 {
return atomic.LoadUint64(&q.len)
}

type directItem struct {
type item struct {
next unsafe.Pointer
v interface{}
}

func loaditem(p *unsafe.Pointer) *directItem {
return (*directItem)(atomic.LoadPointer(p))
func loaditem(p *unsafe.Pointer) *item {
return (*item)(atomic.LoadPointer(p))
}
func casitem(p *unsafe.Pointer, old, new *directItem) bool {
func casitem(p *unsafe.Pointer, old, new *item) bool {
return atomic.CompareAndSwapPointer(p, unsafe.Pointer(old), unsafe.Pointer(new))
}
89 changes: 0 additions & 89 deletions internal/collection/stack.go

This file was deleted.

61 changes: 0 additions & 61 deletions internal/collection/stack_test.go

This file was deleted.

Loading

0 comments on commit 3521805

Please sign in to comment.