diff --git a/cmd/openshift-install/create.go b/cmd/openshift-install/create.go index fff6f27b73f..3b97d0a475e 100644 --- a/cmd/openshift-install/create.go +++ b/cmd/openshift-install/create.go @@ -1,13 +1,22 @@ package main import ( + "context" "fmt" "os/exec" + "path/filepath" "strings" + "time" "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/spf13/cobra" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" "github.com/openshift/installer/pkg/asset" "github.com/openshift/installer/pkg/asset/cluster" @@ -16,6 +25,7 @@ import ( "github.com/openshift/installer/pkg/asset/installconfig" "github.com/openshift/installer/pkg/asset/kubeconfig" "github.com/openshift/installer/pkg/asset/manifests" + destroybootstrap "github.com/openshift/installer/pkg/destroy/bootstrap" ) type target struct { @@ -53,7 +63,9 @@ var targets = []target{{ command: &cobra.Command{ Use: "cluster", Short: "Create an OpenShift cluster", - Long: "", + PostRunE: func(_ *cobra.Command, _ []string) error { + return destroyBootstrap(context.Background(), rootOpts.dir) + }, }, assets: []asset.WritableAsset{&cluster.TerraformVariables{}, &kubeconfig.Admin{}, &cluster.Cluster{}}, }} @@ -128,3 +140,70 @@ func runTargetCmd(targets ...asset.WritableAsset) func(cmd *cobra.Command, args return nil } } + +// FIXME: pulling the kubeconfig and metadata out of the root +// directory is a bit cludgy when we already have them in memory. 
+func destroyBootstrap(ctx context.Context, directory string) (err error) { + logrus.Info("Waiting for bootstrap completion...") + config, err := clientcmd.BuildConfigFromFlags("", filepath.Join(directory, "auth", "kubeconfig")) + if err != nil { + return errors.Wrap(err, "loading kubeconfig") + } + + client, err := kubernetes.NewForConfig(config) + if err != nil { + return errors.Wrap(err, "creating a Kubernetes client") + } + + discovery := client.Discovery() + + apiContext, cancel := context.WithTimeout(ctx, 30*time.Minute) + defer cancel() + wait.Until(func() { + version, err := discovery.ServerVersion() + if err == nil { + logrus.Infof("API %s up", version) + cancel() + } + if err != nil { logrus.Debugf("API not up yet: %s", err) } + }, 2*time.Second, apiContext.Done()) + + events := client.CoreV1().Events("kube-system") + + eventContext, cancel := context.WithTimeout(ctx, 30*time.Minute) + defer cancel() + _, err = Until( + eventContext, + "", + func(sinceResourceVersion string) (watch.Interface, error) { + return events.Watch(metav1.ListOptions{ + Watch: true, + ResourceVersion: sinceResourceVersion, + }) + }, + func(watchEvent watch.Event) (bool, error) { + event, ok := watchEvent.Object.(*corev1.Event) + if !ok { + return false, nil + } + + if watchEvent.Type == watch.Error { + logrus.Debugf("error %s: %s", event.Name, event.Message) + return false, nil + } + + if watchEvent.Type != watch.Added { + return false, nil + } + + logrus.Debugf("added %s: %s", event.Name, event.Message) + return event.Name == "bootstrap-complete", nil + }, + ) + if err != nil { + return errors.Wrap(err, "waiting for bootstrap-complete") + } + + logrus.Info("Destroying the bootstrap resources...") + return destroybootstrap.Destroy(directory) +} diff --git a/cmd/openshift-install/main.go b/cmd/openshift-install/main.go index f3076202df2..8e43179a87a 100644 --- a/cmd/openshift-install/main.go +++ b/cmd/openshift-install/main.go @@ -31,8 +31,7 @@ func main() { } if err := rootCmd.Execute(); err != 
nil { - cause := errors.Cause(err) - logrus.Fatalf("Error executing openshift-install: %v", cause) + logrus.Fatalf("Error executing openshift-install: %v", err) } } diff --git a/cmd/openshift-install/watch.go b/cmd/openshift-install/watch.go new file mode 100644 index 00000000000..8bac42d8c82 --- /dev/null +++ b/cmd/openshift-install/watch.go @@ -0,0 +1,165 @@ +package main + +import ( + "context" + "errors" + "fmt" + + "github.com/sirupsen/logrus" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/watch" + watchtools "k8s.io/client-go/tools/watch" +) + +// WatcherFunc is from https://github.com/kubernetes/kubernetes/pull/50102. +type WatcherFunc func(sinceResourceVersion string) (watch.Interface, error) + +type resourceVersionGetter interface { + GetResourceVersion() string +} + +// RetryWatcher is from https://github.com/kubernetes/kubernetes/pull/50102. +type RetryWatcher struct { + lastResourceVersion string + watcherFunc WatcherFunc + resultChan chan watch.Event + stopChan chan struct{} + doneChan chan struct{} +} + +// Until is from https://github.com/kubernetes/kubernetes/pull/50102. +func Until(ctx context.Context, initialResourceVersion string, watcherFunc WatcherFunc, conditions ...watchtools.ConditionFunc) (*watch.Event, error) { + return watchtools.UntilWithoutRetry(ctx, NewRetryWatcher(initialResourceVersion, watcherFunc), conditions...) +} + +// NewRetryWatcher is from https://github.com/kubernetes/kubernetes/pull/50102. +func NewRetryWatcher(initialResourceVersion string, watcherFunc WatcherFunc) *RetryWatcher { + rw := &RetryWatcher{ + lastResourceVersion: initialResourceVersion, + watcherFunc: watcherFunc, + stopChan: make(chan struct{}), + doneChan: make(chan struct{}), + resultChan: make(chan watch.Event, 0), + } + go rw.receive() + return rw +} + +func (rw *RetryWatcher) send(event watch.Event) bool { + // Writing to an unbuffered channel is blocking and we need to check if we need to be able to stop while doing so! 
+ select { + case rw.resultChan <- event: + return true + case <-rw.stopChan: + return false + } +} + +func (rw *RetryWatcher) doReceive() bool { + watcher, err := rw.watcherFunc(rw.lastResourceVersion) + if err != nil { + status := apierrors.NewInternalError(fmt.Errorf("retry watcher: watcherFunc failed: %v", err)).Status() + _ = rw.send(watch.Event{ + Type: watch.Error, + Object: &status, + }) + // Stop the watcher + return true + } + ch := watcher.ResultChan() + defer watcher.Stop() + + for { + select { + case <-rw.stopChan: + logrus.Debug("Stopping RetryWatcher.") + return true + case event, ok := <-ch: + if !ok { + logrus.Warningf("RetryWatcher - getting event failed! Re-creating the watcher. Last RV: %s", rw.lastResourceVersion) + return false + } + + // We need to inspect the event and get ResourceVersion out of it + switch event.Type { + case watch.Added, watch.Modified, watch.Deleted: + metaObject, ok := event.Object.(resourceVersionGetter) + if !ok { + status := apierrors.NewInternalError(errors.New("__internal__: RetryWatcher: doesn't support resourceVersion")).Status() + _ = rw.send(watch.Event{ + Type: watch.Error, + Object: &status, + }) + // We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data! + return true + } + + resourceVersion := metaObject.GetResourceVersion() + if resourceVersion == "" { + status := apierrors.NewInternalError(fmt.Errorf("__internal__: RetryWatcher: object %#v doesn't support resourceVersion", event.Object)).Status() + _ = rw.send(watch.Event{ + Type: watch.Error, + Object: &status, + }) + // We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data! 
+ return true + } + + // All is fine; send the event and update lastResourceVersion + ok = rw.send(event) + if !ok { + return true + } + rw.lastResourceVersion = resourceVersion + + continue + + case watch.Error: + _ = rw.send(event) + return true + + default: + logrus.Errorf("RetryWatcher failed to recognize Event type %q", event.Type) + status := apierrors.NewInternalError(fmt.Errorf("__internal__: RetryWatcher failed to recognize Event type %q", event.Type)).Status() + _ = rw.send(watch.Event{ + Type: watch.Error, + Object: &status, + }) + // We are unable to restart the watch and have to stop the loop or this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data! + return true + } + } + } +} + +func (rw *RetryWatcher) receive() { + defer close(rw.doneChan) + + for { + select { + case <-rw.stopChan: + logrus.Debug("Stopping RetryWatcher.") + return + default: + done := rw.doReceive() + if done { + return + } + } + } +} + +// ResultChan is from https://github.com/kubernetes/kubernetes/pull/50102. +func (rw *RetryWatcher) ResultChan() <-chan watch.Event { + return rw.resultChan +} + +// Stop is from https://github.com/kubernetes/kubernetes/pull/50102. +func (rw *RetryWatcher) Stop() { + close(rw.stopChan) +} + +// Done is from https://github.com/kubernetes/kubernetes/pull/50102. +func (rw *RetryWatcher) Done() <-chan struct{} { + return rw.doneChan +}