Skip to content

Commit

Permalink
Start reconciling on the new field
Browse files Browse the repository at this point in the history
  • Loading branch information
gauravkghildiyal committed Mar 4, 2024
1 parent 9513f75 commit 51a3fa2
Show file tree
Hide file tree
Showing 4 changed files with 293 additions and 38 deletions.
1 change: 1 addition & 0 deletions pkg/controller/endpointslice/endpointslice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func NewController(ctx context.Context, podInformer coreinformers.PodInformer,
c.maxEndpointsPerSlice,
c.endpointSliceTracker,
c.topologyCache,
utilfeature.DefaultFeatureGate.Enabled(features.ServiceTrafficDistribution),
c.eventRecorder,
controllerName,
)
Expand Down
5 changes: 4 additions & 1 deletion staging/src/k8s.io/endpointslice/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ var (
Name: "endpointslices_changed_per_sync",
Help: "Number of EndpointSlices changed on each Service sync",
},
[]string{"topology"}, // either "Auto" or "Disabled"
[]string{
"topology", // either "Auto" or "Disabled"
"traffic_distribution", // "PreferClose" or <empty>
},
)

// EndpointSliceSyncs tracks the number of sync operations the controller
Expand Down
65 changes: 54 additions & 11 deletions staging/src/k8s.io/endpointslice/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/endpointslice/metrics"
"k8s.io/endpointslice/topologycache"
"k8s.io/endpointslice/trafficdist"
endpointsliceutil "k8s.io/endpointslice/util"
"k8s.io/klog/v2"
)
Expand All @@ -50,6 +51,9 @@ type Reconciler struct {
// topologyCache tracks the distribution of Nodes and endpoints across zones
// to enable TopologyAwareHints.
topologyCache *topologycache.TopologyCache
// trafficDistributionEnabled determines if endpointDistribution field is to
// be considered when reconciling EndpointSlice hints.
trafficDistributionEnabled bool
// eventRecorder allows Reconciler to record and publish events.
eventRecorder record.EventRecorder
controllerName string
Expand Down Expand Up @@ -261,9 +265,32 @@ func (r *Reconciler) reconcileByAddressType(logger klog.Logger, service *corev1.
Unchanged: unchangedSlices(existingSlices, slicesToUpdate, slicesToDelete),
}

canUseTrafficDistribution := r.trafficDistributionEnabled && !hintsEnabled(service.Annotations)

// Check if we need to add/remove hints based on the topology annotation.
//
// This if/else clause can be removed once the annotation has been deprecated.
// Ref: https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/4444-service-routing-preference
if r.topologyCache != nil && hintsEnabled(service.Annotations) {
// Reaching this point means that we need to configure hints based on the
// topology annotation.
slicesToCreate, slicesToUpdate, events = r.topologyCache.AddHints(logger, si)

} else {
// Reaching this point means that we will not be configuring hints based on
// the topology annotation. We need to do 2 things:
// 1. If hints were added previously based on the annotation, we need to
// clear up any locally cached hints from the topologyCache object.
// 2. Optionally remove the actual hints from the EndpointSlice if we know
// that the `trafficDistribution` field is also NOT being used. In other
// words, if we know that the `trafficDistribution` field has been
// correctly configured by the customer, we DO NOT remove the hints and
// wait for the trafficDist handlers to correctly configure them. Always
// unconditionally removing hints here (and letting them get readded by
// the trafficDist) adds extra overhead in the form of DeepCopy (done
// within topologyCache.RemoveHints)

// Check 1.
if r.topologyCache != nil {
if r.topologyCache.HasPopulatedHints(si.ServiceKey) {
logger.Info("TopologyAwareHints annotation has changed, removing hints", "serviceKey", si.ServiceKey, "addressType", si.AddressType)
Expand All @@ -275,8 +302,17 @@ func (r *Reconciler) reconcileByAddressType(logger klog.Logger, service *corev1.
}
r.topologyCache.RemoveHints(si.ServiceKey, addressType)
}
slicesToCreate, slicesToUpdate = topologycache.RemoveHintsFromSlices(si)

// Check 2.
if !canUseTrafficDistribution {
slicesToCreate, slicesToUpdate = topologycache.RemoveHintsFromSlices(si)
}
}

if canUseTrafficDistribution {
slicesToCreate, slicesToUpdate, _ = trafficdist.ReconcileHints(service.Spec.TrafficDistribution, slicesToCreate, slicesToUpdate, unchangedSlices(existingSlices, slicesToUpdate, slicesToDelete))
}

err := r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime)
if err != nil {
errs = append(errs, err)
Expand All @@ -288,16 +324,17 @@ func (r *Reconciler) reconcileByAddressType(logger klog.Logger, service *corev1.

}

func NewReconciler(client clientset.Interface, nodeLister corelisters.NodeLister, maxEndpointsPerSlice int32, endpointSliceTracker *endpointsliceutil.EndpointSliceTracker, topologyCache *topologycache.TopologyCache, eventRecorder record.EventRecorder, controllerName string) *Reconciler {
func NewReconciler(client clientset.Interface, nodeLister corelisters.NodeLister, maxEndpointsPerSlice int32, endpointSliceTracker *endpointsliceutil.EndpointSliceTracker, topologyCache *topologycache.TopologyCache, trafficDistributionEnabled bool, eventRecorder record.EventRecorder, controllerName string) *Reconciler {
return &Reconciler{
client: client,
nodeLister: nodeLister,
maxEndpointsPerSlice: maxEndpointsPerSlice,
endpointSliceTracker: endpointSliceTracker,
metricsCache: metrics.NewCache(maxEndpointsPerSlice),
topologyCache: topologyCache,
eventRecorder: eventRecorder,
controllerName: controllerName,
client: client,
nodeLister: nodeLister,
maxEndpointsPerSlice: maxEndpointsPerSlice,
endpointSliceTracker: endpointSliceTracker,
metricsCache: metrics.NewCache(maxEndpointsPerSlice),
topologyCache: topologyCache,
trafficDistributionEnabled: trafficDistributionEnabled,
eventRecorder: eventRecorder,
controllerName: controllerName,
}
}

Expand Down Expand Up @@ -401,9 +438,15 @@ func (r *Reconciler) finalize(
if r.topologyCache != nil && hintsEnabled(service.Annotations) {
topologyLabel = "Auto"
}
var trafficDistribution string
if r.trafficDistributionEnabled && !hintsEnabled(service.Annotations) {
if service.Spec.TrafficDistribution != nil && *service.Spec.TrafficDistribution == corev1.ServiceTrafficDistributionPreferClose {
trafficDistribution = *service.Spec.TrafficDistribution
}
}

numSlicesChanged := len(slicesToCreate) + len(slicesToUpdate) + len(slicesToDelete)
metrics.EndpointSlicesChangedPerSync.WithLabelValues(topologyLabel).Observe(float64(numSlicesChanged))
metrics.EndpointSlicesChangedPerSync.WithLabelValues(topologyLabel, trafficDistribution).Observe(float64(numSlicesChanged))

return nil
}
Expand Down
Loading

0 comments on commit 51a3fa2

Please sign in to comment.