Skip to content

Commit

Permalink
Capitalize on cache mode=trynone if importer is being OOMKilled
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>
  • Loading branch information
akalenyu committed Jul 11, 2024
1 parent 2afccd1 commit 70e2e0b
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 25 deletions.
9 changes: 7 additions & 2 deletions pkg/controller/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,14 @@ const (
// AnnVddkInitImageURL saves a per-DV VDDK image URL on the PVC
AnnVddkInitImageURL = AnnAPIGroup + "/storage.pod.vddk.initimageurl"

// AnnRequiresScratch provides a const for our PVC requires scratch annotation
// AnnRequiresScratch provides a const for our PVC requiring scratch annotation
AnnRequiresScratch = AnnAPIGroup + "/storage.import.requiresScratch"

// AnnRequiresDirectIO provides a const for our PVC requiring direct I/O annotation (after an importer OOM we retry with qemu cache mode=trynone)
AnnRequiresDirectIO = AnnAPIGroup + "/storage.import.requiresDirectIo"
// OOMKilledReason provides a value that container runtimes must return in the reason field for an OOMKilled container
OOMKilledReason = "OOMKilled"

// AnnContentType provides a const for the PVC content-type
AnnContentType = AnnAPIGroup + "/storage.contentType"

Expand Down Expand Up @@ -827,7 +832,7 @@ func GetPriorityClass(pvc *corev1.PersistentVolumeClaim) string {

// ShouldDeletePod returns whether the PVC workload pod should be deleted.
// The pod is kept only when the PVC requests retention after completion; even
// then it is deleted if the PVC is annotated as requiring scratch space or
// direct I/O, or if the PVC itself is being deleted.
func ShouldDeletePod(pvc *corev1.PersistentVolumeClaim) bool {
return pvc.GetAnnotations()[AnnPodRetainAfterCompletion] != "true" || pvc.GetAnnotations()[AnnRequiresScratch] == "true" || pvc.DeletionTimestamp != nil
return pvc.GetAnnotations()[AnnPodRetainAfterCompletion] != "true" || pvc.GetAnnotations()[AnnRequiresScratch] == "true" || pvc.GetAnnotations()[AnnRequiresDirectIO] == "true" || pvc.DeletionTimestamp != nil
}

// AddFinalizer adds a finalizer to a resource
Expand Down
53 changes: 45 additions & 8 deletions pkg/controller/import-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ type importPodEnvVar struct {
certConfigMapProxy string
extraHeaders []string
secretExtraHeaders []string
cacheMode string
}

type importerPodArgs struct {
Expand Down Expand Up @@ -378,25 +379,37 @@ func (r *ImportReconciler) updatePvcFromPod(pvc *corev1.PersistentVolumeClaim, p
if scratchSpaceRequired {
log.V(1).Info("Pod requires scratch space, terminating pod, and restarting with scratch space", "pod.Name", pod.Name)
}
podModificationsNeeded := scratchSpaceRequired

if pod.Status.ContainerStatuses != nil &&
pod.Status.ContainerStatuses[0].State.Terminated != nil &&
pod.Status.ContainerStatuses[0].State.Terminated.ExitCode > 0 {
log.Info("Pod termination code", "pod.Name", pod.Name, "ExitCode", pod.Status.ContainerStatuses[0].State.Terminated.ExitCode)
r.recorder.Event(pvc, corev1.EventTypeWarning, ErrImportFailedPVC, pod.Status.ContainerStatuses[0].State.Terminated.Message)
if statuses := pod.Status.ContainerStatuses; len(statuses) > 0 {
if isOOMKilled(statuses[0]) {
log.V(1).Info("Pod died of an OOM, deleting pod, and restarting with qemu cache mode=none if storage supports it", "pod.Name", pod.Name)
podModificationsNeeded = true
anno[cc.AnnRequiresDirectIO] = "true"
}
if terminated := statuses[0].State.Terminated; terminated != nil && terminated.ExitCode > 0 {
log.Info("Pod termination code", "pod.Name", pod.Name, "ExitCode", terminated.ExitCode)
r.recorder.Event(pvc, corev1.EventTypeWarning, ErrImportFailedPVC, terminated.Message)
}
}

if anno[cc.AnnCurrentCheckpoint] != "" {
anno[cc.AnnCurrentPodID] = string(pod.ObjectMeta.UID)
}

anno[cc.AnnImportPod] = pod.Name
if !scratchSpaceRequired {
if !podModificationsNeeded {
// No pod modifications needed (neither scratch space nor a cache mode change), update the phase based on the pod. If the pod
// is about to be deleted and recreated we don't want to update the phase, because the pod might terminate cleanly and
// mistakenly mark the import complete.
anno[cc.AnnPodPhase] = string(pod.Status.Phase)
}

for _, ev := range pod.Spec.Containers[0].Env {
if ev.Name == common.CacheMode && ev.Value == common.CacheModeTryNone {
anno[cc.AnnRequiresDirectIO] = "false"
}
}

// Check if the POD is waiting for scratch space, if so create some.
if pod.Status.Phase == corev1.PodPending && r.requiresScratchSpace(pvc) {
if err := r.createScratchPvcForPod(pvc, pod); err != nil {
Expand Down Expand Up @@ -428,8 +441,8 @@ func (r *ImportReconciler) updatePvcFromPod(pvc *corev1.PersistentVolumeClaim, p
log.V(1).Info("Updated PVC", "pvc.anno.Phase", anno[cc.AnnPodPhase], "pvc.anno.Restarts", anno[cc.AnnPodRestarts])
}

if cc.IsPVCComplete(pvc) || scratchSpaceRequired {
if !scratchSpaceRequired {
if cc.IsPVCComplete(pvc) || podModificationsNeeded {
if !podModificationsNeeded {
r.recorder.Event(pvc, corev1.EventTypeNormal, ImportSucceededPVC, "Import Successful")
log.V(1).Info("Import completed successfully")
}
Expand Down Expand Up @@ -623,6 +636,11 @@ func (r *ImportReconciler) createImportEnvVar(pvc *corev1.PersistentVolumeClaim)
if err != nil {
return nil, err
}

if v, ok := pvc.Annotations[cc.AnnRequiresDirectIO]; ok && v == "true" {
podEnvVar.cacheMode = common.CacheModeTryNone
}

return podEnvVar, nil
}

Expand Down Expand Up @@ -1237,6 +1255,10 @@ func makeImportEnv(podEnvVar *importPodEnvVar, uid types.UID) []corev1.EnvVar {
Name: common.Preallocation,
Value: strconv.FormatBool(podEnvVar.preallocation),
},
{
Name: common.CacheMode,
Value: podEnvVar.cacheMode,
},
}
if podEnvVar.secretName != "" && podEnvVar.source != cc.SourceGCS {
env = append(env, corev1.EnvVar{
Expand Down Expand Up @@ -1287,3 +1309,18 @@ func makeImportEnv(podEnvVar *importPodEnvVar, uid types.UID) []corev1.EnvVar {
}
return env
}

// isOOMKilled reports whether the container's current termination state or
// its last recorded termination state carries the OOMKilled reason.
func isOOMKilled(status v1.ContainerStatus) bool {
	current := status.State.Terminated
	previous := status.LastTerminationState.Terminated

	// Either record is enough: the container may already have been restarted,
	// in which case the OOM is only visible in the last termination state.
	return (current != nil && current.Reason == cc.OOMKilledReason) ||
		(previous != nil && previous.Reason == cc.OOMKilledReason)
}
91 changes: 76 additions & 15 deletions pkg/controller/import-controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,10 +531,10 @@ var _ = Describe("Update PVC from POD", func() {
})

It("Should create scratch PVC, if pod is pending and PVC is marked with scratch", func() {
scratchPvcName := &corev1.PersistentVolumeClaim{}
scratchPvcName.Name = "testPvc1-scratch"
scratchPvc := &corev1.PersistentVolumeClaim{}
scratchPvc.Name = "testPvc1-scratch"
pvc := cc.CreatePvcInStorageClass("testPvc1", "default", &testStorageClass, map[string]string{cc.AnnEndpoint: testEndPoint, cc.AnnPodPhase: string(corev1.PodPending), cc.AnnRequiresScratch: "true"}, nil, corev1.ClaimBound)
pod := cc.CreateImporterTestPod(pvc, "testPvc1", scratchPvcName)
pod := cc.CreateImporterTestPod(pvc, "testPvc1", scratchPvc)
pod.Status = corev1.PodStatus{
Phase: corev1.PodPending,
ContainerStatuses: []v1.ContainerStatus{
Expand All @@ -552,7 +552,6 @@ var _ = Describe("Update PVC from POD", func() {
Expect(err).ToNot(HaveOccurred())
By("Checking scratch PVC has been created")
// Once all controllers are converted, we will use the runtime lib client instead of client-go and retrieval needs to change here.
scratchPvc := &v1.PersistentVolumeClaim{}
err = reconciler.client.Get(context.TODO(), types.NamespacedName{Name: "testPvc1-scratch", Namespace: "default"}, scratchPvc)
Expect(err).ToNot(HaveOccurred())
Expect(scratchPvc.Spec.Resources).To(Equal(pvc.Spec.Resources))
Expand Down Expand Up @@ -617,9 +616,9 @@ var _ = Describe("Update PVC from POD", func() {

It("Should NOT update phase on PVC, if pod exited with termination message stating scratch space is required", func() {
pvc := cc.CreatePvcInStorageClass("testPvc1", "default", &testStorageClass, map[string]string{cc.AnnEndpoint: testEndPoint, cc.AnnPodPhase: string(corev1.PodRunning)}, nil, corev1.ClaimBound)
scratchPvcName := &corev1.PersistentVolumeClaim{}
scratchPvcName.Name = "testPvc1-scratch"
pod := cc.CreateImporterTestPod(pvc, "testPvc1", scratchPvcName)
scratchPvc := &corev1.PersistentVolumeClaim{}
scratchPvc.Name = "testPvc1-scratch"
pod := cc.CreateImporterTestPod(pvc, "testPvc1", scratchPvc)
pod.Status = corev1.PodStatus{
Phase: corev1.PodPending,
ContainerStatuses: []corev1.ContainerStatus{
Expand Down Expand Up @@ -708,9 +707,9 @@ var _ = Describe("Update PVC from POD", func() {

It("Should copy VDDK connection information to annotations on PVC", func() {
pvc := cc.CreatePvcInStorageClass("testPvc1", "default", &testStorageClass, map[string]string{cc.AnnEndpoint: testEndPoint, cc.AnnPodPhase: string(corev1.PodRunning), cc.AnnSource: cc.SourceVDDK}, nil, corev1.ClaimBound)
scratchPvcName := &corev1.PersistentVolumeClaim{}
scratchPvcName.Name = "testPvc1-scratch"
pod := cc.CreateImporterTestPod(pvc, "testPvc1", scratchPvcName)
scratchPvc := &corev1.PersistentVolumeClaim{}
scratchPvc.Name = "testPvc1-scratch"
pod := cc.CreateImporterTestPod(pvc, "testPvc1", scratchPvc)
pod.Status = corev1.PodStatus{
Phase: corev1.PodSucceeded,
ContainerStatuses: []corev1.ContainerStatus{
Expand Down Expand Up @@ -743,9 +742,10 @@ var _ = Describe("Update PVC from POD", func() {

It("Should delete pod for scratch space even if retainAfterCompletion is set", func() {
annotations := map[string]string{
cc.AnnEndpoint: testEndPoint,
cc.AnnImportPod: "testpod",
cc.AnnRequiresScratch: "true",
cc.AnnEndpoint: testEndPoint,
cc.AnnImportPod: "testpod",
// gets added by controller
// cc.AnnRequiresScratch: "true",
cc.AnnSource: cc.SourceVDDK,
cc.AnnPodRetainAfterCompletion: "true",
}
Expand All @@ -755,8 +755,11 @@ var _ = Describe("Update PVC from POD", func() {
Phase: corev1.PodSucceeded,
ContainerStatuses: []corev1.ContainerStatus{
{
LastTerminationState: corev1.ContainerState{
Terminated: &corev1.ContainerStateTerminated{},
State: corev1.ContainerState{
Terminated: &corev1.ContainerStateTerminated{
ExitCode: 0,
Message: `{"scratchSpaceRequired": true}`,
},
},
},
},
Expand All @@ -777,6 +780,60 @@ var _ = Describe("Update PVC from POD", func() {
Expect(err.Error()).To(ContainSubstring("\"importer-testPvc1\" not found"))
})

It("Should delete pod in favor of recreating with cache=trynone in case of OOMKilled", func() {
annotations := map[string]string{
cc.AnnEndpoint: testEndPoint,
cc.AnnSource: cc.SourceRegistry,
cc.AnnRegistryImportMethod: string(cdiv1.RegistryPullNode),
}
pvc := cc.CreatePvcInStorageClass("testPvc1", "default", &testStorageClass, annotations, nil, corev1.ClaimPending)
reconciler = createImportReconciler(pvc)

_, err := reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Name: "testPvc1", Namespace: "default"}})
Expect(err).ToNot(HaveOccurred())
// First reconcile decides the pod's name, the second creates it
_, err = reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Name: "testPvc1", Namespace: "default"}})
Expect(err).ToNot(HaveOccurred())

// Simulate OOMKilled on pod
resPod := &corev1.Pod{}
err = reconciler.client.Get(context.TODO(), types.NamespacedName{Name: "importer-testPvc1", Namespace: "default"}, resPod)
Expect(err).ToNot(HaveOccurred())
resPod.Status = corev1.PodStatus{
Phase: corev1.PodRunning,
ContainerStatuses: []corev1.ContainerStatus{
{
State: v1.ContainerState{
Terminated: &corev1.ContainerStateTerminated{
ExitCode: 137,
// The reason string is part of the CRI API that container runtimes must follow:
// https://github.com/kubernetes/kubernetes/blob/e38531e9a2359c2ba1505cb04d62d6810edc616e/staging/src/k8s.io/cri-api/pkg/apis/runtime/v1/api.pb.go#L5822-L5823
Reason: cc.OOMKilledReason,
},
},
},
},
}
err = reconciler.client.Status().Update(context.TODO(), resPod)
Expect(err).ToNot(HaveOccurred())
// Reconcile picks up the OOMKilled status and deletes the pod
_, err = reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Name: "testPvc1", Namespace: "default"}})
Expect(err).ToNot(HaveOccurred())
err = reconciler.client.Get(context.TODO(), types.NamespacedName{Name: "importer-testPvc1", Namespace: "default"}, resPod)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("\"importer-testPvc1\" not found"))
// Next reconcile recreates pod with cache=trynone
_, err = reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Name: "testPvc1", Namespace: "default"}})
Expect(err).ToNot(HaveOccurred())
err = reconciler.client.Get(context.TODO(), types.NamespacedName{Name: "importer-testPvc1", Namespace: "default"}, resPod)
Expect(err).ToNot(HaveOccurred())
Expect(resPod.Spec.Containers[0].Env).To(ContainElement(
corev1.EnvVar{
Name: common.CacheMode,
Value: common.CacheModeTryNone,
},
))
})
})

var _ = Describe("Create Importer Pod", func() {
Expand Down Expand Up @@ -1182,6 +1239,10 @@ func createImportTestEnv(podEnvVar *importPodEnvVar, uid string) []corev1.EnvVar
Name: common.Preallocation,
Value: strconv.FormatBool(podEnvVar.preallocation),
},
{
Name: common.CacheMode,
Value: podEnvVar.cacheMode,
},
}

if podEnvVar.secretName != "" {
Expand Down

0 comments on commit 70e2e0b

Please sign in to comment.