From 0430e2085ed12c25fde8b366b4ab04050552aeba Mon Sep 17 00:00:00 2001 From: Nathan Coleman Date: Wed, 6 Jul 2022 14:24:09 -0400 Subject: [PATCH 1/9] Sync in-memory store w/ Consul at constant interval --- internal/commands/server/server.go | 32 ++++++++++----------- internal/consul/intentions.go | 5 ++-- internal/store/memory/store.go | 45 ++++++++++++++++++++++-------- 3 files changed, 50 insertions(+), 32 deletions(-) diff --git a/internal/commands/server/server.go b/internal/commands/server/server.go index 25c001f5d..e42450c1e 100644 --- a/internal/commands/server/server.go +++ b/internal/commands/server/server.go @@ -9,6 +9,9 @@ import ( "golang.org/x/sync/errgroup" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/go-hclog" + consulAdapters "github.com/hashicorp/consul-api-gateway/internal/adapters/consul" "github.com/hashicorp/consul-api-gateway/internal/consul" "github.com/hashicorp/consul-api-gateway/internal/envoy" @@ -16,8 +19,6 @@ import ( "github.com/hashicorp/consul-api-gateway/internal/metrics" "github.com/hashicorp/consul-api-gateway/internal/profiling" "github.com/hashicorp/consul-api-gateway/internal/store/memory" - "github.com/hashicorp/consul/api" - "github.com/hashicorp/go-hclog" ) type ServerConfig struct { @@ -34,21 +35,10 @@ type ServerConfig struct { func RunServer(config ServerConfig) int { // Set up signal handlers and global context - ctx, cancel := context.WithCancel(config.Context) - interrupt := make(chan os.Signal, 1) - signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM) - defer func() { - signal.Stop(interrupt) - cancel() - }() - go func() { - select { - case <-interrupt: - config.Logger.Debug("received shutdown signal") - cancel() - case <-ctx.Done(): - } - }() + ctx, cancel := signal.NotifyContext(config.Context, os.Interrupt, syscall.SIGTERM) + defer cancel() + + group, groupCtx := errgroup.WithContext(ctx) secretClient := envoy.NewMultiSecretClient() k8sSecretClient, err := k8s.NewK8sSecretClient(config.Logger.Named("cert-fetcher"), config.K8sConfig.RestConfig) @@ -74,6 +64,7 @@ func RunServer(config ServerConfig) int { Adapter: consulAdapters.NewSyncAdapter(config.Logger.Named("consul-adapter"), consulClient), Logger: config.Logger.Named("state"), }) + store.SyncAtInterval(groupCtx) controller.SetConsul(consulClient) controller.SetStore(store) @@ -85,7 +76,6 @@ func RunServer(config ServerConfig) int { "consul-api-gateway-controller", options, ) - group, groupCtx := errgroup.WithContext(ctx) group.Go(func() error { return certManager.Manage(groupCtx) }) @@ -100,26 +90,32 @@ func RunServer(config ServerConfig) int { } config.Logger.Trace("initial certificates written") + // Run SDS server server := envoy.NewSDSServer(config.Logger.Named("sds-server"), certManager, secretClient, store) group.Go(func() error { return server.Run(groupCtx) }) + + // Run controller group.Go(func() error { return controller.Start(groupCtx) }) + // Run metrics server if configured if config.MetricsPort != 0 { group.Go(func() error { return metrics.RunServer(groupCtx, config.Logger.Named("metrics"), fmt.Sprintf("127.0.0.1:%d", config.MetricsPort)) }) } + // Run profiling server if configured if config.ProfilingPort != 0 { group.Go(func() error { return profiling.RunServer(groupCtx, config.Logger.Named("pprof"), fmt.Sprintf("127.0.0.1:%d", config.ProfilingPort)) }) } + // Wait for any of the above to exit if err := group.Wait(); err != nil { config.Logger.Error("unexpected error", "error", err) return 1 diff --git a/internal/consul/intentions.go b/internal/consul/intentions.go index 9870a665f..bd8d2f475 100644 --- a/internal/consul/intentions.go +++ b/internal/consul/intentions.go @@ -9,10 +9,11 @@ import ( "github.com/cenkalti/backoff" - "github.com/hashicorp/consul-api-gateway/internal/common" "github.com/hashicorp/consul/api" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-multierror" + + "github.com/hashicorp/consul-api-gateway/internal/common" ) //go:generate mockgen -source ./intentions.go -destination ./mocks/intentions.go -package mocks consulDiscoveryChains,consulConfigEntries @@ -152,7 +153,7 @@ func (r *IntentionsReconciler) sourceIntention() *api.SourceIntention { // reconciler is stopped. // 2. A discoChainWatcher sends a discoChainWatchResult which will compute and added or removed discovery chain // targets and synchronize intentions. -// 3. The intentionSyncInterval is met, triggering the a ticker to fire and synchronize intentions. +// 3. The intentionSyncInterval is met, triggering the ticker to fire and synchronize intentions. // // The loop stops and returns if the struct context is cancelled. func (r *IntentionsReconciler) reconcileLoop() { diff --git a/internal/store/memory/store.go b/internal/store/memory/store.go index dc9feb972..462ada56f 100644 --- a/internal/store/memory/store.go +++ b/internal/store/memory/store.go @@ -2,20 +2,22 @@ package memory import ( "context" - "errors" "reflect" "sync" "sync/atomic" + "time" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-multierror" "github.com/hashicorp/consul-api-gateway/internal/core" "github.com/hashicorp/consul-api-gateway/internal/metrics" "github.com/hashicorp/consul-api-gateway/internal/store" - "github.com/hashicorp/go-hclog" - "github.com/hashicorp/go-multierror" ) var ( - ErrCannotBindListener = errors.New("cannot bind listener") + consulSyncInterval = 60 * time.Second + startSyncLoopOnce sync.Once ) type Store struct { @@ -25,14 +27,10 @@ type Store struct { gateways map[core.GatewayID]*gatewayState routes map[string]store.Route - // This mutex acts as a stop-the-world type - // global mutex, as the store is a singleton - // what this means is that once a lock on the - // mutex is acquired, any mutable operations - // on the gateway interfaces wrapped by our - // state-building structures can happen - // concerns of thread-safety (unless they) - // spin up additional goroutines. + // This mutex acts as a stop-the-world type global mutex, as the store is a singleton. + // What this means is that once a lock on the mutex is acquired, any mutable operations + // on the gateway interfaces wrapped by our state-building structures can happen without + // concerns of thread-safety (unless they) spin up additional goroutines. mutex sync.RWMutex activeGateways int64 @@ -162,6 +160,29 @@ func (s *Store) Sync(ctx context.Context) error { return s.sync(ctx) } +// SyncAtInterval syncs the objects in the store w/ Consul at a constant interval +// until the provided context is cancelled. Calling SyncAtInterval multiple times +// will only result in a single sync loop as it should only be called during startup. +func (s *Store) SyncAtInterval(ctx context.Context) { + startSyncLoopOnce.Do(func() { + ticker := time.NewTicker(consulSyncInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := s.sync(ctx); err != nil { + s.logger.Warn("An error occurred during memory store sync, some changes may be out of sync", "error", err) + } else { + s.logger.Trace("Synced memory store in background") + } + } + } + }) +} + func (s *Store) DeleteRoute(ctx context.Context, id string) error { s.mutex.Lock() defer s.mutex.Unlock() From c3e6142689ca8203992ac5a328b538560ef7486e Mon Sep 17 00:00:00 2001 From: Nathan Coleman Date: Mon, 11 Jul 2022 16:58:27 -0400 Subject: [PATCH 2/9] Add release note --- .changelog/278.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/278.txt diff --git a/.changelog/278.txt b/.changelog/278.txt new file mode 100644 index 000000000..66c43dd6d --- /dev/null +++ b/.changelog/278.txt @@ -0,0 +1,3 @@ +```release-note:bug +Sync in-memory store to Consul at a regular interval in the background +``` From 21ce7fe9815cd7cdd793a43903c43b28980271e3 Mon Sep 17 00:00:00 2001 From: Nathan Coleman Date: Mon, 11 Jul 2022 17:06:59 -0400 Subject: [PATCH 3/9] Acquire read lock during sync --- internal/store/memory/store.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/store/memory/store.go b/internal/store/memory/store.go index 462ada56f..aa754d392 100644 --- a/internal/store/memory/store.go +++ b/internal/store/memory/store.go @@ -173,11 +173,13 @@ func (s *Store) SyncAtInterval(ctx context.Context) { case <-ctx.Done(): return case <-ticker.C: + s.mutex.RLock() if err := s.sync(ctx); err != nil { s.logger.Warn("An error occurred during memory store sync, some changes may be out of sync", "error", err) } else { s.logger.Trace("Synced memory store in background") } + s.mutex.RUnlock() } } }) From 8a888062fe2289ce328b6f757dc0a4c53ce0bd12 Mon Sep 17 00:00:00 2001 From: Nathan Coleman Date: Tue, 12 Jul 2022 13:13:45 -0400 Subject: [PATCH 4/9] Sync in goroutine instead of main --- internal/commands/server/server.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/commands/server/server.go b/internal/commands/server/server.go index e42450c1e..791d4599b 100644 --- a/internal/commands/server/server.go +++ b/internal/commands/server/server.go @@ -64,7 +64,10 @@ func RunServer(config ServerConfig) int { Adapter: consulAdapters.NewSyncAdapter(config.Logger.Named("consul-adapter"), consulClient), Logger: config.Logger.Named("state"), }) - store.SyncAtInterval(groupCtx) + group.Go(func() error { + store.SyncAtInterval(groupCtx) + return nil + }) controller.SetConsul(consulClient) controller.SetStore(store) From 659fba56d3d1925c606feac45ff1d1f15ba9ab36 Mon Sep 17 00:00:00 2001 From: Nathan Coleman Date: Mon, 18 Jul 2022 13:44:17 -0400 Subject: [PATCH 5/9] When interval sync triggers, mark each gateway as requiring sync This is necessary because the sync is guarded to only run if a listener on the gateway is marked as needing to sync. In this case where we sync at an interval, a listener will usually not be marked and so the sync would be skipped even if it's needed due to some drift in the consul config. --- internal/store/memory/gateway.go | 40 ++++++++++++++++++++++++-------- internal/store/memory/store.go | 11 +++++++-- 2 files changed, 39 insertions(+), 12 deletions(-) diff --git a/internal/store/memory/gateway.go b/internal/store/memory/gateway.go index 879fc24eb..e61a45f10 100644 --- a/internal/store/memory/gateway.go +++ b/internal/store/memory/gateway.go @@ -3,10 +3,11 @@ package memory import ( "context" - "github.com/hashicorp/consul-api-gateway/internal/core" - "github.com/hashicorp/consul-api-gateway/internal/store" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-multierror" + + "github.com/hashicorp/consul-api-gateway/internal/core" + "github.com/hashicorp/consul-api-gateway/internal/store" ) type gatewayState struct { @@ -16,6 +17,7 @@ type gatewayState struct { adapter core.SyncAdapter listeners map[string]*listenerState secrets map[string]struct{} + needsSync bool } // newGatewayState creates a bound gateway @@ -38,6 +40,7 @@ func newGatewayState(logger hclog.Logger, gateway store.Gateway, adapter core.Sy adapter: adapter, listeners: listeners, secrets: secrets, + needsSync: false, } } @@ -94,19 +97,36 @@ func (g *gatewayState) ShouldUpdate(other store.Gateway) bool { return g.Gateway.ShouldUpdate(other) } -func (g *gatewayState) Sync(ctx context.Context) (bool, error) { - didSync := false +func (g *gatewayState) ShouldSync(ctx context.Context) bool { + if g.needsSync { + return true + } + for _, listener := range g.listeners { if listener.ShouldSync() { - g.logger.Trace("syncing gateway") - if err := g.sync(ctx); err != nil { - return false, err - } - didSync = true - break + return true + } + } + + return false +} + +func (g *gatewayState) MarkSynced() { + g.needsSync = false +} + +func (g *gatewayState) Sync(ctx context.Context) (bool, error) { + didSync := false + + if g.ShouldSync(ctx) { + g.logger.Trace("syncing gateway") + if err := g.sync(ctx); err != nil { + return false, err } + didSync = true } + g.MarkSynced() for _, listener := range g.listeners { listener.MarkSynced() } diff --git a/internal/store/memory/store.go b/internal/store/memory/store.go index aa754d392..4f11053bd 100644 --- a/internal/store/memory/store.go +++ b/internal/store/memory/store.go @@ -173,13 +173,20 @@ func (s *Store) SyncAtInterval(ctx context.Context) { case <-ctx.Done(): return case <-ticker.C: - s.mutex.RLock() + s.mutex.Lock() + + // Force each gateway to sync its state even though listeners + // on the gateway may not be marked as needing a sync right now + for _, gateway := range s.gateways { + gateway.needsSync = true + } + if err := s.sync(ctx); err != nil { s.logger.Warn("An error occurred during memory store sync, some changes may be out of sync", "error", err) } else { s.logger.Trace("Synced memory store in background") } - s.mutex.RUnlock() + s.mutex.Unlock() } } }) From c0135db324af18974e98dff2a9eef19049cbb376 Mon Sep 17 00:00:00 2001 From: Nathan Coleman Date: Tue, 19 Jul 2022 09:20:09 -0400 Subject: [PATCH 6/9] Add e2e test coverage for background memory store sync --- internal/commands/server/k8s_e2e_test.go | 18 +++++++++++++++++- internal/testing/e2e/gateway.go | 7 ++++++- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/internal/commands/server/k8s_e2e_test.go b/internal/commands/server/k8s_e2e_test.go index 797456f25..4065d563f 100644 --- a/internal/commands/server/k8s_e2e_test.go +++ b/internal/commands/server/k8s_e2e_test.go @@ -299,7 +299,23 @@ func TestGatewayBasic(t *testing.T) { require.Eventually(t, gatewayStatusCheck(ctx, resources, gatewayName, namespace, conditionReady), checkTimeout, checkInterval, "no gateway found in the allotted time") - err := resources.Delete(ctx, gw) + // check for ingress gateway config entry + var entries []api.ConfigEntry + var err error + require.Eventually(t, func() bool { + entries, _, err = client.ConfigEntries().List(api.IngressGateway, &api.QueryOptions{Namespace: e2e.ConsulNamespace(ctx)}) + return err == nil && len(entries) == 1 + }, 5*time.Minute, checkInterval, "config entry not created in allotted time") + + // verify background re-sync of config entries after delete + _, err = client.ConfigEntries().Delete(api.IngressGateway, entries[0].GetName(), &api.WriteOptions{Namespace: e2e.ConsulNamespace(ctx)}) + require.NoError(t, err) + assert.Eventually(t, func() bool { + entries, _, err := client.ConfigEntries().List(api.IngressGateway, &api.QueryOptions{Namespace: e2e.ConsulNamespace(ctx)}) + return err == nil && len(entries) == 1 + }, 5*time.Minute, checkInterval, "config entry not recreated after delete in allotted time") + + err = resources.Delete(ctx, gw) require.NoError(t, err) require.Eventually(t, func() bool { services, _, err := client.Catalog().Service(gatewayName, "", &api.QueryOptions{ diff --git a/internal/testing/e2e/gateway.go b/internal/testing/e2e/gateway.go index 38ed2fed0..d610cbd02 100644 --- a/internal/testing/e2e/gateway.go +++ b/internal/testing/e2e/gateway.go @@ -10,12 +10,13 @@ import ( "sigs.k8s.io/e2e-framework/pkg/env" "sigs.k8s.io/e2e-framework/pkg/envconf" + "github.com/hashicorp/go-hclog" + consulAdapters "github.com/hashicorp/consul-api-gateway/internal/adapters/consul" "github.com/hashicorp/consul-api-gateway/internal/consul" "github.com/hashicorp/consul-api-gateway/internal/envoy" "github.com/hashicorp/consul-api-gateway/internal/k8s" "github.com/hashicorp/consul-api-gateway/internal/store/memory" - "github.com/hashicorp/go-hclog" ) type gatewayTestContext struct{} @@ -104,6 +105,10 @@ func (p *gatewayTestEnvironment) run(ctx context.Context, namespace string, cfg group.Go(func() error { return controller.Start(groupCtx) }) + group.Go(func() error { + store.SyncAtInterval(groupCtx) + return nil + }) p.cancel = cancel p.group = group return nil From 8d0c5b3532384aacf2591c0548972d8dd8a0cf6e Mon Sep 17 00:00:00 2001 From: Nathan Coleman Date: Wed, 20 Jul 2022 13:34:26 -0500 Subject: [PATCH 7/9] Deregister service instead of only removing config entry for e2e test --- internal/commands/server/k8s_e2e_test.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/internal/commands/server/k8s_e2e_test.go b/internal/commands/server/k8s_e2e_test.go index 4065d563f..5cd23aac1 100644 --- a/internal/commands/server/k8s_e2e_test.go +++ b/internal/commands/server/k8s_e2e_test.go @@ -18,6 +18,7 @@ import ( "github.com/hashicorp/consul/api" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/exp/maps" "golang.org/x/exp/slices" apps "k8s.io/api/apps/v1" core "k8s.io/api/core/v1" @@ -294,7 +295,7 @@ func TestGatewayBasic(t *testing.T) { } service := services[0] status := service.Checks.AggregatedStatus() - return status == "passing" + return status == api.HealthPassing }, checkTimeout, checkInterval, "no healthy consul service found in the allotted time") require.Eventually(t, gatewayStatusCheck(ctx, resources, gatewayName, namespace, conditionReady), checkTimeout, checkInterval, "no gateway found in the allotted time") @@ -307,12 +308,15 @@ func TestGatewayBasic(t *testing.T) { return err == nil && len(entries) == 1 }, 5*time.Minute, checkInterval, "config entry not created in allotted time") - // verify background re-sync of config entries after delete - _, err = client.ConfigEntries().Delete(api.IngressGateway, entries[0].GetName(), &api.WriteOptions{Namespace: e2e.ConsulNamespace(ctx)}) + // verify background re-sync of config entries after service delete + services, err := client.Agent().Services() require.NoError(t, err) + require.Len(t, services, 1) + require.NoError(t, client.Agent().ServiceDeregister(maps.Values(services)[0].ID)) + assert.Eventually(t, func() bool { - entries, _, err := client.ConfigEntries().List(api.IngressGateway, &api.QueryOptions{Namespace: e2e.ConsulNamespace(ctx)}) - return err == nil && len(entries) == 1 + services, err := client.Agent().Services() + return err == nil && len(services) == 1 && maps.Values(services)[0].Service == gatewayName }, 5*time.Minute, checkInterval, "config entry not recreated after delete in allotted time") err = resources.Delete(ctx, gw) From 03e30cc8e0157412c57686e4153e85bd0dfef9f8 Mon Sep 17 00:00:00 2001 From: Nathan Coleman Date: Wed, 20 Jul 2022 14:34:54 -0500 Subject: [PATCH 8/9] Move sync assertions into separate feature assessment --- internal/commands/server/k8s_e2e_test.go | 84 ++++++++++++++++++------ 1 file changed, 65 insertions(+), 19 deletions(-) diff --git a/internal/commands/server/k8s_e2e_test.go b/internal/commands/server/k8s_e2e_test.go index 5cd23aac1..e692e2c2b 100644 --- a/internal/commands/server/k8s_e2e_test.go +++ b/internal/commands/server/k8s_e2e_test.go @@ -300,36 +300,82 @@ func TestGatewayBasic(t *testing.T) { require.Eventually(t, gatewayStatusCheck(ctx, resources, gatewayName, namespace, conditionReady), checkTimeout, checkInterval, "no gateway found in the allotted time") - // check for ingress gateway config entry - var entries []api.ConfigEntry - var err error + err := resources.Delete(ctx, gw) + require.NoError(t, err) + require.Eventually(t, func() bool { + services, _, err := client.Catalog().Service(gatewayName, "", &api.QueryOptions{ + Namespace: e2e.ConsulNamespace(ctx), + }) + if err != nil { + return false + } + return len(services) == 0 + }, checkTimeout, checkInterval, "consul service not deregistered in the allotted time") + + return ctx + }). + Assess("background sync into Consul", func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context { + namespace := e2e.Namespace(ctx) + resources := cfg.Client().Resources(namespace) + gatewayName := envconf.RandomName("gw", 16) + + useHostPorts := false + gcc, gc := createGatewayClassWithParams(ctx, t, resources, GatewayClassConfigParams{ + UseHostPorts: &useHostPorts, + }) + require.Eventually(t, gatewayClassStatusCheck(ctx, resources, gc.Name, namespace, conditionAccepted), checkTimeout, checkInterval, "gatewayclass not accepted in the allotted time") + + httpsListener := createHTTPSListener(ctx, t, 443) + gw := createGateway(ctx, t, resources, gatewayName, namespace, gc, []gwv1beta1.Listener{httpsListener}) + + require.Eventually(t, func() bool { + err := resources.Get(ctx, gatewayName, namespace, &apps.Deployment{}) + return err == nil + }, checkTimeout, checkInterval, "no deployment found in the allotted time") + + require.Eventually(t, gatewayStatusCheck(ctx, resources, gatewayName, namespace, conditionReady), checkTimeout, checkInterval, "no gateway found in the allotted time") + + checkGatewayConfigAnnotation(ctx, t, resources, gatewayName, namespace, gcc) + + client := e2e.ConsulClient(ctx) + queryNamespace := &api.QueryOptions{Namespace: e2e.ConsulNamespace(ctx)} + + // Verify gateway is healthy + require.Eventually(t, gatewayStatusCheck(ctx, resources, gatewayName, namespace, conditionReady), checkTimeout, checkInterval, "no gateway found in the allotted time") + + // Check for service and config entries in Consul require.Eventually(t, func() bool { - entries, _, err = client.ConfigEntries().List(api.IngressGateway, &api.QueryOptions{Namespace: e2e.ConsulNamespace(ctx)}) - return err == nil && len(entries) == 1 - }, 5*time.Minute, checkInterval, "config entry not created in allotted time") + services, _, err := client.Catalog().Service(gatewayName, "", queryNamespace) + return err == nil && len(services) == 1 && services[0].Checks.AggregatedStatus() == api.HealthPassing + }, checkTimeout, checkInterval, "no healthy consul service found in the allotted time") + require.Eventually(t, func() bool { + entries, _, err := client.ConfigEntries().List(api.IngressGateway, queryNamespace) + return err == nil && len(entries) == 1 && entries[0].GetName() == gatewayName + }, 5*time.Minute, checkInterval, "ingress-gateway config entry not created in allotted time") - // verify background re-sync of config entries after service delete + // De-register Consul service. This will remove all config entries as well. services, err := client.Agent().Services() require.NoError(t, err) require.Len(t, services, 1) + require.Equal(t, gatewayName, maps.Values(services)[0].Service) require.NoError(t, client.Agent().ServiceDeregister(maps.Values(services)[0].ID)) + // Check to make sure the controller recreates the service and config entries in the background. assert.Eventually(t, func() bool { services, err := client.Agent().Services() return err == nil && len(services) == 1 && maps.Values(services)[0].Service == gatewayName - }, 5*time.Minute, checkInterval, "config entry not recreated after delete in allotted time") + }, 5*time.Minute, checkInterval, "service not recreated after delete in allotted time") + assert.Eventually(t, func() bool { + entries, _, err := client.ConfigEntries().List(api.IngressGateway, queryNamespace) + return err == nil && len(entries) == 1 && entries[0].GetName() == gatewayName + }, 5*time.Minute, checkInterval, "ingress-gateway config entry not recreated after delete in allotted time") - err = resources.Delete(ctx, gw) - require.NoError(t, err) - require.Eventually(t, func() bool { - services, _, err := client.Catalog().Service(gatewayName, "", &api.QueryOptions{ - Namespace: e2e.ConsulNamespace(ctx), - }) - if err != nil { - return false - } - return len(services) == 0 - }, checkTimeout, checkInterval, "consul service not deregistered in the allotted time") + // Clean up + require.NoError(t, resources.Delete(ctx, gw)) + assert.Eventually(t, func() bool { + services, _, err := client.Catalog().Service(gatewayName, "", queryNamespace) + return err == nil && len(services) == 0 + }, checkTimeout, checkInterval, "gateway not deleted in the allotted time") return ctx }) From f91e1b06a3b6d6519852a545a93c67a21917aa95 Mon Sep 17 00:00:00 2001 From: Nathan Coleman Date: Wed, 20 Jul 2022 16:46:26 -0500 Subject: [PATCH 9/9] Make assertions more robust --- internal/commands/server/k8s_e2e_test.go | 47 ++++++++++++++++-------- 1 file changed, 32 insertions(+), 15 deletions(-) diff --git a/internal/commands/server/k8s_e2e_test.go b/internal/commands/server/k8s_e2e_test.go index e692e2c2b..5fc869389 100644 --- a/internal/commands/server/k8s_e2e_test.go +++ b/internal/commands/server/k8s_e2e_test.go @@ -18,7 +18,6 @@ import ( "github.com/hashicorp/consul/api" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "golang.org/x/exp/maps" "golang.org/x/exp/slices" apps "k8s.io/api/apps/v1" core "k8s.io/api/core/v1" @@ -314,7 +313,7 @@ func TestGatewayBasic(t *testing.T) { return ctx }). - Assess("background sync into Consul", func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context { + Assess("background sync into consul", func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context { namespace := e2e.Namespace(ctx) resources := cfg.Client().Resources(namespace) gatewayName := envconf.RandomName("gw", 16) @@ -344,31 +343,49 @@ func TestGatewayBasic(t *testing.T) { require.Eventually(t, gatewayStatusCheck(ctx, resources, gatewayName, namespace, conditionReady), checkTimeout, checkInterval, "no gateway found in the allotted time") // Check for service and config entries in Consul + var service *api.CatalogService require.Eventually(t, func() bool { services, _, err := client.Catalog().Service(gatewayName, "", queryNamespace) - return err == nil && len(services) == 1 && services[0].Checks.AggregatedStatus() == api.HealthPassing + if err == nil && len(services) == 1 && services[0].Checks.AggregatedStatus() == api.HealthPassing { + service = services[0] + return true + } + return false }, checkTimeout, checkInterval, "no healthy consul service found in the allotted time") require.Eventually(t, func() bool { entries, _, err := client.ConfigEntries().List(api.IngressGateway, queryNamespace) return err == nil && len(entries) == 1 && entries[0].GetName() == gatewayName - }, 5*time.Minute, checkInterval, "ingress-gateway config entry not created in allotted time") + }, 5*time.Minute, checkInterval, "ingress-gateway config-entry not created in allotted time") + + // De-register Consul service + _, err := client.Catalog().Deregister(&api.CatalogDeregistration{ + Node: service.Node, + ServiceID: service.ServiceID, + Namespace: service.Namespace, + }, &api.WriteOptions{Namespace: service.Namespace}) + require.NoError(t, err) + require.Eventually(t, func() bool { + services, _, err := client.Catalog().Service(gatewayName, "", queryNamespace) + return err == nil && len(services) == 0 + }, 5*time.Minute, checkInterval, "service still returned after de-registering") - // De-register Consul service. This will remove all config entries as well. - services, err := client.Agent().Services() + // Delete ingress-gateway config-entry + _, err = client.ConfigEntries().Delete(api.IngressGateway, gatewayName, &api.WriteOptions{Namespace: e2e.ConsulNamespace(ctx)}) require.NoError(t, err) - require.Len(t, services, 1) - require.Equal(t, gatewayName, maps.Values(services)[0].Service) - require.NoError(t, client.Agent().ServiceDeregister(maps.Values(services)[0].ID)) + require.Eventually(t, func() bool { + entries, _, err := client.ConfigEntries().List(api.IngressGateway, queryNamespace) + return err == nil && len(entries) == 0 + }, 5*time.Minute, checkInterval, "ingress-gateway config entry still returned after deleting") - // Check to make sure the controller recreates the service and config entries in the background. + // Check to make sure the controller recreates the service and config-entry in the background. assert.Eventually(t, func() bool { - services, err := client.Agent().Services() - return err == nil && len(services) == 1 && maps.Values(services)[0].Service == gatewayName + services, _, err := client.Catalog().Service(gatewayName, "", queryNamespace) + return err == nil && len(services) == 1 }, 5*time.Minute, checkInterval, "service not recreated after delete in allotted time") assert.Eventually(t, func() bool { - entries, _, err := client.ConfigEntries().List(api.IngressGateway, queryNamespace) - return err == nil && len(entries) == 1 && entries[0].GetName() == gatewayName - }, 5*time.Minute, checkInterval, "ingress-gateway config entry not recreated after delete in allotted time") + entry, _, err := client.ConfigEntries().Get(api.IngressGateway, gatewayName, queryNamespace) + return err == nil && entry != nil + }, 5*time.Minute, checkInterval, "ingress-gateway config-entry not recreated after delete in allotted time") // Clean up require.NoError(t, resources.Delete(ctx, gw))