Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport of Net 6820 customize mesh gateway limits into release/1.18.x #20983

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/20945.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
peering: mesh gateways will now respect upstream config limits set in a service default object named 'mesh-gateway'
```
33 changes: 33 additions & 0 deletions agent/proxycfg/mesh_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,18 @@ func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, er
return snap, err
}

err = s.dataSources.ConfigEntry.Notify(ctx, &structs.ConfigEntryQuery{
Kind: structs.ServiceDefaults,
//TODO @Gateway Management is this always going to be the name of the mesh gateway
Name: "mesh-gateway",
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
EnterpriseMeta: s.proxyID.EnterpriseMeta,
}, serviceDefaultsWatchID, s.ch)
if err != nil {
return snap, err
}

snap.MeshGateway.WatchedServices = make(map[structs.ServiceName]context.CancelFunc)
snap.MeshGateway.WatchedGateways = make(map[string]context.CancelFunc)
snap.MeshGateway.ServiceGroups = make(map[structs.ServiceName]structs.CheckServiceNodes)
Expand Down Expand Up @@ -648,6 +660,27 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn
}

snap.MeshGateway.PeerServers = peerServers
case serviceDefaultsWatchID:
resp, ok := u.Result.(*structs.ConfigEntryResponse)
if !ok {
return fmt.Errorf("invalid type for config entry: %T", resp.Entry)
}

if resp.Entry == nil {
return nil
}
serviceDefaults, ok := resp.Entry.(*structs.ServiceConfigEntry)
if !ok {
return fmt.Errorf("invalid type for config entry: %T", resp.Entry)
}

if !ok {
return fmt.Errorf("invalid type for service defaults: %T", u.Result)
}
limits := serviceDefaults.UpstreamConfig.Defaults.Limits
if limits != nil {
snap.MeshGateway.Limits = limits
}

default:
switch {
Expand Down
28 changes: 28 additions & 0 deletions agent/proxycfg/proxycfg.deepcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,22 @@ func (o *configSnapshotMeshGateway) DeepCopy() *configSnapshotMeshGateway {
}
}
}
if o.Limits != nil {
cp.Limits = new(structs.UpstreamLimits)
*cp.Limits = *o.Limits
if o.Limits.MaxConnections != nil {
cp.Limits.MaxConnections = new(int)
*cp.Limits.MaxConnections = *o.Limits.MaxConnections
}
if o.Limits.MaxPendingRequests != nil {
cp.Limits.MaxPendingRequests = new(int)
*cp.Limits.MaxPendingRequests = *o.Limits.MaxPendingRequests
}
if o.Limits.MaxConcurrentRequests != nil {
cp.Limits.MaxConcurrentRequests = new(int)
*cp.Limits.MaxConcurrentRequests = *o.Limits.MaxConcurrentRequests
}
}
return &cp
}

Expand Down Expand Up @@ -879,5 +895,17 @@ func (o *configSnapshotTerminatingGateway) DeepCopy() *configSnapshotTerminating
cp.InboundPeerTrustBundles[k2] = cp_InboundPeerTrustBundles_v2
}
}
if o.Limits.MaxConnections != nil {
cp.Limits.MaxConnections = new(int)
*cp.Limits.MaxConnections = *o.Limits.MaxConnections
}
if o.Limits.MaxPendingRequests != nil {
cp.Limits.MaxPendingRequests = new(int)
*cp.Limits.MaxPendingRequests = *o.Limits.MaxPendingRequests
}
if o.Limits.MaxConcurrentRequests != nil {
cp.Limits.MaxConcurrentRequests = new(int)
*cp.Limits.MaxConcurrentRequests = *o.Limits.MaxConcurrentRequests
}
return &cp
}
6 changes: 6 additions & 0 deletions agent/proxycfg/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,9 @@ type configSnapshotTerminatingGateway struct {
// These bundles are used to configure RBAC policies for inbound filter chains on the gateway
// from services that are in a cluster-peered datacenter.
InboundPeerTrustBundles map[structs.ServiceName][]*pbpeering.PeeringTrustBundle

// Limits
Limits structs.UpstreamLimits
}

// ValidServices returns the list of service keys that have enough data to be emitted.
Expand Down Expand Up @@ -497,6 +500,9 @@ type configSnapshotMeshGateway struct {
// PeeringTrustBundlesSet indicates that the watch on the peer trust
// bundles has completed at least once.
PeeringTrustBundlesSet bool

// Limits
Limits *structs.UpstreamLimits
}

// MeshGatewayValidExportedServices ensures that the following data is present
Expand Down
1 change: 1 addition & 0 deletions agent/proxycfg/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
consulServerListWatchID = "consul-server-list"
datacentersWatchID = "datacenters"
serviceResolversWatchID = "service-resolvers"
serviceDefaultsWatchID = "service-defaults"
gatewayServicesWatchID = "gateway-services"
gatewayConfigWatchID = "gateway-config"
apiGatewayConfigWatchID = "api-gateway-config"
Expand Down
20 changes: 20 additions & 0 deletions agent/proxycfg/testing_mesh_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package proxycfg

import (
"k8s.io/utils/pointer"
"math"
"time"

Expand Down Expand Up @@ -264,6 +265,25 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st
},
},
})
case "limits-added":
extraUpdates = append(extraUpdates, UpdateEvent{
CorrelationID: "service-defaults", // serviceResolversWatchID
Result: &structs.ConfigEntryResponse{
Entry: &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "mesh-gateway",
UpstreamConfig: &structs.UpstreamConfiguration{
Defaults: &structs.UpstreamConfig{
Limits: &structs.UpstreamLimits{
MaxConnections: pointer.Int(1),
MaxPendingRequests: pointer.Int(10),
MaxConcurrentRequests: pointer.Int(100),
},
},
},
},
},
})
default:
t.Fatalf("unknown variant: %s", variant)
return nil
Expand Down
21 changes: 13 additions & 8 deletions agent/structs/config_entry_gateways.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,9 @@ type LinkedService struct {
// SNI is the optional name to specify during the TLS handshake with a linked service
SNI string `json:",omitempty"`

//DisableAutoHostRewrite disables terminating gateways auto host rewrite feature when set to true.
DisableAutoHostRewrite bool `json:",omitempty"`

acl.EnterpriseMeta `hcl:",squash" mapstructure:",squash"`
}

Expand Down Expand Up @@ -668,6 +671,7 @@ type GatewayService struct {
FromWildcard bool `json:",omitempty"`
ServiceKind GatewayServiceKind `json:",omitempty"`
RaftIndex
AutoHostRewrite bool `json:",omitempty"`
}

type GatewayServices []*GatewayService
Expand Down Expand Up @@ -715,14 +719,15 @@ func (g *GatewayService) Clone() *GatewayService {
Port: g.Port,
Protocol: g.Protocol,
// See https://github.com/go101/go101/wiki/How-to-efficiently-clone-a-slice%3F
Hosts: append(g.Hosts[:0:0], g.Hosts...),
CAFile: g.CAFile,
CertFile: g.CertFile,
KeyFile: g.KeyFile,
SNI: g.SNI,
FromWildcard: g.FromWildcard,
RaftIndex: g.RaftIndex,
ServiceKind: g.ServiceKind,
Hosts: append(g.Hosts[:0:0], g.Hosts...),
CAFile: g.CAFile,
CertFile: g.CertFile,
KeyFile: g.KeyFile,
SNI: g.SNI,
FromWildcard: g.FromWildcard,
RaftIndex: g.RaftIndex,
ServiceKind: g.ServiceKind,
AutoHostRewrite: g.AutoHostRewrite,
}
}

Expand Down
26 changes: 22 additions & 4 deletions agent/xds/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.Co
name: connect.GatewaySNI(key.Datacenter, key.Partition, cfgSnap.Roots.TrustDomain),
hostnameEndpoints: cfgSnap.MeshGateway.HostnameDatacenters[key.String()],
isRemote: true,
limits: cfgSnap.MeshGateway.Limits,
}
cluster := s.makeGatewayCluster(cfgSnap, opts)
clusters = append(clusters, cluster)
Expand All @@ -554,6 +555,7 @@ func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.Co
name: cfgSnap.ServerSNIFn(key.Datacenter, ""),
hostnameEndpoints: hostnameEndpoints,
isRemote: !key.Matches(cfgSnap.Datacenter, cfgSnap.ProxyID.PartitionOrDefault()),
limits: cfgSnap.MeshGateway.Limits,
}
cluster := s.makeGatewayCluster(cfgSnap, opts)
clusters = append(clusters, cluster)
Expand All @@ -563,7 +565,8 @@ func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.Co
servers, _ := cfgSnap.MeshGateway.WatchedLocalServers.Get(structs.ConsulServiceName)
for _, srv := range servers {
opts := clusterOpts{
name: cfgSnap.ServerSNIFn(cfgSnap.Datacenter, srv.Node.Node),
name: cfgSnap.ServerSNIFn(cfgSnap.Datacenter, srv.Node.Node),
limits: cfgSnap.MeshGateway.Limits,
}
cluster := s.makeGatewayCluster(cfgSnap, opts)
clusters = append(clusters, cluster)
Expand All @@ -579,14 +582,15 @@ func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.Co
// We avoid routing to read replicas since they will never be Raft voters.
if haveVoters(servers) {
cluster := s.makeGatewayCluster(cfgSnap, clusterOpts{
name: connect.PeeringServerSAN(cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain),
name: connect.PeeringServerSAN(cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain),
limits: cfgSnap.MeshGateway.Limits,
})
clusters = append(clusters, cluster)
}
}

// generate the per-service/subset clusters
c, err := s.makeGatewayServiceClusters(cfgSnap, cfgSnap.MeshGateway.ServiceGroups, cfgSnap.MeshGateway.ServiceResolvers)
c, err := s.makeGatewayServiceClusters(cfgSnap, cfgSnap.MeshGateway.ServiceGroups, cfgSnap.MeshGateway.ServiceResolvers, cfgSnap.MeshGateway.Limits)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -664,7 +668,7 @@ func (s *ResourceGenerator) makePeerServerClusters(cfgSnap *proxycfg.ConfigSnaps
// for a terminating gateway. This will include 1 cluster per Destination associated with this terminating gateway.
func (s *ResourceGenerator) clustersFromSnapshotTerminatingGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
res := []proto.Message{}
gwClusters, err := s.makeGatewayServiceClusters(cfgSnap, cfgSnap.TerminatingGateway.ServiceGroups, cfgSnap.TerminatingGateway.ServiceResolvers)
gwClusters, err := s.makeGatewayServiceClusters(cfgSnap, cfgSnap.TerminatingGateway.ServiceGroups, cfgSnap.TerminatingGateway.ServiceResolvers, nil)
if err != nil {
return nil, err
}
Expand All @@ -683,6 +687,7 @@ func (s *ResourceGenerator) makeGatewayServiceClusters(
cfgSnap *proxycfg.ConfigSnapshot,
services map[structs.ServiceName]structs.CheckServiceNodes,
resolvers map[structs.ServiceName]*structs.ServiceResolverConfigEntry,
limits *structs.UpstreamLimits,
) ([]proto.Message, error) {
var hostnameEndpoints structs.CheckServiceNodes

Expand Down Expand Up @@ -724,6 +729,7 @@ func (s *ResourceGenerator) makeGatewayServiceClusters(
hostnameEndpoints: hostnameEndpoints,
connectTimeout: resolver.ConnectTimeout,
isRemote: isRemote,
limits: limits,
}
cluster := s.makeGatewayCluster(cfgSnap, opts)

Expand Down Expand Up @@ -763,6 +769,7 @@ func (s *ResourceGenerator) makeGatewayServiceClusters(
onlyPassing: subset.OnlyPassing,
connectTimeout: resolver.ConnectTimeout,
isRemote: isRemote,
limits: limits,
}
cluster := s.makeGatewayCluster(cfgSnap, opts)

Expand All @@ -786,6 +793,8 @@ func (s *ResourceGenerator) makeGatewayOutgoingClusterPeeringServiceClusters(cfg
return nil, fmt.Errorf("unsupported gateway kind %q", cfgSnap.Kind)
}

limits := cfgSnap.MeshGateway.Limits

var clusters []proto.Message

for _, serviceGroups := range cfgSnap.MeshGateway.PeeringServices {
Expand All @@ -812,6 +821,7 @@ func (s *ResourceGenerator) makeGatewayOutgoingClusterPeeringServiceClusters(cfg
name: clusterName,
isRemote: true,
hostnameEndpoints: hostnameEndpoints,
limits: limits,
}
cluster := s.makeGatewayCluster(cfgSnap, opts)

Expand Down Expand Up @@ -1706,6 +1716,8 @@ type clusterOpts struct {
// Corresponds to a valid address/port pairs to be routed externally
// these addresses will be embedded in the cluster configuration and will never use EDS
addresses []structs.ServiceAddress

limits *structs.UpstreamLimits
}

// makeGatewayCluster creates an Envoy cluster for a mesh or terminating gateway
Expand Down Expand Up @@ -1768,6 +1780,12 @@ func (s *ResourceGenerator) makeGatewayCluster(snap *proxycfg.ConfigSnapshot, op
)
}

if opts.limits != nil {
cluster.CircuitBreakers = &envoy_cluster_v3.CircuitBreakers{
Thresholds: makeThresholdsIfNeeded(opts.limits),
}
}

return cluster
}

Expand Down
8 changes: 8 additions & 0 deletions agent/xds/resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,14 @@ func getMeshGatewayGoldenTestCases() []goldenTestCase {
// TODO(proxystate): mesh gateway will come at a later time
alsoRunTestForV2: false,
},
{
name: "mesh-gateway-with-limits",
create: func(t testinf.T) *proxycfg.ConfigSnapshot {
return proxycfg.TestConfigSnapshotMeshGateway(t, "limits-added", nil, nil)
},
// TODO(proxystate): mesh gateway will come at a later time
alsoRunTestForV2: false,
},
{
name: "mesh-gateway-using-federation-states",
create: func(t testinf.T) *proxycfg.ConfigSnapshot {
Expand Down
Loading
Loading