Skip to content

Commit

Permalink
refactor!: better configuration for discovery providers (#314)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Apr 29, 2024
1 parent f1d8604 commit 6524ea0
Show file tree
Hide file tree
Showing 46 changed files with 1,518 additions and 1,707 deletions.
31 changes: 28 additions & 3 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ run:
# Include test files or not.
# Default: true
tests: true
skip-dirs-use-default: true
skip-dirs:
- mocks

linters:
disable-all: true
Expand All @@ -40,10 +37,38 @@ issues:
text: "exported func.*returns unexported type.*which can be annoying to use"
linters:
- revive
exclude-dirs-use-default: true
exclude-dirs:
- mocks


linters-settings:
misspell:
locale: US
ignore-words:
- cancelled
- behaviour

goheader:
template: |-
# MIT License
# Copyright (c) 2022-2024 Tochemey
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
MIT License

Copyright (c) 2022-2023 Tochemey
Copyright (c) 2022-2024 Tochemey

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
69 changes: 29 additions & 40 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ go get github.com/tochemey/goakt
The cluster engine depends upon the [discovery](./discovery/provider.go) mechanism to find other nodes in the cluster.
Under the hood, it leverages [Olric](https://github.com/buraksezer/olric)
to scale out and guarantee performant, reliable persistence, simple scalability, partitioning (sharding), and
re-balancing out-of-the-box.
re-balancing out-of-the-box. _**It requires remoting to be enabled**_.

At the moment the following providers are implemented:

Expand All @@ -351,19 +351,7 @@ At the moment the following providers are implemented:
- the [NATS](https://nats.io/) [integration](discovery/nats) is fully functional
- the [DNS](discovery/dnssd) is fully functional

Note: One can add additional discovery providers using the following [interface](./discovery/provider.go)

In addition, one needs to set the following environment variables irrespective of the discovery provider to help
identify the host node on which the cluster service is running:

- `NODE_NAME`: the node name. For instance in kubernetes one can just get it from the `metadata.name`
- `NODE_IP`: the node host address. For instance in kubernetes one can just get it from the `status.podIP`
- `GOSSIP_PORT`: the gossip protocol engine port.
- `CLUSTER_PORT`: the cluster port to help communicate with other GoAkt nodes in the cluster
- `REMOTING_PORT`: help remoting communication between actors

_Note: Depending upon the discovery provider implementation, the `GOSSIP_PORT` and `CLUSTER_PORT` can be the same.
The same applies to `NODE_NAME` and `NODE_IP`. This is up to the discretion of the implementation_
Note: One can add additional discovery providers using the following [interface](./discovery/provider.go).

### Operations Guide

Expand All @@ -386,31 +374,30 @@ To get the kubernetes discovery working as expected, the following pod labels ne
- `app.kubernetes.io/component`: set this label with the application name
- `app.kubernetes.io/name`: set this label with the application name

In addition, each node _is required to have three different ports open_ with the following ports name for the cluster
engine to work as expected:

- `gossip-port`: help the gossip protocol engine. This is actually the kubernetes discovery port
- `cluster-port`: help the cluster engine to communicate with other GoAkt nodes in the cluster
- `remoting-port`: help for remoting messaging between actors

##### Get Started

```go
const (
namespace = "default"
applicationName = "accounts"
actorSystemName = "AccountsSystem"
gossipPortName = "gossip-port"
clusterPortName = "cluster-port"
remotingPortName = "remoting-port"
)
// instantiate the k8 discovery provider
disco := kubernetes.NewDiscovery()
// define the discovery options
discoOptions := discovery.Config{
kubernetes.ApplicationName: applicationName,
kubernetes.ActorSystemName: actorSystemName,
kubernetes.Namespace: namespace,
// define the discovery config
config := kubernetes.Config{
ApplicationName: applicationName,
ActorSystemName: actorSystemName,
Namespace: namespace,
GossipPortName: gossipPortName,
RemotingPortName: remotingPortName,
ClusterPortName: clusterPortName,
}
// define the service discovery
serviceDiscovery := discovery.NewServiceDiscovery(disco, discoOptions)

// instantiate the k8 discovery provider
disco := kubernetes.NewDiscovery(&config)

// pass the service discovery when enabling cluster mode in the actor system
```

Expand Down Expand Up @@ -474,17 +461,20 @@ const (
applicationName = "accounts"
actorSystemName = "AccountsSystem"
)
// instantiate the NATS discovery provider
disco := nats.NewDiscovery()
// define the discovery options
discoOptions := discovery.Config{
config := nats.Config{
ApplicationName: applicationName,
ActorSystemName: actorSystemName,
NatsServer: natsServer,
NatsSubject: natsSubject,
}
// define the service discovery
serviceDiscovery := discovery.NewServiceDiscovery(disco, discoOptions)

// define the host node instance
hostNode := discovery.Node{}

// instantiate the NATS discovery provider by passing the config and the hostNode
disco := nats.NewDiscovery(&config, &hostNode)

// pass the service discovery when enabling cluster mode in the actor system
```

Expand All @@ -500,15 +490,14 @@ To use the DNS discovery provider one needs to provide the following:

```go
const domainName = "accounts"
// instantiate the dnssd discovery provider
disco := dnssd.NewDiscovery()
// define the discovery options
discoOptions := discovery.Config{
config := dnssd.Config{
dnssd.DomainName: domainName,
dnssd.IPv6: false,
}
// define the service discovery
serviceDiscovery := discovery.NewServiceDiscovery(disco, discoOptions
// instantiate the dnssd discovery provider
disco := dnssd.NewDiscovery(&config)

// pass the service discovery when enabling cluster mode in the actor system
```

Expand Down
35 changes: 25 additions & 10 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,15 @@ type actorSystem struct {
// convenient field to check cluster setup
clusterEnabled atomic.Bool
// cluster discovery method
serviceDiscovery *discovery.ServiceDiscovery
discoveryProvider discovery.Provider
// define the number of partitions to shard the actors in the cluster
partitionsCount uint64
// cluster mode
cluster cluster.Interface
clusterChan chan *internalpb.WireActor
cluster cluster.Interface
clusterChan chan *internalpb.WireActor
clusterPort int
gossipPort int

partitionHasher hash.Hasher

// help protect some the fields to set
Expand Down Expand Up @@ -748,6 +751,10 @@ func (x *actorSystem) Start(ctx context.Context) error {

x.started.Store(true)

if x.remotingEnabled.Load() {
x.enableRemoting(spanCtx)
}

if x.clusterEnabled.Load() {
if err := x.enableClustering(spanCtx); err != nil {
return err
Expand All @@ -757,10 +764,6 @@ func (x *actorSystem) Start(ctx context.Context) error {
// go x.runClusterSync()
}

if x.remotingEnabled.Load() {
x.enableRemoting(spanCtx)
}

x.scheduler.Start(spanCtx)

go x.housekeeper()
Expand Down Expand Up @@ -1067,8 +1070,22 @@ func (x *actorSystem) handleRemoteTell(ctx context.Context, to PID, message prot
func (x *actorSystem) enableClustering(ctx context.Context) error {
x.logger.Info("enabling clustering...")

if !x.remotingEnabled.Load() {
x.logger.Error("clustering needs remoting to be enabled")
return errors.New("clustering needs remoting to be enabled")
}

hostNode := discovery.Node{
Name: x.name,
Host: x.remotingHost,
GossipPort: x.gossipPort,
ClusterPort: x.clusterPort,
RemotingPort: int(x.remotingPort),
}

cluster, err := cluster.NewNode(x.Name(),
x.serviceDiscovery,
x.discoveryProvider,
&hostNode,
cluster.WithLogger(x.logger),
cluster.WithPartitionsCount(x.partitionsCount),
cluster.WithHasher(x.partitionHasher),
Expand Down Expand Up @@ -1096,8 +1113,6 @@ func (x *actorSystem) enableClustering(ctx context.Context) error {
x.mutex.Lock()
x.cluster = cluster
x.eventsChan = cluster.Events()
x.remotingHost = cluster.NodeHost()
x.remotingPort = int32(cluster.NodeRemotingPort())
x.mutex.Unlock()

go x.broadcastClusterEvents()
Expand Down
Loading

0 comments on commit 6524ea0

Please sign in to comment.