Skip to content

Commit

Permalink
perf: performance tuning
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Sep 24, 2024
1 parent 0837a7c commit a2b922a
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 37 deletions.
19 changes: 15 additions & 4 deletions actors/pid.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,17 @@ import (
"github.com/tochemey/goakt/v2/telemetry"
)

// specifies the state in which the PID is
// regarding message processing
type processingState int32

const (
// idle means there are no messages to process
idle processingState = iota
// processing means the PID is processing messages
processing
)

// watcher is used to handle parent child relationship.
// This helps handle error propagation from a child actor using any of supervisory strategies
type watcher struct {
Expand Down Expand Up @@ -1030,10 +1041,10 @@ func (pid *PID) doReceive(receiveCtx *ReceiveContext) {
pid.signalMessage()
}

// Signal that a message has arrived and wake up the actor if needed
// signal that a message has arrived and wake up the actor if needed
func (pid *PID) signalMessage() {
// only signal if the actor is not already processing messages
if pid.processingMessages.CompareAndSwap(0, 1) {
if pid.processingMessages.CompareAndSwap(int32(idle), int32(processing)) {
select {
case pid.receiveSignal <- types.Unit{}:
default:
Expand All @@ -1055,9 +1066,9 @@ func (pid *PID) receiveLoop() {
received := pid.mailbox.Pop()
if received == nil {
// If no more messages, stop processing
pid.processingMessages.Store(0)
pid.processingMessages.Store(int32(idle))
// Check if new messages were added in the meantime and restart processing
if !pid.mailbox.IsEmpty() && pid.processingMessages.CompareAndSwap(0, 1) {
if !pid.mailbox.IsEmpty() && pid.processingMessages.CompareAndSwap(int32(idle), int32(processing)) {
continue
}
break
Expand Down
87 changes: 59 additions & 28 deletions bench/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ package bench

import (
"context"
"sync/atomic"
"testing"
"time"

"github.com/tochemey/goakt/v2/actors"
"github.com/tochemey/goakt/v2/bench/benchmarkpb"
"github.com/tochemey/goakt/v2/internal/types"
"github.com/tochemey/goakt/v2/internal/lib"
"github.com/tochemey/goakt/v2/log"
)

Expand All @@ -52,7 +53,7 @@ func BenchmarkActor(b *testing.B) {
_ = actorSystem.Start(ctx)

// wait for system to start properly
pause(1 * time.Second)
lib.Pause(1 * time.Second)

// define the benchmark actor
actor := &Benchmarker{}
Expand All @@ -61,16 +62,25 @@ func BenchmarkActor(b *testing.B) {
pid, _ := actorSystem.Spawn(ctx, "test", actor)

// wait for actors to start properly
pause(1 * time.Second)
lib.Pause(1 * time.Second)

var counter int64
b.ResetTimer()
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_ = actors.Tell(ctx, pid, new(benchmarkpb.BenchTell))
if err := actors.Tell(ctx, pid, new(benchmarkpb.BenchTell)); err != nil {
b.Fatal(err)
}
atomic.AddInt64(&counter, 1)
}
})

b.StopTimer()

messagesPerSec := float64(atomic.LoadInt64(&counter)) / b.Elapsed().Seconds()
b.ReportMetric(messagesPerSec, "messages/sec")

_ = pid.Shutdown(ctx)
_ = actorSystem.Stop(ctx)
})
Expand All @@ -88,23 +98,29 @@ func BenchmarkActor(b *testing.B) {
_ = actorSystem.Start(ctx)

// wait for system to start properly
pause(1 * time.Second)
lib.Pause(1 * time.Second)

// create the actors
sender, _ := actorSystem.Spawn(ctx, "sender", new(Benchmarker))
receiver, _ := actorSystem.Spawn(ctx, "receiver", new(Benchmarker))

// wait for actors to start properly
pause(1 * time.Second)

lib.Pause(1 * time.Second)
var counter int64
b.ResetTimer()
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_ = sender.Tell(ctx, receiver, new(benchmarkpb.BenchTell))
err := sender.Tell(ctx, receiver, new(benchmarkpb.BenchTell))
if err != nil {
b.Fatal(err)
}
atomic.AddInt64(&counter, 1)
}
})

b.StopTimer()
messagesPerSec := float64(atomic.LoadInt64(&counter)) / b.Elapsed().Seconds()
b.ReportMetric(messagesPerSec, "messages/sec")
_ = actorSystem.Stop(ctx)
})
b.Run("SendAsync", func(b *testing.B) {
Expand All @@ -121,22 +137,29 @@ func BenchmarkActor(b *testing.B) {
_ = actorSystem.Start(ctx)

// wait for system to start properly
pause(1 * time.Second)
lib.Pause(1 * time.Second)

// create the actors
sender, _ := actorSystem.Spawn(ctx, "sender", new(Benchmarker))
receiver, _ := actorSystem.Spawn(ctx, "receiver", new(Benchmarker))

// wait for actors to start properly
pause(1 * time.Second)
lib.Pause(1 * time.Second)
var counter int64
b.ResetTimer()
b.ReportAllocs()

b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_ = sender.SendAsync(ctx, receiver.Name(), new(benchmarkpb.BenchTell))
if err := sender.SendAsync(ctx, receiver.Name(), new(benchmarkpb.BenchTell)); err != nil {
b.Fatal(err)
}
atomic.AddInt64(&counter, 1)
}
})
b.StopTimer()

messagesPerSec := float64(atomic.LoadInt64(&counter)) / b.Elapsed().Seconds()
b.ReportMetric(messagesPerSec, "messages/sec")

_ = actorSystem.Stop(ctx)
})
Expand All @@ -153,7 +176,7 @@ func BenchmarkActor(b *testing.B) {
_ = actorSystem.Start(ctx)

// wait for system to start properly
pause(1 * time.Second)
lib.Pause(1 * time.Second)

// define the benchmark actor
actor := &Benchmarker{}
Expand All @@ -162,14 +185,22 @@ func BenchmarkActor(b *testing.B) {
pid, _ := actorSystem.Spawn(ctx, "test", actor)

// wait for actors to start properly
pause(1 * time.Second)
lib.Pause(1 * time.Second)
var counter int64
b.ResetTimer()
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_, _ = actors.Ask(ctx, pid, new(benchmarkpb.BenchRequest), receivingTimeout)
if _, err := actors.Ask(ctx, pid, new(benchmarkpb.BenchRequest), receivingTimeout); err != nil {
b.Fatal(err)
}
atomic.AddInt64(&counter, 1)
}
})
b.StopTimer()

messagesPerSec := float64(atomic.LoadInt64(&counter)) / b.Elapsed().Seconds()
b.ReportMetric(messagesPerSec, "messages/sec")

_ = pid.Shutdown(ctx)
_ = actorSystem.Stop(ctx)
Expand All @@ -188,31 +219,31 @@ func BenchmarkActor(b *testing.B) {
_ = actorSystem.Start(ctx)

// wait for system to start properly
pause(1 * time.Second)
lib.Pause(1 * time.Second)

// create the actors
sender, _ := actorSystem.Spawn(ctx, "sender", new(Benchmarker))
receiver, _ := actorSystem.Spawn(ctx, "receiver", new(Benchmarker))

// wait for actors to start properly
pause(1 * time.Second)
lib.Pause(1 * time.Second)

var counter int64
b.ResetTimer()
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_, _ = sender.Ask(ctx, receiver, new(benchmarkpb.BenchRequest))
if _, err := sender.Ask(ctx, receiver, new(benchmarkpb.BenchRequest)); err != nil {
b.Fatal(err)
}
atomic.AddInt64(&counter, 1)
}
})
_ = actorSystem.Stop(ctx)
})
}
b.StopTimer()

messagesPerSec := float64(atomic.LoadInt64(&counter)) / b.Elapsed().Seconds()
b.ReportMetric(messagesPerSec, "messages/sec")

func pause(duration time.Duration) {
bootstrapChan := make(chan types.Unit, 1)
timer := time.AfterFunc(duration, func() {
bootstrapChan <- types.Unit{}
_ = actorSystem.Stop(ctx)
})
<-bootstrapChan
timer.Stop()
}
9 changes: 4 additions & 5 deletions bench/pingpong/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@ func (p *Ping) Receive(ctx *actors.ReceiveContext) {
switch ctx.Message().(type) {
case *goaktpb.PostStart:
case *benchmarkpb.Pong:
p.count.Add(1)
p.count.Inc()
// let us reply to the sender
_ = ctx.Self().Tell(ctx.Context(), ctx.Sender(), new(benchmarkpb.Ping))
ctx.Tell(ctx.Sender(), new(benchmarkpb.Ping))
default:
ctx.Unhandled()
}
Expand Down Expand Up @@ -155,9 +155,8 @@ func (p *Pong) Receive(ctx *actors.ReceiveContext) {
switch ctx.Message().(type) {
case *goaktpb.PostStart:
case *benchmarkpb.Ping:
p.count.Add(1)
// reply the sender in case there is a sender
_ = ctx.Self().Tell(ctx.Context(), ctx.Sender(), new(benchmarkpb.Pong))
p.count.Inc()
ctx.Tell(ctx.Sender(), new(benchmarkpb.Pong))
default:
ctx.Unhandled()
}
Expand Down

0 comments on commit a2b922a

Please sign in to comment.