From 2b940439d0cf60f9e190c26371a57d1ef02f5114 Mon Sep 17 00:00:00 2001 From: Jacob LeGrone Date: Mon, 10 Jun 2019 17:08:32 -0400 Subject: [PATCH 1/6] feat(driver): support kubernetes --- pkg/driver/kubernetes_driver.go | 366 +++++++++++++++++++++++++++ pkg/driver/kubernetes_driver_test.go | 47 ++++ pkg/driver/lookup.go | 2 + 3 files changed, 415 insertions(+) create mode 100644 pkg/driver/kubernetes_driver.go create mode 100644 pkg/driver/kubernetes_driver_test.go diff --git a/pkg/driver/kubernetes_driver.go b/pkg/driver/kubernetes_driver.go new file mode 100644 index 00000000..6eccddf8 --- /dev/null +++ b/pkg/driver/kubernetes_driver.go @@ -0,0 +1,366 @@ +package driver + +import ( + "fmt" + "io" + "os" + "path/filepath" + "strings" + + "github.com/deislabs/cnab-go/driver" + batchv1 "k8s.io/api/batch/v1" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + batchclientv1 "k8s.io/client-go/kubernetes/typed/batch/v1" + coreclientv1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" +) + +const ( + k8sContainerName = "invocation" + k8sFileSecretVolume = "files" +) + +// KubernetesDriver is capable of running an invocation image in a Kubernetes cluster. +type KubernetesDriver struct { + Namespace string + ServiceAccountName string + LimitCPU resource.Quantity + LimitMemory resource.Quantity + ActiveDeadlineSeconds int64 + BackoffLimit int32 + SkipCleanup bool + skipJobStatusCheck bool + jobs batchclientv1.JobInterface + secrets coreclientv1.SecretInterface + pods coreclientv1.PodInterface + deletionPolicy metav1.DeletionPropagation + requiredCompletions int32 +} + +// NewKubernetesDriver initializes a Kubernetes driver. +func NewKubernetesDriver(namespace, serviceAccount string, conf *rest.Config) (*KubernetesDriver, error) { + driver := &KubernetesDriver{ + Namespace: namespace, + ServiceAccountName: serviceAccount, + } + driver.setDefaults() + err := driver.setClient(conf) + if err != nil { + return driver, err + } + + return driver, nil +} + +// Config returns the Kubernetes driver configuration options +func (k *KubernetesDriver) Config() map[string]string { + return map[string]string{ + "KUBE_NAMESPACE": "Increase verbosity. true, false are supported values", + "SERVICE_ACCOUNT": "Always pull image, even if locally available (0|1)", + "KUBE_CONFIG": "Absolute path to the kubeconfig file", + "MASTER_URL": "Kubernetes master endpoint", + } +} + +// SetConfig sets Kubernetes driver configuration +func (k *KubernetesDriver) SetConfig(settings map[string]string) { + k.setDefaults() + k.Namespace = settings["KUBE_NAMESPACE"] + k.ServiceAccountName = settings["SERVICE_ACCOUNT"] + + var kubeconfig string + if kpath := settings["KUBE_CONFIG"]; kpath != "" { + kubeconfig = kpath + } else if home := homeDir(); home != "" { + kubeconfig = filepath.Join(home, ".kube", "config") + } + + conf, err := clientcmd.BuildConfigFromFlags(settings["MASTER_URL"], kubeconfig) + if err != nil { + panic(err) + } + err = k.setClient(conf) + if err != nil { + panic(err) + } +} + +func (k *KubernetesDriver) setDefaults() { + k.SkipCleanup = false + k.BackoffLimit = 0 + k.ActiveDeadlineSeconds = 300 + k.requiredCompletions = 1 + k.deletionPolicy = metav1.DeletePropagationBackground +} + +func (k *KubernetesDriver) setClient(conf *rest.Config) error { + coreClient, err := coreclientv1.NewForConfig(conf) + if err != nil { + return err + } + batchClient, err := batchclientv1.NewForConfig(conf) + if err != nil { + return err + } + k.jobs = batchClient.Jobs(k.Namespace) + k.secrets = coreClient.Secrets(k.Namespace) + k.pods = coreClient.Pods(k.Namespace) + + return nil +} + +// Run executes the operation inside of the invocation image +func (k *KubernetesDriver) Run(op *driver.Operation) error { + if k.Namespace == "" { + return fmt.Errorf("KUBE_NAMESPACE is required") + } + labelMap := generateLabels(op) + meta := metav1.ObjectMeta{ + Namespace: k.Namespace, + GenerateName: generateNameTemplate(op), + Labels: labelMap, + } + // Mount SA token if a non-zero value for ServiceAccountName has been specified + mountServiceAccountToken := k.ServiceAccountName != "" + job := &batchv1.Job{ + ObjectMeta: meta, + Spec: batchv1.JobSpec{ + ActiveDeadlineSeconds: &k.ActiveDeadlineSeconds, + Completions: &k.requiredCompletions, + BackoffLimit: &k.BackoffLimit, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labelMap, + }, + Spec: v1.PodSpec{ + ServiceAccountName: k.ServiceAccountName, + AutomountServiceAccountToken: &mountServiceAccountToken, + RestartPolicy: v1.RestartPolicyNever, + }, + }, + }, + } + container := v1.Container{ + Name: k8sContainerName, + Image: op.Image, + Command: []string{"/cnab/app/run"}, + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceCPU: k.LimitCPU, + v1.ResourceMemory: k.LimitMemory, + }, + }, + ImagePullPolicy: v1.PullIfNotPresent, + } + + if len(op.Environment) > 0 { + secret := &v1.Secret{ + ObjectMeta: meta, + StringData: op.Environment, + } + secret.ObjectMeta.GenerateName += "env-" + envsecret, err := k.secrets.Create(secret) + if err != nil { + return err + } + // TODO: add to slice of resources awaiting cleanup instead, then return err + if !k.SkipCleanup { + defer k.deleteSecret(envsecret.ObjectMeta.Name) + } + + container.EnvFrom = []v1.EnvFromSource{ + v1.EnvFromSource{ + SecretRef: &v1.SecretEnvSource{ + LocalObjectReference: v1.LocalObjectReference{ + Name: envsecret.ObjectMeta.Name, + }, + }, + }, + } + } + + if len(op.Files) > 0 { + secret, mounts := generateFileSecret(op.Files) + secret.ObjectMeta = metav1.ObjectMeta{ + Namespace: k.Namespace, + GenerateName: generateNameTemplate(op) + "files-", + Labels: labelMap, + } + secret, err := k.secrets.Create(secret) + if err != nil { + return err + } + if !k.SkipCleanup { + defer k.deleteSecret(secret.ObjectMeta.Name) + } + + job.Spec.Template.Spec.Volumes = append(job.Spec.Template.Spec.Volumes, v1.Volume{ + Name: k8sFileSecretVolume, + VolumeSource: v1.VolumeSource{ + Secret: &v1.SecretVolumeSource{ + SecretName: secret.ObjectMeta.Name, + }, + }, + }) + container.VolumeMounts = mounts + } + + job.Spec.Template.Spec.Containers = []v1.Container{container} + job, err := k.jobs.Create(job) + if err != nil { + return err + } + if !k.SkipCleanup { + defer k.deleteJob(job.ObjectMeta.Name) + } + + // Return early for unit testing purposes (the fake k8s client implementation just + // hangs during watch because no events are ever created on the Job) + if k.skipJobStatusCheck { + return nil + } + + selector := metav1.ListOptions{ + LabelSelector: labels.Set(job.ObjectMeta.Labels).String(), + } + // Stream Pod logs in the background + logsStreamingComplete := make(chan bool) + err = k.streamPodLogs(selector, op.Out, logsStreamingComplete) + if err != nil { + return err + } + // Watch job events and exit on failure/success + watch, err := k.jobs.Watch(selector) + if err != nil { + return err + } + for event := range watch.ResultChan() { + job, ok := event.Object.(*batchv1.Job) + if !ok { + return fmt.Errorf("unexpected type") + } + complete := false + for _, cond := range job.Status.Conditions { + if cond.Type == batchv1.JobFailed { + err = fmt.Errorf(cond.Message) + complete = true + break + } + if cond.Type == batchv1.JobComplete { + complete = true + break + } + } + if complete { + break + } + } + + // Wait for pod logs to finish printing + for i := 0; i < int(k.requiredCompletions); i++ { + <-logsStreamingComplete + } + + return err +} + +// Handles receives an ImageType* and answers whether this driver supports that type +func (k *KubernetesDriver) Handles(imagetype string) bool { + return imagetype == driver.ImageTypeDocker || imagetype == driver.ImageTypeOCI +} + +func (k *KubernetesDriver) streamPodLogs(options metav1.ListOptions, out io.Writer, done chan bool) error { + watch, err := k.pods.Watch(options) + if err != nil { + return err + } + + go func() { + watching := map[string]bool{} + for event := range watch.ResultChan() { + pod, ok := event.Object.(*v1.Pod) + if !ok { + continue + } + podName := pod.GetName() + if watching[podName] { + continue + } + req := k.pods.GetLogs(podName, &v1.PodLogOptions{ + Container: k8sContainerName, + Follow: true, + }) + reader, err := req.Stream() + if err != nil { + continue + } + + watching[podName] = true + io.Copy(out, reader) + reader.Close() + done <- true + } + }() + + return nil +} + +func (k *KubernetesDriver) deleteSecret(name string) error { + return k.secrets.Delete(name, &metav1.DeleteOptions{ + PropagationPolicy: &k.deletionPolicy, + }) +} + +func (k *KubernetesDriver) deleteJob(name string) error { + return k.jobs.Delete(name, &metav1.DeleteOptions{ + PropagationPolicy: &k.deletionPolicy, + }) +} + +// TODO: limit character count +func generateNameTemplate(op *driver.Operation) string { + return fmt.Sprintf("%s-%s-", op.Installation, op.Action) +} + +func generateLabels(op *driver.Operation) map[string]string { + return map[string]string{ + "cnab.io/installation": op.Installation, + "cnab.io/action": op.Action, + "cnab.io/revision": op.Revision, + } +} + +func generateFileSecret(files map[string]string) (*v1.Secret, []v1.VolumeMount) { + size := len(files) + data := make(map[string]string, size) + mounts := make([]v1.VolumeMount, size) + + i := 0 + for path, contents := range files { + key := strings.ReplaceAll(filepath.ToSlash(path), "/", "_") + data[key] = contents + mounts[i] = v1.VolumeMount{ + Name: k8sFileSecretVolume, + MountPath: path, + SubPath: key, + } + i++ + } + + secret := &v1.Secret{ + StringData: data, + } + + return secret, mounts +} + +func homeDir() string { + if h := os.Getenv("HOME"); h != "" { + return h + } + return os.Getenv("USERPROFILE") // windows +} diff --git a/pkg/driver/kubernetes_driver_test.go b/pkg/driver/kubernetes_driver_test.go new file mode 100644 index 00000000..dd257ffa --- /dev/null +++ b/pkg/driver/kubernetes_driver_test.go @@ -0,0 +1,47 @@ +package driver + +// TODO(jlegrone): uncomment tests when k8s.io/client-go/kubernetes/fake is available + +// import ( +// "os" +// "testing" + +// "github.com/deislabs/cnab-go/driver" +// metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +// "k8s.io/client-go/kubernetes/fake" +// ) + +// func TestKubernetesRun(t *testing.T) { +// client := fake.NewSimpleClientset() +// namespace := "default" +// k := KubernetesDriver{ +// Namespace: namespace, +// jobs: client.BatchV1().Jobs(namespace), +// secrets: client.CoreV1().Secrets(namespace), +// pods: client.CoreV1().Pods(namespace), +// SkipCleanup: true, +// skipJobStatusCheck: true, +// } +// op := driver.Operation{ +// Action: "install", +// Out: os.Stdout, +// Environment: map[string]string{ +// "foo": "bar", +// }, +// } + +// err := k.Run(&op) +// if err != nil { +// t.Error(err) +// } + +// jobList, _ := k.jobs.List(metav1.ListOptions{}) +// if len(jobList.Items) != 1 { +// t.Errorf("Expected one item in jobList, got %d", len(jobList.Items)) +// } + +// secretList, _ := k.secrets.List(metav1.ListOptions{}) +// if len(secretList.Items) != 1 { +// t.Errorf("Expected one item in secretList, got %d", len(secretList.Items)) +// } +// } diff --git a/pkg/driver/lookup.go b/pkg/driver/lookup.go index de58fe9d..362eb72e 100644 --- a/pkg/driver/lookup.go +++ b/pkg/driver/lookup.go @@ -11,6 +11,8 @@ func Lookup(name string) (driver.Driver, error) { switch name { case "docker": return &DockerDriver{}, nil + case "kubernetes", "k8s": + return &KubernetesDriver{}, nil case "debug": return &driver.DebugDriver{}, nil case "command": From 602b001e37d271aee608fd144ef1056fe7732966 Mon Sep 17 00:00:00 2001 From: Jacob LeGrone Date: Tue, 11 Jun 2019 14:59:52 -0400 Subject: [PATCH 2/6] Improve code comments and fix config help --- pkg/driver/kubernetes_driver.go | 37 ++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/pkg/driver/kubernetes_driver.go b/pkg/driver/kubernetes_driver.go index 6eccddf8..35fae644 100644 --- a/pkg/driver/kubernetes_driver.go +++ b/pkg/driver/kubernetes_driver.go @@ -24,7 +24,7 @@ const ( k8sFileSecretVolume = "files" ) -// KubernetesDriver is capable of running an invocation image in a Kubernetes cluster. +// KubernetesDriver runs an invocation image in a Kubernetes cluster. type KubernetesDriver struct { Namespace string ServiceAccountName string @@ -49,24 +49,20 @@ func NewKubernetesDriver(namespace, serviceAccount string, conf *rest.Config) (* } driver.setDefaults() err := driver.setClient(conf) - if err != nil { - return driver, err - } - - return driver, nil + return driver, err } -// Config returns the Kubernetes driver configuration options +// Config returns the Kubernetes driver configuration options. func (k *KubernetesDriver) Config() map[string]string { return map[string]string{ - "KUBE_NAMESPACE": "Increase verbosity. true, false are supported values", - "SERVICE_ACCOUNT": "Always pull image, even if locally available (0|1)", + "KUBE_NAMESPACE": "Kubernetes namespace in which to run the invocation image", + "SERVICE_ACCOUNT": "Kubernetes service account to be mounted by the invocation image (if empty, no service account token will be mounted)", "KUBE_CONFIG": "Absolute path to the kubeconfig file", "MASTER_URL": "Kubernetes master endpoint", } } -// SetConfig sets Kubernetes driver configuration +// SetConfig sets Kubernetes driver configuration. func (k *KubernetesDriver) SetConfig(settings map[string]string) { k.setDefaults() k.Namespace = settings["KUBE_NAMESPACE"] @@ -113,7 +109,7 @@ func (k *KubernetesDriver) setClient(conf *rest.Config) error { return nil } -// Run executes the operation inside of the invocation image +// Run executes the operation inside of the invocation image. func (k *KubernetesDriver) Run(op *driver.Operation) error { if k.Namespace == "" { return fmt.Errorf("KUBE_NAMESPACE is required") @@ -268,26 +264,29 @@ func (k *KubernetesDriver) Run(op *driver.Operation) error { return err } -// Handles receives an ImageType* and answers whether this driver supports that type +// Handles receives an ImageType* and answers whether this driver supports that type. func (k *KubernetesDriver) Handles(imagetype string) bool { return imagetype == driver.ImageTypeDocker || imagetype == driver.ImageTypeOCI } func (k *KubernetesDriver) streamPodLogs(options metav1.ListOptions, out io.Writer, done chan bool) error { - watch, err := k.pods.Watch(options) + watcher, err := k.pods.Watch(options) if err != nil { return err } go func() { - watching := map[string]bool{} - for event := range watch.ResultChan() { + // Track pods whose logs have been streamed by pod name. We need to know when we've already + // processed logs for a given pod, since multiple lifecycle events are received per pod. + streamedLogs := map[string]bool{} + for event := range watcher.ResultChan() { pod, ok := event.Object.(*v1.Pod) if !ok { continue } podName := pod.GetName() - if watching[podName] { + if streamedLogs[podName] { + // The event was for a pod whose logs have already been streamed, so do nothing. continue } req := k.pods.GetLogs(podName, &v1.PodLogOptions{ @@ -295,11 +294,15 @@ func (k *KubernetesDriver) streamPodLogs(options metav1.ListOptions, out io.Writ Follow: true, }) reader, err := req.Stream() + // There was an error connecting to the pod, so continue the loop and attempt streaming + // logs again next time there is an event for the same pod. if err != nil { continue } - watching[podName] = true + // We successfully connected to the pod, so mark it as having streamed logs. + streamedLogs[podName] = true + // Block the loop until all logs from the pod have been processed. io.Copy(out, reader) reader.Close() done <- true From 474db9c0cf292d4f15a83b649be6e7b58038f79c Mon Sep 17 00:00:00 2001 From: Jacob LeGrone Date: Tue, 11 Jun 2019 15:01:18 -0400 Subject: [PATCH 3/6] Remove _driver from file names --- pkg/driver/{kubernetes_driver.go => kubernetes.go} | 0 pkg/driver/{kubernetes_driver_test.go => kubernetes_test.go} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename pkg/driver/{kubernetes_driver.go => kubernetes.go} (100%) rename pkg/driver/{kubernetes_driver_test.go => kubernetes_test.go} (100%) diff --git a/pkg/driver/kubernetes_driver.go b/pkg/driver/kubernetes.go similarity index 100% rename from pkg/driver/kubernetes_driver.go rename to pkg/driver/kubernetes.go diff --git a/pkg/driver/kubernetes_driver_test.go b/pkg/driver/kubernetes_test.go similarity index 100% rename from pkg/driver/kubernetes_driver_test.go rename to pkg/driver/kubernetes_test.go From 0895168d7c97e25c11d0f3e90137bd1df262c512 Mon Sep 17 00:00:00 2001 From: Jacob LeGrone Date: Tue, 11 Jun 2019 15:02:34 -0400 Subject: [PATCH 4/6] Add kubernetes driver integration test This is behind a build flag. Usage: go test -tags=integration ./... --- pkg/driver/kubernetes_integration_test.go | 62 +++++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 pkg/driver/kubernetes_integration_test.go diff --git a/pkg/driver/kubernetes_integration_test.go b/pkg/driver/kubernetes_integration_test.go new file mode 100644 index 00000000..7de1766d --- /dev/null +++ b/pkg/driver/kubernetes_integration_test.go @@ -0,0 +1,62 @@ +// +build integration + +package driver + +import ( + "bytes" + "testing" + + "github.com/deislabs/cnab-go/driver" + "github.com/stretchr/testify/assert" +) + +func TestKubernetesDriver_Run_Integration(t *testing.T) { + namespace := "default" + k := &KubernetesDriver{} + k.SetConfig(map[string]string{ + "KUBE_NAMESPACE": namespace, + }) + k.ActiveDeadlineSeconds = 60 + + cases := []struct { + name string + op *driver.Operation + output string + err error + }{ + { + name: "install", + op: &driver.Operation{ + Installation: "example", + Action: "install", + Image: "cnab/helloworld@sha256:55f83710272990efab4e076f9281453e136980becfd879640b06552ead751284", + Environment: map[string]string{ + "PORT": "3000", + }, + }, + output: "Port parameter was set to 3000\nInstall action\nAction install complete for example\n", + err: nil, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + var output bytes.Buffer + tc.op.Out = &output + if tc.op.Environment == nil { + tc.op.Environment = map[string]string{} + } + tc.op.Environment["CNAB_ACTION"] = tc.op.Action + tc.op.Environment["CNAB_INSTALLATION_NAME"] = tc.op.Installation + + err := k.Run(tc.op) + + if tc.err != nil { + assert.EqualError(t, err, tc.err.Error()) + } else { + assert.NoError(t, err) + } + assert.Equal(t, tc.output, output.String()) + }) + } +} From d6d15041b3c29afc0c00127556f515a37567dae4 Mon Sep 17 00:00:00 2001 From: Jacob LeGrone Date: Tue, 11 Jun 2019 17:26:32 -0400 Subject: [PATCH 5/6] Add kubernetes driver unit test --- Gopkg.lock | 56 ++++++++++++++++++++++++++- pkg/driver/kubernetes_test.go | 73 ++++++++++++++++------------------- 2 files changed, 87 insertions(+), 42 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 22267623..e486b006 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1093,7 +1093,7 @@ revision = "kubernetes-1.11.2" [[projects]] - digest = "1:13a711fa4a9db8c05c1217196f149dc3cbf01ccc2050bf1608ba5ab9e5d7afd3" + digest = "1:a20e293fab86a640f5cd1e730c24909d9940ed936e25242031395fea9e317de2" name = "k8s.io/apimachinery" packages = [ "pkg/api/errors", @@ -1121,56 +1121,90 @@ "pkg/util/framer", "pkg/util/intstr", "pkg/util/json", + "pkg/util/mergepatch", "pkg/util/net", "pkg/util/runtime", "pkg/util/sets", + "pkg/util/strategicpatch", "pkg/util/validation", "pkg/util/validation/field", "pkg/util/wait", "pkg/util/yaml", "pkg/version", "pkg/watch", + "third_party/forked/golang/json", "third_party/forked/golang/reflect", ] pruneopts = "NUT" revision = "kubernetes-1.11.2" [[projects]] - digest = "1:67aa6c8d98372e4816c5a2e63f7bbc852648b822a087df9d009c82b7ce20e71c" + digest = "1:49e04e9242805279031c8ee812a5dd489e9809aa9dd945ff19f5d096f6c19949" name = "k8s.io/client-go" packages = [ "discovery", + "discovery/fake", "kubernetes", + "kubernetes/fake", "kubernetes/scheme", "kubernetes/typed/admissionregistration/v1alpha1", + "kubernetes/typed/admissionregistration/v1alpha1/fake", "kubernetes/typed/admissionregistration/v1beta1", + "kubernetes/typed/admissionregistration/v1beta1/fake", "kubernetes/typed/apps/v1", + "kubernetes/typed/apps/v1/fake", "kubernetes/typed/apps/v1beta1", + "kubernetes/typed/apps/v1beta1/fake", "kubernetes/typed/apps/v1beta2", + "kubernetes/typed/apps/v1beta2/fake", "kubernetes/typed/authentication/v1", + "kubernetes/typed/authentication/v1/fake", "kubernetes/typed/authentication/v1beta1", + "kubernetes/typed/authentication/v1beta1/fake", "kubernetes/typed/authorization/v1", + "kubernetes/typed/authorization/v1/fake", "kubernetes/typed/authorization/v1beta1", + "kubernetes/typed/authorization/v1beta1/fake", "kubernetes/typed/autoscaling/v1", + "kubernetes/typed/autoscaling/v1/fake", "kubernetes/typed/autoscaling/v2beta1", + "kubernetes/typed/autoscaling/v2beta1/fake", "kubernetes/typed/batch/v1", + "kubernetes/typed/batch/v1/fake", "kubernetes/typed/batch/v1beta1", + "kubernetes/typed/batch/v1beta1/fake", "kubernetes/typed/batch/v2alpha1", + "kubernetes/typed/batch/v2alpha1/fake", "kubernetes/typed/certificates/v1beta1", + "kubernetes/typed/certificates/v1beta1/fake", "kubernetes/typed/core/v1", + "kubernetes/typed/core/v1/fake", "kubernetes/typed/events/v1beta1", + "kubernetes/typed/events/v1beta1/fake", "kubernetes/typed/extensions/v1beta1", + "kubernetes/typed/extensions/v1beta1/fake", "kubernetes/typed/networking/v1", + "kubernetes/typed/networking/v1/fake", "kubernetes/typed/policy/v1beta1", + "kubernetes/typed/policy/v1beta1/fake", "kubernetes/typed/rbac/v1", + "kubernetes/typed/rbac/v1/fake", "kubernetes/typed/rbac/v1alpha1", + "kubernetes/typed/rbac/v1alpha1/fake", "kubernetes/typed/rbac/v1beta1", + "kubernetes/typed/rbac/v1beta1/fake", "kubernetes/typed/scheduling/v1alpha1", + "kubernetes/typed/scheduling/v1alpha1/fake", "kubernetes/typed/scheduling/v1beta1", + "kubernetes/typed/scheduling/v1beta1/fake", "kubernetes/typed/settings/v1alpha1", + "kubernetes/typed/settings/v1alpha1/fake", "kubernetes/typed/storage/v1", + "kubernetes/typed/storage/v1/fake", "kubernetes/typed/storage/v1alpha1", + "kubernetes/typed/storage/v1alpha1/fake", "kubernetes/typed/storage/v1beta1", + "kubernetes/typed/storage/v1beta1/fake", "pkg/apis/clientauthentication", "pkg/apis/clientauthentication/v1alpha1", "pkg/apis/clientauthentication/v1beta1", @@ -1178,6 +1212,7 @@ "plugin/pkg/client/auth/exec", "rest", "rest/watch", + "testing", "tools/auth", "tools/clientcmd", "tools/clientcmd/api", @@ -1195,6 +1230,13 @@ pruneopts = "NUT" revision = "kubernetes-1.11.2" +[[projects]] + digest = "1:a2c842a1e0aed96fd732b535514556323a6f5edfded3b63e5e0ab1bce188aa54" + name = "k8s.io/kube-openapi" + packages = ["pkg/util/proto"] + pruneopts = "NUT" + revision = "d8ea2fe547a448256204cfc68dfee7b26c720acb" + [[projects]] branch = "master" digest = "1:ccaa5810f7d18dbe4bacc8cfdb5b5060282fd607c78ed92f584d706012fbd40b" @@ -1252,7 +1294,17 @@ "golang.org/x/net/context", "gopkg.in/AlecAivazis/survey.v1", "gopkg.in/yaml.v2", + "k8s.io/api/batch/v1", + "k8s.io/api/core/v1", + "k8s.io/apimachinery/pkg/api/resource", + "k8s.io/apimachinery/pkg/apis/meta/v1", + "k8s.io/apimachinery/pkg/labels", "k8s.io/apimachinery/pkg/util/validation", + "k8s.io/client-go/kubernetes/fake", + "k8s.io/client-go/kubernetes/typed/batch/v1", + "k8s.io/client-go/kubernetes/typed/core/v1", + "k8s.io/client-go/rest", + "k8s.io/client-go/tools/clientcmd", ] solver-name = "gps-cdcl" solver-version = 1 diff --git a/pkg/driver/kubernetes_test.go b/pkg/driver/kubernetes_test.go index dd257ffa..f92a34ab 100644 --- a/pkg/driver/kubernetes_test.go +++ b/pkg/driver/kubernetes_test.go @@ -1,47 +1,40 @@ package driver -// TODO(jlegrone): uncomment tests when k8s.io/client-go/kubernetes/fake is available +import ( + "os" + "testing" -// import ( -// "os" -// "testing" + "github.com/deislabs/cnab-go/driver" + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" +) -// "github.com/deislabs/cnab-go/driver" -// metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -// "k8s.io/client-go/kubernetes/fake" -// ) +func TestKubernetesDriver_Run(t *testing.T) { + client := fake.NewSimpleClientset() + namespace := "default" + k := KubernetesDriver{ + Namespace: namespace, + jobs: client.BatchV1().Jobs(namespace), + secrets: client.CoreV1().Secrets(namespace), + pods: client.CoreV1().Pods(namespace), + SkipCleanup: true, + skipJobStatusCheck: true, + } + op := driver.Operation{ + Action: "install", + Out: os.Stdout, + Environment: map[string]string{ + "foo": "bar", + }, + } -// func TestKubernetesRun(t *testing.T) { -// client := fake.NewSimpleClientset() -// namespace := "default" -// k := KubernetesDriver{ -// Namespace: namespace, -// jobs: client.BatchV1().Jobs(namespace), -// secrets: client.CoreV1().Secrets(namespace), -// pods: client.CoreV1().Pods(namespace), -// SkipCleanup: true, -// skipJobStatusCheck: true, -// } -// op := driver.Operation{ -// Action: "install", -// Out: os.Stdout, -// Environment: map[string]string{ -// "foo": "bar", -// }, -// } + err := k.Run(&op) + assert.NoError(t, err) -// err := k.Run(&op) -// if err != nil { -// t.Error(err) -// } + jobList, _ := k.jobs.List(metav1.ListOptions{}) + assert.Equal(t, len(jobList.Items), 1, "expected one job to be created") -// jobList, _ := k.jobs.List(metav1.ListOptions{}) -// if len(jobList.Items) != 1 { -// t.Errorf("Expected one item in jobList, got %d", len(jobList.Items)) -// } - -// secretList, _ := k.secrets.List(metav1.ListOptions{}) -// if len(secretList.Items) != 1 { -// t.Errorf("Expected one item in secretList, got %d", len(secretList.Items)) -// } -// } + secretList, _ := k.secrets.List(metav1.ListOptions{}) + assert.Equal(t, len(secretList.Items), 1, "expected one secret to be created") +} From 206a2bec45ddddb9dcf558581f27249a80db835f Mon Sep 17 00:00:00 2001 From: Jacob LeGrone Date: Tue, 11 Jun 2019 17:49:57 -0400 Subject: [PATCH 6/6] Move job/log watching to separate function, remove todos --- pkg/driver/kubernetes.go | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/pkg/driver/kubernetes.go b/pkg/driver/kubernetes.go index 35fae644..0ec3b750 100644 --- a/pkg/driver/kubernetes.go +++ b/pkg/driver/kubernetes.go @@ -52,6 +52,11 @@ func NewKubernetesDriver(namespace, serviceAccount string, conf *rest.Config) (* return driver, err } +// Handles receives an ImageType* and answers whether this driver supports that type. +func (k *KubernetesDriver) Handles(imagetype string) bool { + return imagetype == driver.ImageTypeDocker || imagetype == driver.ImageTypeOCI +} + // Config returns the Kubernetes driver configuration options. func (k *KubernetesDriver) Config() map[string]string { return map[string]string{ @@ -163,7 +168,6 @@ func (k *KubernetesDriver) Run(op *driver.Operation) error { if err != nil { return err } - // TODO: add to slice of resources awaiting cleanup instead, then return err if !k.SkipCleanup { defer k.deleteSecret(envsecret.ObjectMeta.Name) } @@ -223,9 +227,14 @@ func (k *KubernetesDriver) Run(op *driver.Operation) error { selector := metav1.ListOptions{ LabelSelector: labels.Set(job.ObjectMeta.Labels).String(), } + + return k.watchJobStatusAndLogs(selector, op.Out) +} + +func (k *KubernetesDriver) watchJobStatusAndLogs(selector metav1.ListOptions, out io.Writer) error { // Stream Pod logs in the background logsStreamingComplete := make(chan bool) - err = k.streamPodLogs(selector, op.Out, logsStreamingComplete) + err := k.streamPodLogs(selector, out, logsStreamingComplete) if err != nil { return err } @@ -261,12 +270,7 @@ func (k *KubernetesDriver) Run(op *driver.Operation) error { <-logsStreamingComplete } - return err -} - -// Handles receives an ImageType* and answers whether this driver supports that type. -func (k *KubernetesDriver) Handles(imagetype string) bool { - return imagetype == driver.ImageTypeDocker || imagetype == driver.ImageTypeOCI + return nil } func (k *KubernetesDriver) streamPodLogs(options metav1.ListOptions, out io.Writer, done chan bool) error { @@ -324,7 +328,6 @@ func (k *KubernetesDriver) deleteJob(name string) error { }) } -// TODO: limit character count func generateNameTemplate(op *driver.Operation) string { return fmt.Sprintf("%s-%s-", op.Installation, op.Action) }