Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport of Net 6820 customize mesh gateway limits into release/1.18.x #20983

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/20945.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
gateways: service defaults configuration entries can now be used to set default upstream limits for mesh-gateways
```
30 changes: 30 additions & 0 deletions agent/proxycfg/mesh_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,17 @@ func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, er
if err != nil {
return snap, err
}
// Watch for service default object that matches this mesh gateway's name
err = s.dataSources.ConfigEntry.Notify(ctx, &structs.ConfigEntryQuery{
Kind: structs.ServiceDefaults,
Name: s.service,
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
EnterpriseMeta: s.proxyID.EnterpriseMeta,
}, serviceDefaultsWatchID, s.ch)
if err != nil {
return snap, err
}

snap.MeshGateway.WatchedServices = make(map[structs.ServiceName]context.CancelFunc)
snap.MeshGateway.WatchedGateways = make(map[string]context.CancelFunc)
Expand Down Expand Up @@ -648,6 +659,25 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn
}

snap.MeshGateway.PeerServers = peerServers
case serviceDefaultsWatchID:
resp, ok := u.Result.(*structs.ConfigEntryResponse)
if !ok {
return fmt.Errorf("invalid type for config entry: %T", resp.Entry)
}

if resp.Entry == nil {
return nil
}
serviceDefaults, ok := resp.Entry.(*structs.ServiceConfigEntry)
if !ok {
return fmt.Errorf("invalid type for config entry: %T", resp.Entry)
}

if serviceDefaults.UpstreamConfig != nil && serviceDefaults.UpstreamConfig.Defaults != nil {
if serviceDefaults.UpstreamConfig.Defaults.Limits != nil {
snap.MeshGateway.Limits = serviceDefaults.UpstreamConfig.Defaults.Limits
}
}

default:
switch {
Expand Down
16 changes: 16 additions & 0 deletions agent/proxycfg/proxycfg.deepcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,22 @@ func (o *configSnapshotMeshGateway) DeepCopy() *configSnapshotMeshGateway {
}
}
}
if o.Limits != nil {
cp.Limits = new(structs.UpstreamLimits)
*cp.Limits = *o.Limits
if o.Limits.MaxConnections != nil {
cp.Limits.MaxConnections = new(int)
*cp.Limits.MaxConnections = *o.Limits.MaxConnections
}
if o.Limits.MaxPendingRequests != nil {
cp.Limits.MaxPendingRequests = new(int)
*cp.Limits.MaxPendingRequests = *o.Limits.MaxPendingRequests
}
if o.Limits.MaxConcurrentRequests != nil {
cp.Limits.MaxConcurrentRequests = new(int)
*cp.Limits.MaxConcurrentRequests = *o.Limits.MaxConcurrentRequests
}
}
return &cp
}

Expand Down
3 changes: 3 additions & 0 deletions agent/proxycfg/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,9 @@ type configSnapshotMeshGateway struct {
// PeeringTrustBundlesSet indicates that the watch on the peer trust
// bundles has completed at least once.
PeeringTrustBundlesSet bool

// Limits
Limits *structs.UpstreamLimits
}

// MeshGatewayValidExportedServices ensures that the following data is present
Expand Down
1 change: 1 addition & 0 deletions agent/proxycfg/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
consulServerListWatchID = "consul-server-list"
datacentersWatchID = "datacenters"
serviceResolversWatchID = "service-resolvers"
serviceDefaultsWatchID = "service-defaults"
gatewayServicesWatchID = "gateway-services"
gatewayConfigWatchID = "gateway-config"
apiGatewayConfigWatchID = "api-gateway-config"
Expand Down
23 changes: 23 additions & 0 deletions agent/proxycfg/testing_mesh_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,25 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st
},
},
})
case "limits-added":
extraUpdates = append(extraUpdates, UpdateEvent{
CorrelationID: serviceDefaultsWatchID,
Result: &structs.ConfigEntryResponse{
Entry: &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "mesh-gateway",
UpstreamConfig: &structs.UpstreamConfiguration{
Defaults: &structs.UpstreamConfig{
Limits: &structs.UpstreamLimits{
MaxConnections: pointerTo(1),
MaxPendingRequests: pointerTo(10),
MaxConcurrentRequests: pointerTo(100),
},
},
},
},
},
})
default:
t.Fatalf("unknown variant: %s", variant)
return nil
Expand Down Expand Up @@ -1124,3 +1143,7 @@ func TestConfigSnapshotPeeredMeshGateway(t testing.T, variant string, nsFn func(
},
}, nsFn, nil, testSpliceEvents(baseEvents, extraUpdates))
}

func pointerTo[T any](v T) *T {
return &v
}
21 changes: 13 additions & 8 deletions agent/structs/config_entry_gateways.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,9 @@ type LinkedService struct {
// SNI is the optional name to specify during the TLS handshake with a linked service
SNI string `json:",omitempty"`

//DisableAutoHostRewrite disables terminating gateways auto host rewrite feature when set to true.
DisableAutoHostRewrite bool `json:",omitempty"`

acl.EnterpriseMeta `hcl:",squash" mapstructure:",squash"`
}

Expand Down Expand Up @@ -668,6 +671,7 @@ type GatewayService struct {
FromWildcard bool `json:",omitempty"`
ServiceKind GatewayServiceKind `json:",omitempty"`
RaftIndex
AutoHostRewrite bool `json:",omitempty"`
}

type GatewayServices []*GatewayService
Expand Down Expand Up @@ -715,14 +719,15 @@ func (g *GatewayService) Clone() *GatewayService {
Port: g.Port,
Protocol: g.Protocol,
// See https://github.com/go101/go101/wiki/How-to-efficiently-clone-a-slice%3F
Hosts: append(g.Hosts[:0:0], g.Hosts...),
CAFile: g.CAFile,
CertFile: g.CertFile,
KeyFile: g.KeyFile,
SNI: g.SNI,
FromWildcard: g.FromWildcard,
RaftIndex: g.RaftIndex,
ServiceKind: g.ServiceKind,
Hosts: append(g.Hosts[:0:0], g.Hosts...),
CAFile: g.CAFile,
CertFile: g.CertFile,
KeyFile: g.KeyFile,
SNI: g.SNI,
FromWildcard: g.FromWildcard,
RaftIndex: g.RaftIndex,
ServiceKind: g.ServiceKind,
AutoHostRewrite: g.AutoHostRewrite,
}
}

Expand Down
24 changes: 20 additions & 4 deletions agent/xds/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.Co
name: connect.GatewaySNI(key.Datacenter, key.Partition, cfgSnap.Roots.TrustDomain),
hostnameEndpoints: cfgSnap.MeshGateway.HostnameDatacenters[key.String()],
isRemote: true,
limits: cfgSnap.MeshGateway.Limits,
}
cluster := s.makeGatewayCluster(cfgSnap, opts)
clusters = append(clusters, cluster)
Expand All @@ -554,6 +555,7 @@ func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.Co
name: cfgSnap.ServerSNIFn(key.Datacenter, ""),
hostnameEndpoints: hostnameEndpoints,
isRemote: !key.Matches(cfgSnap.Datacenter, cfgSnap.ProxyID.PartitionOrDefault()),
limits: cfgSnap.MeshGateway.Limits,
}
cluster := s.makeGatewayCluster(cfgSnap, opts)
clusters = append(clusters, cluster)
Expand All @@ -563,7 +565,8 @@ func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.Co
servers, _ := cfgSnap.MeshGateway.WatchedLocalServers.Get(structs.ConsulServiceName)
for _, srv := range servers {
opts := clusterOpts{
name: cfgSnap.ServerSNIFn(cfgSnap.Datacenter, srv.Node.Node),
name: cfgSnap.ServerSNIFn(cfgSnap.Datacenter, srv.Node.Node),
limits: cfgSnap.MeshGateway.Limits,
}
cluster := s.makeGatewayCluster(cfgSnap, opts)
clusters = append(clusters, cluster)
Expand All @@ -579,14 +582,15 @@ func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.Co
// We avoid routing to read replicas since they will never be Raft voters.
if haveVoters(servers) {
cluster := s.makeGatewayCluster(cfgSnap, clusterOpts{
name: connect.PeeringServerSAN(cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain),
name: connect.PeeringServerSAN(cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain),
limits: cfgSnap.MeshGateway.Limits,
})
clusters = append(clusters, cluster)
}
}

// generate the per-service/subset clusters
c, err := s.makeGatewayServiceClusters(cfgSnap, cfgSnap.MeshGateway.ServiceGroups, cfgSnap.MeshGateway.ServiceResolvers)
c, err := s.makeGatewayServiceClusters(cfgSnap, cfgSnap.MeshGateway.ServiceGroups, cfgSnap.MeshGateway.ServiceResolvers, cfgSnap.MeshGateway.Limits)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -664,7 +668,7 @@ func (s *ResourceGenerator) makePeerServerClusters(cfgSnap *proxycfg.ConfigSnaps
// for a terminating gateway. This will include 1 cluster per Destination associated with this terminating gateway.
func (s *ResourceGenerator) clustersFromSnapshotTerminatingGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
res := []proto.Message{}
gwClusters, err := s.makeGatewayServiceClusters(cfgSnap, cfgSnap.TerminatingGateway.ServiceGroups, cfgSnap.TerminatingGateway.ServiceResolvers)
gwClusters, err := s.makeGatewayServiceClusters(cfgSnap, cfgSnap.TerminatingGateway.ServiceGroups, cfgSnap.TerminatingGateway.ServiceResolvers, nil)
if err != nil {
return nil, err
}
Expand All @@ -683,6 +687,7 @@ func (s *ResourceGenerator) makeGatewayServiceClusters(
cfgSnap *proxycfg.ConfigSnapshot,
services map[structs.ServiceName]structs.CheckServiceNodes,
resolvers map[structs.ServiceName]*structs.ServiceResolverConfigEntry,
limits *structs.UpstreamLimits,
) ([]proto.Message, error) {
var hostnameEndpoints structs.CheckServiceNodes

Expand Down Expand Up @@ -724,6 +729,7 @@ func (s *ResourceGenerator) makeGatewayServiceClusters(
hostnameEndpoints: hostnameEndpoints,
connectTimeout: resolver.ConnectTimeout,
isRemote: isRemote,
limits: limits,
}
cluster := s.makeGatewayCluster(cfgSnap, opts)

Expand Down Expand Up @@ -763,6 +769,7 @@ func (s *ResourceGenerator) makeGatewayServiceClusters(
onlyPassing: subset.OnlyPassing,
connectTimeout: resolver.ConnectTimeout,
isRemote: isRemote,
limits: limits,
}
cluster := s.makeGatewayCluster(cfgSnap, opts)

Expand Down Expand Up @@ -812,6 +819,7 @@ func (s *ResourceGenerator) makeGatewayOutgoingClusterPeeringServiceClusters(cfg
name: clusterName,
isRemote: true,
hostnameEndpoints: hostnameEndpoints,
limits: cfgSnap.MeshGateway.Limits,
}
cluster := s.makeGatewayCluster(cfgSnap, opts)

Expand Down Expand Up @@ -1706,6 +1714,8 @@ type clusterOpts struct {
// Corresponds to a valid address/port pairs to be routed externally
// these addresses will be embedded in the cluster configuration and will never use EDS
addresses []structs.ServiceAddress

limits *structs.UpstreamLimits
}

// makeGatewayCluster creates an Envoy cluster for a mesh or terminating gateway
Expand Down Expand Up @@ -1768,6 +1778,12 @@ func (s *ResourceGenerator) makeGatewayCluster(snap *proxycfg.ConfigSnapshot, op
)
}

if opts.limits != nil {
cluster.CircuitBreakers = &envoy_cluster_v3.CircuitBreakers{
Thresholds: makeThresholdsIfNeeded(opts.limits),
}
}

return cluster
}

Expand Down
8 changes: 8 additions & 0 deletions agent/xds/resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,14 @@ func getMeshGatewayGoldenTestCases() []goldenTestCase {
// TODO(proxystate): mesh gateway will come at a later time
alsoRunTestForV2: false,
},
{
name: "mesh-gateway-with-limits",
create: func(t testinf.T) *proxycfg.ConfigSnapshot {
return proxycfg.TestConfigSnapshotMeshGateway(t, "limits-added", nil, nil)
},
// TODO(proxystate): mesh gateway will come at a later time
alsoRunTestForV2: false,
},
{
name: "mesh-gateway-using-federation-states",
create: func(t testinf.T) *proxycfg.ConfigSnapshot {
Expand Down
Loading
Loading