Skip to content

Commit

Permalink
Feature/better handling (#4)
Browse files Browse the repository at this point in the history
* better handling

* better handling

* cleanup

* simpler solution
  • Loading branch information
davidkarlsen committed Sep 9, 2023
1 parent f0b00a9 commit cdda9d5
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 78 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
k8s.io/api v0.27.3
k8s.io/apimachinery v0.27.3
k8s.io/client-go v0.27.3
k8s.io/utils v0.0.0-20230209194617-a36077c30491
sigs.k8s.io/controller-runtime v0.15.2
)

Expand Down Expand Up @@ -38,7 +39,7 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/gnostic v0.5.7-v3refs // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/uuid v1.3.0 // indirect
Expand All @@ -64,7 +65,7 @@ require (
github.com/spf13/pflag v1.0.5 // indirect
github.com/xlab/treeprint v1.1.0 // indirect
go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/crypto v0.12.0 // indirect
Expand All @@ -87,7 +88,6 @@ require (
k8s.io/klog/v2 v2.90.1 // indirect
k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f // indirect
k8s.io/kubectl v0.27.3 // indirect
k8s.io/utils v0.0.0-20230209194617-a36077c30491 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/kustomize/api v0.13.2 // indirect
sigs.k8s.io/kustomize/kyaml v0.14.1 // indirect
Expand Down
7 changes: 4 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.1.0 h1:Hsa8mG0dQ46ij8Sl2AYJDUv1oA9/d6Vk+3LG99Oe02g=
github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec=
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4=
Expand Down Expand Up @@ -182,8 +182,9 @@ github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5 h1:+FNtrFTmVw0YZGpBGX56XDee331t6JAXeK2bcyhLOOc=
go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5/go.mod h1:nmDLcffg48OtT/PSW0Hg7FvpRQsQh5OSqIylirxKC7o=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
Expand Down
31 changes: 31 additions & 0 deletions internal/controller/jobutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package controller

import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
eq "k8s.io/apimachinery/pkg/api/equality"
)

// jobsAreEqual reports whether both Jobs are non-nil and semantically equal
// according to eq.Semantic.DeepEqual. A nil argument on either side always
// yields false, so two nil Jobs are not considered equal.
//
// NOTE(review): the comparison is over the entire Job object, not just its
// spec. If one side was read back from the API server it presumably carries
// server-populated metadata/status that a freshly built Job lacks, in which
// case this would never report equality — confirm against the callers.
func jobsAreEqual(first *batchv1.Job, second *batchv1.Job) bool {
	if first == nil || second == nil {
		return false
	}
	return eq.Semantic.DeepEqual(first, second)
}

// isJobFinished reports whether the given Job has finished execution, i.e.
// whether any of its status conditions is JobComplete or JobFailed with
// status ConditionTrue. It does not discriminate between successful and
// failed terminations.
//
// Adapted from IsJobFinished in
// https://github.com/kubernetes/kubernetes/blob/v1.28.1/pkg/controller/job/utils.go
func isJobFinished(j *batchv1.Job) bool {
	conditions := j.Status.Conditions
	for i := range conditions {
		cond := &conditions[i]
		// Only conditions that currently hold are relevant.
		if cond.Status != corev1.ConditionTrue {
			continue
		}
		switch cond.Type {
		case batchv1.JobComplete, batchv1.JobFailed:
			return true
		}
	}
	return false
}

// hasFailed reports whether the Job's status has a positive Failed counter.
func hasFailed(job *batchv1.Job) bool {
	failedCount := job.Status.Failed
	return failedCount > 0
}

// hasSucceeded reports whether the Job's status has a positive Succeeded
// counter.
func hasSucceeded(job *batchv1.Job) bool {
	succeededCount := job.Status.Succeeded
	return succeededCount > 0
}
134 changes: 62 additions & 72 deletions internal/controller/migration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,26 @@ import (
"context"
"fmt"
"github.com/caitlinelfring/go-env-default"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"time"

flywayv1alpha1 "github.com/davidkarlsen/flyway-operator/api/v1alpha1"
"github.com/redhat-cop/operator-utils/pkg/util"
"github.com/redhat-cop/operator-utils/pkg/util/crud"
batchv1 "k8s.io/api/batch/v1"
v12 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"time"
)

const (
sqlVolumeName = "sql"
defaultFlywayImage = "ghcr.io/davidkarlsen/flyway-db2:9.22"
env_name_flyway_image = "FLYWAY_IMAGE"
sqlVolumeName = "sql"
defaultFlywayImage = "ghcr.io/davidkarlsen/flyway-db2:9.22"
envNameFlywayImage = "FLYWAY_IMAGE"
)

// MigrationReconciler reconciles a Migration object
Expand Down Expand Up @@ -71,7 +71,7 @@ func (r *MigrationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

if util.IsBeingDeleted(migration) {
logger.Info("Migration deleted, returning")
r.GetRecorder().Event(migration, v12.EventTypeWarning, "Deleting", fmt.Sprintf("Migration deleted: %s", req.NamespacedName))
r.GetRecorder().Event(migration, corev1.EventTypeWarning, "Deleting", fmt.Sprintf("Migration deleted: %s", req.NamespacedName))
return r.ManageSuccess(ctx, migration)
}

Expand All @@ -80,41 +80,35 @@ func (r *MigrationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return r.ManageError(ctx, migration, err)
}

newJob := r.createJobSpec(ctx, migration)

if existingJob == nil { // no existing job - so submit one now
err := r.submitMigrationJob(ctx, migration)
if err != nil {
return r.ManageError(ctx, migration, err)
return r.submitMigrationJob(ctx, migration, newJob)
} else {
if !isJobFinished(existingJob) {
logger.Info("Job still running, returning for reconcile", "job", existingJob)
return r.ManageSuccessWithRequeue(ctx, migration, 3*time.Second)
}
return r.ManageSuccess(ctx, migration) //TODO: Should requeue
}

if !r.isJobFinished(existingJob) {
logger.Info("Job still running, returning for reconcile", "job", existingJob)
return r.ManageSuccessWithRequeue(ctx, migration, 3*time.Second)
}
jobsAreEqual := jobsAreEqual(existingJob, newJob)

if existingJob.Status.Failed > 0 {
err := r.deleteExistingJob(ctx, existingJob)
if err != nil {
return r.ManageError(ctx, migration, err)
if hasFailed(existingJob) || !jobsAreEqual {
return r.submitMigrationJob(ctx, migration, newJob)
}
//TODO: should save onto status field of migration
return r.ManageError(ctx, migration, fmt.Errorf("existing job failed - reattempting reconcilation"))
}

if existingJob.Status.Succeeded > 0 {
logger.Info("Job succeeded - removing") //TODO - should match hash
r.GetRecorder().Event(migration, v12.EventTypeNormal, "Succeeded", fmt.Sprintf("Migration Succeeded: %s", req.NamespacedName))

err := r.deleteExistingJob(ctx, existingJob)
if err != nil {
return r.ManageError(ctx, migration, err)
if hasSucceeded(existingJob) {
if jobsAreEqual {
logger.Info("Migration succeeded")
r.GetRecorder().Event(migration, corev1.EventTypeNormal, "Succeeded", fmt.Sprintf("Migration Succeeded: %s", req.NamespacedName))
return r.ManageSuccess(ctx, migration)
} else { // migration has changed - submit new job
return r.submitMigrationJob(ctx, migration, newJob)
}
}
return r.ManageSuccess(ctx, migration)
}

logger.Info("BUG - should not happen")
return r.ManageSuccess(ctx, migration)
err = fmt.Errorf("this is a bug and not not happen")
return r.ManageError(ctx, migration, err)
}

func (r *MigrationReconciler) getExistingJob(ctx context.Context, migration *flywayv1alpha1.Migration) (*batchv1.Job, error) {
Expand All @@ -128,12 +122,18 @@ func (r *MigrationReconciler) getExistingJob(ctx context.Context, migration *fly
return existingJob, err
}

func (r *MigrationReconciler) submitMigrationJob(ctx context.Context, migration *flywayv1alpha1.Migration) error {
job := r.createJobSpec(ctx, migration)
logger := log.FromContext(ctx)
logger.V(1).Info("Generated job", "job", fmt.Sprintf("%+v", job))
func (r *MigrationReconciler) submitMigrationJob(ctx context.Context, migration *flywayv1alpha1.Migration, job *batchv1.Job) (reconcile.Result, error) {
err := crud.DeleteResourceIfExists(ctx, job)
if err != nil {
return r.ManageError(ctx, migration, err)
}

err = crud.CreateResourceIfNotExists(ctx, migration, migration.Namespace, job)
if err != nil {
return r.ManageError(ctx, migration, err)
}

return crud.CreateResourceIfNotExists(ctx, migration, migration.Namespace, &job)
return r.ManageSuccess(ctx, migration)
}

func (r *MigrationReconciler) deleteExistingJob(ctx context.Context, existingJob *batchv1.Job) error {
Expand All @@ -142,64 +142,52 @@ func (r *MigrationReconciler) deleteExistingJob(ctx context.Context, existingJob
return r.Client.Delete(ctx, existingJob, &client.DeleteOptions{PropagationPolicy: &opt})
}

// from https://github.com/kubernetes/kubernetes/blob/v1.28.1/pkg/controller/job/utils.go
// IsJobFinished checks whether the given Job has finished execution.
// It does not discriminate between successful and failed terminations.
func (r *MigrationReconciler) isJobFinished(j *batchv1.Job) bool {
for _, c := range j.Status.Conditions {
if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == v12.ConditionTrue {
return true
}
}
return false
}

func (r *MigrationReconciler) createJobSpec(ctx context.Context, migration *flywayv1alpha1.Migration) batchv1.Job {
func (r *MigrationReconciler) createJobSpec(ctx context.Context, migration *flywayv1alpha1.Migration) *batchv1.Job {
const targetPath = "/mnt/target/"

return batchv1.Job{
job := &batchv1.Job{
TypeMeta: metav1.TypeMeta{
Kind: "Job",
APIVersion: batchv1.SchemeGroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: migration.Name,
Namespace: migration.Namespace,
Name: migration.Name,
Namespace: migration.Namespace,
Annotations: make(map[string]string),
},
Spec: batchv1.JobSpec{
TTLSecondsAfterFinished: pointer.Int32(60 * 60),
BackoffLimit: pointer.Int32(2),
Template: v12.PodTemplateSpec{
Spec: v12.PodSpec{
InitContainers: []v12.Container{
BackoffLimit: pointer.Int32(2),
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
InitContainers: []corev1.Container{
{
Name: "copy-sql",
Image: migration.Spec.Migration.ImageRef,
ImagePullPolicy: v12.PullAlways,
ImagePullPolicy: corev1.PullAlways,
Command: []string{"sh", "-c"},
Args: []string{fmt.Sprintf("cd %s && cp -rp * %s", migration.Spec.Migration.SqlPath, targetPath)},
VolumeMounts: []v12.VolumeMount{
VolumeMounts: []corev1.VolumeMount{
{
Name: sqlVolumeName,
MountPath: targetPath,
},
},
},
},
Containers: []v12.Container{
Containers: []corev1.Container{
{
Name: "flyway",
Image: env.GetDefault(env_name_flyway_image, defaultFlywayImage),
ImagePullPolicy: v12.PullAlways,
Image: env.GetDefault(envNameFlywayImage, defaultFlywayImage),
ImagePullPolicy: corev1.PullAlways,
Args: []string{"info", "migrate", "info"},
Env: []v12.EnvVar{
Env: []corev1.EnvVar{
{
Name: "FLYWAY_USER",
Value: migration.Spec.Database.Username,
},
{
Name: "FLYWAY_PASSWORD",
ValueFrom: &v12.EnvVarSource{
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: &(migration.Spec.Database).Credentials,
},
},
Expand All @@ -208,27 +196,29 @@ func (r *MigrationReconciler) createJobSpec(ctx context.Context, migration *flyw
Value: migration.Spec.Database.JdbcUrl,
},
},
VolumeMounts: []v12.VolumeMount{
VolumeMounts: []corev1.VolumeMount{
{
Name: sqlVolumeName,
MountPath: "/flyway/sql",
},
},
},
},
Volumes: []v12.Volume{
Volumes: []corev1.Volume{
{
Name: sqlVolumeName,
VolumeSource: v12.VolumeSource{
EmptyDir: &v12.EmptyDirVolumeSource{},
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
},
},
RestartPolicy: v12.RestartPolicyNever,
RestartPolicy: corev1.RestartPolicyNever,
},
},
},
}

return job
}

// SetupWithManager sets up the controller with the Manager.
Expand Down

0 comments on commit cdda9d5

Please sign in to comment.