Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Net 6820 customize mesh gateway limits #20945

Merged
merged 16 commits into from
Apr 16, 2024
Merged
3 changes: 3 additions & 0 deletions .changelog/20945.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
gateways: service defaults configuration entries can now be used to set default upstream limits for mesh-gateways
```
29 changes: 29 additions & 0 deletions agent/proxycfg/mesh_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,17 @@ func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, er
return snap, err
}

sarahalsmiller marked this conversation as resolved.
Show resolved Hide resolved
err = s.dataSources.ConfigEntry.Notify(ctx, &structs.ConfigEntryQuery{
Kind: structs.ServiceDefaults,
Name: s.service,
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
EnterpriseMeta: s.proxyID.EnterpriseMeta,
}, serviceDefaultsWatchID, s.ch)
if err != nil {
return snap, err
}

snap.MeshGateway.WatchedServices = make(map[structs.ServiceName]context.CancelFunc)
snap.MeshGateway.WatchedGateways = make(map[string]context.CancelFunc)
snap.MeshGateway.ServiceGroups = make(map[structs.ServiceName]structs.CheckServiceNodes)
Expand Down Expand Up @@ -648,6 +659,24 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn
}

snap.MeshGateway.PeerServers = peerServers
case serviceDefaultsWatchID:
resp, ok := u.Result.(*structs.ConfigEntryResponse)
if !ok {
return fmt.Errorf("invalid type for config entry: %T", resp.Entry)
}

if resp.Entry == nil {
return nil
}
serviceDefaults, ok := resp.Entry.(*structs.ServiceConfigEntry)
if !ok {
return fmt.Errorf("invalid type for config entry: %T", resp.Entry)
}

limits := serviceDefaults.UpstreamConfig.Defaults.Limits
nathancoleman marked this conversation as resolved.
Show resolved Hide resolved
if limits != nil {
snap.MeshGateway.Limits = limits
}

default:
switch {
Expand Down
16 changes: 16 additions & 0 deletions agent/proxycfg/proxycfg.deepcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,22 @@ func (o *configSnapshotMeshGateway) DeepCopy() *configSnapshotMeshGateway {
}
}
}
if o.Limits != nil {
cp.Limits = new(structs.UpstreamLimits)
*cp.Limits = *o.Limits
if o.Limits.MaxConnections != nil {
cp.Limits.MaxConnections = new(int)
*cp.Limits.MaxConnections = *o.Limits.MaxConnections
}
if o.Limits.MaxPendingRequests != nil {
cp.Limits.MaxPendingRequests = new(int)
*cp.Limits.MaxPendingRequests = *o.Limits.MaxPendingRequests
}
if o.Limits.MaxConcurrentRequests != nil {
cp.Limits.MaxConcurrentRequests = new(int)
*cp.Limits.MaxConcurrentRequests = *o.Limits.MaxConcurrentRequests
}
}
return &cp
}

Expand Down
3 changes: 3 additions & 0 deletions agent/proxycfg/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,9 @@ type configSnapshotMeshGateway struct {
// PeeringTrustBundlesSet indicates that the watch on the peer trust
// bundles has completed at least once.
PeeringTrustBundlesSet bool

// Limits
Limits *structs.UpstreamLimits
}

// MeshGatewayValidExportedServices ensures that the following data is present
Expand Down
1 change: 1 addition & 0 deletions agent/proxycfg/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
consulServerListWatchID = "consul-server-list"
datacentersWatchID = "datacenters"
serviceResolversWatchID = "service-resolvers"
serviceDefaultsWatchID = "service-defaults"
gatewayServicesWatchID = "gateway-services"
gatewayConfigWatchID = "gateway-config"
apiGatewayConfigWatchID = "api-gateway-config"
Expand Down
23 changes: 23 additions & 0 deletions agent/proxycfg/testing_mesh_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,25 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st
},
},
})
case "limits-added":
extraUpdates = append(extraUpdates, UpdateEvent{
CorrelationID: serviceDefaultsWatchID,
Result: &structs.ConfigEntryResponse{
Entry: &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "mesh-gateway",
UpstreamConfig: &structs.UpstreamConfiguration{
Defaults: &structs.UpstreamConfig{
Limits: &structs.UpstreamLimits{
MaxConnections: pointerTo(1),
MaxPendingRequests: pointerTo(10),
MaxConcurrentRequests: pointerTo(100),
},
},
},
},
},
})
default:
t.Fatalf("unknown variant: %s", variant)
return nil
Expand Down Expand Up @@ -1124,3 +1143,7 @@ func TestConfigSnapshotPeeredMeshGateway(t testing.T, variant string, nsFn func(
},
}, nsFn, nil, testSpliceEvents(baseEvents, extraUpdates))
}

func pointerTo[T any](v T) *T {
return &v
}
24 changes: 20 additions & 4 deletions agent/xds/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.Co
name: connect.GatewaySNI(key.Datacenter, key.Partition, cfgSnap.Roots.TrustDomain),
hostnameEndpoints: cfgSnap.MeshGateway.HostnameDatacenters[key.String()],
isRemote: true,
limits: cfgSnap.MeshGateway.Limits,
}
cluster := s.makeGatewayCluster(cfgSnap, opts)
clusters = append(clusters, cluster)
Expand All @@ -554,6 +555,7 @@ func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.Co
name: cfgSnap.ServerSNIFn(key.Datacenter, ""),
hostnameEndpoints: hostnameEndpoints,
isRemote: !key.Matches(cfgSnap.Datacenter, cfgSnap.ProxyID.PartitionOrDefault()),
limits: cfgSnap.MeshGateway.Limits,
}
cluster := s.makeGatewayCluster(cfgSnap, opts)
clusters = append(clusters, cluster)
Expand All @@ -563,7 +565,8 @@ func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.Co
servers, _ := cfgSnap.MeshGateway.WatchedLocalServers.Get(structs.ConsulServiceName)
for _, srv := range servers {
opts := clusterOpts{
name: cfgSnap.ServerSNIFn(cfgSnap.Datacenter, srv.Node.Node),
name: cfgSnap.ServerSNIFn(cfgSnap.Datacenter, srv.Node.Node),
limits: cfgSnap.MeshGateway.Limits,
}
cluster := s.makeGatewayCluster(cfgSnap, opts)
clusters = append(clusters, cluster)
Expand All @@ -579,14 +582,15 @@ func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.Co
// We avoid routing to read replicas since they will never be Raft voters.
if haveVoters(servers) {
cluster := s.makeGatewayCluster(cfgSnap, clusterOpts{
name: connect.PeeringServerSAN(cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain),
name: connect.PeeringServerSAN(cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain),
limits: cfgSnap.MeshGateway.Limits,
})
clusters = append(clusters, cluster)
}
}

// generate the per-service/subset clusters
c, err := s.makeGatewayServiceClusters(cfgSnap, cfgSnap.MeshGateway.ServiceGroups, cfgSnap.MeshGateway.ServiceResolvers)
c, err := s.makeGatewayServiceClusters(cfgSnap, cfgSnap.MeshGateway.ServiceGroups, cfgSnap.MeshGateway.ServiceResolvers, cfgSnap.MeshGateway.Limits)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -664,7 +668,7 @@ func (s *ResourceGenerator) makePeerServerClusters(cfgSnap *proxycfg.ConfigSnaps
// for a terminating gateway. This will include 1 cluster per Destination associated with this terminating gateway.
func (s *ResourceGenerator) clustersFromSnapshotTerminatingGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
res := []proto.Message{}
gwClusters, err := s.makeGatewayServiceClusters(cfgSnap, cfgSnap.TerminatingGateway.ServiceGroups, cfgSnap.TerminatingGateway.ServiceResolvers)
gwClusters, err := s.makeGatewayServiceClusters(cfgSnap, cfgSnap.TerminatingGateway.ServiceGroups, cfgSnap.TerminatingGateway.ServiceResolvers, nil)
if err != nil {
return nil, err
}
Expand All @@ -683,6 +687,7 @@ func (s *ResourceGenerator) makeGatewayServiceClusters(
cfgSnap *proxycfg.ConfigSnapshot,
services map[structs.ServiceName]structs.CheckServiceNodes,
resolvers map[structs.ServiceName]*structs.ServiceResolverConfigEntry,
limits *structs.UpstreamLimits,
) ([]proto.Message, error) {
var hostnameEndpoints structs.CheckServiceNodes

Expand Down Expand Up @@ -724,6 +729,7 @@ func (s *ResourceGenerator) makeGatewayServiceClusters(
hostnameEndpoints: hostnameEndpoints,
connectTimeout: resolver.ConnectTimeout,
isRemote: isRemote,
limits: limits,
}
cluster := s.makeGatewayCluster(cfgSnap, opts)

Expand Down Expand Up @@ -763,6 +769,7 @@ func (s *ResourceGenerator) makeGatewayServiceClusters(
onlyPassing: subset.OnlyPassing,
connectTimeout: resolver.ConnectTimeout,
isRemote: isRemote,
limits: limits,
}
cluster := s.makeGatewayCluster(cfgSnap, opts)

Expand Down Expand Up @@ -812,6 +819,7 @@ func (s *ResourceGenerator) makeGatewayOutgoingClusterPeeringServiceClusters(cfg
name: clusterName,
isRemote: true,
hostnameEndpoints: hostnameEndpoints,
limits: cfgSnap.MeshGateway.Limits,
}
cluster := s.makeGatewayCluster(cfgSnap, opts)

Expand Down Expand Up @@ -1706,6 +1714,8 @@ type clusterOpts struct {
// Corresponds to a valid address/port pairs to be routed externally
// these addresses will be embedded in the cluster configuration and will never use EDS
addresses []structs.ServiceAddress

limits *structs.UpstreamLimits
}

// makeGatewayCluster creates an Envoy cluster for a mesh or terminating gateway
Expand Down Expand Up @@ -1768,6 +1778,12 @@ func (s *ResourceGenerator) makeGatewayCluster(snap *proxycfg.ConfigSnapshot, op
)
}

if opts.limits != nil {
cluster.CircuitBreakers = &envoy_cluster_v3.CircuitBreakers{
Thresholds: makeThresholdsIfNeeded(opts.limits),
}
}

return cluster
}

Expand Down
8 changes: 8 additions & 0 deletions agent/xds/resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,14 @@ func getMeshGatewayGoldenTestCases() []goldenTestCase {
// TODO(proxystate): mesh gateway will come at a later time
alsoRunTestForV2: false,
},
{
name: "mesh-gateway-with-limits",
create: func(t testinf.T) *proxycfg.ConfigSnapshot {
return proxycfg.TestConfigSnapshotMeshGateway(t, "limits-added", nil, nil)
},
// TODO(proxystate): mesh gateway will come at a later time
alsoRunTestForV2: false,
},
{
name: "mesh-gateway-using-federation-states",
create: func(t testinf.T) *proxycfg.ConfigSnapshot {
Expand Down
151 changes: 151 additions & 0 deletions agent/xds/testdata/clusters/mesh-gateway-with-limits.latest.golden
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
{
"nonce": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"circuitBreakers": {
"thresholds": [
{
"maxConnections": 1,
"maxPendingRequests": 10,
"maxRequests": 100
}
]
},
"connectTimeout": "5s",
"edsClusterConfig": {
"edsConfig": {
"ads": {},
"resourceApiVersion": "V3"
}
},
"name": "bar.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"outlierDetection": {},
"type": "EDS"
},
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"circuitBreakers": {
"thresholds": [
{
"maxConnections": 1,
"maxPendingRequests": 10,
"maxRequests": 100
}
]
},
"connectTimeout": "5s",
"edsClusterConfig": {
"edsConfig": {
"ads": {},
"resourceApiVersion": "V3"
}
},
"name": "dc2.internal.11111111-2222-3333-4444-555555555555.consul",
"outlierDetection": {},
"type": "EDS"
},
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"circuitBreakers": {
"thresholds": [
{
"maxConnections": 1,
"maxPendingRequests": 10,
"maxRequests": 100
}
]
},
"connectTimeout": "5s",
"dnsLookupFamily": "V4_ONLY",
"dnsRefreshRate": "10s",
"loadAssignment": {
"clusterName": "dc4.internal.11111111-2222-3333-4444-555555555555.consul",
"endpoints": [
{
"lbEndpoints": [
{
"endpoint": {
"address": {
"socketAddress": {
"address": "123.us-west-2.elb.notaws.com",
"portValue": 443
}
}
},
"healthStatus": "HEALTHY",
"loadBalancingWeight": 1
}
]
}
]
},
"name": "dc4.internal.11111111-2222-3333-4444-555555555555.consul",
"outlierDetection": {},
"type": "LOGICAL_DNS"
},
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"circuitBreakers": {
"thresholds": [
{
"maxConnections": 1,
"maxPendingRequests": 10,
"maxRequests": 100
}
]
},
"connectTimeout": "5s",
"dnsLookupFamily": "V4_ONLY",
"dnsRefreshRate": "10s",
"loadAssignment": {
"clusterName": "dc6.internal.11111111-2222-3333-4444-555555555555.consul",
"endpoints": [
{
"lbEndpoints": [
{
"endpoint": {
"address": {
"socketAddress": {
"address": "123.us-east-1.elb.notaws.com",
"portValue": 443
}
}
},
"healthStatus": "UNHEALTHY",
"loadBalancingWeight": 1
}
]
}
]
},
"name": "dc6.internal.11111111-2222-3333-4444-555555555555.consul",
"outlierDetection": {},
"type": "LOGICAL_DNS"
},
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"circuitBreakers": {
"thresholds": [
{
"maxConnections": 1,
"maxPendingRequests": 10,
"maxRequests": 100
}
]
},
"connectTimeout": "5s",
"edsClusterConfig": {
"edsConfig": {
"ads": {},
"resourceApiVersion": "V3"
}
},
"name": "foo.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"outlierDetection": {},
"type": "EDS"
}
],
"typeUrl": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"versionInfo": "00000001"
}
Loading
Loading