Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

watch api bugfixes; use watch api in cli commands #650

Merged
merged 5 commits into from
Oct 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/bmatcuk/doublestar/v4 v4.0.2
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/cert-manager/cert-manager v1.8.0
github.com/charmbracelet/bubbles v0.13.0
github.com/charmbracelet/bubbles v0.14.0
github.com/charmbracelet/bubbletea v0.22.1
github.com/charmbracelet/lipgloss v0.6.0
github.com/containerd/containerd v1.6.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -410,8 +410,8 @@ github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cb
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chai2010/gettext-go v0.0.0-20160711120539-c6fed771bfd5 h1:7aWHqerlJ41y6FOsEUvknqgXnGmJyJSbjhAWq5pO4F8=
github.com/chai2010/gettext-go v0.0.0-20160711120539-c6fed771bfd5/go.mod h1:/iP1qXHoty45bqomnu2LM+VVyAEdWN+vtSHGlQgyxbw=
github.com/charmbracelet/bubbles v0.13.0 h1:zP/ROH3wJEBqZWKIsD50ZKKlx3ydLInq3LdD/Nrlb8w=
github.com/charmbracelet/bubbles v0.13.0/go.mod h1:bbeTiXwPww4M031aGi8UK2HT9RDWoiNibae+1yCMtcc=
github.com/charmbracelet/bubbles v0.14.0 h1:DJfCwnARfWjZLvMglhSQzo76UZ2gucuHPy9jLWX45Og=
github.com/charmbracelet/bubbles v0.14.0/go.mod h1:bbeTiXwPww4M031aGi8UK2HT9RDWoiNibae+1yCMtcc=
github.com/charmbracelet/bubbletea v0.21.0/go.mod h1:GgmJMec61d08zXsOhqRC/AiOx4K4pmz+VIcRIm1FKr4=
github.com/charmbracelet/bubbletea v0.22.1 h1:z66q0LWdJNOWEH9zadiAIXp2GN1AWrwNXU8obVY9X24=
github.com/charmbracelet/bubbletea v0.22.1/go.mod h1:8/7hVvbPN6ZZPkczLiB8YpLkLJ0n7DMho5Wvfd2X1C0=
Expand Down
796 changes: 400 additions & 396 deletions pkg/apis/management/v1/management.pb.go

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions pkg/apis/management/v1/management.proto
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,13 @@ message WatchClustersRequest {
}

enum WatchEventType {
Added = 0;
Created = 0;
Updated = 1;
Deleted = 2;
}

message WatchEvent {
core.Reference cluster = 1;
core.Cluster cluster = 1;
WatchEventType type = 2;
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/apis/management/v1/management.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1749,7 +1749,7 @@
"type": "object",
"properties": {
"cluster": {
"$ref": "#/definitions/coreReference"
"$ref": "#/definitions/coreCluster"
},
"type": {
"$ref": "#/definitions/managementWatchEventType"
Expand All @@ -1759,10 +1759,11 @@
"managementWatchEventType": {
"type": "string",
"enum": [
"Added",
"Created",
"Updated",
"Deleted"
],
"default": "Added"
"default": "Created"
},
"protobufAny": {
"type": "object",
Expand Down
2 changes: 1 addition & 1 deletion pkg/capabilities/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (cc *capabilityContext[T, U]) watch() {
case storage.WatchEventDelete:
cc.err.Store(ErrObjectDeleted)
return
case storage.WatchEventPut:
case storage.WatchEventCreate:
if len(cc.capabilities) == 0 {
// check if any capabilities were lost
for _, c := range event.Previous.GetCapabilities() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/capabilities/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func makeCluster(caps ...string) *corev1.Cluster {

func putEvent(old, new *corev1.Cluster) storage.WatchEvent[*corev1.Cluster] {
return storage.WatchEvent[*corev1.Cluster]{
EventType: storage.WatchEventPut,
EventType: storage.WatchEventCreate,
Previous: old,
Current: new,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/health/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (l *Listener) HandleConnection(ctx context.Context, clientset HealthClientS
defer clientLock.Unlock() // 4th

l.incomingHealthUpdatesMu.Lock()
incomingHealthUpdates := make(chan HealthUpdate, l.updateQueueCap)
incomingHealthUpdates := make(chan HealthUpdate, 10)
l.incomingHealthUpdates[id] = incomingHealthUpdates
l.incomingHealthUpdatesMu.Unlock()

Expand Down
6 changes: 3 additions & 3 deletions pkg/health/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/rancher/opni/pkg/test"
)

var _ = Describe("Listener", func() {
var _ = Describe("Listener", Label("unit", "slow"), func() {
When("creating a new listener", func() {
It("should not send any updates", func() {
listener := health.NewListener()
Expand Down Expand Up @@ -109,7 +109,7 @@ var _ = Describe("Listener", func() {
}
}

Eventually(statusC).Should(HaveLen(98 * numAgents))
Eventually(statusC, 5*time.Second, 100*time.Millisecond).Should(HaveLen(98 * numAgents))
Consistently(statusC).Should(HaveLen(98 * numAgents))

// reconnect all agents
Expand All @@ -121,7 +121,7 @@ var _ = Describe("Listener", func() {
go listener.HandleConnection(test.ContextWithAuthorizedID(context.Background(), agentId), a)
}

Eventually(statusC).Should(HaveLen(99 * numAgents))
Eventually(statusC, 5*time.Second, 100*time.Millisecond).Should(HaveLen(99 * numAgents))
Consistently(statusC).Should(HaveLen(99 * numAgents))

listener.Close()
Expand Down
27 changes: 10 additions & 17 deletions pkg/health/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,27 +147,20 @@ func (m *Monitor) WatchHealthStatus(ctx context.Context, id string) <-chan *core
if !ok {
break LOOP
}
hs := &corev1.HealthStatus{
Health: health,
Status: util.ProtoClone(curStatus),
}
select {
case <-ctx.Done():
case ch <- hs:
}
curHealth = health
case status, ok := <-sl:
if !ok {
break LOOP
}

hs := &corev1.HealthStatus{
Health: util.ProtoClone(curHealth),
Status: status,
}
select {
case <-ctx.Done():
case ch <- hs:
}
curStatus = status
}
hs := &corev1.HealthStatus{
Health: util.ProtoClone(curHealth),
Status: util.ProtoClone(curStatus),
}
select {
case <-ctx.Done():
case ch <- hs:
}
}

Expand Down
68 changes: 29 additions & 39 deletions pkg/management/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"time"

capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1"
corev1 "github.com/rancher/opni/pkg/apis/core/v1"
Expand Down Expand Up @@ -84,51 +83,42 @@ func (m *Server) WatchClusters(
if err := validation.Validate(in); err != nil {
return err
}
known := map[string]*corev1.Reference{}
var known []*corev1.Cluster
for _, cluster := range in.GetKnownClusters().GetItems() {
if _, err := m.coreDataSource.StorageBackend().GetCluster(context.Background(), cluster); err != nil {
if c, err := m.coreDataSource.StorageBackend().GetCluster(stream.Context(), cluster); err != nil {
return err
} else {
known = append(known, c)
}
known[cluster.Id] = cluster
}
tick := time.NewTicker(1 * time.Second)
defer tick.Stop()
for {
clusters, err := m.coreDataSource.StorageBackend().ListClusters(context.Background(), nil, 0)
updatedIds := map[string]struct{}{}
if err != nil {
return err
}
for _, cluster := range clusters.Items {
updatedIds[cluster.Id] = struct{}{}
if _, ok := known[cluster.Id]; !ok {
ref := cluster.Reference()
known[cluster.Id] = ref
if err := stream.Send(&managementv1.WatchEvent{
Cluster: ref,
Type: managementv1.WatchEventType_Added,
}); err != nil {
return status.Error(codes.Internal, err.Error())
}
}
}
for id, cluster := range known {
if _, ok := updatedIds[id]; !ok {
delete(known, id)
if err := stream.Send(&managementv1.WatchEvent{
Cluster: cluster,
Type: managementv1.WatchEventType_Deleted,
}); err != nil {
return status.Error(codes.Internal, err.Error())
}
}

eventC, err := m.coreDataSource.StorageBackend().WatchClusters(stream.Context(), known)
if err != nil {
return err
}

for event := range eventC {
var c *corev1.Cluster
var eventType managementv1.WatchEventType
switch event.EventType {
case storage.WatchEventCreate:
eventType = managementv1.WatchEventType_Created
c = event.Current
case storage.WatchEventUpdate:
eventType = managementv1.WatchEventType_Updated
c = event.Current
case storage.WatchEventDelete:
eventType = managementv1.WatchEventType_Deleted
c = event.Previous
}
select {
case <-tick.C:
case <-stream.Context().Done():
return stream.Context().Err()
if err := stream.Send(&managementv1.WatchEvent{
Cluster: c,
Type: eventType,
}); err != nil {
return err
}
}
return nil
}

func (m *Server) EditCluster(
Expand Down
11 changes: 9 additions & 2 deletions pkg/management/clusters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ var _ = Describe("Clusters", Ordered, Label("slow"), func() {
})
It("should create clusters", func() {
for x := 0; x < 3; x++ {
time.Sleep(time.Second * 1)
ids := map[string]struct{}{}
for i := 0; i < 10; i++ {
id := uuid.NewString()
Expand All @@ -75,12 +76,16 @@ var _ = Describe("Clusters", Ordered, Label("slow"), func() {
})
Expect(err).NotTo(HaveOccurred())
}
timeout := time.After(1100 * time.Millisecond)
timeout := time.After(2000 * time.Millisecond)
for i := 0; i < 10; i++ {
select {
case event := <-events:
Expect(event.Type).To(Equal(managementv1.WatchEventType_Added))
if event == nil {
break
}
Expect(event.Type).To(Equal(managementv1.WatchEventType_Created))
Expect(ids).To(HaveKey(event.Cluster.Id))
fmt.Println(event.Cluster.Id)
cluster, err := tv.client.GetCluster(context.Background(), &corev1.Reference{
Id: event.Cluster.Id,
})
Expand Down Expand Up @@ -141,6 +146,8 @@ var _ = Describe("Clusters", Ordered, Label("slow"), func() {

for event := range events {
Expect(event.Type).To(Equal(managementv1.WatchEventType_Deleted))
Expect(event.Cluster).NotTo(BeNil())
Expect(event.Cluster.Id).NotTo(BeNil())
Expect(ids).To(HaveKey(event.Cluster.Id))
delete(ids, event.Cluster.Id)

Expand Down
15 changes: 11 additions & 4 deletions pkg/management/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,17 @@ func (m *Server) WatchClusterHealthStatus(
}

healthStatusC := m.healthStatusDataSource.WatchClusterHealthStatus(stream.Context(), ref)
for healthStatus := range healthStatusC {
if err := stream.Send(healthStatus); err != nil {
return err
for {
select {
case <-stream.Context().Done():
return nil
case healthStatus, ok := <-healthStatusC:
if !ok {
return nil
}
if err := stream.Send(healthStatus); err != nil {
return err
}
}
}
return nil
}
66 changes: 54 additions & 12 deletions pkg/opni/commands/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"fmt"
"reflect"

tea "github.com/charmbracelet/bubbletea"
corev1 "github.com/rancher/opni/pkg/apis/core/v1"
managementv1 "github.com/rancher/opni/pkg/apis/management/v1"
"github.com/rancher/opni/pkg/opni/ui"
cliutil "github.com/rancher/opni/pkg/opni/util"
"github.com/spf13/cobra"
"go.uber.org/zap"
Expand All @@ -18,34 +20,74 @@ func BuildClustersCmd() *cobra.Command {
Short: "Manage clusters",
}
clustersCmd.AddCommand(BuildClustersListCmd())
clustersCmd.AddCommand(BuildClustersWatchCmd())
clustersCmd.AddCommand(BuildClustersDeleteCmd())
clustersCmd.AddCommand(BuildClustersLabelCmd())
ConfigureManagementCommand(clustersCmd)
return clustersCmd
}

func BuildClustersListCmd() *cobra.Command {
var watch bool
cmd := &cobra.Command{
Use: "list",
Aliases: []string{"ls"},
Short: "List clusters",
Run: func(cmd *cobra.Command, args []string) {
t, err := mgmtClient.ListClusters(cmd.Context(), &managementv1.ListClustersRequest{})
if err != nil {
lg.Fatal(err)
}
var healthStatus []*corev1.HealthStatus
for _, c := range t.Items {
stat, err := mgmtClient.GetClusterHealthStatus(cmd.Context(), c.Reference())
RunE: func(cmd *cobra.Command, args []string) error {
if watch {
m := ui.NewClusterListModel()
w := &ui.ClusterListWatcher{
Messages: make(chan tea.Msg, 100),
Client: mgmtClient,
}
go func() {
if err := w.Run(cmd.Context()); err != nil {
lg.Fatal(err)
}
}()
p := tea.NewProgram(m)
go func() {
for {
select {
case msg := <-w.Messages:
p.Send(msg)
case <-cmd.Context().Done():
p.Send(tea.Quit())
return
}
}
}()
return p.Start()
} else {
t, err := mgmtClient.ListClusters(cmd.Context(), &managementv1.ListClustersRequest{})
if err != nil {
healthStatus = append(healthStatus, &corev1.HealthStatus{})
} else {
healthStatus = append(healthStatus, stat)
return err
}
var healthStatus []*corev1.HealthStatus
for _, c := range t.Items {
stat, err := mgmtClient.GetClusterHealthStatus(cmd.Context(), c.Reference())
if err != nil {
healthStatus = append(healthStatus, &corev1.HealthStatus{})
} else {
healthStatus = append(healthStatus, stat)
}
}
fmt.Println(cliutil.RenderClusterList(t, healthStatus))
}
fmt.Println(cliutil.RenderClusterList(t, healthStatus))
return nil
},
}
cmd.Flags().BoolVarP(&watch, "watch", "w", false, "Watch for updates")
return cmd
}

func BuildClustersWatchCmd() *cobra.Command {
cmd := BuildClustersListCmd()
cmd.Use = "watch"
cmd.Aliases = []string{}
cmd.Short = "Alias for 'list --watch'"
cmd.Flags().Set("watch", "true")
cmd.Flags().MarkHidden("watch")
return cmd
}

Expand Down
Loading