cmd/openshift-install/create: Destroy bootstrap on bootstrap-complete
Once the cluster is up, the bootstrap stuff is just a waste of
resources.  With this commit we make a reasonable effort to wait for
the bootstrap-complete event, and, if we see it, we tear down the
bootstrap resources.

The Kubernetes client initialization is based on [1] and the local
wait.go is based on [2].  Both of those are, like this project, under
the Apache 2.0 license.  Once kubernetes/kubernetes#50102 lands, we
can drop the local wait.go and use the upstream implementation.

I've dropped the errors.Cause call from main.go, because it was
stripping the error's wrapping, which provides useful context for debugging.
That Cause call is from dc118f2 (cmd: switch to already vendored
cobra, 2018-10-08, openshift#429), but that commit doesn't explicitly motivate
the call.

[1]: https://github.com/kubernetes/client-go/blob/v9.0.0/examples/out-of-cluster-client-configuration/main.go
[2]: https://github.com/kubernetes/kubernetes/pull/50102/files
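
A minimal sketch (not part of this commit) of the errors.Cause behavior described
above, using the upstream github.com/pkg/errors package; the file path below is
illustrative and only exists to produce an error:

package main

import (
	"fmt"
	"os"

	"github.com/pkg/errors"
)

func main() {
	_, err := os.Open("/nonexistent/kubeconfig") // hypothetical path, just to get an error
	wrapped := errors.Wrap(err, "loading kubeconfig")

	// With the wrapping kept (what main.go now logs):
	fmt.Println(wrapped) // loading kubeconfig: open /nonexistent/kubeconfig: ...

	// With errors.Cause (what main.go used to log), the wrapping message is lost:
	fmt.Println(errors.Cause(wrapped)) // open /nonexistent/kubeconfig: ...
}
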
wking committed Nov 2, 2018
1 parent 921941a commit 127219f
Showing 3 changed files with 246 additions and 3 deletions.
81 changes: 80 additions & 1 deletion cmd/openshift-install/create.go
@@ -1,13 +1,22 @@
package main

import (
"context"
"fmt"
"os/exec"
"path/filepath"
"strings"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"

"github.com/openshift/installer/pkg/asset"
"github.com/openshift/installer/pkg/asset/cluster"
@@ -16,6 +25,7 @@ import (
"github.com/openshift/installer/pkg/asset/installconfig"
"github.com/openshift/installer/pkg/asset/kubeconfig"
"github.com/openshift/installer/pkg/asset/manifests"
destroybootstrap "github.com/openshift/installer/pkg/destroy/bootstrap"
)

type target struct {
@@ -53,7 +63,9 @@ var targets = []target{{
command: &cobra.Command{
Use: "cluster",
Short: "Create an OpenShift cluster",
Long: "",
PostRunE: func(_ *cobra.Command, _ []string) error {
return destroyBootstrap(context.Background(), rootOpts.dir)
},
},
assets: []asset.WritableAsset{&cluster.TerraformVariables{}, &kubeconfig.Admin{}, &cluster.Cluster{}},
}}
@@ -128,3 +140,70 @@ func runTargetCmd(targets ...asset.WritableAsset) func(cmd *cobra.Command, args
return nil
}
}

// FIXME: pulling the kubeconfig and metadata out of the root
// directory is a bit kludgy when we already have them in memory.
func destroyBootstrap(ctx context.Context, directory string) (err error) {
logrus.Info("Waiting for bootstrap completion...")
config, err := clientcmd.BuildConfigFromFlags("", filepath.Join(directory, "auth", "kubeconfig"))
if err != nil {
return errors.Wrap(err, "loading kubeconfig")
}

client, err := kubernetes.NewForConfig(config)
if err != nil {
return errors.Wrap(err, "creating a Kubernetes client")
}

discovery := client.Discovery()

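// Poll the API server for its version every two seconds until it answers,
// then cancel the polling context.  Give up after 30 minutes.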
apiContext, cancel := context.WithTimeout(ctx, 30*time.Minute)
defer cancel()
wait.Until(func() {
version, err := discovery.ServerVersion()
if err == nil {
logrus.Infof("API %s up", version)
cancel()
} else {
logrus.Debugf("API not up yet: %s", err)
}
}, 2*time.Second, apiContext.Done())

events := client.CoreV1().Events("kube-system")

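// Watch kube-system events, re-creating the watch via the local RetryWatcher
// if it drops, until an added event named bootstrap-complete appears or 30
// minutes pass.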
eventContext, cancel := context.WithTimeout(ctx, 30*time.Minute)
defer cancel()
_, err = Until(
eventContext,
"",
func(sinceResourceVersion string) (watch.Interface, error) {
return events.Watch(metav1.ListOptions{
Watch: true,
ResourceVersion: sinceResourceVersion,
})
},
func(watchEvent watch.Event) (bool, error) {
event, ok := watchEvent.Object.(*corev1.Event)
if !ok {
return false, nil
}

if watchEvent.Type == watch.Error {
logrus.Debugf("error %s: %s", event.Name, event.Message)
return false, nil
}

if watchEvent.Type != watch.Added {
return false, nil
}

logrus.Debugf("added %s: %s", event.Name, event.Message)
return event.Name == "bootstrap-complete", nil
},
)
if err != nil {
return errors.Wrap(err, "waiting for bootstrap-complete")
}

logrus.Info("Destroying the bootstrap resources...")
return destroybootstrap.Destroy(rootOpts.dir)
}
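
For reference, a minimal self-contained sketch (not part of the commit) of the
same condition-function pattern destroyBootstrap hands to Until: a fake watcher
stands in for the kube-system events.Watch call, and client-go's
watchtools.UntilWithoutRetry blocks until an added Event named
bootstrap-complete arrives.  Package paths assume the client-go already
vendored for this commit.

package main

import (
	"context"
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	watchtools "k8s.io/client-go/tools/watch"
)

func main() {
	// A fake watcher plays the role of events.Watch(...) in destroyBootstrap.
	fw := watch.NewFake()
	go func() {
		// Simulate the bootstrap node reporting completion.
		fw.Add(&corev1.Event{
			ObjectMeta: metav1.ObjectMeta{Name: "bootstrap-complete", Namespace: "kube-system"},
			Message:    "cluster bootstrapping has completed",
		})
	}()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Block until an added Event named bootstrap-complete arrives (or the context expires).
	last, err := watchtools.UntilWithoutRetry(ctx, fw, func(e watch.Event) (bool, error) {
		event, ok := e.Object.(*corev1.Event)
		return ok && e.Type == watch.Added && event.Name == "bootstrap-complete", nil
	})
	if err != nil {
		fmt.Println("waiting for bootstrap-complete:", err)
		return
	}
	fmt.Println("saw:", last.Object.(*corev1.Event).Message)
}
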
3 changes: 1 addition & 2 deletions cmd/openshift-install/main.go
@@ -31,8 +31,7 @@ func main() {
}

if err := rootCmd.Execute(); err != nil {
cause := errors.Cause(err)
logrus.Fatalf("Error executing openshift-install: %v", cause)
logrus.Fatalf("Error executing openshift-install: %v", err)
}
}

165 changes: 165 additions & 0 deletions cmd/openshift-install/watch.go
@@ -0,0 +1,165 @@
package main

import (
"context"
"errors"
"fmt"

"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/watch"
watchtools "k8s.io/client-go/tools/watch"
)

// WatcherFunc is from https://github.com/kubernetes/kubernetes/pull/50102.
type WatcherFunc func(sinceResourceVersion string) (watch.Interface, error)

type resourceVersionGetter interface {
GetResourceVersion() string
}

// RetryWatcher is from https://github.com/kubernetes/kubernetes/pull/50102.
type RetryWatcher struct {
lastResourceVersion string
watcherFunc WatcherFunc
resultChan chan watch.Event
stopChan chan struct{}
doneChan chan struct{}
}

// Until is from https://github.com/kubernetes/kubernetes/pull/50102.
func Until(ctx context.Context, initialResourceVersion string, watcherFunc WatcherFunc, conditions ...watchtools.ConditionFunc) (*watch.Event, error) {
return watchtools.UntilWithoutRetry(ctx, NewRetryWatcher(initialResourceVersion, watcherFunc), conditions...)
}

// NewRetryWatcher is from https://github.com/kubernetes/kubernetes/pull/50102.
func NewRetryWatcher(initialResourceVersion string, watcherFunc WatcherFunc) *RetryWatcher {
rw := &RetryWatcher{
lastResourceVersion: initialResourceVersion,
watcherFunc: watcherFunc,
stopChan: make(chan struct{}),
doneChan: make(chan struct{}),
resultChan: make(chan watch.Event, 0),
}
go rw.receive()
return rw
}

func (rw *RetryWatcher) send(event watch.Event) bool {
// Writing to an unbuffered channel is blocking, so we need to be able to stop while doing so.
select {
case rw.resultChan <- event:
return true
case <-rw.stopChan:
return false
}
}

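// doReceive runs a single watch and forwards its events to resultChan.  It
// returns true when the RetryWatcher should stop for good and false when the
// watch ended and should be re-created from lastResourceVersion.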
func (rw *RetryWatcher) doReceive() bool {
watcher, err := rw.watcherFunc(rw.lastResourceVersion)
if err != nil {
status := apierrors.NewInternalError(fmt.Errorf("retry watcher: watcherFunc failed: %v", err)).Status()
_ = rw.send(watch.Event{
Type: watch.Error,
Object: &status,
})
// Stop the watcher
return true
}
ch := watcher.ResultChan()
defer watcher.Stop()

for {
select {
case <-rw.stopChan:
logrus.Debug("Stopping RetryWatcher.")
return true
case event, ok := <-ch:
if !ok {
logrus.Warningf("RetryWatcher - getting event failed! Re-creating the watcher. Last RV: %s", rw.lastResourceVersion)
return false
}

// We need to inspect the event and get ResourceVersion out of it
switch event.Type {
case watch.Added, watch.Modified, watch.Deleted:
metaObject, ok := event.Object.(resourceVersionGetter)
if !ok {
status := apierrors.NewInternalError(errors.New("__internal__: RetryWatcher: doesn't support resourceVersion")).Status()
_ = rw.send(watch.Event{
Type: watch.Error,
Object: &status,
})
// We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
return true
}

resourceVersion := metaObject.GetResourceVersion()
if resourceVersion == "" {
status := apierrors.NewInternalError(fmt.Errorf("__internal__: RetryWatcher: object %#v doesn't support resourceVersion", event.Object)).Status()
_ = rw.send(watch.Event{
Type: watch.Error,
Object: &status,
})
// We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
return true
}

// All is fine; send the event and update lastResourceVersion
ok = rw.send(event)
if !ok {
return true
}
rw.lastResourceVersion = resourceVersion

continue

case watch.Error:
_ = rw.send(event)
return true

default:
logrus.Errorf("RetryWatcher failed to recognize Event type %q", event.Type)
status := apierrors.NewInternalError(fmt.Errorf("__internal__: RetryWatcher failed to recognize Event type %q", event.Type)).Status()
_ = rw.send(watch.Event{
Type: watch.Error,
Object: &status,
})
// We are unable to restart the watch and have to stop the loop or this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
return true
}
}
}
}

func (rw *RetryWatcher) receive() {
defer close(rw.doneChan)

for {
select {
case <-rw.stopChan:
logrus.Debug("Stopping RetryWatcher.")
return
default:
done := rw.doReceive()
if done {
return
}
}
}
}

// ResultChan is from https://github.com/kubernetes/kubernetes/pull/50102.
func (rw *RetryWatcher) ResultChan() <-chan watch.Event {
return rw.resultChan
}

// Stop is from https://github.com/kubernetes/kubernetes/pull/50102.
func (rw *RetryWatcher) Stop() {
close(rw.stopChan)
}

// Done is from https://github.com/kubernetes/kubernetes/pull/50102.
func (rw *RetryWatcher) Done() <-chan struct{} {
return rw.doneChan
}
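
A hypothetical usage sketch (not part of the commit): the RetryWatcher can also
be consumed directly, without the Until wrapper, by draining its ResultChan
until the caller stops it.  drainWatch and its parameters are illustrative
names only; it would sit in the same package and needs no imports beyond those
above.

func drainWatch(watcherFunc WatcherFunc, stop <-chan struct{}) {
	rw := NewRetryWatcher("", watcherFunc)
	defer rw.Stop()

	for {
		select {
		case <-stop:
			return
		case event, ok := <-rw.ResultChan():
			if !ok {
				return
			}
			logrus.Debugf("observed %s event: %#v", event.Type, event.Object)
		}
	}
}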
