Skip to content

Commit

Permalink
refactor: add test coverage for cluster rebalancing (#458)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Sep 6, 2024
1 parent 835ad22 commit bddc103
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 5 deletions.
75 changes: 75 additions & 0 deletions actors/actor_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package actors

import (
"context"
"fmt"
"net"
"sort"
"strconv"
Expand Down Expand Up @@ -1484,4 +1485,78 @@ func TestActorSystem(t *testing.T) {
assert.NoError(t, err)
})
})
t.Run("With actors redeployment", func(t *testing.T) {
// create a context
ctx := context.TODO()
// start the NATS server
srv := startNatsServer(t)

// create and start system cluster
node1, sd1 := startClusterSystem(t, "Node1", srv.Addr().String())
require.NotNil(t, node1)
require.NotNil(t, sd1)

// create and start system cluster
node2, sd2 := startClusterSystem(t, "Node2", srv.Addr().String())
require.NotNil(t, node2)
require.NotNil(t, sd2)

// create and start system cluster
node3, sd3 := startClusterSystem(t, "Node3", srv.Addr().String())
require.NotNil(t, node3)
require.NotNil(t, sd3)

// let us create 4 actors on each node
for j := 1; j <= 4; j++ {
actorName := fmt.Sprintf("Node1-Actor-%d", j)
pid, err := node1.Spawn(ctx, actorName, newTestActor())
require.NoError(t, err)
require.NotNil(t, pid)
}

pause(time.Second)

for j := 1; j <= 4; j++ {
actorName := fmt.Sprintf("Node2-Actor-%d", j)
pid, err := node2.Spawn(ctx, actorName, newTestActor())
require.NoError(t, err)
require.NotNil(t, pid)
}

pause(time.Second)

for j := 1; j <= 4; j++ {
actorName := fmt.Sprintf("Node3-Actor-%d", j)
pid, err := node3.Spawn(ctx, actorName, newTestActor())
require.NoError(t, err)
require.NotNil(t, pid)
}

pause(time.Second)

// take down node2
require.NoError(t, node2.Stop(ctx))
require.NoError(t, sd2.Close())

// Wait for cluster rebalancing
pause(time.Minute)

// verify a former node2 actor is still reachable (from node1) after rebalancing
actorName := "Node2-Actor-1"

sender, err := node1.LocalActor("Node1-Actor-1")
require.NoError(t, err)
require.NotNil(t, sender)

err = sender.SendAsync(ctx, actorName, new(testpb.TestSend))
require.NoError(t, err)

t.Cleanup(func() {
assert.NoError(t, node1.Stop(ctx))
assert.NoError(t, node3.Stop(ctx))
assert.NoError(t, sd1.Close())
assert.NoError(t, sd3.Close())
srv.Shutdown()
})
})
}
19 changes: 14 additions & 5 deletions actors/actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,21 @@ import (

// testActor is an actor that helps run various test scenarios
type testActor struct {
counter *atomic.Int64
counter atomic.Int64
}

// compile-time assertion that *testActor satisfies the Actor interface
var _ Actor = (*testActor)(nil)

// newTestActor creates a testActor
func newTestActor() *testActor {
return &testActor{
counter: atomic.NewInt64(0),
}
return &testActor{}
}

// PreStart initializes the actor. This function can be used to set up database connections
// or perform other one-time initialization before the actor starts processing messages.
func (p *testActor) PreStart(context.Context) error {
p.counter.Store(0)
return nil
}

Expand Down Expand Up @@ -469,7 +468,17 @@ func startClusterSystem(t *testing.T, nodeName, serverAddr string) (ActorSystem,
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
WithClustering(provider, 10, 1, gossipPort, clusterPort, new(testActor)))
WithPeerStateLoopInterval(500*time.Millisecond),
WithCluster(
NewClusterConfig().
WithKinds(new(testActor)).
WithPartitionCount(10).
WithReplicaCount(1).
WithPeersPort(clusterPort).
WithMinimumPeersQuorum(1).
WithGossipPort(gossipPort).
WithDiscovery(provider).WithKinds(new(testActor))),
)

require.NotNil(t, system)
require.NoError(t, err)
Expand Down
2 changes: 2 additions & 0 deletions actors/redistribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,15 @@ func (x *actorSystem) redistribute(ctx context.Context, event *cluster.Event) er

nodeLeft := new(goaktpb.NodeLeft)
if err := event.Payload.UnmarshalTo(nodeLeft); err != nil {
x.logger.Errorf("failed to unmarshal payload: (%v)", err)
return err
}

x.peersCacheMu.RLock()
bytea, ok := x.peersCache[nodeLeft.GetAddress()]
x.peersCacheMu.RUnlock()
if !ok {
x.logger.Errorf("peer %s not found", nodeLeft.GetAddress())
return ErrPeerNotFound
}

Expand Down

0 comments on commit bddc103

Please sign in to comment.