diff --git a/.github/actions/e2e-deploy-vald-readreplica/action.yaml b/.github/actions/e2e-deploy-vald-readreplica/action.yaml new file mode 100644 index 0000000000..4bbe1316c5 --- /dev/null +++ b/.github/actions/e2e-deploy-vald-readreplica/action.yaml @@ -0,0 +1,128 @@ +# +# Copyright (C) 2019-2024 vdaas.org vald team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +name: "Deploy Vald Read Replica for E2E test" +description: "A action to deploy vald read replica for E2E test" + +inputs: + require_minio: + description: "If Minio is required, set this to true." + required: false + default: "false" + helm_extra_options: + description: "Extra options that passed to Helm command." + required: false + default: "" + values: + description: "Path to the values.yaml that passed to Helm command." + required: false + default: "false" + wait_for_selector: + description: "Label selector used for specifying a pod waited for" + required: false + default: "app=vald-lb-gateway" + wait_for_timeout: + description: "Timeout used for waiting for pods" + required: false + default: "600s" + use_local_charts: + description: "If you want to use local charts, set this to true." + required: false + default: "true" + default_image_tag: + description: "Default image tag. 
e.g) nightly, vx.x, vx.x.x" + required: true + default: "nightly" +outputs: + POD_NAME: + description: "A pod name that waited for" + value: ${{ steps.get_real_pod_name.outputs.POD_NAME }} + +runs: + using: "composite" + steps: + - name: Deploy Minio + id: deploy_minio + shell: bash + if: ${{ inputs.require_minio == 'true' }} + run: | + make K8S_SLEEP_DURATION_FOR_WAIT_COMMAND=10 k8s/external/minio/deploy + + - name: Dump Helm values + shell: bash + run: | + cat ${{ inputs.values }} + + - name: Deploy vald read replica from remote charts + shell: bash + id: deploy_vald_readreplica_remote + if: ${{ inputs.use_local_charts == 'false' }} + run: | + helm install \ + --values ${VALUES} \ + --set defaults.image.tag=${DEFAULT_IMAGE_TAG} \ + ${HELM_EXTRA_OPTIONS} \ + --generate-name charts/vald-readreplica + + sleep 3 + + kubectl wait --for=condition=ready pod -l ${WAIT_FOR_SELECTOR} --timeout=${WAIT_FOR_TIMEOUT} + + kubectl get pods + + podname=`kubectl get pods --selector=${WAIT_FOR_SELECTOR} | tail -1 | awk '{print $1}'` + echo "POD_NAME=${podname}" >> $GITHUB_OUTPUT + env: + DEFAULT_IMAGE_TAG: ${{ inputs.default_image_tag }} + VALUES: ${{ inputs.values }} + HELM_EXTRA_OPTIONS: ${{ inputs.helm_extra_options }} + WAIT_FOR_SELECTOR: ${{ inputs.wait_for_selector }} + WAIT_FOR_TIMEOUT: ${{ inputs.wait_for_timeout }} + + - name: Deploy vald read replica from local charts + shell: bash + id: deploy_vald_readreplica_local + if: ${{ inputs.use_local_charts == 'true' }} + run: | + make k8s/vald-readreplica/deploy VERSION=${DEFAULT_IMAGE_TAG} HELM_VALUES=${VALUES} HELM_EXTRA_OPTIONS="${HELM_EXTRA_OPTIONS}" + + sleep 3 + + kubectl wait --for=condition=ready pod -l ${WAIT_FOR_SELECTOR} --timeout=${WAIT_FOR_TIMEOUT} + + kubectl get pods + + podname=`kubectl get pods --selector=${WAIT_FOR_SELECTOR} | tail -1 | awk '{print $1}'` + echo "POD_NAME=${podname}" >> $GITHUB_OUTPUT + env: + DEFAULT_IMAGE_TAG: ${{ inputs.default_image_tag }} + VALUES: ${{ inputs.values }} + 
HELM_EXTRA_OPTIONS: ${{ inputs.helm_extra_options }} + WAIT_FOR_SELECTOR: ${{ inputs.wait_for_selector }} + WAIT_FOR_TIMEOUT: ${{ inputs.wait_for_timeout }} + + - name: Get real pod name + shell: bash + id: get_real_pod_name + env: + PODNAME_LOCAL_DEPLOY: ${{ steps.deploy_vald_readreplica_local.outputs.POD_NAME }} + PODNAME_REMOTE_DEPLOY: ${{ steps.deploy_vald_readreplica_remote.outputs.POD_NAME }} + # Set GITHUB_OUTPUT to the not empty one, PODNAME_LOCAL_DEPLOY or PODNAME_REMOTE_DEPLOY + run: | + if [[ -n "${PODNAME_LOCAL_DEPLOY}" ]]; then + echo "POD_NAME=${PODNAME_LOCAL_DEPLOY}" >> $GITHUB_OUTPUT + else + echo "POD_NAME=${PODNAME_REMOTE_DEPLOY}" >> $GITHUB_OUTPUT + fi diff --git a/.github/actions/setup-e2e/action.yaml b/.github/actions/setup-e2e/action.yaml index a2a17bbcef..19fd24100a 100644 --- a/.github/actions/setup-e2e/action.yaml +++ b/.github/actions/setup-e2e/action.yaml @@ -33,6 +33,10 @@ inputs: description: "If k3d is not required, set this to false" required: false default: "true" + require_minikube: + description: "If minikube is not required, set this to true and set require_k3d to false" + required: false + default: "false" ingress_port: description: 'If it is not "0", ingress will be exposed to the specified port' required: false @@ -94,6 +98,13 @@ runs: agents: 3 ingress_port: ${{ inputs.ingress_port }} + - name: Setup Minikube environment + if: ${{ inputs.require_minikube == 'true' }} + shell: bash + run: | + make minikube/install + make minikube/start + - name: Check Kubernetes cluster shell: bash run: | diff --git a/.github/helm/values/values-readreplica.yaml b/.github/helm/values/values-readreplica.yaml new file mode 100644 index 0000000000..2e4b86b89f --- /dev/null +++ b/.github/helm/values/values-readreplica.yaml @@ -0,0 +1,89 @@ +# +# Copyright (C) 2019-2024 vdaas.org vald team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +defaults: + logging: + level: info + networkPolicy: + enabled: true + +gateway: + lb: + enabled: true + minReplicas: 1 + hpa: + enabled: false + resources: + requests: + cpu: 100m + memory: 50Mi + gateway_config: + index_replica: 3 + +agent: + minReplicas: 3 + maxReplicas: 10 + podManagementPolicy: Parallel + hpa: + enabled: false + resources: + requests: + cpu: 100m + memory: 50Mi + ngt: + auto_index_duration_limit: 2m + auto_index_check_duration: 30s + auto_index_length: 1000 + dimension: 784 + index_path: /var/ngt/index + enable_in_memory_mode: false + persistentVolume: + enabled: true + accessMode: ReadWriteOnce + storageClass: csi-hostpath-sc + size: 1Gi + readreplica: + enabled: true + snapshot_classname: "csi-hostpath-snapclass" + replica: 1 + +discoverer: + minReplicas: 1 + hpa: + enabled: false + resources: + requests: + cpu: 100m + memory: 50Mi + +manager: + index: + replicas: 1 + resources: + requests: + cpu: 100m + memory: 30Mi + indexer: + auto_index_duration_limit: 2m + auto_index_check_duration: 30s + auto_index_length: 1000 + readreplica: + rotator: + enabled: true + initContainers: [] + env: + - name: MY_TARGET_REPLICA_ID + value: "0" diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 48a8eb11e5..f5e57d0838 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -319,6 +319,61 @@ jobs: env: POD_NAME: ${{ steps.deploy_vald.outputs.POD_NAME }} + e2e-stream-crud-with-readreplica: + name: "E2E test (Stream CRUD) with read replica" + needs: [dump-contexts-to-log] + runs-on: ubuntu-latest + 
timeout-minutes: 60 + steps: + - uses: actions/checkout@v4 + + - name: Set Git config + run: | + git config --global --add safe.directory ${GITHUB_WORKSPACE} + + - name: Setup E2E environment + id: setup_e2e + uses: ./.github/actions/setup-e2e + with: + require_k3d: "false" + require_minikube: "true" + + - name: Deploy Vald + id: deploy_vald + uses: ./.github/actions/e2e-deploy-vald + with: + helm_extra_options: ${{ steps.setup_e2e.outputs.HELM_EXTRA_OPTIONS }} + values: .github/helm/values/values-readreplica.yaml + wait_for_selector: app=vald-agent-ngt + + - name: Deploy Vald Read Replica + id: deploy_vald_readreplica + uses: ./.github/actions/e2e-deploy-vald-readreplica + with: + default_image_tag: ${{ steps.setup_e2e.outputs.DEFAULT_IMAGE_TAG }} + helm_extra_options: ${{ steps.setup_e2e.outputs.HELM_EXTRA_OPTIONS }} + values: .github/helm/values/values-readreplica.yaml + wait_for_selector: app=vald-lb-gateway + + - name: Run E2E CRUD with read replica rotation + run: | + make hack/benchmark/assets/dataset/${{ env.DATASET }} + make E2E_BIND_PORT=8081 \ + E2E_DATASET_NAME=${{ env.DATASET }} \ + E2E_INSERT_COUNT=1000 \ + E2E_SEARCH_COUNT=1000 \ + E2E_SEARCH_BY_ID_COUNT=1000 \ + E2E_GET_OBJECT_COUNT=100 \ + E2E_UPDATE_COUNT=100 \ + E2E_UPSERT_COUNT=100 \ + E2E_REMOVE_COUNT=100 \ + E2E_WAIT_FOR_CREATE_INDEX_DURATION=3m \ + E2E_TARGET_POD_NAME=${POD_NAME} \ + E2E_TARGET_NAMESPACE=default \ + e2e/readreplica + env: + POD_NAME: ${{ steps.deploy_vald_readreplica.outputs.POD_NAME }} + e2e-stream-crud-with-mirror: name: "E2E test (Stream CRUD) with mirror" needs: [dump-contexts-to-log] @@ -391,6 +446,7 @@ jobs: - e2e-stream-crud-skip-exist-check - e2e-stream-crud-under-index-management-jobs - e2e-stream-crud-with-mirror + - e2e-stream-crud-with-readreplica runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 diff --git a/Makefile.d/e2e.mk b/Makefile.d/e2e.mk index 737121126a..544ad5d9f0 100644 --- a/Makefile.d/e2e.mk +++ b/Makefile.d/e2e.mk @@ -74,6 +74,11 @@ 
e2e/insert/search: e2e/index/job/correction: $(call run-e2e-crud-test,-run TestE2EIndexJobCorrection) +.PHONY: e2e/readreplica +## run readreplica e2e +e2e/readreplica: + $(call run-e2e-crud-test,-run TestE2EReadReplica) + .PHONY: e2e/maxdim ## run e2e/maxdim e2e/maxdim: diff --git a/pkg/index/job/readreplica/rotate/service/rotator.go b/pkg/index/job/readreplica/rotate/service/rotator.go index b5bd90b67e..5a1fca4a34 100644 --- a/pkg/index/job/readreplica/rotate/service/rotator.go +++ b/pkg/index/job/readreplica/rotate/service/rotator.go @@ -22,7 +22,7 @@ import ( snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1" "github.com/vdaas/vald/internal/errors" - client "github.com/vdaas/vald/internal/k8s/client" + "github.com/vdaas/vald/internal/k8s/client" "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/observability/trace" "github.com/vdaas/vald/internal/sync/errgroup" diff --git a/tests/e2e/crud/crud_test.go b/tests/e2e/crud/crud_test.go index c8b333ded8..237d33be2c 100644 --- a/tests/e2e/crud/crud_test.go +++ b/tests/e2e/crud/crud_test.go @@ -25,6 +25,7 @@ import ( "fmt" "os" "os/exec" + "strconv" "testing" "time" @@ -34,6 +35,7 @@ import ( "github.com/vdaas/vald/internal/net/grpc/status" "github.com/vdaas/vald/tests/e2e/hdf5" "github.com/vdaas/vald/tests/e2e/kubernetes/client" + "github.com/vdaas/vald/tests/e2e/kubernetes/kubectl" "github.com/vdaas/vald/tests/e2e/kubernetes/portforward" "github.com/vdaas/vald/tests/e2e/operation" ) @@ -64,6 +66,7 @@ var ( kubeClient client.Client namespace string + kubeConfig string forwarder *portforward.Portforward ) @@ -98,20 +101,19 @@ func init() { pfPodName := flag.String("portforward-pod-name", "vald-gateway-0", "pod name (only for port forward)") pfPodPort := flag.Int("portforward-pod-port", port, "pod gRPC port (only for port forward)") - kubeConfig := flag.String("kubeconfig", file.Join(os.Getenv("HOME"), ".kube", "config"), "kubeconfig path") + 
flag.StringVar(&kubeConfig, "kubeconfig", file.Join(os.Getenv("HOME"), ".kube", "config"), "kubeconfig path") flag.StringVar(&namespace, "namespace", "default", "namespace") flag.Parse() var err error if *pf { - kubeClient, err = client.New(*kubeConfig) + kubeClient, err = client.New(kubeConfig) if err != nil { panic(err) } forwarder = kubeClient.Portforward(namespace, *pfPodName, port, *pfPodPort) - err = forwarder.Start() if err != nil { panic(err) @@ -751,7 +753,6 @@ func TestE2ECRUDWithSkipStrictExistCheck(t *testing.T) { // TestE2EIndexJobCorrection tests the index correction job. // It inserts vectors, runs the index correction job, and then removes the vectors. -// TODO: Add index replica count check after inplementing StreamListObject in LB func TestE2EIndexJobCorrection(t *testing.T) { t.Cleanup(teardown) ctx := context.Background() @@ -823,3 +824,132 @@ func TestE2EIndexJobCorrection(t *testing.T) { t.Fatalf("an error occurred: %s", err) } } + +func TestE2EReadReplica(t *testing.T) { + t.Cleanup(teardown) + + if kubeClient == nil { + var err error + kubeClient, err = client.New(kubeConfig) + if err != nil { + t.Skipf("TestE2EReadReplica needs kubernetes client but failed to create one: %s", err) + } + } + + ctx := context.Background() + + op, err := operation.New(host, port) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + err = op.Insert(t, ctx, operation.Dataset{ + Train: ds.Train[insertFrom : insertFrom+insertNum], + }) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + sleep(t, waitAfterInsertDuration) + + t.Log("starting to restart all the agent pods to make it backup index to pvc...") + if err := kubectl.RolloutResource(ctx, t, "statefulsets/vald-agent-ngt"); err != nil { + t.Fatalf("failed to restart all the agent pods: %s", err) + } + + t.Log("starting to create read replica rotators...") + pods, err := kubeClient.GetPods(ctx, namespace, "app=vald-agent-ngt") + if err != nil { + t.Fatalf("GetPods failed: %s", err) 
+ } + cronJobs, err := kubeClient.ListCronJob(ctx, namespace, "app=vald-readreplica-rotate") + if err != nil { + t.Fatalf("ListCronJob failed: %s", err) + } + cronJob := cronJobs[0] + for id := 0; id < len(pods); id++ { + cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Env[0].Value = strconv.Itoa(id) + kubeClient.CreateJobFromCronJob(ctx, "vald-readreplica-rotate-"+strconv.Itoa(id), namespace, &cronJob) + } + + t.Log("waiting for read replica rotator jobs to complete...") + if err := kubectl.WaitResources(ctx, t, "job", "app=vald-readreplica-rotate", "complete", "120s"); err != nil { + t.Fatalf("failed to wait for read replica rotator jobs to complete: %s", err) + } + + err = op.Search(t, ctx, operation.Dataset{ + Test: ds.Test[searchFrom : searchFrom+searchNum], + Neighbors: ds.Neighbors[searchFrom : searchFrom+searchNum], + }) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + err = op.SearchByID(t, ctx, operation.Dataset{ + Train: ds.Train[searchByIDFrom : searchByIDFrom+searchByIDNum], + }) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + err = op.LinearSearch(t, ctx, operation.Dataset{ + Test: ds.Test[searchFrom : searchFrom+searchNum], + Neighbors: ds.Neighbors[searchFrom : searchFrom+searchNum], + }) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + err = op.LinearSearchByID(t, ctx, operation.Dataset{ + Train: ds.Train[searchByIDFrom : searchByIDFrom+searchByIDNum], + }) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + err = op.Exists(t, ctx, "0") + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + err = op.GetObject(t, ctx, operation.Dataset{ + Train: ds.Train[getObjectFrom : getObjectFrom+getObjectNum], + }) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + err = op.StreamListObject(t, ctx, operation.Dataset{ + Train: ds.Train[insertFrom : insertFrom+insertNum], + }) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + err = 
op.Update(t, ctx, operation.Dataset{ + Train: ds.Train[updateFrom : updateFrom+updateNum], + }) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + err = op.Upsert(t, ctx, operation.Dataset{ + Train: ds.Train[upsertFrom : upsertFrom+upsertNum], + }) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + err = op.Remove(t, ctx, operation.Dataset{ + Train: ds.Train[removeFrom : removeFrom+removeNum], + }) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + // Remove all vector data after the current - 1 hour. + err = op.RemoveByTimestamp(t, ctx, time.Now().Add(-time.Hour).UnixNano()) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } +} diff --git a/tests/e2e/kubernetes/client/client.go b/tests/e2e/kubernetes/client/client.go index 2ced2f3cfa..e97aba2d01 100644 --- a/tests/e2e/kubernetes/client/client.go +++ b/tests/e2e/kubernetes/client/client.go @@ -27,6 +27,7 @@ import ( "github.com/vdaas/vald/internal/file" "github.com/vdaas/vald/internal/strings" "github.com/vdaas/vald/tests/e2e/kubernetes/portforward" + v1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" @@ -58,6 +59,15 @@ type Client interface { namespace, name string, timeout time.Duration, ) (ok bool, err error) + ListCronJob( + ctx context.Context, + namespace, labelSelector string, + ) ([]v1.CronJob, error) + CreateJobFromCronJob( + ctx context.Context, + name, namespace string, + cronJob *v1.CronJob, + ) error } type client struct { @@ -109,7 +119,6 @@ func (cli *client) GetPod( if err != nil { return nil, err } - return pod, nil } @@ -174,3 +183,27 @@ func (cli *client) WaitForPodReady( } } } + +func (cli *client) ListCronJob(ctx context.Context, namespace, labelSelector string) ([]v1.CronJob, error) { + cronJobs, err := cli.clientset.BatchV1().CronJobs(namespace).List(ctx, metav1.ListOptions{ + LabelSelector: labelSelector, + }) + if err != nil { + return nil, err + } + + 
return cronJobs.Items, nil +} + +func (cli *client) CreateJobFromCronJob(ctx context.Context, name, namespace string, cronJob *v1.CronJob) error { + job := &v1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: cronJob.Spec.JobTemplate.Spec, + } + + _, err := cli.clientset.BatchV1().Jobs(namespace).Create(ctx, job, metav1.CreateOptions{}) + return err +} diff --git a/tests/e2e/kubernetes/kubectl/kubectl.go b/tests/e2e/kubernetes/kubectl/kubectl.go new file mode 100644 index 0000000000..2237507fb1 --- /dev/null +++ b/tests/e2e/kubernetes/kubectl/kubectl.go @@ -0,0 +1,64 @@ +//go:build e2e + +// +// Copyright (C) 2019-2024 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package kubectl + +import ( + "context" + "fmt" + "os/exec" + "testing" + + "github.com/vdaas/vald/internal/errors" +) + +// RolloutResource rollouts and wait for the resource to be ready. +func RolloutResource(ctx context.Context, t *testing.T, resource string) error { + t.Helper() + + cmd := exec.CommandContext(ctx, "kubectl", "rollout", "restart", resource) + if err := runCmd(t, cmd); err != nil { + return err + } + + cmd = exec.CommandContext(ctx, "kubectl", "rollout", "status", resource) + return runCmd(t, cmd) +} + +// WaitResources waits for multiple resources to be ready. 
// WaitResources blocks until the resources of the given kind that match
// labelSelector report the requested condition, or until timeout elapses.
//
// It shells out to
//
//	kubectl wait --for=condition=<condition> -l <labelSelector> --timeout <timeout> <resource>
//
// so timeout must be a duration string that kubectl accepts (for example
// "120s"). A non-nil error is returned when the kubectl invocation fails.
func WaitResources(ctx context.Context, t *testing.T, resource, labelSelector, condition, timeout string) error {
	t.Helper()

	cmd := exec.CommandContext(ctx, "kubectl", "wait", "--for=condition="+condition, "-l", labelSelector, "--timeout", timeout, resource)
	return runCmd(t, cmd)
}

// runCmd runs cmd, logs whatever it wrote to standard output through t, and
// reports a failure as an error.
//
// If the command exits with a non-zero status, the returned error names the
// command, wraps the *exec.ExitError (so callers can inspect it with
// errors.As), and appends the captured standard error. Returning only the
// stderr text would yield an empty error message whenever the command fails
// without printing anything. Any other failure, such as the binary not being
// found, is wrapped as an "unexpected error".
func runCmd(t *testing.T, cmd *exec.Cmd) error {
	t.Helper()

	// Output captures stdout; when cmd.Stderr is nil it also stores the
	// command's stderr in ExitError.Stderr.
	out, err := cmd.Output()
	if err == nil {
		t.Log(string(out))
		return nil
	}

	// Partial stdout is still useful when diagnosing a failed command.
	if len(out) > 0 {
		t.Log(string(out))
	}

	var exitErr *exec.ExitError
	if errors.As(err, &exitErr) {
		return fmt.Errorf("%s: %w: %s", cmd.String(), exitErr, exitErr.Stderr)
	}
	return fmt.Errorf("unexpected error: %w", err)
}