Skip to content
This repository has been archived by the owner on Mar 19, 2024. It is now read-only.

Commit

Permalink
Extract binding logic into Binder (#332)
Browse files Browse the repository at this point in the history
* Extract binding logic into Binder

* Remove unnecessary TODO

* Restore prefix for route conversion

* Remove K8sRoute funcs that are no longer part of interface

* Update unit test to use RouteState.Bind

* Remove unused function

* Remove unused functions on K8sGateway

* Include existing ingress on sync state for Consul adapter

* Rework binder.Bind() to handle routes that no longer reference gateway

Also restructures some code to favor early returns over nested blocks, adds some comments, etc.

* Remove optimization preventing background sync into Consul

When a resource is removed out-of-band, such as via the Consul API, the in-memory store is unaware of it. Since the store still contains the deleted resource, the background sync will always early-out and thus we lose the self-healing benefit that the background sync provides.

* Remove unused function

* Un-export package-local binder, add docstrings
  • Loading branch information
nathancoleman authored Sep 1, 2022
1 parent 5856c23 commit 8f90401
Show file tree
Hide file tree
Showing 24 changed files with 1,123 additions and 1,094 deletions.
56 changes: 31 additions & 25 deletions internal/adapters/consul/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@ import (
"sync"
"time"

"github.com/hashicorp/consul-api-gateway/internal/common"
"github.com/hashicorp/consul-api-gateway/internal/consul"
"github.com/hashicorp/consul-api-gateway/internal/core"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"

"github.com/hashicorp/consul-api-gateway/internal/common"
"github.com/hashicorp/consul-api-gateway/internal/consul"
"github.com/hashicorp/consul-api-gateway/internal/core"
)

type syncState struct {
ingress *api.IngressGatewayConfigEntry
routers *consul.ConfigEntryIndex
splitters *consul.ConfigEntryIndex
defaults *consul.ConfigEntryIndex
Expand Down Expand Up @@ -208,19 +210,20 @@ func discoveryChain(gateway core.ResolvedGateway) (*api.IngressGatewayConfigEntr
return ingress, routers, splitters, defaults
}

func (a *SyncAdapter) entriesForGateway(id core.GatewayID) (*consul.ConfigEntryIndex, *consul.ConfigEntryIndex, *consul.ConfigEntryIndex) {
func (a *SyncAdapter) entriesForGateway(id core.GatewayID) (*api.IngressGatewayConfigEntry, *consul.ConfigEntryIndex, *consul.ConfigEntryIndex, *consul.ConfigEntryIndex) {
existing, found := a.sync[id]
if !found {
routers := consul.NewConfigEntryIndex(api.ServiceRouter)
splitters := consul.NewConfigEntryIndex(api.ServiceSplitter)
defaults := consul.NewConfigEntryIndex(api.ServiceDefaults)
return routers, splitters, defaults
return nil, routers, splitters, defaults
}
return existing.routers, existing.splitters, existing.defaults
return existing.ingress, existing.routers, existing.splitters, existing.defaults
}

func (a *SyncAdapter) setEntriesForGateway(gateway core.ResolvedGateway, routers *consul.ConfigEntryIndex, splitters *consul.ConfigEntryIndex, defaults *consul.ConfigEntryIndex) {
func (a *SyncAdapter) setEntriesForGateway(gateway core.ResolvedGateway, ingress *api.IngressGatewayConfigEntry, routers *consul.ConfigEntryIndex, splitters *consul.ConfigEntryIndex, defaults *consul.ConfigEntryIndex) {
a.sync[gateway.ID] = syncState{
ingress: ingress,
routers: routers,
splitters: splitters,
defaults: defaults,
Expand Down Expand Up @@ -257,12 +260,7 @@ func (a *SyncAdapter) Clear(ctx context.Context, id core.GatewayID) error {
defer a.logger.Trace("entries cleared", "time", time.Now(), "spent", time.Since(started))
}

ingress := &api.IngressGatewayConfigEntry{
Kind: api.IngressGateway,
Name: id.Service,
Namespace: id.ConsulNamespace,
}
existingRouters, existingSplitters, existingDefaults := a.entriesForGateway(id)
ingress, existingRouters, existingSplitters, existingDefaults := a.entriesForGateway(id)
removedRouters := existingRouters.ToArray()
removedSplitters := existingSplitters.ToArray()
removedDefaults := existingDefaults.ToArray()
Expand Down Expand Up @@ -299,7 +297,7 @@ func (a *SyncAdapter) Clear(ctx context.Context, id core.GatewayID) error {
return nil
}

func (a *SyncAdapter) Sync(ctx context.Context, gateway core.ResolvedGateway) error {
func (a *SyncAdapter) Sync(ctx context.Context, gateway core.ResolvedGateway) (bool, error) {
a.mutex.Lock()
defer a.mutex.Unlock()

Expand All @@ -314,7 +312,7 @@ func (a *SyncAdapter) Sync(ctx context.Context, gateway core.ResolvedGateway) er
}

ingress, computedRouters, computedSplitters, computedDefaults := discoveryChain(gateway)
existingRouters, existingSplitters, existingDefaults := a.entriesForGateway(gateway.ID)
_, existingRouters, existingSplitters, existingDefaults := a.entriesForGateway(gateway.ID)

// Since we can't make multiple config entry changes in a single transaction we must
// perform the operations in a set that is least likely to induce downtime.
Expand All @@ -330,6 +328,14 @@ func (a *SyncAdapter) Sync(ctx context.Context, gateway core.ResolvedGateway) er
removedDefaults := computedDefaults.Difference(existingDefaults).ToArray()

if a.logger.IsTrace() {
started := time.Now()
resolved, err := json.MarshalIndent(gateway, "", " ")
if err == nil {
a.logger.Trace("reconciling gateway snapshot", "gateway", string(resolved))
}
a.logger.Trace("started reconciliation", "time", started)
defer a.logger.Trace("reconciliation finished", "time", time.Now(), "spent", time.Since(started))

ingressEntry, err := json.MarshalIndent(ingress, "", " ")
if err == nil {
a.logger.Trace("setting ingress", "items", string(ingressEntry))
Expand All @@ -346,33 +352,33 @@ func (a *SyncAdapter) Sync(ctx context.Context, gateway core.ResolvedGateway) er

// defaults need to go first, otherwise the routers are always configured to use tcp
if err := a.setConfigEntries(ctx, addedDefaults...); err != nil {
return fmt.Errorf("error adding service defaults config entries: %w", err)
return false, fmt.Errorf("error adding service defaults config entries: %w", err)
}
if err := a.setConfigEntries(ctx, addedRouters...); err != nil {
return fmt.Errorf("error adding service router config entries: %w", err)
return false, fmt.Errorf("error adding service router config entries: %w", err)
}
if err := a.setConfigEntries(ctx, addedSplitters...); err != nil {
return fmt.Errorf("error adding service splitter config entries: %w", err)
return false, fmt.Errorf("error adding service splitter config entries: %w", err)
}

if err := a.setConfigEntries(ctx, ingress); err != nil {
return fmt.Errorf("error adding ingress config entry: %w", err)
return false, fmt.Errorf("error adding ingress config entry: %w", err)
}

if err := a.deleteConfigEntries(ctx, removedRouters...); err != nil {
return fmt.Errorf("error removing service router config entries: %w", err)
return false, fmt.Errorf("error removing service router config entries: %w", err)
}
if err := a.deleteConfigEntries(ctx, removedSplitters...); err != nil {
return fmt.Errorf("error removing service splitter config entries: %w", err)
return false, fmt.Errorf("error removing service splitter config entries: %w", err)
}
if err := a.deleteConfigEntries(ctx, removedDefaults...); err != nil {
return fmt.Errorf("error removing service defaults config entries: %w", err)
return false, fmt.Errorf("error removing service defaults config entries: %w", err)
}

a.setEntriesForGateway(gateway, computedRouters, computedSplitters, computedDefaults)
a.setEntriesForGateway(gateway, ingress, computedRouters, computedSplitters, computedDefaults)
if err := a.syncIntentionsForGateway(gateway.ID, ingress); err != nil {
return fmt.Errorf("error syncing service intention config entries: %w", err)
return false, fmt.Errorf("error syncing service intention config entries: %w", err)
}

return nil
return true, nil
}
2 changes: 1 addition & 1 deletion internal/adapters/consul/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func TestConsulSyncAdapter_Sync(t *testing.T) {
}},
}

err = adapter.Sync(ctx, gateway)
_, err = adapter.Sync(ctx, gateway)
require.NoError(t, err)

require.Eventually(t, func() bool {
Expand Down
2 changes: 1 addition & 1 deletion internal/core/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ import (
// SyncAdapter is used for synchronizing store state to
// an external system
type SyncAdapter interface {
Sync(ctx context.Context, gateway ResolvedGateway) error
Sync(ctx context.Context, gateway ResolvedGateway) (bool, error)
Clear(ctx context.Context, id GatewayID) error
}
14 changes: 10 additions & 4 deletions internal/envoy/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,24 @@ import (
"github.com/hashicorp/go-hclog"

"github.com/hashicorp/consul-api-gateway/internal/metrics"
"github.com/hashicorp/consul-api-gateway/internal/store"
)

// RequestHandler implements the handlers for an SDS Delta server
type RequestHandler struct {
logger hclog.Logger
secretManager SecretManager
registry GatewaySecretRegistry
store store.Store
nodeMap sync.Map
streamContexts sync.Map
activeStreams int64
}

// NewRequestHandler initializes a RequestHandler instance and wraps it in a github.com/envoyproxy/go-control-plane/pkg/server/v3,(*CallbackFuncs)
// so that it can be used by the stock go-control-plane server implementation
func NewRequestHandler(logger hclog.Logger, registry GatewaySecretRegistry, secretManager SecretManager) *server.CallbackFuncs {
func NewRequestHandler(logger hclog.Logger, store store.Store, secretManager SecretManager) *server.CallbackFuncs {
handler := &RequestHandler{
registry: registry,
store: store,
logger: logger,
secretManager: secretManager,
}
Expand Down Expand Up @@ -85,7 +86,12 @@ func (r *RequestHandler) OnStreamRequest(streamID int64, req *discovery.Discover
resources := req.GetResourceNames()

// check to make sure we're actually authorized to do this
allowed, err := r.registry.CanFetchSecrets(ctx, GatewayFromContext(ctx), resources)
gateway, err := r.store.GetGateway(ctx, GatewayFromContext(ctx))
if err != nil {
r.logger.Error("error fetching gateway", "error", err)
return err
}
allowed, err := gateway.CanFetchSecrets(ctx, resources)
if err != nil {
r.logger.Error("error checking gateway secrets", "error", err)
return err
Expand Down
50 changes: 31 additions & 19 deletions internal/envoy/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/stretchr/testify/require"

"github.com/hashicorp/consul-api-gateway/internal/envoy/mocks"
storeMocks "github.com/hashicorp/consul-api-gateway/internal/store/mocks"

"github.com/hashicorp/go-hclog"
)

Expand All @@ -27,9 +29,11 @@ func TestOnStreamRequest(t *testing.T) {
defer ctrl.Finish()

secrets := mocks.NewMockSecretManager(ctrl)
registry := mocks.NewMockGatewaySecretRegistry(ctrl)
registry.EXPECT().CanFetchSecrets(gomock.Any(), gomock.Any(), requestedSecrets).Return(true, nil)
handler := NewRequestHandler(hclog.NewNullLogger(), registry, secrets)
store := storeMocks.NewMockStore(ctrl)
gateway := storeMocks.NewMockGateway(ctrl)
store.EXPECT().GetGateway(gomock.Any(), gomock.Any()).Return(gateway, nil)
gateway.EXPECT().CanFetchSecrets(gomock.Any(), gomock.Any()).Return(true, nil)
handler := NewRequestHandler(hclog.NewNullLogger(), store, secrets)

request := &discovery.DiscoveryRequest{
ResourceNames: requestedSecrets,
Expand Down Expand Up @@ -57,9 +61,11 @@ func TestOnStreamRequest_PermissionError(t *testing.T) {
defer ctrl.Finish()

secrets := mocks.NewMockSecretManager(ctrl)
registry := mocks.NewMockGatewaySecretRegistry(ctrl)
registry.EXPECT().CanFetchSecrets(gomock.Any(), gomock.Any(), requestedSecrets).Return(false, nil)
handler := NewRequestHandler(hclog.NewNullLogger(), registry, secrets)
store := storeMocks.NewMockStore(ctrl)
gateway := storeMocks.NewMockGateway(ctrl)
store.EXPECT().GetGateway(gomock.Any(), gomock.Any()).Return(gateway, nil)
gateway.EXPECT().CanFetchSecrets(gomock.Any(), gomock.Any()).Return(false, nil)
handler := NewRequestHandler(hclog.NewNullLogger(), store, secrets)

request := &discovery.DiscoveryRequest{
ResourceNames: requestedSecrets,
Expand Down Expand Up @@ -87,9 +93,11 @@ func TestOnStreamRequest_SetResourcesForNodeError(t *testing.T) {

expectedErr := errors.New("error")
secrets := mocks.NewMockSecretManager(ctrl)
registry := mocks.NewMockGatewaySecretRegistry(ctrl)
registry.EXPECT().CanFetchSecrets(gomock.Any(), gomock.Any(), requestedSecrets).Return(true, nil)
handler := NewRequestHandler(hclog.NewNullLogger(), registry, secrets)
store := storeMocks.NewMockStore(ctrl)
gateway := storeMocks.NewMockGateway(ctrl)
store.EXPECT().GetGateway(gomock.Any(), gomock.Any()).Return(gateway, nil)
gateway.EXPECT().CanFetchSecrets(gomock.Any(), gomock.Any()).Return(true, nil)
handler := NewRequestHandler(hclog.NewNullLogger(), store, secrets)

request := &discovery.DiscoveryRequest{
ResourceNames: requestedSecrets,
Expand Down Expand Up @@ -117,9 +125,11 @@ func TestOnStreamRequest_Graceful(t *testing.T) {
defer ctrl.Finish()

secrets := mocks.NewMockSecretManager(ctrl)
registry := mocks.NewMockGatewaySecretRegistry(ctrl)
registry.EXPECT().CanFetchSecrets(gomock.Any(), gomock.Any(), requestedSecrets).Return(true, nil)
handler := NewRequestHandler(hclog.NewNullLogger(), registry, secrets)
store := storeMocks.NewMockStore(ctrl)
gateway := storeMocks.NewMockGateway(ctrl)
store.EXPECT().GetGateway(gomock.Any(), gomock.Any()).Return(gateway, nil)
gateway.EXPECT().CanFetchSecrets(gomock.Any(), gomock.Any()).Return(true, nil)
handler := NewRequestHandler(hclog.NewNullLogger(), store, secrets)

request := &discovery.DiscoveryRequest{
ResourceNames: requestedSecrets,
Expand All @@ -146,9 +156,11 @@ func TestOnStreamClosed(t *testing.T) {
defer ctrl.Finish()

secrets := mocks.NewMockSecretManager(ctrl)
registry := mocks.NewMockGatewaySecretRegistry(ctrl)
registry.EXPECT().CanFetchSecrets(gomock.Any(), gomock.Any(), requestedSecrets).Return(true, nil)
handler := NewRequestHandler(hclog.NewNullLogger(), registry, secrets)
store := storeMocks.NewMockStore(ctrl)
gateway := storeMocks.NewMockGateway(ctrl)
store.EXPECT().GetGateway(gomock.Any(), gomock.Any()).Return(gateway, nil)
gateway.EXPECT().CanFetchSecrets(gomock.Any(), gomock.Any()).Return(true, nil)
handler := NewRequestHandler(hclog.NewNullLogger(), store, secrets)

request := &discovery.DiscoveryRequest{
ResourceNames: requestedSecrets,
Expand All @@ -173,8 +185,8 @@ func TestOnStreamClosed_Graceful(t *testing.T) {
defer ctrl.Finish()

secrets := mocks.NewMockSecretManager(ctrl)
registry := mocks.NewMockGatewaySecretRegistry(ctrl)
handler := NewRequestHandler(hclog.NewNullLogger(), registry, secrets)
store := storeMocks.NewMockStore(ctrl)
handler := NewRequestHandler(hclog.NewNullLogger(), store, secrets)

// no-ops instead of panics without setting up the stream context in the open call
handler.OnStreamClosed(1)
Expand All @@ -187,8 +199,8 @@ func TestOnStreamOpen(t *testing.T) {
defer ctrl.Finish()

secrets := mocks.NewMockSecretManager(ctrl)
registry := mocks.NewMockGatewaySecretRegistry(ctrl)
handler := NewRequestHandler(hclog.NewNullLogger(), registry, secrets)
store := storeMocks.NewMockStore(ctrl)
handler := NewRequestHandler(hclog.NewNullLogger(), store, secrets)

// errors on non secret requests
err := handler.OnStreamOpen(context.Background(), 1, resource.ClusterType)
Expand Down
24 changes: 8 additions & 16 deletions internal/envoy/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"

"github.com/hashicorp/consul-api-gateway/internal/core"
"github.com/hashicorp/go-hclog"

"github.com/hashicorp/consul-api-gateway/internal/core"
"github.com/hashicorp/consul-api-gateway/internal/store"
)

//go:generate mockgen -source ./middleware.go -destination ./mocks/middleware.go -package mocks GatewaySecretRegistry
Expand Down Expand Up @@ -48,30 +50,20 @@ func GatewayFromContext(ctx context.Context) core.GatewayID {
return value.(core.GatewayID)
}

// GatewaySecretRegistry is used as the authority for determining what gateways the SDS server
// should actually respond to because they're managed by consul-api-gateway
type GatewaySecretRegistry interface {
// GatewayExists is used to determine whether or not we know a particular gateway instance
GatewayExists(ctx context.Context, info core.GatewayID) (bool, error)
// CanFetchSecrets is used to determine whether a gateway should be able to fetch a set
// of secrets it has requested
CanFetchSecrets(ctx context.Context, info core.GatewayID, secrets []string) (bool, error)
}

// SPIFFEStreamMiddleware verifies the spiffe entries for the certificate
// and sets the client identidy on the request context. If no
// spiffe information is detected, or if the service is unknown,
// the request is rejected.
func SPIFFEStreamMiddleware(logger hclog.Logger, fetcher CertificateFetcher, registry GatewaySecretRegistry) grpc.StreamServerInterceptor {
func SPIFFEStreamMiddleware(logger hclog.Logger, fetcher CertificateFetcher, store store.Store) grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
if info, ok := verifySPIFFE(ss.Context(), logger, registry); ok {
if info, ok := verifySPIFFE(ss.Context(), logger, store); ok {
return handler(srv, wrapStream(ss, info))
}
return status.Errorf(codes.Unauthenticated, "unable to authenticate request")
}
}

func verifySPIFFE(ctx context.Context, logger hclog.Logger, registry GatewaySecretRegistry) (core.GatewayID, bool) {
func verifySPIFFE(ctx context.Context, logger hclog.Logger, store store.Store) (core.GatewayID, bool) {
if p, ok := peer.FromContext(ctx); ok {
if mtls, ok := p.AuthInfo.(credentials.TLSInfo); ok {
// grab the peer certificate info
Expand All @@ -90,12 +82,12 @@ func verifySPIFFE(ctx context.Context, logger hclog.Logger, registry GatewaySecr
continue
}
// if we're tracking the gateway then we're good
exists, err := registry.GatewayExists(ctx, info)
gateway, err := store.GetGateway(ctx, info)
if err != nil {
logger.Error("error checking for gateway, skipping", "error", err)
continue
}
if exists {
if gateway != nil {
return info, true
}
logger.Warn("gateway not found", "namespace", info.ConsulNamespace, "service", info.Service)
Expand Down
Loading

0 comments on commit 8f90401

Please sign in to comment.