Skip to content

Commit

Permalink
bugfix: kusion watch has no pod details (#252)
Browse files Browse the repository at this point in the history
- skip unsupported resources instead of returning error
- wait for the first watched obj if it has a dependent
- replace with github.com/howieyuen/uilive to ignore ANSI color codes
  • Loading branch information
howieyuen committed Feb 22, 2023
1 parent 6028ea2 commit ca7a644
Show file tree
Hide file tree
Showing 13 changed files with 88 additions and 51 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ require (
github.com/google/go-cmp v0.5.9
github.com/google/go-github/v50 v50.0.0
github.com/gookit/goutil v0.5.1
github.com/gosuri/uilive v0.0.4
github.com/hashicorp/errwrap v1.0.0
github.com/hashicorp/go-multierror v1.0.0
github.com/hashicorp/go-version v1.4.0
github.com/hashicorp/hcl/v2 v2.15.0
github.com/howieyuen/uilive v0.0.6
github.com/imdario/mergo v0.3.13
github.com/jinzhu/copier v0.3.2
github.com/lucasb-eyer/go-colorful v1.0.3
Expand Down Expand Up @@ -141,6 +141,7 @@ require (
github.com/googleapis/gax-go/v2 v2.4.0 // indirect
github.com/googleapis/go-type-adapters v1.0.0 // indirect
github.com/gookit/color v1.5.0 // indirect
github.com/gosuri/uilive v0.0.4 // indirect
github.com/goware/prefixer v0.0.0-20160118172347-395022866408 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-retryablehttp v0.7.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,8 @@ github.com/hinshun/vt10x v0.0.0-20220119200601-820417d04eec/go.mod h1:Q48J4R4Dvx
github.com/hokaccha/go-prettyjson v0.0.0-20190818114111-108c894c2c0e/go.mod h1:pFlLw2CfqZiIBOx6BuCeRLCrfxBJipTY0nIOF/VbGcI=
github.com/howeyc/gopass v0.0.0-20170109162249-bf9dde6d0d2c h1:kQWxfPIHVLbgLzphqk3QUflDy9QdksZR4ygR807bpy0=
github.com/howeyc/gopass v0.0.0-20170109162249-bf9dde6d0d2c/go.mod h1:lADxMC39cJJqL93Duh1xhAs4I2Zs8mKS89XWXFGp9cs=
github.com/howieyuen/uilive v0.0.6 h1:BgyopdqMZNxsBiOazBlZOPtq4cl5yDLEXcRurZt25+c=
github.com/howieyuen/uilive v0.0.6/go.mod h1:A2XFIq01RvLom1CHxG3tZmFAvtF50S5NlnMYjC6gH3c=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
Expand Down
7 changes: 3 additions & 4 deletions pkg/cmd/apply/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (o *ApplyOptions) Run() error {

if o.Watch {
fmt.Println("\nStart watching changes ...")
if err := Watch(o, sp, changes, os.Stdout); err != nil {
if err := Watch(o, sp, changes); err != nil {
return err
}
}
Expand Down Expand Up @@ -324,10 +324,9 @@ func Apply(
func Watch(o *ApplyOptions,
planResources *models.Spec,
changes *opsmodels.Changes,
out io.Writer,
) error {
if o.DryRun {
fmt.Fprintln(out, "NOTE: Watch doesn't work in DryRun mode")
fmt.Println("NOTE: Watch doesn't work in DryRun mode")
return nil
}

Expand All @@ -351,7 +350,7 @@ func Watch(o *ApplyOptions,
return err
}

fmt.Fprintln(out, "\nWatch Finish! All resources have been reconciled.")
fmt.Println("Watch Finish! All resources have been reconciled.")
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/apply/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ var (

func newSA(name string) models.Resource {
return models.Resource{
ID: engine.BuildIDForKubernetes(apiVersion, kind, namespace, name),
ID: engine.BuildID(apiVersion, kind, namespace, name),
Type: "Kubernetes",
Attributes: map[string]interface{}{
"apiVersion": apiVersion,
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/destroy/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ var (

func newSA(name string) models.Resource {
return models.Resource{
ID: engine.BuildIDForKubernetes(apiVersion, kind, namespace, name),
ID: engine.BuildID(apiVersion, kind, namespace, name),
Type: "Kubernetes",
Attributes: map[string]interface{}{
"apiVersion": apiVersion,
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/preview/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func mockOperationPreview() {

func newSA(name string) models.Resource {
return models.Resource{
ID: engine.BuildIDForKubernetes(apiVersion, kind, namespace, name),
ID: engine.BuildID(apiVersion, kind, namespace, name),
Type: "Kubernetes",
Attributes: map[string]interface{}{
"apiVersion": apiVersion,
Expand Down
35 changes: 18 additions & 17 deletions pkg/engine/operation/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"reflect"
"time"

"github.com/gosuri/uilive"
"github.com/howieyuen/uilive"
"github.com/pterm/pterm"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
k8swatch "k8s.io/apimachinery/pkg/watch"
Expand All @@ -17,6 +17,7 @@ import (
"kusionstack.io/kusion/pkg/engine/printers"
"kusionstack.io/kusion/pkg/engine/runtime"
runtimeinit "kusionstack.io/kusion/pkg/engine/runtime/init"
"kusionstack.io/kusion/pkg/log"
"kusionstack.io/kusion/pkg/status"
"kusionstack.io/kusion/pkg/util/pretty"
)
Expand All @@ -42,28 +43,28 @@ func (wo *WatchOperation) Watch(req *WatchRequest) error {
wo.RuntimeMap = runtimes

// Result channels
msgChs := make(map[string][]<-chan k8swatch.Event, len(resources))
msgChs := make(map[string]*runtime.SequentialWatchers, len(resources))
// Keep sorted
ids := make([]string, resources.Len())
// Collect watchers
for i := range resources {
res := &resources[i]

// only support k8s resources
t := res.Type
if runtime.Kubernetes != t {
return fmt.Errorf("WARNING: Watch only support Kubernetes resources for now")
}

// Get watchers
// Get watchers, only support k8s resources
resp := runtimes[t].Watch(ctx, &runtime.WatchRequest{Resource: res})
if resp == nil {
log.Debug("unsupported resource type: %s", t)
continue
}
if status.IsErr(resp.Status) {
return fmt.Errorf(resp.Status.String())
}

// Save id
ids[i] = res.ResourceKey()
// Save channels
msgChs[res.ResourceKey()] = resp.ResultChs
msgChs[res.ResourceKey()] = resp.Watchers
}

// Console writer
Expand All @@ -82,14 +83,14 @@ func (wo *WatchOperation) Watch(req *WatchRequest) error {

// Start go routine for each table
for _, id := range ids {
chs, ok := msgChs[id]
sw, ok := msgChs[id]
if !ok {
continue
}
// Get or new the target table
table, exist := tables[id]
if !exist {
table = printers.NewTable(len(chs))
table = printers.NewTable(sw.IDs)
}
go func(id string, chs []<-chan k8swatch.Event, table *printers.Table) {
// Resources selects
Expand Down Expand Up @@ -125,20 +126,20 @@ func (wo *WatchOperation) Watch(req *WatchRequest) error {
}

// Save watched msg
table.InsertOrUpdate(
engine.BuildIDForKubernetes(o.GetAPIVersion(), o.GetKind(), o.GetNamespace(), o.GetName()),
table.Update(
engine.BuildIDForKubernetes(o),
printers.NewRow(e.Type, o.GetKind(), o.GetName(), detail))

// Write back
tables[id] = table
}

// Break when completed
if table.IsCompleted() {
if table.AllCompleted() {
break
}
}
}(id, chs, table)
}(id, sw.Watchers, table)
}

// Waiting for all tables completed
Expand All @@ -151,7 +152,7 @@ func (wo *WatchOperation) Watch(req *WatchRequest) error {
// Range tables
for id, table := range tables {
// All channels are isCompleted
if table.IsCompleted() {
if table.AllCompleted() {
finished[id] = true
}
}
Expand All @@ -166,7 +167,7 @@ func (wo *WatchOperation) Watch(req *WatchRequest) error {
func (wo *WatchOperation) printTables(w *uilive.Writer, ids []string, tables map[string]*printers.Table) {
for i, id := range ids {
// Print resource Key as heading text
_, _ = fmt.Fprintf(w, "%s\n", pretty.LightCyanBold("[%s]", id))
_, _ = fmt.Fprintln(w, pretty.LightCyanBold("[%s]", id))

table, ok := tables[id]
if !ok {
Expand Down
7 changes: 5 additions & 2 deletions pkg/engine/operation/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,10 @@ func (f *fooWatchRuntime) Watch(ctx context.Context, request *runtime.WatchReque
}()

return &runtime.WatchResponse{
ResultChs: []<-chan k8sWatch.Event{out},
Status: nil,
Watchers: &runtime.SequentialWatchers{
IDs: []string{"apps/v1:Deployment:foo:bar"},
Watchers: []<-chan k8sWatch.Event{out},
},
Status: nil,
}
}
19 changes: 7 additions & 12 deletions pkg/engine/printers/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ type Row struct {
Detail string
}

func NewTable(capacity int) *Table {
func NewTable(ids []string) *Table {
return &Table{
IDs: make([]string, 0, capacity),
Rows: make(map[string]*Row, capacity),
IDs: ids,
Rows: make(map[string]*Row),
}
}

Expand All @@ -36,16 +36,12 @@ func NewRow(t k8swatch.EventType, kind, name, detail string) *Row {

const READY k8swatch.EventType = "READY"

func (t *Table) InsertOrUpdate(id string, row *Row) {
_, ok := t.Rows[id]
if !ok {
t.IDs = append(t.IDs, id)
}
func (t *Table) Update(id string, row *Row) {
t.Rows[id] = row
}

func (t *Table) IsCompleted() bool {
if len(t.IDs) < cap(t.IDs) {
func (t *Table) AllCompleted() bool {
if len(t.Rows) < len(t.IDs) {
return false
}
for _, row := range t.Rows {
Expand All @@ -57,8 +53,7 @@ func (t *Table) IsCompleted() bool {
}

func (t *Table) Print() [][]string {
data := [][]string{}
data = append(data, []string{"Type", "Kind", "Name", "Detail"})
data := [][]string{{"Type", "Kind", "Name", "Detail"}}
for _, id := range t.IDs {
row := t.Rows[id]
eventType := row.Type
Expand Down
31 changes: 22 additions & 9 deletions pkg/engine/runtime/kubernetes/kubernetes_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"

"kusionstack.io/kusion/pkg/engine"
"kusionstack.io/kusion/pkg/engine/models"
"kusionstack.io/kusion/pkg/engine/printers/k8s"
"kusionstack.io/kusion/pkg/engine/runtime"
Expand Down Expand Up @@ -315,8 +316,8 @@ func (k *KubernetesRuntime) Watch(ctx context.Context, request *runtime.WatchReq
})

// Collect all
var resultChs []<-chan k8swatch.Event
resultChs = append(resultChs, rootCh)
watchers := runtime.NewWatchers()
watchers.Insert(engine.BuildIDForKubernetes(reqObj), rootCh)

if reqObj.GetKind() == k8s.Service { // Watch Endpoints or EndpointSlice
if gvk, err := k.mapper.KindFor(schema.GroupVersionResource{
Expand All @@ -326,18 +327,18 @@ func (k *KubernetesRuntime) Watch(ctx context.Context, request *runtime.WatchReq
}); gvk.Empty() || err != nil { // Watch Endpoints
log.Errorf("k8s runtime has no kind for EndpointSlice, err: %v", err)
namedGVK := getNamedGVK(reqObj.GroupVersionKind())
ch, _, err := k.WatchByRelation(ctx, reqObj, namedGVK, namedBy)
ch, dependent, err := k.WatchByRelation(ctx, reqObj, namedGVK, namedBy)
if err != nil {
return &runtime.WatchResponse{Status: status.NewErrorStatus(err)}
}
resultChs = append(resultChs, ch)
watchers.Insert(engine.BuildIDForKubernetes(dependent), ch)
} else { // Watch EndpointSlice
dependentGVK := getDependentGVK(reqObj.GroupVersionKind())
ch, _, err := k.WatchByRelation(ctx, reqObj, dependentGVK, ownedBy)
ch, dependent, err := k.WatchByRelation(ctx, reqObj, dependentGVK, ownedBy)
if err != nil {
return &runtime.WatchResponse{Status: status.NewErrorStatus(err)}
}
resultChs = append(resultChs, ch)
watchers.Insert(engine.BuildIDForKubernetes(dependent), ch)
}
} else {
// Try to get dependent resource by owner reference
Expand All @@ -349,11 +350,12 @@ func (k *KubernetesRuntime) Watch(ctx context.Context, request *runtime.WatchReq
if err != nil {
return &runtime.WatchResponse{Status: status.NewErrorStatus(err)}
}
resultChs = append(resultChs, ch)

if dependent == nil {
break
}
watchers.Insert(engine.BuildIDForKubernetes(dependent), ch)

// Try to get deeper, max depth is 3 including root
dependentGVK = getDependentGVK(dependent.GroupVersionKind())
// Replace owner
Expand All @@ -362,7 +364,7 @@ func (k *KubernetesRuntime) Watch(ctx context.Context, request *runtime.WatchReq
}
}

return &runtime.WatchResponse{ResultChs: resultChs}
return &runtime.WatchResponse{Watchers: watchers}
}

// getKubernetesClient get kubernetes client
Expand Down Expand Up @@ -499,7 +501,11 @@ func (k *KubernetesRuntime) WatchByRelation(

// doWatch send watched object if check ok
func doWatch(ctx context.Context, watcher k8swatch.Interface, checker func(watched *unstructured.Unstructured) bool) <-chan k8swatch.Event {
resultCh := make(chan k8swatch.Event)
// Buffered channel, store new event
resultCh := make(chan k8swatch.Event, 1)
// Wait for the first watched obj
first := true
signal := make(chan struct{})
go func() {
defer watcher.Stop()
for {
Expand All @@ -512,13 +518,20 @@ func doWatch(ctx context.Context, watcher k8swatch.Interface, checker func(watch
// Check
if checker == nil || checker != nil && checker(dependent) {
resultCh <- e
if first {
signal <- struct{}{}
first = false
}
}
case <-ctx.Done():
return
}
}
}()

// Owner&Dependent check pass, return the dependent Obj
<-signal

return resultCh
}

Expand Down
19 changes: 18 additions & 1 deletion pkg/engine/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,25 @@ type WatchRequest struct {
}

type WatchResponse struct {
ResultChs []<-chan watch.Event
Watchers *SequentialWatchers

// Status contains messages will show to users
Status status.Status
}

// SequentialWatchers holds watch-event channels for a root resource and its
// dependents as two parallel slices: IDs[i] identifies the object whose
// events arrive on Watchers[i], so insertion order is preserved across both.
type SequentialWatchers struct {
	// IDs lists the resource identifiers, in the order they were inserted.
	IDs []string
	// Watchers lists the receive-only event channel paired with each ID.
	Watchers []<-chan watch.Event
}

// NewWatchers returns an empty SequentialWatchers ready to accept Insert
// calls. The slices are left at their nil zero value: append, len and range
// all operate correctly on nil slices, so pre-allocated empty literals are
// unnecessary.
func NewWatchers() *SequentialWatchers {
	return &SequentialWatchers{}
}

// Insert registers watcher under id, growing both parallel slices together
// so the positional correspondence between IDs and Watchers is maintained.
func (w *SequentialWatchers) Insert(id string, watcher <-chan watch.Event) {
	w.Watchers = append(w.Watchers, watcher)
	w.IDs = append(w.IDs, id)
}
Loading

0 comments on commit ca7a644

Please sign in to comment.