diff --git a/go.mod b/go.mod
index 8da5410e..9828d52c 100644
--- a/go.mod
+++ b/go.mod
@@ -336,3 +336,5 @@ require (
     sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
     sigs.k8s.io/yaml v1.3.0 // indirect
 )
+
+replace github.com/pterm/pterm v0.12.79 => github.com/liu-hm19/pterm v0.0.0
diff --git a/go.sum b/go.sum
index d3f72e18..ed8a3d51 100644
--- a/go.sum
+++ b/go.sum
@@ -831,6 +831,8 @@ github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhn
 github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE=
 github.com/lithammer/fuzzysearch v1.1.8 h1:/HIuJnjHuXS8bKaiTMeeDlW2/AyIWk2brx1V8LFgLN4=
 github.com/lithammer/fuzzysearch v1.1.8/go.mod h1:IdqeyBClc3FFqSzYq/MXESsS4S0FsZ5ajtkr5xPLts4=
+github.com/liu-hm19/pterm v0.0.0 h1:WfS5Rmh+znXGpPLnBR1Cn+ceTWXN8VJw4Ab54NMKy7s=
+github.com/liu-hm19/pterm v0.0.0/go.mod h1:1v/gzOF1N0FsjbgTHZ1wVycRkKiatFvJSJC4IGaQAAo=
 github.com/lucasb-eyer/go-colorful v1.0.2/go.mod h1:0MS4r+7BZKSJ5mw4/S5MPN+qHFF1fYclkSPilDOKW0s=
 github.com/lucasb-eyer/go-colorful v1.0.3 h1:QIbQXiugsb+q10B+MI+7DI1oQLdmnep86tWFlaaUAac=
 github.com/lucasb-eyer/go-colorful v1.0.3/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0=
@@ -972,8 +974,6 @@ github.com/pterm/pterm v0.12.31/go.mod h1:32ZAWZVXD7ZfG0s8qqHXePte42kdz8ECtRyEej
 github.com/pterm/pterm v0.12.33/go.mod h1:x+h2uL+n7CP/rel9+bImHD5lF3nM9vJj80k9ybiiTTE=
 github.com/pterm/pterm v0.12.36/go.mod h1:NjiL09hFhT/vWjQHSj1athJpx6H8cjpHXNAK5bUw8T8=
 github.com/pterm/pterm v0.12.40/go.mod h1:ffwPLwlbXxP+rxT0GsgDTzS3y3rmpAO1NMjUkGTYf8s=
-github.com/pterm/pterm v0.12.79 h1:lH3yrYMhdpeqX9y5Ep1u7DejyHy7NSQg9qrBjF9dFT4=
-github.com/pterm/pterm v0.12.79/go.mod h1:1v/gzOF1N0FsjbgTHZ1wVycRkKiatFvJSJC4IGaQAAo=
 github.com/pulumi/pulumi/sdk/v3 v3.68.0 h1:JWn3DGJhzoWL8bNbUdyLSSPeKS2F9mv14/EL9QeVT3w=
 github.com/pulumi/pulumi/sdk/v3 v3.68.0/go.mod h1:A/WHc5MlxU8GpX/sRmfQ9G0/Bxxl4GNdSP7TQmy4yIw=
 github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
diff --git a/pkg/apis/status/v1/types.go b/pkg/apis/status/v1/types.go
index 5290a777..33458102 100644
--- a/pkg/apis/status/v1/types.go
+++ b/pkg/apis/status/v1/types.go
@@ -29,6 +29,16 @@ const (
     IllegalManifest Code = "ILLEGAL_MANIFEST"
 )
+
+// ContextKey is used to represent the key associated with the information
+// injected into the function context.
+type ContextKey string
+
+const (
+    // WatchChannel is used to inject a channel into the runtime operation to
+    // assist in watching the changes of the resources during the operation process.
+    WatchChannel = ContextKey("WatchChannel")
+)
+
 type Status interface {
     Kind() Kind
     Code() Code
diff --git a/pkg/cmd/apply/apply.go b/pkg/cmd/apply/apply.go
index 223917c2..229dc1f9 100644
--- a/pkg/cmd/apply/apply.go
+++ b/pkg/cmd/apply/apply.go
@@ -15,13 +15,19 @@
 package apply
 
 import (
+    "bytes"
+    "context"
     "fmt"
     "io"
+    "reflect"
     "strings"
     "sync"
+    "time"
 
     "github.com/pterm/pterm"
     "github.com/spf13/cobra"
+    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+    "k8s.io/apimachinery/pkg/watch"
     "k8s.io/cli-runtime/pkg/genericiooptions"
     "k8s.io/kubectl/pkg/util/templates"
@@ -30,9 +36,13 @@ import (
     "kusionstack.io/kusion/pkg/cmd/generate"
     "kusionstack.io/kusion/pkg/cmd/preview"
     cmdutil "kusionstack.io/kusion/pkg/cmd/util"
+    "kusionstack.io/kusion/pkg/engine"
     "kusionstack.io/kusion/pkg/engine/operation"
     "kusionstack.io/kusion/pkg/engine/operation/models"
+    "kusionstack.io/kusion/pkg/engine/printers"
     "kusionstack.io/kusion/pkg/engine/release"
+    "kusionstack.io/kusion/pkg/engine/runtime"
+    runtimeinit "kusionstack.io/kusion/pkg/engine/runtime/init"
     "kusionstack.io/kusion/pkg/log"
     "kusionstack.io/kusion/pkg/util/i18n"
     "kusionstack.io/kusion/pkg/util/pretty"
@@ -60,10 +70,23 @@ var (
         # Apply without output style and color
         kusion apply --no-style=true
 
+        # Apply without watching the resource changes
+        kusion apply --watch=false
+
+        # Apply with the specified timeout duration for watching K8s resources, measured in seconds
+        kusion apply --timeout=120
+
         # Apply with localhost port forwarding
         kusion apply --port-forward=8080`)
 )
+
+// Package-level state used to update the release phase when a panic occurs.
+var (
+    rel            *apiv1.Release
+    releaseCreated = false
+    storage        release.Storage
+)
+
 // ApplyFlags directly reflect the information that CLI is gathering via flags. They will be converted to
 // ApplyOptions, which reflect the runtime requirements for the command.
 //
@@ -74,6 +97,7 @@ type ApplyFlags struct {
     Yes         bool
     DryRun      bool
     Watch       bool
+    Timeout     int
     PortForward int
 
     genericiooptions.IOStreams
@@ -86,6 +110,7 @@ type ApplyOptions struct {
     Yes         bool
     DryRun      bool
     Watch       bool
+    Timeout     int
     PortForward int
 
     genericiooptions.IOStreams
@@ -130,7 +155,8 @@ func (f *ApplyFlags) AddFlags(cmd *cobra.Command) {
     cmd.Flags().BoolVarP(&f.Yes, "yes", "y", false, i18n.T("Automatically approve and perform the update after previewing it"))
     cmd.Flags().BoolVarP(&f.DryRun, "dry-run", "", false, i18n.T("Preview the execution effect (always successful) without actually applying the changes"))
-    cmd.Flags().BoolVarP(&f.Watch, "watch", "", false, i18n.T("After creating/updating/deleting the requested object, watch for changes"))
+    cmd.Flags().BoolVarP(&f.Watch, "watch", "", true, i18n.T("After creating/updating/deleting the requested object, watch for changes"))
+    cmd.Flags().IntVarP(&f.Timeout, "timeout", "", 60, i18n.T("The timeout duration for watching K8s resources, measured in seconds"))
     cmd.Flags().IntVarP(&f.PortForward, "port-forward", "", 0, i18n.T("Forward the specified port from local to service"))
 }
@@ -147,6 +173,7 @@ func (f *ApplyFlags) ToOptions() (*ApplyOptions, error) {
         Yes:         f.Yes,
         DryRun:      f.DryRun,
         Watch:       f.Watch,
+        Timeout:     f.Timeout,
         PortForward: f.PortForward,
         IOStreams:   f.IOStreams,
     }
@@ -170,20 +197,8 @@ func (o *ApplyOptions) Validate(cmd *cobra.Command, args []string) error {
 // Run executes the `apply` command.
 func (o *ApplyOptions) Run() (err error) {
     // update release to succeeded or failed
-    var storage release.Storage
-    var rel *apiv1.Release
-    releaseCreated := false
     defer func() {
-        if !releaseCreated {
-            return
-        }
-        if err != nil {
-            rel.Phase = apiv1.ReleasePhaseFailed
-            _ = release.UpdateApplyRelease(storage, rel, o.DryRun)
-        } else {
-            rel.Phase = apiv1.ReleasePhaseSucceeded
-            err = release.UpdateApplyRelease(storage, rel, o.DryRun)
-        }
+        relHandler(rel, releaseCreated, err, o.DryRun)
     }()
 
     // set no style
@@ -283,13 +298,15 @@ func (o *ApplyOptions) Run() (err error) {
     if err = release.UpdateApplyRelease(storage, rel, o.DryRun); err != nil {
         return
     }
+
     // start applying
-    fmt.Println("Start applying diffs ...")
+    fmt.Printf("\nStart applying diffs ...\n")
     var updatedRel *apiv1.Release
     updatedRel, err = Apply(o, storage, rel, changes, o.IOStreams.Out)
     if err != nil {
         return
     }
+
     // if dry run, print the hint
     if o.DryRun {
         fmt.Printf("\nNOTE: Currently running in the --dry-run mode, the above configuration does not really take effect\n")
@@ -297,13 +314,6 @@ func (o *ApplyOptions) Run() (err error) {
     }
     rel = updatedRel
 
-    if o.Watch {
-        fmt.Println("\nStart watching changes ...")
-        if err = Watch(o, rel.Spec, changes); err != nil {
-            return
-        }
-    }
-
     if o.PortForward > 0 {
         fmt.Printf("\nStart port-forwarding ...\n")
         if err = PortForward(o, rel.Spec); err != nil {
@@ -323,6 +333,17 @@ func Apply(
     changes *models.Changes,
     out io.Writer,
 ) (*apiv1.Release, error) {
+    var err error
+    defer func() {
+        if p := recover(); p != nil {
+            err = fmt.Errorf("failed to execute kusion apply as %v", p)
+            log.Error(err)
+        }
+        if err != nil {
+            relHandler(rel, releaseCreated, err, o.DryRun)
+        }
+    }()
+
     // construct the apply operation
     ac := &operation.ApplyOperation{
         Operation: models.Operation{
@@ -333,9 +354,28 @@ func Apply(
         },
     }
 
+    // Init a watch channel with a sufficient buffer when it is necessary to perform watching.
+    if o.Watch {
+        ac.WatchCh = make(chan string, 100)
+    }
+
     // line summary
     var ls lineSummary
 
+    // Get the multi printer from UI option.
+    multi := o.UI.MultiPrinter
+
+    // Prepare the writer to print the operation progress and results.
+    changesWriterMap := make(map[string]*pterm.SpinnerPrinter)
+    for _, key := range changes.Values() {
+        changesWriterMap[key.ID], err = o.UI.SpinnerPrinter.
+            WithWriter(multi.NewWriter()).Start(fmt.Sprintf("Waiting for %s to apply ...", pterm.Bold.Sprint(key.ID)))
+        if err != nil {
+            return nil, fmt.Errorf("failed to init change step spinner printer: %v", err)
+        }
+    }
+
+    out = multi.NewWriter()
     // progress bar, print dag walk detail
     progressbar, err := o.UI.ProgressbarPrinter.
         WithMaxWidth(0). // Set to 0, the terminal width will be used
@@ -346,13 +386,25 @@
     if err != nil {
         return nil, err
     }
+
+    // The writer below is for operation error printing.
+    errWriter := multi.NewWriter()
+
+    multi.Start()
+    multi.WithUpdateDelay(time.Millisecond * 50)
+    defer multi.Stop()
+
     // wait msgCh close
     var wg sync.WaitGroup
     // receive msg and print detail
     go func() {
         defer func() {
             if p := recover(); p != nil {
-                log.Errorf("failed to receive msg and print detail as %v", p)
+                err = fmt.Errorf("failed to receive msg and print detail as %v", p)
+                log.Error(err)
+            }
+            if err != nil {
+                relHandler(rel, releaseCreated, err, o.DryRun)
             }
         }()
         wg.Add(1)
@@ -375,14 +427,14 @@ func Apply(
                         pterm.Bold.Sprint(changeStep.ID),
                         strings.ToLower(string(models.Skip)),
                     )
+                    changesWriterMap[msg.ResourceID].Success(title)
                 } else {
-                    title = fmt.Sprintf("%s %s %s",
-                        changeStep.Action.String(),
+                    title = fmt.Sprintf("%s %s",
+                        changeStep.Action.Ing(),
                         pterm.Bold.Sprint(changeStep.ID),
-                        strings.ToLower(string(msg.OpResult)),
                     )
+                    changesWriterMap[msg.ResourceID].UpdateText(title)
                 }
-                pretty.SuccessT.WithWriter(out).Printfln(title)
                 progressbar.UpdateTitle(title)
                 progressbar.Increment()
                 ls.Count(changeStep.Action)
@@ -392,19 +444,115 @@ func Apply(
                     pterm.Bold.Sprint(changeStep.ID),
                     strings.ToLower(string(msg.OpResult)),
                 )
-                pretty.ErrorT.WithWriter(out).Printf("%s\n", title)
+                changesWriterMap[msg.ResourceID].Fail(title)
+                errStr := pterm.Red(fmt.Sprintf("apply %s failed as: %s\n", msg.ResourceID, msg.OpErr.Error()))
+                pterm.Fprintln(errWriter, errStr)
             default:
-                title := fmt.Sprintf("%s %s %s",
+                title := fmt.Sprintf("%s %s",
                     changeStep.Action.Ing(),
                     pterm.Bold.Sprint(changeStep.ID),
-                    strings.ToLower(string(msg.OpResult)),
                 )
+                changesWriterMap[msg.ResourceID].UpdateText(title)
                 progressbar.UpdateTitle(title)
             }
         }
     }()
 
+    watchErrCh, k8sWatchErrCh := make(chan error), make(chan error)
+    // Apply while watching the resources.
+    if o.Watch {
+        resourceMap := make(map[string]apiv1.Resource)
+        ioWriterMap := make(map[string]io.Writer)
+        toBeWatched := apiv1.Resources{}
+
+        // Get the resources to be watched.
+        for _, res := range rel.Spec.Resources {
+            if changes.ChangeOrder.ChangeSteps[res.ResourceKey()].Action != models.UnChanged {
+                resourceMap[res.ResourceKey()] = res
+                toBeWatched = append(toBeWatched, res)
+            }
+        }
+
+        go func() {
+            defer func() {
+                if p := recover(); p != nil {
+                    err = fmt.Errorf("failed to receive and print the resources to be watched as %v", p)
+                    log.Error(err)
+                }
+                if err != nil {
+                    relHandler(rel, releaseCreated, err, o.DryRun)
+                }
+
+                watchErrCh <- err
+            }()
+            runtimes, s := runtimeinit.Runtimes(toBeWatched)
+            if v1.IsErr(s) {
+                panic(fmt.Errorf("failed to init runtimes: %s", s.String()))
+            }
+
+            tables := make(map[string]*printers.Table, len(toBeWatched))
+            ticker := time.NewTicker(time.Millisecond * 500)
+            defer ticker.Stop()
+
+            finished := make(map[string]bool)
+            watchedIDs := []string{}
+
+            for len(finished) != len(toBeWatched) {
+                select {
+                case id := <-ac.WatchCh:
+                    res := resourceMap[id]
+
+                    // Set the timeout duration for watch context.
+                    ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(o.Timeout))
+                    defer cancel()
+
+                    rsp := runtimes[res.Type].Watch(ctx, &runtime.WatchRequest{Resource: &res})
+                    if rsp == nil {
+                        log.Debugf("unsupported resource type: %s", res.Type)
+                        continue
+                    }
+                    if v1.IsErr(rsp.Status) {
+                        panic(fmt.Errorf("failed to watch %s as %s", id, rsp.Status.String()))
+                    }
+
+                    w := rsp.Watchers
+                    table := printers.NewTable(w.IDs)
+                    tables[id] = table
+
+                    if res.Type == apiv1.Kubernetes {
+                        go watchK8sResources(id, w.Watchers, table, tables, o.Timeout, o.DryRun, k8sWatchErrCh)
+                    } else if res.Type == apiv1.Terraform {
+                        go watchTFResources(id, w.TFWatcher, table, o.DryRun)
+                    } else {
+                        log.Debugf("unsupported resource type to watch: %s", string(res.Type))
+                        continue
+                    }
+
+                    ioWriterMap[id] = multi.NewWriter()
+                    watchedIDs = append(watchedIDs, id)
+
+                default:
+                    for _, id := range watchedIDs {
+                        w, ok := ioWriterMap[id]
+                        if !ok {
+                            panic(fmt.Errorf("failed to get io writer while watching %s", id))
+                        }
+                        printTable(&w, id, tables)
+                    }
+
+                    for id, table := range tables {
+                        if table.AllCompleted() {
+                            finished[id] = true
+                            changesWriterMap[id].Success(fmt.Sprintf("%s %s, success", changes.Get(id).Action.String(), pterm.Bold.Sprint(id)))
+                        }
+                    }
+                    <-ticker.C
+                }
+            }
+        }()
+    }
+
     var updatedRel *apiv1.Release
     if o.DryRun {
         for _, r := range rel.Spec.Resources {
@@ -425,52 +573,37 @@ func Apply(
         Release: rel,
     })
     if v1.IsErr(st) {
-        return nil, fmt.Errorf("apply failed, status:\n%v", st)
+        errWriter.(*bytes.Buffer).Reset()
+        err = fmt.Errorf("apply failed, status:\n%v", st)
+        return nil, err
     }
     updatedRel = rsp.Release
 }
 
 // wait for msgCh closed
 wg.Wait()
-    // print summary
-    pterm.Println()
-    pterm.Fprintln(out, fmt.Sprintf("Apply complete! Resources: %d created, %d updated, %d deleted.", ls.created, ls.updated, ls.deleted))
-    return updatedRel, nil
-}
-
-// Watch function will observe the changes of each resource by the execution engine.
-func Watch(
-    o *ApplyOptions,
-    planResources *apiv1.Spec,
-    changes *models.Changes,
-) error {
-    if o.DryRun {
-        fmt.Println("NOTE: Watch doesn't work in DryRun mode")
-        return nil
-    }
-
-    // filter out unchanged resources
-    toBeWatched := apiv1.Resources{}
-    for _, res := range planResources.Resources {
-        if changes.ChangeOrder.ChangeSteps[res.ResourceKey()].Action != models.UnChanged {
-            toBeWatched = append(toBeWatched, res)
+    // Wait for the watch results if watching is enabled.
+    if o.Watch {
+        shouldBreak := false
+        for !shouldBreak {
+            select {
+            case k8sWatchErr := <-k8sWatchErrCh:
+                if k8sWatchErr != nil {
+                    return nil, k8sWatchErr
+                }
+                shouldBreak = true
+            case watchErr := <-watchErrCh:
+                if watchErr != nil {
+                    return nil, watchErr
+                }
+                shouldBreak = true
+            }
         }
     }
 
-    // Watch operation
-    wo := &operation.WatchOperation{}
-    if err := wo.Watch(&operation.WatchRequest{
-        Request: models.Request{
-            Project: changes.Project(),
-            Stack:   changes.Stack(),
-        },
-        Spec: &apiv1.Spec{Resources: toBeWatched},
-    }); err != nil {
-        return err
-    }
-
-    fmt.Println("Watch Finish! All resources have been reconciled.")
-    return nil
+    // print summary
+    pterm.Fprintln(out, fmt.Sprintf("\nApply complete! Resources: %d created, %d updated, %d deleted.", ls.created, ls.updated, ls.deleted))
+    return updatedRel, nil
 }
 
 // PortForward function will forward the specified port from local to the project Kubernetes Service.
@@ -552,3 +685,165 @@ func prompt(ui *terminal.UI) (string, error) {
 
     return input, nil
 }
+
+func watchK8sResources(
+    id string,
+    chs []<-chan watch.Event,
+    table *printers.Table,
+    tables map[string]*printers.Table,
+    timeout int,
+    dryRun bool,
+    errCh chan error,
+) {
+    // Select cases for the watched resource event channels
+    cases := createSelectCases(chs)
+    // Default select case
+    cases = append(cases, reflect.SelectCase{
+        Dir:  reflect.SelectDefault,
+        Chan: reflect.Value{},
+        Send: reflect.Value{},
+    })
+
+    timer := time.After(time.Second * time.Duration(timeout))
+    done := make(chan bool)
+    go func() {
+        for {
+            chosen, recv, recvOK := reflect.Select(cases)
+            if cases[chosen].Dir == reflect.SelectDefault {
+                continue
+            }
+            if recvOK {
+                e := recv.Interface().(watch.Event)
+                o := e.Object.(*unstructured.Unstructured)
+                var detail string
+                var ready bool
+                if e.Type == watch.Deleted {
+                    detail = fmt.Sprintf("%s has been deleted", o.GetName())
+                    ready = true
+                } else {
+                    // Restore to actual type
+                    target := printers.Convert(o)
+                    detail, ready = printers.Generate(target)
+                }
+
+                // Mark ready for breaking loop
+                if ready {
+                    e.Type = printers.READY
+                }
+
+                // Save watched msg
+                table.Update(
+                    engine.BuildIDForKubernetes(o),
+                    printers.NewRow(e.Type, o.GetKind(), o.GetName(), detail))
+
+                // Write back
+                tables[id] = table
+            }
+
+            // Break when completed
+            if table.AllCompleted() {
+                break
+            }
+        }
+        done <- true
+    }()
+
+    select {
+    case <-done:
+        return
+    case <-timer:
+        err := fmt.Errorf("failed to watch Kubernetes resource %s: timed out after %d seconds", id, timeout)
+        relHandler(rel, releaseCreated, err, dryRun)
+        errCh <- err
+    }
+}
+
+func watchTFResources(
+    id string,
+    ch <-chan runtime.TFEvent,
+    table *printers.Table,
+    dryRun bool,
+) {
+    defer func() {
+        var err error
+        if p := recover(); p != nil {
+            err = fmt.Errorf("failed to watch Terraform resource %s as %v", id, p)
+            log.Error(err)
+        }
+        if err != nil {
+            relHandler(rel, releaseCreated, err, dryRun)
+        }
+    }()
+
+    for {
+        parts := strings.Split(id, engine.Separator)
+        if len(parts) != 4 {
+            panic(fmt.Errorf("invalid Terraform resource id: %s", id))
+        }
+
+        tfEvent := <-ch
+        if tfEvent == runtime.TFApplying {
+            table.Update(
+                id,
+                printers.NewRow(watch.EventType("Applying"),
+                    strings.Join([]string{parts[1], parts[2]}, engine.Separator), parts[3], "Applying..."))
+        } else if tfEvent == runtime.TFSucceeded {
+            table.Update(
+                id,
+                printers.NewRow(printers.READY,
+                    strings.Join([]string{parts[1], parts[2]}, engine.Separator), parts[3], "Apply succeeded"))
+        } else {
+            table.Update(
+                id,
+                printers.NewRow(watch.EventType("Failed"),
+                    strings.Join([]string{parts[1], parts[2]}, engine.Separator), parts[3], "Apply failed"))
+        }
+
+        // Break when all completed.
+        if table.AllCompleted() {
+            break
+        }
+    }
+}
+
+func createSelectCases(chs []<-chan watch.Event) []reflect.SelectCase {
+    cases := make([]reflect.SelectCase, 0, len(chs))
+    for _, ch := range chs {
+        cases = append(cases, reflect.SelectCase{
+            Dir:  reflect.SelectRecv,
+            Chan: reflect.ValueOf(ch),
+        })
+    }
+    return cases
+}
+
+func printTable(w *io.Writer, id string, tables map[string]*printers.Table) {
+    // Reset the buffer for live flushing.
+    (*w).(*bytes.Buffer).Reset()
+
+    // Print resource Key as heading text
+    _, _ = fmt.Fprintln(*w, pretty.LightCyanBold("[%s]", id))
+
+    table, ok := tables[id]
+    if !ok {
+        // Unsupported resource, leave a hint
+        _, _ = fmt.Fprintln(*w, "Skip monitoring unsupported resources")
+    } else {
+        // Print table
+        data := table.Print()
+        _ = pterm.DefaultTable.WithHasHeader().WithSeparator(" ").WithData(data).WithWriter(*w).Render()
+    }
+}
+
+func relHandler(rel *apiv1.Release, releaseCreated bool, err error, dryRun bool) {
+    if !releaseCreated {
+        return
+    }
+    if err != nil {
+        rel.Phase = apiv1.ReleasePhaseFailed
+        _ = release.UpdateApplyRelease(storage, rel, dryRun)
+    } else {
+        rel.Phase = apiv1.ReleasePhaseSucceeded
+        err = release.UpdateApplyRelease(storage, rel, dryRun)
+    }
+}
diff --git a/pkg/engine/operation/apply.go b/pkg/engine/operation/apply.go
index 621ab745..3bcf4ee7 100644
--- a/pkg/engine/operation/apply.go
+++ b/pkg/engine/operation/apply.go
@@ -101,6 +101,7 @@ func (ao *ApplyOperation) Apply(req *ApplyRequest) (rsp *ApplyResponse, s v1.Sta
             Stack:        o.Stack,
             IgnoreFields: o.IgnoreFields,
             MsgCh:        o.MsgCh,
+            WatchCh:      o.WatchCh,
             Lock:         &sync.Mutex{},
             Release:      rel,
         },
diff --git a/pkg/engine/operation/graph/resource_node.go b/pkg/engine/operation/graph/resource_node.go
index 467b6f19..10a21251 100644
--- a/pkg/engine/operation/graph/resource_node.go
+++ b/pkg/engine/operation/graph/resource_node.go
@@ -133,8 +133,10 @@ func (rn *ResourceNode) computeActionType(
     } else if liveResource == nil {
         rn.Action = models.Create
     } else {
+        // Prepare the watch channel for runtime apply.
+        ctx := context.WithValue(context.Background(), v1.WatchChannel, operation.WatchCh)
         // Dry run to fetch predictable resource
-        dryRunResp := operation.RuntimeMap[rn.resource.Type].Apply(context.Background(), &runtime.ApplyRequest{
+        dryRunResp := operation.RuntimeMap[rn.resource.Type].Apply(ctx, &runtime.ApplyRequest{
             PriorResource: priorResource,
             PlanResource:  planedResource,
             Stack:         operation.Stack,
@@ -225,7 +227,9 @@ func (rn *ResourceNode) applyResource(operation *models.Operation, prior, planed
     rt := operation.RuntimeMap[resourceType]
     switch rn.Action {
     case models.Create, models.Update:
-        response := rt.Apply(context.Background(), &runtime.ApplyRequest{PriorResource: prior, PlanResource: planed, Stack: operation.Stack})
+        // Prepare the watch channel for runtime apply.
+        ctx := context.WithValue(context.Background(), v1.WatchChannel, operation.WatchCh)
+        response := rt.Apply(ctx, &runtime.ApplyRequest{PriorResource: prior, PlanResource: planed, Stack: operation.Stack})
         res = response.Resource
         s = response.Status
         log.Debugf("apply resource:%s, response: %v", planed.ID, json.Marshal2String(response))
diff --git a/pkg/engine/operation/models/operation_context.go b/pkg/engine/operation/models/operation_context.go
index 32c1886c..6bd83fa9 100644
--- a/pkg/engine/operation/models/operation_context.go
+++ b/pkg/engine/operation/models/operation_context.go
@@ -44,6 +44,10 @@ type Operation struct {
     // and this message will be displayed in the terminal
     MsgCh chan Message
 
+    // WatchCh is used to send the resource IDs that are ready to be watched after sending or executing
+    // the apply request.
+    WatchCh chan string
+
     // Lock is the operation-wide mutex
     Lock *sync.Mutex
 
diff --git a/pkg/engine/printers/table.go b/pkg/engine/printers/table.go
index 62a5772b..f1819335 100644
--- a/pkg/engine/printers/table.go
+++ b/pkg/engine/printers/table.go
@@ -55,8 +55,14 @@ func (t *Table) AllCompleted() bool {
 func (t *Table) Print() [][]string {
     data := [][]string{{"Type", "Kind", "Name", "Detail"}}
     for _, id := range t.IDs {
+        var eventType k8swatch.EventType
         row := t.Rows[id]
-        eventType := row.Type
+        if row != nil {
+            eventType = row.Type
+        } else {
+            // Skip the row if its contents have not been updated yet.
+            continue
+        }
 
         // Colored type
         eventTypeS := ""
diff --git a/pkg/engine/release/util.go b/pkg/engine/release/util.go
index ac0ca698..4a48536c 100644
--- a/pkg/engine/release/util.go
+++ b/pkg/engine/release/util.go
@@ -46,9 +46,7 @@ func NewApplyRelease(storage Storage, project, stack, workspace string) (*v1.Rel
         if lastRelease.Phase != v1.ReleasePhaseSucceeded && lastRelease.Phase != v1.ReleasePhaseFailed {
             return nil, fmt.Errorf("cannot new release of project %s, workspace %s cause there is release in progress", project, workspace)
         }
-        if err != nil {
-            return nil, err
-        }
+
         rel = &v1.Release{
             Project:   project,
             Workspace: workspace,
diff --git a/pkg/engine/runtime/kubernetes/kubernetes_runtime.go b/pkg/engine/runtime/kubernetes/kubernetes_runtime.go
index 420e2478..ec6c7960 100644
--- a/pkg/engine/runtime/kubernetes/kubernetes_runtime.go
+++ b/pkg/engine/runtime/kubernetes/kubernetes_runtime.go
@@ -159,6 +159,13 @@ func (k *KubernetesRuntime) Apply(ctx context.Context, request *runtime.ApplyReq
     // more concise and clean resource object.
     normalizeServerSideFields(res)
 
+    // Extract the watch channel from the context.
+    watchCh, _ := ctx.Value(v1.WatchChannel).(chan string)
+    if !request.DryRun && watchCh != nil {
+        log.Infof("Started to watch %s with the type of %s", planState.ResourceKey(), planState.Type)
+        watchCh <- planState.ResourceKey()
+    }
+
     return &runtime.ApplyResponse{Resource: &apiv1.Resource{
         ID:   planState.ResourceKey(),
         Type: planState.Type,
@@ -329,6 +336,10 @@ func (k *KubernetesRuntime) Watch(ctx context.Context, request *runtime.WatchReq
         return watched.GetName() == reqObj.GetName()
     })
 
+    if rootCh == nil {
+        return &runtime.WatchResponse{Status: v1.NewErrorStatus(fmt.Errorf("failed to get root channel for watch"))}
+    }
+
     // Collect all
     watchers := runtime.NewWatchers()
     watchers.Insert(engine.BuildIDForKubernetes(reqObj), rootCh)
@@ -522,13 +533,19 @@ func (k *KubernetesRuntime) WatchByRelation(
     }
 
     var next *unstructured.Unstructured
-    return doWatch(ctx, w, func(watched *unstructured.Unstructured) bool {
+
+    eventCh := doWatch(ctx, w, func(watched *unstructured.Unstructured) bool {
         ok := related(watched, cur)
         if ok {
             next = watched
         }
         return ok
-    }), next, nil
+    })
+    if eventCh == nil {
+        err = fmt.Errorf("failed to get event channel for watching by relation")
+    }
+
+    return eventCh, next, err
 }
 
 // doWatch send watched object if check ok
@@ -562,9 +579,12 @@ func doWatch(ctx context.Context, watcher k8swatch.Interface, checker func(watch
     }()
 
     // Owner&Dependent check pass, return the dependent Obj
-    <-signal
-
-    return resultCh
+    select {
+    case <-signal:
+        return resultCh
+    case <-ctx.Done():
+        return nil
+    }
 }
 
 // Judge dependent is owned by owner
diff --git a/pkg/engine/runtime/runtime.go b/pkg/engine/runtime/runtime.go
index f8bdc70f..75c2410b 100644
--- a/pkg/engine/runtime/runtime.go
+++ b/pkg/engine/runtime/runtime.go
@@ -14,6 +14,18 @@ const (
     Terraform  apiv1.Type = "Terraform"
 )
 
+// TFEvent represents the status of the Terraform resource operation event.
+type TFEvent string
+
+const (
+    // TFApplying means the TF resource is still being applied.
+    TFApplying = TFEvent("Applying")
+    // TFSucceeded means the TF resource operation has succeeded.
+    TFSucceeded = TFEvent("Succeeded")
+    // TFFailed means the TF resource operation has failed.
+    TFFailed = TFEvent("Failed")
+)
+
 // Runtime represents an actual infrastructure runtime managed by Kusion and every runtime implements this interface can be orchestrated
 // by Kusion like normal K8s resources. All methods in this interface are designed for manipulating one Resource at a time and will be
 // invoked in operations like Apply, Preview, Destroy, etc.
@@ -126,6 +138,8 @@ type WatchResponse struct {
 type SequentialWatchers struct {
     IDs      []string
     Watchers []<-chan watch.Event
+    // TFWatcher is for watching the operation event of the Terraform resources.
+    TFWatcher <-chan TFEvent
 }
 
 func NewWatchers() *SequentialWatchers {
diff --git a/pkg/engine/runtime/terraform/terraform_runtime.go b/pkg/engine/runtime/terraform/terraform_runtime.go
index 263179a7..28b18830 100644
--- a/pkg/engine/runtime/terraform/terraform_runtime.go
+++ b/pkg/engine/runtime/terraform/terraform_runtime.go
@@ -2,9 +2,11 @@ package terraform
 
 import (
     "context"
+    "fmt"
     "os"
     "path/filepath"
     "sync"
+    "time"
 
     "github.com/spf13/afero"
 
@@ -17,6 +19,10 @@ import (
 
 var _ runtime.Runtime = &TerraformRuntime{}
 
+// tfEvents is used to record the operation events of the Terraform
+// resources into the related channels for watching.
+var tfEvents = make(map[string]chan runtime.TFEvent)
+
 type TerraformRuntime struct {
     tfops.WorkSpace
     mu *sync.Mutex
@@ -83,15 +89,60 @@ func (t *TerraformRuntime) Apply(ctx context.Context, request *runtime.ApplyRequ
         }
     }
 
-    tfstate, err := t.WorkSpace.Apply(ctx)
-    if err != nil {
-        return &runtime.ApplyResponse{Resource: nil, Status: v1.NewErrorStatus(err)}
-    }
+    var tfstate *tfops.StateRepresentation
+    var providerAddr string
 
-    // get terraform provider version
-    providerAddr, err := t.WorkSpace.GetProvider()
-    if err != nil {
-        return &runtime.ApplyResponse{Resource: nil, Status: v1.NewErrorStatus(err)}
+    // Extract the watch channel from the context.
+    watchCh, _ := ctx.Value(v1.WatchChannel).(chan string)
+    if watchCh != nil {
+        // Apply while watching the resource.
+        errCh := make(chan error)
+
+        // Start applying the resource.
+        go func() {
+            tfstate, err = t.WorkSpace.Apply(ctx)
+            if err != nil {
+                errCh <- err
+            }
+
+            providerAddr, err = t.WorkSpace.GetProvider()
+            errCh <- err
+        }()
+
+        // Prepare the event channel and send the resource ID to watch channel.
+        log.Infof("Started to watch %s with the type of %s", plan.ResourceKey(), plan.Type)
+        eventCh := make(chan runtime.TFEvent)
+        tfEvents[plan.ResourceKey()] = eventCh
+        watchCh <- plan.ResourceKey()
+
+        // Wait for the apply to be finished.
+        shouldBreak := false
+        for !shouldBreak {
+            select {
+            case err = <-errCh:
+                if err != nil {
+                    eventCh <- runtime.TFFailed
+                    return &runtime.ApplyResponse{Resource: nil, Status: v1.NewErrorStatus(err)}
+                }
+                eventCh <- runtime.TFSucceeded
+                shouldBreak = true
+            default:
+                eventCh <- runtime.TFApplying
+                time.Sleep(time.Second * 1)
+            }
+        }
+    } else {
+        // Apply without watching.
+        tfstate, err = t.WorkSpace.Apply(ctx)
+        if err != nil {
+            return &runtime.ApplyResponse{Resource: nil, Status: v1.NewErrorStatus(err)}
+        }
+
+        // get terraform provider version
+        providerAddr, err = t.WorkSpace.GetProvider()
+        if err != nil {
+            return &runtime.ApplyResponse{Resource: nil, Status: v1.NewErrorStatus(err)}
+        }
     }
 
     r := tfops.ConvertTFState(tfstate, providerAddr)
@@ -219,5 +270,17 @@ func (t *TerraformRuntime) Delete(ctx context.Context, request *runtime.DeleteRe
 
 // Watch terraform resource
 func (t *TerraformRuntime) Watch(ctx context.Context, request *runtime.WatchRequest) *runtime.WatchResponse {
-    return nil
+    // Get the event channel.
+    id := request.Resource.ResourceKey()
+    eventCh, ok := tfEvents[id]
+    if !ok {
+        return &runtime.WatchResponse{Status: v1.NewErrorStatus(fmt.Errorf("failed to get the event channel for %s", id))}
+    }
+
+    return &runtime.WatchResponse{
+        Watchers: &runtime.SequentialWatchers{
+            IDs:       []string{id},
+            TFWatcher: eventCh,
+        },
+    }
 }
diff --git a/pkg/util/terminal/ui.go b/pkg/util/terminal/ui.go
index 282bc79c..d3b66d5b 100644
--- a/pkg/util/terminal/ui.go
+++ b/pkg/util/terminal/ui.go
@@ -9,6 +9,7 @@ type UI struct {
     SpinnerPrinter           *pterm.SpinnerPrinter
     ProgressbarPrinter       *pterm.ProgressbarPrinter
     InteractiveSelectPrinter *pterm.InteractiveSelectPrinter
+    MultiPrinter             *pterm.MultiPrinter
 }
 
 // DefaultUI returns a UI for Kusion CLI display with default
@@ -18,5 +19,6 @@ func DefaultUI() *UI {
         SpinnerPrinter:           &pretty.SpinnerT,
         ProgressbarPrinter:       &pterm.DefaultProgressbar,
         InteractiveSelectPrinter: &pterm.DefaultInteractiveSelect,
+        MultiPrinter:             &pterm.DefaultMultiPrinter,
     }
 }