From a2b922a68a06773c2ee973a38ed17a336cb99a42 Mon Sep 17 00:00:00 2001 From: Tochemey Date: Tue, 24 Sep 2024 13:13:02 +0100 Subject: [PATCH] perf: performance tuning --- actors/pid.go | 19 +++++++-- bench/benchmark_test.go | 87 ++++++++++++++++++++++++++++------------- bench/pingpong/main.go | 9 ++--- 3 files changed, 78 insertions(+), 37 deletions(-) diff --git a/actors/pid.go b/actors/pid.go index d050b5a..cea58ba 100644 --- a/actors/pid.go +++ b/actors/pid.go @@ -59,6 +59,17 @@ import ( "github.com/tochemey/goakt/v2/telemetry" ) +// specifies the state in which the PID is +// regarding message processing +type processingState int32 + +const ( + // idle means there are no messages to process + idle processingState = iota + // processing means the PID is processing messages + processing +) + // watcher is used to handle parent child relationship. // This helps handle error propagation from a child actor using any of supervisory strategies type watcher struct { @@ -1030,10 +1041,10 @@ func (pid *PID) doReceive(receiveCtx *ReceiveContext) { pid.signalMessage() } -// Signal that a message has arrived and wake up the actor if needed +// signal that a message has arrived and wake up the actor if needed func (pid *PID) signalMessage() { // only signal if the actor is not already processing messages - if pid.processingMessages.CompareAndSwap(0, 1) { + if pid.processingMessages.CompareAndSwap(int32(idle), int32(processing)) { select { case pid.receiveSignal <- types.Unit{}: default: @@ -1055,9 +1066,9 @@ func (pid *PID) receiveLoop() { received := pid.mailbox.Pop() if received == nil { // If no more messages, stop processing - pid.processingMessages.Store(0) + pid.processingMessages.Store(int32(idle)) // Check if new messages were added in the meantime and restart processing - if !pid.mailbox.IsEmpty() && pid.processingMessages.CompareAndSwap(0, 1) { + if !pid.mailbox.IsEmpty() && pid.processingMessages.CompareAndSwap(int32(idle), int32(processing)) { continue } break diff --git a/bench/benchmark_test.go b/bench/benchmark_test.go index 0a23437..ca8db0f 100644 --- a/bench/benchmark_test.go +++ b/bench/benchmark_test.go @@ -26,12 +26,13 @@ package bench import ( "context" + "sync/atomic" "testing" "time" "github.com/tochemey/goakt/v2/actors" "github.com/tochemey/goakt/v2/bench/benchmarkpb" - "github.com/tochemey/goakt/v2/internal/types" + "github.com/tochemey/goakt/v2/internal/lib" "github.com/tochemey/goakt/v2/log" ) @@ -52,7 +53,7 @@ func BenchmarkActor(b *testing.B) { _ = actorSystem.Start(ctx) // wait for system to start properly - pause(1 * time.Second) + lib.Pause(1 * time.Second) // define the benchmark actor actor := &Benchmarker{} @@ -61,16 +62,25 @@ func BenchmarkActor(b *testing.B) { pid, _ := actorSystem.Spawn(ctx, "test", actor) // wait for actors to start properly - pause(1 * time.Second) + lib.Pause(1 * time.Second) + var counter int64 b.ResetTimer() b.ReportAllocs() b.RunParallel(func(pb *testing.PB) { for pb.Next() { - _ = actors.Tell(ctx, pid, new(benchmarkpb.BenchTell)) + if err := actors.Tell(ctx, pid, new(benchmarkpb.BenchTell)); err != nil { + b.Fatal(err) + } + atomic.AddInt64(&counter, 1) } }) + b.StopTimer() + + messagesPerSec := float64(atomic.LoadInt64(&counter)) / b.Elapsed().Seconds() + b.ReportMetric(messagesPerSec, "messages/sec") + _ = pid.Shutdown(ctx) _ = actorSystem.Stop(ctx) }) @@ -88,23 +98,29 @@ func BenchmarkActor(b *testing.B) { _ = actorSystem.Start(ctx) // wait for system to start properly - pause(1 * time.Second) + lib.Pause(1 * time.Second) // create the actors sender, _ := actorSystem.Spawn(ctx, "sender", new(Benchmarker)) receiver, _ := actorSystem.Spawn(ctx, "receiver", new(Benchmarker)) // wait for actors to start properly - pause(1 * time.Second) - + lib.Pause(1 * time.Second) + var counter int64 b.ResetTimer() b.ReportAllocs() b.RunParallel(func(pb *testing.PB) { for pb.Next() { - _ = sender.Tell(ctx, receiver, new(benchmarkpb.BenchTell)) + err := sender.Tell(ctx, receiver, new(benchmarkpb.BenchTell)) + if err != nil { + b.Fatal(err) + } + atomic.AddInt64(&counter, 1) } }) - + b.StopTimer() + messagesPerSec := float64(atomic.LoadInt64(&counter)) / b.Elapsed().Seconds() + b.ReportMetric(messagesPerSec, "messages/sec") _ = actorSystem.Stop(ctx) }) b.Run("SendAsync", func(b *testing.B) { @@ -121,22 +137,29 @@ func BenchmarkActor(b *testing.B) { _ = actorSystem.Start(ctx) // wait for system to start properly - pause(1 * time.Second) + lib.Pause(1 * time.Second) // create the actors sender, _ := actorSystem.Spawn(ctx, "sender", new(Benchmarker)) receiver, _ := actorSystem.Spawn(ctx, "receiver", new(Benchmarker)) // wait for actors to start properly - pause(1 * time.Second) + lib.Pause(1 * time.Second) + var counter int64 b.ResetTimer() b.ReportAllocs() - b.RunParallel(func(pb *testing.PB) { for pb.Next() { - _ = sender.SendAsync(ctx, receiver.Name(), new(benchmarkpb.BenchTell)) + if err := sender.SendAsync(ctx, receiver.Name(), new(benchmarkpb.BenchTell)); err != nil { + b.Fatal(err) + } + atomic.AddInt64(&counter, 1) } }) + b.StopTimer() + + messagesPerSec := float64(atomic.LoadInt64(&counter)) / b.Elapsed().Seconds() + b.ReportMetric(messagesPerSec, "messages/sec") _ = actorSystem.Stop(ctx) }) @@ -153,7 +176,7 @@ func BenchmarkActor(b *testing.B) { _ = actorSystem.Start(ctx) // wait for system to start properly - pause(1 * time.Second) + lib.Pause(1 * time.Second) // define the benchmark actor actor := &Benchmarker{} @@ -162,14 +185,22 @@ func BenchmarkActor(b *testing.B) { pid, _ := actorSystem.Spawn(ctx, "test", actor) // wait for actors to start properly - pause(1 * time.Second) + lib.Pause(1 * time.Second) + var counter int64 b.ResetTimer() b.ReportAllocs() b.RunParallel(func(pb *testing.PB) { for pb.Next() { - _, _ = actors.Ask(ctx, pid, new(benchmarkpb.BenchRequest), receivingTimeout) + if _, err := actors.Ask(ctx, pid, new(benchmarkpb.BenchRequest), receivingTimeout); err != nil { + b.Fatal(err) + } + atomic.AddInt64(&counter, 1) } }) + b.StopTimer() + + messagesPerSec := float64(atomic.LoadInt64(&counter)) / b.Elapsed().Seconds() + b.ReportMetric(messagesPerSec, "messages/sec") _ = pid.Shutdown(ctx) _ = actorSystem.Stop(ctx) @@ -188,31 +219,31 @@ func BenchmarkActor(b *testing.B) { _ = actorSystem.Start(ctx) // wait for system to start properly - pause(1 * time.Second) + lib.Pause(1 * time.Second) // create the actors sender, _ := actorSystem.Spawn(ctx, "sender", new(Benchmarker)) receiver, _ := actorSystem.Spawn(ctx, "receiver", new(Benchmarker)) // wait for actors to start properly - pause(1 * time.Second) + lib.Pause(1 * time.Second) + var counter int64 b.ResetTimer() b.ReportAllocs() b.RunParallel(func(pb *testing.PB) { for pb.Next() { - _, _ = sender.Ask(ctx, receiver, new(benchmarkpb.BenchRequest)) + if _, err := sender.Ask(ctx, receiver, new(benchmarkpb.BenchRequest)); err != nil { + b.Fatal(err) + } + atomic.AddInt64(&counter, 1) } }) - _ = actorSystem.Stop(ctx) - }) -} + b.StopTimer() + + messagesPerSec := float64(atomic.LoadInt64(&counter)) / b.Elapsed().Seconds() + b.ReportMetric(messagesPerSec, "messages/sec") -func pause(duration time.Duration) { - bootstrapChan := make(chan types.Unit, 1) - timer := time.AfterFunc(duration, func() { - bootstrapChan <- types.Unit{} + _ = actorSystem.Stop(ctx) }) - <-bootstrapChan - timer.Stop() } diff --git a/bench/pingpong/main.go b/bench/pingpong/main.go index 6ab24a0..a75cffb 100644 --- a/bench/pingpong/main.go +++ b/bench/pingpong/main.go @@ -124,9 +124,9 @@ func (p *Ping) Receive(ctx *actors.ReceiveContext) { switch ctx.Message().(type) { case *goaktpb.PostStart: case *benchmarkpb.Pong: - p.count.Add(1) + p.count.Inc() // let us reply to the sender - _ = ctx.Self().Tell(ctx.Context(), ctx.Sender(), new(benchmarkpb.Ping)) + ctx.Tell(ctx.Sender(), new(benchmarkpb.Ping)) default: ctx.Unhandled() } @@ -155,9 +155,8 @@ func (p *Pong) Receive(ctx *actors.ReceiveContext) { switch ctx.Message().(type) { case *goaktpb.PostStart: case *benchmarkpb.Ping: - p.count.Add(1) - // reply the sender in case there is a sender - _ = ctx.Self().Tell(ctx.Context(), ctx.Sender(), new(benchmarkpb.Pong)) + p.count.Inc() + ctx.Tell(ctx.Sender(), new(benchmarkpb.Pong)) default: ctx.Unhandled() }