diff --git a/pkg/kube/job.go b/pkg/kube/job.go deleted file mode 100644 index 75e4edbe7f..0000000000 --- a/pkg/kube/job.go +++ /dev/null @@ -1,249 +0,0 @@ -// Copyright 2019 The Kanister Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package kube - -import ( - "context" - "fmt" - - "github.com/gofrs/uuid" - "github.com/pkg/errors" - batch "k8s.io/api/batch/v1" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - - "github.com/kanisterio/kanister/pkg/field" - "github.com/kanisterio/kanister/pkg/log" -) - -const defautlJobPodName = "kanister-job-pod" -const defaultJobPodContainer = "kanister-job-container" - -// Job object is used for running the user specified container as a Kubernetes job. -type Job struct { - image string - command []string - namespace string - name string - sa string - // vols is a map of PVC->Mount points to add to the job pod spec - vols map[string]VolumeMountOptions - clientset kubernetes.Interface -} - -// NewJob creates a new Job object. -func NewJob(clientset kubernetes.Interface, - jobName string, - namespace string, - serviceAccount string, - image string, - vols map[string]VolumeMountOptions, - command ...string, -) (*Job, error) { - if jobName == "" { - return nil, errors.New("Job name is required") - } - - if image == "" { - return nil, errors.New("Container image needs to be passed") - } - - if namespace == "" { - log.Debug().Print("No namespace specified. Using \"default\".") - namespace = "default" - } - - if clientset == nil { - return nil, errors.New("No clientset object provided") - } - - if len(command) == 0 || command[0] == "" { - return nil, errors.New("Command needs to be passed") - } - - return &Job{image, command, namespace, jobName, serviceAccount, vols, clientset}, nil -} - -// Create creates the Job in Kubernetes. -func (job *Job) Create() error { - falseVal := false - ctx := context.TODO() - volumeMounts, podVolumes, err := createFilesystemModeVolumeSpecs(ctx, job.vols) - if err != nil { - return errors.Wrapf(err, "Failed to create volume spec for job %s", job.name) - } - k8sJob := &batch.Job{ - ObjectMeta: metav1.ObjectMeta{ - Name: job.name, - }, - TypeMeta: metav1.TypeMeta{ - Kind: "Job", - APIVersion: "v1", - }, - Spec: batch.JobSpec{ - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Name: defautlJobPodName, - Labels: make(map[string]string), - }, - Spec: corev1.PodSpec{ - ServiceAccountName: job.sa, - Containers: []corev1.Container{ - { - Name: defaultJobPodContainer, - Image: job.image, - Command: job.command, - SecurityContext: &corev1.SecurityContext{ - Privileged: &falseVal, - }, - ImagePullPolicy: corev1.PullPolicy(corev1.PullIfNotPresent), - VolumeMounts: volumeMounts, - }, - }, - RestartPolicy: corev1.RestartPolicyOnFailure, - Volumes: podVolumes, - }, - }, - }, - } - - batchClient := job.clientset.BatchV1() - jobsClient := batchClient.Jobs(job.namespace) - - newJob, err := jobsClient.Create(ctx, k8sJob, metav1.CreateOptions{}) - if err != nil { - return errors.Wrapf(err, "Failed to create job %s", job.name) - } - job.name = newJob.Name - log.Print("New job created", field.M{"JobName": job.name}) - - return nil -} - -func createFilesystemModeVolumeSpecs( - ctx context.Context, - vols map[string]VolumeMountOptions, -) (volumeMounts []corev1.VolumeMount, podVolumes []corev1.Volume, error error) { - // Build filesystem mode volume specs - for pvcName, mountOpts := range vols { - id, err := uuid.NewV1() - if err != nil { - return nil, nil, err - } - - if mountOpts.ReadOnly { - log.Debug().WithContext(ctx).Print("PVC will be mounted in read-only mode", field.M{"pvcName": pvcName}) - } - - podVolName := fmt.Sprintf("vol-%s", id.String()) - volumeMounts = append(volumeMounts, corev1.VolumeMount{Name: podVolName, MountPath: mountOpts.MountPath, ReadOnly: mountOpts.ReadOnly}) - podVolumes = append(podVolumes, - corev1.Volume{ - Name: podVolName, - VolumeSource: corev1.VolumeSource{ - PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: pvcName, - ReadOnly: mountOpts.ReadOnly, - }, - }, - }, - ) - } - return volumeMounts, podVolumes, nil -} - -func createBlockModeVolumeSpecs(blockVols map[string]string) (volumeDevices []corev1.VolumeDevice, podVolumes []corev1.Volume, error error) { - // Build block mode volume specs - for pvc, devicePath := range blockVols { - id, err := uuid.NewV1() - if err != nil { - return nil, nil, err - } - podBlockVolName := fmt.Sprintf("block-%s", id.String()) - volumeDevices = append(volumeDevices, corev1.VolumeDevice{Name: podBlockVolName, DevicePath: devicePath}) - podVolumes = append(podVolumes, - corev1.Volume{ - Name: podBlockVolName, - VolumeSource: corev1.VolumeSource{ - PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: pvc, - }, - }, - }, - ) - } - return volumeDevices, podVolumes, nil -} - -// WaitForCompletion waits for the job to run to completion. -func (job *Job) WaitForCompletion(ctx context.Context) error { - batchClient := job.clientset.BatchV1() - jobsClient := batchClient.Jobs(job.namespace) - watch, err := jobsClient.Watch(ctx, metav1.ListOptions{LabelSelector: "job-name=" + job.name}) - if err != nil { - return errors.Wrap(err, "Failed to create watch object") - } - - // Before getting into the loop of watching events, confirm that the job is actually present - // in Kubernetes. - k8sjob, err := jobsClient.Get(ctx, job.name, metav1.GetOptions{}) - if err != nil { - return errors.Wrapf(err, "Failed to get job %s", job.name) - } - - if k8sjob == nil { - return errors.Wrapf(err, "Couldn't find job %s", job.name) - } - - events := watch.ResultChan() - for { - select { - case event := <-events: - if event.Object == nil { - return errors.Errorf("Result channel closed for Job %s", job.name) - } - k8sJob, ok := event.Object.(*batch.Job) - if !ok { - return errors.Errorf("Invalid Job event object: %T", event.Object) - } - conditions := k8sJob.Status.Conditions - for _, condition := range conditions { - if condition.Type == batch.JobComplete { - log.Print("Job reported complete\n", field.M{"JobName": job.name}) - return nil - } else if condition.Type == batch.JobFailed { - return errors.Errorf("Job %s failed", job.name) - } - } - case <-ctx.Done(): - return errors.New("Cancellation received") - } - } -} - -// Delete deletes the Job in Kubernetes. -func (job *Job) Delete() error { - batchClient := job.clientset.BatchV1() - jobsClient := batchClient.Jobs(job.namespace) - deletePropagation := metav1.DeletePropagationForeground - err := jobsClient.Delete(context.TODO(), job.name, metav1.DeleteOptions{PropagationPolicy: &deletePropagation}) - if err != nil { - return errors.Wrapf(err, "Failed to delete job %s", job.name) - } - log.Print("Deleted job", field.M{"JobName": job.name}) - - return nil -} diff --git a/pkg/kube/job_test.go b/pkg/kube/job_test.go deleted file mode 100644 index a9cf5dbbef..0000000000 --- a/pkg/kube/job_test.go +++ /dev/null @@ -1,275 +0,0 @@ -// Copyright 2019 The Kanister Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build !unit -// +build !unit - -package kube - -import ( - "context" - "fmt" - "time" - - "github.com/jpillora/backoff" - . "gopkg.in/check.v1" - batch "k8s.io/api/batch/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/rand" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/fake" - k8stesting "k8s.io/client-go/testing" -) - -type JobSuite struct{} - -var _ = Suite(&JobSuite{}) - -// Name of test job for this suite. -// Initially it contains incorrect name of job for catching errors. -var testJobName = "" - -const testJobNamespace = "default" -const testJobImage = "busybox" -const testJobServiceAccount = "default" - -func (s *JobSuite) SetUpSuite(c *C) { - // c.Skip("Too slow") -} - -func (s *JobSuite) SetUpTest(c *C) { - testJobName = "kanister-test-job" + rand.String(5) -} - -// Verifies that the Job object is not created if the job name is not specified. -func (s *JobSuite) TestJobsNoName(c *C) { - clientset, err := NewClient() - c.Assert(err, IsNil) - - job, err := NewJob(clientset, testJobName, testJobNamespace, testJobServiceAccount, "", nil, "sleep", "10") - c.Assert(job, IsNil) - c.Assert(err, NotNil) -} - -// Verifies that the Job object is not created if the image is not specified. -func (s *JobSuite) TestJobsNoImage(c *C) { - clientset, err := NewClient() - c.Assert(err, IsNil) - - job, err := NewJob(clientset, testJobName, testJobNamespace, testJobServiceAccount, "", nil, "sleep", "10") - c.Assert(job, IsNil) - c.Assert(err, NotNil) -} - -// Verifies that the Job object is not created if the namespace is not specified. -func (s *JobSuite) TestJobsNoNamespace(c *C) { - clientset, err := NewClient() - c.Assert(err, IsNil) - - job, err := NewJob(clientset, testJobName, "", testJobServiceAccount, testJobImage, nil, "sleep", "10") - c.Assert(job.namespace, Equals, "default") - c.Assert(err, IsNil) -} - -// Verifies that the Job object is not created if the clientset is nil. -func (s *JobSuite) TestJobsNoClientset(c *C) { - job, err := NewJob(nil, testJobName, testJobNamespace, testJobServiceAccount, testJobImage, nil, "sleep", "10") - c.Assert(job, IsNil) - c.Assert(err, NotNil) -} - -// Verifies that the Job object is not created if the command is not passed. -func (s *JobSuite) TestJobsNoCommand(c *C) { - clientset, err := NewClient() - c.Assert(err, IsNil) - - job, err := NewJob(clientset, testJobName, testJobNamespace, testJobServiceAccount, testJobImage, nil, "") - c.Assert(job, IsNil) - c.Assert(err, NotNil) - - job, err = NewJob(clientset, testJobName, testJobNamespace, testJobServiceAccount, testJobImage, nil) - c.Assert(job, IsNil) - c.Assert(err, NotNil) -} - -func getK8sJobCount(clientset kubernetes.Interface, namespace string, c *C) int { - jobsCli := clientset.BatchV1().Jobs(namespace) - list, err := jobsCli.List(context.TODO(), metav1.ListOptions{LabelSelector: "job-name=" + testJobName}) - c.Assert(err, IsNil) - - return len(list.Items) -} - -func waitForJobCount(clientset kubernetes.Interface, namespace string, expectedCount int, c *C) error { - // At times, even if the job is deleted, the API server takes sometime - // to consolidate it's state. - maxRetries := 10 - boff := &backoff.Backoff{Factor: 2, Jitter: false, Min: 100 * time.Millisecond, Max: 1 * time.Minute} - var newJobCount int - for { - newJobCount = getK8sJobCount(clientset, namespace, c) - if newJobCount != expectedCount { - if int(boff.Attempt()) >= maxRetries { - return fmt.Errorf("Job count %d, expected job count %d", newJobCount, expectedCount) - } - duration := boff.Duration() - fmt.Printf("Trying again in %s\n", duration) - time.Sleep(duration) - continue - } else { - return nil - } - } -} - -// Verifies that the basic Job creation and deletion completes successfully. -func (s *JobSuite) TestJobsBasic(c *C) { - namespace := "default" - clientset, err := NewClient() - c.Assert(err, IsNil) - - origJobCount := getK8sJobCount(clientset, namespace, c) - - images := [2]string{"ubuntu:latest", "perl"} - for _, image := range images { - job, err := NewJob(clientset, testJobName, testJobNamespace, testJobServiceAccount, image, nil, "sleep", "2") - - c.Assert(job, NotNil) - c.Assert(err, IsNil) - - err = job.Create() - c.Assert(err, IsNil) - - ctx := context.Background() - err = job.WaitForCompletion(ctx) - c.Assert(err, IsNil) - - err = job.Delete() - c.Assert(err, IsNil) - - err = waitForJobCount(clientset, namespace, origJobCount, c) - if err != nil { - c.Fail() - } - } -} - -// Verifies that deleting the Job while it is running works. -func (s *JobSuite) TestJobsDeleteWhileRunning(c *C) { - namespace := "default" - clientset, err := NewClient() - c.Assert(err, IsNil) - - job, err := NewJob(clientset, testJobName, testJobNamespace, testJobServiceAccount, testJobImage, nil, "sleep", "300") - - c.Assert(job, NotNil) - c.Assert(err, IsNil) - - origJobCount := getK8sJobCount(clientset, namespace, c) - // Start the job that will run for 5 minutes - _ = job.Create() - time.Sleep(100 * time.Millisecond) - // Deleting the job should work. - _ = job.Delete() - - _ = waitForJobCount(clientset, namespace, origJobCount, c) - c.Assert(c, NotNil) -} - -func cancelLater(cancel func()) { - time.Sleep(10 * time.Second) - cancel() -} - -// Verifies that cancelling the context results in WaitForCompletion finishing. -func (s *JobSuite) TestJobsWaitAfterDelete(c *C) { - clientset, err := NewClient() - c.Assert(err, IsNil) - - job, err := NewJob(clientset, testJobName, testJobNamespace, testJobServiceAccount, testJobImage, nil, "sleep", "300") - - c.Assert(job, NotNil) - c.Assert(err, IsNil) - - // Start the job and then delete it immediately. - _ = job.Create() - _ = job.Delete() - - lo := metav1.ListOptions{LabelSelector: "job-name=" + testJobName} - jl, err := clientset.BatchV1().Jobs(testJobNamespace).List(context.TODO(), lo) - c.Assert(err, IsNil) - for _, j := range jl.Items { - c.Assert(j.GetDeletionTimestamp(), NotNil) - } - - ctx, cancel := context.WithCancel(context.Background()) - go cancelLater(cancel) - // WaitForCompletion should complete when the context is cancelled. - _ = job.WaitForCompletion(ctx) - c.Assert(c, NotNil) -} - -func (s *JobSuite) TestJobsWaitOnNonExistentJob(c *C) { - clientset, err := NewClient() - c.Assert(err, IsNil) - - job, err := NewJob(clientset, testJobName, testJobNamespace, testJobServiceAccount, testJobImage, nil, "sleep", "300") - - c.Assert(job, NotNil) - c.Assert(err, IsNil) - - // Call WaitForCompletion on non-existent kubernetes job. - err = job.WaitForCompletion(context.Background()) - c.Assert(err, NotNil) -} - -func (s *JobSuite) TestJobsVolumes(c *C) { - cli := fake.NewSimpleClientset() - vols := map[string]VolumeMountOptions{"pvc-test": {MountPath: "/mnt/data1", ReadOnly: false}} - job, err := NewJob(cli, testJobName, testJobNamespace, testJobServiceAccount, testJobImage, vols, "sleep", "300") - c.Assert(err, IsNil) - c.Assert(job.Create(), IsNil) - - a := cli.Actions() - c.Assert(a, HasLen, 1) - createAction := a[0] - createdJob, ok := createAction.(k8stesting.CreateAction).GetObject().(*batch.Job) - c.Assert(ok, Equals, true) - - c.Assert(createdJob.Name, Equals, testJobName) - podSpec := createdJob.Spec.Template.Spec - c.Assert(podSpec.Volumes, HasLen, 1) - c.Assert(podSpec.Volumes[0].VolumeSource.PersistentVolumeClaim.ClaimName, Equals, "pvc-test") - c.Assert(podSpec.Containers[0].VolumeMounts[0].MountPath, Equals, "/mnt/data1") -} - -func (s *JobSuite) TestJobsReadOnlyVolumes(c *C) { - cli := fake.NewSimpleClientset() - vols := map[string]VolumeMountOptions{"pvc-test": {MountPath: "/mnt/data1", ReadOnly: true}} - job, err := NewJob(cli, testJobName, testJobNamespace, testJobServiceAccount, testJobImage, vols, "sleep", "300") - c.Assert(err, IsNil) - c.Assert(job.Create(), IsNil) - - a := cli.Actions() - c.Assert(a, HasLen, 1) - createAction := a[0] - createdJob, ok := createAction.(k8stesting.CreateAction).GetObject().(*batch.Job) - c.Assert(ok, Equals, true) - - c.Assert(createdJob.Name, Equals, testJobName) - podSpec := createdJob.Spec.Template.Spec - c.Assert(podSpec.Volumes, HasLen, 1) - c.Assert(podSpec.Volumes[0].VolumeSource.PersistentVolumeClaim.ClaimName, Equals, "pvc-test") - c.Assert(podSpec.Containers[0].VolumeMounts[0].MountPath, Equals, "/mnt/data1") -} diff --git a/pkg/kube/pod.go b/pkg/kube/pod.go index c7164ab943..38086e3cef 100644 --- a/pkg/kube/pod.go +++ b/pkg/kube/pod.go @@ -24,6 +24,7 @@ import ( "strings" "time" + "github.com/gofrs/uuid" json "github.com/json-iterator/go" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" @@ -155,6 +156,61 @@ func GetPodObjectFromPodOptions(ctx context.Context, cli kubernetes.Interface, o return createPodSpec(opts, patchedSpecs, ns), nil } +func createFilesystemModeVolumeSpecs( + ctx context.Context, + vols map[string]VolumeMountOptions, +) (volumeMounts []corev1.VolumeMount, podVolumes []corev1.Volume, error error) { + // Build filesystem mode volume specs + for pvcName, mountOpts := range vols { + id, err := uuid.NewV1() + if err != nil { + return nil, nil, err + } + + if mountOpts.ReadOnly { + log.Debug().WithContext(ctx).Print("PVC will be mounted in read-only mode", field.M{"pvcName": pvcName}) + } + + podVolName := fmt.Sprintf("vol-%s", id.String()) + volumeMounts = append(volumeMounts, corev1.VolumeMount{Name: podVolName, MountPath: mountOpts.MountPath, ReadOnly: mountOpts.ReadOnly}) + podVolumes = append(podVolumes, + corev1.Volume{ + Name: podVolName, + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvcName, + ReadOnly: mountOpts.ReadOnly, + }, + }, + }, + ) + } + return volumeMounts, podVolumes, nil +} + +func createBlockModeVolumeSpecs(blockVols map[string]string) (volumeDevices []corev1.VolumeDevice, podVolumes []corev1.Volume, error error) { + // Build block mode volume specs + for pvc, devicePath := range blockVols { + id, err := uuid.NewV1() + if err != nil { + return nil, nil, err + } + podBlockVolName := fmt.Sprintf("block-%s", id.String()) + volumeDevices = append(volumeDevices, corev1.VolumeDevice{Name: podBlockVolName, DevicePath: devicePath}) + podVolumes = append(podVolumes, + corev1.Volume{ + Name: podBlockVolName, + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc, + }, + }, + }, + ) + } + return volumeDevices, podVolumes, nil +} + func createPodSpec(opts *PodOptions, patchedSpecs corev1.PodSpec, ns string) *corev1.Pod { pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{