Skip to content

Commit

Permalink
Boskos: Use cache for reading data
Browse files Browse the repository at this point in the history
  • Loading branch information
alvaroaleman committed Feb 12, 2020
1 parent 87c0597 commit 4b55256
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 12 deletions.
4 changes: 2 additions & 2 deletions boskos/cmd/boskos/boskos.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ func main() {
// main server with the main mux until we're ready
health := pjutil.NewHealth()

client, err := kubeClientOptions.Client()
client, err := kubeClientOptions.CacheBackedClient(*namespace, &crds.ResourceObject{}, &crds.DRLCObject{})
if err != nil {
logrus.WithError(err).Fatal("unable to construct client")
logrus.WithError(err).Fatal("unable to get client")
}

storage, err := ranch.NewStorage(interrupts.Context(), client, *namespace, *storagePath)
Expand Down
3 changes: 3 additions & 0 deletions boskos/crds/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//boskos/common:go_default_library",
"//prow/interrupts:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_k8s_apiextensions_apiserver//pkg/apis/apiextensions/v1beta1:go_default_library",
"@io_k8s_apiextensions_apiserver//pkg/client/clientset/clientset:go_default_library",
"@io_k8s_apimachinery//pkg/api/errors:go_default_library",
Expand All @@ -23,6 +25,7 @@ go_library(
"@io_k8s_client_go//tools/clientcmd:go_default_library",
"@io_k8s_sigs_controller_runtime//pkg/client:go_default_library",
"@io_k8s_sigs_controller_runtime//pkg/client/fake:go_default_library",
"@io_k8s_sigs_controller_runtime//pkg/manager:go_default_library",
],
)

Expand Down
90 changes: 80 additions & 10 deletions boskos/crds/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ limitations under the License.
package crds

import (
"context"
"errors"
"flag"
"fmt"
"os"
"time"

"k8s.io/test-infra/boskos/common"

"github.com/sirupsen/logrus"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -32,6 +34,10 @@ import (
"k8s.io/client-go/tools/clientcmd"
ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
fakectrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/manager"

"k8s.io/test-infra/boskos/common"
"k8s.io/test-infra/prow/interrupts"
)

// KubernetesClientOptions are flag options used to create a kube client.
Expand Down Expand Up @@ -63,22 +69,86 @@ func (o *KubernetesClientOptions) Client() (ctrlruntimeclient.Client, error) {
return fakectrlruntimeclient.NewFakeClient(), nil
}

var config *rest.Config
cfg, err := o.cfg()
if err != nil {
return nil, err
}

if err := registerResources(cfg); err != nil {
return nil, fmt.Errorf("failed to create CRDs: %v", err)
}

return ctrlruntimeclient.New(cfg, ctrlruntimeclient.Options{})
}

// CacheBackedClient returns a client whose Reader is cache backed. Namespace can be empty
// in which case the client will use all namespaces.
// It blocks until the cache was synced for all types passed in startCacheFor.
func (o *KubernetesClientOptions) CacheBackedClient(namespace string, startCacheFor ...runtime.Object) (ctrlruntimeclient.Client, error) {
if o.inMemory {
return fakectrlruntimeclient.NewFakeClient(), nil
}

cfg, err := o.cfg()
if err != nil {
return nil, err
}

if err := registerResources(cfg); err != nil {
return nil, fmt.Errorf("failed to create CRDs: %v", err)
}

mgr, err := manager.New(cfg, manager.Options{LeaderElection: false, Namespace: namespace})
if err != nil {
return nil, fmt.Errorf("failed to construct manager: %v", err)
}

// Allocate an informer so our cache actually waits for these types to
// be synced. Must be done before we start the mgr, else this may block
// indefinitely if there is an issue.
for _, t := range startCacheFor {
if _, err := mgr.GetCache().GetInformer(t); err != nil {
return nil, fmt.Errorf("failed to get informer for type %T: %v", t, err)
}
}

interrupts.Run(func(ctx context.Context) {
// Exiting like this is not nice, but the interrupts package
// doesn't allow us to stop the app. Furthermore, the behaviour
// of the reading client is undefined after the manager stops,
// so we should bail ASAP.
if err := mgr.Start(ctx.Done()); err != nil {
logrus.WithError(err).Fatal("Mgr failed.")
}
logrus.Info("Mgr finished gracefully.")
os.Exit(0)
})

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

startSyncTime := time.Now()
if synced := mgr.GetCache().WaitForCacheSync(ctx.Done()); !synced {
return nil, errors.New("timeout waiting for cache sync")
}
logrus.WithField("sync-duration", time.Since(startSyncTime).String()).Info("Cache synced")

return mgr.GetClient(), nil
}

func (o *KubernetesClientOptions) cfg() (*rest.Config, error) {
var cfg *rest.Config
var err error
if o.kubeConfig == "" {
config, err = rest.InClusterConfig()
cfg, err = rest.InClusterConfig()
} else {
config, err = clientcmd.BuildConfigFromFlags("", o.kubeConfig)
cfg, err = clientcmd.BuildConfigFromFlags("", o.kubeConfig)
}
if err != nil {
return nil, fmt.Errorf("failed to construct rest config: %v", err)
}

if err = registerResources(config); err != nil {
return nil, fmt.Errorf("failed to create CRDs: %v", err)
}

return ctrlruntimeclient.New(config, ctrlruntimeclient.Options{})
return cfg, nil
}

// Type defines a Custom Resource Definition (CRD) Type.
Expand Down

0 comments on commit 4b55256

Please sign in to comment.