Skip to content

Commit

Permalink
Merge pull request #650 from rancher/cluster-watch-fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
kralicky authored Oct 12, 2022
2 parents f3cd366 + d5dc487 commit 4ca9b29
Show file tree
Hide file tree
Showing 23 changed files with 1,112 additions and 490 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/bmatcuk/doublestar/v4 v4.0.2
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/cert-manager/cert-manager v1.8.0
github.com/charmbracelet/bubbles v0.13.0
github.com/charmbracelet/bubbles v0.14.0
github.com/charmbracelet/bubbletea v0.22.1
github.com/charmbracelet/lipgloss v0.6.0
github.com/containerd/containerd v1.6.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -410,8 +410,8 @@ github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cb
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chai2010/gettext-go v0.0.0-20160711120539-c6fed771bfd5 h1:7aWHqerlJ41y6FOsEUvknqgXnGmJyJSbjhAWq5pO4F8=
github.com/chai2010/gettext-go v0.0.0-20160711120539-c6fed771bfd5/go.mod h1:/iP1qXHoty45bqomnu2LM+VVyAEdWN+vtSHGlQgyxbw=
github.com/charmbracelet/bubbles v0.13.0 h1:zP/ROH3wJEBqZWKIsD50ZKKlx3ydLInq3LdD/Nrlb8w=
github.com/charmbracelet/bubbles v0.13.0/go.mod h1:bbeTiXwPww4M031aGi8UK2HT9RDWoiNibae+1yCMtcc=
github.com/charmbracelet/bubbles v0.14.0 h1:DJfCwnARfWjZLvMglhSQzo76UZ2gucuHPy9jLWX45Og=
github.com/charmbracelet/bubbles v0.14.0/go.mod h1:bbeTiXwPww4M031aGi8UK2HT9RDWoiNibae+1yCMtcc=
github.com/charmbracelet/bubbletea v0.21.0/go.mod h1:GgmJMec61d08zXsOhqRC/AiOx4K4pmz+VIcRIm1FKr4=
github.com/charmbracelet/bubbletea v0.22.1 h1:z66q0LWdJNOWEH9zadiAIXp2GN1AWrwNXU8obVY9X24=
github.com/charmbracelet/bubbletea v0.22.1/go.mod h1:8/7hVvbPN6ZZPkczLiB8YpLkLJ0n7DMho5Wvfd2X1C0=
Expand Down
796 changes: 400 additions & 396 deletions pkg/apis/management/v1/management.pb.go

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions pkg/apis/management/v1/management.proto
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,13 @@ message WatchClustersRequest {
}

enum WatchEventType {
Added = 0;
Created = 0;
Updated = 1;
Deleted = 2;
}

message WatchEvent {
core.Reference cluster = 1;
core.Cluster cluster = 1;
WatchEventType type = 2;
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/apis/management/v1/management.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1749,7 +1749,7 @@
"type": "object",
"properties": {
"cluster": {
"$ref": "#/definitions/coreReference"
"$ref": "#/definitions/coreCluster"
},
"type": {
"$ref": "#/definitions/managementWatchEventType"
Expand All @@ -1759,10 +1759,11 @@
"managementWatchEventType": {
"type": "string",
"enum": [
"Added",
"Created",
"Updated",
"Deleted"
],
"default": "Added"
"default": "Created"
},
"protobufAny": {
"type": "object",
Expand Down
2 changes: 1 addition & 1 deletion pkg/capabilities/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (cc *capabilityContext[T, U]) watch() {
case storage.WatchEventDelete:
cc.err.Store(ErrObjectDeleted)
return
case storage.WatchEventPut:
case storage.WatchEventCreate:
if len(cc.capabilities) == 0 {
// check if any capabilities were lost
for _, c := range event.Previous.GetCapabilities() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/capabilities/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func makeCluster(caps ...string) *corev1.Cluster {

func putEvent(old, new *corev1.Cluster) storage.WatchEvent[*corev1.Cluster] {
return storage.WatchEvent[*corev1.Cluster]{
EventType: storage.WatchEventPut,
EventType: storage.WatchEventCreate,
Previous: old,
Current: new,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/health/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (l *Listener) HandleConnection(ctx context.Context, clientset HealthClientS
defer clientLock.Unlock() // 4th

l.incomingHealthUpdatesMu.Lock()
incomingHealthUpdates := make(chan HealthUpdate, l.updateQueueCap)
incomingHealthUpdates := make(chan HealthUpdate, 10)
l.incomingHealthUpdates[id] = incomingHealthUpdates
l.incomingHealthUpdatesMu.Unlock()

Expand Down
6 changes: 3 additions & 3 deletions pkg/health/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/rancher/opni/pkg/test"
)

var _ = Describe("Listener", func() {
var _ = Describe("Listener", Label("unit", "slow"), func() {
When("creating a new listener", func() {
It("should not send any updates", func() {
listener := health.NewListener()
Expand Down Expand Up @@ -109,7 +109,7 @@ var _ = Describe("Listener", func() {
}
}

Eventually(statusC).Should(HaveLen(98 * numAgents))
Eventually(statusC, 5*time.Second, 100*time.Millisecond).Should(HaveLen(98 * numAgents))
Consistently(statusC).Should(HaveLen(98 * numAgents))

// reconnect all agents
Expand All @@ -121,7 +121,7 @@ var _ = Describe("Listener", func() {
go listener.HandleConnection(test.ContextWithAuthorizedID(context.Background(), agentId), a)
}

Eventually(statusC).Should(HaveLen(99 * numAgents))
Eventually(statusC, 5*time.Second, 100*time.Millisecond).Should(HaveLen(99 * numAgents))
Consistently(statusC).Should(HaveLen(99 * numAgents))

listener.Close()
Expand Down
27 changes: 10 additions & 17 deletions pkg/health/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,27 +147,20 @@ func (m *Monitor) WatchHealthStatus(ctx context.Context, id string) <-chan *core
if !ok {
break LOOP
}
hs := &corev1.HealthStatus{
Health: health,
Status: util.ProtoClone(curStatus),
}
select {
case <-ctx.Done():
case ch <- hs:
}
curHealth = health
case status, ok := <-sl:
if !ok {
break LOOP
}

hs := &corev1.HealthStatus{
Health: util.ProtoClone(curHealth),
Status: status,
}
select {
case <-ctx.Done():
case ch <- hs:
}
curStatus = status
}
hs := &corev1.HealthStatus{
Health: util.ProtoClone(curHealth),
Status: util.ProtoClone(curStatus),
}
select {
case <-ctx.Done():
case ch <- hs:
}
}

Expand Down
68 changes: 29 additions & 39 deletions pkg/management/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"time"

capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1"
corev1 "github.com/rancher/opni/pkg/apis/core/v1"
Expand Down Expand Up @@ -84,51 +83,42 @@ func (m *Server) WatchClusters(
if err := validation.Validate(in); err != nil {
return err
}
known := map[string]*corev1.Reference{}
var known []*corev1.Cluster
for _, cluster := range in.GetKnownClusters().GetItems() {
if _, err := m.coreDataSource.StorageBackend().GetCluster(context.Background(), cluster); err != nil {
if c, err := m.coreDataSource.StorageBackend().GetCluster(stream.Context(), cluster); err != nil {
return err
} else {
known = append(known, c)
}
known[cluster.Id] = cluster
}
tick := time.NewTicker(1 * time.Second)
defer tick.Stop()
for {
clusters, err := m.coreDataSource.StorageBackend().ListClusters(context.Background(), nil, 0)
updatedIds := map[string]struct{}{}
if err != nil {
return err
}
for _, cluster := range clusters.Items {
updatedIds[cluster.Id] = struct{}{}
if _, ok := known[cluster.Id]; !ok {
ref := cluster.Reference()
known[cluster.Id] = ref
if err := stream.Send(&managementv1.WatchEvent{
Cluster: ref,
Type: managementv1.WatchEventType_Added,
}); err != nil {
return status.Error(codes.Internal, err.Error())
}
}
}
for id, cluster := range known {
if _, ok := updatedIds[id]; !ok {
delete(known, id)
if err := stream.Send(&managementv1.WatchEvent{
Cluster: cluster,
Type: managementv1.WatchEventType_Deleted,
}); err != nil {
return status.Error(codes.Internal, err.Error())
}
}

eventC, err := m.coreDataSource.StorageBackend().WatchClusters(stream.Context(), known)
if err != nil {
return err
}

for event := range eventC {
var c *corev1.Cluster
var eventType managementv1.WatchEventType
switch event.EventType {
case storage.WatchEventCreate:
eventType = managementv1.WatchEventType_Created
c = event.Current
case storage.WatchEventUpdate:
eventType = managementv1.WatchEventType_Updated
c = event.Current
case storage.WatchEventDelete:
eventType = managementv1.WatchEventType_Deleted
c = event.Previous
}
select {
case <-tick.C:
case <-stream.Context().Done():
return stream.Context().Err()
if err := stream.Send(&managementv1.WatchEvent{
Cluster: c,
Type: eventType,
}); err != nil {
return err
}
}
return nil
}

func (m *Server) EditCluster(
Expand Down
11 changes: 9 additions & 2 deletions pkg/management/clusters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ var _ = Describe("Clusters", Ordered, Label("slow"), func() {
})
It("should create clusters", func() {
for x := 0; x < 3; x++ {
time.Sleep(time.Second * 1)
ids := map[string]struct{}{}
for i := 0; i < 10; i++ {
id := uuid.NewString()
Expand All @@ -75,12 +76,16 @@ var _ = Describe("Clusters", Ordered, Label("slow"), func() {
})
Expect(err).NotTo(HaveOccurred())
}
timeout := time.After(1100 * time.Millisecond)
timeout := time.After(2000 * time.Millisecond)
for i := 0; i < 10; i++ {
select {
case event := <-events:
Expect(event.Type).To(Equal(managementv1.WatchEventType_Added))
if event == nil {
break
}
Expect(event.Type).To(Equal(managementv1.WatchEventType_Created))
Expect(ids).To(HaveKey(event.Cluster.Id))
fmt.Println(event.Cluster.Id)
cluster, err := tv.client.GetCluster(context.Background(), &corev1.Reference{
Id: event.Cluster.Id,
})
Expand Down Expand Up @@ -141,6 +146,8 @@ var _ = Describe("Clusters", Ordered, Label("slow"), func() {

for event := range events {
Expect(event.Type).To(Equal(managementv1.WatchEventType_Deleted))
Expect(event.Cluster).NotTo(BeNil())
Expect(event.Cluster.Id).NotTo(BeNil())
Expect(ids).To(HaveKey(event.Cluster.Id))
delete(ids, event.Cluster.Id)

Expand Down
15 changes: 11 additions & 4 deletions pkg/management/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,17 @@ func (m *Server) WatchClusterHealthStatus(
}

healthStatusC := m.healthStatusDataSource.WatchClusterHealthStatus(stream.Context(), ref)
for healthStatus := range healthStatusC {
if err := stream.Send(healthStatus); err != nil {
return err
for {
select {
case <-stream.Context().Done():
return nil
case healthStatus, ok := <-healthStatusC:
if !ok {
return nil
}
if err := stream.Send(healthStatus); err != nil {
return err
}
}
}
return nil
}
66 changes: 54 additions & 12 deletions pkg/opni/commands/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"fmt"
"reflect"

tea "github.com/charmbracelet/bubbletea"
corev1 "github.com/rancher/opni/pkg/apis/core/v1"
managementv1 "github.com/rancher/opni/pkg/apis/management/v1"
"github.com/rancher/opni/pkg/opni/ui"
cliutil "github.com/rancher/opni/pkg/opni/util"
"github.com/spf13/cobra"
"go.uber.org/zap"
Expand All @@ -18,34 +20,74 @@ func BuildClustersCmd() *cobra.Command {
Short: "Manage clusters",
}
clustersCmd.AddCommand(BuildClustersListCmd())
clustersCmd.AddCommand(BuildClustersWatchCmd())
clustersCmd.AddCommand(BuildClustersDeleteCmd())
clustersCmd.AddCommand(BuildClustersLabelCmd())
ConfigureManagementCommand(clustersCmd)
return clustersCmd
}

func BuildClustersListCmd() *cobra.Command {
var watch bool
cmd := &cobra.Command{
Use: "list",
Aliases: []string{"ls"},
Short: "List clusters",
Run: func(cmd *cobra.Command, args []string) {
t, err := mgmtClient.ListClusters(cmd.Context(), &managementv1.ListClustersRequest{})
if err != nil {
lg.Fatal(err)
}
var healthStatus []*corev1.HealthStatus
for _, c := range t.Items {
stat, err := mgmtClient.GetClusterHealthStatus(cmd.Context(), c.Reference())
RunE: func(cmd *cobra.Command, args []string) error {
if watch {
m := ui.NewClusterListModel()
w := &ui.ClusterListWatcher{
Messages: make(chan tea.Msg, 100),
Client: mgmtClient,
}
go func() {
if err := w.Run(cmd.Context()); err != nil {
lg.Fatal(err)
}
}()
p := tea.NewProgram(m)
go func() {
for {
select {
case msg := <-w.Messages:
p.Send(msg)
case <-cmd.Context().Done():
p.Send(tea.Quit())
return
}
}
}()
return p.Start()
} else {
t, err := mgmtClient.ListClusters(cmd.Context(), &managementv1.ListClustersRequest{})
if err != nil {
healthStatus = append(healthStatus, &corev1.HealthStatus{})
} else {
healthStatus = append(healthStatus, stat)
return err
}
var healthStatus []*corev1.HealthStatus
for _, c := range t.Items {
stat, err := mgmtClient.GetClusterHealthStatus(cmd.Context(), c.Reference())
if err != nil {
healthStatus = append(healthStatus, &corev1.HealthStatus{})
} else {
healthStatus = append(healthStatus, stat)
}
}
fmt.Println(cliutil.RenderClusterList(t, healthStatus))
}
fmt.Println(cliutil.RenderClusterList(t, healthStatus))
return nil
},
}
cmd.Flags().BoolVarP(&watch, "watch", "w", false, "Watch for updates")
return cmd
}

func BuildClustersWatchCmd() *cobra.Command {
cmd := BuildClustersListCmd()
cmd.Use = "watch"
cmd.Aliases = []string{}
cmd.Short = "Alias for 'list --watch'"
cmd.Flags().Set("watch", "true")
cmd.Flags().MarkHidden("watch")
return cmd
}

Expand Down
Loading

0 comments on commit 4ca9b29

Please sign in to comment.