Skip to content

Commit

Permalink
Bugfix recreate benchmark job when operator reboot (#2463)
Browse files Browse the repository at this point in the history
* 🐛 Fix recreate benchmark job already runnnig when operator rebooted

Signed-off-by: vankichi <kyukawa315@gmail.com>

* 🐛 Fix benchmarkJobReconcile status handling

Signed-off-by: vankichi <kyukawa315@gmail.com>

* ✅ Fix test

Signed-off-by: vankichi <kyukawa315@gmail.com>

* ♻️ Update k8s dir

Signed-off-by: vankichi <kyukawa315@gmail.com>

* ♻️ Fix

Signed-off-by: vankichi <kyukawa315@gmail.com>

---------

Signed-off-by: vankichi <kyukawa315@gmail.com>
  • Loading branch information
vankichi authored and vdaas-ci committed Mar 22, 2024
1 parent 51439a4 commit 96f8883
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 33 deletions.
1 change: 1 addition & 0 deletions charts/vald-benchmark-operator/crds/valdbenchmarkjob.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ spec:
- Completed
- Available
- Healthy
default: Available
type: string
spec:
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ spec:
- Completed
- Available
- Healthy
default: Available
type: string
spec:
type: object
Expand Down
1 change: 1 addition & 0 deletions k8s/tools/benchmark/operator/crds/valdbenchmarkjob.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ spec:
- Completed
- Available
- Healthy
default: Available
type: string
spec:
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ spec:
- Completed
- Available
- Healthy
default: Available
type: string
spec:
type: object
Expand Down
70 changes: 38 additions & 32 deletions pkg/tools/benchmark/operator/service/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (o *operator) jobReconcile(ctx context.Context, jobList map[string][]job.Jo
jobNames[job.GetName()] = struct{}{}
if _, ok := cjobs[job.Name]; !ok && job.Status.CompletionTime == nil {
cjobs[job.GetName()] = job.Namespace
benchmarkJobStatus[job.GetName()] = v1.BenchmarkJobAvailable
benchmarkJobStatus[job.GetName()] = v1.BenchmarkJobHealthy
continue
}
name = job.GetName()
Expand Down Expand Up @@ -248,22 +248,22 @@ func (o *operator) benchJobReconcile(ctx context.Context, benchJobList map[strin
// jobStatus is used for update benchmarkJob CR status if updating is needed.
jobStatus := make(map[string]v1.BenchmarkJobStatus)
for k := range benchJobList {
// update scenario status
job := benchJobList[k]
hasOwner := false
// update scenario status
if len(job.GetOwnerReferences()) > 0 {
hasOwner = true
}
if scenarios := o.getAtomicScenario(); scenarios != nil && hasOwner {
on := job.GetOwnerReferences()[0].Name
if _, ok := scenarios[on]; ok {
if scenarios[on].BenchJobStatus == nil {
scenarios[on].BenchJobStatus = map[string]v1.BenchmarkJobStatus{}
if scenarios := o.getAtomicScenario(); scenarios != nil {
ownerRefs := job.GetOwnerReferences()
ownerName := ownerRefs[0].Name
if _, ok := scenarios[ownerName]; ok {
if scenarios[ownerName].BenchJobStatus == nil {
scenarios[ownerName].BenchJobStatus = map[string]v1.BenchmarkJobStatus{}
}
scenarios[ownerName].BenchJobStatus[job.Name] = job.Status
}
scenarios[on].BenchJobStatus[job.Name] = job.Status
o.scenarios.Store(&scenarios)
}
o.scenarios.Store(&scenarios)
}
// update benchmark job
if oldJob := cbjl[k]; oldJob != nil {
if oldJob.GetGeneration() != job.GetGeneration() {
if job.Status != "" && oldJob.Status != v1.BenchmarkJobCompleted {
Expand All @@ -282,13 +282,15 @@ func (o *operator) benchJobReconcile(ctx context.Context, benchJobList map[strin
} else if oldJob.Status == "" {
jobStatus[oldJob.GetName()] = v1.BenchmarkJobAvailable
}
} else if len(job.Status) == 0 || job.Status == v1.BenchmarkJobNotReady {
log.Info("[reconcile benchmark job resource] create job: ", k)
err := o.createJob(ctx, job)
if err != nil {
log.Errorf("[reconcile benchmark job resource] failed to create job: %s", err.Error())
} else {
if job.Status == "" || job.Status == v1.BenchmarkJobAvailable {
log.Info("[reconcile benchmark job resource] create job: ", k)
err := o.createJob(ctx, job)
if err != nil {
log.Errorf("[reconcile benchmark job resource] failed to create job: %s", err.Error())

Check warning on line 290 in pkg/tools/benchmark/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/tools/benchmark/operator/service/operator.go#L290

Added line #L290 was not covered by tests
}
jobStatus[job.Name] = v1.BenchmarkJobHealthy
}
jobStatus[job.Name] = v1.BenchmarkJobAvailable
cbjl[k] = &job
}
}
Expand Down Expand Up @@ -325,22 +327,26 @@ func (o *operator) benchScenarioReconcile(ctx context.Context, scenarioList map[
for name := range scenarioList {
sc := scenarioList[name]
if oldScenario := cbsl[name]; oldScenario == nil {
// apply new crd which is not set yet.
jobNames, err := o.createBenchmarkJob(ctx, sc)
if err != nil {
log.Errorf("[reconcile benchmark scenario resource] failed to create benchmark job resource: %s", err.Error())
}
// init atomic values for current scenario
cbsl[name] = &scenario{
Crd: &sc,
BenchJobStatus: func() map[string]v1.BenchmarkJobStatus {
s := map[string]v1.BenchmarkJobStatus{}
for _, v := range jobNames {
s[v] = v1.BenchmarkJobNotReady
}
return s
}(),
}
scenarioStatus[sc.GetName()] = v1.BenchmarkScenarioHealthy
scenarioStatus[sc.GetName()] = sc.Status
// apply new crd which is not set yet.
if sc.Status == "" || sc.Status == v1.BenchmarkScenarioAvailable {
jobNames, err := o.createBenchmarkJob(ctx, sc)
if err != nil {
log.Errorf("[reconcile benchmark scenario resource] failed to create benchmark job resource: %s", err.Error())

Check warning on line 339 in pkg/tools/benchmark/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/tools/benchmark/operator/service/operator.go#L339

Added line #L339 was not covered by tests
}
// benchmark job resource status to store benchmarkScenario Atomic
s := map[string]v1.BenchmarkJobStatus{}
for _, v := range jobNames {
s[v] = v1.BenchmarkJobNotReady
}
cbsl[name].BenchJobStatus = s
// use for updating benchmarkScenario CR status
scenarioStatus[sc.GetName()] = v1.BenchmarkScenarioHealthy
}
} else {
// apply updated crd which is already applied.
if oldScenario.Crd.GetGeneration() < sc.GetGeneration() {
Expand Down Expand Up @@ -606,7 +612,7 @@ func (o *operator) checkAtomics() error {
return errors.ErrMismatchBenchmarkAtomics(cjl, cbjl, cbsl)
}
}
// check scenario and bench
// check scenario resource and bench resource
if owners := bj.GetOwnerReferences(); len(owners) > 0 {
var scenarioName string
for _, o := range owners {
Expand Down
2 changes: 1 addition & 1 deletion pkg/tools/benchmark/operator/service/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1084,7 +1084,7 @@ func Test_operator_benchJobReconcile(t *testing.T) {
Timestamp: "",
},
},
Status: v1.BenchmarkJobAvailable,
Status: v1.BenchmarkJobHealthy,
},
},
},
Expand Down

0 comments on commit 96f8883

Please sign in to comment.