diff --git a/internal/config/index_operator.go b/internal/config/index_operator.go
index 053c4e845a..1b7d51aef1 100644
--- a/internal/config/index_operator.go
+++ b/internal/config/index_operator.go
@@ -13,7 +13,7 @@
 // limitations under the License.
 package config

-import "github.com/vdaas/vald/internal/k8s/client"
+import "github.com/vdaas/vald/internal/k8s"

 // IndexOperator represents the configurations for index k8s operator.
 type IndexOperator struct {
@@ -46,10 +46,10 @@ type IndexOperator struct {
 }

 type IndexJobTemplates struct {
-	Rotate     *client.Job `json:"rotate" yaml:"rotate"`
-	Creation   *client.Job `json:"creation" yaml:"creation"`
-	Save       *client.Job `json:"save" yaml:"save"`
-	Correction *client.Job `json:"correction" yaml:"correction"`
+	Rotate     *k8s.Job `json:"rotate" yaml:"rotate"`
+	Creation   *k8s.Job `json:"creation" yaml:"creation"`
+	Save       *k8s.Job `json:"save" yaml:"save"`
+	Correction *k8s.Job `json:"correction" yaml:"correction"`
 }

 func (ic *IndexOperator) Bind() *IndexOperator {
diff --git a/internal/k8s/client/client.go b/internal/k8s/client/client.go
index 10a91a5224..f02d15b7f7 100644
--- a/internal/k8s/client/client.go
+++ b/internal/k8s/client/client.go
@@ -23,11 +23,9 @@ import (
 	snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
 	"github.com/vdaas/vald/internal/errors"
-	appsv1 "k8s.io/api/apps/v1"
-	batchv1 "k8s.io/api/batch/v1"
+	"github.com/vdaas/vald/internal/k8s"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
@@ -42,42 +40,6 @@ import (
 	cli "sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/event"
 	"sigs.k8s.io/controller-runtime/pkg/predicate"
-	"sigs.k8s.io/controller-runtime/pkg/reconcile"
-)
-
-type (
-	Object             = cli.Object
-	ObjectKey          = cli.ObjectKey
-	DeleteAllOfOptions = cli.DeleteAllOfOptions
-	DeleteOptions      = cli.DeleteOptions
-	ListOptions        = cli.ListOptions
-	ListOption         = cli.ListOption
-	CreateOption       = cli.CreateOption
-	CreateOptions      = cli.CreateOptions
-	GetOption          = cli.GetOption
-	GetOptions         = cli.GetOptions
-	UpdateOptions      = cli.UpdateOptions
-	MatchingLabels     = cli.MatchingLabels
-	InNamespace        = cli.InNamespace
-	VolumeSnapshot     = snapshotv1.VolumeSnapshot
-	Pod                = corev1.Pod
-	Deployment         = appsv1.Deployment
-	DeploymentList     = appsv1.DeploymentList
-	ObjectMeta         = metav1.ObjectMeta
-	EnvVar             = corev1.EnvVar
-	Job                = batchv1.Job
-	JobList            = batchv1.JobList
-	JobStatus          = batchv1.JobStatus
-	CronJob            = batchv1.CronJob
-	Result             = reconcile.Result
-)
-
-const (
-	DeletePropagationBackground = metav1.DeletePropagationBackground
-	WatchDeletedEvent           = watch.Deleted
-	SelectionOpEquals           = selection.Equals
-	SelectionOpExists           = selection.Exists
-	PodIndexLabel               = appsv1.PodIndexLabel
 )

 var (
@@ -90,29 +52,29 @@ type Client interface {
 	// Get retrieves an obj for the given object key from the Kubernetes Cluster.
 	// obj must be a struct pointer so that obj can be updated with the response
 	// returned by the Server.
-	Get(ctx context.Context, name string, namespace string, obj Object, opts ...cli.GetOption) error
+	Get(ctx context.Context, name string, namespace string, obj k8s.Object, opts ...cli.GetOption) error
 	// List retrieves list of objects for a given namespace and list options. On a
 	// successful call, Items field in the list will be populated with the
 	// result returned from the server.
-	List(ctx context.Context, list cli.ObjectList, opts ...ListOption) error
+	List(ctx context.Context, list cli.ObjectList, opts ...k8s.ListOption) error
 	// Create saves the object obj in the Kubernetes cluster. obj must be a
 	// struct pointer so that obj can be updated with the content returned by the Server.
-	Create(ctx context.Context, obj Object, opts ...CreateOption) error
+	Create(ctx context.Context, obj k8s.Object, opts ...k8s.CreateOption) error
 	// Delete deletes the given obj from Kubernetes cluster.
-	Delete(ctx context.Context, obj Object, opts ...cli.DeleteOption) error
+	Delete(ctx context.Context, obj k8s.Object, opts ...cli.DeleteOption) error
 	// Update updates the given obj in the Kubernetes cluster. obj must be a
 	// struct pointer so that obj can be updated with the content returned by the Server.
-	Update(ctx context.Context, obj Object, opts ...cli.UpdateOption) error
+	Update(ctx context.Context, obj k8s.Object, opts ...cli.UpdateOption) error
 	// Patch patches the given obj in the Kubernetes cluster. obj must be a
 	// struct pointer so that obj can be updated with the content returned by the Server.
-	Patch(ctx context.Context, obj Object, patch cli.Patch, opts ...cli.PatchOption) error
+	Patch(ctx context.Context, obj k8s.Object, patch cli.Patch, opts ...cli.PatchOption) error
 	// Watch watches the given obj for changes and takes the appropriate callbacks.
-	Watch(ctx context.Context, obj cli.ObjectList, opts ...ListOption) (watch.Interface, error)
+	Watch(ctx context.Context, obj cli.ObjectList, opts ...k8s.ListOption) (watch.Interface, error)
 	// MatchingLabels filters the list/delete operation on the given set of labels.
 	MatchingLabels(labels map[string]string) cli.MatchingLabels
@@ -171,23 +133,23 @@ func (c *client) List(ctx context.Context, list cli.ObjectList, opts ...cli.List
 	return c.withWatch.List(ctx, list, opts...)
 }

-func (c *client) Create(ctx context.Context, obj Object, opts ...CreateOption) error {
+func (c *client) Create(ctx context.Context, obj k8s.Object, opts ...k8s.CreateOption) error {
 	return c.withWatch.Create(ctx, obj, opts...)
 }

-func (c *client) Delete(ctx context.Context, obj Object, opts ...cli.DeleteOption) error {
+func (c *client) Delete(ctx context.Context, obj k8s.Object, opts ...cli.DeleteOption) error {
 	return c.withWatch.Delete(ctx, obj, opts...)
 }

-func (c *client) Update(ctx context.Context, obj Object, opts ...cli.UpdateOption) error {
+func (c *client) Update(ctx context.Context, obj k8s.Object, opts ...cli.UpdateOption) error {
 	return c.withWatch.Update(ctx, obj, opts...)
 }

-func (c *client) Patch(ctx context.Context, obj Object, patch cli.Patch, opts ...cli.PatchOption) error {
+func (c *client) Patch(ctx context.Context, obj k8s.Object, patch cli.Patch, opts ...cli.PatchOption) error {
 	return c.withWatch.Patch(ctx, obj, patch, opts...)
 }

-func (c *client) Watch(ctx context.Context, obj cli.ObjectList, opts ...ListOption) (watch.Interface, error) {
+func (c *client) Watch(ctx context.Context, obj cli.ObjectList, opts ...k8s.ListOption) (watch.Interface, error) {
 	return c.withWatch.Watch(ctx, obj, opts...)
 }
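[editor's note] Because the names moved into `internal/k8s` are type aliases of the same underlying controller-runtime and k8s.io API types, call sites of the `Client` interface above stay source-compatible. A minimal sketch of a caller under that assumption; the helper name `getPod` and the hard-coded `"default"` namespace are illustrative, not part of this change:

```go
package example

import (
	"context"

	"github.com/vdaas/vald/internal/k8s"
	"github.com/vdaas/vald/internal/k8s/client"
)

// getPod fetches a Pod through the vald client wrapper. k8s.Pod aliases
// corev1.Pod, so the call site needs no direct k8s.io/api import.
func getPod(ctx context.Context, c client.Client, name string) (*k8s.Pod, error) {
	var pod k8s.Pod
	if err := c.Get(ctx, name, "default", &pod); err != nil {
		return nil, err
	}
	return &pod, nil
}
```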
diff --git a/internal/k8s/job/job.go b/internal/k8s/job/job.go
index b71c386870..d714900d69 100644
--- a/internal/k8s/job/job.go
+++ b/internal/k8s/job/job.go
@@ -40,23 +40,17 @@ type reconciler struct {
 	name              string
 	namespaces        []string
 	onError           func(err error)
-	onReconcile       func(ctx context.Context, jobList map[string][]Job)
+	onReconcile       func(ctx context.Context, jobList map[string][]k8s.Job)
 	listOpts          []client.ListOption
 	jobsByAppNamePool sync.Pool // map[app][]Job
 }

-// Job is a type alias for the k8s job definition.
-type Job = batchv1.Job
-
-// JobStatus is a type alias for the k8s job status definition.
-type JobStatus = batchv1.JobStatus
-
 // New returns the JobWatcher that implements reconciliation loop, or any errors occurred.
 func New(opts ...Option) (JobWatcher, error) {
 	r := &reconciler{
 		jobsByAppNamePool: sync.Pool{
 			New: func() interface{} {
-				return make(map[string][]Job)
+				return make(map[string][]k8s.Job)
 			},
 		},
 	}
@@ -100,7 +94,7 @@ func (r *reconciler) Reconcile(ctx context.Context, _ reconcile.Request) (res re
 		return
 	}

-	jobs := r.jobsByAppNamePool.Get().(map[string][]Job)
+	jobs := r.jobsByAppNamePool.Get().(map[string][]k8s.Job)
 	for idx := range js.Items {
 		job := js.Items[idx]
 		name, ok := job.GetObjectMeta().GetLabels()["app"]
@@ -110,7 +104,7 @@ func (r *reconciler) Reconcile(ctx context.Context, _ reconcile.Request) (res re
 		}

 		if _, ok := jobs[name]; !ok {
-			jobs[name] = make([]Job, 0, len(js.Items))
+			jobs[name] = make([]k8s.Job, 0, len(js.Items))
 		}
 		jobs[name] = append(jobs[name], job)
 	}
diff --git a/internal/k8s/job/option.go b/internal/k8s/job/option.go
index b4849294bb..22df59d6be 100644
--- a/internal/k8s/job/option.go
+++ b/internal/k8s/job/option.go
@@ -16,6 +16,7 @@ package job

 import (
 	"context"

+	"github.com/vdaas/vald/internal/k8s"
 	"sigs.k8s.io/controller-runtime/pkg/manager"
 )

@@ -57,7 +58,7 @@ func WithOnErrorFunc(f func(err error)) Option {
 }

 // WithOnReconcileFunc returns Option that sets r.onReconcile.
-func WithOnReconcileFunc(f func(ctx context.Context, jobList map[string][]Job)) Option {
+func WithOnReconcileFunc(f func(ctx context.Context, jobList map[string][]k8s.Job)) Option {
 	return func(r *reconciler) error {
 		r.onReconcile = f
 		return nil
diff --git a/internal/k8s/reconciler.go b/internal/k8s/reconciler.go
index 28742af78d..b13d1cb508 100644
--- a/internal/k8s/reconciler.go
+++ b/internal/k8s/reconciler.go
@@ -36,11 +36,6 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 )

-type (
-	Manager        = manager.Manager
-	OwnerReference = metav1.OwnerReference
-)
-
 type Controller interface {
 	Start(ctx context.Context) (<-chan error, error)
 	GetManager() Manager
diff --git a/internal/k8s/types.go b/internal/k8s/types.go
new file mode 100644
index 0000000000..b8c9bc4452
--- /dev/null
+++ b/internal/k8s/types.go
@@ -0,0 +1,72 @@
+//
+// Copyright (C) 2019-2024 vdaas.org vald team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// You may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+package k8s
+
+import (
+	snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
+	appsv1 "k8s.io/api/apps/v1"
+	batchv1 "k8s.io/api/batch/v1"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/selection"
+	"k8s.io/apimachinery/pkg/watch"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/manager"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+)
+
+type (
+	Object                    = client.Object
+	ObjectKey                 = client.ObjectKey
+	DeleteAllOfOptions        = client.DeleteAllOfOptions
+	DeleteOptions             = client.DeleteOptions
+	ListOptions               = client.ListOptions
+	ListOption                = client.ListOption
+	CreateOption              = client.CreateOption
+	CreateOptions             = client.CreateOptions
+	GetOption                 = client.GetOption
+	GetOptions                = client.GetOptions
+	UpdateOptions             = client.UpdateOptions
+	MatchingLabels            = client.MatchingLabels
+	InNamespace               = client.InNamespace
+	VolumeSnapshot            = snapshotv1.VolumeSnapshot
+	VolumeSnapshotList        = snapshotv1.VolumeSnapshotList
+	Pod                       = corev1.Pod
+	Deployment                = appsv1.Deployment
+	DeploymentList            = appsv1.DeploymentList
+	ObjectMeta                = metav1.ObjectMeta
+	EnvVar                    = corev1.EnvVar
+	Job                       = batchv1.Job
+	JobList                   = batchv1.JobList
+	JobStatus                 = batchv1.JobStatus
+	CronJob                   = batchv1.CronJob
+	Result                    = reconcile.Result
+	OwnerReference            = metav1.OwnerReference
+	PersistentVolumeClaim     = corev1.PersistentVolumeClaim
+	PersistentVolumeClaimList = corev1.PersistentVolumeClaimList
+	PersistentVolumeClaimSpec = corev1.PersistentVolumeClaimSpec
+	TypedLocalObjectReference = corev1.TypedLocalObjectReference
+	Manager                   = manager.Manager
+)
+
+const (
+	DeletePropagationBackground = metav1.DeletePropagationBackground
+	WatchDeletedEvent           = watch.Deleted
+	SelectionOpEquals           = selection.Equals
+	SelectionOpExists           = selection.Exists
+	PodIndexLabel               = appsv1.PodIndexLabel
+)
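[editor's note] The `=` in each declaration above makes the name a true type alias rather than a new named type, which is why values flow between the old `client.*` call sites and the new `k8s.*` names without any conversion. A standalone sketch of the distinction; `NamedJob` is invented here purely for contrast:

```go
package main

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
)

// Job is an alias: Job and batchv1.Job are the identical type.
type Job = batchv1.Job

// NamedJob is a definition: a distinct type that would force explicit
// conversions and break call sites expecting batchv1.Job.
type NamedJob batchv1.Job

func main() {
	var a batchv1.Job
	var b Job = a    // ok: same type, no conversion needed
	c := NamedJob(a) // a definition requires an explicit conversion
	fmt.Println(b.Name == c.Name)
}
```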
diff --git a/internal/k8s/vald/benchmark/job/job_template.go b/internal/k8s/vald/benchmark/job/job_template.go
index b93866959c..486af7cd9e 100644
--- a/internal/k8s/vald/benchmark/job/job_template.go
+++ b/internal/k8s/vald/benchmark/job/job_template.go
@@ -18,7 +18,7 @@
 package job

 import (
-	jobs "github.com/vdaas/vald/internal/k8s/job"
+	"github.com/vdaas/vald/internal/k8s"
 	corev1 "k8s.io/api/core/v1"
 )

@@ -42,14 +42,14 @@ const (
 )

 type BenchmarkJobTpl interface {
-	CreateJobTpl(opts ...BenchmarkJobOption) (jobs.Job, error)
+	CreateJobTpl(opts ...BenchmarkJobOption) (k8s.Job, error)
 }

 type benchmarkJobTpl struct {
 	containerName      string
 	containerImageName string
 	imagePullPolicy    ImagePullPolicy
-	jobTpl             jobs.Job
+	jobTpl             k8s.Job
 }

 func NewBenchmarkJob(opts ...BenchmarkJobTplOption) (BenchmarkJobTpl, error) {
@@ -63,7 +63,7 @@ func NewBenchmarkJob(opts ...BenchmarkJobTplOption) (BenchmarkJobTpl, error) {
 	return bjTpl, nil
 }

-func (b *benchmarkJobTpl) CreateJobTpl(opts ...BenchmarkJobOption) (jobs.Job, error) {
+func (b *benchmarkJobTpl) CreateJobTpl(opts ...BenchmarkJobOption) (k8s.Job, error) {
 	for _, opt := range append(defaultBenchmarkJobOpts, opts...) {
 		err := opt(&b.jobTpl)
 		if err != nil {
diff --git a/internal/k8s/vald/benchmark/job/job_template_option.go b/internal/k8s/vald/benchmark/job/job_template_option.go
index 35df5e426a..46574ea0c4 100644
--- a/internal/k8s/vald/benchmark/job/job_template_option.go
+++ b/internal/k8s/vald/benchmark/job/job_template_option.go
@@ -18,7 +18,6 @@ package job

 import (
 	"github.com/vdaas/vald/internal/k8s"
-	jobs "github.com/vdaas/vald/internal/k8s/job"
 	corev1 "k8s.io/api/core/v1"
 )

@@ -62,7 +61,7 @@ func WithImagePullPolicy(p ImagePullPolicy) BenchmarkJobTplOption {
 }

 // BenchmarkJobOption represents the option for create benchmark job template.
-type BenchmarkJobOption func(b *jobs.Job) error
+type BenchmarkJobOption func(b *k8s.Job) error

 const (
 	// defaultTTLSeconds represents the default TTLSecondsAfterFinished for benchmark job template.
@@ -83,7 +82,7 @@ var defaultBenchmarkJobOpts = []BenchmarkJobOption{

 // WithSvcAccountName sets the service account name for benchmark job.
 func WithSvcAccountName(name string) BenchmarkJobOption {
-	return func(b *jobs.Job) error {
+	return func(b *k8s.Job) error {
 		if len(name) > 0 {
 			b.Spec.Template.Spec.ServiceAccountName = name
 		}
@@ -93,7 +92,7 @@ func WithSvcAccountName(name string) BenchmarkJobOption {

 // WithRestartPolicy sets the job restart policy for benchmark job.
 func WithRestartPolicy(rp RestartPolicy) BenchmarkJobOption {
-	return func(b *jobs.Job) error {
+	return func(b *k8s.Job) error {
 		if len(rp) > 0 {
 			b.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicy(rp)
 		}
@@ -103,7 +102,7 @@ func WithRestartPolicy(rp RestartPolicy) BenchmarkJobOption {

 // WithBackoffLimit sets the job backoff limit for benchmark job.
 func WithBackoffLimit(bo int32) BenchmarkJobOption {
-	return func(b *jobs.Job) error {
+	return func(b *k8s.Job) error {
 		b.Spec.BackoffLimit = &bo
 		return nil
 	}
@@ -111,7 +110,7 @@ func WithBackoffLimit(bo int32) BenchmarkJobOption {

 // WithName sets the job name of benchmark job.
 func WithName(name string) BenchmarkJobOption {
-	return func(b *jobs.Job) error {
+	return func(b *k8s.Job) error {
 		b.Name = name
 		return nil
 	}
@@ -119,7 +118,7 @@ func WithName(name string) BenchmarkJobOption {

 // WithNamespace specify namespace where job will execute.
 func WithNamespace(ns string) BenchmarkJobOption {
-	return func(b *jobs.Job) error {
+	return func(b *k8s.Job) error {
 		b.Namespace = ns
 		return nil
 	}
@@ -127,7 +126,7 @@ func WithNamespace(ns string) BenchmarkJobOption {

 // WithOwnerRef sets the OwnerReference to the job resource.
 func WithOwnerRef(refs []k8s.OwnerReference) BenchmarkJobOption {
-	return func(b *jobs.Job) error {
+	return func(b *k8s.Job) error {
 		if len(refs) > 0 {
 			b.OwnerReferences = refs
 		}
@@ -137,7 +136,7 @@ func WithOwnerRef(refs []k8s.OwnerReference) BenchmarkJobOption {

 // WithCompletions sets the job completion.
 func WithCompletions(com int32) BenchmarkJobOption {
-	return func(b *jobs.Job) error {
+	return func(b *k8s.Job) error {
 		if com > 1 {
 			b.Spec.Completions = &com
 		}
@@ -147,7 +146,7 @@ func WithCompletions(com int32) BenchmarkJobOption {

 // WithParallelism sets the job parallelism.
 func WithParallelism(parallelism int32) BenchmarkJobOption {
-	return func(b *jobs.Job) error {
+	return func(b *k8s.Job) error {
 		if parallelism > 1 {
 			b.Spec.Parallelism = &parallelism
 		}
@@ -157,7 +156,7 @@ func WithParallelism(parallelism int32) BenchmarkJobOption {

 // WithLabel sets the label to the job resource.
 func WithLabel(label map[string]string) BenchmarkJobOption {
-	return func(b *jobs.Job) error {
+	return func(b *k8s.Job) error {
 		if len(label) > 0 {
 			b.Labels = label
 		}
@@ -167,7 +166,7 @@ func WithLabel(label map[string]string) BenchmarkJobOption {

 // WithTTLSecondsAfterFinished sets the TTLSecondsAfterFinished to the job template.
 func WithTTLSecondsAfterFinished(ttl int32) BenchmarkJobOption {
-	return func(b *jobs.Job) error {
+	return func(b *k8s.Job) error {
 		if ttl > 0 {
 			b.Spec.TTLSecondsAfterFinished = &ttl
 		}
diff --git a/internal/test/data/hdf5/hdf5_test.go b/internal/test/data/hdf5/hdf5_test.go
index 6af704fb51..d43a09c693 100644
--- a/internal/test/data/hdf5/hdf5_test.go
+++ b/internal/test/data/hdf5/hdf5_test.go
@@ -18,12 +18,11 @@
 package hdf5

 import (
-	"log"
-	"os"
 	"reflect"
 	"testing"

 	"github.com/vdaas/vald/internal/errors"
+	"github.com/vdaas/vald/internal/file"
 	"github.com/vdaas/vald/internal/test/goleak"
 	"gonum.org/v1/hdf5"
 )
@@ -1192,20 +1191,8 @@ func Test_downloadFile(t *testing.T) {
 		beforeFunc func(*testing.T, args)
 		afterFunc  func(*testing.T, args)
 	}
-	defaultBeforeFunc := func(t *testing.T, _ args) {
-		t.Helper()
-		err := os.Mkdir("tmp", os.ModePerm)
-		if err != nil {
-			log.Fatal(err)
-		}
-	}
-	defaultAfterFunc := func(t *testing.T, _ args) {
-		t.Helper()
-		err := os.RemoveAll("tmp")
-		if err != nil {
-			log.Fatal(err)
-		}
-	}
+	defaultBeforeFunc := func(t *testing.T, _ args) {}
+	defaultAfterFunc := func(t *testing.T, _ args) {}
 	defaultCheckFunc := func(w want, err error) error {
 		if !errors.Is(err, w.err) {
 			return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err)
@@ -1213,19 +1200,23 @@ func Test_downloadFile(t *testing.T) {
 		return nil
 	}
 	tests := []test{
-		{
-			name: "success download hdf5 file",
-			args: args{
-				url:  "https://ann-benchmarks.com/fashion-mnist-784-euclidean.hdf5",
-				path: "./tmp/data",
-			},
-			want: want{
-				err: nil,
-			},
-			beforeFunc: defaultBeforeFunc,
-			afterFunc:  defaultAfterFunc,
-			checkFunc:  defaultCheckFunc,
-		},
+		func() test {
+			tmpdir := t.TempDir()
+			path := file.Join(tmpdir, "data")
+			return test{
+				name: "success download hdf5 file",
+				args: args{
+					url:  "https://ann-benchmarks.com/fashion-mnist-784-euclidean.hdf5",
+					path: path,
+				},
+				want: want{
+					err: nil,
+				},
+				beforeFunc: defaultBeforeFunc,
+				afterFunc:  defaultAfterFunc,
+				checkFunc:  defaultCheckFunc,
+			}
+		}(),
 	}

 	for _, tc := range tests {
diff --git a/internal/test/mock/k8s/client.go b/internal/test/mock/k8s/client.go
index a40828d8ec..9a3c628ccb 100644
--- a/internal/test/mock/k8s/client.go
+++ b/internal/test/mock/k8s/client.go
@@ -17,6 +17,7 @@ import (
 	"context"

 	"github.com/stretchr/testify/mock"
+	"github.com/vdaas/vald/internal/k8s"
 	"github.com/vdaas/vald/internal/k8s/client"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/selection"
@@ -30,44 +31,44 @@ type ValdK8sClientMock struct {

 var _ client.Client = (*ValdK8sClientMock)(nil)

-func (m *ValdK8sClientMock) Get(ctx context.Context, name, namespace string, obj client.Object, opts ...crclient.GetOption) error {
+func (m *ValdK8sClientMock) Get(ctx context.Context, name, namespace string, obj k8s.Object, opts ...crclient.GetOption) error {
 	args := m.Called(ctx, name, namespace, obj, opts)
 	return args.Error(0)
 }

-func (m *ValdK8sClientMock) List(ctx context.Context, list crclient.ObjectList, opts ...client.ListOption) error {
+func (m *ValdK8sClientMock) List(ctx context.Context, list crclient.ObjectList, opts ...k8s.ListOption) error {
 	args := m.Called(ctx, list, opts)
 	return args.Error(0)
 }
-func (m *ValdK8sClientMock) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error {
+func (m *ValdK8sClientMock) Create(ctx context.Context, obj k8s.Object, opts ...k8s.CreateOption) error {
 	args := m.Called(ctx, obj, opts)
 	return args.Error(0)
 }

-func (m *ValdK8sClientMock) Delete(ctx context.Context, obj client.Object, opts ...crclient.DeleteOption) error {
+func (m *ValdK8sClientMock) Delete(ctx context.Context, obj k8s.Object, opts ...crclient.DeleteOption) error {
 	args := m.Called(ctx, obj, opts)
 	return args.Error(0)
 }

-func (m *ValdK8sClientMock) Update(ctx context.Context, obj client.Object, opts ...crclient.UpdateOption) error {
+func (m *ValdK8sClientMock) Update(ctx context.Context, obj k8s.Object, opts ...crclient.UpdateOption) error {
 	args := m.Called(ctx, obj, opts)
 	return args.Error(0)
 }

-func (m *ValdK8sClientMock) Patch(ctx context.Context, obj client.Object, patch crclient.Patch, opts ...crclient.PatchOption) error {
+func (m *ValdK8sClientMock) Patch(ctx context.Context, obj k8s.Object, patch crclient.Patch, opts ...crclient.PatchOption) error {
 	args := m.Called(ctx, obj, patch, opts)
 	return args.Error(0)
 }

-func (m *ValdK8sClientMock) Watch(ctx context.Context, obj crclient.ObjectList, opts ...client.ListOption) (watch.Interface, error) {
+func (m *ValdK8sClientMock) Watch(ctx context.Context, obj crclient.ObjectList, opts ...k8s.ListOption) (watch.Interface, error) {
 	args := m.Called(ctx, obj, opts)
 	return args.Get(0).(watch.Interface), args.Error(1)
 }

-func (m *ValdK8sClientMock) MatchingLabels(labels map[string]string) client.MatchingLabels {
+func (m *ValdK8sClientMock) MatchingLabels(labels map[string]string) k8s.MatchingLabels {
 	args := m.Called(labels)
-	return args.Get(0).(client.MatchingLabels)
+	return args.Get(0).(k8s.MatchingLabels)
 }

 func (m *ValdK8sClientMock) LabelSelector(key string, op selection.Operator, vals []string) (labels.Selector, error) {
diff --git a/pkg/gateway/mirror/service/discovery.go b/pkg/gateway/mirror/service/discovery.go
index a118655aa0..c5e41ad8fe 100644
--- a/pkg/gateway/mirror/service/discovery.go
+++ b/pkg/gateway/mirror/service/discovery.go
@@ -24,7 +24,6 @@ import (
 	"github.com/vdaas/vald/internal/errors"
 	"github.com/vdaas/vald/internal/hash"
 	"github.com/vdaas/vald/internal/k8s"
-	k8sclient "github.com/vdaas/vald/internal/k8s/client"
 	"github.com/vdaas/vald/internal/k8s/vald/mirror/target"
 	"github.com/vdaas/vald/internal/log"
 	"github.com/vdaas/vald/internal/net"
@@ -314,7 +313,7 @@ func (d *discovery) disconnectTarget(ctx context.Context, req map[string]*delete
 func (d *discovery) updateMirrorTargetPhase(ctx context.Context, name string, phase target.MirrorTargetPhase) error {
 	c := d.ctrl.GetManager().GetClient()
 	mt := &target.MirrorTarget{}
-	err := c.Get(ctx, k8sclient.ObjectKey{
+	err := c.Get(ctx, k8s.ObjectKey{
 		Namespace: d.namespace,
 		Name:      name,
 	}, mt)
diff --git a/pkg/index/job/readreplica/rotate/service/rotator.go b/pkg/index/job/readreplica/rotate/service/rotator.go
index ec64ac39bc..e3c8bfec6b 100644
--- a/pkg/index/job/readreplica/rotate/service/rotator.go
+++ b/pkg/index/job/readreplica/rotate/service/rotator.go
@@ -20,17 +20,14 @@ import (
 	"strings"
 	"time"

-	snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
 	"github.com/vdaas/vald/internal/errors"
+	"github.com/vdaas/vald/internal/k8s"
 	"github.com/vdaas/vald/internal/k8s/client"
 	"github.com/vdaas/vald/internal/k8s/vald"
 	"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/observability/trace" "github.com/vdaas/vald/internal/safety" "github.com/vdaas/vald/internal/sync/errgroup" - appsv1 "k8s.io/api/apps/v1" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/ptr" ) @@ -52,7 +49,7 @@ type rotator struct { } type subProcess struct { - listOpts client.ListOptions + listOpts k8s.ListOptions client client.Client volumeName string } @@ -123,13 +120,13 @@ func (r *rotator) Start(ctx context.Context) error { } func (r *rotator) newSubprocess(c client.Client, replicaID string) (subProcess, error) { - selector, err := c.LabelSelector(r.readReplicaLabelKey, client.SelectionOpEquals, []string{replicaID}) + selector, err := c.LabelSelector(r.readReplicaLabelKey, k8s.SelectionOpEquals, []string{replicaID}) if err != nil { return subProcess{}, err } sub := subProcess{ client: c, - listOpts: client.ListOptions{ + listOpts: k8s.ListOptions{ Namespace: r.namespace, LabelSelector: selector, }, @@ -186,8 +183,8 @@ func (s *subProcess) rotate(ctx context.Context) error { return nil } -func (s *subProcess) createSnapshot(ctx context.Context, deployment *appsv1.Deployment) (newSnap, oldSnap *client.VolumeSnapshot, err error) { - list := snapshotv1.VolumeSnapshotList{} +func (s *subProcess) createSnapshot(ctx context.Context, deployment *k8s.Deployment) (newSnap, oldSnap *k8s.VolumeSnapshot, err error) { + list := k8s.VolumeSnapshotList{} if err := s.client.List(ctx, &list, &s.listOpts); err != nil { return nil, nil, fmt.Errorf("failed to get snapshot: %w", err) } @@ -201,12 +198,12 @@ func (s *subProcess) createSnapshot(ctx context.Context, deployment *appsv1.Depl if newNameBase == "" { return nil, nil, fmt.Errorf("the name(%s) doesn't seem to have replicaid", cur.GetObjectMeta().GetName()) } - newSnap = &client.VolumeSnapshot{ - ObjectMeta: metav1.ObjectMeta{ + newSnap = &k8s.VolumeSnapshot{ + ObjectMeta: k8s.ObjectMeta{ Name: fmt.Sprintf("%s%d", newNameBase, time.Now().Unix()), Namespace: cur.GetNamespace(), Labels: cur.GetObjectMeta().GetLabels(), - OwnerReferences: []metav1.OwnerReference{ + OwnerReferences: []k8s.OwnerReference{ { APIVersion: "apps/v1", Kind: "Deployment", @@ -230,8 +227,8 @@ func (s *subProcess) createSnapshot(ctx context.Context, deployment *appsv1.Depl return newSnap, oldSnap, nil } -func (s *subProcess) createPVC(ctx context.Context, newSnapShot string, deployment *appsv1.Deployment) (newPvc, oldPvc *v1.PersistentVolumeClaim, err error) { - list := v1.PersistentVolumeClaimList{} +func (s *subProcess) createPVC(ctx context.Context, newSnapShot string, deployment *k8s.Deployment) (newPvc, oldPvc *k8s.PersistentVolumeClaim, err error) { + list := k8s.PersistentVolumeClaimList{} if err := s.client.List(ctx, &list, &s.listOpts); err != nil { return nil, nil, fmt.Errorf("failed to get PVC: %w", err) } @@ -247,12 +244,12 @@ func (s *subProcess) createPVC(ctx context.Context, newSnapShot string, deployme } // remove timestamp from old pvc name - newPvc = &v1.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{ + newPvc = &k8s.PersistentVolumeClaim{ + ObjectMeta: k8s.ObjectMeta{ Name: fmt.Sprintf("%s%d", newNameBase, time.Now().Unix()), Namespace: cur.GetNamespace(), Labels: cur.GetObjectMeta().GetLabels(), - OwnerReferences: []metav1.OwnerReference{ + OwnerReferences: []k8s.OwnerReference{ { APIVersion: "apps/v1", Kind: "Deployment", @@ -262,10 +259,10 @@ func (s *subProcess) createPVC(ctx context.Context, newSnapShot string, deployme }, }, }, - Spec: v1.PersistentVolumeClaimSpec{ + Spec: 
 			AccessModes: cur.Spec.AccessModes,
 			Resources:   cur.Spec.Resources,
-			DataSource: &v1.TypedLocalObjectReference{
+			DataSource: &k8s.TypedLocalObjectReference{
 				Name:     newSnapShot,
 				Kind:     cur.Spec.DataSource.Kind,
 				APIGroup: cur.Spec.DataSource.APIGroup,
@@ -284,8 +281,8 @@ func (s *subProcess) createPVC(ctx context.Context, newSnapShot string, deployme
 	return newPvc, oldPvc, nil
 }

-func (s *subProcess) getDeployment(ctx context.Context) (*appsv1.Deployment, error) {
-	list := appsv1.DeploymentList{}
+func (s *subProcess) getDeployment(ctx context.Context) (*k8s.Deployment, error) {
+	list := k8s.DeploymentList{}
 	if err := s.client.List(ctx, &list, &s.listOpts); err != nil {
 		return nil, fmt.Errorf("failed to get deployment through client: %w", err)
 	}
@@ -296,7 +293,7 @@ func (s *subProcess) getDeployment(ctx context.Context) (*appsv1.Deployment, err
 	return &list.Items[0], nil
 }

-func (s *subProcess) updateDeployment(ctx context.Context, newPVC string, deployment *appsv1.Deployment, snapshotTime time.Time) error {
+func (s *subProcess) updateDeployment(ctx context.Context, newPVC string, deployment *k8s.Deployment, snapshotTime time.Time) error {
 	if deployment.Spec.Template.ObjectMeta.Annotations == nil {
 		deployment.Spec.Template.ObjectMeta.Annotations = map[string]string{}
 	}
@@ -324,10 +321,10 @@ func (s *subProcess) updateDeployment(ctx context.Context, newPVC string, deploy
 	return nil
 }

-func (s *subProcess) deleteSnapshot(ctx context.Context, snapshot *snapshotv1.VolumeSnapshot) error {
+func (s *subProcess) deleteSnapshot(ctx context.Context, snapshot *k8s.VolumeSnapshot) error {
 	watcher, err := s.client.Watch(ctx,
-		&snapshotv1.VolumeSnapshotList{
-			Items: []snapshotv1.VolumeSnapshot{*snapshot},
+		&k8s.VolumeSnapshotList{
+			Items: []k8s.VolumeSnapshot{*snapshot},
 		},
 		&s.listOpts,
 	)
@@ -345,7 +342,7 @@ func (s *subProcess) deleteSnapshot(ctx context.Context, snapshot *snapshotv1.Vo
 		case <-egctx.Done():
 			return egctx.Err()
 		case event := <-watcher.ResultChan():
-			if event.Type == client.WatchDeletedEvent {
+			if event.Type == k8s.WatchDeletedEvent {
 				log.Infof("volume snapshot(%s) deleted", snapshot.GetName())
 				return nil
 			} else {
@@ -361,10 +358,10 @@ func (s *subProcess) deleteSnapshot(ctx context.Context, snapshot *snapshotv1.Vo
 	return eg.Wait()
 }

-func (s *subProcess) deletePVC(ctx context.Context, pvc *v1.PersistentVolumeClaim) error {
+func (s *subProcess) deletePVC(ctx context.Context, pvc *k8s.PersistentVolumeClaim) error {
 	watcher, err := s.client.Watch(ctx,
-		&v1.PersistentVolumeClaimList{
-			Items: []v1.PersistentVolumeClaim{*pvc},
+		&k8s.PersistentVolumeClaimList{
+			Items: []k8s.PersistentVolumeClaim{*pvc},
 		},
 		&s.listOpts,
 	)
@@ -382,7 +379,7 @@ func (s *subProcess) deletePVC(ctx context.Context, pvc *v1.PersistentVolumeClai
 		case <-egctx.Done():
 			return egctx.Err()
 		case event := <-watcher.ResultChan():
-			if event.Type == client.WatchDeletedEvent {
+			if event.Type == k8s.WatchDeletedEvent {
 				log.Infof("PVC(%s) deleted", pvc.GetName())
 				return nil
 			} else {
@@ -417,12 +414,12 @@ func (r *rotator) parseReplicaID(replicaID string, c client.Client) ([]string, e
 	}

 	if replicaID == rotateAllID {
-		var deploymentList appsv1.DeploymentList
-		selector, err := c.LabelSelector(r.readReplicaLabelKey, client.SelectionOpExists, []string{})
+		var deploymentList k8s.DeploymentList
+		selector, err := c.LabelSelector(r.readReplicaLabelKey, k8s.SelectionOpExists, []string{})
 		if err != nil {
 			return nil, err
 		}
-		if err := c.List(context.Background(), &deploymentList, &client.ListOptions{
+		if err := c.List(context.Background(), &deploymentList, &k8s.ListOptions{
 			Namespace:     r.namespace,
 			LabelSelector: selector,
 		}); err != nil {
diff --git a/pkg/index/job/readreplica/rotate/service/rotator_test.go b/pkg/index/job/readreplica/rotate/service/rotator_test.go
index 4369a884fc..36e0f4d83e 100644
--- a/pkg/index/job/readreplica/rotate/service/rotator_test.go
+++ b/pkg/index/job/readreplica/rotate/service/rotator_test.go
@@ -18,8 +18,9 @@ import (

 	"github.com/stretchr/testify/require"
 	"github.com/vdaas/vald/internal/errors"
+	"github.com/vdaas/vald/internal/k8s"
 	"github.com/vdaas/vald/internal/k8s/client"
-	"github.com/vdaas/vald/internal/test/mock/k8s"
+	mock "github.com/vdaas/vald/internal/test/mock/k8s"
 	"github.com/vdaas/vald/internal/test/testify"
 )

@@ -135,21 +136,21 @@ func Test_parseReplicaID(t *testing.T) {
 		func() test {
 			wantID1 := "bar"
 			wantID2 := "baz"
-			mock := &k8s.ValdK8sClientMock{}
+			mock := &mock.ValdK8sClientMock{}
 			mock.On("LabelSelector", testify.Anything, testify.Anything, testify.Anything).Return(client.NewSelector(), nil)
 			mock.On("List", testify.Anything, testify.Anything, testify.Anything).Run(func(args testify.Arguments) {
-				if depList, ok := args.Get(1).(*client.DeploymentList); ok {
-					depList.Items = []client.Deployment{
+				if depList, ok := args.Get(1).(*k8s.DeploymentList); ok {
+					depList.Items = []k8s.Deployment{
 						{
-							ObjectMeta: client.ObjectMeta{
+							ObjectMeta: k8s.ObjectMeta{
 								Labels: map[string]string{
 									labelKey: wantID1,
 								},
 							},
 						},
 						{
-							ObjectMeta: client.ObjectMeta{
+							ObjectMeta: k8s.ObjectMeta{
 								Labels: map[string]string{
 									labelKey: wantID2,
 								},
diff --git a/pkg/index/operator/service/operator.go b/pkg/index/operator/service/operator.go
index ab20319484..9f7c7b1c48 100644
--- a/pkg/index/operator/service/operator.go
+++ b/pkg/index/operator/service/operator.go
@@ -58,11 +58,11 @@ type operator struct {
 	readReplicaEnabled     bool
 	readReplicaLabelKey    string
 	rotationJobConcurrency uint
-	rotatorJob             *client.Job
+	rotatorJob             *k8s.Job
 }

 // New returns Indexer object if no error occurs.
-func New(namespace, agentName, rotatorName, targetReadReplicaIDKey string, rotatorJob *client.Job, opts ...Option) (o Operator, err error) {
+func New(namespace, agentName, rotatorName, targetReadReplicaIDKey string, rotatorJob *k8s.Job, opts ...Option) (o Operator, err error) {
 	operator := new(operator)
 	operator.namespace = namespace
 	operator.targetReadReplicaIDAnnotationsKey = targetReadReplicaIDKey
@@ -80,7 +80,7 @@ func New(namespace, agentName, rotatorName, targetReadReplicaIDKey string, rotat
 		}
 	}

-	isAgent := func(pod *client.Pod) bool {
+	isAgent := func(pod *k8s.Pod) bool {
 		return pod.Labels["app"] == agentName
 	}

@@ -150,38 +150,38 @@ func (o *operator) Start(ctx context.Context) (<-chan error, error) {
 	return ech, nil
 }

-func (o *operator) podOnReconcile(ctx context.Context, pod *client.Pod) (client.Result, error) {
+func (o *operator) podOnReconcile(ctx context.Context, pod *k8s.Pod) (k8s.Result, error) {
 	if o.readReplicaEnabled {
 		rq, err := o.reconcileRotatorJob(ctx, pod)
 		if err != nil {
 			log.Errorf("reconciling rotator job: %s", err)
-			return client.Result{}, fmt.Errorf("reconciling rotator job: %w", err)
+			return k8s.Result{}, fmt.Errorf("reconciling rotator job: %w", err)
 		}
 		// let controller-runtime backoff exponentially by not setting the backoff duration
-		return client.Result{
+		return k8s.Result{
 			Requeue: rq,
 		}, nil
 	}
-	return client.Result{}, nil
+	return k8s.Result{}, nil
 }

 // reconcileRotatorJob starts rotation job when the condition meets.
 // This function is work in progress.
-func (o *operator) reconcileRotatorJob(ctx context.Context, pod *client.Pod) (requeue bool, err error) {
-	podIdx, ok := pod.Labels[client.PodIndexLabel]
+func (o *operator) reconcileRotatorJob(ctx context.Context, pod *k8s.Pod) (requeue bool, err error) {
+	podIdx, ok := pod.Labels[k8s.PodIndexLabel]
 	if !ok {
 		log.Info("no index label found. the agent is not StatefulSet? skipping...")
 		return false, nil
 	}

 	// retrieve the readreplica deployment annotations for podIdx
-	var readReplicaDeployments client.DeploymentList
-	selector, err := o.client.LabelSelector(o.readReplicaLabelKey, client.SelectionOpEquals, []string{podIdx})
+	var readReplicaDeployments k8s.DeploymentList
+	selector, err := o.client.LabelSelector(o.readReplicaLabelKey, k8s.SelectionOpEquals, []string{podIdx})
 	if err != nil {
 		return false, fmt.Errorf("creating label selector: %w", err)
 	}
-	listOpts := client.ListOptions{
+	listOpts := k8s.ListOptions{
 		Namespace:     o.namespace,
 		LabelSelector: selector,
 	}
@@ -261,7 +261,7 @@ func (o *operator) createRotationJobOrRequeue(ctx context.Context, podIdx string
 		job.Spec.Template.Annotations = make(map[string]string)
 	}
 	job.Spec.Template.Annotations[o.targetReadReplicaIDAnnotationsKey] = podIdx
-	job.ObjectMeta = client.ObjectMeta{
+	job.ObjectMeta = k8s.ObjectMeta{
 		GenerateName: fmt.Sprintf("%s-", o.rotatorName),
 		Namespace:    o.namespace,
 	}
@@ -277,12 +277,12 @@ func (o *operator) createRotationJobOrRequeue(ctx context.Context, podIdx string
 // the MaxConcurrentReconciles defaults to 1 and we do not change it.
 func (o *operator) ensureJobConcurrency(ctx context.Context, podIdx string) (jobReconcileResult, error) {
 	// get all the rotation jobs and make sure the job is not running
-	var jobList client.JobList
-	selector, err := o.client.LabelSelector("app", client.SelectionOpEquals, []string{o.rotatorName})
+	var jobList k8s.JobList
+	selector, err := o.client.LabelSelector("app", k8s.SelectionOpEquals, []string{o.rotatorName})
 	if err != nil {
 		return createSkipped, fmt.Errorf("creating label selector: %w", err)
 	}
-	if err := o.client.List(ctx, &jobList, &client.ListOptions{
+	if err := o.client.List(ctx, &jobList, &k8s.ListOptions{
 		Namespace:     o.namespace,
 		LabelSelector: selector,
 	}); err != nil {
@@ -290,7 +290,7 @@ func (o *operator) ensureJobConcurrency(ctx context.Context, podIdx string) (job
 	}

 	// no need to check finished jobs
-	jobList.Items = slices.DeleteFunc(jobList.Items, func(job client.Job) bool {
+	jobList.Items = slices.DeleteFunc(jobList.Items, func(job k8s.Job) bool {
 		return job.Status.Active == 0
 	})

diff --git a/pkg/index/operator/service/operator_test.go b/pkg/index/operator/service/operator_test.go
index f96fe89fa6..be519476a5 100644
--- a/pkg/index/operator/service/operator_test.go
+++ b/pkg/index/operator/service/operator_test.go
@@ -22,9 +22,10 @@ import (
 	"time"

 	"github.com/stretchr/testify/require"
+	"github.com/vdaas/vald/internal/k8s"
 	"github.com/vdaas/vald/internal/k8s/client"
 	"github.com/vdaas/vald/internal/k8s/vald"
-	"github.com/vdaas/vald/internal/test/mock/k8s"
+	mock "github.com/vdaas/vald/internal/test/mock/k8s"
 	"github.com/vdaas/vald/internal/test/testify"
 )

@@ -32,16 +33,16 @@ func Test_operator_podOnReconcile(t *testing.T) {
 	t.Parallel()

 	type want struct {
-		res          client.Result
+		res          k8s.Result
 		createCalled bool
 		err          error
 	}
 	type test struct {
 		name                   string
-		agentPod               *client.Pod
+		agentPod               *k8s.Pod
 		readReplicaEnabled     bool
-		readReplicaDeployment  *client.Deployment
-		runningJobs            []client.Job
+		readReplicaDeployment  *k8s.Deployment
+		runningJobs            []k8s.Job
 		rotationJobConcurrency uint
 		want                   want
 	}
@@ -51,7 +52,7 @@
 			name:               "returns client.Result{} when read replica is not enabled",
 			readReplicaEnabled: false,
 			want: want{
-				res:          client.Result{},
+				res:          k8s.Result{},
 				createCalled: false,
 				err:          nil,
 			},
 		},
@@ -59,9 +60,9 @@
 		{
 			name:               "returns client.Result{} when pod is not a statefulset",
 			readReplicaEnabled: true,
-			agentPod:           &client.Pod{},
+			agentPod:           &k8s.Pod{},
 			want: want{
-				res:          client.Result{},
+				res:          k8s.Result{},
 				createCalled: false,
 				err:          nil,
 			},
 		},
@@ -72,18 +73,18 @@
 			return test{
 				name:               "returns requeue: false when last snapshot time is after the last save time",
 				readReplicaEnabled: true,
-				agentPod: &client.Pod{
-					ObjectMeta: client.ObjectMeta{
+				agentPod: &k8s.Pod{
+					ObjectMeta: k8s.ObjectMeta{
 						Labels: map[string]string{
-							client.PodIndexLabel: "0",
+							k8s.PodIndexLabel: "0",
 						},
 						Annotations: map[string]string{
 							vald.LastTimeSaveIndexTimestampAnnotationsKey: saveTime.Format(vald.TimeFormat),
 						},
 					},
 				},
-				readReplicaDeployment: &client.Deployment{
-					ObjectMeta: client.ObjectMeta{
+				readReplicaDeployment: &k8s.Deployment{
+					ObjectMeta: k8s.ObjectMeta{
 						Name: "deploymentName",
 						Annotations: map[string]string{
 							vald.LastTimeSnapshotTimestampAnnotationsKey: rotateTime.Format(vald.TimeFormat),
 						},
 					},
 				},
 				want: want{
-					res: client.Result{
+					res: k8s.Result{
 						Requeue: false,
 					},
 					createCalled: false,
@@ -105,18 +106,18 @@
 			return test{
 				name:               "returns requeue: false and calls client.Create once when last snapshot time is before the last save time",
 				readReplicaEnabled: true,
-				agentPod: &client.Pod{
-					ObjectMeta: client.ObjectMeta{
+				agentPod: &k8s.Pod{
+					ObjectMeta: k8s.ObjectMeta{
 						Labels: map[string]string{
-							client.PodIndexLabel: "0",
+							k8s.PodIndexLabel: "0",
 						},
 						Annotations: map[string]string{
 							vald.LastTimeSaveIndexTimestampAnnotationsKey: saveTime.Format(vald.TimeFormat),
 						},
 					},
 				},
-				readReplicaDeployment: &client.Deployment{
-					ObjectMeta: client.ObjectMeta{
+				readReplicaDeployment: &k8s.Deployment{
+					ObjectMeta: k8s.ObjectMeta{
 						Name: "deploymentName",
 						Annotations: map[string]string{
 							vald.LastTimeSnapshotTimestampAnnotationsKey: rotateTime.Format(vald.TimeFormat),
 						},
 					},
 				},
 				want: want{
-					res: client.Result{
+					res: k8s.Result{
 						Requeue: false,
 					},
 					createCalled: true,
@@ -138,37 +139,37 @@
 			return test{
 				name:               "returns requeue: true when there is already one running job when rotation job concurrency is 1",
 				readReplicaEnabled: true,
-				agentPod: &client.Pod{
-					ObjectMeta: client.ObjectMeta{
+				agentPod: &k8s.Pod{
+					ObjectMeta: k8s.ObjectMeta{
 						Labels: map[string]string{
-							client.PodIndexLabel: "0",
+							k8s.PodIndexLabel: "0",
 						},
 						Annotations: map[string]string{
 							vald.LastTimeSaveIndexTimestampAnnotationsKey: saveTime.Format(vald.TimeFormat),
 						},
 					},
 				},
-				readReplicaDeployment: &client.Deployment{
-					ObjectMeta: client.ObjectMeta{
+				readReplicaDeployment: &k8s.Deployment{
+					ObjectMeta: k8s.ObjectMeta{
 						Name: "deploymentName",
 						Annotations: map[string]string{
 							vald.LastTimeSnapshotTimestampAnnotationsKey: rotateTime.Format(vald.TimeFormat),
 						},
 					},
 				},
-				runningJobs: []client.Job{
+				runningJobs: []k8s.Job{
 					{
-						ObjectMeta: client.ObjectMeta{
+						ObjectMeta: k8s.ObjectMeta{
"already running job1", }, - Status: client.JobStatus{ + Status: k8s.JobStatus{ Active: 1, }, }, }, rotationJobConcurrency: 1, want: want{ - res: client.Result{ + res: k8s.Result{ Requeue: true, }, createCalled: false, @@ -182,37 +183,37 @@ func Test_operator_podOnReconcile(t *testing.T) { return test{ name: "returns requeue: false and create job when there is one running job when rotation job concurrency is 2", readReplicaEnabled: true, - agentPod: &client.Pod{ - ObjectMeta: client.ObjectMeta{ + agentPod: &k8s.Pod{ + ObjectMeta: k8s.ObjectMeta{ Labels: map[string]string{ - client.PodIndexLabel: "0", + k8s.PodIndexLabel: "0", }, Annotations: map[string]string{ vald.LastTimeSaveIndexTimestampAnnotationsKey: saveTime.Format(vald.TimeFormat), }, }, }, - readReplicaDeployment: &client.Deployment{ - ObjectMeta: client.ObjectMeta{ + readReplicaDeployment: &k8s.Deployment{ + ObjectMeta: k8s.ObjectMeta{ Name: "deploymentName", Annotations: map[string]string{ vald.LastTimeSnapshotTimestampAnnotationsKey: rotateTime.Format(vald.TimeFormat), }, }, }, - runningJobs: []client.Job{ + runningJobs: []k8s.Job{ { - ObjectMeta: client.ObjectMeta{ + ObjectMeta: k8s.ObjectMeta{ Name: "already running job1", }, - Status: client.JobStatus{ + Status: k8s.JobStatus{ Active: 1, }, }, }, rotationJobConcurrency: 2, want: want{ - res: client.Result{ + res: k8s.Result{ Requeue: false, }, createCalled: true, @@ -226,45 +227,45 @@ func Test_operator_podOnReconcile(t *testing.T) { return test{ name: "returns requeue: true when there are two running jobs when rotation job concurrency is 2", readReplicaEnabled: true, - agentPod: &client.Pod{ - ObjectMeta: client.ObjectMeta{ + agentPod: &k8s.Pod{ + ObjectMeta: k8s.ObjectMeta{ Labels: map[string]string{ - client.PodIndexLabel: "0", + k8s.PodIndexLabel: "0", }, Annotations: map[string]string{ vald.LastTimeSaveIndexTimestampAnnotationsKey: saveTime.Format(vald.TimeFormat), }, }, }, - readReplicaDeployment: &client.Deployment{ - ObjectMeta: client.ObjectMeta{ + readReplicaDeployment: &k8s.Deployment{ + ObjectMeta: k8s.ObjectMeta{ Name: "deploymentName", Annotations: map[string]string{ vald.LastTimeSnapshotTimestampAnnotationsKey: rotateTime.Format(vald.TimeFormat), }, }, }, - runningJobs: []client.Job{ + runningJobs: []k8s.Job{ { - ObjectMeta: client.ObjectMeta{ + ObjectMeta: k8s.ObjectMeta{ Name: "already running job1", }, - Status: client.JobStatus{ + Status: k8s.JobStatus{ Active: 1, }, }, { - ObjectMeta: client.ObjectMeta{ + ObjectMeta: k8s.ObjectMeta{ Name: "already running job2", }, - Status: client.JobStatus{ + Status: k8s.JobStatus{ Active: 1, }, }, }, rotationJobConcurrency: 2, want: want{ - res: client.Result{ + res: k8s.Result{ Requeue: true, }, createCalled: false, @@ -279,17 +280,17 @@ func Test_operator_podOnReconcile(t *testing.T) { t.Run(test.name, func(tt *testing.T) { tt.Parallel() - mock := &k8s.ValdK8sClientMock{} + mock := &mock.ValdK8sClientMock{} mock.On("LabelSelector", testify.Anything, testify.Anything, testify.Anything).Return(client.NewSelector(), nil).Maybe() mock.On("List", testify.Anything, testify.AnythingOfType("*v1.DeploymentList"), testify.Anything).Run(func(args testify.Arguments) { - arg, ok := args.Get(1).(*client.DeploymentList) + arg, ok := args.Get(1).(*k8s.DeploymentList) require.True(t, ok) - arg.Items = []client.Deployment{*test.readReplicaDeployment} + arg.Items = []k8s.Deployment{*test.readReplicaDeployment} }).Return(nil).Maybe() mock.On("List", testify.Anything, testify.AnythingOfType("*v1.JobList"), testify.Anything).Run(func(args 
-				arg, ok := args.Get(1).(*client.JobList)
+				arg, ok := args.Get(1).(*k8s.JobList)
 				require.True(t, ok)

 				arg.Items = test.runningJobs
@@ -312,8 +313,8 @@
 				rotationJobConcurrency: concurrency,
 			}

-			op.rotatorJob = &client.Job{
-				ObjectMeta: client.ObjectMeta{
+			op.rotatorJob = &k8s.Job{
+				ObjectMeta: k8s.ObjectMeta{
 					Name: "foo job",
 				},
 			}
diff --git a/pkg/tools/benchmark/operator/service/operator.go b/pkg/tools/benchmark/operator/service/operator.go
index 78830b1d03..f03c48b866 100644
--- a/pkg/tools/benchmark/operator/service/operator.go
+++ b/pkg/tools/benchmark/operator/service/operator.go
@@ -26,7 +26,6 @@ import (
 	"github.com/vdaas/vald/internal/errors"
 	"github.com/vdaas/vald/internal/k8s"
-	"github.com/vdaas/vald/internal/k8s/client"
 	"github.com/vdaas/vald/internal/k8s/job"
 	v1 "github.com/vdaas/vald/internal/k8s/vald/benchmark/api/v1"
 	benchjob "github.com/vdaas/vald/internal/k8s/vald/benchmark/job"
@@ -175,7 +174,7 @@ func (o *operator) getAtomicJob() map[string]string {

 // jobReconcile gets k8s job list and watches theirs STATUS.
 // Then, it processes according STATUS.
 // skipcq: GO-R1005
-func (o *operator) jobReconcile(ctx context.Context, jobList map[string][]job.Job) {
+func (o *operator) jobReconcile(ctx context.Context, jobList map[string][]k8s.Job) {
 	log.Debug("[reconcile job] start")
 	cjobs := o.getAtomicJob()
 	if cjobs == nil {
@@ -393,26 +392,26 @@ func (o *operator) benchScenarioReconcile(ctx context.Context, scenarioList map[

 // deleteBenchmarkJob deletes benchmark job resource according to given scenario name and generation.
 func (o *operator) deleteBenchmarkJob(ctx context.Context, name string, generation int64) error {
-	opts := new(client.DeleteAllOfOptions)
-	client.MatchingLabels(map[string]string{
+	opts := new(k8s.DeleteAllOfOptions)
+	k8s.MatchingLabels(map[string]string{
 		Scenario: name + strconv.Itoa(int(generation)),
 	}).ApplyToDeleteAllOf(opts)
-	client.InNamespace(o.jobNamespace).ApplyToDeleteAllOf(opts)
+	k8s.InNamespace(o.jobNamespace).ApplyToDeleteAllOf(opts)
 	return o.ctrl.GetManager().GetClient().DeleteAllOf(ctx, &v1.ValdBenchmarkJob{}, opts)
 }

 // deleteJob deletes job resource according to given benchmark job name and generation.
 func (o *operator) deleteJob(ctx context.Context, name string) error {
-	cj := new(job.Job)
-	err := o.ctrl.GetManager().GetClient().Get(ctx, client.ObjectKey{
+	cj := new(k8s.Job)
+	err := o.ctrl.GetManager().GetClient().Get(ctx, k8s.ObjectKey{
 		Namespace: o.jobNamespace,
 		Name:      name,
 	}, cj)
 	if err != nil {
 		return err
 	}
-	opts := new(client.DeleteOptions)
-	deleteProgation := client.DeletePropagationBackground
+	opts := new(k8s.DeleteOptions)
+	deleteProgation := k8s.DeletePropagationBackground
 	opts.PropagationPolicy = &deleteProgation
 	return o.ctrl.GetManager().GetClient().Delete(ctx, cj, opts)
 }
@@ -560,11 +559,11 @@ func (o *operator) checkJobsStatus(ctx context.Context, jobs map[string]string)
 		log.Infof("[check job status] no job launched")
 		return nil
 	}
-	job := new(job.Job)
+	job := new(k8s.Job)
 	c := o.ctrl.GetManager().GetClient()
 	jobStatus := map[string]v1.BenchmarkJobStatus{}
 	for name, ns := range jobs {
-		err := c.Get(ctx, client.ObjectKey{
+		err := c.Get(ctx, k8s.ObjectKey{
 			Namespace: ns,
 			Name:      name,
 		}, job)
diff --git a/pkg/tools/benchmark/operator/service/operator_test.go b/pkg/tools/benchmark/operator/service/operator_test.go
index a2c130aa30..da896c6b3e 100644
--- a/pkg/tools/benchmark/operator/service/operator_test.go
+++ b/pkg/tools/benchmark/operator/service/operator_test.go
@@ -24,7 +24,6 @@ import (
 	"github.com/vdaas/vald/internal/config"
 	"github.com/vdaas/vald/internal/errors"
 	"github.com/vdaas/vald/internal/k8s"
-	"github.com/vdaas/vald/internal/k8s/job"
 	v1 "github.com/vdaas/vald/internal/k8s/vald/benchmark/api/v1"
 	"github.com/vdaas/vald/internal/test/goleak"
 	"github.com/vdaas/vald/internal/test/mock"
@@ -517,7 +516,7 @@ func Test_operator_jobReconcile(t *testing.T) {
 	t.Parallel()
 	type args struct {
 		ctx     context.Context
-		jobList map[string][]job.Job
+		jobList map[string][]k8s.Job
 	}
 	type fields struct {
 		jobNamespace string
@@ -553,7 +552,7 @@
 			name: "success when the length of jobList is 0.",
 			args: args{
 				ctx:     ctx,
-				jobList: map[string][]job.Job{},
+				jobList: map[string][]k8s.Job{},
 			},
 			fields: fields{
 				jobNamespace: "default",
@@ -582,14 +581,14 @@
 			name: "success with new job whose namespace is same as jobNamespace and deleted job by etcd",
 			args: args{
 				ctx: ctx,
-				jobList: map[string][]job.Job{
+				jobList: map[string][]k8s.Job{
 					"scenario-insert": {
 						{
 							ObjectMeta: metav1.ObjectMeta{
 								Name:      "scenario-insert",
 								Namespace: "default",
 							},
-							Status: job.JobStatus{
+							Status: k8s.JobStatus{
 								Active: 1,
 							},
 						},
@@ -672,14 +671,14 @@
 			name: "success with completed job whose namespace is same as jobNamespace",
 			args: args{
 				ctx: ctx,
-				jobList: map[string][]job.Job{
+				jobList: map[string][]k8s.Job{
 					"scenario-insert": {
 						{
 							ObjectMeta: metav1.ObjectMeta{
 								Name:      "scenario-completed-insert",
 								Namespace: "default",
 							},
-							Status: job.JobStatus{
+							Status: k8s.JobStatus{
 								Active:    0,
 								Succeeded: 1,
 								CompletionTime: func() *metav1.Time {
@@ -769,14 +768,14 @@
 			name: "success with job whose namespace is not same as jobNamespace",
 			args: args{
 				ctx: ctx,
-				jobList: map[string][]job.Job{
+				jobList: map[string][]k8s.Job{
 					"scenario-insert": {
 						{
 							ObjectMeta: metav1.ObjectMeta{
 								Name:      "scenario-insert",
 								Namespace: "benchmark",
 							},
-							Status: job.JobStatus{
+							Status: k8s.JobStatus{
 								Active: 1,
 							},
 						},
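[editor's note] As a closing sketch of how tests drive the refactored mock, the testify expectation below populates the aliased `k8s.DeploymentList` the same way the rewritten tests above do. The test name, deployment name, and assertions are invented for illustration and assume the internal `testify` wrapper exposes `Anything` and `Arguments` as used throughout this patch:

```go
package example

import (
	"context"
	"testing"

	"github.com/vdaas/vald/internal/k8s"
	mock "github.com/vdaas/vald/internal/test/mock/k8s"
	"github.com/vdaas/vald/internal/test/testify"
)

func TestListDeployments(t *testing.T) {
	m := &mock.ValdK8sClientMock{}
	// Fill the aliased list type inside the Run callback, mirroring the
	// pattern used by rotator_test.go and operator_test.go in this diff.
	m.On("List", testify.Anything, testify.Anything, testify.Anything).
		Run(func(args testify.Arguments) {
			if list, ok := args.Get(1).(*k8s.DeploymentList); ok {
				list.Items = []k8s.Deployment{
					{ObjectMeta: k8s.ObjectMeta{Name: "read-replica-0"}},
				}
			}
		}).Return(nil)

	var list k8s.DeploymentList
	if err := m.List(context.Background(), &list); err != nil {
		t.Fatal(err)
	}
	if len(list.Items) != 1 {
		t.Errorf("want 1 deployment, got %d", len(list.Items))
	}
}
```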