Skip to content

Commit

Permalink
add upstream limits to mesh gateway cluster generation
Browse files Browse the repository at this point in the history
  • Loading branch information
sarahalsmiller committed Apr 2, 2024
1 parent 3dc2751 commit cf7ea96
Show file tree
Hide file tree
Showing 12 changed files with 500 additions and 17 deletions.
33 changes: 33 additions & 0 deletions agent/proxycfg/mesh_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,18 @@ func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, er
return snap, err
}

err = s.dataSources.ConfigEntry.Notify(ctx, &structs.ConfigEntryQuery{
Kind: structs.ServiceDefaults,
//TODO @Gateway Management is this always going to be the name of the mesh gateway
Name: "mesh-gateway",
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
EnterpriseMeta: s.proxyID.EnterpriseMeta,
}, serviceDefaultsWatchID, s.ch)
if err != nil {
return snap, err
}

snap.MeshGateway.WatchedServices = make(map[structs.ServiceName]context.CancelFunc)
snap.MeshGateway.WatchedGateways = make(map[string]context.CancelFunc)
snap.MeshGateway.ServiceGroups = make(map[structs.ServiceName]structs.CheckServiceNodes)
Expand Down Expand Up @@ -648,6 +660,27 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn
}

snap.MeshGateway.PeerServers = peerServers
case serviceDefaultsWatchID:
resp, ok := u.Result.(*structs.ConfigEntryResponse)
if !ok {
return fmt.Errorf("invalid type for config entry: %T", resp.Entry)
}

if resp.Entry == nil {
return nil
}
serviceDefaults, ok := resp.Entry.(*structs.ServiceConfigEntry)
if !ok {
return fmt.Errorf("invalid type for config entry: %T", resp.Entry)
}

if !ok {
return fmt.Errorf("invalid type for service defaults: %T", u.Result)
}
limits := serviceDefaults.UpstreamConfig.Defaults.Limits
if limits != nil {
snap.MeshGateway.Limits = limits
}

default:
switch {
Expand Down
6 changes: 6 additions & 0 deletions agent/proxycfg/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,9 @@ type configSnapshotTerminatingGateway struct {
// These bundles are used to configure RBAC policies for inbound filter chains on the gateway
// from services that are in a cluster-peered datacenter.
InboundPeerTrustBundles map[structs.ServiceName][]*pbpeering.PeeringTrustBundle

// Limits
Limits structs.UpstreamLimits
}

// ValidServices returns the list of service keys that have enough data to be emitted.
Expand Down Expand Up @@ -497,6 +500,9 @@ type configSnapshotMeshGateway struct {
// PeeringTrustBundlesSet indicates that the watch on the peer trust
// bundles has completed at least once.
PeeringTrustBundlesSet bool

// Limits
Limits *structs.UpstreamLimits
}

// MeshGatewayValidExportedServices ensures that the following data is present
Expand Down
1 change: 1 addition & 0 deletions agent/proxycfg/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
consulServerListWatchID = "consul-server-list"
datacentersWatchID = "datacenters"
serviceResolversWatchID = "service-resolvers"
serviceDefaultsWatchID = "service-defaults"
gatewayServicesWatchID = "gateway-services"
gatewayConfigWatchID = "gateway-config"
apiGatewayConfigWatchID = "api-gateway-config"
Expand Down
20 changes: 20 additions & 0 deletions agent/proxycfg/testing_mesh_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package proxycfg

import (
"k8s.io/utils/pointer"
"math"
"time"

Expand Down Expand Up @@ -264,6 +265,25 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st
},
},
})
case "limits-added":
extraUpdates = append(extraUpdates, UpdateEvent{
CorrelationID: "service-defaults", // serviceResolversWatchID
Result: &structs.ConfigEntryResponse{
Entry: &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "mesh-gateway",
UpstreamConfig: &structs.UpstreamConfiguration{
Defaults: &structs.UpstreamConfig{
Limits: &structs.UpstreamLimits{
MaxConnections: pointer.Int(1),
MaxPendingRequests: pointer.Int(10),
MaxConcurrentRequests: pointer.Int(100),
},
},
},
},
},
})
default:
t.Fatalf("unknown variant: %s", variant)
return nil
Expand Down
21 changes: 8 additions & 13 deletions agent/structs/config_entry_gateways.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,9 +518,6 @@ type LinkedService struct {
// SNI is the optional name to specify during the TLS handshake with a linked service
SNI string `json:",omitempty"`

//DisableAutoHostRewrite disables terminating gateways auto host rewrite feature when set to true.
DisableAutoHostRewrite bool `json:",omitempty"`

acl.EnterpriseMeta `hcl:",squash" mapstructure:",squash"`
}

Expand Down Expand Up @@ -671,7 +668,6 @@ type GatewayService struct {
FromWildcard bool `json:",omitempty"`
ServiceKind GatewayServiceKind `json:",omitempty"`
RaftIndex
AutoHostRewrite bool `json:",omitempty"`
}

type GatewayServices []*GatewayService
Expand Down Expand Up @@ -719,15 +715,14 @@ func (g *GatewayService) Clone() *GatewayService {
Port: g.Port,
Protocol: g.Protocol,
// See https://github.com/go101/go101/wiki/How-to-efficiently-clone-a-slice%3F
Hosts: append(g.Hosts[:0:0], g.Hosts...),
CAFile: g.CAFile,
CertFile: g.CertFile,
KeyFile: g.KeyFile,
SNI: g.SNI,
FromWildcard: g.FromWildcard,
RaftIndex: g.RaftIndex,
ServiceKind: g.ServiceKind,
AutoHostRewrite: g.AutoHostRewrite,
Hosts: append(g.Hosts[:0:0], g.Hosts...),
CAFile: g.CAFile,
CertFile: g.CertFile,
KeyFile: g.KeyFile,
SNI: g.SNI,
FromWildcard: g.FromWildcard,
RaftIndex: g.RaftIndex,
ServiceKind: g.ServiceKind,
}
}

Expand Down
26 changes: 22 additions & 4 deletions agent/xds/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.Co
name: connect.GatewaySNI(key.Datacenter, key.Partition, cfgSnap.Roots.TrustDomain),
hostnameEndpoints: cfgSnap.MeshGateway.HostnameDatacenters[key.String()],
isRemote: true,
limits: cfgSnap.MeshGateway.Limits,
}
cluster := s.makeGatewayCluster(cfgSnap, opts)
clusters = append(clusters, cluster)
Expand All @@ -554,6 +555,7 @@ func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.Co
name: cfgSnap.ServerSNIFn(key.Datacenter, ""),
hostnameEndpoints: hostnameEndpoints,
isRemote: !key.Matches(cfgSnap.Datacenter, cfgSnap.ProxyID.PartitionOrDefault()),
limits: cfgSnap.MeshGateway.Limits,
}
cluster := s.makeGatewayCluster(cfgSnap, opts)
clusters = append(clusters, cluster)
Expand All @@ -563,7 +565,8 @@ func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.Co
servers, _ := cfgSnap.MeshGateway.WatchedLocalServers.Get(structs.ConsulServiceName)
for _, srv := range servers {
opts := clusterOpts{
name: cfgSnap.ServerSNIFn(cfgSnap.Datacenter, srv.Node.Node),
name: cfgSnap.ServerSNIFn(cfgSnap.Datacenter, srv.Node.Node),
limits: cfgSnap.MeshGateway.Limits,
}
cluster := s.makeGatewayCluster(cfgSnap, opts)
clusters = append(clusters, cluster)
Expand All @@ -579,14 +582,15 @@ func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.Co
// We avoid routing to read replicas since they will never be Raft voters.
if haveVoters(servers) {
cluster := s.makeGatewayCluster(cfgSnap, clusterOpts{
name: connect.PeeringServerSAN(cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain),
name: connect.PeeringServerSAN(cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain),
limits: cfgSnap.MeshGateway.Limits,
})
clusters = append(clusters, cluster)
}
}

// generate the per-service/subset clusters
c, err := s.makeGatewayServiceClusters(cfgSnap, cfgSnap.MeshGateway.ServiceGroups, cfgSnap.MeshGateway.ServiceResolvers)
c, err := s.makeGatewayServiceClusters(cfgSnap, cfgSnap.MeshGateway.ServiceGroups, cfgSnap.MeshGateway.ServiceResolvers, cfgSnap.MeshGateway.Limits)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -664,7 +668,7 @@ func (s *ResourceGenerator) makePeerServerClusters(cfgSnap *proxycfg.ConfigSnaps
// for a terminating gateway. This will include 1 cluster per Destination associated with this terminating gateway.
func (s *ResourceGenerator) clustersFromSnapshotTerminatingGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
res := []proto.Message{}
gwClusters, err := s.makeGatewayServiceClusters(cfgSnap, cfgSnap.TerminatingGateway.ServiceGroups, cfgSnap.TerminatingGateway.ServiceResolvers)
gwClusters, err := s.makeGatewayServiceClusters(cfgSnap, cfgSnap.TerminatingGateway.ServiceGroups, cfgSnap.TerminatingGateway.ServiceResolvers, nil)
if err != nil {
return nil, err
}
Expand All @@ -683,6 +687,7 @@ func (s *ResourceGenerator) makeGatewayServiceClusters(
cfgSnap *proxycfg.ConfigSnapshot,
services map[structs.ServiceName]structs.CheckServiceNodes,
resolvers map[structs.ServiceName]*structs.ServiceResolverConfigEntry,
limits *structs.UpstreamLimits,
) ([]proto.Message, error) {
var hostnameEndpoints structs.CheckServiceNodes

Expand Down Expand Up @@ -724,6 +729,7 @@ func (s *ResourceGenerator) makeGatewayServiceClusters(
hostnameEndpoints: hostnameEndpoints,
connectTimeout: resolver.ConnectTimeout,
isRemote: isRemote,
limits: limits,
}
cluster := s.makeGatewayCluster(cfgSnap, opts)

Expand Down Expand Up @@ -763,6 +769,7 @@ func (s *ResourceGenerator) makeGatewayServiceClusters(
onlyPassing: subset.OnlyPassing,
connectTimeout: resolver.ConnectTimeout,
isRemote: isRemote,
limits: limits,
}
cluster := s.makeGatewayCluster(cfgSnap, opts)

Expand All @@ -786,6 +793,8 @@ func (s *ResourceGenerator) makeGatewayOutgoingClusterPeeringServiceClusters(cfg
return nil, fmt.Errorf("unsupported gateway kind %q", cfgSnap.Kind)
}

limits := cfgSnap.MeshGateway.Limits

var clusters []proto.Message

for _, serviceGroups := range cfgSnap.MeshGateway.PeeringServices {
Expand All @@ -812,6 +821,7 @@ func (s *ResourceGenerator) makeGatewayOutgoingClusterPeeringServiceClusters(cfg
name: clusterName,
isRemote: true,
hostnameEndpoints: hostnameEndpoints,
limits: limits,
}
cluster := s.makeGatewayCluster(cfgSnap, opts)

Expand Down Expand Up @@ -1706,6 +1716,8 @@ type clusterOpts struct {
// Corresponds to a valid address/port pairs to be routed externally
// these addresses will be embedded in the cluster configuration and will never use EDS
addresses []structs.ServiceAddress

limits *structs.UpstreamLimits
}

// makeGatewayCluster creates an Envoy cluster for a mesh or terminating gateway
Expand Down Expand Up @@ -1768,6 +1780,12 @@ func (s *ResourceGenerator) makeGatewayCluster(snap *proxycfg.ConfigSnapshot, op
)
}

if opts.limits != nil {
cluster.CircuitBreakers = &envoy_cluster_v3.CircuitBreakers{
Thresholds: makeThresholdsIfNeeded(opts.limits),
}
}

return cluster
}

Expand Down
8 changes: 8 additions & 0 deletions agent/xds/resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,14 @@ func getMeshGatewayGoldenTestCases() []goldenTestCase {
// TODO(proxystate): mesh gateway will come at a later time
alsoRunTestForV2: false,
},
{
name: "mesh-gateway-with-limits",
create: func(t testinf.T) *proxycfg.ConfigSnapshot {
return proxycfg.TestConfigSnapshotMeshGateway(t, "limits-added", nil, nil)
},
// TODO(proxystate): mesh gateway will come at a later time
alsoRunTestForV2: false,
},
{
name: "mesh-gateway-using-federation-states",
create: func(t testinf.T) *proxycfg.ConfigSnapshot {
Expand Down
Loading

0 comments on commit cf7ea96

Please sign in to comment.