From 127219f92328d1a17a42ed9f5081a18c2b6d4cc5 Mon Sep 17 00:00:00 2001
From: "W. Trevor King" <wking@tremily.us>
Date: Tue, 30 Oct 2018 13:14:25 -0700
Subject: [PATCH] cmd/openshift-install/create: Destroy bootstrap on
 bootstrap-complete

Once the cluster is up, the bootstrap infrastructure is just a waste
of resources.  With this commit we make a reasonable effort to wait
for the bootstrap-complete event, and, if we see it, we tear down the
bootstrap resources.

The Kubernetes client initialization is based on [1] and the local
watch.go is based on [2].  Both of those are, like this project, under
the Apache 2.0 license.  Once kubernetes/kubernetes#50102 lands, we
can drop the local watch.go and use the upstream implementation.

I've dropped the errors.Cause call from main.go, because it was
stripping the wrapping added by errors.Wrap, and that wrapping is
useful context for debugging.  The Cause call is from dc118f20 (cmd:
switch to already vendored cobra, 2018-10-08, #429), but that commit
doesn't explicitly motivate the call.

[1]: https://github.com/kubernetes/client-go/blob/v9.0.0/examples/out-of-cluster-client-configuration/main.go
[2]: https://github.com/kubernetes/kubernetes/pull/50102/files
---
 cmd/openshift-install/create.go |  85 +++++++++++++++++-
 cmd/openshift-install/main.go   |   3 +-
 cmd/openshift-install/watch.go  | 169 ++++++++++++++++++++++++++++++++
 3 files changed, 254 insertions(+), 3 deletions(-)
 create mode 100644 cmd/openshift-install/watch.go

diff --git a/cmd/openshift-install/create.go b/cmd/openshift-install/create.go
index fff6f27b73f..3b97d0a475e 100644
--- a/cmd/openshift-install/create.go
+++ b/cmd/openshift-install/create.go
@@ -1,13 +1,22 @@
 package main
 
 import (
+	"context"
 	"fmt"
 	"os/exec"
+	"path/filepath"
 	"strings"
+	"time"
 
 	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/clientcmd"
 
 	"github.com/openshift/installer/pkg/asset"
 	"github.com/openshift/installer/pkg/asset/cluster"
@@ -16,6 +25,7 @@ import (
 	"github.com/openshift/installer/pkg/asset/installconfig"
 	"github.com/openshift/installer/pkg/asset/kubeconfig"
 	"github.com/openshift/installer/pkg/asset/manifests"
+	destroybootstrap "github.com/openshift/installer/pkg/destroy/bootstrap"
 )
 
 type target struct {
@@ -53,7 +63,9 @@ var targets = []target{{
 	command: &cobra.Command{
 		Use:   "cluster",
 		Short: "Create an OpenShift cluster",
-		Long:  "",
+		PostRunE: func(_ *cobra.Command, _ []string) error {
+			return destroyBootstrap(context.Background(), rootOpts.dir)
+		},
 	},
 	assets: []asset.WritableAsset{&cluster.TerraformVariables{}, &kubeconfig.Admin{}, &cluster.Cluster{}},
 }}
@@ -128,3 +140,74 @@ func runTargetCmd(targets ...asset.WritableAsset) func(cmd *cobra.Command, args
 		return nil
 	}
 }
+
+// FIXME: pulling the kubeconfig and metadata out of the root
+// directory is a bit kludgy when we already have them in memory.
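+//
+// destroyBootstrap waits for the bootstrap-complete event and, once
+// it arrives, deletes the bootstrap resources.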
+func destroyBootstrap(ctx context.Context, directory string) (err error) {
+	logrus.Info("Waiting for bootstrap completion...")
+	config, err := clientcmd.BuildConfigFromFlags("", filepath.Join(directory, "auth", "kubeconfig"))
+	if err != nil {
+		return errors.Wrap(err, "loading kubeconfig")
+	}
+
+	client, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		return errors.Wrap(err, "creating a Kubernetes client")
+	}
+
+	discovery := client.Discovery()
+
+	apiContext, cancel := context.WithTimeout(ctx, 30*time.Minute)
+	defer cancel()
+	wait.Until(func() {
+		version, err := discovery.ServerVersion()
+		if err == nil {
+			logrus.Infof("API %s up", version)
+			cancel()
+		} else {
+			logrus.Debugf("API not up yet: %s", err)
+		}
+	}, 2*time.Second, apiContext.Done())
+
+	events := client.CoreV1().Events("kube-system")
+
+	eventContext, cancel := context.WithTimeout(ctx, 30*time.Minute)
+	defer cancel()
+	_, err = Until(
+		eventContext,
+		"",
+		func(sinceResourceVersion string) (watch.Interface, error) {
+			return events.Watch(metav1.ListOptions{
+				Watch:           true,
+				ResourceVersion: sinceResourceVersion,
+			})
+		},
+		func(watchEvent watch.Event) (bool, error) {
+			event, ok := watchEvent.Object.(*corev1.Event)
+			if !ok {
+				return false, nil
+			}
+
+			if watchEvent.Type == watch.Error {
+				logrus.Debugf("error %s: %s", event.Name, event.Message)
+				return false, nil
+			}
+
+			if watchEvent.Type != watch.Added {
+				return false, nil
+			}
+
+			logrus.Debugf("added %s: %s", event.Name, event.Message)
+			return event.Name == "bootstrap-complete", nil
+		},
+	)
+	if err != nil {
+		return errors.Wrap(err, "waiting for bootstrap-complete")
+	}
+
+	logrus.Info("Destroying the bootstrap resources...")
+	return destroybootstrap.Destroy(directory)
+}
diff --git a/cmd/openshift-install/main.go b/cmd/openshift-install/main.go
index f3076202df2..8e43179a87a 100644
--- a/cmd/openshift-install/main.go
+++ b/cmd/openshift-install/main.go
@@ -31,8 +31,7 @@ func main() {
 	}
 
 	if err := rootCmd.Execute(); err != nil {
-		cause := errors.Cause(err)
-		logrus.Fatalf("Error executing openshift-install: %v", cause)
+		logrus.Fatalf("Error executing openshift-install: %v", err)
 	}
 }
diff --git a/cmd/openshift-install/watch.go b/cmd/openshift-install/watch.go
new file mode 100644
index 00000000000..8bac42d8c82
--- /dev/null
+++ b/cmd/openshift-install/watch.go
@@ -0,0 +1,169 @@
+package main
+
+import (
+	"context"
+	"errors"
+	"fmt"
+
+	"github.com/sirupsen/logrus"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/watch"
+	watchtools "k8s.io/client-go/tools/watch"
+)
+
+// WatcherFunc is from https://github.com/kubernetes/kubernetes/pull/50102.
+type WatcherFunc func(sinceResourceVersion string) (watch.Interface, error)
+
+type resourceVersionGetter interface {
+	GetResourceVersion() string
+}
+
+// RetryWatcher is from https://github.com/kubernetes/kubernetes/pull/50102.
+type RetryWatcher struct {
+	lastResourceVersion string
+	watcherFunc         WatcherFunc
+	resultChan          chan watch.Event
+	stopChan            chan struct{}
+	doneChan            chan struct{}
+}
+
+// Until is from https://github.com/kubernetes/kubernetes/pull/50102.
+func Until(ctx context.Context, initialResourceVersion string, watcherFunc WatcherFunc, conditions ...watchtools.ConditionFunc) (*watch.Event, error) {
+	return watchtools.UntilWithoutRetry(ctx, NewRetryWatcher(initialResourceVersion, watcherFunc), conditions...)
+}
+
+// NewRetryWatcher is from https://github.com/kubernetes/kubernetes/pull/50102.
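+// It re-establishes the underlying watch from the last seen
+// resourceVersion whenever that watch's result channel closes.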
+func NewRetryWatcher(initialResourceVersion string, watcherFunc WatcherFunc) *RetryWatcher {
+	rw := &RetryWatcher{
+		lastResourceVersion: initialResourceVersion,
+		watcherFunc:         watcherFunc,
+		stopChan:            make(chan struct{}),
+		doneChan:            make(chan struct{}),
+		resultChan:          make(chan watch.Event),
+	}
+	go rw.receive()
+	return rw
+}
+
+func (rw *RetryWatcher) send(event watch.Event) bool {
+	// Writing to an unbuffered channel blocks, so we need to stay responsive to a stop request while doing so.
+	select {
+	case rw.resultChan <- event:
+		return true
+	case <-rw.stopChan:
+		return false
+	}
+}
+
+func (rw *RetryWatcher) doReceive() bool {
+	watcher, err := rw.watcherFunc(rw.lastResourceVersion)
+	if err != nil {
+		status := apierrors.NewInternalError(fmt.Errorf("retry watcher: watcherFunc failed: %v", err)).Status()
+		_ = rw.send(watch.Event{
+			Type:   watch.Error,
+			Object: &status,
+		})
+		// Stop the watcher
+		return true
+	}
+	ch := watcher.ResultChan()
+	defer watcher.Stop()
+
+	for {
+		select {
+		case <-rw.stopChan:
+			logrus.Debug("Stopping RetryWatcher.")
+			return true
+		case event, ok := <-ch:
+			if !ok {
+				logrus.Warningf("RetryWatcher - getting event failed! Re-creating the watcher. Last RV: %s", rw.lastResourceVersion)
+				return false
+			}
+
+			// We need to inspect the event and get ResourceVersion out of it
+			switch event.Type {
+			case watch.Added, watch.Modified, watch.Deleted:
+				metaObject, ok := event.Object.(resourceVersionGetter)
+				if !ok {
+					status := apierrors.NewInternalError(errors.New("__internal__: RetryWatcher: doesn't support resourceVersion")).Status()
+					_ = rw.send(watch.Event{
+						Type:   watch.Error,
+						Object: &status,
+					})
+					// We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
+					return true
+				}
+
+				resourceVersion := metaObject.GetResourceVersion()
+				if resourceVersion == "" {
+					status := apierrors.NewInternalError(fmt.Errorf("__internal__: RetryWatcher: object %#v doesn't support resourceVersion", event.Object)).Status()
+					_ = rw.send(watch.Event{
+						Type:   watch.Error,
+						Object: &status,
+					})
+					// We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
+					return true
+				}
+
+				// All is fine; send the event and update lastResourceVersion
+				ok = rw.send(event)
+				if !ok {
+					return true
+				}
+				rw.lastResourceVersion = resourceVersion
+
+				continue
+
+			case watch.Error:
+				_ = rw.send(event)
+				return true
+
+			default:
+				logrus.Errorf("RetryWatcher failed to recognize Event type %q", event.Type)
+				status := apierrors.NewInternalError(fmt.Errorf("__internal__: RetryWatcher failed to recognize Event type %q", event.Type)).Status()
+				_ = rw.send(watch.Event{
+					Type:   watch.Error,
+					Object: &status,
+				})
+				// We are unable to restart the watch and have to stop the loop or this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
+				return true
+			}
+		}
+	}
+}
+
+func (rw *RetryWatcher) receive() {
+	defer close(rw.doneChan)
+
+	for {
+		select {
+		case <-rw.stopChan:
+			logrus.Debug("Stopping RetryWatcher.")
+			return
+		default:
+			done := rw.doReceive()
+			if done {
+				return
+			}
+		}
+	}
+}
+
+// ResultChan is from https://github.com/kubernetes/kubernetes/pull/50102.
+func (rw *RetryWatcher) ResultChan() <-chan watch.Event {
+	return rw.resultChan
+}
+
+// Stop is from https://github.com/kubernetes/kubernetes/pull/50102.
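+// Note that it closes stopChan unconditionally, so callers must not
+// invoke it more than once.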
+func (rw *RetryWatcher) Stop() { + close(rw.stopChan) +} + +// Done is from https://github.com/kubernetes/kubernetes/pull/50102. +func (rw *RetryWatcher) Done() <-chan struct{} { + return rw.doneChan +}
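
For reference, a minimal sketch of how the Until helper above drives a
WatcherFunc.  It exercises the retry watcher against a fake watcher
(watch.NewFake, from k8s.io/apimachinery) instead of a live API server;
the exampleUntil wrapper and the simulated event are hypothetical, not
part of this patch, and the bootstrap-complete condition mirrors the
one in destroyBootstrap:

	package main

	import (
		"context"
		"fmt"
		"time"

		corev1 "k8s.io/api/core/v1"
		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
		"k8s.io/apimachinery/pkg/watch"
	)

	// exampleUntil feeds a single fake bootstrap-complete event through
	// Until and waits for the condition to match it.
	func exampleUntil() error {
		fake := watch.NewFake()
		go func() {
			// Simulate the bootstrap-complete event arriving later.
			time.Sleep(100 * time.Millisecond)
			fake.Add(&corev1.Event{
				ObjectMeta: metav1.ObjectMeta{
					Name:            "bootstrap-complete",
					ResourceVersion: "1",
				},
				Message: "cluster bootstrapping has completed",
			})
		}()

		ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
		defer cancel()
		lastEvent, err := Until(
			ctx,
			"",
			func(sinceResourceVersion string) (watch.Interface, error) {
				// Always hand back the same fake watch; a real
				// WatcherFunc would open a new watch starting at
				// sinceResourceVersion.
				return fake, nil
			},
			func(watchEvent watch.Event) (bool, error) {
				event, ok := watchEvent.Object.(*corev1.Event)
				return ok && event.Name == "bootstrap-complete", nil
			},
		)
		if err != nil {
			return err
		}
		fmt.Printf("matched %s\n", lastEvent.Object.(*corev1.Event).Name)
		return nil
	}

Because the fake's channel stays open until Until stops the watcher,
this sketch does not exercise the retry path (re-invoking the
WatcherFunc after the underlying channel closes).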