From 4dbfc20e2761947146f3c0e14b8b4a45df8d4943 Mon Sep 17 00:00:00 2001 From: Jiaxin Shan Date: Fri, 8 Feb 2019 23:03:51 -0800 Subject: [PATCH] Add StartTime and CompletionTime in job status --- .gitignore | 1 + pkg/apis/kubeflow/v1alpha1/types.go | 10 ++ .../v1alpha1/zz_generated.deepcopy.go | 10 +- pkg/controllers/mpi_job_controller.go | 7 + pkg/controllers/mpi_job_controller_test.go | 163 +++++++++++------- 5 files changed, 127 insertions(+), 64 deletions(-) diff --git a/.gitignore b/.gitignore index 4c36e385..4ec852f0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ .idea/ +.vscode/ vendor/ diff --git a/pkg/apis/kubeflow/v1alpha1/types.go b/pkg/apis/kubeflow/v1alpha1/types.go index b3f9ad7d..45c7e5eb 100644 --- a/pkg/apis/kubeflow/v1alpha1/types.go +++ b/pkg/apis/kubeflow/v1alpha1/types.go @@ -117,4 +117,14 @@ type MPIJobStatus struct { // The number of available worker replicas. // +optional WorkerReplicas int32 `json:"workerReplicas,omitempty"` + + // Represents time when the job was acknowledged by the job controller. + // It is not guaranteed to be set in happens-before order across separate operations. + // It is represented in RFC3339 form and is in UTC. + StartTime *metav1.Time `json:"startTime,omitempty"` + + // Represents time when the job was completed. It is not guaranteed to + // be set in happens-before order across separate operations. + // It is represented in RFC3339 form and is in UTC. + CompletionTime *metav1.Time `json:"completionTime,omitempty"` } diff --git a/pkg/apis/kubeflow/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/kubeflow/v1alpha1/zz_generated.deepcopy.go index 7d90c550..fc91d4ff 100644 --- a/pkg/apis/kubeflow/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/kubeflow/v1alpha1/zz_generated.deepcopy.go @@ -28,7 +28,7 @@ func (in *MPIJob) DeepCopyInto(out *MPIJob) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) - out.Status = in.Status + in.Status.DeepCopyInto(&out.Status) return } @@ -143,6 +143,14 @@ func (in *MPIJobSpec) DeepCopy() *MPIJobSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *MPIJobStatus) DeepCopyInto(out *MPIJobStatus) { *out = *in + if in.StartTime != nil { + in, out := &in.StartTime, &out.StartTime + *out = (*in).DeepCopy() + } + if in.CompletionTime != nil { + in, out := &in.CompletionTime, &out.CompletionTime + *out = (*in).DeepCopy() + } return } diff --git a/pkg/controllers/mpi_job_controller.go b/pkg/controllers/mpi_job_controller.go index a393fc9c..e70297a6 100644 --- a/pkg/controllers/mpi_job_controller.go +++ b/pkg/controllers/mpi_job_controller.go @@ -769,10 +769,17 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher // Or create a copy manually for better performance mpiJobCopy := mpiJob.DeepCopy() if launcher != nil { + now := metav1.Now() if launcher.Status.Active > 0 { mpiJobCopy.Status.LauncherStatus = kubeflow.LauncherActive + if mpiJobCopy.Status.StartTime == nil { + mpiJobCopy.Status.StartTime = &now + } } else if launcher.Status.Succeeded > 0 { mpiJobCopy.Status.LauncherStatus = kubeflow.LauncherSucceeded + if mpiJobCopy.Status.CompletionTime == nil { + mpiJobCopy.Status.CompletionTime = &now + } } else if launcher.Status.Failed > 0 { mpiJobCopy.Status.LauncherStatus = kubeflow.LauncherFailed } diff --git a/pkg/controllers/mpi_job_controller_test.go b/pkg/controllers/mpi_job_controller_test.go index a4c7c0f1..af9c07f7 100644 --- a/pkg/controllers/mpi_job_controller_test.go +++ b/pkg/controllers/mpi_job_controller_test.go @@ -78,15 +78,14 @@ func newFixture(t *testing.T) *fixture { return f } -func newMPIJob(name string, gpus *int32) *kubeflow.MPIJob { - return &kubeflow.MPIJob{ +func newMPIJobCommon(name string, startTime, completionTime *metav1.Time) *kubeflow.MPIJob { + mpiJob := &kubeflow.MPIJob{ TypeMeta: metav1.TypeMeta{APIVersion: kubeflow.SchemeGroupVersion.String()}, ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: metav1.NamespaceDefault, }, Spec: kubeflow.MPIJobSpec{ - GPUs: gpus, Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ Containers: []corev1.Container{ @@ -98,58 +97,48 @@ func newMPIJob(name string, gpus *int32) *kubeflow.MPIJob { }, }, }, + Status: kubeflow.MPIJobStatus{}, } -} -func newMPIJobWithCPUs(name string, cpus *int32) *kubeflow.MPIJob { - return &kubeflow.MPIJob{ - TypeMeta: metav1.TypeMeta{APIVersion: kubeflow.SchemeGroupVersion.String()}, - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: metav1.NamespaceDefault, - }, - Spec: kubeflow.MPIJobSpec{ - ProcessingUnits: cpus, - Template: corev1.PodTemplateSpec{ - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "foo", - Image: "bar", - }, - }, - }, - }, - }, + if startTime != nil { + mpiJob.Status.StartTime = startTime + } + + if completionTime != nil { + mpiJob.Status.CompletionTime = completionTime } + + return mpiJob } -func newMPIJobWithCustomResources(name string, replicas *int32, pusPerReplica int64, processingResourceType string) *kubeflow.MPIJob { - return &kubeflow.MPIJob{ - TypeMeta: metav1.TypeMeta{APIVersion: kubeflow.SchemeGroupVersion.String()}, - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: metav1.NamespaceDefault, - }, - Spec: kubeflow.MPIJobSpec{ - Replicas: replicas, - Template: corev1.PodTemplateSpec{ - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "foo", - Image: "bar", - Resources: corev1.ResourceRequirements{ - Limits: corev1.ResourceList{ - convertProcessingResourceType(processingResourceType): *resource.NewQuantity(pusPerReplica, resource.DecimalExponent), - }, - }, - }, - }, - }, +func newMPIJob(name string, gpus *int32, 
startTime, completionTime *metav1.Time) *kubeflow.MPIJob { + mpiJob := newMPIJobCommon(name, startTime, completionTime) + mpiJob.Spec.GPUs = gpus + + return mpiJob +} + +func newMPIJobWithCPUs(name string, cpus *int32, startTime, completionTime *metav1.Time) *kubeflow.MPIJob { + mpiJob := newMPIJobCommon(name, startTime, completionTime) + mpiJob.Spec.ProcessingUnits = cpus + + return mpiJob +} + +func newMPIJobWithCustomResources(name string, replicas *int32, pusPerReplica int64, processingResourceType string, startTime, completionTime *metav1.Time) *kubeflow.MPIJob { + mpiJob := newMPIJobCommon(name, startTime, completionTime) + mpiJob.Spec.Replicas = replicas + + for i := range mpiJob.Spec.Template.Spec.Containers { + container := &mpiJob.Spec.Template.Spec.Containers[i] + container.Resources = corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + convertProcessingResourceType(processingResourceType): *resource.NewQuantity(pusPerReplica, resource.DecimalExponent), }, - }, + } } + + return mpiJob } func (f *fixture) newController(processingResourceType string) (*MPIJobController, informers.SharedInformerFactory, kubeinformers.SharedInformerFactory) { @@ -455,6 +444,16 @@ func (f *fixture) setUpRbac(mpiJob *kubeflow.MPIJob, workerReplicas int) { f.setUpRoleBinding(roleBinding) } +func setUpMPIJobTimestamp(mpiJob *kubeflow.MPIJob, startTime, completionTime *metav1.Time) { + if startTime != nil { + mpiJob.Status.StartTime = startTime + } + + if completionTime != nil { + mpiJob.Status.CompletionTime = completionTime + } +} + func getKey(mpiJob *kubeflow.MPIJob, t *testing.T) string { key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(mpiJob) if err != nil { @@ -471,14 +470,18 @@ func TestDoNothingWithInvalidKey(t *testing.T) { func TestDoNothingWithNonexistentMPIJob(t *testing.T) { f := newFixture(t) - mpiJob := newMPIJob("test", int32Ptr(64)) + startTime := metav1.Now() + completionTime := metav1.Now() + mpiJob := newMPIJob("test", int32Ptr(64), &startTime, &completionTime) f.run(getKey(mpiJob, t), gpuResourceName) } func TestLauncherNotControlledByUs(t *testing.T) { f := newFixture(t) + startTime := metav1.Now() + completionTime := metav1.Now() - mpiJob := newMPIJob("test", int32Ptr(64)) + mpiJob := newMPIJob("test", int32Ptr(64), &startTime, &completionTime) f.setUpMPIJob(mpiJob) launcher := newLauncher(mpiJob, "kubectl-delivery") @@ -491,7 +494,9 @@ func TestLauncherNotControlledByUs(t *testing.T) { func TestLauncherSucceeded(t *testing.T) { f := newFixture(t) - mpiJob := newMPIJob("test", int32Ptr(64)) + startTime := metav1.Now() + completionTime := metav1.Now() + mpiJob := newMPIJob("test", int32Ptr(64), &startTime, &completionTime) f.setUpMPIJob(mpiJob) launcher := newLauncher(mpiJob, "kubectl-delivery") @@ -500,6 +505,7 @@ func TestLauncherSucceeded(t *testing.T) { mpiJobCopy := mpiJob.DeepCopy() mpiJobCopy.Status.LauncherStatus = kubeflow.LauncherSucceeded + setUpMPIJobTimestamp(mpiJobCopy, &startTime, &completionTime) f.expectUpdateMPIJobStatusAction(mpiJobCopy) f.run(getKey(mpiJob, t), gpuResourceName) @@ -507,8 +513,9 @@ func TestLauncherSucceeded(t *testing.T) { func TestLauncherFailed(t *testing.T) { f := newFixture(t) + startTime := metav1.Now() - mpiJob := newMPIJob("test", int32Ptr(64)) + mpiJob := newMPIJob("test", int32Ptr(64), &startTime, nil) f.setUpMPIJob(mpiJob) launcher := newLauncher(mpiJob, "kubectl-delivery") @@ -517,6 +524,7 @@ func TestLauncherFailed(t *testing.T) { mpiJobCopy := mpiJob.DeepCopy() mpiJobCopy.Status.LauncherStatus = 
kubeflow.LauncherFailed + setUpMPIJobTimestamp(mpiJobCopy, &startTime, nil) f.expectUpdateMPIJobStatusAction(mpiJobCopy) f.run(getKey(mpiJob, t), gpuResourceName) @@ -524,8 +532,10 @@ func TestLauncherFailed(t *testing.T) { func TestLauncherDoesNotExist(t *testing.T) { f := newFixture(t) + startTime := metav1.Now() + completionTime := metav1.Now() - mpiJob := newMPIJob("test", int32Ptr(64)) + mpiJob := newMPIJob("test", int32Ptr(64), &startTime, &completionTime) f.setUpMPIJob(mpiJob) expConfigMap := newConfigMap(mpiJob, 8, 8) @@ -545,6 +555,7 @@ func TestLauncherDoesNotExist(t *testing.T) { mpiJobCopy := mpiJob.DeepCopy() mpiJobCopy.Status.WorkerReplicas = 0 + setUpMPIJobTimestamp(mpiJobCopy, &startTime, &completionTime) f.expectUpdateMPIJobStatusAction(mpiJobCopy) f.run(getKey(mpiJob, t), gpuResourceName) @@ -554,8 +565,10 @@ func TestLauncherDoesNotExistWithCustomResources(t *testing.T) { resourceNames := []string{cpuResourceName, gpuResourceName} for _, resourceName := range resourceNames { f := newFixture(t) + startTime := metav1.Now() + completionTime := metav1.Now() - mpiJob := newMPIJobWithCustomResources("test", int32Ptr(4), 4, resourceName) + mpiJob := newMPIJobWithCustomResources("test", int32Ptr(4), 4, resourceName, &startTime, &completionTime) f.setUpMPIJob(mpiJob) expConfigMap := newConfigMap(mpiJob, 4, 4) @@ -575,6 +588,7 @@ func TestLauncherDoesNotExistWithCustomResources(t *testing.T) { mpiJobCopy := mpiJob.DeepCopy() mpiJobCopy.Status.WorkerReplicas = 0 + setUpMPIJobTimestamp(mpiJobCopy, &startTime, &completionTime) f.expectUpdateMPIJobStatusAction(mpiJobCopy) f.run(getKey(mpiJob, t), resourceName) @@ -583,8 +597,10 @@ func TestLauncherDoesNotExistWithCustomResources(t *testing.T) { func TestConfigMapNotControlledByUs(t *testing.T) { f := newFixture(t) + startTime := metav1.Now() + completionTime := metav1.Now() - mpiJob := newMPIJob("test", int32Ptr(64)) + mpiJob := newMPIJob("test", int32Ptr(64), &startTime, &completionTime) f.setUpMPIJob(mpiJob) configMap := newConfigMap(mpiJob, 8, 8) @@ -596,8 +612,10 @@ func TestConfigMapNotControlledByUs(t *testing.T) { func TestServiceAccountNotControlledByUs(t *testing.T) { f := newFixture(t) + startTime := metav1.Now() + completionTime := metav1.Now() - mpiJob := newMPIJob("test", int32Ptr(64)) + mpiJob := newMPIJob("test", int32Ptr(64), &startTime, &completionTime) f.setUpMPIJob(mpiJob) f.setUpConfigMap(newConfigMap(mpiJob, 8, 8)) @@ -611,8 +629,10 @@ func TestServiceAccountNotControlledByUs(t *testing.T) { func TestRoleNotControlledByUs(t *testing.T) { f := newFixture(t) + startTime := metav1.Now() + completionTime := metav1.Now() - mpiJob := newMPIJob("test", int32Ptr(64)) + mpiJob := newMPIJob("test", int32Ptr(64), &startTime, &completionTime) f.setUpMPIJob(mpiJob) f.setUpConfigMap(newConfigMap(mpiJob, 8, 8)) @@ -627,8 +647,10 @@ func TestRoleNotControlledByUs(t *testing.T) { func TestRoleBindingNotControlledByUs(t *testing.T) { f := newFixture(t) + startTime := metav1.Now() + completionTime := metav1.Now() - mpiJob := newMPIJob("test", int32Ptr(64)) + mpiJob := newMPIJob("test", int32Ptr(64), &startTime, &completionTime) f.setUpMPIJob(mpiJob) f.setUpConfigMap(newConfigMap(mpiJob, 8, 8)) @@ -644,8 +666,10 @@ func TestRoleBindingNotControlledByUs(t *testing.T) { func TestShutdownWorker(t *testing.T) { f := newFixture(t) + startTime := metav1.Now() + completionTime := metav1.Now() - mpiJob := newMPIJob("test", int32Ptr(64)) + mpiJob := newMPIJob("test", int32Ptr(64), &startTime, &completionTime) f.setUpMPIJob(mpiJob) launcher := 
newLauncher(mpiJob, "kubectl-delivery") @@ -661,6 +685,7 @@ func TestShutdownWorker(t *testing.T) { mpiJobCopy := mpiJob.DeepCopy() mpiJobCopy.Status.WorkerReplicas = 0 mpiJobCopy.Status.LauncherStatus = kubeflow.LauncherSucceeded + setUpMPIJobTimestamp(mpiJobCopy, &startTime, &completionTime) f.expectUpdateMPIJobStatusAction(mpiJobCopy) f.run(getKey(mpiJob, t), gpuResourceName) @@ -668,8 +693,10 @@ func TestShutdownWorker(t *testing.T) { func TestWorkerNotControlledByUs(t *testing.T) { f := newFixture(t) + startTime := metav1.Now() + completionTime := metav1.Now() - mpiJob := newMPIJob("test", int32Ptr(64)) + mpiJob := newMPIJob("test", int32Ptr(64), &startTime, &completionTime) f.setUpMPIJob(mpiJob) f.setUpConfigMap(newConfigMap(mpiJob, 8, 8)) @@ -684,8 +711,11 @@ func TestWorkerNotControlledByUs(t *testing.T) { func TestLauncherActive(t *testing.T) { f := newFixture(t) + startTime := metav1.Now() + completionTime := metav1.Now() + + mpiJob := newMPIJob("test", int32Ptr(8), &startTime, &completionTime) - mpiJob := newMPIJob("test", int32Ptr(8)) f.setUpMPIJob(mpiJob) f.setUpConfigMap(newConfigMap(mpiJob, 1, 8)) @@ -700,6 +730,7 @@ func TestLauncherActive(t *testing.T) { mpiJobCopy := mpiJob.DeepCopy() mpiJobCopy.Status.LauncherStatus = kubeflow.LauncherActive + setUpMPIJobTimestamp(mpiJobCopy, &startTime, &completionTime) f.expectUpdateMPIJobStatusAction(mpiJobCopy) f.run(getKey(mpiJob, t), gpuResourceName) @@ -707,8 +738,10 @@ func TestLauncherActive(t *testing.T) { func TestWorkerReady(t *testing.T) { f := newFixture(t) + startTime := metav1.Now() + completionTime := metav1.Now() - mpiJob := newMPIJob("test", int32Ptr(16)) + mpiJob := newMPIJob("test", int32Ptr(16), &startTime, &completionTime) f.setUpMPIJob(mpiJob) f.setUpConfigMap(newConfigMap(mpiJob, 2, 8)) @@ -723,6 +756,7 @@ func TestWorkerReady(t *testing.T) { mpiJobCopy := mpiJob.DeepCopy() mpiJobCopy.Status.WorkerReplicas = 2 + setUpMPIJobTimestamp(mpiJobCopy, &startTime, &completionTime) f.expectUpdateMPIJobStatusAction(mpiJobCopy) f.run(getKey(mpiJob, t), gpuResourceName) @@ -730,8 +764,10 @@ func TestWorkerReady(t *testing.T) { func TestWorkerReadyWithCPUs(t *testing.T) { f := newFixture(t) + startTime := metav1.Now() + completionTime := metav1.Now() - mpiJob := newMPIJobWithCPUs("test", int32Ptr(16)) + mpiJob := newMPIJobWithCPUs("test", int32Ptr(16), &startTime, &completionTime) f.setUpMPIJob(mpiJob) f.setUpConfigMap(newConfigMap(mpiJob, 2, 8)) @@ -746,6 +782,7 @@ func TestWorkerReadyWithCPUs(t *testing.T) { mpiJobCopy := mpiJob.DeepCopy() mpiJobCopy.Status.WorkerReplicas = 2 + setUpMPIJobTimestamp(mpiJobCopy, &startTime, &completionTime) f.expectUpdateMPIJobStatusAction(mpiJobCopy) f.run(getKey(mpiJob, t), cpuResourceName)
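
Why the generated deepcopy change matters (illustrative note, not part of the patch): switching MPIJob.DeepCopyInto from `out.Status = in.Status` to `in.Status.DeepCopyInto(&out.Status)` is required because the new StartTime and CompletionTime fields are *metav1.Time pointers, and a plain struct assignment would alias them between the original object and its copy. A minimal sketch of that failure mode, assuming the repo's import path github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v1alpha1:

package main

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v1alpha1"
)

func main() {
	start := metav1.Now()
	job := &kubeflow.MPIJob{}
	job.Status.StartTime = &start

	// With the updated generated code, DeepCopy allocates a fresh *metav1.Time,
	// so mutating the copy leaves the original object untouched. With the old
	// shallow `out.Status = in.Status`, both would share the same pointer.
	cp := job.DeepCopy()
	cp.Status.StartTime.Time = cp.Status.StartTime.Add(time.Hour)

	fmt.Println("original:", job.Status.StartTime.Time)
	fmt.Println("copy:    ", cp.Status.StartTime.Time)
}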
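
With the controller now recording StartTime when the launcher becomes active and CompletionTime when it succeeds, a consumer of MPIJobStatus can derive a wall-clock duration without extra bookkeeping. A minimal sketch, assuming only the standard k8s.io/apimachinery metav1.Time API; jobDuration is a hypothetical helper, not part of the operator:

package main

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// jobDuration derives a duration from the new status fields: a nil StartTime
// means the launcher was never observed active; a nil CompletionTime means the
// job is still running, so the duration is measured against "now" instead.
func jobDuration(startTime, completionTime *metav1.Time, now time.Time) (time.Duration, bool) {
	if startTime == nil {
		return 0, false
	}
	end := now
	if completionTime != nil {
		end = completionTime.Time
	}
	return end.Sub(startTime.Time), true
}

func main() {
	start := metav1.NewTime(time.Now().Add(-90 * time.Second))
	done := metav1.Now()
	if d, ok := jobDuration(&start, &done, time.Now()); ok {
		fmt.Printf("job ran for %s\n", d.Round(time.Second))
	}
}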