Skip to content

Commit

Permalink
use reflect pkg to break the limit of for-range by dynamic select/cas…
Browse files Browse the repository at this point in the history
…e set (#240)
  • Loading branch information
howieyuen committed Feb 14, 2023
1 parent bdfbf04 commit d4c20f3
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 31 deletions.
90 changes: 62 additions & 28 deletions pkg/engine/operation/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"reflect"
"time"

"github.com/gosuri/uilive"
Expand Down Expand Up @@ -77,30 +78,35 @@ func (wo *WatchOperation) Watch(req *WatchRequest) error {
defer ticker.Stop()

// Counting completed resource
finished := make(map[string]bool, len(ids))
// Start printing
for {
// Finish watch
if len(finished) == len(ids) {
break
}
// Range tables by id
for _, id := range ids {
chs, ok := msgChs[id]
if !ok {
continue
}

// Get or new the target table
table, exist := tables[id]
if !exist {
table = printers.NewTable()
}
finished := make(map[string]bool)

// Range channels for each table
for _, ch := range chs {
select {
case e := <-ch:
// Start go routine for each table
for _, id := range ids {
chs, ok := msgChs[id]
if !ok {
continue
}
// Get or new the target table
table, exist := tables[id]
if !exist {
table = printers.NewTable(len(chs))
}
go func(id string, chs []<-chan k8swatch.Event, table *printers.Table) {
// Resources selects
cases := createSelectCases(chs)
// Default select
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectDefault,
Chan: reflect.Value{},
Send: reflect.Value{},
})
for {
chosen, recv, recvOK := reflect.Select(cases)
if cases[chosen].Dir == reflect.SelectDefault {
continue
}
if recvOK {
e := recv.Interface().(k8swatch.Event)
o := e.Object.(*unstructured.Unstructured)
var detail string
var ready bool
Expand All @@ -122,19 +128,36 @@ func (wo *WatchOperation) Watch(req *WatchRequest) error {
table.InsertOrUpdate(
engine.BuildIDForKubernetes(o.GetAPIVersion(), o.GetKind(), o.GetNamespace(), o.GetName()),
printers.NewRow(e.Type, o.GetKind(), o.GetName(), detail))
case <-ticker.C:
// Should never reach

// Write back
tables[id] = table
}

// Break when completed
if table.IsCompleted() {
break
}
}
}(id, chs, table)
}

// Waiting for all tables completed
for {
// Finish watch
if len(finished) == len(ids) {
break
}

// Range tables
for id, table := range tables {
// All channels are isCompleted
if table.IsCompleted() {
finished[id] = true
}

// Write back
tables[id] = table
}

// Render table every 1s
<-ticker.C
wo.printTables(writer, ids, tables)
}
return nil
Expand All @@ -161,3 +184,14 @@ func (wo *WatchOperation) printTables(w *uilive.Writer, ids []string, tables map

_ = w.Flush()
}

func createSelectCases(chs []<-chan k8swatch.Event) []reflect.SelectCase {
cases := make([]reflect.SelectCase, 0, len(chs))
for _, ch := range chs {
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch),
})
}
return cases
}
9 changes: 6 additions & 3 deletions pkg/engine/printers/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ type Row struct {
Detail string
}

func NewTable() *Table {
func NewTable(capacity int) *Table {
return &Table{
IDs: []string{},
Rows: map[string]*Row{},
IDs: make([]string, 0, capacity),
Rows: make(map[string]*Row, capacity),
}
}

Expand All @@ -45,6 +45,9 @@ func (t *Table) InsertOrUpdate(id string, row *Row) {
}

func (t *Table) IsCompleted() bool {
if len(t.IDs) < cap(t.IDs) {
return false
}
for _, row := range t.Rows {
if row.Type != READY {
return false
Expand Down

0 comments on commit d4c20f3

Please sign in to comment.