Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: enhance clustering options #324

Merged
merged 11 commits into from
May 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ You can schedule sending messages to actor that will be acted upon in the future
#### Cron Expression Format

| Field | Required | Allowed Values | Allowed Special Characters |
|--------------|----------|-----------------|----------------------------|
| ------------ | -------- | --------------- | -------------------------- |
| Seconds | yes | 0-59 | , - * / |
| Minutes | yes | 0-59 | , - * / |
| Hours | yes | 0-23 | , - * / |
Expand Down Expand Up @@ -382,7 +382,7 @@ const (
applicationName = "accounts"
actorSystemName = "AccountsSystem"
gossipPortName = "gossip-port"
clusterPortName = "cluster-port"
peersPortName = "peers-port"
remotingPortName = "remoting-port"
)
// define the discovery config
Expand All @@ -392,7 +392,7 @@ config := kubernetes.Config{
Namespace: namespace,
GossipPortName: gossipPortName,
RemotingPortName: remotingPortName,
ClusterPortName: clusterPortName,
PeersPortName: peersPortName,
}

// instantiate the k8 discovery provider
Expand Down
97 changes: 39 additions & 58 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

"connectrpc.com/connect"
"connectrpc.com/otelconnect"
"github.com/alphadose/haxmap"
"github.com/google/uuid"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/codes"
Expand Down Expand Up @@ -180,10 +181,11 @@ type actorSystem struct {
// define the number of partitions to shard the actors in the cluster
partitionsCount uint64
// cluster mode
cluster cluster.Interface
clusterChan chan *internalpb.WireActor
clusterPort int
gossipPort int
cluster cluster.Interface
clusterChan chan *internalpb.WireActor
peersPort int
gossipPort int
minimumPeersQuorum uint16

partitionHasher hash.Hasher

Expand Down Expand Up @@ -214,6 +216,8 @@ type actorSystem struct {

registry types.Registry
reflection reflection

peersCache *haxmap.Map[string, []byte]
}

// enforce compilation error when all methods of the ActorSystem interface are not implemented
Expand All @@ -230,7 +234,7 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) {
}

system := &actorSystem{
actors: newPIDMap(10),
actors: newPIDMap(1_000), // TODO need to check with memory footprint here since we change the map engine
clusterChan: make(chan *internalpb.WireActor, 10),
name: name,
logger: log.DefaultLogger,
Expand All @@ -250,6 +254,8 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) {
eventsChan: make(chan *cluster.Event, 1),
registry: types.NewRegistry(),
clusterSyncStopSig: make(chan types.Unit, 1),
minimumPeersQuorum: 1,
peersCache: haxmap.New[string, []byte](100), // TODO need to check with memory footprint here since we change the map engine
}

system.started.Store(false)
Expand Down Expand Up @@ -754,9 +760,6 @@ func (x *actorSystem) Start(ctx context.Context) error {
if err := x.enableClustering(spanCtx); err != nil {
return err
}
// start cluster synchronization
// TODO: revisit this
// go x.runClusterSync()
}

x.scheduler.Start(spanCtx)
Expand Down Expand Up @@ -828,6 +831,7 @@ func (x *actorSystem) Stop(ctx context.Context) error {
}
}

x.actors.close()
x.reset()
return nil
}
Expand Down Expand Up @@ -864,11 +868,11 @@ func (x *actorSystem) RemoteAsk(ctx context.Context, stream *connect.BidiStream[
}

for {
switch ctx.Err() {
case context.Canceled:
return connect.NewError(connect.CodeCanceled, ctx.Err())
case context.DeadlineExceeded:
return connect.NewError(connect.CodeDeadlineExceeded, ctx.Err())
switch err := ctx.Err(); {
case errors.Is(err, context.Canceled):
return connect.NewError(connect.CodeCanceled, err)
case errors.Is(err, context.DeadlineExceeded):
return connect.NewError(connect.CodeDeadlineExceeded, err)
}

request, err := stream.Receive()
Expand Down Expand Up @@ -1074,16 +1078,17 @@ func (x *actorSystem) enableClustering(ctx context.Context) error {
Name: x.name,
Host: x.remotingHost,
GossipPort: x.gossipPort,
ClusterPort: x.clusterPort,
PeersPort: x.peersPort,
RemotingPort: int(x.remotingPort),
}

cluster, err := cluster.NewNode(x.Name(),
cluster, err := cluster.NewEngine(x.Name(),
x.discoveryProvider,
&hostNode,
cluster.WithLogger(x.logger),
cluster.WithPartitionsCount(x.partitionsCount),
cluster.WithHasher(x.partitionHasher),
cluster.WithMinimumPeersQuorum(x.minimumPeersQuorum),
)
if err != nil {
x.logger.Error(errors.Wrap(err, "failed to initialize cluster engine"))
Expand Down Expand Up @@ -1193,19 +1198,6 @@ func (x *actorSystem) reset() {
x.cluster = nil
}

// broadcast publishes newly created actor into the cluster when cluster is enabled
func (x *actorSystem) broadcast(ctx context.Context) {
for wireActor := range x.clusterChan {
if x.cluster != nil {
if err := x.cluster.PutActor(ctx, wireActor); err != nil {
x.logger.Error(err.Error())
// TODO: stop or continue
return
}
}
}
}

// housekeeper from time to time removes dead actors from the system,
// which helps free non-utilized resources
func (x *actorSystem) housekeeper() {
Expand Down Expand Up @@ -1256,13 +1248,25 @@ func (x *actorSystem) registerMetrics() error {
return err
}

// broadcast publishes newly created actor into the cluster when cluster is enabled
func (x *actorSystem) broadcast(ctx context.Context) {
for wireActor := range x.clusterChan {
if x.cluster != nil {
// set the actor in the cluster
if err := x.cluster.PutActor(ctx, wireActor); err != nil {
x.logger.Panic(err.Error())
}
}
}
}

// broadcastClusterEvents listens to cluster events and sends them to the event streams
func (x *actorSystem) broadcastClusterEvents() {
for event := range x.eventsChan {
if x.clusterEnabled.Load() {
if event != nil && event.Payload != nil {
// first need to resync actors map back to the cluster
x.clusterSync()
x.syncActors()
// push the event to the event stream
message, _ := event.Payload.UnmarshalNew()
if x.eventsStream != nil {
Expand All @@ -1275,12 +1279,12 @@ func (x *actorSystem) broadcastClusterEvents() {
}
}

// clusterSync synchronizes the node's actors map to the cluster.
func (x *actorSystem) clusterSync() {
typesMap := x.actors.props()
if len(typesMap) != 0 {
// syncActors synchronizes the node's actors map to the cluster.
func (x *actorSystem) syncActors() {
props := x.actors.props()
if len(props) != 0 {
x.logger.Info("syncing node actors map to the cluster...")
for actorID, prop := range typesMap {
for actorID, actorProp := range props {
actorPath := NewPath(actorID, NewAddress(x.name, "", -1))
if x.remotingEnabled.Load() {
actorPath = NewPath(actorID, NewAddress(x.name, x.remotingHost, int(x.remotingPort)))
Expand All @@ -1294,32 +1298,9 @@ func (x *actorSystem) clusterSync() {
ActorName: actorID,
ActorAddress: actorPath.RemoteAddress(),
ActorPath: actorPath.String(),
ActorType: prop.rtype.Name(),
ActorType: actorProp.rtype.Name(),
}
}
x.logger.Info("node actors map successfully synced back to the cluster.")
}
}

// runClusterSync runs periodic cluster synchronization
// by populating the given node's actors map to the cluster for availability
func (x *actorSystem) runClusterSync() {
x.logger.Info("cluster synchronization has started...")
ticker := time.NewTicker(5 * time.Minute)
tickerStopSig := make(chan types.Unit, 1)
go func() {
for {
select {
case <-ticker.C:
x.clusterSync()
case <-x.clusterSyncStopSig:
tickerStopSig <- types.Unit{}
return
}
}
}()

<-tickerStopSig
ticker.Stop()
x.logger.Info("cluster synchronization has stopped...")
}
6 changes: 3 additions & 3 deletions actors/actor_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func TestActorSystem(t *testing.T) {
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
WithClustering(provider, 9, gossipPort, clusterPort))
WithClustering(provider, 9, 1, gossipPort, clusterPort))
require.NoError(t, err)

provider.EXPECT().ID().Return("testDisco")
Expand Down Expand Up @@ -947,7 +947,7 @@ func TestActorSystem(t *testing.T) {
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
WithClustering(provider, 9, gossipPort, clusterPort))
WithClustering(provider, 9, 1, gossipPort, clusterPort))
require.NoError(t, err)

provider.EXPECT().ID().Return("testDisco")
Expand Down Expand Up @@ -1172,7 +1172,7 @@ func TestActorSystem(t *testing.T) {
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
WithClustering(provider, 9, gossipPort, clusterPort))
WithClustering(provider, 9, 1, gossipPort, clusterPort))
require.NoError(t, err)

provider.EXPECT().ID().Return("testDisco")
Expand Down
4 changes: 2 additions & 2 deletions actors/actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ func startClusterSystem(t *testing.T, nodeName, serverAddr string) (ActorSystem,
Name: host,
Host: host,
GossipPort: gossipPort,
ClusterPort: clusterPort,
PeersPort: clusterPort,
RemotingPort: remotingPort,
}

Expand All @@ -458,7 +458,7 @@ func startClusterSystem(t *testing.T, nodeName, serverAddr string) (ActorSystem,
WithLogger(logger),
WithReplyTimeout(time.Minute),
WithRemoting(host, int32(remotingPort)),
WithClustering(provider, 10, gossipPort, clusterPort))
WithClustering(provider, 10, 1, gossipPort, clusterPort))

require.NotNil(t, system)
require.NoError(t, err)
Expand Down
7 changes: 4 additions & 3 deletions actors/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,15 @@ func WithRemoting(host string, port int32) Option {
})
}

// WithClustering enables clustering on the actor system.
func WithClustering(provider discovery.Provider, partitionCount uint64, gossipPort, clusterPort int) Option {
// WithClustering enables the cluster mode.
func WithClustering(provider discovery.Provider, partitionCount uint64, minimumPeersQuorum uint16, gossipPort, peersPort int) Option {
return OptionFunc(func(a *actorSystem) {
a.clusterEnabled.Store(true)
a.partitionsCount = partitionCount
a.discoveryProvider = provider
a.clusterPort = clusterPort
a.peersPort = peersPort
a.gossipPort = gossipPort
a.minimumPeersQuorum = minimumPeersQuorum
})
}

Expand Down
Loading
Loading