From c5fa2ba5c17e72d1412c0d83893aa9206dda70ca Mon Sep 17 00:00:00 2001
From: Jorge Turrado Ferrero
Date: Tue, 9 Jan 2024 17:17:20 +0100
Subject: [PATCH] fix(jetstream): Raise an error if leader not found and added
 v2.10 e2e coverage (#5358)

* fix(jetstream): Scaler doesn't print multiple (wrong) errors

Signed-off-by: Jorge Turrado

* Add error on failed leader search

Signed-off-by: Jorge Turrado

* update changelog

Signed-off-by: Jorge Turrado

* .

Signed-off-by: Jorge Turrado

---------

Signed-off-by: Jorge Turrado
---
 CHANGELOG.md                              |  1 +
 pkg/scalers/nats_jetstream_scaler.go      |  6 +-
 pkg/scalers/nats_jetstream_scaler_test.go |  2 +-
 .../nats_jetstream/helper/nats_helper.go  |  9 +-
 .../nats_jetstream_cluster_test.go         | 89 +++++++++++++++----
 5 files changed, 81 insertions(+), 26 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index eabd94fd905..a28d8340687 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -95,6 +95,7 @@ Here is an overview of all new **experimental** features:
 - **Azure Pipelines**: No more HTTP 400 errors produced by poolName with spaces ([#5107](https://github.com/kedacore/keda/issues/5107))
 - **GCP pubsub scaler**: Added `project_id` to filter for metrics queries ([#5256](https://github.com/kedacore/keda/issues/5256))
 - **GCP pubsub scaler**: Missing use of default value of `value` added ([#5093](https://github.com/kedacore/keda/issues/5093))
+- **NATS JetStream Scaler**: Raise an error if leader not found ([#5358](https://github.com/kedacore/keda/pull/5358))
 - **Pulsar scaler**: Fix panic when auth is not used ([#5271](https://github.com/kedacore/keda/issues/5271))
 - **ScaledJobs**: Copy ScaledJob annotations to child Jobs ([#4594](https://github.com/kedacore/keda/issues/4594))
 
diff --git a/pkg/scalers/nats_jetstream_scaler.go b/pkg/scalers/nats_jetstream_scaler.go
index 5129730a636..2c4d59d0e9e 100644
--- a/pkg/scalers/nats_jetstream_scaler.go
+++ b/pkg/scalers/nats_jetstream_scaler.go
@@ -245,10 +245,7 @@ func (s *natsJetStreamScaler) getNATSJetstreamMonitoringData(ctx context.Context
 	clusterUrls := jetStreamServerResp.ConnectUrls
 	if len(clusterUrls) == 0 {
 		isNodeAdvertised = false
-		// append current node's `server_name` to check if it is a leader
-		// even though `server_name` is not an url, it will be split by first . (dot)
-		// to get the node's name anyway
-		clusterUrls = append(clusterUrls, jetStreamServerResp.ServerName)
+		// jetStreamServerResp.Cluster.HostUrls contains all the cluster nodes
 		clusterUrls = append(clusterUrls, jetStreamServerResp.Cluster.HostUrls...)
 	}
 
@@ -311,6 +308,7 @@ func (s *natsJetStreamScaler) getNATSJetstreamMonitoringData(ctx context.Context
 				}
 			}
 		}
+		return fmt.Errorf("leader node not found for consumer %s", s.metadata.consumer)
 	}
 	return nil
 }
diff --git a/pkg/scalers/nats_jetstream_scaler_test.go b/pkg/scalers/nats_jetstream_scaler_test.go
index 1997f39e8f7..a30fa59f854 100644
--- a/pkg/scalers/nats_jetstream_scaler_test.go
+++ b/pkg/scalers/nats_jetstream_scaler_test.go
@@ -218,7 +218,7 @@ var testNATSJetStreamMockResponses = []parseNATSJetStreamMockResponsesTestData{
 		Accounts: []accountDetail{{Name: "$G",
 			Streams: []*streamDetail{{Name: "mystream"}},
 		}},
-	}, false, false},
+	}, false, true},
 }
 
 var testNATSJetStreamServerMockResponses = map[string][]byte{
diff --git a/tests/scalers/nats_jetstream/helper/nats_helper.go b/tests/scalers/nats_jetstream/helper/nats_helper.go
index 71ccdcc280f..bbce0aca4e4 100644
--- a/tests/scalers/nats_jetstream/helper/nats_helper.go
+++ b/tests/scalers/nats_jetstream/helper/nats_helper.go
@@ -18,10 +18,11 @@ type JetStreamTemplateData struct {
 }
 
 const (
-	NatsJetStreamName          = "nats"
-	NatsJetStreamConsumerName  = "PULL_CONSUMER"
-	NatsJetStreamChartVersion  = "0.18.2"
-	NatsJetStreamServerVersion = "2.9.3"
+	NatsJetStreamName              = "nats"
+	NatsJetStreamConsumerName      = "PULL_CONSUMER"
+	Natsv2_10JetStreamChartVersion = "1.1.2"
+	NatsJetStreamChartVersion      = "0.18.2"
+	NatsJetStreamServerVersion     = "2.9.3"
 )
 
 type JetStreamDeploymentTemplateData struct {
diff --git a/tests/scalers/nats_jetstream/nats_jetstream_cluster/nats_jetstream_cluster_test.go b/tests/scalers/nats_jetstream/nats_jetstream_cluster/nats_jetstream_cluster_test.go
index d390bec549a..b7bd1755f90 100644
--- a/tests/scalers/nats_jetstream/nats_jetstream_cluster/nats_jetstream_cluster_test.go
+++ b/tests/scalers/nats_jetstream/nats_jetstream_cluster/nats_jetstream_cluster_test.go
@@ -23,16 +23,17 @@ const (
 )
 
 var (
-	testNamespace                = fmt.Sprintf("%s-test-ns", testName)
-	natsNamespace                = fmt.Sprintf("%s-nats-ns", testName)
-	natsAddress                  = fmt.Sprintf("nats://%s.%s.svc.cluster.local:4222", nats.NatsJetStreamName, natsNamespace)
-	natsServerMonitoringEndpoint = fmt.Sprintf("%s.%s.svc.cluster.local:8222", nats.NatsJetStreamName, natsNamespace)
-	natsHelmRepo                 = "https://nats-io.github.io/k8s/helm/charts/"
-	natsServerReplicas           = 3
-	messagePublishCount          = 300
-	deploymentName               = "sub"
-	minReplicaCount              = 0
-	maxReplicaCount              = 2
+	testNamespace                        = fmt.Sprintf("%s-test-ns", testName)
+	natsNamespace                        = fmt.Sprintf("%s-nats-ns", testName)
+	natsAddress                          = fmt.Sprintf("nats://%s.%s.svc.cluster.local:4222", nats.NatsJetStreamName, natsNamespace)
+	natsServerMonitoringEndpoint         = fmt.Sprintf("%s.%s.svc.cluster.local:8222", nats.NatsJetStreamName, natsNamespace)
+	natsServerHeadlessMonitoringEndpoint = fmt.Sprintf("%s-headless.%s.svc.cluster.local:8222", nats.NatsJetStreamName, natsNamespace)
+	natsHelmRepo                         = "https://nats-io.github.io/k8s/helm/charts/"
+	natsServerReplicas                   = 3
+	messagePublishCount                  = 300
+	deploymentName                       = "sub"
+	minReplicaCount                      = 0
+	maxReplicaCount                      = 2
 )
 
 func TestNATSJetStreamScalerClusterWithStreamReplicas(t *testing.T) {
@@ -63,7 +64,6 @@ func testNATSJetStreamScalerClusterWithStreamReplicas(t *testing.T, noAdvertise
 	assert.True(t, WaitForJobSuccess(t, kc, "stream", testNamespace, 60, 3),
 		"stream and consumer creation job with 3 stream replicas should be success")
 
-	testActivation(t, kc, testData)
 	testScaleOut(t, kc, testData)
 	testScaleIn(t, kc)
 
@@ -72,24 +72,58 @@ func testNATSJetStreamScalerClusterWithStreamReplicas(t *testing.T, noAdvertise
 	assert.True(t, WaitForJobCount(t, kc, testNamespace, 0, 60, 3),
 		"job count in namespace should be 0")
 
-	// Create stream and consumer with 2 stream replicas
+	// Create single replica stream with consumer
 	testData.NatsStream = "case2"
-	installStreamAndConsumer(t, 2, testData.NatsStream, testNamespace, natsAddress)
+	installStreamAndConsumer(t, 1, testData.NatsStream, testNamespace, natsAddress)
+	KubectlApplyWithTemplate(t, testData, "scaledObjectTemplate", nats.ScaledObjectTemplate)
+	assert.True(t, WaitForJobSuccess(t, kc, "stream", testNamespace, 60, 3),
+		"stream and consumer creation job with 1 stream replica should be success")
+
+	testScaleOut(t, kc, testData)
+	testScaleIn(t, kc)
+
+	// Cleanup test namespace
+	removeStreamAndConsumer(t, 1, testData.NatsStream, testNamespace, natsAddress)
+	DeleteKubernetesResources(t, testNamespace, testData, testTemplates)
+
+	// Cleanup nats namespace
+	removeClusterWithJetStream(t)
+	DeleteNamespace(t, natsNamespace)
+	deleted := WaitForNamespaceDeletion(t, natsNamespace)
+	assert.Truef(t, deleted, "%s namespace not deleted", natsNamespace)
+}
+
+func TestNATSv2_10JetStreamScalerClusterWithStreamReplicas(t *testing.T) {
+	// Create k8s resources.
+	kc := GetKubernetesClient(t)
+
+	// Deploy NATS server.
+	installClusterWithJetStreaV2_10(t, kc)
+	assert.True(t, WaitForStatefulsetReplicaReadyCount(t, kc, nats.NatsJetStreamName, natsNamespace, natsServerReplicas, 60, 3),
+		"replica count should be %d after 3 minutes", minReplicaCount)
+
+	// Create k8s resources for testing.
+	testData, testTemplates := nats.GetJetStreamDeploymentTemplateData(testNamespace, natsAddress, natsServerHeadlessMonitoringEndpoint, messagePublishCount)
+	CreateKubernetesResources(t, kc, testNamespace, testData, testTemplates)
+
+	// Create 3 replica stream with consumer
+	testData.NatsStream = "case1"
+	installStreamAndConsumer(t, 3, testData.NatsStream, testNamespace, natsAddress)
 	KubectlApplyWithTemplate(t, testData, "scaledObjectTemplate", nats.ScaledObjectTemplate)
 	assert.True(t, WaitForJobSuccess(t, kc, "stream", testNamespace, 60, 3),
-		"stream and consumer creation job with 2 stream replicas should be success")
+		"stream and consumer creation job with 3 stream replicas should be success")
 
 	testActivation(t, kc, testData)
 	testScaleOut(t, kc, testData)
 	testScaleIn(t, kc)
 
-	// Remove 2 replica stream with consumer
-	removeStreamAndConsumer(t, 2, testData.NatsStream, testNamespace, natsAddress)
+	// Remove 3 replica stream with consumer
+	removeStreamAndConsumer(t, 3, testData.NatsStream, testNamespace, natsAddress)
 	assert.True(t, WaitForJobCount(t, kc, testNamespace, 0, 60, 3),
 		"job count in namespace should be 0")
 
 	// Create single replica stream with consumer
-	testData.NatsStream = "case3"
+	testData.NatsStream = "case2"
 	installStreamAndConsumer(t, 1, testData.NatsStream, testNamespace, natsAddress)
 	KubectlApplyWithTemplate(t, testData, "scaledObjectTemplate", nats.ScaledObjectTemplate)
 	assert.True(t, WaitForJobSuccess(t, kc, "stream", testNamespace, 60, 3),
@@ -155,6 +189,27 @@ func installClusterWithJetStream(t *testing.T, kc *k8s.Clientset, noAdvertise bo
 	assert.NoErrorf(t, err, "cannot execute command - %s", err)
 }
 
+// installClusterWithJetStreaV2_10 install the nats helm chart with clustered jetstream enabled using v2.10
+func installClusterWithJetStreaV2_10(t *testing.T, kc *k8s.Clientset) {
+	CreateNamespace(t, kc, natsNamespace)
+	_, err := ExecuteCommand(fmt.Sprintf("helm repo add %s %s", nats.NatsJetStreamName, natsHelmRepo))
+	assert.NoErrorf(t, err, "cannot execute command - %s", err)
+	_, err = ExecuteCommand("helm repo update")
+	assert.NoErrorf(t, err, "cannot execute command - %s", err)
+	_, err = ExecuteCommand(fmt.Sprintf(`helm upgrade --install --version %s --set %s --set %s --set %s --set %s --set %s --set %s --set %s --wait --namespace %s %s nats/nats`,
+		nats.Natsv2_10JetStreamChartVersion,
+		"config.jetstream.enabled=true",
+		"config.jetstream.fileStorage.enabled=false",
+		"config.jetstream.memoryStore.enabled=true",
+		"config.cluster.enabled=true",
+		"service.enabled=true",
+		"service.ports.monitor.enabled=true",
+		fmt.Sprintf("config.cluster.replicas=%d", natsServerReplicas),
+		natsNamespace,
+		nats.NatsJetStreamName))
+	assert.NoErrorf(t, err, "cannot execute command - %s", err)
+}
+
 // removeClusterWithJetStream uninstall the nats helm chart
 func removeClusterWithJetStream(t *testing.T) {
 	_, err := ExecuteCommand(fmt.Sprintf(`helm uninstall --wait --namespace %s %s`, natsNamespace, nats.NatsJetStreamName))