Skip to content
This repository has been archived by the owner on Mar 19, 2024. It is now read-only.

Sync in-memory store w/ Consul at constant interval #278

Merged
merged 9 commits into from
Jul 22, 2022
3 changes: 3 additions & 0 deletions .changelog/278.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
Sync in-memory store to Consul at a regular interval in the background
```
85 changes: 84 additions & 1 deletion internal/commands/server/k8s_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func TestGatewayBasic(t *testing.T) {
}
service := services[0]
status := service.Checks.AggregatedStatus()
return status == "passing"
return status == api.HealthPassing
}, checkTimeout, checkInterval, "no healthy consul service found in the allotted time")

require.Eventually(t, gatewayStatusCheck(ctx, resources, gatewayName, namespace, conditionReady), checkTimeout, checkInterval, "no gateway found in the allotted time")
Expand All @@ -311,6 +311,89 @@ func TestGatewayBasic(t *testing.T) {
return len(services) == 0
}, checkTimeout, checkInterval, "consul service not deregistered in the allotted time")

return ctx
}).
Assess("background sync into consul", func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
namespace := e2e.Namespace(ctx)
resources := cfg.Client().Resources(namespace)
gatewayName := envconf.RandomName("gw", 16)

useHostPorts := false
gcc, gc := createGatewayClassWithParams(ctx, t, resources, GatewayClassConfigParams{
UseHostPorts: &useHostPorts,
})
require.Eventually(t, gatewayClassStatusCheck(ctx, resources, gc.Name, namespace, conditionAccepted), checkTimeout, checkInterval, "gatewayclass not accepted in the allotted time")

httpsListener := createHTTPSListener(ctx, t, 443)
gw := createGateway(ctx, t, resources, gatewayName, namespace, gc, []gwv1beta1.Listener{httpsListener})

require.Eventually(t, func() bool {
err := resources.Get(ctx, gatewayName, namespace, &apps.Deployment{})
return err == nil
}, checkTimeout, checkInterval, "no deployment found in the allotted time")

require.Eventually(t, gatewayStatusCheck(ctx, resources, gatewayName, namespace, conditionReady), checkTimeout, checkInterval, "no gateway found in the allotted time")

checkGatewayConfigAnnotation(ctx, t, resources, gatewayName, namespace, gcc)

client := e2e.ConsulClient(ctx)
queryNamespace := &api.QueryOptions{Namespace: e2e.ConsulNamespace(ctx)}

// Verify gateway is healthy
require.Eventually(t, gatewayStatusCheck(ctx, resources, gatewayName, namespace, conditionReady), checkTimeout, checkInterval, "no gateway found in the allotted time")

// Check for service and config entries in Consul
var service *api.CatalogService
require.Eventually(t, func() bool {
services, _, err := client.Catalog().Service(gatewayName, "", queryNamespace)
if err == nil && len(services) == 1 && services[0].Checks.AggregatedStatus() == api.HealthPassing {
service = services[0]
return true
}
return false
}, checkTimeout, checkInterval, "no healthy consul service found in the allotted time")
require.Eventually(t, func() bool {
entries, _, err := client.ConfigEntries().List(api.IngressGateway, queryNamespace)
return err == nil && len(entries) == 1 && entries[0].GetName() == gatewayName
}, 5*time.Minute, checkInterval, "ingress-gateway config-entry not created in allotted time")

// De-register Consul service
_, err := client.Catalog().Deregister(&api.CatalogDeregistration{
Node: service.Node,
ServiceID: service.ServiceID,
Namespace: service.Namespace,
}, &api.WriteOptions{Namespace: service.Namespace})
require.NoError(t, err)
require.Eventually(t, func() bool {
services, _, err := client.Catalog().Service(gatewayName, "", queryNamespace)
return err == nil && len(services) == 0
}, 5*time.Minute, checkInterval, "service still returned after de-registering")

// Delete ingress-gateway config-entry
_, err = client.ConfigEntries().Delete(api.IngressGateway, gatewayName, &api.WriteOptions{Namespace: e2e.ConsulNamespace(ctx)})
require.NoError(t, err)
require.Eventually(t, func() bool {
entries, _, err := client.ConfigEntries().List(api.IngressGateway, queryNamespace)
return err == nil && len(entries) == 0
}, 5*time.Minute, checkInterval, "ingress-gateway config entry still returned after deleting")

// Check to make sure the controller recreates the service and config-entry in the background.
assert.Eventually(t, func() bool {
services, _, err := client.Catalog().Service(gatewayName, "", queryNamespace)
return err == nil && len(services) == 1
}, 5*time.Minute, checkInterval, "service not recreated after delete in allotted time")
assert.Eventually(t, func() bool {
entry, _, err := client.ConfigEntries().Get(api.IngressGateway, gatewayName, queryNamespace)
return err == nil && entry != nil
}, 5*time.Minute, checkInterval, "ingress-gateway config-entry not recreated after delete in allotted time")

// Clean up
require.NoError(t, resources.Delete(ctx, gw))
assert.Eventually(t, func() bool {
services, _, err := client.Catalog().Service(gatewayName, "", queryNamespace)
return err == nil && len(services) == 0
}, checkTimeout, checkInterval, "gateway not deleted in the allotted time")

return ctx
})

Expand Down
35 changes: 17 additions & 18 deletions internal/commands/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ import (

"golang.org/x/sync/errgroup"

"github.com/hashicorp/consul/api"
"github.com/hashicorp/go-hclog"

consulAdapters "github.com/hashicorp/consul-api-gateway/internal/adapters/consul"
"github.com/hashicorp/consul-api-gateway/internal/consul"
"github.com/hashicorp/consul-api-gateway/internal/envoy"
"github.com/hashicorp/consul-api-gateway/internal/k8s"
"github.com/hashicorp/consul-api-gateway/internal/metrics"
"github.com/hashicorp/consul-api-gateway/internal/profiling"
"github.com/hashicorp/consul-api-gateway/internal/store/memory"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/go-hclog"
)

type ServerConfig struct {
Expand All @@ -34,21 +35,10 @@ type ServerConfig struct {

func RunServer(config ServerConfig) int {
// Set up signal handlers and global context
ctx, cancel := context.WithCancel(config.Context)
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM)
defer func() {
signal.Stop(interrupt)
cancel()
}()
go func() {
select {
case <-interrupt:
config.Logger.Debug("received shutdown signal")
cancel()
case <-ctx.Done():
}
}()
ctx, cancel := signal.NotifyContext(config.Context, os.Interrupt, syscall.SIGTERM)
defer cancel()

group, groupCtx := errgroup.WithContext(ctx)

secretClient := envoy.NewMultiSecretClient()
k8sSecretClient, err := k8s.NewK8sSecretClient(config.Logger.Named("cert-fetcher"), config.K8sConfig.RestConfig)
Expand All @@ -74,6 +64,10 @@ func RunServer(config ServerConfig) int {
Adapter: consulAdapters.NewSyncAdapter(config.Logger.Named("consul-adapter"), consulClient),
Logger: config.Logger.Named("state"),
})
group.Go(func() error {
store.SyncAtInterval(groupCtx)
return nil
})

controller.SetConsul(consulClient)
controller.SetStore(store)
Expand All @@ -85,7 +79,6 @@ func RunServer(config ServerConfig) int {
"consul-api-gateway-controller",
options,
)
group, groupCtx := errgroup.WithContext(ctx)
group.Go(func() error {
return certManager.Manage(groupCtx)
})
Expand All @@ -100,26 +93,32 @@ func RunServer(config ServerConfig) int {
}
config.Logger.Trace("initial certificates written")

// Run SDS server
server := envoy.NewSDSServer(config.Logger.Named("sds-server"), certManager, secretClient, store)
group.Go(func() error {
return server.Run(groupCtx)
})

// Run controller
group.Go(func() error {
return controller.Start(groupCtx)
})

// Run metrics server if configured
if config.MetricsPort != 0 {
group.Go(func() error {
return metrics.RunServer(groupCtx, config.Logger.Named("metrics"), fmt.Sprintf("127.0.0.1:%d", config.MetricsPort))
})
}

// Run profiling server if configured
if config.ProfilingPort != 0 {
group.Go(func() error {
return profiling.RunServer(groupCtx, config.Logger.Named("pprof"), fmt.Sprintf("127.0.0.1:%d", config.ProfilingPort))
})
}

// Wait for any of the above to exit
if err := group.Wait(); err != nil {
config.Logger.Error("unexpected error", "error", err)
return 1
Expand Down
5 changes: 3 additions & 2 deletions internal/consul/intentions.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ import (

"github.com/cenkalti/backoff"

"github.com/hashicorp/consul-api-gateway/internal/common"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"

"github.com/hashicorp/consul-api-gateway/internal/common"
)

//go:generate mockgen -source ./intentions.go -destination ./mocks/intentions.go -package mocks consulDiscoveryChains,consulConfigEntries
Expand Down Expand Up @@ -152,7 +153,7 @@ func (r *IntentionsReconciler) sourceIntention() *api.SourceIntention {
// reconciler is stopped.
// 2. A discoChainWatcher sends a discoChainWatchResult which will compute and added or removed discovery chain
// targets and synchronize intentions.
// 3. The intentionSyncInterval is met, triggering the a ticker to fire and synchronize intentions.
// 3. The intentionSyncInterval is met, triggering the ticker to fire and synchronize intentions.
//
// The loop stops and returns if the struct context is cancelled.
func (r *IntentionsReconciler) reconcileLoop() {
Expand Down
40 changes: 30 additions & 10 deletions internal/store/memory/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package memory
import (
"context"

"github.com/hashicorp/consul-api-gateway/internal/core"
"github.com/hashicorp/consul-api-gateway/internal/store"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"

"github.com/hashicorp/consul-api-gateway/internal/core"
"github.com/hashicorp/consul-api-gateway/internal/store"
)

type gatewayState struct {
Expand All @@ -16,6 +17,7 @@ type gatewayState struct {
adapter core.SyncAdapter
listeners map[string]*listenerState
secrets map[string]struct{}
needsSync bool
}

// newGatewayState creates a bound gateway
Expand All @@ -38,6 +40,7 @@ func newGatewayState(logger hclog.Logger, gateway store.Gateway, adapter core.Sy
adapter: adapter,
listeners: listeners,
secrets: secrets,
needsSync: false,
}
}

Expand Down Expand Up @@ -94,19 +97,36 @@ func (g *gatewayState) ShouldUpdate(other store.Gateway) bool {
return g.Gateway.ShouldUpdate(other)
}

func (g *gatewayState) Sync(ctx context.Context) (bool, error) {
didSync := false
func (g *gatewayState) ShouldSync(ctx context.Context) bool {
if g.needsSync {
return true
}

for _, listener := range g.listeners {
if listener.ShouldSync() {
g.logger.Trace("syncing gateway")
if err := g.sync(ctx); err != nil {
return false, err
}
didSync = true
break
return true
}
}

return false
}

func (g *gatewayState) MarkSynced() {
g.needsSync = false
}

func (g *gatewayState) Sync(ctx context.Context) (bool, error) {
didSync := false

if g.ShouldSync(ctx) {
g.logger.Trace("syncing gateway")
if err := g.sync(ctx); err != nil {
return false, err
}
didSync = true
}

g.MarkSynced()
for _, listener := range g.listeners {
listener.MarkSynced()
}
Expand Down
54 changes: 42 additions & 12 deletions internal/store/memory/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,22 @@ package memory

import (
"context"
"errors"
"reflect"
"sync"
"sync/atomic"
"time"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"

"github.com/hashicorp/consul-api-gateway/internal/core"
"github.com/hashicorp/consul-api-gateway/internal/metrics"
"github.com/hashicorp/consul-api-gateway/internal/store"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
)

var (
ErrCannotBindListener = errors.New("cannot bind listener")
consulSyncInterval = 60 * time.Second
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we may want to consider moving this into a variable for command-line override eventually -- there should only be a few API calls per-gateway, but I would imagine that for some with large numbers of gateways deployed (not the normal case), this could cause problems down the line if it's too aggressive. That said, 60s seems like it's fairly safe in like 99.9% of use-cases (i.e. non-overloaded Consul server or low number of gateways).

startSyncLoopOnce sync.Once
)

type Store struct {
Expand All @@ -25,14 +27,10 @@ type Store struct {
gateways map[core.GatewayID]*gatewayState
routes map[string]store.Route

// This mutex acts as a stop-the-world type
// global mutex, as the store is a singleton
// what this means is that once a lock on the
// mutex is acquired, any mutable operations
// on the gateway interfaces wrapped by our
// state-building structures can happen
// concerns of thread-safety (unless they)
// spin up additional goroutines.
// This mutex acts as a stop-the-world type global mutex, as the store is a singleton.
// What this means is that once a lock on the mutex is acquired, any mutable operations
// on the gateway interfaces wrapped by our state-building structures can happen without
// concerns of thread-safety (unless they) spin up additional goroutines.
mutex sync.RWMutex

activeGateways int64
Expand Down Expand Up @@ -162,6 +160,38 @@ func (s *Store) Sync(ctx context.Context) error {
return s.sync(ctx)
}

// SyncAtInterval syncs the objects in the store w/ Consul at a constant interval
// until the provided context is cancelled. Calling SyncAtInterval multiple times
// will only result in a single sync loop as it should only be called during startup.
func (s *Store) SyncAtInterval(ctx context.Context) {
startSyncLoopOnce.Do(func() {
ticker := time.NewTicker(consulSyncInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.mutex.Lock()

// Force each gateway to sync its state even though listeners
// on the gateway may not be marked as needing a sync right now
for _, gateway := range s.gateways {
gateway.needsSync = true
}

if err := s.sync(ctx); err != nil {
s.logger.Warn("An error occurred during memory store sync, some changes may be out of sync", "error", err)
} else {
s.logger.Trace("Synced memory store in background")
}
s.mutex.Unlock()
}
}
})
}

func (s *Store) DeleteRoute(ctx context.Context, id string) error {
s.mutex.Lock()
defer s.mutex.Unlock()
Expand Down
Loading