Skip to content

Commit

Permalink
working version
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkarlsen committed Sep 8, 2023
1 parent 6a6a8ce commit 3a8eabf
Showing 1 changed file with 70 additions and 32 deletions.
102 changes: 70 additions & 32 deletions internal/controller/migration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,21 @@ package controller
import (
"context"
"fmt"
"github.com/caitlinelfring/go-env-default"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"time"

flywayv1alpha1 "github.com/davidkarlsen/flyway-operator/api/v1alpha1"
"github.com/redhat-cop/operator-utils/pkg/util"
"github.com/redhat-cop/operator-utils/pkg/util/crud"
batchv1 "k8s.io/api/batch/v1"
v12 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"time"

"github.com/caitlinelfring/go-env-default"

flywayv1alpha1 "github.com/davidkarlsen/flyway-operator/api/v1alpha1"
)

const (
Expand Down Expand Up @@ -71,47 +70,86 @@ func (r *MigrationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}

if util.IsBeingDeleted(migration) {
logger.Info("Deleting")
logger.Info("Migration deleted, returning")
return r.ManageSuccess(ctx, migration)
}

err := r.submitMigrationJob(ctx, migration)

existingJob, err := r.getExistingJob(ctx, migration)
if err != nil {
logger.Error(err, err.Error())
return r.ManageErrorWithRequeue(ctx, migration, err, 10*time.Second)
return r.ManageError(ctx, migration, err)
}
// else

if existingJob == nil { // no existing job - so submit one now
err := r.submitMigrationJob(ctx, migration)
if err != nil {
return r.ManageError(ctx, migration, err)
}
return r.ManageSuccess(ctx, migration) //TODO: Should requeue
}

if !r.isJobFinished(existingJob) {
logger.Info("Job still running, returning for reconcile", "job", existingJob)
return r.ManageSuccessWithRequeue(ctx, migration, 3*time.Second)
}

if existingJob.Status.Failed > 0 {
err := r.deleteExistingJob(ctx, existingJob)
if err != nil {
return r.ManageError(ctx, migration, err)
}
//TODO: should save onto status field of migration
return r.ManageError(ctx, migration, fmt.Errorf("existing job failed - reattempting reconcilation"))
}

if existingJob.Status.Succeeded > 0 {
logger.Info("Job succeeded - removing") //TODO - should match hash

err := r.deleteExistingJob(ctx, existingJob)
if err != nil {
return r.ManageError(ctx, migration, err)
}
return r.ManageSuccess(ctx, migration)
}

logger.Info("BUG - should not happen")
return r.ManageSuccess(ctx, migration)
}

func (r *MigrationReconciler) getExistingJob(ctx context.Context, migration *flywayv1alpha1.Migration) (*batchv1.Job, error) {
// look for any current migration job and check state
existingJob := &batchv1.Job{}
err := r.GetClient().Get(ctx, client.ObjectKeyFromObject(migration), existingJob)
if apierrors.IsNotFound(err) {
return nil, nil
}

return existingJob, err
}

func (r *MigrationReconciler) submitMigrationJob(ctx context.Context, migration *flywayv1alpha1.Migration) error {
job := r.createJobSpec(ctx, migration)
logger := log.FromContext(ctx)
logger.Info("Generated job", "job", fmt.Sprintf("%+v", job))

existingJob := &batchv1.Job{}
err := r.GetClient().Get(ctx, client.ObjectKeyFromObject(&job), existingJob)
if err != nil {
if !errors.IsNotFound(err) {
return err
}
} else {
if existingJob.Status.Active > 0 {
return fmt.Errorf("job already running")
}
if existingJob.Status.Failed > 0 || existingJob.Status.Succeeded > 0 {
logger.Info("Deleting old completed or failed job", "job", existingJob)
//_ = crud.DeleteResourceIfExists(ctx, existingJob)x
opt := metav1.DeletePropagationForeground
err = r.Client.Delete(ctx, existingJob, &client.DeleteOptions{PropagationPolicy: &opt})
if err != nil {
return err
}
return crud.CreateResourceIfNotExists(ctx, migration, migration.Namespace, &job)
}

func (r *MigrationReconciler) deleteExistingJob(ctx context.Context, existingJob *batchv1.Job) error {
//_ = crud.DeleteResourceIfExists(ctx, existingJob)x
opt := metav1.DeletePropagationForeground
return r.Client.Delete(ctx, existingJob, &client.DeleteOptions{PropagationPolicy: &opt})
}

// from https://github.com/kubernetes/kubernetes/blob/v1.28.1/pkg/controller/job/utils.go
// IsJobFinished checks whether the given Job has finished execution.
// It does not discriminate between successful and failed terminations.
func (r *MigrationReconciler) isJobFinished(j *batchv1.Job) bool {
for _, c := range j.Status.Conditions {
if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == v12.ConditionTrue {
return true
}
}

return crud.CreateResourceIfNotExists(ctx, migration, migration.Namespace, &job)
return false
}

func (r *MigrationReconciler) createJobSpec(ctx context.Context, migration *flywayv1alpha1.Migration) batchv1.Job {
Expand Down

0 comments on commit 3a8eabf

Please sign in to comment.