Skip to content

Commit

Permalink
Use job template from cfg
Browse files Browse the repository at this point in the history
  • Loading branch information
ykadowak committed Mar 14, 2024
1 parent b046705 commit 584d1f0
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 24 deletions.
6 changes: 1 addition & 5 deletions cmd/index/operator/sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ observability:
operator:
namespace: default
agent_name: vald-agent
agent_namespace:
agent_namespace: default
rotator_name: vald-readreplica-rotate
target_read_replica_id_annotations_key: vald.vdaas.org/target-read-replica-id
rotation_job_concurrency: 2
Expand Down Expand Up @@ -195,7 +195,6 @@ operator:
app.kubernetes.io/instance: release-name
app.kubernetes.io/component: vald-readreplica-rotate
app.kubernetes.io/version: v1.7.12
annotations:
spec:
containers:
- name: vald-readreplica-rotate
Expand Down Expand Up @@ -309,7 +308,6 @@ operator:
app.kubernetes.io/instance: release-name
app.kubernetes.io/component: vald-index-creation
app.kubernetes.io/version: v1.7.12
annotations:
spec:
initContainers:
- name: wait-for-agent
Expand Down Expand Up @@ -425,7 +423,6 @@ operator:
app.kubernetes.io/instance: release-name
app.kubernetes.io/component: vald-index-save
app.kubernetes.io/version: v1.7.12
annotations:
spec:
initContainers:
- name: wait-for-agent
Expand Down Expand Up @@ -541,7 +538,6 @@ operator:
app.kubernetes.io/instance: release-name
app.kubernetes.io/component: vald-index-correction
app.kubernetes.io/version: v1.7.12
annotations:
spec:
initContainers:
- name: wait-for-agent
Expand Down
12 changes: 12 additions & 0 deletions internal/config/index_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
// limitations under the License.
package config

import "github.com/vdaas/vald/internal/k8s/client"

// IndexOperator represents the configurations for index k8s operator.
type IndexOperator struct {
// Namespace represent the namespace of this pod
Expand All @@ -38,6 +40,16 @@ type IndexOperator struct {

// ReadReplicaLabelKey represents the label key for read replica.
ReadReplicaLabelKey string `json:"read_replica_label_key" yaml:"read_replica_label_key"`

// JobTemplates represents the job templates for indexing.
JobTemplates IndexJobTemplates `json:"job_templates" yaml:"job_templates"`
}

type IndexJobTemplates struct {
Rotate *client.Job `json:"rotate" yaml:"rotate"`
Creation *client.Job `json:"creation" yaml:"creation"`
Save *client.Job `json:"save" yaml:"save"`
Correction *client.Job `json:"correction" yaml:"correction"`
}

func (ic *IndexOperator) Bind() *IndexOperator {
Expand Down
31 changes: 12 additions & 19 deletions pkg/index/operator/service/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,12 @@ type operator struct {
}

// New returns Indexer object if no error occurs.
func New(namespace, agentName, rotatorName, targetReadReplicaIDKey string, rotatorJob *client.Job,opts ...Option) (o Operator, err error) {
func New(namespace, agentName, rotatorName, targetReadReplicaIDKey string, rotatorJob *client.Job, opts ...Option) (o Operator, err error) {

Check warning on line 65 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L65

Added line #L65 was not covered by tests
operator := new(operator)
operator.namespace = namespace
operator.targetReadReplicaIDAnnotationsKey = targetReadReplicaIDKey
operator.rotatorName = rotatorName
operator.rotatorJob = rotatorJob

Check warning on line 70 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L67-L70

Added lines #L67 - L70 were not covered by tests
for _, opt := range append(defaultOpts, opts...) {
if err := opt(operator); err != nil {
oerr := errors.ErrOptionFailed(err, reflect.ValueOf(opt))
Expand Down Expand Up @@ -153,7 +154,8 @@ func (o *operator) podOnReconcile(ctx context.Context, pod *client.Pod) (client.
if o.readReplicaEnabled {
rq, err := o.reconcileRotatorJob(ctx, pod)
if err != nil {
return client.Result{}, fmt.Errorf("rotating or requeueing: %w", err)
log.Errorf("reconciling rotator job: %s", err)
return client.Result{}, fmt.Errorf("reconciling rotator job: %w", err)

Check warning on line 158 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L153-L158

Added lines #L153 - L158 were not covered by tests
}
// let controller-runtime backoff exponentially by not setting the backoff duration
return client.Result{
Expand Down Expand Up @@ -235,11 +237,6 @@ func needsRotation(agentAnnotations, readReplicaAnnotations map[string]string) (
}

func (o *operator) createRotationJobOrRequeue(ctx context.Context, podIdx string) (rq bool, err error) {

Check warning on line 239 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L239

Added line #L239 was not covered by tests
var cronJob client.CronJob
if err := o.client.Get(ctx, o.rotatorName, o.namespace, &cronJob); err != nil {
return false, err
}

// get all the rotation jobs and make sure the job is not running
res, err := o.ensureJobConcurrency(ctx, podIdx)
if err != nil {
Expand All @@ -259,21 +256,17 @@ func (o *operator) createRotationJobOrRequeue(ctx context.Context, podIdx string

// now we actually need to create the rotator job
log.Infof("no job is running to rotate the agent(id:%s). creating a new job...", podIdx)
spec := *cronJob.Spec.JobTemplate.Spec.DeepCopy()
if spec.Template.Annotations == nil {
spec.Template.Annotations = make(map[string]string)
job := o.rotatorJob.DeepCopy()
if job.Spec.Template.Annotations == nil {
job.Spec.Template.Annotations = make(map[string]string)

Check warning on line 261 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L258-L261

Added lines #L258 - L261 were not covered by tests
}
spec.Template.Annotations[o.targetReadReplicaIDAnnotationsKey] = podIdx

job := client.Job{
ObjectMeta: client.ObjectMeta{
GenerateName: cronJob.Name + "-",
Namespace: o.namespace,
},
Spec: spec,
job.Spec.Template.Annotations[o.targetReadReplicaIDAnnotationsKey] = podIdx
job.ObjectMeta = client.ObjectMeta{
GenerateName: fmt.Sprintf("%s-", o.rotatorName),
Namespace: o.namespace,

Check warning on line 266 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L263-L266

Added lines #L263 - L266 were not covered by tests
}

if err := o.client.Create(ctx, &job); err != nil {
if err := o.client.Create(ctx, job); err != nil {
return false, fmt.Errorf("creating job resource with k8s API: %w", err)

Check warning on line 270 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L269-L270

Added lines #L269 - L270 were not covered by tests
}

Expand Down

0 comments on commit 584d1f0

Please sign in to comment.