Skip to content

Commit

Permalink
Merge pull request kubernetes#123487 from gauravkghildiyal/kep-4444
Browse files Browse the repository at this point in the history
Introduce trafficDistribution field for Kubernetes Services
  • Loading branch information
k8s-ci-robot committed Mar 5, 2024
2 parents 229ebab + ec6fd2b commit a76a3e0
Show file tree
Hide file tree
Showing 30 changed files with 2,310 additions and 1,024 deletions.
4 changes: 4 additions & 0 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions api/openapi-spec/v3/api__v1_openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -7447,6 +7447,10 @@
],
"description": "sessionAffinityConfig contains the configurations of session affinity."
},
"trafficDistribution": {
"description": "TrafficDistribution offers a way to express preferences for how traffic is distributed to Service endpoints. Implementations can use this field as a hint, but are not required to guarantee strict adherence. If the field is not set, the implementation will apply its default routing strategy. If set to \"PreferClose\", implementations should prioritize endpoints that are topologically close (e.g., same zone).",
"type": "string"
},
"type": {
"description": "type determines how the Service is exposed. Defaults to ClusterIP. Valid options are ExternalName, ClusterIP, NodePort, and LoadBalancer. \"ClusterIP\" allocates a cluster-internal IP address for load-balancing to endpoints. Endpoints are determined by the selector or if that is not specified, by manual construction of an Endpoints object or EndpointSlice objects. If clusterIP is \"None\", no virtual IP is allocated and the endpoints are published as a set of endpoints rather than a virtual IP. \"NodePort\" builds on ClusterIP and allocates a port on every node which routes to the same endpoints as the clusterIP. \"LoadBalancer\" builds on NodePort and creates an external load-balancer (if supported in the current cloud) which routes to the same endpoints as the clusterIP. \"ExternalName\" aliases this service to the specified externalName. Several other fields do not apply to ExternalName services. More info: https://kubernetes.io/docs/concepts/services-networking/service/#publishing-services-service-types",
"type": "string"
Expand Down
21 changes: 21 additions & 0 deletions pkg/apis/core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4163,6 +4163,18 @@ const (
ServiceExternalTrafficPolicyLocal ServiceExternalTrafficPolicy = "Local"
)

// These are valid values for the TrafficDistribution field of a Service.
const (
// Indicates a preference for routing traffic to endpoints that are
// topologically proximate to the client. The interpretation of "topologically
// proximate" may vary across implementations and could encompass endpoints
// within the same node, rack, zone, or even region. Setting this value gives
// implementations permission to make different tradeoffs, e.g. optimizing for
// proximity rather than equal distribution of load. Users should not set this
// value if such tradeoffs are not acceptable.
ServiceTrafficDistributionPreferClose = "PreferClose"
)

// These are the valid conditions of a service.
const (
// LoadBalancerPortsError represents the condition of the requested ports
Expand Down Expand Up @@ -4426,6 +4438,15 @@ type ServiceSpec struct {
// (possibly modified by topology and other features).
// +optional
InternalTrafficPolicy *ServiceInternalTrafficPolicy

// TrafficDistribution offers a way to express preferences for how traffic is
// distributed to Service endpoints. Implementations can use this field as a
// hint, but are not required to guarantee strict adherence. If the field is
// not set, the implementation will apply its default routing strategy. If set
// to "PreferClose", implementations should prioritize endpoints that are
// topologically close (e.g., same zone).
// +optional
TrafficDistribution *string
}

// ServicePort represents the port on which the service is exposed
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/core/v1/zz_generated.conversion.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions pkg/apis/core/validation/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -5534,6 +5534,9 @@ func ValidateService(service *core.Service) field.ErrorList {
// internal traffic policy field
allErrs = append(allErrs, validateServiceInternalTrafficFieldsValue(service)...)

// traffic distribution field
allErrs = append(allErrs, validateServiceTrafficDistribution(service)...)

return allErrs
}

Expand Down Expand Up @@ -5651,6 +5654,22 @@ func validateServiceInternalTrafficFieldsValue(service *core.Service) field.Erro
return allErrs
}

// validateServiceTrafficDistribution validates the values for the
// trafficDistribution field.
func validateServiceTrafficDistribution(service *core.Service) field.ErrorList {
allErrs := field.ErrorList{}

if service.Spec.TrafficDistribution == nil {
return allErrs
}

if *service.Spec.TrafficDistribution != v1.ServiceTrafficDistributionPreferClose {
allErrs = append(allErrs, field.NotSupported(field.NewPath("spec").Child("trafficDistribution"), *service.Spec.TrafficDistribution, []string{v1.ServiceTrafficDistributionPreferClose}))
}

return allErrs
}

// ValidateServiceCreate validates Services as they are created.
func ValidateServiceCreate(service *core.Service) field.ErrorList {
return ValidateService(service)
Expand Down
12 changes: 12 additions & 0 deletions pkg/apis/core/validation/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17121,6 +17121,18 @@ func TestValidateServiceCreate(t *testing.T) {
s.Annotations[core.AnnotationTopologyMode] = "different"
},
numErrs: 1,
}, {
name: "valid: trafficDistribution field set to PreferClose",
tweakSvc: func(s *core.Service) {
s.Spec.TrafficDistribution = utilpointer.String("PreferClose")
},
numErrs: 0,
}, {
name: "invalid: trafficDistribution field set to Random",
tweakSvc: func(s *core.Service) {
s.Spec.TrafficDistribution = utilpointer.String("Random")
},
numErrs: 1,
},
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/core/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/controller/endpointslice/endpointslice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ func NewController(ctx context.Context, podInformer coreinformers.PodInformer,
c.topologyCache,
c.eventRecorder,
controllerName,
endpointslicerec.WithTrafficDistributionEnabled(utilfeature.DefaultFeatureGate.Enabled(features.ServiceTrafficDistribution)),
)

return c
Expand Down
9 changes: 9 additions & 0 deletions pkg/features/kube_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,13 @@ const (
// Subdivide the NodePort range for dynamic and static port allocation.
ServiceNodePortStaticSubrange featuregate.Feature = "ServiceNodePortStaticSubrange"

// owner: @gauravkghildiyal @robscott
// kep: https://kep.k8s.io/4444
// alpha: v1.30
//
// Enables trafficDistribution field on Services.
ServiceTrafficDistribution featuregate.Feature = "ServiceTrafficDistribution"

// owner: @gjkim42 @SergeyKanzhelev @matthyx @tzneal
// kep: http://kep.k8s.io/753
// alpha: v1.28
Expand Down Expand Up @@ -1139,6 +1146,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS

ServiceNodePortStaticSubrange: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // GA in 1.29; remove in 1.31

ServiceTrafficDistribution: {Default: false, PreRelease: featuregate.Alpha},

SidecarContainers: {Default: true, PreRelease: featuregate.Beta},

SizeMemoryBackedVolumes: {Default: true, PreRelease: featuregate.Beta},
Expand Down
7 changes: 7 additions & 0 deletions pkg/generated/openapi/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 18 additions & 12 deletions pkg/proxy/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,21 +137,27 @@ func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels m
return
}

// canUseTopology returns true if topology aware routing is enabled and properly configured
// in this cluster. That is, it checks that:
// * The TopologyAwareHints feature is enabled
// * The "service.kubernetes.io/topology-aware-hints" annotation on this Service is set to "Auto"
// * The node's labels include "topology.kubernetes.io/zone"
// * All of the endpoints for this Service have a topology hint
// * At least one endpoint for this Service is hinted for this node's zone.
// canUseTopology returns true if topology aware routing is enabled and properly
// configured in this cluster. That is, it checks that:
// - The TopologyAwareHints or ServiceTrafficDistribution feature is enabled.
// - If ServiceTrafficDistribution feature gate is not enabled, then the
// hintsAnnotation should represent an enabled value.
// - The node's labels include "topology.kubernetes.io/zone".
// - All of the endpoints for this Service have a topology hint.
// - At least one endpoint for this Service is hinted for this node's zone.
func canUseTopology(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[string]string) bool {
if !utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
if !utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) && !utilfeature.DefaultFeatureGate.Enabled(features.ServiceTrafficDistribution) {
return false
}
// Any non-empty and non-disabled values for the hints annotation are acceptable.
hintsAnnotation := svcInfo.HintsAnnotation()
if hintsAnnotation == "" || hintsAnnotation == "disabled" || hintsAnnotation == "Disabled" {
return false

// Ignore value of hintsAnnotation if the ServiceTrafficDistribution feature
// gate is enabled.
if !utilfeature.DefaultFeatureGate.Enabled(features.ServiceTrafficDistribution) {
// If the hintsAnnotation has a disabled value, we do not consider hints for route programming.
hintsAnnotation := svcInfo.HintsAnnotation()
if hintsAnnotation == "" || hintsAnnotation == "disabled" || hintsAnnotation == "Disabled" {
return false
}
}

zone, ok := nodeLabels[v1.LabelTopologyZone]
Expand Down
60 changes: 46 additions & 14 deletions pkg/proxy/topology_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,13 @@ func checkExpectedEndpoints(expected sets.Set[string], actual []Endpoint) error

func TestCategorizeEndpoints(t *testing.T) {
testCases := []struct {
name string
hintsEnabled bool
pteEnabled bool
nodeLabels map[string]string
serviceInfo ServicePort
endpoints []Endpoint
name string
hintsEnabled bool
trafficDistFeatureEnabled bool
pteEnabled bool
nodeLabels map[string]string
serviceInfo ServicePort
endpoints []Endpoint

// We distinguish `nil` ("service doesn't use this kind of endpoints") from
// `sets.Set[string]()` ("service uses this kind of endpoints but has no endpoints").
Expand Down Expand Up @@ -131,10 +132,39 @@ func TestCategorizeEndpoints(t *testing.T) {
clusterEndpoints: sets.New[string]("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
localEndpoints: nil,
}, {
name: "externalTrafficPolicy: Local, topology ignored for Local endpoints",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServicePortInfo{externalPolicyLocal: true, nodePort: 8080, hintsAnnotation: "auto"},
name: "hints, hints annotation empty but trafficDist feature gate enabled, hints are not ignored",
hintsEnabled: true,
trafficDistFeatureEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServicePortInfo{},
endpoints: []Endpoint{
&BaseEndpointInfo{endpoint: "10.1.2.3:80", zoneHints: sets.New[string]("zone-a"), ready: true},
&BaseEndpointInfo{endpoint: "10.1.2.4:80", zoneHints: sets.New[string]("zone-b"), ready: true},
&BaseEndpointInfo{endpoint: "10.1.2.5:80", zoneHints: sets.New[string]("zone-c"), ready: true},
&BaseEndpointInfo{endpoint: "10.1.2.6:80", zoneHints: sets.New[string]("zone-a"), ready: true},
},
clusterEndpoints: sets.New[string]("10.1.2.3:80", "10.1.2.6:80"),
localEndpoints: nil,
}, {
name: "hints disabled, trafficDist feature gate enabled, hints are not ignored",
hintsEnabled: false,
trafficDistFeatureEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServicePortInfo{},
endpoints: []Endpoint{
&BaseEndpointInfo{endpoint: "10.1.2.3:80", zoneHints: sets.New[string]("zone-a"), ready: true},
&BaseEndpointInfo{endpoint: "10.1.2.4:80", zoneHints: sets.New[string]("zone-b"), ready: true},
&BaseEndpointInfo{endpoint: "10.1.2.5:80", zoneHints: sets.New[string]("zone-c"), ready: true},
&BaseEndpointInfo{endpoint: "10.1.2.6:80", zoneHints: sets.New[string]("zone-a"), ready: true},
},
clusterEndpoints: sets.New[string]("10.1.2.3:80", "10.1.2.6:80"),
localEndpoints: nil,
}, {
name: "externalTrafficPolicy: Local, topology ignored for Local endpoints",
hintsEnabled: true,
trafficDistFeatureEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServicePortInfo{externalPolicyLocal: true, nodePort: 8080, hintsAnnotation: "auto"},
endpoints: []Endpoint{
&BaseEndpointInfo{endpoint: "10.1.2.3:80", zoneHints: sets.New[string]("zone-a"), ready: true, isLocal: true},
&BaseEndpointInfo{endpoint: "10.1.2.4:80", zoneHints: sets.New[string]("zone-b"), ready: true, isLocal: true},
Expand All @@ -145,10 +175,11 @@ func TestCategorizeEndpoints(t *testing.T) {
localEndpoints: sets.New[string]("10.1.2.3:80", "10.1.2.4:80"),
allEndpoints: sets.New[string]("10.1.2.3:80", "10.1.2.4:80", "10.1.2.6:80"),
}, {
name: "internalTrafficPolicy: Local, topology ignored for Local endpoints",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServicePortInfo{internalPolicyLocal: true, hintsAnnotation: "auto", externalPolicyLocal: false, nodePort: 8080},
name: "internalTrafficPolicy: Local, topology ignored for Local endpoints",
hintsEnabled: true,
trafficDistFeatureEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServicePortInfo{internalPolicyLocal: true, hintsAnnotation: "auto", externalPolicyLocal: false, nodePort: 8080},
endpoints: []Endpoint{
&BaseEndpointInfo{endpoint: "10.1.2.3:80", zoneHints: sets.New[string]("zone-a"), ready: true, isLocal: true},
&BaseEndpointInfo{endpoint: "10.1.2.4:80", zoneHints: sets.New[string]("zone-b"), ready: true, isLocal: true},
Expand Down Expand Up @@ -458,6 +489,7 @@ func TestCategorizeEndpoints(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.TopologyAwareHints, tc.hintsEnabled)()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceTrafficDistribution, tc.trafficDistFeatureEnabled)()

clusterEndpoints, localEndpoints, allEndpoints, hasAnyEndpoints := CategorizeEndpoints(tc.endpoints, tc.serviceInfo, tc.nodeLabels)

Expand Down
6 changes: 5 additions & 1 deletion pkg/registry/core/service/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,11 @@ func (svcStrategy) AllowUnconditionalUpdate() bool {
// newSvc.Spec.MyFeature = nil
// }
func dropServiceDisabledFields(newSvc *api.Service, oldSvc *api.Service) {

// Drop condition for TrafficDistribution field.
isTrafficDistributionInUse := (oldSvc != nil && oldSvc.Spec.TrafficDistribution != nil)
if !utilfeature.DefaultFeatureGate.Enabled(features.ServiceTrafficDistribution) && !isTrafficDistributionInUse {
newSvc.Spec.TrafficDistribution = nil
}
}

type serviceStatusStrategy struct {
Expand Down
Loading

0 comments on commit a76a3e0

Please sign in to comment.