Skip to content

Commit

Permalink
perf: add a performance benchmark tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Aug 11, 2024
1 parent 3dfc2f1 commit 0aa1a56
Show file tree
Hide file tree
Showing 5 changed files with 451 additions and 40 deletions.
4 changes: 3 additions & 1 deletion Earthfile
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,11 @@ protogen:
--template buf.gen.yaml \
--path protos/goakt \
--path protos/internal \
--path protos/benchmark \
--path protos/test

# save artifact to
SAVE ARTIFACT gen/goakt AS LOCAL goaktpb
SAVE ARTIFACT gen/test AS LOCAL test/data/testpb
SAVE ARTIFACT gen/internal AS LOCAL internal/internalpb
SAVE ARTIFACT gen/internal AS LOCAL internal/internalpb
SAVE ARTIFACT gen/benchmark AS LOCAL bench/benchmarkpb
158 changes: 158 additions & 0 deletions bench/benchmark.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* MIT License
*
* Copyright (c) 2022-2024 Tochemey
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package bench

import (
"context"
"fmt"
"math/rand/v2"
"sync"
"sync/atomic"
"time"

"github.com/tochemey/goakt/v2/actors"
"github.com/tochemey/goakt/v2/bench/benchmarkpb"
"github.com/tochemey/goakt/v2/log"
)

const receivingTimeout = 100 * time.Millisecond

var (
totalSent *atomic.Int64
totalRecv *atomic.Int64
)

func init() {
totalSent = new(atomic.Int64)
totalRecv = new(atomic.Int64)
}

// Benchmarker is an actor that helps run benchmark tests
type Benchmarker struct{}

func (p *Benchmarker) PreStart(context.Context) error {
return nil
}

func (p *Benchmarker) Receive(ctx actors.ReceiveContext) {
switch ctx.Message().(type) {
case *benchmarkpb.BenchTell:
totalRecv.Add(1)
case *benchmarkpb.BenchRequest:
ctx.Response(&benchmarkpb.BenchResponse{})
default:
ctx.Unhandled()
}
}

func (p *Benchmarker) PostStop(context.Context) error {
return nil
}

// Benchmark defines a load testing engine
type Benchmark struct {
// actorsCount defines the number of actors to create
// on each actor system created by the loader
actorsCount int
// workersCount define the number of message senders
workersCount int
// duration specifies how long the load testing will run
duration time.Duration
pids []actors.PID
system actors.ActorSystem
}

// NewBenchmark creates an instance of Loader
func NewBenchmark(actorsCount, workersCount int, duration time.Duration) *Benchmark {
return &Benchmark{
actorsCount: actorsCount,
workersCount: workersCount,
duration: duration,
pids: make([]actors.PID, 0, actorsCount),
}
}

// Start starts the Benchmark
func (b *Benchmark) Start(ctx context.Context) error {
// create the benchmark actor system
name := "benchmark-system"
b.system, _ = actors.NewActorSystem(name,
actors.WithLogger(log.DiscardLogger),
actors.WithActorInitMaxRetries(1),
actors.WithSupervisorDirective(actors.NewStopDirective()),
actors.WithReplyTimeout(receivingTimeout))

if err := b.system.Start(ctx); err != nil {
return err
}

// wait for the actor system to properly start
time.Sleep(time.Second)

for i := 0; i < b.actorsCount; i++ {
actorName := fmt.Sprintf("actor-%d", i)
pid, err := b.system.Spawn(ctx, actorName, &Benchmarker{})
if err != nil {
return err
}
b.pids = append(b.pids, pid)
}
// wait for the actors to properly start
time.Sleep(time.Second)
return nil
}

// Stop stops the benchmark
func (b *Benchmark) Stop(ctx context.Context) error {
return b.system.Stop(ctx)
}

// Bench sends messages to a random actor
func (b *Benchmark) Bench(ctx context.Context) error {
wg := sync.WaitGroup{}
wg.Add(b.workersCount)
deadline := time.Now().Add(b.duration)
for i := 0; i < b.workersCount; i++ {
go func() {
defer wg.Done()
for time.Now().Before(deadline) {
// randomly pick and actor
pid := b.pids[rand.IntN(len(b.pids))] //nolint:gosec
// send a message
_ = actors.Tell(ctx, pid, new(benchmarkpb.BenchTell))
// increase sent counter
totalSent.Add(1)
}
}()
}

// wait for the messages to be delivered
wg.Wait()
time.Sleep(500 * time.Millisecond)
if totalSent.Load() != totalRecv.Load() {
return fmt.Errorf("send count and receive count does not match: %d != %d", totalSent.Load(), totalRecv.Load())
}
return nil
}
74 changes: 35 additions & 39 deletions bench/bench_test.go → bench/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,47 +26,25 @@ package bench

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/tochemey/goakt/v2/actors"
"github.com/tochemey/goakt/v2/bench/benchmarkpb"
"github.com/tochemey/goakt/v2/log"
testspb "github.com/tochemey/goakt/v2/test/data/testpb"
)

const (
receivingTimeout = 100 * time.Millisecond
)

// Benchmarker is an actor that helps run benchmark tests
type Benchmarker struct {
}

func (p *Benchmarker) PreStart(context.Context) error {
return nil
}

func (p *Benchmarker) Receive(ctx actors.ReceiveContext) {
switch ctx.Message().(type) {
case *testspb.TestSend:
case *testspb.TestReply:
ctx.Response(&testspb.Reply{Content: "received message"})
}
}

func (p *Benchmarker) PostStop(context.Context) error {
return nil
}

func BenchmarkActor(b *testing.B) {
b.Run("tell", func(b *testing.B) {
ctx := context.TODO()

// create the actor system
actorSystem, _ := actors.NewActorSystem("testSys",
actorSystem, _ := actors.NewActorSystem("bench",
actors.WithLogger(log.DiscardLogger),
actors.WithActorInitMaxRetries(1),
actors.WithMailboxSize(uint64(b.N)),
actors.WithSupervisorDirective(actors.NewStopDirective()),
actors.WithReplyTimeout(receivingTimeout))

Expand All @@ -82,7 +60,7 @@ func BenchmarkActor(b *testing.B) {
runParallel(b, func(pb *testing.PB) {
for pb.Next() {
// send a message to the actor
_ = actors.Tell(ctx, pid, &testspb.TestSend{})
_ = actors.Tell(ctx, pid, new(benchmarkpb.BenchTell))
}
})

Expand All @@ -93,10 +71,9 @@ func BenchmarkActor(b *testing.B) {
ctx := context.TODO()

// create the actor system
actorSystem, _ := actors.NewActorSystem("testSys",
actorSystem, _ := actors.NewActorSystem("bench",
actors.WithLogger(log.DiscardLogger),
actors.WithActorInitMaxRetries(1),
actors.WithMailboxSize(uint64(b.N)),
actors.WithSupervisorDirective(actors.NewStopDirective()),
actors.WithPassivationDisabled(),
actors.WithReplyTimeout(receivingTimeout))
Expand All @@ -113,21 +90,19 @@ func BenchmarkActor(b *testing.B) {
runParallel(b, func(pb *testing.PB) {
for pb.Next() {
// send a message to the actor
_ = actors.Tell(ctx, pid, &testspb.TestSend{})
_ = actors.Tell(ctx, pid, new(benchmarkpb.BenchTell))
}
})

_ = pid.Shutdown(ctx)
_ = actorSystem.Stop(ctx)
})

b.Run("ask", func(b *testing.B) {
ctx := context.TODO()
// create the actor system
actorSystem, _ := actors.NewActorSystem("testSys",
actorSystem, _ := actors.NewActorSystem("bench",
actors.WithLogger(log.DiscardLogger),
actors.WithActorInitMaxRetries(1),
actors.WithMailboxSize(uint64(b.N)),
actors.WithExpireActorAfter(5*time.Second),
actors.WithReplyTimeout(receivingTimeout))

Expand All @@ -142,7 +117,7 @@ func BenchmarkActor(b *testing.B) {
runParallel(b, func(pb *testing.PB) {
for pb.Next() {
// send a message to the actor
_, _ = actors.Ask(ctx, pid, new(testspb.TestReply), receivingTimeout)
_, _ = actors.Ask(ctx, pid, new(benchmarkpb.BenchRequest), receivingTimeout)
}
})

Expand All @@ -152,10 +127,9 @@ func BenchmarkActor(b *testing.B) {
b.Run("ask without passivation", func(b *testing.B) {
ctx := context.TODO()
// create the actor system
actorSystem, _ := actors.NewActorSystem("testSys",
actorSystem, _ := actors.NewActorSystem("bench",
actors.WithLogger(log.DiscardLogger),
actors.WithActorInitMaxRetries(1),
actors.WithMailboxSize(uint64(b.N)),
actors.WithPassivationDisabled(),
actors.WithReplyTimeout(receivingTimeout))

Expand All @@ -170,7 +144,7 @@ func BenchmarkActor(b *testing.B) {
runParallel(b, func(pb *testing.PB) {
for pb.Next() {
// send a message to the actor
_, _ = actors.Ask(ctx, pid, new(testspb.TestReply), receivingTimeout)
_, _ = actors.Ask(ctx, pid, new(benchmarkpb.BenchRequest), receivingTimeout)
}
})

Expand All @@ -184,6 +158,28 @@ func runParallel(b *testing.B, benchFn func(pb *testing.PB)) {
b.ResetTimer()
start := time.Now()
b.RunParallel(benchFn)
opsPerSec := float64(b.N) / float64(time.Since(start).Seconds())
opsPerSec := float64(b.N) / time.Since(start).Seconds()
b.ReportMetric(opsPerSec, "ops/s")
}

func TestBenchmark_Bench(t *testing.T) {
ctx := context.TODO()

actorsCount := 2000
workersCount := 20
duration := 10 * time.Second

benchmark := NewBenchmark(actorsCount, workersCount, duration)
require.NoError(t, benchmark.Start(ctx))

fmt.Printf("starting benchmark for (%v): num workers:(%d)\n", duration, workersCount)
if err := benchmark.Bench(ctx); err != nil {
t.Fatal(err)
}

fmt.Printf("workers: %d messages sent: %d, messages received: %d - duration: %v\n", workersCount, totalSent.Load(), totalRecv.Load(), duration)
fmt.Printf("messages per second: %d\n", totalRecv.Load()/int64(duration.Seconds()))
t.Cleanup(func() {
require.NoError(t, benchmark.Stop(ctx))
})
}
Loading

0 comments on commit 0aa1a56

Please sign in to comment.