From 7ca75e3b7cf5da6dec48cec057544d4fe2fa2911 Mon Sep 17 00:00:00 2001 From: Arsene Date: Sun, 9 Jul 2023 13:27:21 +0000 Subject: [PATCH] feat: add static discovery provider (#72) * feat: add static discovery provider * fix: data race --- cluster/cluster.go | 22 +-- discovery/kubernetes/discovery.go | 186 +++++++++++++---------- discovery/kubernetes/discovery_test.go | 6 +- discovery/meta.go | 5 + discovery/node.go | 36 +++++ discovery/node_test.go | 117 ++++++++++++++ discovery/static/discovery.go | 144 ++++++++++++++++++ discovery/static/discovery_test.go | 201 +++++++++++++++++++++++++ examples/actor-cluster/k8s/cmd/run.go | 2 +- readme.md | 4 +- 10 files changed, 625 insertions(+), 98 deletions(-) create mode 100644 discovery/node_test.go create mode 100644 discovery/static/discovery.go create mode 100644 discovery/static/discovery_test.go diff --git a/cluster/cluster.go b/cluster/cluster.go index 8ad7e8c7..b7fed266 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -25,8 +25,6 @@ import ( ) const ( - clientPortName = "clients-port" - peersPortName = "peers-port" memberAddThreshold = 5 ) @@ -102,7 +100,7 @@ func (c *Cluster) Start(ctx context.Context) error { for _, discoNode := range discoNodes { if !(hostNode.host == discoNode.Host) { // build the peer URL - _, clientsURL := nodeURLs(discoNode) + _, clientsURL := discoNode.URLs() // set the endpoints existingEndpoints = append(existingEndpoints, clientsURL) } @@ -170,7 +168,7 @@ func (c *Cluster) Start(ctx context.Context) error { endpoints := make([]string, len(discoNodes)) for i, discoNode := range discoNodes { // build the peer URL - peersURL, clientsURL := nodeURLs(discoNode) + peersURL, clientsURL := discoNode.URLs() // build the initial cluster builder initialPeerURLs[i] = fmt.Sprintf("%s=%s", discoNode.Name, peersURL) // set the endpoints @@ -346,7 +344,7 @@ func (c *Cluster) handleClusterEvents(events <-chan discovery.Event) { // when matching member is found if member != nil { // grab the latest URLs - peerURL, clientURL := nodeURLs(latest) + peerURL, clientURL := latest.URLs() // create an instance of member and set the various URLs m := &etcdserverpb.Member{ ID: member.GetID(), @@ -401,9 +399,9 @@ func (c *Cluster) whoami(discoNodes []*discovery.Node) *hostNode { // here the host node is found if slices.Contains(addresses, discoNode.Host) { // get the peer port - peersPort := discoNode.Ports[peersPortName] + peersPort := discoNode.PeersPort() // get the clients port - clientsPort := discoNode.Ports[clientPortName] + clientsPort := discoNode.ClientsPort() // let us build the host peer URLs and getClient URLs peerURLs := goset.NewSet[string]() @@ -458,7 +456,7 @@ func (c *Cluster) discoverNodes(ctx context.Context) []*discovery.Node { // remove duplicate for _, discoNode := range discoNodes { // let us get the node URL - peersURL, _ := nodeURLs(discoNode) + peersURL, _ := discoNode.URLs() // check whether the Cluster has been already discovered and ignore it if _, ok := seen[peersURL]; ok { continue @@ -484,16 +482,10 @@ func (c *Cluster) discoverNodes(ctx context.Context) []*discovery.Node { return nodes } -// nodeURLs returns the actual node URLs -func nodeURLs(node *discovery.Node) (peersURL string, clientURL string) { - return fmt.Sprintf("http://%s:%d", node.Host, node.Ports[peersPortName]), - fmt.Sprintf("http://%s:%d", node.Host, node.Ports[clientPortName]) -} - // locateMember helps find a given member using its peerURL func locateMember(members []*etcdserverpb.Member, node *discovery.Node) 
*etcdserverpb.Member { // grab the given node URL - peerURL, _ := nodeURLs(node) + peerURL, _ := node.URLs() for _, member := range members { if slices.Contains(member.GetPeerURLs(), peerURL) && member.GetName() == node.Name { return member diff --git a/discovery/kubernetes/discovery.go b/discovery/kubernetes/discovery.go index 3f6b6612..c3bf9d7c 100644 --- a/discovery/kubernetes/discovery.go +++ b/discovery/kubernetes/discovery.go @@ -54,8 +54,8 @@ type Discovery struct { // enforce compilation error var _ discovery.Discovery = &Discovery{} -// New returns an instance of the kubernetes discovery provider -func New(logger log.Logger) *Discovery { +// NewDiscovery returns an instance of the kubernetes discovery provider +func NewDiscovery(logger log.Logger) *Discovery { // create an instance of k8 := &Discovery{ mu: sync.Mutex{}, @@ -67,39 +67,39 @@ func New(logger log.Logger) *Discovery { return k8 } -// ID returns the discovery id -func (k *Discovery) ID() string { +// ID returns the discovery provider id +func (d *Discovery) ID() string { return "kubernetes" } // Nodes returns the list of up and running Nodes at a given time -func (k *Discovery) Nodes(ctx context.Context) ([]*discovery.Node, error) { +func (d *Discovery) Nodes(ctx context.Context) ([]*discovery.Node, error) { // add a span context ctx, span := telemetry.SpanContext(ctx, "Nodes") defer span.End() - // first check whether the actor system has started - if !k.isInitialized.Load() { + // first check whether the discovery provider is running + if !d.isInitialized.Load() { return nil, errors.New("kubernetes discovery engine not initialized") } // let us create the pod labels map // TODO: make sure to document it on k8 discovery podLabels := map[string]string{ - "app.kubernetes.io/part-of": k.option.ActorSystemName, - "app.kubernetes.io/component": k.option.ApplicationName, // TODO: redefine it - "app.kubernetes.io/name": k.option.ApplicationName, + "app.kubernetes.io/part-of": d.option.ActorSystemName, + "app.kubernetes.io/component": d.option.ApplicationName, // TODO: redefine it + "app.kubernetes.io/name": d.option.ApplicationName, } // List all the pods based on the filters we requested - pods, err := k.client.CoreV1().Pods(k.option.NameSpace).List(ctx, metav1.ListOptions{ + pods, err := d.client.CoreV1().Pods(d.option.NameSpace).List(ctx, metav1.ListOptions{ LabelSelector: labels.SelectorFromSet(podLabels).String(), }) // panic when we cannot poll the pods if err != nil { // TODO maybe do not panic // TODO figure out the best approach - k.logger.Panic(errors.Wrap(err, "failed to fetch kubernetes pods")) + d.logger.Panic(errors.Wrap(err, "failed to fetch kubernetes pods")) } nodes := make([]*discovery.Node, 0, pods.Size()) @@ -123,9 +123,9 @@ MainLoop: } // create a variable holding the node - node := k.podToNode(&pod) + node := d.podToNode(&pod) // continue the loop when we did not find any node - if node == nil { + if node == nil || !node.IsValid() { continue MainLoop } // add the node to the list of nodes @@ -135,21 +135,21 @@ MainLoop: } // Watch returns event based upon node lifecycle -func (k *Discovery) Watch(ctx context.Context) (<-chan discovery.Event, error) { +func (d *Discovery) Watch(ctx context.Context) (<-chan discovery.Event, error) { // add a span context _, span := telemetry.SpanContext(ctx, "Watch") defer span.End() - // first check whether the actor system has started - if !k.isInitialized.Load() { + // first check whether the discovery provider is running + if !d.isInitialized.Load() { return nil, 
errors.New("kubernetes discovery engine not initialized") } // run the watcher - go k.watchPods() - return k.publicChan, nil + go d.watchPods() + return d.publicChan, nil } // Start the discovery engine -func (k *Discovery) Start(ctx context.Context, meta discovery.Meta) error { +func (d *Discovery) Start(ctx context.Context, meta discovery.Meta) error { // add a span context _, span := telemetry.SpanContext(ctx, "Start") defer span.End() @@ -177,24 +177,30 @@ func (k *Discovery) Start(ctx context.Context, meta discovery.Meta) error { } // set the k8 client - k.client = client + d.client = client // set the options - if err := k.setOptions(meta); err != nil { + if err := d.setOptions(meta); err != nil { return errors.Wrap(err, "failed to instantiate the kubernetes discovery provider") } // set initialized - k.isInitialized = atomic.NewBool(true) + d.isInitialized = atomic.NewBool(true) return nil } // EarliestNode returns the earliest node. This is based upon the node timestamp -func (k *Discovery) EarliestNode(ctx context.Context) (*discovery.Node, error) { +func (d *Discovery) EarliestNode(ctx context.Context) (*discovery.Node, error) { // fetch the list of Nodes - nodes, err := k.Nodes(ctx) + nodes, err := d.Nodes(ctx) // handle the error if err != nil { return nil, errors.Wrap(err, "failed to get the earliest node") } + + // check whether the list of nodes is not empty + if len(nodes) == 0 { + return nil, errors.New("no nodes are found") + } + // let us sort the nodes by their timestamp sort.SliceStable(nodes, func(i, j int) bool { return nodes[i].StartTime < nodes[j].StartTime @@ -204,72 +210,90 @@ func (k *Discovery) EarliestNode(ctx context.Context) (*discovery.Node, error) { } // Stop shutdown the discovery engine -func (k *Discovery) Stop() error { +func (d *Discovery) Stop() error { + // acquire the lock + d.mu.Lock() + // release the lock + defer d.mu.Unlock() + // first check whether the actor system has started - if !k.isInitialized.Load() { + if !d.isInitialized.Load() { return errors.New("kubernetes discovery engine not initialized") } - // acquire the lock - k.mu.Lock() - // release the lock - defer k.mu.Unlock() + // set the initialized to false + d.isInitialized = atomic.NewBool(false) // close the public channel - close(k.publicChan) + close(d.publicChan) // stop the watchers - close(k.stopChan) + close(d.stopChan) + // return return nil } // handlePodAdded is called when a new pod is added -func (k *Discovery) handlePodAdded(pod *corev1.Pod) { +func (d *Discovery) handlePodAdded(pod *corev1.Pod) { // acquire the lock - k.mu.Lock() + d.mu.Lock() // release the lock - defer k.mu.Unlock() + defer d.mu.Unlock() + + // first check whether the discovery provider is running + if !d.isInitialized.Load() { + return + } + // ignore the pod when it is not running if pod.Status.Phase != corev1.PodRunning { // add some debug logging - k.logger.Debugf("pod=%s added is not running. Status=%s", pod.GetName(), pod.Status.Phase) + d.logger.Debugf("pod=%s added is not running. 
Status=%s", pod.GetName(), pod.Status.Phase) return } // get node - node := k.podToNode(pod) + node := d.podToNode(pod) // continue the loop when we did not find any node - if node == nil { + if node == nil || !node.IsValid() { return } // here we find a node let us raise the node registered event event := &discovery.NodeAdded{Node: node} // add to the channel - k.publicChan <- event + d.publicChan <- event } // handlePodUpdated is called when a pod is updated -func (k *Discovery) handlePodUpdated(old *corev1.Pod, pod *corev1.Pod) { +func (d *Discovery) handlePodUpdated(old *corev1.Pod, pod *corev1.Pod) { + // acquire the lock + d.mu.Lock() + // release the lock + defer d.mu.Unlock() + + // first check whether the discovery provider is running + if !d.isInitialized.Load() { + return + } + // ignore the pod when it is not running if old.Status.Phase != corev1.PodRunning { // add some debug logging - k.logger.Debugf("pod=%s to be modified is not running. Status=%s", old.GetName(), old.Status.Phase) + d.logger.Debugf("pod=%s to be modified is not running. Status=%s", old.GetName(), old.Status.Phase) return } // ignore the pod when it is not running if pod.Status.Phase != corev1.PodRunning { // add some debug logging - k.logger.Debugf("modified pod=%s is not running. Status=%s", pod.GetName(), pod.Status.Phase) + d.logger.Debugf("modified pod=%s is not running. Status=%s", pod.GetName(), pod.Status.Phase) return } - // acquire the lock - k.mu.Lock() - // release the lock - defer k.mu.Unlock() // grab the old node - oldNode := k.podToNode(old) + oldNode := d.podToNode(old) // get the new node - node := k.podToNode(pod) + node := d.podToNode(pod) // continue the loop when we did not find any node - if node == nil { + if oldNode == nil || + node == nil || !node.IsValid() || + !oldNode.IsValid() { return } // here we find a node let us raise the node modified event @@ -278,25 +302,31 @@ func (k *Discovery) handlePodUpdated(old *corev1.Pod, pod *corev1.Pod) { Current: oldNode, } // add to the channel - k.publicChan <- event + d.publicChan <- event } // handlePodDeleted is called when pod is deleted -func (k *Discovery) handlePodDeleted(pod *corev1.Pod) { +func (d *Discovery) handlePodDeleted(pod *corev1.Pod) { // acquire the lock - k.mu.Lock() + d.mu.Lock() // release the lock - defer k.mu.Unlock() + defer d.mu.Unlock() + + // first check whether the discovery provider is running + if !d.isInitialized.Load() { + return + } + // get the new node - node := k.podToNode(pod) + node := d.podToNode(pod) // here we find a node let us raise the node removed event event := &discovery.NodeRemoved{Node: node} // add to the channel - k.publicChan <- event + d.publicChan <- event } // setOptions sets the kubernetes option -func (k *Discovery) setOptions(meta discovery.Meta) (err error) { +func (d *Discovery) setOptions(meta discovery.Meta) (err error) { // create an instance of option option := new(option) // extract the namespace @@ -318,12 +348,12 @@ func (k *Discovery) setOptions(meta discovery.Meta) (err error) { return err } // in case none of the above extraction fails then set the option - k.option = option + d.option = option return nil } // podToNode takes a kubernetes pod and returns a Node -func (k *Discovery) podToNode(pod *corev1.Pod) *discovery.Node { +func (d *Discovery) podToNode(pod *corev1.Pod) *discovery.Node { // create a variable holding the node var node *discovery.Node // iterate the pod containers and find the named port @@ -355,20 +385,20 @@ func (k *Discovery) podToNode(pod 
*corev1.Pod) *discovery.Node { // watchPods keeps a watch on kubernetes pods activities and emit // respective event when needed -func (k *Discovery) watchPods() { +func (d *Discovery) watchPods() { // add some debug logging - k.logger.Debugf("%s start watching pods activities...", k.ID()) + d.logger.Debugf("%s start watching pods activities...", d.ID()) // TODO: make sure to document it on k8 discovery podLabels := map[string]string{ - "app.kubernetes.io/part-of": k.option.ActorSystemName, - "app.kubernetes.io/component": k.option.ApplicationName, // TODO: redefine it - "app.kubernetes.io/name": k.option.ApplicationName, + "app.kubernetes.io/part-of": d.option.ActorSystemName, + "app.kubernetes.io/component": d.option.ApplicationName, // TODO: redefine it + "app.kubernetes.io/name": d.option.ApplicationName, } // create the k8 informer factory factory := informers.NewSharedInformerFactoryWithOptions( - k.client, + d.client, time.Second, // TODO make it configurable - informers.WithNamespace(k.option.NameSpace), + informers.WithNamespace(d.option.NameSpace), informers.WithTweakListOptions(func(options *metav1.ListOptions) { options.LabelSelector = labels.SelectorFromSet(podLabels).String() })) @@ -382,23 +412,23 @@ func (k *Discovery) watchPods() { defer mux.RUnlock() if !synced { // add some debug logging - k.logger.Debugf("%s pods watching synchronization not yet done", k.ID()) + d.logger.Debugf("%s pods watching synchronization not yet done", d.ID()) return } // Handler logic pod := obj.(*corev1.Pod) // add some debug logging - k.logger.Debugf("%s has been added", pod.Name) + d.logger.Debugf("%s has been added", pod.Name) // handle the newly added pod - k.handlePodAdded(pod) + d.handlePodAdded(pod) }, UpdateFunc: func(current, node any) { mux.RLock() defer mux.RUnlock() if !synced { // add some debug logging - k.logger.Debugf("%s pods watching synchronization not yet done", k.ID()) + d.logger.Debugf("%s pods watching synchronization not yet done", d.ID()) return } @@ -407,16 +437,16 @@ func (k *Discovery) watchPods() { pod := node.(*corev1.Pod) // add some debug logging - k.logger.Debugf("%s has been modified", old.Name) + d.logger.Debugf("%s has been modified", old.Name) - k.handlePodUpdated(old, pod) + d.handlePodUpdated(old, pod) }, DeleteFunc: func(obj any) { mux.RLock() defer mux.RUnlock() if !synced { // add some debug logging - k.logger.Debugf("%s pods watching synchronization not yet done", k.ID()) + d.logger.Debugf("%s pods watching synchronization not yet done", d.ID()) return } @@ -424,10 +454,10 @@ func (k *Discovery) watchPods() { pod := obj.(*corev1.Pod) // add some debug logging - k.logger.Debugf("%s has been deleted", pod.Name) + d.logger.Debugf("%s has been deleted", pod.Name) // handle the deleted pod - k.handlePodDeleted(pod) + d.handlePodDeleted(pod) }, }) if err != nil { @@ -435,16 +465,16 @@ func (k *Discovery) watchPods() { } // run the informer - go informer.Run(k.stopChan) + go informer.Run(d.stopChan) // wait for caches to sync - isSynced := cache.WaitForCacheSync(k.stopChan, informer.HasSynced) + isSynced := cache.WaitForCacheSync(d.stopChan, informer.HasSynced) mux.Lock() synced = isSynced mux.Unlock() // caches failed to sync if !synced { - k.logger.Fatal("caches failed to sync") + d.logger.Fatal("caches failed to sync") } } diff --git a/discovery/kubernetes/discovery_test.go b/discovery/kubernetes/discovery_test.go index 7f806014..aba4b0d9 100644 --- a/discovery/kubernetes/discovery_test.go +++ b/discovery/kubernetes/discovery_test.go @@ -27,7 +27,7 @@ func 
TestKubernetesProvider(t *testing.T) { // create a logger logger := log.DefaultLogger // create the instance of provider - provider := New(logger) + provider := NewDiscovery(logger) require.NotNil(t, provider) // assert that provider implements the Discovery interface // this is a cheap test @@ -37,12 +37,12 @@ func TestKubernetesProvider(t *testing.T) { _, ok := p.(discovery.Discovery) assert.True(t, ok) }) - t.Run("With id assertion", func(t *testing.T) { + t.Run("With ID assertion", func(t *testing.T) { // cheap test // create a logger logger := log.DefaultLogger // create the instance of provider - provider := New(logger) + provider := NewDiscovery(logger) require.NotNil(t, provider) assert.Equal(t, "kubernetes", provider.ID()) }) diff --git a/discovery/meta.go b/discovery/meta.go index 8ec85b55..d250d84a 100644 --- a/discovery/meta.go +++ b/discovery/meta.go @@ -8,6 +8,11 @@ import ( // Meta represents the meta information to pass to the discovery engine type Meta map[string]any +// NewMeta initializes meta +func NewMeta() Meta { + return Meta{} +} + // GetString returns the string value of a given key which value is a string // If the key value is not a string then an error is return func (m Meta) GetString(key string) (string, error) { diff --git a/discovery/node.go b/discovery/node.go index 252ae79a..5db94a23 100644 --- a/discovery/node.go +++ b/discovery/node.go @@ -1,5 +1,12 @@ package discovery +import "fmt" + +const ( + clientPortName = "clients-port" + peersPortName = "peers-port" +) + // Node represents a discovered node type Node struct { // Name specifies the discovered node's Name @@ -13,3 +20,32 @@ type Node struct { // IsRunning specifies whether the node is up and running IsRunning bool } + +// IsValid checks whether the discovered node is a valid node discovered +func (n Node) IsValid() bool { + // first let us make sure the various ports are set + if _, ok := n.Ports[clientPortName]; !ok { + return ok + } + // first let us make sure the various ports are set + if _, ok := n.Ports[peersPortName]; !ok { + return ok + } + return len(n.Host) != 0 && len(n.Name) != 0 +} + +// URLs returns the actual node URLs +func (n Node) URLs() (peersURL string, clientURL string) { + return fmt.Sprintf("http://%s:%d", n.Host, n.Ports[peersPortName]), + fmt.Sprintf("http://%s:%d", n.Host, n.Ports[clientPortName]) +} + +// PeersPort returns the node peer port +func (n Node) PeersPort() int32 { + return n.Ports[peersPortName] +} + +// ClientsPort returns the node clients ports +func (n Node) ClientsPort() int32 { + return n.Ports[clientPortName] +} diff --git a/discovery/node_test.go b/discovery/node_test.go new file mode 100644 index 00000000..9517a0fe --- /dev/null +++ b/discovery/node_test.go @@ -0,0 +1,117 @@ +package discovery + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestNode(t *testing.T) { + t.Run("With valid node", func(t *testing.T) { + node := &Node{ + Name: "node-1", + Host: "localhost", + StartTime: time.Now().Add(time.Second).UnixMilli(), + Ports: map[string]int32{ + "clients-port": 1111, + "peers-port": 1112, + }, + IsRunning: true, + } + assert.True(t, node.IsValid()) + }) + t.Run("With invalid node: invalid clients-port name", func(t *testing.T) { + node := &Node{ + Name: "node-1", + Host: "localhost", + StartTime: time.Now().Add(time.Second).UnixMilli(), + Ports: map[string]int32{ + "clients-ports": 1111, // invalid key + "peers-port": 1112, + }, + IsRunning: true, + } + assert.False(t, node.IsValid()) + }) + t.Run("With invalid 
node: invalid peers-port name", func(t *testing.T) { + node := &Node{ + Name: "node-1", + Host: "localhost", + StartTime: time.Now().Add(time.Second).UnixMilli(), + Ports: map[string]int32{ + "clients-port": 1111, + "peers-ports": 1112, // invalid key + }, + IsRunning: true, + } + assert.False(t, node.IsValid()) + }) + t.Run("With invalid node: node name not set", func(t *testing.T) { + node := &Node{ + Host: "localhost", + StartTime: time.Now().Add(time.Second).UnixMilli(), + Ports: map[string]int32{ + "clients-port": 1111, + "peers-ports": 1112, // invalid key + }, + IsRunning: true, + } + assert.False(t, node.IsValid()) + }) + t.Run("With invalid node: node hos not set", func(t *testing.T) { + node := &Node{ + Name: "node-1", + StartTime: time.Now().Add(time.Second).UnixMilli(), + Ports: map[string]int32{ + "clients-port": 1111, + "peers-ports": 1112, // invalid key + }, + IsRunning: true, + } + assert.False(t, node.IsValid()) + }) + t.Run("With URLs", func(t *testing.T) { + node := &Node{ + Name: "node-1", + Host: "localhost", + StartTime: time.Now().Add(time.Second).UnixMilli(), + Ports: map[string]int32{ + "clients-port": 1111, + "peers-port": 1112, + }, + IsRunning: true, + } + purls, curls := node.URLs() + assert.Equal(t, "http://localhost:1112", purls) + assert.Equal(t, "http://localhost:1111", curls) + }) + t.Run("With Peers Port", func(t *testing.T) { + node := &Node{ + Name: "node-1", + Host: "localhost", + StartTime: time.Now().Add(time.Second).UnixMilli(), + Ports: map[string]int32{ + "clients-port": 1111, + "peers-port": 1112, + }, + IsRunning: true, + } + port := node.PeersPort() + assert.EqualValues(t, 1112, port) + }) + t.Run("With Clients Port", func(t *testing.T) { + node := &Node{ + Name: "node-1", + Host: "localhost", + StartTime: time.Now().Add(time.Second).UnixMilli(), + Ports: map[string]int32{ + "clients-port": 1111, + "peers-port": 1112, + }, + IsRunning: true, + } + port := node.ClientsPort() + assert.EqualValues(t, 1111, port) + }) +} diff --git a/discovery/static/discovery.go b/discovery/static/discovery.go new file mode 100644 index 00000000..b1d982f0 --- /dev/null +++ b/discovery/static/discovery.go @@ -0,0 +1,144 @@ +package static + +import ( + "context" + "sort" + "sync" + + "github.com/pkg/errors" + "github.com/tochemey/goakt/discovery" + "github.com/tochemey/goakt/log" + "github.com/tochemey/goakt/pkg/telemetry" + "go.uber.org/atomic" +) + +// Discovery represents the static discovery +// With static discovery provider the list of Nodes are known ahead of time +// That means the discovery is not elastic. You cannot at runtime manipulate nodes. 
+// This is discovery method is great when running Go-Akt in a docker environment +type Discovery struct { + mu sync.Mutex + + stopChan chan struct{} + publicChan chan discovery.Event + // states whether the actor system has started or not + isInitialized *atomic.Bool + logger log.Logger + + nodes []*discovery.Node +} + +// enforce compilation error +var _ discovery.Discovery = &Discovery{} + +// NewDiscovery creates an instance of Discovery +func NewDiscovery(nodes []*discovery.Node, logger log.Logger) *Discovery { + // filter out nodes that are running + running := make([]*discovery.Node, 0, len(nodes)) + for _, node := range nodes { + // check whether the node is valid and running + if !node.IsValid() || !node.IsRunning { + continue + } + // only add running and valid node + running = append(running, node) + } + // create the instance of the Discovery and return it + return &Discovery{ + mu: sync.Mutex{}, + publicChan: make(chan discovery.Event, 2), + stopChan: make(chan struct{}, 1), + isInitialized: atomic.NewBool(false), + logger: logger, + nodes: running, + } +} + +// ID returns the discovery provider id +func (d *Discovery) ID() string { + return "static" +} + +// Start the discovery engine +func (d *Discovery) Start(ctx context.Context, _ discovery.Meta) error { + // add a span context + _, span := telemetry.SpanContext(ctx, "Start") + defer span.End() + + // check whether the list of nodes is not empty + if len(d.nodes) == 0 { + return errors.New("no nodes are set") + } + + // set initialized + d.isInitialized = atomic.NewBool(true) + // return no error + return nil +} + +// Nodes returns the list of up and running Nodes at a given time +func (d *Discovery) Nodes(ctx context.Context) ([]*discovery.Node, error) { + // add a span context + ctx, span := telemetry.SpanContext(ctx, "Nodes") + defer span.End() + + // first check whether the actor system has started + if !d.isInitialized.Load() { + return nil, errors.New("static discovery engine not initialized") + } + + return d.nodes, nil +} + +// Watch returns event based upon node lifecycle +func (d *Discovery) Watch(ctx context.Context) (<-chan discovery.Event, error) { + // add a span context + _, span := telemetry.SpanContext(ctx, "Watch") + defer span.End() + // first check whether the actor system has started + if !d.isInitialized.Load() { + return nil, errors.New("static discovery engine not initialized") + } + + // no events to publish just return the channel + return d.publicChan, nil +} + +// EarliestNode returns the earliest node. 
This is based upon the node timestamp +func (d *Discovery) EarliestNode(ctx context.Context) (*discovery.Node, error) { + // fetch the list of Nodes + nodes, err := d.Nodes(ctx) + // handle the error + if err != nil { + return nil, errors.Wrap(err, "failed to get the earliest node") + } + + // check whether the list of nodes is not empty + if len(nodes) == 0 { + return nil, errors.New("no nodes are found") + } + + // let us sort the nodes by their timestamp + sort.SliceStable(nodes, func(i, j int) bool { + return nodes[i].StartTime < nodes[j].StartTime + }) + // return the first element in the sorted list + return nodes[0], nil +} + +// Stop shutdown the discovery engine +func (d *Discovery) Stop() error { + // first check whether the actor system has started + if !d.isInitialized.Load() { + return errors.New("static discovery engine not initialized") + } + // acquire the lock + d.mu.Lock() + // release the lock + defer d.mu.Unlock() + // close the public channel + close(d.publicChan) + // stop the watchers + close(d.stopChan) + return nil +} diff --git a/discovery/static/discovery_test.go b/discovery/static/discovery_test.go new file mode 100644 index 00000000..461ef8b9 --- /dev/null +++ b/discovery/static/discovery_test.go @@ -0,0 +1,201 @@ +package static + +import ( + "context" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tochemey/goakt/discovery" + "github.com/tochemey/goakt/log" +) + +func TestStaticProvider(t *testing.T) { + t.Run("With new instance", func(t *testing.T) { + // create a logger + logger := log.DefaultLogger + // define the nodes + var nodes []*discovery.Node + // create the instance of provider + provider := NewDiscovery(nodes, logger) + require.NotNil(t, provider) + // assert that provider implements the Discovery interface + // this is a cheap test + // assert the type of svc + assert.IsType(t, &Discovery{}, provider) + var p interface{} = provider + _, ok := p.(discovery.Discovery) + assert.True(t, ok) + }) + t.Run("With ID assertion", func(t *testing.T) { + // cheap test + // create a logger + logger := log.DefaultLogger + // define the nodes + var nodes []*discovery.Node + // create the instance of provider + provider := NewDiscovery(nodes, logger) + require.NotNil(t, provider) + assert.Equal(t, "static", provider.ID()) + }) + t.Run("With Start", func(t *testing.T) { + // create the context + ctx := context.TODO() + // create a logger + logger := log.DefaultLogger + // define the nodes + nodes := []*discovery.Node{ + { + Name: "node-1", + Host: "localhost", + StartTime: time.Now().Add(time.Second).UnixMilli(), + Ports: map[string]int32{ + "clients-port": 1111, + "peers-port": 1112, + }, + IsRunning: true, + }, + } + // create the instance of provider + provider := NewDiscovery(nodes, logger) + require.NotNil(t, provider) + assert.NoError(t, provider.Start(ctx, discovery.NewMeta())) + assert.NoError(t, provider.Stop()) + }) + t.Run("With failed Start", func(t *testing.T) { + // create the context + ctx := context.TODO() + // create a logger + logger := log.DefaultLogger + // define the nodes + nodes := []*discovery.Node{ + { + Name: "node-1", + Host: "localhost", + StartTime: time.Now().Add(time.Second).UnixMilli(), + Ports: map[string]int32{ + "clients-port": 1111, + "peers-port": 1112, + }, + IsRunning: false, + }, + } + // create the instance of provider + provider := NewDiscovery(nodes, logger) + require.NotNil(t, provider) + assert.Error(t, 
provider.Start(ctx, discovery.NewMeta())) + }) + t.Run("With Nodes", func(t *testing.T) { + // create the context + ctx := context.TODO() + // create a logger + logger := log.DefaultLogger + // define the nodes + nodes := []*discovery.Node{ + { + Name: "node-1", + Host: "localhost", + StartTime: time.Now().Add(time.Second).UnixMilli(), + Ports: map[string]int32{ + "clients-port": 1111, + "peers-port": 1112, + }, + IsRunning: true, + }, + } + // create the instance of provider + provider := NewDiscovery(nodes, logger) + require.NotNil(t, provider) + assert.NoError(t, provider.Start(ctx, discovery.NewMeta())) + + actual, err := provider.Nodes(ctx) + require.NoError(t, err) + require.NotEmpty(t, actual) + require.Len(t, actual, 1) + + assert.NoError(t, provider.Stop()) + }) + t.Run("With Watch nodes", func(t *testing.T) { + // create the context + ctx := context.TODO() + // create a logger + logger := log.DefaultLogger + // define the nodes + nodes := []*discovery.Node{ + { + Name: "node-1", + Host: "localhost", + StartTime: time.Now().Add(time.Second).UnixMilli(), + Ports: map[string]int32{ + "clients-port": 1111, + "peers-port": 1112, + }, + IsRunning: true, + }, + } + // create the instance of provider + provider := NewDiscovery(nodes, logger) + require.NotNil(t, provider) + assert.NoError(t, provider.Start(ctx, discovery.NewMeta())) + + watchChan, err := provider.Watch(ctx) + assert.NoError(t, err) + assert.NotNil(t, watchChan) + assert.Empty(t, watchChan) + + assert.NoError(t, provider.Stop()) + }) + t.Run("With Earliest node", func(t *testing.T) { + // create the context + ctx := context.TODO() + // create a logger + logger := log.DefaultLogger + ts := time.Now() + // define the nodes + nodes := []*discovery.Node{ + { + Name: "node-1", + Host: "localhost", + StartTime: ts.AddDate(0, 0, -1).UnixMilli(), + Ports: map[string]int32{ + "clients-port": 1111, + "peers-port": 1112, + }, + IsRunning: true, + }, + { + Name: "node-2", + Host: "localhost", + StartTime: ts.Add(time.Second).UnixMilli(), + Ports: map[string]int32{ + "clients-port": 1113, + "peers-port": 1114, + }, + IsRunning: true, + }, + } + // create the instance of provider + provider := NewDiscovery(nodes, logger) + require.NotNil(t, provider) + assert.NoError(t, provider.Start(ctx, discovery.NewMeta())) + + actual, err := provider.EarliestNode(ctx) + require.NoError(t, err) + require.NotNil(t, actual) + + expected := &discovery.Node{ + Name: "node-1", + Host: "localhost", + StartTime: ts.AddDate(0, 0, -1).UnixMilli(), + Ports: map[string]int32{ + "clients-port": 1111, + "peers-port": 1112, + }, + IsRunning: true, + } + assert.True(t, cmp.Equal(expected, actual)) + assert.NoError(t, provider.Stop()) + }) +} diff --git a/examples/actor-cluster/k8s/cmd/run.go b/examples/actor-cluster/k8s/cmd/run.go index f71460d3..f548f7c9 100644 --- a/examples/actor-cluster/k8s/cmd/run.go +++ b/examples/actor-cluster/k8s/cmd/run.go @@ -36,7 +36,7 @@ var runCmd = &cobra.Command{ logger := log.New(log.InfoLevel, os.Stdout) // create the k8 configuration - disco := kubernetes.New(logger) + disco := kubernetes.NewDiscovery(logger) // start the discovery engine and handle error if err := disco.Start(ctx, discovery.Meta{ kubernetes.ApplicationName: applicationName, diff --git a/readme.md b/readme.md index 56aafc7d..c5d66740 100644 --- a/readme.md +++ b/readme.md @@ -75,7 +75,9 @@ The following outlines the cluster mode operations which can help have a healthy * To remove nodes, kindly remove them one at a time. 
Remember that to keep a healthy cluster you will need at least three nodes running.

 The cluster engine depends upon the [discovery](./discovery/iface.go) mechanism to find other nodes in the cluster.
-At the moment only the [kubernetes](https://kubernetes.io/docs/home/) [api integration](./discovery/kubernetes) is provided and fully functional.
+At the moment the following providers are implemented:
+* the [kubernetes](https://kubernetes.io/docs/home/) [api integration](./discovery/kubernetes), which is fully functional
+* the static provider, where the list of nodes is known ahead of time (a usage sketch is appended at the end of this patch)

 ### Kubernetes Discovery Provider setup
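For illustration only, and not part of this patch: a minimal sketch of wiring up the new static provider, assuming the module path `github.com/tochemey/goakt` and the constructors introduced in the diff above (`static.NewDiscovery`, `discovery.NewMeta`, `Node.URLs`). The node name, host, and port numbers are placeholders.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/tochemey/goakt/discovery"
	"github.com/tochemey/goakt/discovery/static"
	"github.com/tochemey/goakt/log"
)

func main() {
	ctx := context.Background()

	// with the static provider the list of nodes is known ahead of time;
	// NewDiscovery keeps only nodes that are valid and running
	nodes := []*discovery.Node{
		{
			Name:      "node-1",
			Host:      "localhost",
			StartTime: time.Now().UnixMilli(),
			Ports: map[string]int32{
				"clients-port": 1111, // placeholder ports
				"peers-port":   1112,
			},
			IsRunning: true,
		},
	}

	// create and start the static discovery provider
	provider := static.NewDiscovery(nodes, log.DefaultLogger)
	if err := provider.Start(ctx, discovery.NewMeta()); err != nil {
		panic(err)
	}

	// list the known nodes and print their peer and client URLs
	discovered, err := provider.Nodes(ctx)
	if err != nil {
		panic(err)
	}
	for _, n := range discovered {
		peersURL, clientsURL := n.URLs()
		fmt.Println(n.Name, peersURL, clientsURL)
	}

	// shut the provider down when done
	_ = provider.Stop()
}
```

Because the provider is static, `Watch` returns a channel that never emits events, so cluster membership cannot change at runtime.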