Skip to content

Commit

Permalink
Net 6820 customize mesh gateway limits (hashicorp#20945)
Browse files Browse the repository at this point in the history
* add upstream limits to mesh gateway cluster generation

* changelog

* go mod tidy

* readd changelog data

* undo reversion from rebase

* run codegen

* Update .changelog/20945.txt

Co-authored-by: Nathan Coleman <nathan.coleman@hashicorp.com>

* address notes

* gofmt

* clean up

* gofmt

* Update agent/proxycfg/mesh_gateway.go

* gofmt

* nil check

---------

Co-authored-by: Nathan Coleman <nathan.coleman@hashicorp.com>
  • Loading branch information
sarahalsmiller and nathancoleman authored Apr 16, 2024
1 parent 5e9f02d commit 08761f1
Show file tree
Hide file tree
Showing 13 changed files with 506 additions and 4 deletions.
3 changes: 3 additions & 0 deletions .changelog/20945.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
gateways: service defaults configuration entries can now be used to set default upstream limits for mesh-gateways
```
30 changes: 30 additions & 0 deletions agent/proxycfg/mesh_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,17 @@ func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, er
if err != nil {
return snap, err
}
// Watch for service default object that matches this mesh gateway's name
err = s.dataSources.ConfigEntry.Notify(ctx, &structs.ConfigEntryQuery{
Kind: structs.ServiceDefaults,
Name: s.service,
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
EnterpriseMeta: s.proxyID.EnterpriseMeta,
}, serviceDefaultsWatchID, s.ch)
if err != nil {
return snap, err
}

snap.MeshGateway.WatchedServices = make(map[structs.ServiceName]context.CancelFunc)
snap.MeshGateway.WatchedGateways = make(map[string]context.CancelFunc)
Expand Down Expand Up @@ -648,6 +659,25 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn
}

snap.MeshGateway.PeerServers = peerServers
case serviceDefaultsWatchID:
resp, ok := u.Result.(*structs.ConfigEntryResponse)
if !ok {
return fmt.Errorf("invalid type for config entry: %T", resp.Entry)
}

if resp.Entry == nil {
return nil
}
serviceDefaults, ok := resp.Entry.(*structs.ServiceConfigEntry)
if !ok {
return fmt.Errorf("invalid type for config entry: %T", resp.Entry)
}

if serviceDefaults.UpstreamConfig != nil && serviceDefaults.UpstreamConfig.Defaults != nil {
if serviceDefaults.UpstreamConfig.Defaults.Limits != nil {
snap.MeshGateway.Limits = serviceDefaults.UpstreamConfig.Defaults.Limits
}
}

default:
switch {
Expand Down
16 changes: 16 additions & 0 deletions agent/proxycfg/proxycfg.deepcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,22 @@ func (o *configSnapshotMeshGateway) DeepCopy() *configSnapshotMeshGateway {
}
}
}
if o.Limits != nil {
cp.Limits = new(structs.UpstreamLimits)
*cp.Limits = *o.Limits
if o.Limits.MaxConnections != nil {
cp.Limits.MaxConnections = new(int)
*cp.Limits.MaxConnections = *o.Limits.MaxConnections
}
if o.Limits.MaxPendingRequests != nil {
cp.Limits.MaxPendingRequests = new(int)
*cp.Limits.MaxPendingRequests = *o.Limits.MaxPendingRequests
}
if o.Limits.MaxConcurrentRequests != nil {
cp.Limits.MaxConcurrentRequests = new(int)
*cp.Limits.MaxConcurrentRequests = *o.Limits.MaxConcurrentRequests
}
}
return &cp
}

Expand Down
3 changes: 3 additions & 0 deletions agent/proxycfg/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,9 @@ type configSnapshotMeshGateway struct {
// PeeringTrustBundlesSet indicates that the watch on the peer trust
// bundles has completed at least once.
PeeringTrustBundlesSet bool

// Limits
Limits *structs.UpstreamLimits
}

// MeshGatewayValidExportedServices ensures that the following data is present
Expand Down
1 change: 1 addition & 0 deletions agent/proxycfg/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
consulServerListWatchID = "consul-server-list"
datacentersWatchID = "datacenters"
serviceResolversWatchID = "service-resolvers"
serviceDefaultsWatchID = "service-defaults"
gatewayServicesWatchID = "gateway-services"
gatewayConfigWatchID = "gateway-config"
apiGatewayConfigWatchID = "api-gateway-config"
Expand Down
23 changes: 23 additions & 0 deletions agent/proxycfg/testing_mesh_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,25 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st
},
},
})
case "limits-added":
extraUpdates = append(extraUpdates, UpdateEvent{
CorrelationID: serviceDefaultsWatchID,
Result: &structs.ConfigEntryResponse{
Entry: &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "mesh-gateway",
UpstreamConfig: &structs.UpstreamConfiguration{
Defaults: &structs.UpstreamConfig{
Limits: &structs.UpstreamLimits{
MaxConnections: pointerTo(1),
MaxPendingRequests: pointerTo(10),
MaxConcurrentRequests: pointerTo(100),
},
},
},
},
},
})
default:
t.Fatalf("unknown variant: %s", variant)
return nil
Expand Down Expand Up @@ -1124,3 +1143,7 @@ func TestConfigSnapshotPeeredMeshGateway(t testing.T, variant string, nsFn func(
},
}, nsFn, nil, testSpliceEvents(baseEvents, extraUpdates))
}

func pointerTo[T any](v T) *T {
return &v
}
24 changes: 20 additions & 4 deletions agent/xds/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.Co
name: connect.GatewaySNI(key.Datacenter, key.Partition, cfgSnap.Roots.TrustDomain),
hostnameEndpoints: cfgSnap.MeshGateway.HostnameDatacenters[key.String()],
isRemote: true,
limits: cfgSnap.MeshGateway.Limits,
}
cluster := s.makeGatewayCluster(cfgSnap, opts)
clusters = append(clusters, cluster)
Expand All @@ -554,6 +555,7 @@ func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.Co
name: cfgSnap.ServerSNIFn(key.Datacenter, ""),
hostnameEndpoints: hostnameEndpoints,
isRemote: !key.Matches(cfgSnap.Datacenter, cfgSnap.ProxyID.PartitionOrDefault()),
limits: cfgSnap.MeshGateway.Limits,
}
cluster := s.makeGatewayCluster(cfgSnap, opts)
clusters = append(clusters, cluster)
Expand All @@ -563,7 +565,8 @@ func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.Co
servers, _ := cfgSnap.MeshGateway.WatchedLocalServers.Get(structs.ConsulServiceName)
for _, srv := range servers {
opts := clusterOpts{
name: cfgSnap.ServerSNIFn(cfgSnap.Datacenter, srv.Node.Node),
name: cfgSnap.ServerSNIFn(cfgSnap.Datacenter, srv.Node.Node),
limits: cfgSnap.MeshGateway.Limits,
}
cluster := s.makeGatewayCluster(cfgSnap, opts)
clusters = append(clusters, cluster)
Expand All @@ -579,14 +582,15 @@ func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.Co
// We avoid routing to read replicas since they will never be Raft voters.
if haveVoters(servers) {
cluster := s.makeGatewayCluster(cfgSnap, clusterOpts{
name: connect.PeeringServerSAN(cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain),
name: connect.PeeringServerSAN(cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain),
limits: cfgSnap.MeshGateway.Limits,
})
clusters = append(clusters, cluster)
}
}

// generate the per-service/subset clusters
c, err := s.makeGatewayServiceClusters(cfgSnap, cfgSnap.MeshGateway.ServiceGroups, cfgSnap.MeshGateway.ServiceResolvers)
c, err := s.makeGatewayServiceClusters(cfgSnap, cfgSnap.MeshGateway.ServiceGroups, cfgSnap.MeshGateway.ServiceResolvers, cfgSnap.MeshGateway.Limits)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -664,7 +668,7 @@ func (s *ResourceGenerator) makePeerServerClusters(cfgSnap *proxycfg.ConfigSnaps
// for a terminating gateway. This will include 1 cluster per Destination associated with this terminating gateway.
func (s *ResourceGenerator) clustersFromSnapshotTerminatingGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
res := []proto.Message{}
gwClusters, err := s.makeGatewayServiceClusters(cfgSnap, cfgSnap.TerminatingGateway.ServiceGroups, cfgSnap.TerminatingGateway.ServiceResolvers)
gwClusters, err := s.makeGatewayServiceClusters(cfgSnap, cfgSnap.TerminatingGateway.ServiceGroups, cfgSnap.TerminatingGateway.ServiceResolvers, nil)
if err != nil {
return nil, err
}
Expand All @@ -683,6 +687,7 @@ func (s *ResourceGenerator) makeGatewayServiceClusters(
cfgSnap *proxycfg.ConfigSnapshot,
services map[structs.ServiceName]structs.CheckServiceNodes,
resolvers map[structs.ServiceName]*structs.ServiceResolverConfigEntry,
limits *structs.UpstreamLimits,
) ([]proto.Message, error) {
var hostnameEndpoints structs.CheckServiceNodes

Expand Down Expand Up @@ -724,6 +729,7 @@ func (s *ResourceGenerator) makeGatewayServiceClusters(
hostnameEndpoints: hostnameEndpoints,
connectTimeout: resolver.ConnectTimeout,
isRemote: isRemote,
limits: limits,
}
cluster := s.makeGatewayCluster(cfgSnap, opts)

Expand Down Expand Up @@ -763,6 +769,7 @@ func (s *ResourceGenerator) makeGatewayServiceClusters(
onlyPassing: subset.OnlyPassing,
connectTimeout: resolver.ConnectTimeout,
isRemote: isRemote,
limits: limits,
}
cluster := s.makeGatewayCluster(cfgSnap, opts)

Expand Down Expand Up @@ -812,6 +819,7 @@ func (s *ResourceGenerator) makeGatewayOutgoingClusterPeeringServiceClusters(cfg
name: clusterName,
isRemote: true,
hostnameEndpoints: hostnameEndpoints,
limits: cfgSnap.MeshGateway.Limits,
}
cluster := s.makeGatewayCluster(cfgSnap, opts)

Expand Down Expand Up @@ -1706,6 +1714,8 @@ type clusterOpts struct {
// Corresponds to a valid address/port pairs to be routed externally
// these addresses will be embedded in the cluster configuration and will never use EDS
addresses []structs.ServiceAddress

limits *structs.UpstreamLimits
}

// makeGatewayCluster creates an Envoy cluster for a mesh or terminating gateway
Expand Down Expand Up @@ -1768,6 +1778,12 @@ func (s *ResourceGenerator) makeGatewayCluster(snap *proxycfg.ConfigSnapshot, op
)
}

if opts.limits != nil {
cluster.CircuitBreakers = &envoy_cluster_v3.CircuitBreakers{
Thresholds: makeThresholdsIfNeeded(opts.limits),
}
}

return cluster
}

Expand Down
8 changes: 8 additions & 0 deletions agent/xds/resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,14 @@ func getMeshGatewayGoldenTestCases() []goldenTestCase {
// TODO(proxystate): mesh gateway will come at a later time
alsoRunTestForV2: false,
},
{
name: "mesh-gateway-with-limits",
create: func(t testinf.T) *proxycfg.ConfigSnapshot {
return proxycfg.TestConfigSnapshotMeshGateway(t, "limits-added", nil, nil)
},
// TODO(proxystate): mesh gateway will come at a later time
alsoRunTestForV2: false,
},
{
name: "mesh-gateway-using-federation-states",
create: func(t testinf.T) *proxycfg.ConfigSnapshot {
Expand Down
151 changes: 151 additions & 0 deletions agent/xds/testdata/clusters/mesh-gateway-with-limits.latest.golden
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
{
"nonce": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"circuitBreakers": {
"thresholds": [
{
"maxConnections": 1,
"maxPendingRequests": 10,
"maxRequests": 100
}
]
},
"connectTimeout": "5s",
"edsClusterConfig": {
"edsConfig": {
"ads": {},
"resourceApiVersion": "V3"
}
},
"name": "bar.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"outlierDetection": {},
"type": "EDS"
},
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"circuitBreakers": {
"thresholds": [
{
"maxConnections": 1,
"maxPendingRequests": 10,
"maxRequests": 100
}
]
},
"connectTimeout": "5s",
"edsClusterConfig": {
"edsConfig": {
"ads": {},
"resourceApiVersion": "V3"
}
},
"name": "dc2.internal.11111111-2222-3333-4444-555555555555.consul",
"outlierDetection": {},
"type": "EDS"
},
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"circuitBreakers": {
"thresholds": [
{
"maxConnections": 1,
"maxPendingRequests": 10,
"maxRequests": 100
}
]
},
"connectTimeout": "5s",
"dnsLookupFamily": "V4_ONLY",
"dnsRefreshRate": "10s",
"loadAssignment": {
"clusterName": "dc4.internal.11111111-2222-3333-4444-555555555555.consul",
"endpoints": [
{
"lbEndpoints": [
{
"endpoint": {
"address": {
"socketAddress": {
"address": "123.us-west-2.elb.notaws.com",
"portValue": 443
}
}
},
"healthStatus": "HEALTHY",
"loadBalancingWeight": 1
}
]
}
]
},
"name": "dc4.internal.11111111-2222-3333-4444-555555555555.consul",
"outlierDetection": {},
"type": "LOGICAL_DNS"
},
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"circuitBreakers": {
"thresholds": [
{
"maxConnections": 1,
"maxPendingRequests": 10,
"maxRequests": 100
}
]
},
"connectTimeout": "5s",
"dnsLookupFamily": "V4_ONLY",
"dnsRefreshRate": "10s",
"loadAssignment": {
"clusterName": "dc6.internal.11111111-2222-3333-4444-555555555555.consul",
"endpoints": [
{
"lbEndpoints": [
{
"endpoint": {
"address": {
"socketAddress": {
"address": "123.us-east-1.elb.notaws.com",
"portValue": 443
}
}
},
"healthStatus": "UNHEALTHY",
"loadBalancingWeight": 1
}
]
}
]
},
"name": "dc6.internal.11111111-2222-3333-4444-555555555555.consul",
"outlierDetection": {},
"type": "LOGICAL_DNS"
},
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"circuitBreakers": {
"thresholds": [
{
"maxConnections": 1,
"maxPendingRequests": 10,
"maxRequests": 100
}
]
},
"connectTimeout": "5s",
"edsClusterConfig": {
"edsConfig": {
"ads": {},
"resourceApiVersion": "V3"
}
},
"name": "foo.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"outlierDetection": {},
"type": "EDS"
}
],
"typeUrl": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"versionInfo": "00000001"
}
Loading

0 comments on commit 08761f1

Please sign in to comment.