Skip to content

Commit

Permalink
refactor: enhance clustering options (#324)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed May 19, 2024
1 parent 6f6452f commit 5897f00
Show file tree
Hide file tree
Showing 31 changed files with 544 additions and 447 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ You can schedule sending messages to actor that will be acted upon in the future
#### Cron Expression Format

| Field | Required | Allowed Values | Allowed Special Characters |
|--------------|----------|-----------------|----------------------------|
| ------------ | -------- | --------------- | -------------------------- |
| Seconds | yes | 0-59 | , - * / |
| Minutes | yes | 0-59 | , - * / |
| Hours | yes | 0-23 | , - * / |
Expand Down Expand Up @@ -382,7 +382,7 @@ const (
applicationName = "accounts"
actorSystemName = "AccountsSystem"
gossipPortName = "gossip-port"
clusterPortName = "cluster-port"
peersPortName = "peers-port"
remotingPortName = "remoting-port"
)
// define the discovery config
Expand All @@ -392,7 +392,7 @@ config := kubernetes.Config{
Namespace: namespace,
GossipPortName: gossipPortName,
RemotingPortName: remotingPortName,
ClusterPortName: clusterPortName,
PeersPortName: peersPortName,
}

// instantiate the k8 discovery provider
Expand Down
97 changes: 39 additions & 58 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

"connectrpc.com/connect"
"connectrpc.com/otelconnect"
"github.com/alphadose/haxmap"
"github.com/google/uuid"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/codes"
Expand Down Expand Up @@ -180,10 +181,11 @@ type actorSystem struct {
// define the number of partitions to shard the actors in the cluster
partitionsCount uint64
// cluster mode
cluster cluster.Interface
clusterChan chan *internalpb.WireActor
clusterPort int
gossipPort int
cluster cluster.Interface
clusterChan chan *internalpb.WireActor
peersPort int
gossipPort int
minimumPeersQuorum uint16

partitionHasher hash.Hasher

Expand Down Expand Up @@ -214,6 +216,8 @@ type actorSystem struct {

registry types.Registry
reflection reflection

peersCache *haxmap.Map[string, []byte]
}

// enforce compilation error when all methods of the ActorSystem interface are not implemented
Expand All @@ -230,7 +234,7 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) {
}

system := &actorSystem{
actors: newPIDMap(10),
actors: newPIDMap(1_000), // TODO need to check with memory footprint here since we change the map engine
clusterChan: make(chan *internalpb.WireActor, 10),
name: name,
logger: log.DefaultLogger,
Expand All @@ -250,6 +254,8 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) {
eventsChan: make(chan *cluster.Event, 1),
registry: types.NewRegistry(),
clusterSyncStopSig: make(chan types.Unit, 1),
minimumPeersQuorum: 1,
peersCache: haxmap.New[string, []byte](100), // TODO need to check with memory footprint here since we change the map engine
}

system.started.Store(false)
Expand Down Expand Up @@ -754,9 +760,6 @@ func (x *actorSystem) Start(ctx context.Context) error {
if err := x.enableClustering(spanCtx); err != nil {
return err
}
// start cluster synchronization
// TODO: revisit this
// go x.runClusterSync()
}

x.scheduler.Start(spanCtx)
Expand Down Expand Up @@ -828,6 +831,7 @@ func (x *actorSystem) Stop(ctx context.Context) error {
}
}

x.actors.close()
x.reset()
return nil
}
Expand Down Expand Up @@ -864,11 +868,11 @@ func (x *actorSystem) RemoteAsk(ctx context.Context, stream *connect.BidiStream[
}

for {
switch ctx.Err() {
case context.Canceled:
return connect.NewError(connect.CodeCanceled, ctx.Err())
case context.DeadlineExceeded:
return connect.NewError(connect.CodeDeadlineExceeded, ctx.Err())
switch err := ctx.Err(); {
case errors.Is(err, context.Canceled):
return connect.NewError(connect.CodeCanceled, err)
case errors.Is(err, context.DeadlineExceeded):
return connect.NewError(connect.CodeDeadlineExceeded, err)
}

request, err := stream.Receive()
Expand Down Expand Up @@ -1074,16 +1078,17 @@ func (x *actorSystem) enableClustering(ctx context.Context) error {
Name: x.name,
Host: x.remotingHost,
GossipPort: x.gossipPort,
ClusterPort: x.clusterPort,
PeersPort: x.peersPort,
RemotingPort: int(x.remotingPort),
}

cluster, err := cluster.NewNode(x.Name(),
cluster, err := cluster.NewEngine(x.Name(),
x.discoveryProvider,
&hostNode,
cluster.WithLogger(x.logger),
cluster.WithPartitionsCount(x.partitionsCount),
cluster.WithHasher(x.partitionHasher),
cluster.WithMinimumPeersQuorum(x.minimumPeersQuorum),
)
if err != nil {
x.logger.Error(errors.Wrap(err, "failed to initialize cluster engine"))
Expand Down Expand Up @@ -1193,19 +1198,6 @@ func (x *actorSystem) reset() {
x.cluster = nil
}

// broadcast drains clusterChan and hands every received actor to
// cluster.PutActor. Actors received while no cluster is set are dropped.
// The loop ends when clusterChan is closed, or as soon as one PutActor call
// fails (the failure is logged at error level first).
func (x *actorSystem) broadcast(ctx context.Context) {
	for actor := range x.clusterChan {
		// nothing to publish to while the cluster engine is unset
		if x.cluster == nil {
			continue
		}

		err := x.cluster.PutActor(ctx, actor)
		if err == nil {
			continue
		}

		x.logger.Error(err.Error())
		// TODO: stop or continue
		return
	}
}

// housekeeper removes dead actors from the system from time to time,
// which helps free unused resources
func (x *actorSystem) housekeeper() {
Expand Down Expand Up @@ -1256,13 +1248,25 @@ func (x *actorSystem) registerMetrics() error {
return err
}

// broadcast publishes newly created actors into the cluster when cluster is enabled.
//
// It ranges over clusterChan until the channel is closed and hands every
// received actor to cluster.PutActor. Actors received while no cluster is set
// are skipped.
//
// A failed PutActor is reported at error level and the loop moves on to the
// next actor. Escalating through logger.Panic would let a single failed
// cluster write abort the loop (and presumably the process), leaving
// clusterChan undrained for every later sender.
func (x *actorSystem) broadcast(ctx context.Context) {
	for wireActor := range x.clusterChan {
		// nothing to publish to while the cluster engine is unset
		if x.cluster == nil {
			continue
		}

		// set the actor in the cluster
		if err := x.cluster.PutActor(ctx, wireActor); err != nil {
			x.logger.Error(err.Error())
		}
	}
}

// broadcastClusterEvents listens to cluster events and sends them to the event stream
func (x *actorSystem) broadcastClusterEvents() {
for event := range x.eventsChan {
if x.clusterEnabled.Load() {
if event != nil && event.Payload != nil {
// first need to resync actors map back to the cluster
x.clusterSync()
x.syncActors()
// push the event to the event stream
message, _ := event.Payload.UnmarshalNew()
if x.eventsStream != nil {
Expand All @@ -1275,12 +1279,12 @@ func (x *actorSystem) broadcastClusterEvents() {
}
}

// clusterSync synchronizes the node's actors map to the cluster.
func (x *actorSystem) clusterSync() {
typesMap := x.actors.props()
if len(typesMap) != 0 {
// syncActors synchronizes the node's actors map to the cluster.
func (x *actorSystem) syncActors() {
props := x.actors.props()
if len(props) != 0 {
x.logger.Info("syncing node actors map to the cluster...")
for actorID, prop := range typesMap {
for actorID, actorProp := range props {
actorPath := NewPath(actorID, NewAddress(x.name, "", -1))
if x.remotingEnabled.Load() {
actorPath = NewPath(actorID, NewAddress(x.name, x.remotingHost, int(x.remotingPort)))
Expand All @@ -1294,32 +1298,9 @@ func (x *actorSystem) clusterSync() {
ActorName: actorID,
ActorAddress: actorPath.RemoteAddress(),
ActorPath: actorPath.String(),
ActorType: prop.rtype.Name(),
ActorType: actorProp.rtype.Name(),
}
}
x.logger.Info("node actors map successfully synced back to the cluster.")
}
}

// runClusterSync periodically synchronizes this node's actors map to the
// cluster for availability.
//
// It blocks the calling goroutine: every 5 minutes it calls clusterSync, and
// it returns once a value is received on clusterSyncStopSig. The ticker is
// stopped before the "stopped" message is logged.
func (x *actorSystem) runClusterSync() {
	x.logger.Info("cluster synchronization has started...")
	ticker := time.NewTicker(5 * time.Minute)

	// The loop runs directly on the caller's goroutine; the caller has to wait
	// for the stop signal anyway, so no helper goroutine or hand-off channel
	// is needed.
	for {
		select {
		case <-ticker.C:
			x.clusterSync()
		case <-x.clusterSyncStopSig:
			ticker.Stop()
			x.logger.Info("cluster synchronization has stopped...")
			return
		}
	}
}
6 changes: 3 additions & 3 deletions actors/actor_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func TestActorSystem(t *testing.T) {
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
WithClustering(provider, 9, gossipPort, clusterPort))
WithClustering(provider, 9, 1, gossipPort, clusterPort))
require.NoError(t, err)

provider.EXPECT().ID().Return("testDisco")
Expand Down Expand Up @@ -947,7 +947,7 @@ func TestActorSystem(t *testing.T) {
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
WithClustering(provider, 9, gossipPort, clusterPort))
WithClustering(provider, 9, 1, gossipPort, clusterPort))
require.NoError(t, err)

provider.EXPECT().ID().Return("testDisco")
Expand Down Expand Up @@ -1172,7 +1172,7 @@ func TestActorSystem(t *testing.T) {
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
WithClustering(provider, 9, gossipPort, clusterPort))
WithClustering(provider, 9, 1, gossipPort, clusterPort))
require.NoError(t, err)

provider.EXPECT().ID().Return("testDisco")
Expand Down
4 changes: 2 additions & 2 deletions actors/actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ func startClusterSystem(t *testing.T, nodeName, serverAddr string) (ActorSystem,
Name: host,
Host: host,
GossipPort: gossipPort,
ClusterPort: clusterPort,
PeersPort: clusterPort,
RemotingPort: remotingPort,
}

Expand All @@ -458,7 +458,7 @@ func startClusterSystem(t *testing.T, nodeName, serverAddr string) (ActorSystem,
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
WithClustering(provider, 10, gossipPort, clusterPort))
WithClustering(provider, 10, 1, gossipPort, clusterPort))

require.NotNil(t, system)
require.NoError(t, err)
Expand Down
7 changes: 4 additions & 3 deletions actors/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,15 @@ func WithRemoting(host string, port int32) Option {
})
}

// WithClustering enables clustering on the actor system.
func WithClustering(provider discovery.Provider, partitionCount uint64, gossipPort, clusterPort int) Option {
// WithClustering enables the cluster mode.
func WithClustering(provider discovery.Provider, partitionCount uint64, minimumPeersQuorum uint16, gossipPort, peersPort int) Option {
return OptionFunc(func(a *actorSystem) {
a.clusterEnabled.Store(true)
a.partitionsCount = partitionCount
a.discoveryProvider = provider
a.clusterPort = clusterPort
a.peersPort = peersPort
a.gossipPort = gossipPort
a.minimumPeersQuorum = minimumPeersQuorum
})
}

Expand Down
Loading

0 comments on commit 5897f00

Please sign in to comment.