diff --git a/go.mod b/go.mod index 8da5410e..1bc8e699 100644 --- a/go.mod +++ b/go.mod @@ -48,12 +48,13 @@ require ( github.com/hashicorp/vault/api v1.10.0 github.com/howieyuen/uilive v0.0.6 github.com/jinzhu/copier v0.3.2 + github.com/liu-hm19/pterm v0.12.79-fix.2 github.com/lucasb-eyer/go-colorful v1.0.3 github.com/mitchellh/hashstructure v1.0.0 github.com/onsi/ginkgo/v2 v2.15.0 github.com/onsi/gomega v1.31.0 + github.com/patrickmn/go-cache v2.1.0+incompatible github.com/pkg/errors v0.9.1 - github.com/pterm/pterm v0.12.79 github.com/pulumi/pulumi/sdk/v3 v3.68.0 github.com/sergi/go-diff v1.3.1 github.com/spf13/afero v1.6.0 diff --git a/go.sum b/go.sum index d3f72e18..c91c8d58 100644 --- a/go.sum +++ b/go.sum @@ -831,6 +831,8 @@ github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhn github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE= github.com/lithammer/fuzzysearch v1.1.8 h1:/HIuJnjHuXS8bKaiTMeeDlW2/AyIWk2brx1V8LFgLN4= github.com/lithammer/fuzzysearch v1.1.8/go.mod h1:IdqeyBClc3FFqSzYq/MXESsS4S0FsZ5ajtkr5xPLts4= +github.com/liu-hm19/pterm v0.12.79-fix.2 h1:ORG4SrqnZ/MIBllsFpit5wP8m1GfwyRkFanXdS7XgNM= +github.com/liu-hm19/pterm v0.12.79-fix.2/go.mod h1:JHlyOLeEtmSBsCKu0O0/f7ClLXxDVi5ru1nDvhXSFLc= github.com/lucasb-eyer/go-colorful v1.0.2/go.mod h1:0MS4r+7BZKSJ5mw4/S5MPN+qHFF1fYclkSPilDOKW0s= github.com/lucasb-eyer/go-colorful v1.0.3 h1:QIbQXiugsb+q10B+MI+7DI1oQLdmnep86tWFlaaUAac= github.com/lucasb-eyer/go-colorful v1.0.3/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= @@ -930,6 +932,8 @@ github.com/otiai10/copy v1.14.0 h1:dCI/t1iTdYGtkvCuBG2BgR6KZa83PTclw4U5n2wAllU= github.com/otiai10/copy v1.14.0/go.mod h1:ECfuL02W+/FkTWZWgQqXPWZgW9oeKCSQ5qVfSc4qc4w= github.com/otiai10/mint v1.5.1 h1:XaPLeE+9vGbuyEHem1JNk3bYc7KKqyI/na0/mLd/Kks= github.com/otiai10/mint v1.5.1/go.mod h1:MJm72SBthJjz8qhefc4z1PYEieWmy8Bku7CjcAqyUSM= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5 h1:Ii+DKncOVM8Cu1Hc+ETb5K+23HdAMvESYE3ZJ5b5cMI= @@ -972,8 +976,8 @@ github.com/pterm/pterm v0.12.31/go.mod h1:32ZAWZVXD7ZfG0s8qqHXePte42kdz8ECtRyEej github.com/pterm/pterm v0.12.33/go.mod h1:x+h2uL+n7CP/rel9+bImHD5lF3nM9vJj80k9ybiiTTE= github.com/pterm/pterm v0.12.36/go.mod h1:NjiL09hFhT/vWjQHSj1athJpx6H8cjpHXNAK5bUw8T8= github.com/pterm/pterm v0.12.40/go.mod h1:ffwPLwlbXxP+rxT0GsgDTzS3y3rmpAO1NMjUkGTYf8s= -github.com/pterm/pterm v0.12.79 h1:lH3yrYMhdpeqX9y5Ep1u7DejyHy7NSQg9qrBjF9dFT4= -github.com/pterm/pterm v0.12.79/go.mod h1:1v/gzOF1N0FsjbgTHZ1wVycRkKiatFvJSJC4IGaQAAo= +github.com/pterm/pterm v0.12.53 h1:8ERV5eXyvXlAIY8LRrhapPS34j7IKKDAnb7o1Ih3T0w= +github.com/pterm/pterm v0.12.53/go.mod h1:BY2H3GtX2BX0ULqLY11C2CusIqnxsYerbkil3XvXIBg= github.com/pulumi/pulumi/sdk/v3 v3.68.0 h1:JWn3DGJhzoWL8bNbUdyLSSPeKS2F9mv14/EL9QeVT3w= github.com/pulumi/pulumi/sdk/v3 v3.68.0/go.mod h1:A/WHc5MlxU8GpX/sRmfQ9G0/Bxxl4GNdSP7TQmy4yIw= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= diff --git a/pkg/clipath/home_windows_test.go b/pkg/clipath/home_windows_test.go index 864633f7..bd14694b 100644 --- 
a/pkg/clipath/home_windows_test.go +++ b/pkg/clipath/home_windows_test.go @@ -16,6 +16,14 @@ package clipath +import ( + "os" + "testing" + + "github.com/adrg/xdg" + "github.com/stretchr/testify/assert" +) + func TestKusionHome(t *testing.T) { os.Setenv("XDG_CACHE_HOME", "c:\\") os.Setenv("XDG_CONFIG_HOME", "d:\\") diff --git a/pkg/cmd/apply/apply.go b/pkg/cmd/apply/apply.go index 223917c2..fafde1a6 100644 --- a/pkg/cmd/apply/apply.go +++ b/pkg/cmd/apply/apply.go @@ -15,13 +15,21 @@ package apply import ( + "bytes" + "context" + "errors" "fmt" "io" + "os" + "reflect" "strings" "sync" + "time" - "github.com/pterm/pterm" + "github.com/liu-hm19/pterm" "github.com/spf13/cobra" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/watch" "k8s.io/cli-runtime/pkg/genericiooptions" "k8s.io/kubectl/pkg/util/templates" @@ -30,12 +38,17 @@ import ( "kusionstack.io/kusion/pkg/cmd/generate" "kusionstack.io/kusion/pkg/cmd/preview" cmdutil "kusionstack.io/kusion/pkg/cmd/util" + "kusionstack.io/kusion/pkg/engine" "kusionstack.io/kusion/pkg/engine/operation" "kusionstack.io/kusion/pkg/engine/operation/models" + "kusionstack.io/kusion/pkg/engine/printers" "kusionstack.io/kusion/pkg/engine/release" + "kusionstack.io/kusion/pkg/engine/runtime" + runtimeinit "kusionstack.io/kusion/pkg/engine/runtime/init" "kusionstack.io/kusion/pkg/log" "kusionstack.io/kusion/pkg/util/i18n" "kusionstack.io/kusion/pkg/util/pretty" + "kusionstack.io/kusion/pkg/util/signal" "kusionstack.io/kusion/pkg/util/terminal" ) @@ -60,10 +73,29 @@ var ( # Apply without output style and color kusion apply --no-style=true + # Apply without watching the resource changes and waiting for reconciliation + kusion apply --watch=false + + # Apply with a specified timeout duration for the kusion apply command, measured in seconds + kusion apply --timeout=120 + # Apply with localhost port forwarding kusion apply --port-forward=8080`) ) +// To handle the release phase update when a panic occurs. +// Fixme: adopt a more centralized approach to manage the release update before exiting, instead of +// scattering the updates across different go-routines. +var ( + rel *apiv1.Release + relLock = &sync.Mutex{} + releaseCreated = false + storage release.Storage + portForwarded = false +) + +var errExit = errors.New("receive SIGTERM or SIGINT, exit cmd") + // ApplyFlags directly reflect the information that CLI is gathering via flags. They will be converted to // ApplyOptions, which reflect the runtime requirements for the command.
// @@ -74,6 +106,7 @@ type ApplyFlags struct { Yes bool DryRun bool Watch bool + Timeout int PortForward int genericiooptions.IOStreams @@ -86,6 +119,7 @@ type ApplyOptions struct { Yes bool DryRun bool Watch bool + Timeout int PortForward int genericiooptions.IOStreams @@ -130,7 +164,8 @@ func (f *ApplyFlags) AddFlags(cmd *cobra.Command) { cmd.Flags().BoolVarP(&f.Yes, "yes", "y", false, i18n.T("Automatically approve and perform the update after previewing it")) cmd.Flags().BoolVarP(&f.DryRun, "dry-run", "", false, i18n.T("Preview the execution effect (always successful) without actually applying the changes")) - cmd.Flags().BoolVarP(&f.Watch, "watch", "", false, i18n.T("After creating/updating/deleting the requested object, watch for changes")) + cmd.Flags().BoolVarP(&f.Watch, "watch", "", true, i18n.T("After creating/updating/deleting the requested object, watch for changes")) + cmd.Flags().IntVarP(&f.Timeout, "timeout", "", 0, i18n.T("The timeout duration of the kusion apply command, measured in seconds")) cmd.Flags().IntVarP(&f.PortForward, "port-forward", "", 0, i18n.T("Forward the specified port from local to service")) } @@ -147,6 +182,7 @@ func (f *ApplyFlags) ToOptions() (*ApplyOptions, error) { Yes: f.Yes, DryRun: f.DryRun, Watch: f.Watch, + Timeout: f.Timeout, PortForward: f.PortForward, IOStreams: f.IOStreams, } @@ -170,19 +206,17 @@ func (o *ApplyOptions) Validate(cmd *cobra.Command, args []string) error { // Run executes the `apply` command. func (o *ApplyOptions) Run() (err error) { // update release to succeeded or failed defer func() { if !releaseCreated { return } if err != nil { - rel.Phase = apiv1.ReleasePhaseFailed - _ = release.UpdateApplyRelease(storage, rel, o.DryRun) + release.UpdateReleasePhase(rel, apiv1.ReleasePhaseFailed, relLock) + // Join the errors if updating the apply release failed. + err = errors.Join([]error{err, release.UpdateApplyRelease(storage, rel, o.DryRun, relLock)}...) } else { - rel.Phase = apiv1.ReleasePhaseSucceeded - err = release.UpdateApplyRelease(storage, rel, o.DryRun) + release.UpdateReleasePhase(rel, apiv1.ReleasePhaseSucceeded, relLock) + err = release.UpdateApplyRelease(storage, rel, o.DryRun, relLock) } }() @@ -207,6 +241,64 @@ func (o *ApplyOptions) Run() (err error) { releaseCreated = true } + // Prepare for the timeout timer. + // Fixme: adopt a more centralized approach to manage the graceful exit when interrupted by + // SIGINT or SIGTERM, instead of scattering the handling across different go-routines. + var timer <-chan time.Time + // The error channel is buffered for both senders below and is deliberately never closed, + // since a go-routine sending after a timeout would panic on a closed channel. + errCh := make(chan error, 2) + + // Wait for the SIGTERM or SIGINT. + go func() { + stopCh := signal.SetupSignalHandler() + <-stopCh + errCh <- errExit + }() + + go func() { + errCh <- o.run(rel, storage) + }() + + // Check whether the kusion apply command has timed out. + if o.Timeout > 0 { + timer = time.After(time.Second * time.Duration(o.Timeout)) + select { + case err = <-errCh: + if errors.Is(err, errExit) && portForwarded { + return nil + } + return err + case <-timer: + err = fmt.Errorf("failed to execute kusion apply: timed out after %d seconds", o.Timeout) + if !releaseCreated { + return + } + release.UpdateReleasePhase(rel, apiv1.ReleasePhaseFailed, relLock) + err = errors.Join([]error{err, release.UpdateApplyRelease(storage, rel, o.DryRun, relLock)}...)
+ return err + } + } else { + err = <-errCh + if errors.Is(err, errExit) && portForwarded { + return nil + } + } + + return err +} + +// run executes the apply cmd after the release is created. +func (o *ApplyOptions) run(rel *apiv1.Release, storage release.Storage) (err error) { + defer func() { + if !releaseCreated { + return + } + if err != nil { + release.UpdateReleasePhase(rel, apiv1.ReleasePhaseFailed, relLock) + err = errors.Join([]error{err, release.UpdateApplyRelease(storage, rel, o.DryRun, relLock)}...) + } + }() + // build parameters parameters := make(map[string]string) for _, value := range o.PreviewOptions.Values { @@ -228,10 +320,11 @@ func (o *ApplyOptions) Run() (err error) { // update release phase to previewing rel.Spec = spec - rel.Phase = apiv1.ReleasePhasePreviewing - if err = release.UpdateApplyRelease(storage, rel, o.DryRun); err != nil { + release.UpdateReleasePhase(rel, apiv1.ReleasePhasePreviewing, relLock) + if err = release.UpdateApplyRelease(storage, rel, o.DryRun, relLock); err != nil { return } + // compute changes for preview changes, err := preview.Preview(o.PreviewOptions, storage, rel.Spec, rel.State, o.RefProject, o.RefStack) if err != nil { @@ -279,39 +372,35 @@ func (o *ApplyOptions) Run() (err error) { } // update release phase to applying - rel.Phase = apiv1.ReleasePhaseApplying - if err = release.UpdateApplyRelease(storage, rel, o.DryRun); err != nil { + release.UpdateReleasePhase(rel, apiv1.ReleasePhaseApplying, relLock) + if err = release.UpdateApplyRelease(storage, rel, o.DryRun, relLock); err != nil { return } + // start applying - fmt.Println("Start applying diffs ...") + fmt.Printf("\nStart applying diffs ...\n") var updatedRel *apiv1.Release - updatedRel, err = Apply(o, storage, rel, changes, o.IOStreams.Out) + updatedRel, err = Apply(o, storage, rel, changes) if err != nil { return } + // if dry run, print the hint if o.DryRun { fmt.Printf("\nNOTE: Currently running in the --dry-run mode, the above configuration does not really take effect\n") return nil } - rel = updatedRel - - if o.Watch { - fmt.Println("\nStart watching changes ...") - if err = Watch(o, rel.Spec, changes); err != nil { - return - } - } + *rel = *updatedRel if o.PortForward > 0 { fmt.Printf("\nStart port-forwarding ...\n") + portForwarded = true if err = PortForward(o, rel.Spec); err != nil { return } } - return nil + return } // The Apply function will apply the resources changes through the execution kusion engine. @@ -321,8 +410,23 @@ func Apply( storage release.Storage, rel *apiv1.Release, changes *models.Changes, - out io.Writer, ) (*apiv1.Release, error) { + var err error + // Update the release before exit. + defer func() { + if p := recover(); p != nil { + cmdutil.RecoverErr(&err) + log.Error(err) + } + if err != nil { + if !releaseCreated { + return + } + release.UpdateReleasePhase(rel, apiv1.ReleasePhaseFailed, relLock) + err = errors.Join([]error{err, release.UpdateApplyRelease(storage, rel, o.DryRun, relLock)}...) + } + }() + // construct the apply operation ac := &operation.ApplyOperation{ Operation: models.Operation{ @@ -333,77 +437,83 @@ func Apply( }, } + // Init a watch channel with a sufficient buffer when it is necessary to perform watching. + if o.Watch { + ac.WatchCh = make(chan string, 100) + } + // line summary var ls lineSummary + // Get the multi printer from UI option. + multi := o.UI.MultiPrinter + // Max length of resource ID for progressbar width. + maxLen := 0 + + // Prepare the writer to print the operation progress and results. 
+ changesWriterMap := make(map[string]*pterm.SpinnerPrinter) + for _, key := range changes.Values() { + // Get the maximum length of the resource ID. + if len(key.ID) > maxLen { + maxLen = len(key.ID) + } + // Init a spinner printer for the resource to print the apply status. + changesWriterMap[key.ID], err = o.UI.SpinnerPrinter. + WithWriter(multi.NewWriter()). + Start(fmt.Sprintf("Waiting for %s to apply ...", pterm.Bold.Sprint(key.ID))) + if err != nil { + return nil, fmt.Errorf("failed to init change step spinner printer: %v", err) + } + } + // Init a writer for the progressbar. + pbWriter := multi.NewWriter() // progress bar, print dag walk detail progressbar, err := o.UI.ProgressbarPrinter. - WithMaxWidth(0). // Set to 0, the terminal width will be used WithTotal(len(changes.StepKeys)). - WithWriter(out). + WithWriter(pbWriter). WithRemoveWhenDone(). + WithShowCount(false). + WithMaxWidth(maxLen + 32). Start() if err != nil { return nil, err } + + // The writer below is for operation error printing. + errWriter := multi.NewWriter() + + // Reassign the multi printer, as WithUpdateDelay returns a modified copy instead of mutating the receiver. + multi = multi.WithUpdateDelay(time.Millisecond * 50) + multi.Start() + defer multi.Stop() + // wait msgCh close var wg sync.WaitGroup // receive msg and print detail - go func() { - defer func() { - if p := recover(); p != nil { - log.Errorf("failed to receive msg and print detail as %v", p) - } - }() - wg.Add(1) - - for { - select { - case msg, ok := <-ac.MsgCh: - if !ok { - wg.Done() - return - } - changeStep := changes.Get(msg.ResourceID) - - switch msg.OpResult { - case models.Success, models.Skip: - var title string - if changeStep.Action == models.UnChanged { - title = fmt.Sprintf("%s %s, %s", - changeStep.Action.String(), - pterm.Bold.Sprint(changeStep.ID), - strings.ToLower(string(models.Skip)), - ) - } else { - title = fmt.Sprintf("%s %s %s", - changeStep.Action.String(), - pterm.Bold.Sprint(changeStep.ID), - strings.ToLower(string(msg.OpResult)), - ) - } - pretty.SuccessT.WithWriter(out).Printfln(title) - progressbar.UpdateTitle(title) - progressbar.Increment() - ls.Count(changeStep.Action) - case models.Failed: - title := fmt.Sprintf("%s %s %s", - changeStep.Action.String(), - pterm.Bold.Sprint(changeStep.ID), - strings.ToLower(string(msg.OpResult)), - ) - pretty.ErrorT.WithWriter(out).Printf("%s\n", title) - default: - title := fmt.Sprintf("%s %s %s", - changeStep.Action.Ing(), - pterm.Bold.Sprint(changeStep.ID), - strings.ToLower(string(msg.OpResult)), - ) - progressbar.UpdateTitle(title) - } - } - } - }() + // Add to the wait group before starting the go-routine, to avoid racing with wg.Wait() below. + wg.Add(1) + go PrintApplyDetails( + ac, + &err, + &errWriter, + &wg, + changes, + changesWriterMap, + progressbar, + &ls, + o.DryRun, + ) + + watchErrCh := make(chan error) + // Apply while watching the resources. + if o.Watch { + Watch( + ac, + changes, + &err, + o.DryRun, + watchErrCh, + multi, + changesWriterMap, + ) + } var updatedRel *apiv1.Release if o.DryRun { @@ -425,52 +535,217 @@ func Apply( Release: rel, }) if v1.IsErr(st) { - return nil, fmt.Errorf("apply failed, status:\n%v", st) + errWriter.(*bytes.Buffer).Reset() + err = fmt.Errorf("apply failed, status:\n%v", st) + return nil, err } updatedRel = rsp.Release } // wait for msgCh closed wg.Wait() + // Wait for the watch result when watching is enabled. + if o.Watch { + if watchErr := <-watchErrCh; watchErr != nil { + return nil, watchErr + } + } + // print summary - pterm.Println() - pterm.Fprintln(out, fmt.Sprintf("Apply complete! 
Resources: %d created, %d updated, %d deleted.", ls.created, ls.updated, ls.deleted)) + pterm.Fprintln(pbWriter, fmt.Sprintf("\nApply complete! Resources: %d created, %d updated, %d deleted.", ls.created, ls.updated, ls.deleted)) return updatedRel, nil } -// Watch function will observe the changes of each resource by the execution engine. -func Watch( - o *ApplyOptions, - planResources *apiv1.Spec, +// PrintApplyDetails function will receive the messages of the apply operation and print the details. +// Fixme: abstract the input variables into a struct. +func PrintApplyDetails( + ac *operation.ApplyOperation, + err *error, + errWriter *io.Writer, + wg *sync.WaitGroup, changes *models.Changes, -) error { - if o.DryRun { - fmt.Println("NOTE: Watch doesn't work in DryRun mode") - return nil + changesWriterMap map[string]*pterm.SpinnerPrinter, + progressbar *pterm.ProgressbarPrinter, + ls *lineSummary, + dryRun bool, +) { + defer func() { + if p := recover(); p != nil { + cmdutil.RecoverErr(err) + log.Error(*err) + } + if *err != nil { + if !releaseCreated { + return + } + release.UpdateReleasePhase(rel, apiv1.ReleasePhaseFailed, relLock) + *err = errors.Join([]error{*err, release.UpdateApplyRelease(storage, rel, dryRun, relLock)}...) + } + (*errWriter).(*bytes.Buffer).Reset() + }() + + for { + select { + // Get operation results from the message channel. + case msg, ok := <-ac.MsgCh: + if !ok { + wg.Done() + return + } + changeStep := changes.Get(msg.ResourceID) + + // Update the progressbar and spinner printer according to the operation result. + switch msg.OpResult { + case models.Success, models.Skip: + var title string + if changeStep.Action == models.UnChanged { + title = fmt.Sprintf("Skipped %s", pterm.Bold.Sprint(changeStep.ID)) + changesWriterMap[msg.ResourceID].Success(title) + } else { + title = fmt.Sprintf("%s %s", + changeStep.Action.Ing(), + pterm.Bold.Sprint(changeStep.ID), + ) + changesWriterMap[msg.ResourceID].UpdateText(title) + } + progressbar.Increment() + ls.Count(changeStep.Action) + case models.Failed: + title := fmt.Sprintf("Failed %s", pterm.Bold.Sprint(changeStep.ID)) + changesWriterMap[msg.ResourceID].Fail(title) + errStr := pretty.ErrorT.Sprintf("apply %s failed: %s\n", msg.ResourceID, msg.OpErr.Error()) + pterm.Fprintln(*errWriter, errStr) + default: + title := fmt.Sprintf("%s %s", + changeStep.Action.Ing(), + pterm.Bold.Sprint(changeStep.ID), + ) + changesWriterMap[msg.ResourceID].UpdateText(title) + } + } } +} - // filter out unchanged resources +// Watch function will watch the changed Kubernetes and Terraform resources. +// Fixme: abstract the input variables into a struct. +func Watch( + ac *operation.ApplyOperation, + changes *models.Changes, + err *error, + dryRun bool, + watchErrCh chan error, + multi *pterm.MultiPrinter, + changesWriterMap map[string]*pterm.SpinnerPrinter, +) { + resourceMap := make(map[string]apiv1.Resource) + ioWriterMap := make(map[string]io.Writer) toBeWatched := apiv1.Resources{} - for _, res := range planResources.Resources { + + // Get the resources to be watched. 
+ for _, res := range rel.Spec.Resources { if changes.ChangeOrder.ChangeSteps[res.ResourceKey()].Action != models.UnChanged { + resourceMap[res.ResourceKey()] = res toBeWatched = append(toBeWatched, res) } } - // Watch operation - wo := &operation.WatchOperation{} - if err := wo.Watch(&operation.WatchRequest{ - Request: models.Request{ - Project: changes.Project(), - Stack: changes.Stack(), - }, - Spec: &apiv1.Spec{Resources: toBeWatched}, - }); err != nil { - return err - } + go func() { + defer func() { + if p := recover(); p != nil { + cmdutil.RecoverErr(err) + log.Error(*err) + } + if *err != nil { + if !releaseCreated { + return + } + release.UpdateReleasePhase(rel, apiv1.ReleasePhaseFailed, relLock) + _ = release.UpdateApplyRelease(storage, rel, dryRun, relLock) + } - fmt.Println("Watch Finish! All resources have been reconciled.") - return nil + watchErrCh <- *err + }() + // Init the runtimes according to the resource types. + runtimes, s := runtimeinit.Runtimes(toBeWatched) + if v1.IsErr(s) { + panic(fmt.Errorf("failed to init runtimes: %s", s.String())) + } + + // Prepare the tables for printing the details of the resources. + tables := make(map[string]*printers.Table, len(toBeWatched)) + ticker := time.NewTicker(time.Millisecond * 500) + defer ticker.Stop() + + // Record the watched and finished resources. + watchedIDs := []string{} + finished := make(map[string]bool) + + for len(finished) != len(toBeWatched) { + select { + // Get the resource ID to be watched. + case id := <-ac.WatchCh: + res := resourceMap[id] + // Set the timeout duration for the watch context; here we set an empirical value of 5 minutes. + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*time.Duration(5)) + defer cancel() + + // Get the event channel for watching the resource. + rsp := runtimes[res.Type].Watch(ctx, &runtime.WatchRequest{Resource: &res}) + if rsp == nil { + log.Debugf("unsupported resource type: %s", res.Type) + continue + } + if v1.IsErr(rsp.Status) { + panic(fmt.Errorf("failed to watch %s: %s", id, rsp.Status.String())) + } + + w := rsp.Watchers + table := printers.NewTable(w.IDs) + tables[id] = table + + // Set up a go-routine to concurrently watch K8s and TF resources. + if res.Type == apiv1.Kubernetes { + go watchK8sResources(id, w.Watchers, table, tables, dryRun) + } else if res.Type == apiv1.Terraform { + go watchTFResources(id, w.TFWatcher, table, dryRun) + } else { + log.Debugf("unsupported resource type to watch: %s", string(res.Type)) + continue + } + + // Record the io writer related to the resource ID. + ioWriterMap[id] = multi.NewWriter() + watchedIDs = append(watchedIDs, id) + + // Refresh the tables printing the details of the resources to be watched. + default: + for _, id := range watchedIDs { + w, ok := ioWriterMap[id] + if !ok { + panic(fmt.Errorf("failed to get io writer while watching %s", id)) + } + printTable(&w, id, tables) + } + for id, table := range tables { + if table.AllCompleted() { + finished[id] = true + changesWriterMap[id].Success(fmt.Sprintf("Succeeded %s", pterm.Bold.Sprint(id))) + } + } + <-ticker.C + } + } + }() } // PortForward function will forward the specified port from local to the project Kubernetes Service. @@ -489,6 +764,8 @@ func Watch( // if err != nil { // return err // } + // + // Fixme: gracefully exit when interrupted by SIGINT or SIGTERM. func PortForward( o *ApplyOptions, spec *apiv1.Spec, @@ -544,6 +821,12 @@ func prompt(ui *terminal.UI) (string, error) { WithDefaultText(`Do you want to apply these diffs?`). 
WithOptions(options). WithDefaultOption("details"). + // To gracefully exit if interrupted by SIGINT or SIGTERM. + WithOnInterruptFunc(func() { + release.UpdateReleasePhase(rel, apiv1.ReleasePhaseSucceeded, relLock) + _ = release.UpdateApplyRelease(storage, rel, false, relLock) + os.Exit(1) + }). Show() if err != nil { fmt.Printf("Prompt failed: %v\n", err) @@ -552,3 +835,160 @@ return input, nil } + +func watchK8sResources( + id string, + chs []<-chan watch.Event, + table *printers.Table, + tables map[string]*printers.Table, + dryRun bool, +) { + defer func() { + var err error + if p := recover(); p != nil { + cmdutil.RecoverErr(&err) + log.Error(err) + } + if err != nil { + if !releaseCreated { + return + } + release.UpdateReleasePhase(rel, apiv1.ReleasePhaseFailed, relLock) + _ = release.UpdateApplyRelease(storage, rel, dryRun, relLock) + } + }() + + // Select cases for the resource event channels. + cases := createSelectCases(chs) + // Add a default case to avoid blocking on the select. + cases = append(cases, reflect.SelectCase{ + Dir: reflect.SelectDefault, + Chan: reflect.Value{}, + Send: reflect.Value{}, + }) + + for { + chosen, recv, recvOK := reflect.Select(cases) + if cases[chosen].Dir == reflect.SelectDefault { + continue + } + if recvOK { + e := recv.Interface().(watch.Event) + o := e.Object.(*unstructured.Unstructured) + var detail string + var ready bool + if e.Type == watch.Deleted { + detail = fmt.Sprintf("%s has been deleted", o.GetName()) + ready = true + } else { + // Restore to the actual type + target := printers.Convert(o) + detail, ready = printers.Generate(target) + } + + // Mark ready for breaking loop + if ready { + e.Type = printers.READY + } + + // Save watched msg + table.Update( + engine.BuildIDForKubernetes(o), + printers.NewRow(e.Type, o.GetKind(), o.GetName(), detail)) + + // Write back + tables[id] = table + } + + // Break when completed + if table.AllCompleted() { + break + } + } +} + +func watchTFResources( + id string, + ch <-chan runtime.TFEvent, + table *printers.Table, + dryRun bool, +) { + defer func() { + var err error + if p := recover(); p != nil { + cmdutil.RecoverErr(&err) + log.Error(err) + } + if err != nil { + if !releaseCreated { + return + } + release.UpdateReleasePhase(rel, apiv1.ReleasePhaseFailed, relLock) + _ = release.UpdateApplyRelease(storage, rel, dryRun, relLock) + } + }() + + // A valid Terraform resource ID should consist of 4 parts, including the information of the provider type + // and resource name, for example: hashicorp:random:random_password:example-dev-kawesome. + // Validate and split the ID once, before entering the event loop. + parts := strings.Split(id, engine.Separator) + if len(parts) != 4 { + panic(fmt.Errorf("invalid Terraform resource id: %s", id)) + } + + for { + tfEvent := <-ch + if tfEvent == runtime.TFApplying { + table.Update( + id, + printers.NewRow(watch.EventType("Applying"), + strings.Join([]string{parts[1], parts[2]}, engine.Separator), parts[3], "Applying...")) + } else if tfEvent == runtime.TFSucceeded { + table.Update( + id, + printers.NewRow(printers.READY, + strings.Join([]string{parts[1], parts[2]}, engine.Separator), parts[3], "Apply succeeded")) + } else { + table.Update( + id, + printers.NewRow(watch.EventType("Failed"), + strings.Join([]string{parts[1], parts[2]}, engine.Separator), parts[3], "Apply failed")) + } + + // Break when all completed. 
+ if table.AllCompleted() { + break + } + } +} + +func createSelectCases(chs []<-chan watch.Event) []reflect.SelectCase { + cases := make([]reflect.SelectCase, 0, len(chs)) + for _, ch := range chs { + cases = append(cases, reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(ch), + }) + } + return cases +} + +func printTable(w *io.Writer, id string, tables map[string]*printers.Table) { + // Reset the buffer for live flushing. + (*w).(*bytes.Buffer).Reset() + + // Print resource Key as heading text + _, _ = fmt.Fprintln(*w, pretty.LightCyanBold("[%s]", id)) + + table, ok := tables[id] + if !ok { + // Unsupported resource, leave a hint + _, _ = fmt.Fprintln(*w, "Skip monitoring unsupported resources") + } else { + // Print table + data := table.Print() + _ = pterm.DefaultTable. + WithStyle(pterm.NewStyle(pterm.FgDefault)). + WithHeaderStyle(pterm.NewStyle(pterm.FgDefault)). + WithHasHeader().WithSeparator(" ").WithData(data).WithWriter(*w).Render() + } +} diff --git a/pkg/cmd/apply/apply_test.go b/pkg/cmd/apply/apply_test.go index ea9ea711..7641a8d7 100644 --- a/pkg/cmd/apply/apply_test.go +++ b/pkg/cmd/apply/apply_test.go @@ -15,15 +15,19 @@ package apply import ( + "bytes" "context" "errors" - "os" + "io" "testing" "time" "github.com/bytedance/mockey" - "github.com/pterm/pterm" + "github.com/liu-hm19/pterm" "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/watch" apiv1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" v1 "kusionstack.io/kusion/pkg/apis/status/v1" @@ -34,6 +38,7 @@ import ( "kusionstack.io/kusion/pkg/engine" "kusionstack.io/kusion/pkg/engine/operation" "kusionstack.io/kusion/pkg/engine/operation/models" + "kusionstack.io/kusion/pkg/engine/printers" releasestorages "kusionstack.io/kusion/pkg/engine/release/storages" "kusionstack.io/kusion/pkg/engine/runtime" "kusionstack.io/kusion/pkg/engine/runtime/kubernetes" @@ -243,7 +248,7 @@ func TestApply(t *testing.T) { changes := models.NewChanges(proj, stack, order) o := newApplyOptions() o.DryRun = true - _, err := Apply(o, &releasestorages.LocalStorage{}, rel, changes, os.Stdout) + _, err := Apply(o, &releasestorages.LocalStorage{}, rel, changes) assert.Nil(t, err) }) mockey.PatchConvey("apply success", t, func() { @@ -279,7 +284,7 @@ func TestApply(t *testing.T) { } changes := models.NewChanges(proj, stack, order) - _, err := Apply(o, &releasestorages.LocalStorage{}, rel, changes, os.Stdout) + _, err := Apply(o, &releasestorages.LocalStorage{}, rel, changes) assert.Nil(t, err) }) mockey.PatchConvey("apply failed", t, func() { @@ -310,7 +315,7 @@ func TestApply(t *testing.T) { } changes := models.NewChanges(proj, stack, order) - _, err := Apply(o, &releasestorages.LocalStorage{}, rel, changes, os.Stdout) + _, err := Apply(o, &releasestorages.LocalStorage{}, rel, changes) assert.NotNil(t, err) }) } @@ -360,3 +365,110 @@ func TestPrompt(t *testing.T) { assert.Nil(t, err) }) } + +func TestWatchK8sResources(t *testing.T) { + t.Run("successfully apply K8s resources", func(t *testing.T) { + id := "v1:Namespace:example" + chs := make([]<-chan watch.Event, 1) + events := []watch.Event{ + { + Type: watch.Added, + Object: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Namespace", + "metadata": map[string]interface{}{ + "name": "example", + }, + "spec": map[string]interface{}{}, + }, + }, + }, + { + Type: watch.Added, + Object: &unstructured.Unstructured{ + Object: 
map[string]interface{}{ + "apiVersion": "v1", + "kind": "Namespace", + "metadata": map[string]interface{}{ + "name": "example", + }, + "spec": map[string]interface{}{}, + "status": map[string]interface{}{ + "phase": corev1.NamespaceActive, + }, + }, + }, + }, + } + + out := make(chan watch.Event, 10) + for _, e := range events { + out <- e + } + chs[0] = out + table := &printers.Table{ + IDs: []string{id}, + Rows: map[string]*printers.Row{}, + } + tables := map[string]*printers.Table{ + id: table, + } + + watchK8sResources(id, chs, table, tables, true) + + assert.Equal(t, true, table.AllCompleted()) + }) +} + +func TestWatchTFResources(t *testing.T) { + t.Run("successfully apply TF resources", func(t *testing.T) { + eventCh := make(chan runtime.TFEvent, 10) + events := []runtime.TFEvent{ + runtime.TFApplying, + runtime.TFApplying, + runtime.TFSucceeded, + } + for _, e := range events { + eventCh <- e + } + + id := "hashicorp:random:random_password:example-dev-kawesome" + table := &printers.Table{ + IDs: []string{id}, + Rows: map[string]*printers.Row{ + "hashicorp:random:random_password:example-dev-kawesome": {}, + }, + } + + watchTFResources(id, eventCh, table, true) + + assert.Equal(t, true, table.AllCompleted()) + }) +} + +func TestPrintTable(t *testing.T) { + w := io.Writer(bytes.NewBufferString("")) + id := "fake-resource-id" + tables := map[string]*printers.Table{ + "fake-resource-id": printers.NewTable([]string{ + "fake-resource-id", + }), + } + + t.Run("skip unsupported resources", func(t *testing.T) { + printTable(&w, "fake-fake-resource-id", tables) + assert.Contains(t, w.(*bytes.Buffer).String(), "Skip monitoring unsupported resources") + }) + + t.Run("update table", func(t *testing.T) { + printTable(&w, id, tables) + tableStr, err := pterm.DefaultTable. + WithStyle(pterm.NewStyle(pterm.FgDefault)). + WithHeaderStyle(pterm.NewStyle(pterm.FgDefault)). 
+ WithHasHeader().WithSeparator(" ").WithData(tables[id].Print()).Srender() + + assert.Nil(t, err) + assert.Contains(t, w.(*bytes.Buffer).String(), tableStr) + }) +} diff --git a/pkg/cmd/destroy/destroy.go b/pkg/cmd/destroy/destroy.go index d8705c39..8dca8878 100644 --- a/pkg/cmd/destroy/destroy.go +++ b/pkg/cmd/destroy/destroy.go @@ -20,7 +20,7 @@ import ( "strings" "sync" - "github.com/pterm/pterm" + "github.com/liu-hm19/pterm" "github.com/spf13/cobra" "k8s.io/cli-runtime/pkg/genericiooptions" "k8s.io/kubectl/pkg/util/i18n" diff --git a/pkg/cmd/destroy/destroy_test.go b/pkg/cmd/destroy/destroy_test.go index 8c9205cd..450dea46 100644 --- a/pkg/cmd/destroy/destroy_test.go +++ b/pkg/cmd/destroy/destroy_test.go @@ -21,7 +21,7 @@ import ( "time" "github.com/bytedance/mockey" - "github.com/pterm/pterm" + "github.com/liu-hm19/pterm" "github.com/stretchr/testify/assert" apiv1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" diff --git a/pkg/cmd/generate/generate.go b/pkg/cmd/generate/generate.go index 4af854b4..bd513d42 100644 --- a/pkg/cmd/generate/generate.go +++ b/pkg/cmd/generate/generate.go @@ -21,7 +21,7 @@ import ( "os" "strings" - "github.com/pterm/pterm" + "github.com/liu-hm19/pterm" "github.com/spf13/cobra" yamlv3 "gopkg.in/yaml.v3" "k8s.io/cli-runtime/pkg/genericiooptions" diff --git a/pkg/cmd/mod/list.go b/pkg/cmd/mod/list.go index dbf904fe..eb4e55ae 100644 --- a/pkg/cmd/mod/list.go +++ b/pkg/cmd/mod/list.go @@ -3,7 +3,7 @@ package mod import ( "fmt" - "github.com/pterm/pterm" + "github.com/liu-hm19/pterm" "github.com/spf13/cobra" "k8s.io/cli-runtime/pkg/genericiooptions" "k8s.io/kubectl/pkg/util/templates" diff --git a/pkg/cmd/preview/preview.go b/pkg/cmd/preview/preview.go index a27bc2d0..a1c76a29 100644 --- a/pkg/cmd/preview/preview.go +++ b/pkg/cmd/preview/preview.go @@ -19,7 +19,7 @@ import ( "fmt" "strings" - "github.com/pterm/pterm" + "github.com/liu-hm19/pterm" "github.com/spf13/cobra" "k8s.io/cli-runtime/pkg/genericiooptions" "k8s.io/kubectl/pkg/util/templates" diff --git a/pkg/engine/api/apply.go b/pkg/engine/api/apply.go index 1c776aa4..760432e2 100644 --- a/pkg/engine/api/apply.go +++ b/pkg/engine/api/apply.go @@ -6,7 +6,7 @@ import ( "strings" "sync" - "github.com/pterm/pterm" + "github.com/liu-hm19/pterm" apiv1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" v1 "kusionstack.io/kusion/pkg/apis/status/v1" diff --git a/pkg/engine/api/destroy.go b/pkg/engine/api/destroy.go index 99ce910b..baed0e50 100644 --- a/pkg/engine/api/destroy.go +++ b/pkg/engine/api/destroy.go @@ -5,7 +5,7 @@ import ( "strings" "sync" - "github.com/pterm/pterm" + "github.com/liu-hm19/pterm" apiv1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" v1 "kusionstack.io/kusion/pkg/apis/status/v1" diff --git a/pkg/engine/api/generate.go b/pkg/engine/api/generate.go index aeef57cc..c4f0a249 100644 --- a/pkg/engine/api/generate.go +++ b/pkg/engine/api/generate.go @@ -6,7 +6,7 @@ import ( "io" "os" - "github.com/pterm/pterm" + "github.com/liu-hm19/pterm" yamlv3 "gopkg.in/yaml.v3" v1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" diff --git a/pkg/engine/operation/apply.go b/pkg/engine/operation/apply.go index 621ab745..3bcf4ee7 100644 --- a/pkg/engine/operation/apply.go +++ b/pkg/engine/operation/apply.go @@ -101,6 +101,7 @@ func (ao *ApplyOperation) Apply(req *ApplyRequest) (rsp *ApplyResponse, s v1.Sta Stack: o.Stack, IgnoreFields: o.IgnoreFields, MsgCh: o.MsgCh, + WatchCh: o.WatchCh, Lock: &sync.Mutex{}, Release: rel, }, diff --git a/pkg/engine/operation/graph/resource_node.go 
b/pkg/engine/operation/graph/resource_node.go index 467b6f19..94481916 100644 --- a/pkg/engine/operation/graph/resource_node.go +++ b/pkg/engine/operation/graph/resource_node.go @@ -9,6 +9,7 @@ import ( apiv1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" v1 "kusionstack.io/kusion/pkg/apis/status/v1" + "kusionstack.io/kusion/pkg/engine" "kusionstack.io/kusion/pkg/engine/operation/models" "kusionstack.io/kusion/pkg/engine/runtime" "kusionstack.io/kusion/pkg/log" @@ -133,8 +134,10 @@ func (rn *ResourceNode) computeActionType( } else if liveResource == nil { rn.Action = models.Create } else { + // Prepare the watch channel for runtime apply. + ctx := context.WithValue(context.Background(), engine.WatchChannel, operation.WatchCh) // Dry run to fetch predictable resource - dryRunResp := operation.RuntimeMap[rn.resource.Type].Apply(context.Background(), &runtime.ApplyRequest{ + dryRunResp := operation.RuntimeMap[rn.resource.Type].Apply(ctx, &runtime.ApplyRequest{ PriorResource: priorResource, PlanResource: planedResource, Stack: operation.Stack, @@ -225,7 +228,9 @@ func (rn *ResourceNode) applyResource(operation *models.Operation, prior, planed rt := operation.RuntimeMap[resourceType] switch rn.Action { case models.Create, models.Update: - response := rt.Apply(context.Background(), &runtime.ApplyRequest{PriorResource: prior, PlanResource: planed, Stack: operation.Stack}) + // Prepare the watch channel for runtime apply. + ctx := context.WithValue(context.Background(), engine.WatchChannel, operation.WatchCh) + response := rt.Apply(ctx, &runtime.ApplyRequest{PriorResource: prior, PlanResource: planed, Stack: operation.Stack}) res = response.Resource s = response.Status log.Debugf("apply resource:%s, response: %v", planed.ID, json.Marshal2String(response)) diff --git a/pkg/engine/operation/models/change.go b/pkg/engine/operation/models/change.go index 831109a8..c751dcd7 100644 --- a/pkg/engine/operation/models/change.go +++ b/pkg/engine/operation/models/change.go @@ -6,7 +6,7 @@ import ( "io" "strings" - "github.com/pterm/pterm" + "github.com/liu-hm19/pterm" v1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" "kusionstack.io/kusion/pkg/log" diff --git a/pkg/engine/operation/models/operation_context.go b/pkg/engine/operation/models/operation_context.go index 32c1886c..50a9e1bf 100644 --- a/pkg/engine/operation/models/operation_context.go +++ b/pkg/engine/operation/models/operation_context.go @@ -44,6 +44,11 @@ type Operation struct { // and this message will be displayed in the terminal MsgCh chan Message + // WatchCh is used to send the resource IDs that are ready to be watched after sending or executing + // the apply request. + // Fixme: try to merge the WatchCh with the MsgCh. 
+ WatchCh chan string + // Lock is the operation-wide mutex Lock *sync.Mutex diff --git a/pkg/engine/operation/watch.go b/pkg/engine/operation/watch.go index 45194581..271c0959 100644 --- a/pkg/engine/operation/watch.go +++ b/pkg/engine/operation/watch.go @@ -8,7 +8,7 @@ import ( "time" "github.com/howieyuen/uilive" - "github.com/pterm/pterm" + "github.com/liu-hm19/pterm" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/watch" diff --git a/pkg/engine/printers/table.go b/pkg/engine/printers/table.go index 62a5772b..3ccd3fa4 100644 --- a/pkg/engine/printers/table.go +++ b/pkg/engine/printers/table.go @@ -55,23 +55,17 @@ func (t *Table) AllCompleted() bool { func (t *Table) Print() [][]string { data := [][]string{{"Type", "Kind", "Name", "Detail"}} for _, id := range t.IDs { row := t.Rows[id] + // Skip the rows whose contents haven't been updated yet. + if row == nil { + continue + } eventType := row.Type // Colored type - eventTypeS := "" - switch eventType { - case k8swatch.Added: - eventTypeS = pretty.Cyan(string(eventType)) - case k8swatch.Deleted: - eventTypeS = pretty.Red(string(eventType)) - case k8swatch.Modified: - eventTypeS = pretty.Yellow(string(eventType)) - case k8swatch.Error: - eventTypeS = pretty.Red(string(eventType)) - default: - eventTypeS = pretty.Green(string(eventType)) - } + eventTypeS := pretty.Normal(string(eventType)) data = append(data, []string{eventTypeS, row.Kind, row.Name, row.Detail}) } diff --git a/pkg/engine/release/util.go b/pkg/engine/release/util.go index 91b9e2ad..3a623375 100644 --- a/pkg/engine/release/util.go +++ b/pkg/engine/release/util.go @@ -2,6 +2,7 @@ package release import ( "fmt" + "sync" "time" v1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" @@ -47,9 +48,7 @@ func NewApplyRelease(storage Storage, project, stack, workspace string) (*v1.Rel return nil, fmt.Errorf("cannot create a new release of project: %s, workspace: %s. There is a release:%v in progress", project, workspace, lastRelease.Revision) } - if err != nil { - return nil, err - } + rel = &v1.Release{ Project: project, Workspace: workspace, @@ -67,7 +66,9 @@ func NewApplyRelease(storage Storage, project, stack, workspace string) (*v1.Rel // UpdateApplyRelease updates the release in the storage if dryRun is false. If release phase is failed, // only logging with no error return. -func UpdateApplyRelease(storage Storage, rel *v1.Release, dryRun bool) error { +func UpdateApplyRelease(storage Storage, rel *v1.Release, dryRun bool, relLock *sync.Mutex) error { + relLock.Lock() + defer relLock.Unlock() if dryRun { return nil } @@ -134,3 +135,10 @@ func UpdateDestroyRelease(storage Storage, rel *v1.Release) error { } return err } + +// UpdateReleasePhase updates the release with the specified phase. +func UpdateReleasePhase(rel *v1.Release, phase v1.ReleasePhase, relLock *sync.Mutex) { + relLock.Lock() + defer relLock.Unlock() + rel.Phase = phase +} diff --git a/pkg/engine/runtime/kubernetes/kubernetes_runtime.go b/pkg/engine/runtime/kubernetes/kubernetes_runtime.go index 420e2478..5c7fd1bf 100644 --- a/pkg/engine/runtime/kubernetes/kubernetes_runtime.go +++ b/pkg/engine/runtime/kubernetes/kubernetes_runtime.go @@ -159,6 +159,13 @@ func (k *KubernetesRuntime) Apply(ctx context.Context, request *runtime.ApplyReq // more concise and clean resource object. normalizeServerSideFields(res) + // Extract the watch channel from the context. 
+ watchCh, _ := ctx.Value(engine.WatchChannel).(chan string) + if !request.DryRun && watchCh != nil { + log.Infof("Started to watch %s with the type of %s", planState.ResourceKey(), planState.Type) + watchCh <- planState.ResourceKey() + } + return &runtime.ApplyResponse{Resource: &apiv1.Resource{ ID: planState.ResourceKey(), Type: planState.Type, @@ -329,6 +336,11 @@ func (k *KubernetesRuntime) Watch(ctx context.Context, request *runtime.WatchReq return watched.GetName() == reqObj.GetName() }) + if rootCh == nil { + return &runtime.WatchResponse{Status: v1.NewErrorStatus(fmt.Errorf("failed to get the root channel for watching %s", + request.Resource.ResourceKey()))} + } + // Collect all watchers := runtime.NewWatchers() watchers.Insert(engine.BuildIDForKubernetes(reqObj), rootCh) @@ -522,13 +534,20 @@ func (k *KubernetesRuntime) WatchByRelation( } var next *unstructured.Unstructured - return doWatch(ctx, w, func(watched *unstructured.Unstructured) bool { + + eventCh := doWatch(ctx, w, func(watched *unstructured.Unstructured) bool { ok := related(watched, cur) if ok { next = watched } return ok - }), next, nil + }) + if eventCh == nil { + err = fmt.Errorf("failed to get the event channel for watching related resources of %s with kind of %s", + cur.GetName(), cur.GetKind()) + } + + return eventCh, next, err } // doWatch send watched object if check ok @@ -562,9 +581,12 @@ func doWatch(ctx context.Context, watcher k8swatch.Interface, checker func(watch }() // Owner&Dependent check pass, return the dependent Obj - <-signal - - return resultCh + select { + case <-signal: + return resultCh + case <-ctx.Done(): + return nil + } } // Judge dependent is owned by owner diff --git a/pkg/engine/runtime/runtime.go b/pkg/engine/runtime/runtime.go index f8bdc70f..213c1acf 100644 --- a/pkg/engine/runtime/runtime.go +++ b/pkg/engine/runtime/runtime.go @@ -14,9 +14,22 @@ const ( Terraform apiv1.Type = "Terraform" ) +// TFEvent represents the status of the Terraform resource operation event. +type TFEvent string + +const ( + // TFApplying means the TF resource is still being applied. + TFApplying = TFEvent("Applying") + // TFSucceeded means the TF resource operation has succeeded. + TFSucceeded = TFEvent("Succeeded") + // TFFailed means the TF resource operation has failed. + TFFailed = TFEvent("Failed") +) + // Runtime represents an actual infrastructure runtime managed by Kusion and every runtime implements this interface can be orchestrated // by Kusion like normal K8s resources. All methods in this interface are designed for manipulating one Resource at a time and will be // invoked in operations like Apply, Preview, Destroy, etc. +// Fixme: update the runtime interface definition of requests and responses. type Runtime interface { // Apply means modify this Resource to the desired state described in the request, // and it will turn into creating or updating a Resource in most scenarios. @@ -126,6 +139,8 @@ type WatchResponse struct { type SequentialWatchers struct { IDs []string Watchers []<-chan watch.Event + // TFWatcher is for watching the operation event of the Terraform resources. 
+ TFWatcher <-chan TFEvent } func NewWatchers() *SequentialWatchers { diff --git a/pkg/engine/runtime/terraform/terraform_runtime.go b/pkg/engine/runtime/terraform/terraform_runtime.go index 263179a7..b1f401cc 100644 --- a/pkg/engine/runtime/terraform/terraform_runtime.go +++ b/pkg/engine/runtime/terraform/terraform_runtime.go @@ -2,14 +2,18 @@ package terraform import ( "context" + "fmt" "os" "path/filepath" "sync" + "time" + "github.com/patrickmn/go-cache" "github.com/spf13/afero" apiv1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1" v1 "kusionstack.io/kusion/pkg/apis/status/v1" + "kusionstack.io/kusion/pkg/engine" "kusionstack.io/kusion/pkg/engine/runtime" "kusionstack.io/kusion/pkg/engine/runtime/terraform/tfops" "kusionstack.io/kusion/pkg/log" @@ -17,6 +21,10 @@ import ( var _ runtime.Runtime = &TerraformRuntime{} +// tfEvents is used to record the operation events of the Terraform +// resources into the related channels for watching. +var tfEvents = cache.New(cache.NoExpiration, cache.NoExpiration) + type TerraformRuntime struct { tfops.WorkSpace mu *sync.Mutex @@ -83,15 +91,67 @@ func (t *TerraformRuntime) Apply(ctx context.Context, request *runtime.ApplyRequ } } - tfstate, err := t.WorkSpace.Apply(ctx) - if err != nil { - return &runtime.ApplyResponse{Resource: nil, Status: v1.NewErrorStatus(err)} - } + var tfstate *tfops.StateRepresentation + var providerAddr string - // get terraform provider version - providerAddr, err := t.WorkSpace.GetProvider() - if err != nil { - return &runtime.ApplyResponse{Resource: nil, Status: v1.NewErrorStatus(err)} + // Extract the watch channel from the context. + watchCh, _ := ctx.Value(engine.WatchChannel).(chan string) + if watchCh != nil { + // Apply while watching the resource. + errCh := make(chan error) + + // Start applying the resource. + go func() { + tfstate, err = t.WorkSpace.Apply(ctx) + if err != nil { + // Send the error and return early to avoid a second send on errCh. + errCh <- err + return + } + + providerAddr, err = t.WorkSpace.GetProvider() + errCh <- err + }() + + // Prepare the event channel and send the resource ID to the watch channel. + log.Infof("Started to watch %s with the type of %s", plan.ResourceKey(), plan.Type) + eventCh := make(chan runtime.TFEvent) + + // Prevent concurrent operations on resources with the same ID. + if _, ok := tfEvents.Get(plan.ResourceKey()); ok { + err = fmt.Errorf("failed to initiate the event channel for watching terraform resource %s: conflicting resource ID", plan.ResourceKey()) + log.Error(err) + return &runtime.ApplyResponse{Resource: nil, Status: v1.NewErrorStatus(err)} + } + tfEvents.Set(plan.ResourceKey(), eventCh, cache.NoExpiration) + watchCh <- plan.ResourceKey() + + // Wait for the apply to be finished. + shouldBreak := false + for !shouldBreak { + select { + case err = <-errCh: + if err != nil { + eventCh <- runtime.TFFailed + return &runtime.ApplyResponse{Resource: nil, Status: v1.NewErrorStatus(err)} + } + eventCh <- runtime.TFSucceeded + shouldBreak = true + default: + eventCh <- runtime.TFApplying + time.Sleep(time.Second * 1) + } + } + } else { + // Apply without watching. 
+ tfstate, err = t.WorkSpace.Apply(ctx) + if err != nil { + return &runtime.ApplyResponse{Resource: nil, Status: v1.NewErrorStatus(err)} + } + + // get terraform provider version + providerAddr, err = t.WorkSpace.GetProvider() + if err != nil { + return &runtime.ApplyResponse{Resource: nil, Status: v1.NewErrorStatus(err)} + } } r := tfops.ConvertTFState(tfstate, providerAddr) @@ -219,5 +279,17 @@ func (t *TerraformRuntime) Delete(ctx context.Context, request *runtime.DeleteRe // Watch terraform resource func (t *TerraformRuntime) Watch(ctx context.Context, request *runtime.WatchRequest) *runtime.WatchResponse { - return nil + // Get the event channel. + id := request.Resource.ResourceKey() + eventCh, ok := tfEvents.Get(id) + if !ok { + return &runtime.WatchResponse{Status: v1.NewErrorStatus(fmt.Errorf("failed to get the event channel for %s", id))} + } + + return &runtime.WatchResponse{ + Watchers: &runtime.SequentialWatchers{ + IDs: []string{id}, + TFWatcher: eventCh.(chan runtime.TFEvent), + }, + } } diff --git a/pkg/engine/util.go b/pkg/engine/util.go index 5f0ebb46..4ac139ab 100644 --- a/pkg/engine/util.go +++ b/pkg/engine/util.go @@ -2,6 +2,16 @@ package engine import "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +// ContextKey is used to represent the key associated with the information +// injected into the function context. +type ContextKey string + +const ( + // WatchChannel is used to inject a channel into the runtime operation to + // assist in watching the changes of the resources during the operation process. + WatchChannel = ContextKey("WatchChannel") +) + const Separator = ":" func BuildID(apiVersion, kind, namespace, name string) string { diff --git a/pkg/server/manager/stack/stack_manager.go b/pkg/server/manager/stack/stack_manager.go index 4c414858..433cd201 100644 --- a/pkg/server/manager/stack/stack_manager.go +++ b/pkg/server/manager/stack/stack_manager.go @@ -5,6 +5,7 @@ import ( "errors" "net/http" "os" + "sync" "time" "github.com/jinzhu/copier" @@ -137,17 +138,18 @@ func (m *StackManager) ApplyStack(ctx context.Context, id uint, workspaceName, f var storage release.Storage var rel *v1.Release + relLock := &sync.Mutex{} releaseCreated := false defer func() { if !releaseCreated { return } if err != nil { - rel.Phase = v1.ReleasePhaseFailed - _ = release.UpdateApplyRelease(storage, rel, dryrun) + release.UpdateReleasePhase(rel, v1.ReleasePhaseFailed, relLock) + _ = release.UpdateApplyRelease(storage, rel, dryrun, relLock) } else { - rel.Phase = v1.ReleasePhaseSucceeded - err = release.UpdateApplyRelease(storage, rel, dryrun) + release.UpdateReleasePhase(rel, v1.ReleasePhaseSucceeded, relLock) + err = release.UpdateApplyRelease(storage, rel, dryrun, relLock) } }() @@ -194,8 +196,8 @@ func (m *StackManager) ApplyStack(ctx context.Context, id uint, workspaceName, f // update release phase to previewing rel.Spec = sp - rel.Phase = v1.ReleasePhasePreviewing - if err = release.UpdateApplyRelease(storage, rel, dryrun); err != nil { + release.UpdateReleasePhase(rel, v1.ReleasePhasePreviewing, relLock) + if err = release.UpdateApplyRelease(storage, rel, dryrun, relLock); err != nil { return } // compute changes for preview @@ -214,8 +216,8 @@ func (m *StackManager) ApplyStack(ctx context.Context, id uint, workspaceName, f return ErrDryrunDestroy } - rel.Phase = v1.ReleasePhaseApplying - if err = release.UpdateApplyRelease(storage, rel, dryrun); err != nil { + release.UpdateReleasePhase(rel, v1.ReleasePhaseApplying, relLock) + if err = release.UpdateApplyRelease(storage, 
rel, dryrun, relLock); err != nil { return } logger.Info("Dryrun set to false. Start applying diffs ...") diff --git a/pkg/util/pathutil/pathutil_windows.go b/pkg/util/pathutil/pathutil_windows.go index d0ed0cb8..c0c3ad3f 100644 --- a/pkg/util/pathutil/pathutil_windows.go +++ b/pkg/util/pathutil/pathutil_windows.go @@ -16,6 +16,11 @@ package pathutil +import ( + "os" + "path/filepath" +) + // Exists returns true if the specified path exists. func Exists(path string) bool { fi, err := os.Lstat(path) diff --git a/pkg/util/pretty/prefix_printer.go b/pkg/util/pretty/prefix_printer.go index 6d5c1aae..0681e54b 100644 --- a/pkg/util/pretty/prefix_printer.go +++ b/pkg/util/pretty/prefix_printer.go @@ -1,6 +1,6 @@ package pretty -import "github.com/pterm/pterm" +import "github.com/liu-hm19/pterm" // Pretty prefix printer style. // diff --git a/pkg/util/pretty/prefix_text_printer.go b/pkg/util/pretty/prefix_text_printer.go index c5d5bc7f..a8ddbc42 100644 --- a/pkg/util/pretty/prefix_text_printer.go +++ b/pkg/util/pretty/prefix_text_printer.go @@ -1,6 +1,6 @@ package pretty -import "github.com/pterm/pterm" +import "github.com/liu-hm19/pterm" // Pretty prefix text printer style. // diff --git a/pkg/util/pretty/prefix_text_printer_test.go b/pkg/util/pretty/prefix_text_printer_test.go index 9afbf44c..3d6ec91e 100644 --- a/pkg/util/pretty/prefix_text_printer_test.go +++ b/pkg/util/pretty/prefix_text_printer_test.go @@ -4,7 +4,7 @@ import ( "fmt" "testing" - "github.com/pterm/pterm" + "github.com/liu-hm19/pterm" ) func TestPrefixTextPrinter(t *testing.T) { diff --git a/pkg/util/pretty/spinner.go b/pkg/util/pretty/spinner.go index 850ffbc3..aa24bd7b 100644 --- a/pkg/util/pretty/spinner.go +++ b/pkg/util/pretty/spinner.go @@ -3,7 +3,7 @@ package pretty import ( "time" - "github.com/pterm/pterm" + "github.com/liu-hm19/pterm" ) // Spinner style. diff --git a/pkg/util/pretty/spinner_text.go b/pkg/util/pretty/spinner_text.go index 4c3aa3e0..a26ed96c 100644 --- a/pkg/util/pretty/spinner_text.go +++ b/pkg/util/pretty/spinner_text.go @@ -3,7 +3,7 @@ package pretty import ( "time" - "github.com/pterm/pterm" + "github.com/liu-hm19/pterm" ) // SpinnerT text style. diff --git a/pkg/util/pretty/style.go b/pkg/util/pretty/style.go index fa4f7da3..f3c7fa66 100644 --- a/pkg/util/pretty/style.go +++ b/pkg/util/pretty/style.go @@ -1,6 +1,6 @@ package pretty -import "github.com/pterm/pterm" +import "github.com/liu-hm19/pterm" // Pretty style, contains the color and the style. // diff --git a/pkg/util/signal/signal.go b/pkg/util/signal/signal.go new file mode 100644 index 00000000..7d521cf5 --- /dev/null +++ b/pkg/util/signal/signal.go @@ -0,0 +1,70 @@ +/* +Copyright 2017 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Note: This is copied from the Kubernetes apiserver. + +package signal + +import ( + "context" + "os" + "os/signal" +) + +var ( + onlyOneSignalHandler = make(chan struct{}) + shutdownHandler chan os.Signal +) + +// SetupSignalHandler registers for SIGTERM and SIGINT. 
A stop channel is returned +// which is closed on one of these signals. If a second signal is caught, the program +// is terminated with exit code 1. +// Only one of SetupSignalContext and SetupSignalHandler should be called, and it can +// only be called once. +func SetupSignalHandler() <-chan struct{} { + return SetupSignalContext().Done() +} + +// SetupSignalContext is the same as SetupSignalHandler, but a context.Context is returned. +// Only one of SetupSignalContext and SetupSignalHandler should be called, and it can +// only be called once. +func SetupSignalContext() context.Context { + close(onlyOneSignalHandler) // panics when called twice + + shutdownHandler = make(chan os.Signal, 2) + + ctx, cancel := context.WithCancel(context.Background()) + signal.Notify(shutdownHandler, shutdownSignals...) + go func() { + <-shutdownHandler + cancel() + <-shutdownHandler + os.Exit(1) // second signal. Exit directly. + }() + + return ctx +} + +// RequestShutdown emulates a received event that is considered as a shutdown signal (SIGTERM/SIGINT). +// It returns whether a handler was notified. +func RequestShutdown() bool { + if shutdownHandler != nil { + select { + case shutdownHandler <- shutdownSignals[0]: + return true + default: + } + } + + return false +} diff --git a/pkg/util/signal/signal_posix.go b/pkg/util/signal/signal_posix.go new file mode 100644 index 00000000..0f76763a --- /dev/null +++ b/pkg/util/signal/signal_posix.go @@ -0,0 +1,26 @@ +//go:build !windows +// +build !windows + +/* +Copyright 2017 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Note: This is copied from the Kubernetes apiserver. + +package signal + +import ( + "os" + "syscall" +) + +var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM} diff --git a/pkg/util/signal/signal_windows.go b/pkg/util/signal/signal_windows.go new file mode 100644 index 00000000..80ed93c3 --- /dev/null +++ b/pkg/util/signal/signal_windows.go @@ -0,0 +1,22 @@ +/* +Copyright 2017 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Note: This is copied from the Kubernetes apiserver. 
+ +package signal + +import ( + "os" +) + +var shutdownSignals = []os.Signal{os.Interrupt} diff --git a/pkg/util/terminal/ui.go b/pkg/util/terminal/ui.go index 282bc79c..b23bf20a 100644 --- a/pkg/util/terminal/ui.go +++ b/pkg/util/terminal/ui.go @@ -1,7 +1,7 @@ package terminal import ( - "github.com/pterm/pterm" + "github.com/liu-hm19/pterm" "kusionstack.io/kusion/pkg/util/pretty" ) @@ -9,6 +9,7 @@ type UI struct { SpinnerPrinter *pterm.SpinnerPrinter ProgressbarPrinter *pterm.ProgressbarPrinter InteractiveSelectPrinter *pterm.InteractiveSelectPrinter + MultiPrinter *pterm.MultiPrinter } // DefaultUI returns a UI for Kusion CLI display with default @@ -18,5 +19,6 @@ func DefaultUI() *UI { SpinnerPrinter: &pretty.SpinnerT, ProgressbarPrinter: &pterm.DefaultProgressbar, InteractiveSelectPrinter: &pterm.DefaultInteractiveSelect, + MultiPrinter: &pterm.DefaultMultiPrinter, } } diff --git a/pkg/util/terminal/ui_test.go b/pkg/util/terminal/ui_test.go new file mode 100644 index 00000000..3c9236d7 --- /dev/null +++ b/pkg/util/terminal/ui_test.go @@ -0,0 +1,14 @@ +package terminal + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestDefaultUI(t *testing.T) { + t.Run("init default ui", func(t *testing.T) { + ui := DefaultUI() + assert.NotNil(t, ui) + }) +}