diff --git a/.golangci.yml b/.golangci.yml index b3d46cf6..9c6cdb44 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -11,9 +11,6 @@ run: # Include test files or not. # Default: true tests: true - skip-dirs-use-default: true - skip-dirs: - - mocks linters: disable-all: true @@ -40,6 +37,10 @@ issues: text: "exported func.*returns unexported type.*which can be annoying to use" linters: - revive + exclude-dirs-use-default: true + exclude-dirs: + - mocks + linters-settings: misspell: @@ -47,3 +48,27 @@ linters-settings: ignore-words: - cancelled - behaviour + + goheader: + template: |- + # MIT License + + # Copyright (c) 2022-2024 Tochemey + + # Permission is hereby granted, free of charge, to any person obtaining a copy + # of this software and associated documentation files (the "Software"), to deal + # in the Software without restriction, including without limitation the rights + # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + # copies of the Software, and to permit persons to whom the Software is + # furnished to do so, subject to the following conditions: + + # The above copyright notice and this permission notice shall be included in all + # copies or substantial portions of the Software. + + # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + # SOFTWARE. diff --git a/LICENSE b/LICENSE index e6b73ec5..c27825b2 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2022-2023 Tochemey +Copyright (c) 2022-2024 Tochemey Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index f65c95f0..1795aad1 100644 --- a/README.md +++ b/README.md @@ -342,7 +342,7 @@ go get github.com/tochemey/goakt The cluster engine depends upon the [discovery](./discovery/provider.go) mechanism to find other nodes in the cluster. Under the hood, it leverages [Olric](https://github.com/buraksezer/olric) to scale out and guarantee performant, reliable persistence, simple scalability, partitioning (sharding), and -re-balancing out-of-the-box. +re-balancing out-of-the-box. _**It requires remoting to be enabled**_. At the moment the following providers are implemented: @@ -351,19 +351,7 @@ At the moment the following providers are implemented: - the [NATS](https://nats.io/) [integration](discovery/nats) is fully functional - the [DNS](discovery/dnssd) is fully functional -Note: One can add additional discovery providers using the following [interface](./discovery/provider.go) - -In addition, one needs to set the following environment variables irrespective of the discovery provider to help -identify the host node on which the cluster service is running: - -- `NODE_NAME`: the node name. For instance in kubernetes one can just get it from the `metadata.name` -- `NODE_IP`: the node host address. For instance in kubernetes one can just get it from the `status.podIP` -- `GOSSIP_PORT`: the gossip protocol engine port. -- `CLUSTER_PORT`: the cluster port to help communicate with other GoAkt nodes in the cluster -- `REMOTING_PORT`: help remoting communication between actors - -_Note: Depending upon the discovery provider implementation, the `GOSSIP_PORT` and `CLUSTER_PORT` can be the same. -The same applies to `NODE_NAME` and `NODE_IP`. This is up to the discretion of the implementation_ +Note: One can add additional discovery providers using the following [interface](./discovery/provider.go). ### Operations Guide @@ -386,13 +374,6 @@ To get the kubernetes discovery working as expected, the following pod labels ne - `app.kubernetes.io/component`: set this label with the application name - `app.kubernetes.io/name`: set this label with the application name -In addition, each node _is required to have three different ports open_ with the following ports name for the cluster -engine to work as expected: - -- `gossip-port`: help the gossip protocol engine. This is actually the kubernetes discovery port -- `cluster-port`: help the cluster engine to communicate with other GoAkt nodes in the cluster -- `remoting-port`: help for remoting messaging between actors - ##### Get Started ```go @@ -400,17 +381,23 @@ const ( namespace = "default" applicationName = "accounts" actorSystemName = "AccountsSystem" + gossipPortName = "gossip-port" + clusterPortName = "cluster-port" + remotingPortName = "remoting-port" ) -// instantiate the k8 discovery provider -disco := kubernetes.NewDiscovery() -// define the discovery options -discoOptions := discovery.Config{ - kubernetes.ApplicationName: applicationName, - kubernetes.ActorSystemName: actorSystemName, - kubernetes.Namespace: namespace, +// define the discovery config +config := kubernetes.Config{ + ApplicationName: applicationName, + ActorSystemName: actorSystemName, + Namespace: namespace, + GossipPortName: gossipPortName, + RemotingPortName: remotingPortName, + ClusterPortName: clusterPortName, } -// define the service discovery -serviceDiscovery := discovery.NewServiceDiscovery(disco, discoOptions) + +// instantiate the k8 discovery provider +disco := kubernetes.NewDiscovery(&config) + // pass the service discovery when enabling cluster mode in the actor system ``` @@ -474,17 +461,20 @@ const ( applicationName = "accounts" actorSystemName = "AccountsSystem" ) -// instantiate the NATS discovery provider -disco := nats.NewDiscovery() // define the discovery options -discoOptions := discovery.Config{ +config := nats.Config{ ApplicationName: applicationName, ActorSystemName: actorSystemName, NatsServer: natsServer, NatsSubject: natsSubject, } -// define the service discovery -serviceDiscovery := discovery.NewServiceDiscovery(disco, discoOptions) + +// define the host node instance +hostNode := discovery.Node{} + +// instantiate the NATS discovery provider by passing the config and the hostNode +disco := nats.NewDiscovery(&config, &hostNode) + // pass the service discovery when enabling cluster mode in the actor system ``` @@ -500,15 +490,14 @@ To use the DNS discovery provider one needs to provide the following: ```go const domainName = "accounts" -// instantiate the dnssd discovery provider -disco := dnssd.NewDiscovery() // define the discovery options -discoOptions := discovery.Config{ +config := dnssd.Config{ dnssd.DomainName: domainName, dnssd.IPv6: false, } -// define the service discovery -serviceDiscovery := discovery.NewServiceDiscovery(disco, discoOptions +// instantiate the dnssd discovery provider +disco := dnssd.NewDiscovery(&config) + // pass the service discovery when enabling cluster mode in the actor system ``` diff --git a/actors/actor_system.go b/actors/actor_system.go index 1bdb5c5a..49f5ea05 100644 --- a/actors/actor_system.go +++ b/actors/actor_system.go @@ -177,12 +177,15 @@ type actorSystem struct { // convenient field to check cluster setup clusterEnabled atomic.Bool // cluster discovery method - serviceDiscovery *discovery.ServiceDiscovery + discoveryProvider discovery.Provider // define the number of partitions to shard the actors in the cluster partitionsCount uint64 // cluster mode - cluster cluster.Interface - clusterChan chan *internalpb.WireActor + cluster cluster.Interface + clusterChan chan *internalpb.WireActor + clusterPort int + gossipPort int + partitionHasher hash.Hasher // help protect some the fields to set @@ -748,6 +751,10 @@ func (x *actorSystem) Start(ctx context.Context) error { x.started.Store(true) + if x.remotingEnabled.Load() { + x.enableRemoting(spanCtx) + } + if x.clusterEnabled.Load() { if err := x.enableClustering(spanCtx); err != nil { return err @@ -757,10 +764,6 @@ func (x *actorSystem) Start(ctx context.Context) error { // go x.runClusterSync() } - if x.remotingEnabled.Load() { - x.enableRemoting(spanCtx) - } - x.scheduler.Start(spanCtx) go x.housekeeper() @@ -1067,8 +1070,22 @@ func (x *actorSystem) handleRemoteTell(ctx context.Context, to PID, message prot func (x *actorSystem) enableClustering(ctx context.Context) error { x.logger.Info("enabling clustering...") + if !x.remotingEnabled.Load() { + x.logger.Error("clustering needs remoting to be enabled") + return errors.New("clustering needs remoting to be enabled") + } + + hostNode := discovery.Node{ + Name: x.name, + Host: x.remotingHost, + GossipPort: x.gossipPort, + ClusterPort: x.clusterPort, + RemotingPort: int(x.remotingPort), + } + cluster, err := cluster.NewNode(x.Name(), - x.serviceDiscovery, + x.discoveryProvider, + &hostNode, cluster.WithLogger(x.logger), cluster.WithPartitionsCount(x.partitionsCount), cluster.WithHasher(x.partitionHasher), @@ -1096,8 +1113,6 @@ func (x *actorSystem) enableClustering(ctx context.Context) error { x.mutex.Lock() x.cluster = cluster x.eventsChan = cluster.Events() - x.remotingHost = cluster.NodeHost() - x.remotingPort = int32(cluster.NodeRemotingPort()) x.mutex.Unlock() go x.broadcastClusterEvents() diff --git a/actors/actor_system_test.go b/actors/actor_system_test.go index e66270ee..feecd719 100644 --- a/actors/actor_system_test.go +++ b/actors/actor_system_test.go @@ -45,7 +45,6 @@ import ( "go.uber.org/atomic" "google.golang.org/protobuf/proto" - "github.com/tochemey/goakt/discovery" "github.com/tochemey/goakt/goaktpb" "github.com/tochemey/goakt/log" clustermocks "github.com/tochemey/goakt/mocks/cluster" @@ -168,19 +167,8 @@ func TestActorSystem(t *testing.T) { remotingPort := nodePorts[2] logger := log.New(log.DebugLevel, os.Stdout) - - podName := "pod" host := "localhost" - // set the environments - t.Setenv("GOSSIP_PORT", strconv.Itoa(gossipPort)) - t.Setenv("CLUSTER_PORT", strconv.Itoa(clusterPort)) - t.Setenv("REMOTING_PORT", strconv.Itoa(remotingPort)) - t.Setenv("NODE_NAME", podName) - t.Setenv("NODE_IP", host) - t.Setenv("GRPC_GO_LOG_VERBOSITY_LEVEL", "99") - t.Setenv("GRPC_GO_LOG_SEVERITY_LEVEL", "info") - // define discovered addresses addrs := []string{ net.JoinHostPort(host, strconv.Itoa(gossipPort)), @@ -188,21 +176,19 @@ func TestActorSystem(t *testing.T) { // mock the discovery provider provider := new(testkit.Provider) - config := discovery.NewConfig() - sd := discovery.NewServiceDiscovery(provider, config) newActorSystem, err := NewActorSystem( "test", WithPassivationDisabled(), WithLogger(logger), WithReplyTimeout(time.Minute), - WithClustering(sd, 9)) + WithRemoting(host, int32(remotingPort)), + WithClustering(provider, 9, gossipPort, clusterPort)) require.NoError(t, err) provider.EXPECT().ID().Return("testDisco") provider.EXPECT().Initialize().Return(nil) provider.EXPECT().Register().Return(nil) provider.EXPECT().Deregister().Return(nil) - provider.EXPECT().SetConfig(config).Return(nil) provider.EXPECT().DiscoverPeers().Return(addrs, nil) provider.EXPECT().Close().Return(nil) @@ -594,7 +580,6 @@ func TestActorSystem(t *testing.T) { err = newActorSystem.Start(ctx) require.NoError(t, err) - // wait for the cluster to fully start time.Sleep(time.Second) actorName := "some-actor" @@ -947,19 +932,8 @@ func TestActorSystem(t *testing.T) { remotingPort := nodePorts[2] logger := log.New(log.DebugLevel, os.Stdout) - - podName := "pod" host := "localhost" - // set the environments - t.Setenv("GOSSIP_PORT", strconv.Itoa(gossipPort)) - t.Setenv("CLUSTER_PORT", strconv.Itoa(clusterPort)) - t.Setenv("REMOTING_PORT", strconv.Itoa(remotingPort)) - t.Setenv("NODE_NAME", podName) - t.Setenv("NODE_IP", host) - t.Setenv("GRPC_GO_LOG_VERBOSITY_LEVEL", "99") - t.Setenv("GRPC_GO_LOG_SEVERITY_LEVEL", "info") - // define discovered addresses addrs := []string{ net.JoinHostPort(host, strconv.Itoa(gossipPort)), @@ -967,21 +941,19 @@ func TestActorSystem(t *testing.T) { // mock the discovery provider provider := new(testkit.Provider) - config := discovery.NewConfig() - sd := discovery.NewServiceDiscovery(provider, config) newActorSystem, err := NewActorSystem( "test", WithExpireActorAfter(passivateAfter), WithLogger(logger), WithReplyTimeout(time.Minute), - WithClustering(sd, 9)) + WithRemoting(host, int32(remotingPort)), + WithClustering(provider, 9, gossipPort, clusterPort)) require.NoError(t, err) provider.EXPECT().ID().Return("testDisco") provider.EXPECT().Initialize().Return(nil) provider.EXPECT().Register().Return(nil) provider.EXPECT().Deregister().Return(nil) - provider.EXPECT().SetConfig(config).Return(nil) provider.EXPECT().DiscoverPeers().Return(addrs, nil) provider.EXPECT().Close().Return(nil) @@ -1185,19 +1157,8 @@ func TestActorSystem(t *testing.T) { remotingPort := nodePorts[2] logger := log.New(log.DebugLevel, os.Stdout) - - podName := "pod" host := "localhost" - // set the environments - t.Setenv("GOSSIP_PORT", strconv.Itoa(gossipPort)) - t.Setenv("CLUSTER_PORT", strconv.Itoa(clusterPort)) - t.Setenv("REMOTING_PORT", strconv.Itoa(remotingPort)) - t.Setenv("NODE_NAME", podName) - t.Setenv("NODE_IP", host) - t.Setenv("GRPC_GO_LOG_VERBOSITY_LEVEL", "99") - t.Setenv("GRPC_GO_LOG_SEVERITY_LEVEL", "info") - // define discovered addresses addrs := []string{ net.JoinHostPort(host, strconv.Itoa(gossipPort)), @@ -1205,21 +1166,19 @@ func TestActorSystem(t *testing.T) { // mock the discovery provider provider := new(testkit.Provider) - config := discovery.NewConfig() - sd := discovery.NewServiceDiscovery(provider, config) newActorSystem, err := NewActorSystem( "test", WithPassivationDisabled(), WithLogger(logger), WithReplyTimeout(time.Minute), - WithClustering(sd, 9)) + WithRemoting(host, int32(remotingPort)), + WithClustering(provider, 9, gossipPort, clusterPort)) require.NoError(t, err) provider.EXPECT().ID().Return("testDisco") provider.EXPECT().Initialize().Return(nil) provider.EXPECT().Register().Return(nil) provider.EXPECT().Deregister().Return(nil) - provider.EXPECT().SetConfig(config).Return(nil) provider.EXPECT().DiscoverPeers().Return(addrs, nil) provider.EXPECT().Close().Return(nil) @@ -1421,7 +1380,7 @@ func TestActorSystem(t *testing.T) { }) }) - t.Run("With cluster start failure", func(t *testing.T) { + t.Run("With cluster start failure with remoting not enabled", func(t *testing.T) { ctx := context.TODO() logger := log.DiscardLogger mockedCluster := new(clustermocks.Interface) @@ -1430,22 +1389,22 @@ func TestActorSystem(t *testing.T) { // mock the discovery provider provider := new(testkit.Provider) - config := discovery.NewConfig() - sd := discovery.NewServiceDiscovery(provider, config) + provider.EXPECT().ID().Return("id") system := &actorSystem{ - name: "testSystem", - logger: logger, - cluster: mockedCluster, - clusterEnabled: *atomic.NewBool(true), - telemetry: telemetry.New(), - mutex: sync.Mutex{}, - tracer: noop.NewTracerProvider().Tracer("testSystem"), - scheduler: newScheduler(logger, time.Second, withSchedulerCluster(mockedCluster)), - serviceDiscovery: sd, + name: "testSystem", + logger: logger, + cluster: mockedCluster, + clusterEnabled: *atomic.NewBool(true), + telemetry: telemetry.New(), + mutex: sync.Mutex{}, + tracer: noop.NewTracerProvider().Tracer("testSystem"), + scheduler: newScheduler(logger, time.Second, withSchedulerCluster(mockedCluster)), + discoveryProvider: provider, } err := system.Start(ctx) require.Error(t, err) + assert.EqualError(t, err, "clustering needs remoting to be enabled") }) } diff --git a/actors/actor_test.go b/actors/actor_test.go index ca075d37..252f2051 100644 --- a/actors/actor_test.go +++ b/actors/actor_test.go @@ -27,7 +27,6 @@ package actors import ( "context" "os" - "strconv" "sync" "testing" "time" @@ -429,30 +428,28 @@ func startClusterSystem(t *testing.T, nodeName, serverAddr string) (ActorSystem, // create a Cluster startNode host := "127.0.0.1" - // set the environments - require.NoError(t, os.Setenv("GOSSIP_PORT", strconv.Itoa(gossipPort))) - require.NoError(t, os.Setenv("CLUSTER_PORT", strconv.Itoa(clusterPort))) - require.NoError(t, os.Setenv("REMOTING_PORT", strconv.Itoa(remotingPort))) - require.NoError(t, os.Setenv("NODE_NAME", nodeName)) - require.NoError(t, os.Setenv("NODE_IP", host)) - // create the various config option applicationName := "accounts" actorSystemName := "testSystem" natsSubject := "some-subject" - // create the instance of provider - provider := nats.NewDiscovery() - // create the config - config := discovery.Config{ - nats.ApplicationName: applicationName, - nats.ActorSystemName: actorSystemName, - nats.NatsServer: serverAddr, - nats.NatsSubject: natsSubject, + config := nats.Config{ + ApplicationName: applicationName, + ActorSystemName: actorSystemName, + NatsServer: serverAddr, + NatsSubject: natsSubject, + } + + hostNode := discovery.Node{ + Name: host, + Host: host, + GossipPort: gossipPort, + ClusterPort: clusterPort, + RemotingPort: remotingPort, } - // create the sd - sd := discovery.NewServiceDiscovery(provider, config) + // create the instance of provider + provider := nats.NewDiscovery(&config, &hostNode) // create the actor system system, err := NewActorSystem( @@ -460,21 +457,14 @@ func startClusterSystem(t *testing.T, nodeName, serverAddr string) (ActorSystem, WithPassivationDisabled(), WithLogger(logger), WithReplyTimeout(time.Minute), - WithClustering(sd, 10)) + WithRemoting(host, int32(remotingPort)), + WithClustering(provider, 10, gossipPort, clusterPort)) require.NotNil(t, system) require.NoError(t, err) // start the node require.NoError(t, system.Start(ctx)) - - // clear the env var - require.NoError(t, os.Unsetenv("GOSSIP_PORT")) - require.NoError(t, os.Unsetenv("CLUSTER_PORT")) - require.NoError(t, os.Unsetenv("REMOTING_PORT")) - require.NoError(t, os.Unsetenv("NODE_NAME")) - require.NoError(t, os.Unsetenv("NODE_IP")) - time.Sleep(2 * time.Second) // return the cluster startNode diff --git a/actors/option.go b/actors/option.go index 91a91d51..5a27c20f 100644 --- a/actors/option.go +++ b/actors/option.go @@ -113,14 +113,14 @@ func WithRemoting(host string, port int32) Option { }) } -// WithClustering enables clustering on the actor system. This enables remoting on the actor system as well -// and set the remotingHost to the cluster node host when the cluster is fully enabled. -func WithClustering(serviceDiscovery *discovery.ServiceDiscovery, partitionCount uint64) Option { +// WithClustering enables clustering on the actor system. +func WithClustering(discoveryProvider discovery.Provider, partitionCount uint64, gossipPort, clusterPort int) Option { return OptionFunc(func(a *actorSystem) { a.clusterEnabled.Store(true) - a.remotingEnabled.Store(true) a.partitionsCount = partitionCount - a.serviceDiscovery = serviceDiscovery + a.discoveryProvider = discoveryProvider + a.clusterPort = clusterPort + a.gossipPort = gossipPort }) } diff --git a/discovery/config.go b/discovery/config.go deleted file mode 100644 index d4226937..00000000 --- a/discovery/config.go +++ /dev/null @@ -1,108 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2022-2024 Tochemey - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package discovery - -import ( - "errors" - "fmt" - "strconv" -) - -// Config represents the meta information to pass to the discovery engine -type Config map[string]any - -// NewConfig initializes meta -func NewConfig() Config { - return Config{} -} - -// GetString returns the string value of a given key which value is a string -// If the key value is not a string then an error is return -func (m Config) GetString(key string) (string, error) { - val, ok := m[key] - if !ok { - return "", fmt.Errorf("key=%s not found", key) - } - - switch x := val.(type) { - case string: - return x, nil - default: - return "", errors.New("the key value is not a string") - } -} - -// GetInt returns the int value of a given key which value is an integer -// If the key value is not an integer then an error is return -func (m Config) GetInt(key string) (int, error) { - val, ok := m[key] - if !ok { - return 0, fmt.Errorf("key=%s not found", key) - } - - switch x := val.(type) { - case int: - return x, nil - default: - // maybe it is string integer - return strconv.Atoi(val.(string)) - } -} - -// GetBool returns the int value of a given key which value is a boolean -// If the key value is not a boolean then an error is return -func (m Config) GetBool(key string) (*bool, error) { - val, ok := m[key] - if !ok { - return nil, fmt.Errorf("key=%s not found", key) - } - - switch x := val.(type) { - case bool: - return &x, nil - default: - res, err := strconv.ParseBool(val.(string)) - if err != nil { - return nil, err - } - return &res, nil - } -} - -// GetMapString returns the map of string value of a given key which value is a map of string -// Map of string means that the map key value pair are both string -func (m Config) GetMapString(key string) (map[string]string, error) { - val, ok := m[key] - if !ok { - return nil, fmt.Errorf("key=%s not found", key) - } - - switch x := val.(type) { - case map[string]string: - return x, nil - default: - return nil, errors.New("the key value is not a map[string]string") - } -} diff --git a/discovery/config_test.go b/discovery/config_test.go deleted file mode 100644 index 1e4dd18b..00000000 --- a/discovery/config_test.go +++ /dev/null @@ -1,204 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2022-2024 Tochemey - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package discovery - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestGetString(t *testing.T) { - t.Run("With happy path", func(t *testing.T) { - meta := Config{ - "key-1": "value-1", - "key-2": "value-2", - } - key := "key-1" - actual, err := meta.GetString(key) - assert.NoError(t, err) - assert.NotEmpty(t, actual) - expected := "value-1" - assert.Equal(t, expected, actual) - }) - t.Run("With key not found", func(t *testing.T) { - meta := Config{ - "key-1": "value-1", - "key-2": "value-2", - } - key := "key-3" - actual, err := meta.GetString(key) - assert.Error(t, err) - assert.EqualError(t, err, "key=key-3 not found") - assert.Empty(t, actual) - }) - t.Run("With key value not of a type string", func(t *testing.T) { - meta := Config{ - "key-1": "value-1", - "key-2": 13, - } - key := "key-2" - actual, err := meta.GetString(key) - assert.Error(t, err) - assert.EqualError(t, err, "the key value is not a string") - assert.Empty(t, actual) - }) -} - -func TestGetMapString(t *testing.T) { - t.Run("With happy path", func(t *testing.T) { - meta := Config{ - "key-1": map[string]string{ - "key-11": "value-11", - "key-12": "value-12", - }, - } - key := "key-1" - actual, err := meta.GetMapString(key) - assert.NoError(t, err) - assert.NotNil(t, actual) - expected := map[string]string{ - "key-11": "value-11", - "key-12": "value-12", - } - assert.Equal(t, expected, actual) - }) - t.Run("With key not found", func(t *testing.T) { - meta := Config{ - "key-1": map[string]string{ - "key-11": "value-11", - "key-12": "value-12", - }, - } - key := "key-3" - actual, err := meta.GetMapString(key) - assert.Error(t, err) - assert.EqualError(t, err, "key=key-3 not found") - assert.Empty(t, actual) - }) - t.Run("With key value not of a type map[string]string", func(t *testing.T) { - meta := Config{ - "key-2": 13, - } - key := "key-2" - actual, err := meta.GetMapString(key) - assert.Error(t, err) - assert.EqualError(t, err, "the key value is not a map[string]string") - assert.Empty(t, actual) - }) -} - -func TestGetInt(t *testing.T) { - t.Run("With happy path", func(t *testing.T) { - meta := Config{ - "key-1": 20, - "key-2": 30, - } - key := "key-1" - actual, err := meta.GetInt(key) - assert.NoError(t, err) - assert.NotZero(t, actual) - expected := 20 - assert.EqualValues(t, expected, actual) - }) - t.Run("With key not found", func(t *testing.T) { - meta := Config{ - "key-1": 20, - "key-2": 30, - } - key := "key-3" - actual, err := meta.GetInt(key) - assert.Error(t, err) - assert.EqualError(t, err, "key=key-3 not found") - assert.Zero(t, actual) - }) - t.Run("With key value not an int", func(t *testing.T) { - meta := Config{ - "key-1": "a", - "key-2": 30, - } - key := "key-1" - actual, err := meta.GetInt(key) - assert.Error(t, err) - assert.Zero(t, actual) - }) - t.Run("With key value a string int", func(t *testing.T) { - meta := Config{ - "key-1": "20", - "key-2": 30, - } - key := "key-1" - actual, err := meta.GetInt(key) - assert.NoError(t, err) - assert.NotZero(t, actual) - expected := 20 - assert.EqualValues(t, expected, actual) - }) -} - -func TestGetBool(t *testing.T) { - t.Run("With happy path", func(t *testing.T) { - meta := Config{ - "key-1": true, - "key-2": 30, - } - key := "key-1" - actual, err := meta.GetBool(key) - assert.NoError(t, err) - assert.NotNil(t, actual) - assert.True(t, *actual) - }) - t.Run("With key not found", func(t *testing.T) { - meta := Config{ - "key-1": 20, - "key-2": 30, - } - key := "key-3" - actual, err := meta.GetBool(key) - assert.Error(t, err) - assert.EqualError(t, err, "key=key-3 not found") - assert.Nil(t, actual) - }) - t.Run("With key value not an boolean", func(t *testing.T) { - meta := Config{ - "key-1": "a", - "key-2": 30, - } - key := "key-1" - actual, err := meta.GetBool(key) - assert.Error(t, err) - assert.Nil(t, actual) - }) - t.Run("With key value a string boolean", func(t *testing.T) { - meta := Config{ - "key-1": "TRUE", - "key-2": 30, - } - key := "key-1" - actual, err := meta.GetBool(key) - assert.NoError(t, err) - assert.True(t, *actual) - }) -} diff --git a/discovery/dnssd/config.go b/discovery/dnssd/config.go new file mode 100644 index 00000000..d64e8335 --- /dev/null +++ b/discovery/dnssd/config.go @@ -0,0 +1,43 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package dnssd + +import "github.com/tochemey/goakt/internal/validation" + +// Config defines the discovery configuration +type Config struct { + // Domain specifies the dns name + DomainName string + // IPv6 states whether to fetch ipv6 address instead of ipv4 + // if it is false then all addresses are extracted + IPv6 *bool +} + +// Validate checks whether the given discovery configuration is valid +func (x Config) Validate() error { + return validation.New(validation.FailFast()). + AddValidator(validation.NewEmptyStringValidator("Namespace", x.DomainName)). + Validate() +} diff --git a/discovery/dnssd/config_test.go b/discovery/dnssd/config_test.go new file mode 100644 index 00000000..9315e487 --- /dev/null +++ b/discovery/dnssd/config_test.go @@ -0,0 +1,46 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package dnssd + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestConfig(t *testing.T) { + t.Run("With valid configuration", func(t *testing.T) { + config := &Config{ + DomainName: "google.com", + } + assert.NoError(t, config.Validate()) + }) + t.Run("With invalid configuration", func(t *testing.T) { + config := &Config{ + DomainName: "", + } + assert.Error(t, config.Validate()) + }) +} diff --git a/discovery/dnssd/discovery.go b/discovery/dnssd/discovery.go index 105af66d..8dab2f37 100644 --- a/discovery/dnssd/discovery.go +++ b/discovery/dnssd/discovery.go @@ -41,21 +41,12 @@ const ( IPv6 = "ipv6" ) -// discoConfig represents the discovery configuration -type discoConfig struct { - // Domain specifies the dns name - Domain string - // IPv6 states whether to fetch ipv6 address instead of ipv4 - // if it is false then all addresses are extracted - IPv6 *bool -} - // Discovery represents the DNS service discovery // IP addresses are looked up by querying the default // DNS resolver for A and AAAA records associated with the DNS name. type Discovery struct { mu sync.Mutex - config *discoConfig + config *Config // states whether the actor system has started or not initialized *atomic.Bool @@ -65,10 +56,10 @@ type Discovery struct { var _ discovery.Provider = &Discovery{} // NewDiscovery returns an instance of the DNS discovery provider -func NewDiscovery() *Discovery { +func NewDiscovery(config *Config) *Discovery { return &Discovery{ mu: sync.Mutex{}, - config: &discoConfig{}, + config: config, initialized: atomic.NewBool(false), } } @@ -86,7 +77,7 @@ func (d *Discovery) Initialize() error { return discovery.ErrAlreadyInitialized } - return nil + return d.config.Validate() } // Register registers this node to a service discovery directory. @@ -114,34 +105,6 @@ func (d *Discovery) Deregister() error { return nil } -// SetConfig registers the underlying discovery configuration -func (d *Discovery) SetConfig(config discovery.Config) error { - d.mu.Lock() - defer d.mu.Unlock() - - if d.initialized.Load() { - return discovery.ErrAlreadyInitialized - } - - discoConfig := new(discoConfig) - var err error - discoConfig.Domain, err = config.GetString(DomainName) - if err != nil { - return err - } - if discoConfig.Domain == "" { - return errors.New("dns name not set") - } - - discoConfig.IPv6, err = config.GetBool(IPv6) - if err != nil { - return err - } - - d.config = discoConfig - return nil -} - // DiscoverPeers returns a list of known nodes. func (d *Discovery) DiscoverPeers() ([]string, error) { d.mu.Lock() @@ -164,7 +127,7 @@ func (d *Discovery) DiscoverPeers() ([]string, error) { // only extract ipv6 if v6 { - ips, err := net.DefaultResolver.LookupIP(ctx, "ip6", d.config.Domain) + ips, err := net.DefaultResolver.LookupIP(ctx, "ip6", d.config.DomainName) if err != nil { return nil, err } @@ -180,7 +143,7 @@ func (d *Discovery) DiscoverPeers() ([]string, error) { } // lookup the addresses based upon the dns name - addrs, err := net.DefaultResolver.LookupIPAddr(ctx, d.config.Domain) + addrs, err := net.DefaultResolver.LookupIPAddr(ctx, d.config.DomainName) if err != nil { return nil, err } diff --git a/discovery/dnssd/discovery_test.go b/discovery/dnssd/discovery_test.go index cc71b59e..6462ec3d 100644 --- a/discovery/dnssd/discovery_test.go +++ b/discovery/dnssd/discovery_test.go @@ -40,7 +40,7 @@ import ( func TestDiscovery(t *testing.T) { t.Run("With a new instance", func(t *testing.T) { // create the instance of provider - provider := NewDiscovery() + provider := NewDiscovery(nil) require.NotNil(t, provider) // assert that provider implements the Discovery interface // this is a cheap test @@ -53,104 +53,36 @@ func TestDiscovery(t *testing.T) { t.Run("With ID assertion", func(t *testing.T) { // cheap test // create the instance of provider - provider := NewDiscovery() + provider := NewDiscovery(nil) require.NotNil(t, provider) assert.Equal(t, "dns-sd", provider.ID()) }) - t.Run("With SetConfig", func(t *testing.T) { - // create the instance of provider - provider := NewDiscovery() - // create the config - config := discovery.Config{ - DomainName: "google.com", - IPv6: false, - } - // set config - assert.NoError(t, provider.SetConfig(config)) - }) - t.Run("With SetConfig dns name not set", func(t *testing.T) { - // create the instance of provider - provider := NewDiscovery() - // create the config - config := discovery.Config{ - DomainName: "", - IPv6: false, - } - // set config - assert.Error(t, provider.SetConfig(config)) - }) - t.Run("With SetConfig dns name not provided", func(t *testing.T) { - // create the instance of provider - provider := NewDiscovery() - // create the config - config := discovery.Config{ - IPv6: false, - } - // set config - assert.Error(t, provider.SetConfig(config)) - }) - t.Run("With SetConfig ipv6 not provided", func(t *testing.T) { - // create the instance of provider - provider := NewDiscovery() + t.Run("With Initialize", func(t *testing.T) { // create the config - config := discovery.Config{ + config := &Config{ DomainName: "google.com", } - // set config - assert.Error(t, provider.SetConfig(config)) - }) - t.Run("With SetConfig ipv6 wrong boolean value", func(t *testing.T) { // create the instance of provider - provider := NewDiscovery() - // create the config - config := discovery.Config{ - DomainName: "google.com", - IPv6: "wrong-value", - } - // set config - assert.Error(t, provider.SetConfig(config)) - }) - t.Run("With SetConfig: already initialized", func(t *testing.T) { - // create the instance of provider - provider := NewDiscovery() - provider.initialized = atomic.NewBool(true) - // create the config - config := discovery.Config{ - DomainName: "google.com", - IPv6: false, - } - // set config - err := provider.SetConfig(config) - assert.Error(t, err) - assert.EqualError(t, err, discovery.ErrAlreadyInitialized.Error()) + provider := NewDiscovery(config) + assert.NoError(t, provider.Initialize()) }) - t.Run("With Initialize", func(t *testing.T) { - // create the instance of provider - provider := NewDiscovery() + t.Run("With Initialize: already initialized", func(t *testing.T) { // create the config - config := discovery.Config{ + config := &Config{ DomainName: "google.com", - IPv6: false, } - // set config - assert.NoError(t, provider.SetConfig(config)) - assert.NoError(t, provider.Initialize()) - }) - t.Run("With Initialize: already initialized", func(t *testing.T) { // create the instance of provider - provider := NewDiscovery() + provider := NewDiscovery(config) provider.initialized = atomic.NewBool(true) assert.Error(t, provider.Initialize()) }) t.Run("With Register", func(t *testing.T) { - // create the instance of provider - provider := NewDiscovery() // create the config - config := discovery.Config{ + config := &Config{ DomainName: "google.com", - IPv6: false, } - require.NoError(t, provider.SetConfig(config)) + // create the instance of provider + provider := NewDiscovery(config) require.NoError(t, provider.Initialize()) require.NoError(t, provider.Register()) @@ -161,14 +93,12 @@ func TestDiscovery(t *testing.T) { assert.False(t, provider.initialized.Load()) }) t.Run("With Register when already registered", func(t *testing.T) { - // create the instance of provider - provider := NewDiscovery() // create the config - config := discovery.Config{ + config := &Config{ DomainName: "google.com", - IPv6: false, } - require.NoError(t, provider.SetConfig(config)) + // create the instance of provider + provider := NewDiscovery(config) require.NoError(t, provider.Initialize()) require.NoError(t, provider.Register()) @@ -182,15 +112,23 @@ func TestDiscovery(t *testing.T) { assert.False(t, provider.initialized.Load()) }) t.Run("With Deregister", func(t *testing.T) { + // create the config + config := &Config{ + DomainName: "google.com", + } // create the instance of provider - provider := NewDiscovery() + provider := NewDiscovery(config) // for the sake of the test provider.initialized = atomic.NewBool(true) assert.NoError(t, provider.Deregister()) }) t.Run("With Deregister when not initialized", func(t *testing.T) { + // create the config + config := &Config{ + DomainName: "google.com", + } // create the instance of provider - provider := NewDiscovery() + provider := NewDiscovery(config) // for the sake of the test provider.initialized = atomic.NewBool(false) err := provider.Deregister() @@ -198,20 +136,25 @@ func TestDiscovery(t *testing.T) { assert.EqualError(t, err, discovery.ErrNotInitialized.Error()) }) t.Run("With DiscoverPeers: not initialized", func(t *testing.T) { - provider := NewDiscovery() + // create the config + config := &Config{ + DomainName: "google.com", + } + // create the instance of provider + provider := NewDiscovery(config) peers, err := provider.DiscoverPeers() assert.Error(t, err) assert.Empty(t, peers) assert.EqualError(t, err, discovery.ErrNotInitialized.Error()) }) t.Run("With DiscoverPeers", func(t *testing.T) { - // create the instance of provider - provider := NewDiscovery() - config := discovery.Config{ + // create the config + config := &Config{ DomainName: "google.com", - IPv6: false, } - require.NoError(t, provider.SetConfig(config)) + // create the instance of provider + provider := NewDiscovery(config) + require.NoError(t, provider.Initialize()) require.NoError(t, provider.Register()) diff --git a/discovery/errors.go b/discovery/errors.go index 283f30b9..5e6e0ee0 100644 --- a/discovery/errors.go +++ b/discovery/errors.go @@ -35,4 +35,6 @@ var ( ErrAlreadyRegistered = errors.New("provider already registered") // ErrNotRegistered is used when attempting to de-register the provider ErrNotRegistered = errors.New("provider is not registered") + // ErrInvalidConfig is used when the discovery provider configuration is invalid + ErrInvalidConfig = errors.New("invalid discovery provider configuration") ) diff --git a/discovery/kubernetes/config.go b/discovery/kubernetes/config.go new file mode 100644 index 00000000..b04542ad --- /dev/null +++ b/discovery/kubernetes/config.go @@ -0,0 +1,57 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package kubernetes + +import ( + "github.com/tochemey/goakt/internal/validation" +) + +// Config defines the kubernetes discovery configuration +type Config struct { + // Namespace specifies the kubernetes namespace + Namespace string + // ApplicationName specifies the application name + ApplicationName string + // GossipPortName specifies the gossip port name + GossipPortName string + // RemotingPortName specifies the remoting port name + RemotingPortName string + // ClusterPortName specifies the cluster port name + ClusterPortName string + // ActorSystemName specifies the given actor system name + ActorSystemName string +} + +// Validate checks whether the given discovery configuration is valid +func (x Config) Validate() error { + return validation.New(validation.FailFast()). + AddValidator(validation.NewEmptyStringValidator("Namespace", x.Namespace)). + AddValidator(validation.NewEmptyStringValidator("ApplicationName", x.ApplicationName)). + AddValidator(validation.NewEmptyStringValidator("GossipPortName", x.GossipPortName)). + AddValidator(validation.NewEmptyStringValidator("ClusterPortName", x.ClusterPortName)). + AddValidator(validation.NewEmptyStringValidator("RemotingPortName", x.RemotingPortName)). + AddValidator(validation.NewEmptyStringValidator("ActorSystemName", x.ActorSystemName)). + Validate() +} diff --git a/discovery/kubernetes/config_test.go b/discovery/kubernetes/config_test.go new file mode 100644 index 00000000..5683ff94 --- /dev/null +++ b/discovery/kubernetes/config_test.go @@ -0,0 +1,56 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package kubernetes + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestConfig(t *testing.T) { + t.Run("With valid configuration", func(t *testing.T) { + config := &Config{ + Namespace: "namespace", + ApplicationName: "applicationName", + GossipPortName: "gossipName", + RemotingPortName: "remotingName", + ClusterPortName: "clusterPortName", + ActorSystemName: "actorSys", + } + assert.NoError(t, config.Validate()) + }) + t.Run("With invalid configuration", func(t *testing.T) { + config := &Config{ + Namespace: "namespace", + ApplicationName: "applicationName", + GossipPortName: "", + RemotingPortName: "remotingName", + ClusterPortName: "clusterPortName", + ActorSystemName: "actorSys", + } + assert.Error(t, config.Validate()) + }) +} diff --git a/discovery/kubernetes/discovery.go b/discovery/kubernetes/discovery.go index 1a1bdf4c..0d01af89 100644 --- a/discovery/kubernetes/discovery.go +++ b/discovery/kubernetes/discovery.go @@ -43,30 +43,9 @@ import ( "github.com/tochemey/goakt/discovery" ) -const ( - Namespace string = "namespace" // Namespace specifies the kubernetes namespace - ActorSystemName = "actor_system_name" // ActorSystemName specifies the actor system name - ApplicationName = "app_name" // ApplicationName specifies the application name. This often matches the actor system name - GossipPortName = "gossip-port" - ClusterPortName = "cluster-port" - RemotingPortName = "remoting-port" -) - -// discoConfig represents the kubernetes provider discoConfig -type discoConfig struct { - // Provider specifies the provider name - Provider string - // NameSpace specifies the namespace - NameSpace string - // The actor system name - ActorSystemName string - // ApplicationName specifies the running application - ApplicationName string -} - // Discovery represents the kubernetes discovery type Discovery struct { - option *discoConfig + config *Config client kubernetes.Interface mu sync.Mutex @@ -79,13 +58,13 @@ type Discovery struct { var _ discovery.Provider = &Discovery{} // NewDiscovery returns an instance of the kubernetes discovery provider -func NewDiscovery() *Discovery { +func NewDiscovery(config *Config) *Discovery { // create an instance of discovery := &Discovery{ mu: sync.Mutex{}, stopChan: make(chan struct{}, 1), initialized: atomic.NewBool(false), - option: &discoConfig{}, + config: config, } return discovery @@ -105,23 +84,7 @@ func (d *Discovery) Initialize() error { return discovery.ErrAlreadyInitialized } - if d.option.Provider == "" { - d.option.Provider = d.ID() - } - - return nil -} - -// SetConfig registers the underlying discovery configuration -func (d *Discovery) SetConfig(meta discovery.Config) error { - d.mu.Lock() - defer d.mu.Unlock() - - if d.initialized.Load() { - return discovery.ErrAlreadyInitialized - } - - return d.setConfig(meta) + return d.config.Validate() } // Register registers this node to a service discovery directory. @@ -168,16 +131,15 @@ func (d *Discovery) DiscoverPeers() ([]string, error) { } // let us create the pod labels map - // TODO: make sure to document it on k8 discovery podLabels := map[string]string{ - "app.kubernetes.io/part-of": d.option.ActorSystemName, - "app.kubernetes.io/component": d.option.ApplicationName, // TODO: redefine it - "app.kubernetes.io/name": d.option.ApplicationName, + "app.kubernetes.io/part-of": d.config.ActorSystemName, + "app.kubernetes.io/component": d.config.ApplicationName, + "app.kubernetes.io/name": d.config.ApplicationName, } ctx := context.Background() - pods, err := d.client.CoreV1().Pods(d.option.NameSpace).List(ctx, metav1.ListOptions{ + pods, err := d.client.CoreV1().Pods(d.config.Namespace).List(ctx, metav1.ListOptions{ LabelSelector: labels.SelectorFromSet(podLabels).String(), }) @@ -185,7 +147,7 @@ func (d *Discovery) DiscoverPeers() ([]string, error) { return nil, err } - validPortNames := []string{ClusterPortName, GossipPortName, RemotingPortName} + validPortNames := []string{d.config.ClusterPortName, d.config.GossipPortName, d.config.RemotingPortName} // define the addresses list addresses := goset.NewSet[string]() @@ -212,7 +174,7 @@ MainLoop: continue } - if port.Name == GossipPortName { + if port.Name == d.config.GossipPortName { addresses.Add(net.JoinHostPort(pod.Status.PodIP, strconv.Itoa(int(port.ContainerPort)))) } } @@ -225,27 +187,3 @@ MainLoop: func (d *Discovery) Close() error { return nil } - -// setConfig sets the kubernetes discoConfig -func (d *Discovery) setConfig(config discovery.Config) (err error) { - option := new(discoConfig) - - option.NameSpace, err = config.GetString(Namespace) - if err != nil { - return err - } - - option.ActorSystemName, err = config.GetString(ActorSystemName) - if err != nil { - return err - } - - option.ApplicationName, err = config.GetString(ApplicationName) - - if err != nil { - return err - } - - d.option = option - return nil -} diff --git a/discovery/kubernetes/discovery_test.go b/discovery/kubernetes/discovery_test.go index e47c7864..93fd1f3a 100644 --- a/discovery/kubernetes/discovery_test.go +++ b/discovery/kubernetes/discovery_test.go @@ -39,10 +39,16 @@ import ( "github.com/tochemey/goakt/discovery" ) +const ( + gossipPortName = "gossip-port" + clusterPortName = "cluster-port" + remotingPortName = "remoting-port" +) + func TestDiscovery(t *testing.T) { t.Run("With new instance", func(t *testing.T) { // create the instance of provider - provider := NewDiscovery() + provider := NewDiscovery(nil) require.NotNil(t, provider) // assert that provider implements the Discovery interface // this is a cheap test @@ -55,7 +61,7 @@ func TestDiscovery(t *testing.T) { t.Run("With ID assertion", func(t *testing.T) { // cheap test // create the instance of provider - provider := NewDiscovery() + provider := NewDiscovery(nil) require.NotNil(t, provider) assert.Equal(t, "kubernetes", provider.ID()) }) @@ -67,6 +73,15 @@ func TestDiscovery(t *testing.T) { ts1 := time.Now() ts2 := time.Now() + config := &Config{ + Namespace: "test", + ActorSystemName: "test", + ApplicationName: "test", + GossipPortName: gossipPortName, + RemotingPortName: remotingPortName, + ClusterPortName: clusterPortName, + } + // create some bunch of mock pods pods := []runtime.Object{ &corev1.Pod{ @@ -84,15 +99,15 @@ func TestDiscovery(t *testing.T) { { Ports: []corev1.ContainerPort{ { - Name: GossipPortName, + Name: gossipPortName, ContainerPort: 3379, }, { - Name: ClusterPortName, + Name: clusterPortName, ContainerPort: 3380, }, { - Name: RemotingPortName, + Name: remotingPortName, ContainerPort: 9000, }, }, @@ -128,15 +143,15 @@ func TestDiscovery(t *testing.T) { { Ports: []corev1.ContainerPort{ { - Name: GossipPortName, + Name: gossipPortName, ContainerPort: 3379, }, { - Name: ClusterPortName, + Name: clusterPortName, ContainerPort: 3380, }, { - Name: RemotingPortName, + Name: remotingPortName, ContainerPort: 9000, }, }, @@ -164,11 +179,7 @@ func TestDiscovery(t *testing.T) { provider := Discovery{ client: client, initialized: atomic.NewBool(true), - option: &discoConfig{ - NameSpace: ns, - ActorSystemName: actorSystemName, - ApplicationName: appName, - }, + config: config, } // discover some nodes actual, err := provider.DiscoverPeers() @@ -186,127 +197,47 @@ func TestDiscovery(t *testing.T) { assert.NoError(t, provider.Close()) }) t.Run("With DiscoverPeers: not initialized", func(t *testing.T) { - provider := NewDiscovery() + provider := NewDiscovery(nil) peers, err := provider.DiscoverPeers() assert.Error(t, err) assert.Empty(t, peers) assert.EqualError(t, err, discovery.ErrNotInitialized.Error()) }) - t.Run("With SetConfig", func(t *testing.T) { - // create the various config option - namespace := "default" - applicationName := "accounts" - actorSystemName := "AccountsSystem" - // create the instance of provider - provider := NewDiscovery() - // create the config - config := discovery.Config{ - ApplicationName: applicationName, - ActorSystemName: actorSystemName, - Namespace: namespace, - } - - // set config - assert.NoError(t, provider.SetConfig(config)) - }) - t.Run("With SetConfig: already initialized", func(t *testing.T) { - // create the various config option - namespace := "default" - applicationName := "accounts" - actorSystemName := "AccountsSystem" - // create the instance of provider - provider := NewDiscovery() - provider.initialized = atomic.NewBool(true) - // create the config - config := discovery.Config{ - ApplicationName: applicationName, - ActorSystemName: actorSystemName, - Namespace: namespace, - } - - // set config - err := provider.SetConfig(config) - assert.Error(t, err) - assert.EqualError(t, err, discovery.ErrAlreadyInitialized.Error()) - }) - t.Run("With SetConfig: actor system not set", func(t *testing.T) { + t.Run("With Initialize", func(t *testing.T) { // create the various config option namespace := "default" applicationName := "accounts" - // create the instance of provider - provider := NewDiscovery() - // create the config - config := discovery.Config{ - ApplicationName: applicationName, - Namespace: namespace, - } - - // set config - assert.Error(t, provider.SetConfig(config)) - }) - t.Run("With SetConfig: application name not set", func(t *testing.T) { - // create the various config option - namespace := "default" actorSystemName := "AccountsSystem" - // create the instance of provider - provider := NewDiscovery() - // create the config - config := discovery.Config{ - ActorSystemName: actorSystemName, - Namespace: namespace, - } - // set config - assert.Error(t, provider.SetConfig(config)) - }) - t.Run("With SetConfig: namespace not set", func(t *testing.T) { - applicationName := "accounts" - actorSystemName := "AccountsSystem" - // create the instance of provider - provider := NewDiscovery() - // create the config - config := discovery.Config{ - ApplicationName: applicationName, - ActorSystemName: actorSystemName, + config := &Config{ + Namespace: namespace, + ActorSystemName: actorSystemName, + ApplicationName: applicationName, + GossipPortName: gossipPortName, + RemotingPortName: remotingPortName, + ClusterPortName: clusterPortName, } - // set config - assert.Error(t, provider.SetConfig(config)) - }) - t.Run("With Initialize", func(t *testing.T) { - // create the various config option - namespace := "default" - applicationName := "accounts" - actorSystemName := "AccountsSystem" // create the instance of provider - provider := NewDiscovery() - // create the config - config := discovery.Config{ - ApplicationName: applicationName, - ActorSystemName: actorSystemName, - Namespace: namespace, - } - - // set config - assert.NoError(t, provider.SetConfig(config)) + provider := NewDiscovery(config) assert.NoError(t, provider.Initialize()) }) t.Run("With Initialize: already initialized", func(t *testing.T) { // create the instance of provider - provider := NewDiscovery() + provider := NewDiscovery(nil) provider.initialized = atomic.NewBool(true) assert.Error(t, provider.Initialize()) }) t.Run("With Deregister", func(t *testing.T) { // create the instance of provider - provider := NewDiscovery() + provider := NewDiscovery(nil) // for the sake of the test provider.initialized = atomic.NewBool(true) assert.NoError(t, provider.Deregister()) }) t.Run("With Deregister when not initialized", func(t *testing.T) { // create the instance of provider - provider := NewDiscovery() + provider := NewDiscovery(nil) // for the sake of the test provider.initialized = atomic.NewBool(false) err := provider.Deregister() diff --git a/discovery/mdns/config.go b/discovery/mdns/config.go new file mode 100644 index 00000000..d9211c4a --- /dev/null +++ b/discovery/mdns/config.go @@ -0,0 +1,50 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package mdns + +import "github.com/tochemey/goakt/internal/validation" + +// Config represents the mDNS provider discoConfig +type Config struct { + // Service specifies the service name + ServiceName string + // Service specifies the service type + Service string + // Specifies the service domain + Domain string + // Port specifies the port the service is listening to + Port int + // IPv6 states whether to fetch ipv6 address instead of ipv4 + IPv6 *bool +} + +// Validate checks whether the given discovery configuration is valid +func (x Config) Validate() error { + return validation.New(validation.FailFast()). + AddValidator(validation.NewEmptyStringValidator("ServiceName", x.ServiceName)). + AddValidator(validation.NewEmptyStringValidator("Service", x.Service)). + AddValidator(validation.NewEmptyStringValidator("Domain", x.Domain)). + Validate() +} diff --git a/discovery/mdns/config_test.go b/discovery/mdns/config_test.go new file mode 100644 index 00000000..74391734 --- /dev/null +++ b/discovery/mdns/config_test.go @@ -0,0 +1,52 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package mdns + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestConfig(t *testing.T) { + t.Run("With valid configuration", func(t *testing.T) { + config := &Config{ + ServiceName: "serviceName", + Service: "service", + Domain: "domain", + Port: 1999, + } + assert.NoError(t, config.Validate()) + }) + t.Run("With invalid configuration", func(t *testing.T) { + config := &Config{ + ServiceName: "serviceName", + Service: "", + Domain: "domain", + Port: 1999, + } + assert.Error(t, config.Validate()) + }) +} diff --git a/discovery/mdns/discovery.go b/discovery/mdns/discovery.go index 27f0e5ac..d0b050ed 100644 --- a/discovery/mdns/discovery.go +++ b/discovery/mdns/discovery.go @@ -39,33 +39,9 @@ import ( "github.com/tochemey/goakt/discovery" ) -const ( - ServiceName = "name" - Service = "service" - Domain = "domain" - Port = "port" - IPv6 = "ipv6" -) - -// discoConfig represents the mDNS provider discoConfig -type discoConfig struct { - // Provider specifies the provider name - Provider string - // Service specifies the service name - ServiceName string - // Service specifies the service type - Service string - // Specifies the service domain - Domain string - // Port specifies the port the service is listening to - Port int - // IPv6 states whether to fetch ipv6 address instead of ipv4 - IPv6 *bool -} - // Discovery defines the mDNS discovery provider type Discovery struct { - option *discoConfig + config *Config mu sync.Mutex stopChan chan struct{} @@ -82,12 +58,12 @@ type Discovery struct { var _ discovery.Provider = &Discovery{} // NewDiscovery returns an instance of the mDNS discovery provider -func NewDiscovery() *Discovery { +func NewDiscovery(config *Config) *Discovery { d := &Discovery{ mu: sync.Mutex{}, stopChan: make(chan struct{}, 1), initialized: atomic.NewBool(false), - option: &discoConfig{}, + config: config, } return d @@ -107,12 +83,7 @@ func (d *Discovery) Initialize() error { return discovery.ErrAlreadyInitialized } - // check the options - if d.option.Provider == "" { - d.option.Provider = d.ID() - } - - return nil + return d.config.Validate() } // Register registers this node to a service discovery directory. @@ -131,7 +102,7 @@ func (d *Discovery) Register() error { d.resolver = res - srv, err := zeroconf.Register(d.option.ServiceName, d.option.Service, d.option.Domain, d.option.Port, []string{"txtv=0", "lo=1", "la=2"}, nil) + srv, err := zeroconf.Register(d.config.ServiceName, d.config.Service, d.config.Domain, d.config.Port, []string{"txtv=0", "lo=1", "la=2"}, nil) if err != nil { return err } @@ -166,42 +137,6 @@ func (d *Discovery) Close() error { return nil } -// SetConfig registers the underlying discovery configuration -func (d *Discovery) SetConfig(config discovery.Config) error { - d.mu.Lock() - defer d.mu.Unlock() - - if d.initialized.Load() { - return discovery.ErrAlreadyInitialized - } - - var err error - if _, ok := config[ServiceName]; !ok { - return errors.New("mDNS service name is not provided") - } - - if _, ok := config[Service]; !ok { - return errors.New("mDNS service type is not provided") - } - - if _, ok := config[Port]; !ok { - return errors.New("mDNS listening port is not provided") - } - - if _, ok := config[Domain]; !ok { - return errors.New("mDNS domain is not provided") - } - - if _, ok := config[IPv6]; !ok { - return errors.New("mDNS ipv6 option is not provided") - } - - if err = d.setOptions(config); err != nil { - return errors.Wrap(err, "failed to instantiate the mDNS discovery provider") - } - return nil -} - // DiscoverPeers returns a list of known nodes. func (d *Discovery) DiscoverPeers() ([]string, error) { if !d.initialized.Load() { @@ -214,14 +149,14 @@ func (d *Discovery) DiscoverPeers() ([]string, error) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - if err := d.resolver.Browse(ctx, d.option.Service, d.option.Domain, entries); err != nil { + if err := d.resolver.Browse(ctx, d.config.Service, d.config.Domain, entries); err != nil { return nil, err } <-ctx.Done() v6 := false - if d.option.IPv6 != nil { - v6 = *d.option.IPv6 + if d.config.IPv6 != nil { + v6 = *d.config.IPv6 } addresses := goset.NewSet[string]() @@ -243,45 +178,10 @@ func (d *Discovery) DiscoverPeers() ([]string, error) { return addresses.ToSlice(), nil } -// setOptions sets the kubernetes discoConfig -func (d *Discovery) setOptions(config discovery.Config) (err error) { - option := new(discoConfig) - - option.ServiceName, err = config.GetString(ServiceName) - if err != nil { - return err - } - - option.Service, err = config.GetString(Service) - if err != nil { - return err - } - - option.Domain, err = config.GetString(Domain) - if err != nil { - return err - } - - option.Port, err = config.GetInt(Port) - if err != nil { - return err - } - - // extract the type of ip address to lookup - option.IPv6, err = config.GetBool(IPv6) - if err != nil { - return err - } - - // in case none of the above extraction fails then set the option - d.option = option - return nil -} - // validateEntry validates the mDNS discovered entry func (d *Discovery) validateEntry(entry *zeroconf.ServiceEntry) bool { - return entry.Port == d.option.Port && - entry.Service == d.option.Service && - entry.Domain == d.option.Domain && - entry.Instance == d.option.ServiceName + return entry.Port == d.config.Port && + entry.Service == d.config.Service && + entry.Domain == d.config.Domain && + entry.Instance == d.config.ServiceName } diff --git a/discovery/mdns/discovery_test.go b/discovery/mdns/discovery_test.go index 9decf8db..847470d6 100644 --- a/discovery/mdns/discovery_test.go +++ b/discovery/mdns/discovery_test.go @@ -25,7 +25,6 @@ package mdns import ( - "strconv" "testing" "time" @@ -40,7 +39,7 @@ import ( func TestDiscovery(t *testing.T) { t.Run("With new instance", func(t *testing.T) { // create the instance of provider - provider := NewDiscovery() + provider := NewDiscovery(nil) require.NotNil(t, provider) // assert that provider implements the Discovery interface // this is a cheap test @@ -53,195 +52,69 @@ func TestDiscovery(t *testing.T) { t.Run("With ID assertion", func(t *testing.T) { // cheap test // create the instance of provider - provider := NewDiscovery() + provider := NewDiscovery(nil) require.NotNil(t, provider) assert.Equal(t, "mdns", provider.ID()) }) - t.Run("With SetConfig", func(t *testing.T) { - // create the various config option - ports := dynaport.Get(1) - port := strconv.Itoa(ports[0]) - serviceType := "_workstation._tcp" - serviceName := "AccountsSystem" - domain := "local." - // create the instance of provider - provider := NewDiscovery() - // create the config - config := discovery.Config{ - Service: serviceType, - ServiceName: serviceName, - Domain: domain, - Port: port, - IPv6: false, - } - // set config - assert.NoError(t, provider.SetConfig(config)) - }) - t.Run("With SetConfig with service name not set", func(t *testing.T) { + t.Run("With Initialize", func(t *testing.T) { // create the various config option ports := dynaport.Get(1) - port := strconv.Itoa(ports[0]) + port := ports[0] serviceType := "_workstation._tcp" - domain := "local." - // create the instance of provider - provider := NewDiscovery() - // create the config - config := discovery.Config{ - Service: serviceType, - Domain: domain, - Port: port, - IPv6: false, - } - - // set config - assert.Error(t, provider.SetConfig(config)) - }) - t.Run("With SetConfig with service not set", func(t *testing.T) { - // create the various config option - ports := dynaport.Get(1) - port := strconv.Itoa(ports[0]) serviceName := "AccountsSystem" domain := "local." - // create the instance of provider - provider := NewDiscovery() - // create the config - config := discovery.Config{ - ServiceName: serviceName, - Domain: domain, - Port: port, - IPv6: false, - } - // set config - assert.Error(t, provider.SetConfig(config)) - }) - t.Run("With SetConfig with port not set", func(t *testing.T) { - // create the various config option - serviceType := "_workstation._tcp" - serviceName := "AccountsSystem" - domain := "local." - // create the instance of provider - provider := NewDiscovery() // create the config - config := discovery.Config{ + config := Config{ Service: serviceType, ServiceName: serviceName, Domain: domain, - IPv6: false, - } - - // set config - assert.Error(t, provider.SetConfig(config)) - }) - t.Run("With SetConfig with domain not set", func(t *testing.T) { - // create the various config option - ports := dynaport.Get(1) - port := strconv.Itoa(ports[0]) - serviceType := "_workstation._tcp" - serviceName := "AccountsSystem" - // create the instance of provider - provider := NewDiscovery() - // create the config - config := discovery.Config{ - Service: serviceType, - ServiceName: serviceName, Port: port, - IPv6: false, } - - // set config - assert.Error(t, provider.SetConfig(config)) - }) - t.Run("With SetConfig with ipv6 not set", func(t *testing.T) { - // create the various config option - ports := dynaport.Get(1) - port := strconv.Itoa(ports[0]) - serviceType := "_workstation._tcp" - serviceName := "AccountsSystem" - domain := "local." // create the instance of provider - provider := NewDiscovery() - // create the config - config := discovery.Config{ - Service: serviceType, - ServiceName: serviceName, - Domain: domain, - Port: port, - } + provider := NewDiscovery(&config) // set config - assert.Error(t, provider.SetConfig(config)) + assert.NoError(t, provider.Initialize()) }) - t.Run("With SetConfig: already initialized", func(t *testing.T) { + t.Run("With Initialize: already initialized", func(t *testing.T) { // create the various config option ports := dynaport.Get(1) - port := strconv.Itoa(ports[0]) + port := ports[0] serviceType := "_workstation._tcp" serviceName := "AccountsSystem" domain := "local." - // create the instance of provider - provider := NewDiscovery() - provider.initialized = atomic.NewBool(true) - // create the config - config := discovery.Config{ - Service: serviceType, - ServiceName: serviceName, - Domain: domain, - Port: port, - IPv6: false, - } - // set config - err := provider.SetConfig(config) - assert.Error(t, err) - assert.EqualError(t, err, discovery.ErrAlreadyInitialized.Error()) - }) - t.Run("With Initialize", func(t *testing.T) { - // create the various config option - ports := dynaport.Get(1) - port := strconv.Itoa(ports[0]) - serviceType := "_workstation._tcp" - serviceName := "AccountsSystem" - domain := "local." - // create the instance of provider - provider := NewDiscovery() // create the config - config := discovery.Config{ + config := Config{ Service: serviceType, ServiceName: serviceName, Domain: domain, Port: port, - IPv6: false, } - // set config - assert.NoError(t, provider.SetConfig(config)) - assert.NoError(t, provider.Initialize()) - }) - t.Run("With Initialize: already initialized", func(t *testing.T) { // create the instance of provider - provider := NewDiscovery() + provider := NewDiscovery(&config) provider.initialized = atomic.NewBool(true) assert.Error(t, provider.Initialize()) }) t.Run("With Register", func(t *testing.T) { // create the various config option ports := dynaport.Get(1) - port := strconv.Itoa(ports[0]) + port := ports[0] serviceType := "_workstation._tcp" serviceName := "AccountsSystem" domain := "local" - // create the instance of provider - provider := NewDiscovery() // create the config - config := discovery.Config{ + config := Config{ Service: serviceType, ServiceName: serviceName, Domain: domain, Port: port, - IPv6: false, } - require.NoError(t, provider.SetConfig(config)) + // create the instance of provider + provider := NewDiscovery(&config) + require.NoError(t, provider.Initialize()) require.NoError(t, provider.Register()) @@ -254,21 +127,19 @@ func TestDiscovery(t *testing.T) { t.Run("With Register when already registered", func(t *testing.T) { // create the various config option ports := dynaport.Get(1) - port := strconv.Itoa(ports[0]) + port := ports[0] serviceType := "_workstation._tcp" serviceName := "AccountsSystem" domain := "local" - // create the instance of provider - provider := NewDiscovery() // create the config - config := discovery.Config{ + config := Config{ Service: serviceType, ServiceName: serviceName, Domain: domain, Port: port, - IPv6: false, } - require.NoError(t, provider.SetConfig(config)) + // create the instance of provider + provider := NewDiscovery(&config) require.NoError(t, provider.Initialize()) require.NoError(t, provider.Register()) @@ -282,15 +153,41 @@ func TestDiscovery(t *testing.T) { assert.False(t, provider.initialized.Load()) }) t.Run("With Deregister", func(t *testing.T) { + // create the various config option + ports := dynaport.Get(1) + port := ports[0] + serviceType := "_workstation._tcp" + serviceName := "AccountsSystem" + domain := "local" + // create the config + config := Config{ + Service: serviceType, + ServiceName: serviceName, + Domain: domain, + Port: port, + } // create the instance of provider - provider := NewDiscovery() + provider := NewDiscovery(&config) // for the sake of the test provider.initialized = atomic.NewBool(true) assert.NoError(t, provider.Deregister()) }) t.Run("With Deregister when not initialized", func(t *testing.T) { + // create the various config option + ports := dynaport.Get(1) + port := ports[0] + serviceType := "_workstation._tcp" + serviceName := "AccountsSystem" + domain := "local" + // create the config + config := Config{ + Service: serviceType, + ServiceName: serviceName, + Domain: domain, + Port: port, + } // create the instance of provider - provider := NewDiscovery() + provider := NewDiscovery(&config) // for the sake of the test provider.initialized = atomic.NewBool(false) err := provider.Deregister() @@ -304,17 +201,17 @@ func TestDiscovery(t *testing.T) { service := "_workstation._tcp" serviceName := "AccountsSystem" domain := "local." - // create the instance of provider - provider := NewDiscovery() + // create the config - config := discovery.Config{ + config := Config{ Service: service, ServiceName: serviceName, Domain: domain, Port: port, - IPv6: false, } - require.NoError(t, provider.SetConfig(config)) + + // create the instance of provider + provider := NewDiscovery(&config) require.NoError(t, provider.Initialize()) require.NoError(t, provider.Register()) @@ -330,7 +227,23 @@ func TestDiscovery(t *testing.T) { assert.NoError(t, provider.Deregister()) }) t.Run("With DiscoverPeers: not initialized", func(t *testing.T) { - provider := NewDiscovery() + // create the various config option + ports := dynaport.Get(1) + port := ports[0] + service := "_workstation._tcp" + serviceName := "AccountsSystem" + domain := "local." + + // create the config + config := Config{ + Service: service, + ServiceName: serviceName, + Domain: domain, + Port: port, + } + + // create the instance of provider + provider := NewDiscovery(&config) peers, err := provider.DiscoverPeers() assert.Error(t, err) assert.Empty(t, peers) diff --git a/discovery/nats/config.go b/discovery/nats/config.go new file mode 100644 index 00000000..ed43f1da --- /dev/null +++ b/discovery/nats/config.go @@ -0,0 +1,55 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package nats + +import ( + "time" + + "github.com/tochemey/goakt/internal/validation" +) + +// Config represents the nats provider discoConfig +type Config struct { + // NatsServer defines the nats server in the format nats://host:port + NatsServer string + // NatsSubject defines the custom NATS subject + NatsSubject string + // The actor system name + ActorSystemName string + // ApplicationName specifies the running application + ApplicationName string + // Timeout defines the nodes discovery timeout + Timeout time.Duration +} + +// Validate checks whether the given discovery configuration is valid +func (x Config) Validate() error { + return validation.New(validation.FailFast()). + AddValidator(validation.NewEmptyStringValidator("NatsServer", x.NatsServer)). + AddValidator(validation.NewEmptyStringValidator("NatsSubject", x.NatsSubject)). + AddValidator(validation.NewEmptyStringValidator("ApplicationName", x.ApplicationName)). + AddValidator(validation.NewEmptyStringValidator("ActorSystemName", x.ActorSystemName)). + Validate() +} diff --git a/discovery/nats/config_test.go b/discovery/nats/config_test.go new file mode 100644 index 00000000..0c8472d4 --- /dev/null +++ b/discovery/nats/config_test.go @@ -0,0 +1,52 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package nats + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestConfig(t *testing.T) { + t.Run("With valid configuration", func(t *testing.T) { + config := &Config{ + NatsServer: "nats://127.0.0.1:2322", + ApplicationName: "applicationName", + ActorSystemName: "actorSys", + NatsSubject: "nats-subject", + } + assert.NoError(t, config.Validate()) + }) + t.Run("With invalid configuration", func(t *testing.T) { + config := &Config{ + NatsServer: "nats://127.0.0.1:2322", + ApplicationName: "applicationName", + ActorSystemName: "", + NatsSubject: "nats-subject", + } + assert.Error(t, config.Validate()) + }) +} diff --git a/discovery/nats/discovery.go b/discovery/nats/discovery.go index 6e1984e3..220f36f9 100644 --- a/discovery/nats/discovery.go +++ b/discovery/nats/discovery.go @@ -40,32 +40,9 @@ import ( "github.com/tochemey/goakt/log" ) -const ( - NatsServer string = "nats-server" // NatsServer specifies the Nats server Address - NatsSubject string = "nats-subject" // NatsSubject specifies the NATs subject - ActorSystemName = "actor_system_name" // ActorSystemName specifies the actor system name - ApplicationName = "app_name" // ApplicationName specifies the application name. This often matches the actor system name - Timeout = "timeout" // Timeout specifies the discovery timeout. The default value is 1 second -) - -// discoConfig represents the nats provider discoConfig -type discoConfig struct { - // NatsServer defines the nats server - // nats://host:port of a nats server - NatsServer string - // NatsSubject defines the custom NATS subject - NatsSubject string - // The actor system name - ActorSystemName string - // ApplicationName specifies the running application - ApplicationName string - // Timeout defines the nodes discovery timeout - Timeout time.Duration -} - // Discovery represents the kubernetes discovery type Discovery struct { - config *discoConfig + config *Config mu sync.Mutex initialized *atomic.Bool @@ -87,14 +64,15 @@ type Discovery struct { var _ discovery.Provider = &Discovery{} // NewDiscovery returns an instance of the kubernetes discovery provider -func NewDiscovery(opts ...Option) *Discovery { +func NewDiscovery(config *Config, hostNode *discovery.Node, opts ...Option) *Discovery { // create an instance of discovery := &Discovery{ mu: sync.Mutex{}, initialized: atomic.NewBool(false), registered: atomic.NewBool(false), - config: &discoConfig{}, + config: config, logger: log.DefaultLogger, + hostNode: hostNode, } // apply the various options @@ -119,25 +97,25 @@ func (d *Discovery) Initialize() error { return discovery.ErrAlreadyInitialized } - if d.config.Timeout <= 0 { - d.config.Timeout = time.Second + if err := d.config.Validate(); err != nil { + return err } - hostNode, err := discovery.HostNode() - if err != nil { - return err + if d.config.Timeout <= 0 { + d.config.Timeout = time.Second } // create the nats connection option opts := nats.GetDefaultOptions() opts.Url = d.config.NatsServer //opts.Servers = n.Config.Servers - opts.Name = hostNode.Name + opts.Name = d.hostNode.Name opts.ReconnectWait = 2 * time.Second opts.MaxReconnect = -1 var ( connection *nats.Conn + err error ) // let us connect using an exponential backoff mechanism @@ -163,7 +141,6 @@ func (d *Discovery) Initialize() error { } d.natsConnection = encodedConn d.initialized = atomic.NewBool(true) - d.hostNode = hostNode return nil } @@ -254,18 +231,6 @@ func (d *Discovery) Deregister() error { return nil } -// SetConfig registers the underlying discovery configuration -func (d *Discovery) SetConfig(config discovery.Config) error { - d.mu.Lock() - defer d.mu.Unlock() - - if d.initialized.Load() { - return discovery.ErrAlreadyInitialized - } - - return d.setConfig(config) -} - // DiscoverPeers returns a list of known nodes. func (d *Discovery) DiscoverPeers() ([]string, error) { d.mu.Lock() @@ -354,32 +319,3 @@ func (d *Discovery) Close() error { } return nil } - -// setConfig sets the kubernetes discoConfig -func (d *Discovery) setConfig(config discovery.Config) (err error) { - option := new(discoConfig) - - option.NatsServer, err = config.GetString(NatsServer) - if err != nil { - return err - } - - option.NatsSubject, err = config.GetString(NatsSubject) - if err != nil { - return err - } - - // extract the actor system name - option.ActorSystemName, err = config.GetString(ActorSystemName) - if err != nil { - return err - } - - option.ApplicationName, err = config.GetString(ApplicationName) - if err != nil { - return err - } - - d.config = option - return nil -} diff --git a/discovery/nats/discovery_test.go b/discovery/nats/discovery_test.go index a4de1019..78aa40c5 100644 --- a/discovery/nats/discovery_test.go +++ b/discovery/nats/discovery_test.go @@ -25,8 +25,6 @@ package nats import ( - "os" - "strconv" "testing" "time" @@ -72,49 +70,73 @@ func newPeer(t *testing.T, serverAddr string) *Discovery { // create a Cluster node host := "localhost" - // set the environments - require.NoError(t, os.Setenv("GOSSIP_PORT", strconv.Itoa(gossipPort))) - require.NoError(t, os.Setenv("CLUSTER_PORT", strconv.Itoa(clusterPort))) - require.NoError(t, os.Setenv("REMOTING_PORT", strconv.Itoa(remotingPort))) - require.NoError(t, os.Setenv("NODE_NAME", "testNode")) - require.NoError(t, os.Setenv("NODE_IP", host)) - // create the various config option applicationName := "accounts" actorSystemName := "AccountsSystem" natsSubject := "some-subject" - // create the instance of provider - provider := NewDiscovery() // create the config - config := discovery.Config{ + config := &Config{ ApplicationName: applicationName, ActorSystemName: actorSystemName, NatsServer: serverAddr, NatsSubject: natsSubject, } - // set config - err := provider.SetConfig(config) - require.NoError(t, err) + hostNode := discovery.Node{ + Name: host, + Host: host, + GossipPort: gossipPort, + ClusterPort: clusterPort, + RemotingPort: remotingPort, + } + + // create the instance of provider + provider := NewDiscovery(config, &hostNode) // initialize - err = provider.Initialize() + err := provider.Initialize() require.NoError(t, err) - // clear the env var - require.NoError(t, os.Unsetenv("GOSSIP_PORT")) - require.NoError(t, os.Unsetenv("CLUSTER_PORT")) - require.NoError(t, os.Unsetenv("REMOTING_PORT")) - require.NoError(t, os.Unsetenv("NODE_NAME")) - require.NoError(t, os.Unsetenv("NODE_IP")) // return the provider return provider } func TestDiscovery(t *testing.T) { t.Run("With a new instance", func(t *testing.T) { + // start the NATS server + srv := startNatsServer(t) + + // generate the ports for the single node + nodePorts := dynaport.Get(3) + gossipPort := nodePorts[0] + clusterPort := nodePorts[1] + remotingPort := nodePorts[2] + + // create a Cluster node + host := "localhost" + // create the various config option + applicationName := "accounts" + actorSystemName := "AccountsSystem" + natsSubject := "some-subject" + + // create the config + config := &Config{ + ApplicationName: applicationName, + ActorSystemName: actorSystemName, + NatsServer: srv.Addr().String(), + NatsSubject: natsSubject, + } + + hostNode := discovery.Node{ + Name: host, + Host: host, + GossipPort: gossipPort, + ClusterPort: clusterPort, + RemotingPort: remotingPort, + } + // create the instance of provider - provider := NewDiscovery() + provider := NewDiscovery(config, &hostNode) require.NotNil(t, provider) // assert that provider implements the Discovery interface // this is a cheap test @@ -125,125 +147,172 @@ func TestDiscovery(t *testing.T) { assert.True(t, ok) }) t.Run("With ID assertion", func(t *testing.T) { - // cheap test - // create the instance of provider - provider := NewDiscovery() - require.NotNil(t, provider) - assert.Equal(t, "nats", provider.ID()) - }) - t.Run("With SetConfig", func(t *testing.T) { + // start the NATS server + srv := startNatsServer(t) + + // generate the ports for the single node + nodePorts := dynaport.Get(3) + gossipPort := nodePorts[0] + clusterPort := nodePorts[1] + remotingPort := nodePorts[2] + + // create a Cluster node + host := "localhost" // create the various config option - natsServer := "nats://127.0.0.1:2322" applicationName := "accounts" actorSystemName := "AccountsSystem" natsSubject := "some-subject" - // create the instance of provider - provider := NewDiscovery() + // create the config - config := discovery.Config{ + config := &Config{ ApplicationName: applicationName, ActorSystemName: actorSystemName, - NatsServer: natsServer, + NatsServer: srv.Addr().String(), NatsSubject: natsSubject, } - // set config - assert.NoError(t, provider.SetConfig(config)) + hostNode := discovery.Node{ + Name: host, + Host: host, + GossipPort: gossipPort, + ClusterPort: clusterPort, + RemotingPort: remotingPort, + } + // create the instance of provider + provider := NewDiscovery(config, &hostNode) + require.NotNil(t, provider) + assert.Equal(t, "nats", provider.ID()) }) - t.Run("With SetConfig: already initialized", func(t *testing.T) { + + t.Run("With Initialize", func(t *testing.T) { // start the NATS server srv := startNatsServer(t) + + // generate the ports for the single node + nodePorts := dynaport.Get(3) + gossipPort := nodePorts[0] + clusterPort := nodePorts[1] + remotingPort := nodePorts[2] + + // create a Cluster node + host := "localhost" + // create the various config option natsServer := srv.Addr().String() applicationName := "accounts" actorSystemName := "AccountsSystem" natsSubject := "some-subject" - // create the instance of provider - provider := NewDiscovery() - provider.initialized = atomic.NewBool(true) + // create the config - config := discovery.Config{ + config := &Config{ ApplicationName: applicationName, ActorSystemName: actorSystemName, NatsServer: natsServer, NatsSubject: natsSubject, } - // set config - err := provider.SetConfig(config) - assert.Error(t, err) - assert.EqualError(t, err, discovery.ErrAlreadyInitialized.Error()) + hostNode := discovery.Node{ + Name: host, + Host: host, + GossipPort: gossipPort, + ClusterPort: clusterPort, + RemotingPort: remotingPort, + } + + // create the instance of provider + provider := NewDiscovery(config, &hostNode, WithLogger(log.DiscardLogger)) + + // initialize + err := provider.Initialize() + assert.NoError(t, err) + // stop the NATS server t.Cleanup(srv.Shutdown) }) - t.Run("With SetConfig: nats server not set", func(t *testing.T) { + t.Run("With Initialize: already initialized", func(t *testing.T) { + // start the NATS server + srv := startNatsServer(t) + + // generate the ports for the single node + nodePorts := dynaport.Get(3) + gossipPort := nodePorts[0] + clusterPort := nodePorts[1] + remotingPort := nodePorts[2] + + // create a Cluster node + host := "localhost" + + // create the various config option + natsServer := srv.Addr().String() applicationName := "accounts" actorSystemName := "AccountsSystem" natsSubject := "some-subject" - // create the instance of provider - provider := NewDiscovery() + // create the config - config := discovery.Config{ + config := &Config{ ApplicationName: applicationName, ActorSystemName: actorSystemName, + NatsServer: natsServer, NatsSubject: natsSubject, } - // set config - assert.Error(t, provider.SetConfig(config)) - }) - t.Run("With SetConfig: nats subject not set", func(t *testing.T) { - // create the various config option - natsServer := "nats://127.0.0.1:2322" - applicationName := "accounts" - actorSystemName := "AccountsSystem" - // create the instance of provider - provider := NewDiscovery() - // create the config - config := discovery.Config{ - ApplicationName: applicationName, - ActorSystemName: actorSystemName, - NatsServer: natsServer, + hostNode := discovery.Node{ + Name: host, + Host: host, + GossipPort: gossipPort, + ClusterPort: clusterPort, + RemotingPort: remotingPort, } - // set config - assert.Error(t, provider.SetConfig(config)) + // create the instance of provider + provider := NewDiscovery(config, &hostNode) + provider.initialized = atomic.NewBool(true) + assert.Error(t, provider.Initialize()) }) - t.Run("With SetConfig: actor system not set", func(t *testing.T) { + + t.Run("With Register: already registered", func(t *testing.T) { + // start the NATS server + srv := startNatsServer(t) + + // generate the ports for the single node + nodePorts := dynaport.Get(3) + gossipPort := nodePorts[0] + clusterPort := nodePorts[1] + remotingPort := nodePorts[2] + + // create a Cluster node + host := "localhost" + // create the various config option - natsServer := "nats://127.0.0.1:2322" + natsServer := srv.Addr().String() applicationName := "accounts" + actorSystemName := "AccountsSystem" natsSubject := "some-subject" - // create the instance of provider - provider := NewDiscovery() + // create the config - config := discovery.Config{ + config := &Config{ ApplicationName: applicationName, + ActorSystemName: actorSystemName, NatsServer: natsServer, NatsSubject: natsSubject, } - // set config - assert.Error(t, provider.SetConfig(config)) - }) - t.Run("With SetConfig: application name not set", func(t *testing.T) { - // create the various config option - natsServer := "nats://127.0.0.1:2322" - actorSystemName := "AccountsSystem" - natsSubject := "some-subject" - // create the instance of provider - provider := NewDiscovery() - // create the config - config := discovery.Config{ - ActorSystemName: actorSystemName, - NatsServer: natsServer, - NatsSubject: natsSubject, + hostNode := discovery.Node{ + Name: host, + Host: host, + GossipPort: gossipPort, + ClusterPort: clusterPort, + RemotingPort: remotingPort, } - // set config - assert.Error(t, provider.SetConfig(config)) + // create the instance of provider + provider := NewDiscovery(config, &hostNode) + provider.registered = atomic.NewBool(true) + err := provider.Register() + assert.Error(t, err) + assert.EqualError(t, err, discovery.ErrAlreadyRegistered.Error()) }) - t.Run("With Initialize", func(t *testing.T) { + t.Run("With Deregister: already not registered", func(t *testing.T) { // start the NATS server srv := startNatsServer(t) @@ -255,62 +324,31 @@ func TestDiscovery(t *testing.T) { // create a Cluster node host := "localhost" - // set the environments - t.Setenv("GOSSIP_PORT", strconv.Itoa(gossipPort)) - t.Setenv("CLUSTER_PORT", strconv.Itoa(clusterPort)) - t.Setenv("REMOTING_PORT", strconv.Itoa(remotingPort)) - t.Setenv("NODE_NAME", "testNode") - t.Setenv("NODE_IP", host) // create the various config option natsServer := srv.Addr().String() applicationName := "accounts" actorSystemName := "AccountsSystem" natsSubject := "some-subject" - // create the instance of provider - provider := NewDiscovery(WithLogger(log.DiscardLogger)) // create the config - config := discovery.Config{ + config := &Config{ ApplicationName: applicationName, ActorSystemName: actorSystemName, NatsServer: natsServer, NatsSubject: natsSubject, } - // set config - err := provider.SetConfig(config) - require.NoError(t, err) - - // initialize - err = provider.Initialize() - assert.NoError(t, err) + hostNode := discovery.Node{ + Name: host, + Host: host, + GossipPort: gossipPort, + ClusterPort: clusterPort, + RemotingPort: remotingPort, + } - // stop the NATS server - t.Cleanup(srv.Shutdown) - }) - t.Run("With Initialize: already initialized", func(t *testing.T) { - // create the instance of provider - provider := NewDiscovery() - provider.initialized = atomic.NewBool(true) - assert.Error(t, provider.Initialize()) - }) - t.Run("With Initialize: with host config not set", func(t *testing.T) { - // create the instance of provider - provider := NewDiscovery() - assert.Error(t, provider.Initialize()) - }) - t.Run("With Register: already registered", func(t *testing.T) { - // create the instance of provider - provider := NewDiscovery() - provider.registered = atomic.NewBool(true) - err := provider.Register() - assert.Error(t, err) - assert.EqualError(t, err, discovery.ErrAlreadyRegistered.Error()) - }) - t.Run("With Deregister: already not registered", func(t *testing.T) { // create the instance of provider - provider := NewDiscovery() + provider := NewDiscovery(config, &hostNode) err := provider.Deregister() assert.Error(t, err) assert.EqualError(t, err, discovery.ErrNotRegistered.Error()) @@ -376,7 +414,41 @@ func TestDiscovery(t *testing.T) { t.Cleanup(srv.Shutdown) }) t.Run("With DiscoverPeers: not initialized", func(t *testing.T) { - provider := NewDiscovery() + // start the NATS server + srv := startNatsServer(t) + + // generate the ports for the single node + nodePorts := dynaport.Get(3) + gossipPort := nodePorts[0] + clusterPort := nodePorts[1] + remotingPort := nodePorts[2] + + // create a Cluster node + host := "localhost" + + // create the various config option + natsServer := srv.Addr().String() + applicationName := "accounts" + actorSystemName := "AccountsSystem" + natsSubject := "some-subject" + + // create the config + config := &Config{ + ApplicationName: applicationName, + ActorSystemName: actorSystemName, + NatsServer: natsServer, + NatsSubject: natsSubject, + } + + hostNode := discovery.Node{ + Name: host, + Host: host, + GossipPort: gossipPort, + ClusterPort: clusterPort, + RemotingPort: remotingPort, + } + + provider := NewDiscovery(config, &hostNode) peers, err := provider.DiscoverPeers() assert.Error(t, err) assert.Empty(t, peers) diff --git a/discovery/node.go b/discovery/node.go index a1309c8f..c451b689 100644 --- a/discovery/node.go +++ b/discovery/node.go @@ -26,21 +26,9 @@ package discovery import ( "net" - "os" "strconv" - - "github.com/caarlos0/env/v11" ) -// hostNodeConfig helps read the host Node settings -type hostNodeConfig struct { - GossipPort int `env:"GOSSIP_PORT"` - ClusterPort int `env:"CLUSTER_PORT"` - RemotingPort int `env:"REMOTING_PORT"` - Name string `env:"NODE_NAME" envDefault:""` - Host string `env:"NODE_IP" envDefault:""` -} - // Node represents a discovered Node type Node struct { // Name specifies the discovered node's Name @@ -64,34 +52,3 @@ func (n Node) ClusterAddress() string { func (n Node) GossipAddress() string { return net.JoinHostPort(n.Host, strconv.Itoa(n.GossipPort)) } - -// HostNode returns the Node where the discovery provider is running -func HostNode() (*Node, error) { - // load the host node configuration - cfg := &hostNodeConfig{} - opts := env.Options{RequiredIfNoDef: true, UseFieldNameByDefault: false} - if err := env.ParseWithOptions(cfg, opts); err != nil { - return nil, err - } - // check for empty host and name - if cfg.Host == "" { - // let us perform a host lookup - host, _ := os.Hostname() - // set the host - cfg.Host = host - } - - // set the name as host if it is empty - if cfg.Name == "" { - cfg.Name = cfg.Host - } - - // create the host node - return &Node{ - Name: cfg.Name, - Host: cfg.Host, - GossipPort: cfg.GossipPort, - ClusterPort: cfg.ClusterPort, - RemotingPort: cfg.RemotingPort, - }, nil -} diff --git a/discovery/node_test.go b/discovery/node_test.go deleted file mode 100644 index 3654ca76..00000000 --- a/discovery/node_test.go +++ /dev/null @@ -1,93 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2022-2024 Tochemey - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package discovery - -import ( - "fmt" - "os" - "strconv" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/travisjeffery/go-dynaport" -) - -func TestGetHostNode(t *testing.T) { - t.Run("With host node env vars set", func(t *testing.T) { - // generate the ports for the single node - nodePorts := dynaport.Get(3) - gossipPort := nodePorts[0] - clusterPort := nodePorts[1] - remotingPort := nodePorts[2] - host := "localhost" - - t.Setenv("GOSSIP_PORT", strconv.Itoa(gossipPort)) - t.Setenv("CLUSTER_PORT", strconv.Itoa(clusterPort)) - t.Setenv("REMOTING_PORT", strconv.Itoa(remotingPort)) - t.Setenv("NODE_NAME", "testNode") - t.Setenv("NODE_IP", host) - - node, err := HostNode() - require.NoError(t, err) - require.NotNil(t, node) - clusterAddr := node.ClusterAddress() - gossipAddr := node.GossipAddress() - assert.Equal(t, fmt.Sprintf("%s:%d", host, clusterPort), clusterAddr) - assert.Equal(t, fmt.Sprintf("%s:%d", host, gossipPort), gossipAddr) - }) - t.Run("With empty host and name", func(t *testing.T) { - // generate the ports for the single node - nodePorts := dynaport.Get(3) - gossipPort := nodePorts[0] - clusterPort := nodePorts[1] - remotingPort := nodePorts[2] - host, err := os.Hostname() - require.NoError(t, err) - require.NotNil(t, host) - - t.Setenv("GOSSIP_PORT", strconv.Itoa(gossipPort)) - t.Setenv("CLUSTER_PORT", strconv.Itoa(clusterPort)) - t.Setenv("REMOTING_PORT", strconv.Itoa(remotingPort)) - t.Setenv("NODE_NAME", "") - t.Setenv("NODE_IP", "") - - node, err := HostNode() - require.NoError(t, err) - require.NotNil(t, node) - clusterAddr := node.ClusterAddress() - gossipAddr := node.GossipAddress() - assert.Equal(t, fmt.Sprintf("%s:%d", host, clusterPort), clusterAddr) - assert.Equal(t, fmt.Sprintf("%s:%d", host, gossipPort), gossipAddr) - assert.Equal(t, host, node.Host) - assert.Equal(t, host, node.Name) - }) - - t.Run("With host node env vars not set", func(t *testing.T) { - node, err := HostNode() - require.Error(t, err) - require.Nil(t, node) - }) -} diff --git a/discovery/provider.go b/discovery/provider.go index 6ae2c977..f2502996 100644 --- a/discovery/provider.go +++ b/discovery/provider.go @@ -34,8 +34,6 @@ type Provider interface { Register() error // Deregister de-registers the service discovery provider. Deregister() error - // SetConfig registers the underlying discovery options - SetConfig(config Config) error // DiscoverPeers returns a list discovered nodes' addresses. DiscoverPeers() ([]string, error) // Close closes the provider diff --git a/examples/actor-cluster/dnssd/cmd/run.go b/examples/actor-cluster/dnssd/cmd/run.go index a599202e..be5a866e 100644 --- a/examples/actor-cluster/dnssd/cmd/run.go +++ b/examples/actor-cluster/dnssd/cmd/run.go @@ -44,7 +44,6 @@ import ( semconv "go.opentelemetry.io/otel/semconv/v1.21.0" goakt "github.com/tochemey/goakt/actors" - "github.com/tochemey/goakt/discovery" "github.com/tochemey/goakt/discovery/dnssd" "github.com/tochemey/goakt/examples/actor-cluster/dnssd/service" "github.com/tochemey/goakt/log" @@ -113,15 +112,16 @@ var runCmd = &cobra.Command{ // initialize traces and metric providers initTracer(ctx, config.TraceURL) initMeter() - // instantiate the dnssd discovery provider - disco := dnssd.NewDiscovery() // define the discovery options - discoOptions := discovery.Config{ - dnssd.DomainName: config.ServiceName, - dnssd.IPv6: false, + discoConfig := dnssd.Config{ + DomainName: config.ServiceName, } - // define the service discovery - serviceDiscovery := discovery.NewServiceDiscovery(disco, discoOptions) + // instantiate the dnssd discovery provider + disco := dnssd.NewDiscovery(&discoConfig) + + // grab the the host + host, _ := os.Hostname() + // create the actor system actorSystem, err := goakt.NewActorSystem( config.ActorSystemName, @@ -129,7 +129,8 @@ var runCmd = &cobra.Command{ goakt.WithLogger(logger), goakt.WithActorInitMaxRetries(3), goakt.WithTracing(), - goakt.WithClustering(serviceDiscovery, 20)) + goakt.WithRemoting(host, int32(config.RemotingPort)), + goakt.WithClustering(disco, 20, config.GossipPort, config.ClusterPort)) // handle the error if err != nil { logger.Panic(err) diff --git a/examples/actor-cluster/dnssd/service/config.go b/examples/actor-cluster/dnssd/service/config.go index e34a6b4f..2cd4c9a4 100644 --- a/examples/actor-cluster/dnssd/service/config.go +++ b/examples/actor-cluster/dnssd/service/config.go @@ -32,6 +32,9 @@ type Config struct { ServiceName string `env:"SERVICE_NAME"` ActorSystemName string `env:"SYSTEM_NAME"` TraceURL string `env:"TRACE_URL"` + GossipPort int `env:"GOSSIP_PORT"` + ClusterPort int `env:"CLUSTER_PORT"` + RemotingPort int `env:"REMOTING_PORT"` } // GetConfig returns the configuration diff --git a/examples/actor-cluster/k8s/cmd/run.go b/examples/actor-cluster/k8s/cmd/run.go index 33d3edf3..ed5ba359 100644 --- a/examples/actor-cluster/k8s/cmd/run.go +++ b/examples/actor-cluster/k8s/cmd/run.go @@ -31,22 +31,41 @@ import ( "syscall" "time" + "github.com/caarlos0/env/v11" "github.com/spf13/cobra" goakt "github.com/tochemey/goakt/actors" - "github.com/tochemey/goakt/discovery" "github.com/tochemey/goakt/discovery/kubernetes" "github.com/tochemey/goakt/examples/actor-cluster/k8s/service" "github.com/tochemey/goakt/log" ) const ( - accountServicePort = 50051 - namespace = "default" - applicationName = "accounts" - actorSystemName = "AccountsSystem" + namespace = "default" + applicationName = "accounts" + actorSystemName = "AccountsSystem" + gossipPortName = "gossip-port" + clusterPortName = "cluster-port" + remotingPortName = "remoting-port" ) +type config struct { + GossipPort int `env:"GOSSIP_PORT"` + ClusterPort int `env:"CLUSTER_PORT"` + RemotingPort int `env:"REMOTING_PORT"` + Port int `env:"PORT" envDefault:"50051"` +} + +func getConfig() *config { + // load the host node configuration + cfg := &config{} + opts := env.Options{RequiredIfNoDef: true, UseFieldNameByDefault: false} + if err := env.ParseWithOptions(cfg, opts); err != nil { + panic(err) + } + return cfg +} + // runCmd represents the run command var runCmd = &cobra.Command{ Use: "run", @@ -59,22 +78,29 @@ var runCmd = &cobra.Command{ logger := log.New(log.DebugLevel, os.Stdout) // instantiate the k8 discovery provider - disco := kubernetes.NewDiscovery() - // define the discovery options - discoOptions := discovery.Config{ - kubernetes.ApplicationName: applicationName, - kubernetes.ActorSystemName: actorSystemName, - kubernetes.Namespace: namespace, - } - // define the service discovery - serviceDiscovery := discovery.NewServiceDiscovery(disco, discoOptions) + disco := kubernetes.NewDiscovery(&kubernetes.Config{ + ApplicationName: applicationName, + ActorSystemName: actorSystemName, + Namespace: namespace, + GossipPortName: gossipPortName, + RemotingPortName: remotingPortName, + ClusterPortName: clusterPortName, + }) + + // get the port config + config := getConfig() + + // grab the the host + host, _ := os.Hostname() + // create the actor system actorSystem, err := goakt.NewActorSystem( actorSystemName, goakt.WithPassivationDisabled(), // set big passivation time goakt.WithLogger(logger), goakt.WithActorInitMaxRetries(3), - goakt.WithClustering(serviceDiscovery, 20)) + goakt.WithRemoting(host, int32(config.RemotingPort)), + goakt.WithClustering(disco, 20, config.GossipPort, config.ClusterPort)) // handle the error if err != nil { logger.Panic(err) @@ -86,7 +112,7 @@ var runCmd = &cobra.Command{ } // create the account service - accountService := service.NewAccountService(actorSystem, logger, accountServicePort) + accountService := service.NewAccountService(actorSystem, logger, config.Port) // start the account service accountService.Start() diff --git a/go.mod b/go.mod index e28c59f1..c3be8bb4 100644 --- a/go.mod +++ b/go.mod @@ -33,6 +33,7 @@ require ( go.opentelemetry.io/otel/trace v1.25.0 go.uber.org/atomic v1.11.0 go.uber.org/goleak v1.3.0 + go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 golang.org/x/net v0.24.0 golang.org/x/sync v0.7.0 @@ -103,7 +104,6 @@ require ( github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.25.0 // indirect go.opentelemetry.io/proto/otlp v1.1.0 // indirect - go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.22.0 // indirect golang.org/x/mod v0.14.0 // indirect golang.org/x/oauth2 v0.17.0 // indirect diff --git a/internal/cluster/discovery.go b/internal/cluster/discovery.go index 3b367614..ac221de3 100644 --- a/internal/cluster/discovery.go +++ b/internal/cluster/discovery.go @@ -71,17 +71,6 @@ func (d *discoveryProvider) SetConfig(c map[string]any) error { return errors.New("invalid discovery provider id") } - options, ok := c["options"] - if !ok { - return errors.New("discovery provider options is not set") - } - - meta := options.(discovery.Config) - if err := d.provider.SetConfig(meta); err != nil { - if !errors.Is(err, discovery.ErrAlreadyInitialized) { - return err - } - } return nil } diff --git a/internal/cluster/discovery_test.go b/internal/cluster/discovery_test.go index a26b5a1b..26c0a719 100644 --- a/internal/cluster/discovery_test.go +++ b/internal/cluster/discovery_test.go @@ -33,7 +33,6 @@ import ( "github.com/travisjeffery/go-dynaport" "github.com/tochemey/goakt/discovery" - "github.com/tochemey/goakt/discovery/kubernetes" "github.com/tochemey/goakt/log" testkit "github.com/tochemey/goakt/mocks/discovery" ) @@ -82,25 +81,13 @@ func TestDiscoveryProvider(t *testing.T) { provider.AssertExpectations(t) }) t.Run("With SetConfig: happy path", func(t *testing.T) { - // create the various config option - namespace := "default" - applicationName := "accounts" - actorSystemName := "AccountsSystem" - // create the config - config := discovery.Config{ - kubernetes.ApplicationName: applicationName, - kubernetes.ActorSystemName: actorSystemName, - kubernetes.Namespace: namespace, - } // mock the underlying discovery provider provider := new(testkit.Provider) provider. - On("ID").Return("testDisco"). - On("SetConfig", config).Return(nil) + On("ID").Return("testDisco") // create the config options := map[string]any{ - "id": "testDisco", - "options": config, + "id": "testDisco", } // create the instance of the wrapper wrapper := &discoveryProvider{ @@ -148,88 +135,7 @@ func TestDiscoveryProvider(t *testing.T) { assert.EqualError(t, err, "invalid discovery provider id") provider.AssertExpectations(t) }) - t.Run("With SetConfig: options not set", func(t *testing.T) { - // mock the underlying discovery provider - provider := new(testkit.Provider) - provider. - On("ID").Return("testDisco") - // create the config - options := map[string]any{ - "id": "testDisco", - } - // create the instance of the wrapper - wrapper := &discoveryProvider{ - provider: provider, - log: log.DefaultLogger.StdLogger(), - } - // set the options - err := wrapper.SetConfig(options) - assert.Error(t, err) - assert.EqualError(t, err, "discovery provider options is not set") - provider.AssertExpectations(t) - }) - t.Run("With SetConfig: failure", func(t *testing.T) { - // create the various config option - namespace := "default" - applicationName := "accounts" - actorSystemName := "AccountsSystem" - // create the config - config := discovery.Config{ - kubernetes.ApplicationName: applicationName, - kubernetes.ActorSystemName: actorSystemName, - kubernetes.Namespace: namespace, - } - // mock the underlying discovery provider - provider := new(testkit.Provider) - provider. - On("ID").Return("testDisco"). - On("SetConfig", config).Return(errors.New("failed")) - // create the config - options := map[string]any{ - "id": "testDisco", - "options": config, - } - // create the instance of the wrapper - wrapper := &discoveryProvider{ - provider: provider, - log: log.DefaultLogger.StdLogger(), - } - // set the options - err := wrapper.SetConfig(options) - assert.Error(t, err) - provider.AssertExpectations(t) - }) - t.Run("With SetConfig: already initialized", func(t *testing.T) { - // create the various config option - namespace := "default" - applicationName := "accounts" - actorSystemName := "AccountsSystem" - // create the config - config := discovery.Config{ - kubernetes.ApplicationName: applicationName, - kubernetes.ActorSystemName: actorSystemName, - kubernetes.Namespace: namespace, - } - // mock the underlying discovery provider - provider := new(testkit.Provider) - provider. - On("ID").Return("testDisco"). - On("SetConfig", config).Return(discovery.ErrAlreadyInitialized) - // create the config - options := map[string]any{ - "id": "testDisco", - "options": config, - } - // create the instance of the wrapper - wrapper := &discoveryProvider{ - provider: provider, - log: log.DefaultLogger.StdLogger(), - } - // set the options - err := wrapper.SetConfig(options) - assert.NoError(t, err) - provider.AssertExpectations(t) - }) + t.Run("With Register: happy path", func(t *testing.T) { // mock the underlying discovery provider provider := new(testkit.Provider) diff --git a/internal/cluster/node.go b/internal/cluster/node.go index 4326ac68..e8494bb5 100644 --- a/internal/cluster/node.go +++ b/internal/cluster/node.go @@ -130,8 +130,6 @@ type Node struct { // specifies the discovery provider discoveryProvider discovery.Provider - // specifies the discovery options - discoveryOptions discovery.Config writeTimeout time.Duration readTimeout time.Duration @@ -147,14 +145,13 @@ type Node struct { var _ Interface = &Node{} // NewNode creates an instance of cluster Node -func NewNode(name string, serviceDiscovery *discovery.ServiceDiscovery, opts ...Option) (*Node, error) { +func NewNode(name string, disco discovery.Provider, host *discovery.Node, opts ...Option) (*Node, error) { // create an instance of the Node node := &Node{ partitionsCount: 20, logger: log.DefaultLogger, name: name, - discoveryProvider: serviceDiscovery.Provider(), - discoveryOptions: serviceDiscovery.Config(), + discoveryProvider: disco, writeTimeout: time.Second, readTimeout: time.Second, shutdownTimeout: 3 * time.Second, @@ -169,16 +166,8 @@ func NewNode(name string, serviceDiscovery *discovery.ServiceDiscovery, opts ... opt.Apply(node) } - // get the host info - hostNode, err := discovery.HostNode() - // handle the error - if err != nil { - node.logger.Error(errors.Wrap(err, "failed get the host node.💥")) - return nil, err - } - // set the host startNode - node.host = hostNode + node.host = host return node, nil } @@ -211,9 +200,8 @@ func (n *Node) Start(ctx context.Context) error { } conf.ServiceDiscovery = map[string]any{ - "plugin": discoveryWrapper, - "id": n.discoveryProvider.ID(), - "options": n.discoveryOptions, + "plugin": discoveryWrapper, + "id": n.discoveryProvider.ID(), } // let us start the Node diff --git a/internal/cluster/node_test.go b/internal/cluster/node_test.go index 5310c021..01676a7e 100644 --- a/internal/cluster/node_test.go +++ b/internal/cluster/node_test.go @@ -28,7 +28,6 @@ import ( "context" "fmt" "os" - "strconv" "testing" "time" @@ -65,31 +64,27 @@ func TestSingleNode(t *testing.T) { // mock the discovery provider provider := new(testkit.Provider) - config := discovery.NewConfig() provider.EXPECT().ID().Return("testDisco") provider.EXPECT().Initialize().Return(nil) provider.EXPECT().Register().Return(nil) provider.EXPECT().Deregister().Return(nil) - provider.EXPECT().SetConfig(config).Return(nil) provider.EXPECT().DiscoverPeers().Return(addrs, nil) provider.EXPECT().Close().Return(nil) - // create the service discovery - serviceDiscovery := discovery.NewServiceDiscovery(provider, config) - // create a Node startNode host := "localhost" - // set the environments - t.Setenv("GOSSIP_PORT", strconv.Itoa(gossipPort)) - t.Setenv("CLUSTER_PORT", strconv.Itoa(clusterPort)) - t.Setenv("REMOTING_PORT", strconv.Itoa(remotingPort)) - t.Setenv("NODE_NAME", "testNode") - t.Setenv("NODE_IP", host) + hostNode := discovery.Node{ + Name: host, + Host: host, + GossipPort: gossipPort, + ClusterPort: clusterPort, + RemotingPort: remotingPort, + } logger := log.New(log.ErrorLevel, os.Stdout) - cluster, err := NewNode("test", serviceDiscovery, WithLogger(logger)) + cluster, err := NewNode("test", provider, &hostNode, WithLogger(logger)) require.NotNil(t, cluster) require.NoError(t, err) @@ -125,29 +120,25 @@ func TestSingleNode(t *testing.T) { // mock the discovery provider provider := new(testkit.Provider) - config := discovery.NewConfig() provider.EXPECT().ID().Return("testDisco") provider.EXPECT().Initialize().Return(nil) provider.EXPECT().Register().Return(nil) provider.EXPECT().Deregister().Return(nil) - provider.EXPECT().SetConfig(config).Return(nil) provider.EXPECT().DiscoverPeers().Return(addrs, nil) provider.EXPECT().Close().Return(nil) - // create the service discovery - serviceDiscovery := discovery.NewServiceDiscovery(provider, config) - // create a Node startNode host := "localhost" - // set the environments - t.Setenv("GOSSIP_PORT", strconv.Itoa(gossipPort)) - t.Setenv("CLUSTER_PORT", strconv.Itoa(clusterPort)) - t.Setenv("REMOTING_PORT", strconv.Itoa(remotingPort)) - t.Setenv("NODE_NAME", "testNode") - t.Setenv("NODE_IP", host) - - cluster, err := NewNode("test", serviceDiscovery) + hostNode := discovery.Node{ + Name: host, + Host: host, + GossipPort: gossipPort, + ClusterPort: clusterPort, + RemotingPort: remotingPort, + } + + cluster, err := NewNode("test", provider, &hostNode) require.NotNil(t, cluster) require.NoError(t, err) @@ -199,29 +190,25 @@ func TestSingleNode(t *testing.T) { // mock the discovery provider provider := new(testkit.Provider) - config := discovery.NewConfig() provider.EXPECT().ID().Return("testDisco") provider.EXPECT().Initialize().Return(nil) provider.EXPECT().Register().Return(nil) provider.EXPECT().Deregister().Return(nil) - provider.EXPECT().SetConfig(config).Return(nil) provider.EXPECT().DiscoverPeers().Return(addrs, nil) provider.EXPECT().Close().Return(nil) - // create the service discovery - serviceDiscovery := discovery.NewServiceDiscovery(provider, config) - // create a Node startNode host := "localhost" - // set the environments - t.Setenv("GOSSIP_PORT", strconv.Itoa(gossipPort)) - t.Setenv("CLUSTER_PORT", strconv.Itoa(clusterPort)) - t.Setenv("REMOTING_PORT", strconv.Itoa(remotingPort)) - t.Setenv("NODE_NAME", "testNode") - t.Setenv("NODE_IP", host) - - cluster, err := NewNode("test", serviceDiscovery) + hostNode := discovery.Node{ + Name: host, + Host: host, + GossipPort: gossipPort, + ClusterPort: clusterPort, + RemotingPort: remotingPort, + } + + cluster, err := NewNode("test", provider, &hostNode) require.NotNil(t, cluster) require.NoError(t, err) @@ -266,30 +253,26 @@ func TestSingleNode(t *testing.T) { // mock the discovery provider provider := new(testkit.Provider) - config := discovery.NewConfig() provider.EXPECT().ID().Return("testDisco") provider.EXPECT().Initialize().Return(nil) provider.EXPECT().Register().Return(nil) provider.EXPECT().Deregister().Return(nil) - provider.EXPECT().SetConfig(config).Return(nil) provider.EXPECT().DiscoverPeers().Return(addrs, nil) provider.EXPECT().Close().Return(nil) - // create the service discovery - serviceDiscovery := discovery.NewServiceDiscovery(provider, config) - // create a Node startNode host := "localhost" - // set the environments - t.Setenv("GOSSIP_PORT", strconv.Itoa(gossipPort)) - t.Setenv("CLUSTER_PORT", strconv.Itoa(clusterPort)) - t.Setenv("REMOTING_PORT", strconv.Itoa(remotingPort)) - t.Setenv("NODE_NAME", "testNode") - t.Setenv("NODE_IP", host) + hostNode := discovery.Node{ + Name: host, + Host: host, + GossipPort: gossipPort, + ClusterPort: clusterPort, + RemotingPort: remotingPort, + } logger := log.New(log.WarningLevel, os.Stdout) - cluster, err := NewNode("test", serviceDiscovery, WithLogger(logger)) + cluster, err := NewNode("test", provider, &hostNode, WithLogger(logger)) require.NotNil(t, cluster) require.NoError(t, err) @@ -324,40 +307,6 @@ func TestSingleNode(t *testing.T) { require.NoError(t, cluster.Stop(ctx)) provider.AssertExpectations(t) }) - t.Run("With host node env vars not set", func(t *testing.T) { - // generate the ports for the single startNode - nodePorts := dynaport.Get(3) - gossipPort := nodePorts[0] - remotingPort := nodePorts[2] - - // define discovered addresses - addrs := []string{ - fmt.Sprintf("localhost:%d", gossipPort), - } - - // mock the discovery provider - provider := new(testkit.Provider) - config := discovery.NewConfig() - - provider.EXPECT().ID().Return("testDisco") - provider.EXPECT().Initialize().Return(nil) - provider.EXPECT().Register().Return(nil) - provider.EXPECT().Deregister().Return(nil) - provider.EXPECT().SetConfig(config).Return(nil) - provider.EXPECT().DiscoverPeers().Return(addrs, nil) - provider.EXPECT().Close().Return(nil) - - // create the service discovery - serviceDiscovery := discovery.NewServiceDiscovery(provider, config) - - // set the environments - t.Setenv("GOSSIP_PORT", strconv.Itoa(gossipPort)) - t.Setenv("REMOTING_PORT", strconv.Itoa(remotingPort)) - - cluster, err := NewNode("test", serviceDiscovery) - require.Nil(t, cluster) - require.Error(t, err) - }) } func TestMultipleNodes(t *testing.T) { @@ -482,43 +431,38 @@ func startNode(t *testing.T, nodeName, serverAddr string) (*Node, discovery.Prov // create a Cluster startNode host := "127.0.0.1" - // set the environments - require.NoError(t, os.Setenv("GOSSIP_PORT", strconv.Itoa(gossipPort))) - require.NoError(t, os.Setenv("CLUSTER_PORT", strconv.Itoa(clusterPort))) - require.NoError(t, os.Setenv("REMOTING_PORT", strconv.Itoa(remotingPort))) - require.NoError(t, os.Setenv("NODE_NAME", nodeName)) - require.NoError(t, os.Setenv("NODE_IP", host)) - // create the various config option applicationName := "accounts" actorSystemName := "testSystem" natsSubject := "some-subject" - // create the instance of provider - provider := nats.NewDiscovery() // create the config - config := discovery.Config{ - nats.ApplicationName: applicationName, - nats.ActorSystemName: actorSystemName, - nats.NatsServer: serverAddr, - nats.NatsSubject: natsSubject, + config := nats.Config{ + ApplicationName: applicationName, + ActorSystemName: actorSystemName, + NatsServer: serverAddr, + NatsSubject: natsSubject, } + hostNode := discovery.Node{ + Name: host, + Host: host, + GossipPort: gossipPort, + ClusterPort: clusterPort, + RemotingPort: remotingPort, + } + + // create the instance of provider + provider := nats.NewDiscovery(&config, &hostNode) + // create the startNode - node, err := NewNode(nodeName, discovery.NewServiceDiscovery(provider, config)) + node, err := NewNode(nodeName, provider, &hostNode) require.NoError(t, err) require.NotNil(t, node) // start the node require.NoError(t, node.Start(ctx)) - // clear the env var - require.NoError(t, os.Unsetenv("GOSSIP_PORT")) - require.NoError(t, os.Unsetenv("CLUSTER_PORT")) - require.NoError(t, os.Unsetenv("REMOTING_PORT")) - require.NoError(t, os.Unsetenv("NODE_NAME")) - require.NoError(t, os.Unsetenv("NODE_IP")) - // return the cluster startNode return node, provider } diff --git a/discovery/service_discovery.go b/internal/validation/boolean.go similarity index 63% rename from discovery/service_discovery.go rename to internal/validation/boolean.go index b41d39c5..8f16584b 100644 --- a/discovery/service_discovery.go +++ b/internal/validation/boolean.go @@ -22,30 +22,26 @@ * SOFTWARE. */ -package discovery +package validation -// ServiceDiscovery defines the cluster service discovery -type ServiceDiscovery struct { - // provider specifies the discovery provider - provider Provider - // config specifies the discovery config - config Config -} +import "github.com/pkg/errors" -// NewServiceDiscovery creates an instance of ServiceDiscovery -func NewServiceDiscovery(provider Provider, config Config) *ServiceDiscovery { - return &ServiceDiscovery{ - provider: provider, - config: config, - } +// booleanValidator implements Validator. +type booleanValidator struct { + boolCheck bool + errMessage string } -// Provider returns the service discovery provider -func (s ServiceDiscovery) Provider() Provider { - return s.provider +// NewBooleanValidator creates a new boolean validator that returns an error message if condition is false +// This validator will come handy when dealing with conditional validation +func NewBooleanValidator(boolCheck bool, errMessage string) Validator { + return &booleanValidator{boolCheck: boolCheck, errMessage: errMessage} } -// Config returns the service discovery config -func (s ServiceDiscovery) Config() Config { - return s.config +// Validate returns an error if boolean check is false +func (v booleanValidator) Validate() error { + if !v.boolCheck { + return errors.New(v.errMessage) + } + return nil } diff --git a/internal/validation/boolean_test.go b/internal/validation/boolean_test.go new file mode 100644 index 00000000..ac5c9c6f --- /dev/null +++ b/internal/validation/boolean_test.go @@ -0,0 +1,54 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package validation + +import ( + "testing" + + "github.com/stretchr/testify/suite" +) + +type booleanTestSuite struct { + suite.Suite +} + +// In order for 'go test' to run this suite, we need to create +// a normal test function and pass our suite to suite.Run +func TestBooleanValidator(t *testing.T) { + suite.Run(t, new(booleanTestSuite)) +} + +func (s *booleanTestSuite) TestBooleanValidator() { + s.Run("happy path when condition is true", func() { + err := NewBooleanValidator(true, "error message").Validate() + s.Assert().NoError(err) + }) + s.Run("happy path when condition is false", func() { + errMsg := "error message" + err := NewBooleanValidator(false, errMsg).Validate() + s.Assert().Error(err) + s.Assert().EqualError(err, errMsg) + }) +} diff --git a/internal/validation/conditional.go b/internal/validation/conditional.go new file mode 100644 index 00000000..a5b70a5e --- /dev/null +++ b/internal/validation/conditional.go @@ -0,0 +1,45 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package validation + +// conditionalValidator runs a validator when a condition is met +type conditionalValidator struct { + c bool + v Validator +} + +// NewConditionalValidator creates a conditional validator, that runs the validator if the condition is true. +// This validator will help when performing data update +func NewConditionalValidator(condition bool, validator Validator) Validator { + return &conditionalValidator{c: condition, v: validator} +} + +// Validate runs the provided conditional validator +func (v conditionalValidator) Validate() error { + if v.c { + return v.v.Validate() + } + return nil +} diff --git a/internal/validation/conditional_test.go b/internal/validation/conditional_test.go new file mode 100644 index 00000000..758ea33d --- /dev/null +++ b/internal/validation/conditional_test.go @@ -0,0 +1,59 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package validation + +import ( + "testing" + + "github.com/stretchr/testify/suite" +) + +type conditionalTestSuite struct { + suite.Suite +} + +// In order for 'go test' to run this suite, we need to create +// a normal test function and pass our suite to suite.Run +func TestConditionalValidator(t *testing.T) { + suite.Run(t, new(conditionalTestSuite)) +} + +func (s *conditionalTestSuite) TestConditionalValidator() { + s.Run("with condition set to true", func() { + fieldName := "field" + fieldValue := "" + validator := NewConditionalValidator(true, NewEmptyStringValidator(fieldName, fieldValue)) + err := validator.Validate() + s.Assert().Error(err) + s.Assert().EqualError(err, "the [field] is required") + }) + s.Run("with condition set to false", func() { + fieldName := "field" + fieldValue := "" + validator := NewConditionalValidator(false, NewEmptyStringValidator(fieldName, fieldValue)) + err := validator.Validate() + s.Assert().NoError(err) + }) +} diff --git a/internal/validation/empty_string.go b/internal/validation/empty_string.go new file mode 100644 index 00000000..483d9167 --- /dev/null +++ b/internal/validation/empty_string.go @@ -0,0 +1,45 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package validation + +import "fmt" + +type emptyStringValidator struct { + fieldValue string + fieldName string +} + +// NewEmptyStringValidator creates a string a emptyString validator +func NewEmptyStringValidator(fieldName, fieldValue string) Validator { + return emptyStringValidator{fieldValue: fieldValue, fieldName: fieldName} +} + +// Validate checks whether the given string is empty or not +func (v emptyStringValidator) Validate() error { + if v.fieldValue == "" { + return fmt.Errorf("the [%s] is required", v.fieldName) + } + return nil +} diff --git a/internal/validation/empty_string_test.go b/internal/validation/empty_string_test.go new file mode 100644 index 00000000..ea58cdf9 --- /dev/null +++ b/internal/validation/empty_string_test.go @@ -0,0 +1,57 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package validation + +import ( + "testing" + + "github.com/stretchr/testify/suite" +) + +type emptyStringTestSuite struct { + suite.Suite +} + +// In order for 'go test' to run this suite, we need to create +// a normal test function and pass our suite to suite.Run +func TestEmptyStringValidator(t *testing.T) { + suite.Run(t, new(emptyStringTestSuite)) +} + +func (s *emptyStringTestSuite) TestEmptyStringValidator() { + s.Run("with string value set", func() { + validator := NewEmptyStringValidator("field", "some-value") + s.Assert().NotNil(validator) + err := validator.Validate() + s.Assert().NoError(err) + }) + s.Run("with string value not set", func() { + validator := NewEmptyStringValidator("field", "") + s.Assert().NotNil(validator) + err := validator.Validate() + s.Assert().Error(err) + s.Assert().EqualError(err, "the [field] is required") + }) +} diff --git a/internal/validation/validation.go b/internal/validation/validation.go new file mode 100644 index 00000000..555deb64 --- /dev/null +++ b/internal/validation/validation.go @@ -0,0 +1,95 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package validation + +import ( + "go.uber.org/multierr" +) + +// Validator interface generalizes the validator implementations +type Validator interface { + Validate() error +} + +// Chain represents list of validators and is used to accumulate errors and return them as a single "error" +type Chain struct { + failFast bool + validators []Validator + violations error +} + +// ChainOption configures a validation chain at creation time. +type ChainOption func(*Chain) + +// New creates a new validation chain. +func New(opts ...ChainOption) *Chain { + chain := &Chain{ + validators: make([]Validator, 0), + } + + for _, opt := range opts { + opt(chain) + } + + return chain +} + +// FailFast sets whether a chain should stop validation on first error. +func FailFast() ChainOption { + return func(c *Chain) { c.failFast = true } +} + +// AllErrors sets whether a chain should return all errors. +func AllErrors() ChainOption { + return func(c *Chain) { c.failFast = false } +} + +// AddValidator adds validator to the validation chain. +func (c *Chain) AddValidator(v Validator) *Chain { + c.validators = append(c.validators, v) + return c +} + +// AddAssertion adds assertion to the validation chain. +func (c *Chain) AddAssertion(isTrue bool, message string) *Chain { + c.validators = append(c.validators, NewBooleanValidator(isTrue, message)) + return c +} + +// Validate runs validation chain and returns resulting error(s). +// It returns all validation error by default, use FailFast option to stop validation on first error. +func (c *Chain) Validate() error { + for _, v := range c.validators { + if violations := v.Validate(); violations != nil { + if c.failFast { + // just return the error + return violations + } + // append error to the violations + c.violations = multierr.Append(c.violations, violations) + } + } + return c.violations +} diff --git a/internal/validation/validation_test.go b/internal/validation/validation_test.go new file mode 100644 index 00000000..9986e960 --- /dev/null +++ b/internal/validation/validation_test.go @@ -0,0 +1,111 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package validation + +import ( + "testing" + + "github.com/stretchr/testify/suite" +) + +type validationTestSuite struct { + suite.Suite +} + +// In order for 'go test' to run this suite, we need to create +// a normal test function and pass our suite to suite.Run +func TestValidation(t *testing.T) { + suite.Run(t, new(validationTestSuite)) +} + +func (s *validationTestSuite) TestNewChain() { + s.Run("new chain without option", func() { + chain := New() + s.Assert().NotNil(chain) + }) + s.Run("new chain with options", func() { + chain := New(FailFast()) + s.Assert().NotNil(chain) + s.Assert().True(chain.failFast) + chain2 := New(AllErrors()) + s.Assert().NotNil(chain2) + s.Assert().False(chain2.failFast) + }) +} + +func (s *validationTestSuite) TestAddValidator() { + chain := New() + s.Assert().NotNil(chain) + s.Assert().Empty(chain.validators) + chain.AddValidator(NewBooleanValidator(true, "")) + s.Assert().NotEmpty(chain.validators) + s.Assert().Equal(1, len(chain.validators)) +} + +func (s *validationTestSuite) TestAddAssertion() { + chain := New() + s.Assert().NotNil(chain) + s.Assert().Empty(chain.validators) + chain.AddAssertion(true, "") + s.Assert().NotEmpty(chain.validators) + s.Assert().Equal(1, len(chain.validators)) +} + +func (s *validationTestSuite) TestValidate() { + s.Run("with single validator", func() { + chain := New() + s.Assert().NotNil(chain) + chain.AddValidator(NewEmptyStringValidator("field", "")) + s.Assert().Nil(chain.violations) + err := chain.Validate() + s.Assert().NotNil(chain.violations) + s.Assert().Error(err) + s.Assert().EqualError(err, "the [field] is required") + }) + s.Run("with multiple validators and FailFast option", func() { + chain := New(FailFast()) + s.Assert().NotNil(chain) + chain. + AddValidator(NewEmptyStringValidator("field", "")). + AddAssertion(false, "this is false") + s.Assert().Nil(chain.violations) + err := chain.Validate() + s.Assert().Nil(chain.violations) + s.Assert().Error(err) + s.Assert().EqualError(err, "the [field] is required") + }) + s.Run("with multiple validators and AllErrors option", func() { + chain := New(AllErrors()) + s.Assert().NotNil(chain) + chain. + AddValidator(NewEmptyStringValidator("field", "")). + AddAssertion(false, "this is false") + s.Assert().Nil(chain.violations) + err := chain.Validate() + s.Assert().NotNil(chain.violations) + s.Assert().Error(err) + s.Assert().EqualError(err, "the [field] is required; this is false") + }) +} diff --git a/mocks/discovery/provider.go b/mocks/discovery/provider.go index 2a216ca5..4c934f47 100644 --- a/mocks/discovery/provider.go +++ b/mocks/discovery/provider.go @@ -2,10 +2,7 @@ package discovery -import ( - mock "github.com/stretchr/testify/mock" - goaktdiscovery "github.com/tochemey/goakt/discovery" -) +import mock "github.com/stretchr/testify/mock" // Provider is an autogenerated mock type for the Provider type type Provider struct { @@ -278,48 +275,6 @@ func (_c *Provider_Register_Call) RunAndReturn(run func() error) *Provider_Regis return _c } -// SetConfig provides a mock function with given fields: config -func (_m *Provider) SetConfig(config goaktdiscovery.Config) error { - ret := _m.Called(config) - - var r0 error - if rf, ok := ret.Get(0).(func(goaktdiscovery.Config) error); ok { - r0 = rf(config) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// Provider_SetConfig_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetConfig' -type Provider_SetConfig_Call struct { - *mock.Call -} - -// SetConfig is a helper method to define mock.On call -// - config goaktdiscovery.Config -func (_e *Provider_Expecter) SetConfig(config interface{}) *Provider_SetConfig_Call { - return &Provider_SetConfig_Call{Call: _e.mock.On("SetConfig", config)} -} - -func (_c *Provider_SetConfig_Call) Run(run func(config goaktdiscovery.Config)) *Provider_SetConfig_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(goaktdiscovery.Config)) - }) - return _c -} - -func (_c *Provider_SetConfig_Call) Return(_a0 error) *Provider_SetConfig_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *Provider_SetConfig_Call) RunAndReturn(run func(goaktdiscovery.Config) error) *Provider_SetConfig_Call { - _c.Call.Return(run) - return _c -} - // NewProvider creates a new instance of Provider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewProvider(t interface {