Skip to content

Commit

Permalink
HPA: only send updates when the status has changed
Browse files Browse the repository at this point in the history
This commit only sends updates if the status has actually changed.
Since the HPA runs at a regular interval, this should reduce the volume
of writes, especially on short HPA intervals with relatively constant
metrics.
  • Loading branch information
DirectXMan12 committed Jun 7, 2017
1 parent 4a01f44 commit 00a19f1
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 13 deletions.
1 change: 1 addition & 0 deletions pkg/controller/podautoscaler/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ go_library(
"//pkg/controller:go_default_library",
"//pkg/controller/podautoscaler/metrics:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
Expand Down
46 changes: 34 additions & 12 deletions pkg/controller/podautoscaler/horizontal.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/golang/glog"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -371,14 +372,20 @@ func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.Ho
return fmt.Errorf("failed to convert the given HPA to %s: %v", autoscalingv2.SchemeGroupVersion.String(), err)
}
hpa := hpaRaw.(*autoscalingv2.HorizontalPodAutoscaler)
hpaStatusOriginalRaw, err := api.Scheme.DeepCopy(&hpa.Status)
if err != nil {
a.eventRecorder.Event(hpav1Shared, v1.EventTypeWarning, "FailedConvertHPA", err.Error())
return fmt.Errorf("failed to deep-copy the HPA status: %v", err)
}
hpaStatusOriginal := hpaStatusOriginalRaw.(*autoscalingv2.HorizontalPodAutoscalerStatus)

reference := fmt.Sprintf("%s/%s/%s", hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name)

scale, err := a.scaleNamespacer.Scales(hpa.Namespace).Get(hpa.Spec.ScaleTargetRef.Kind, hpa.Spec.ScaleTargetRef.Name)
if err != nil {
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
a.update(hpa)
a.updateStatusIfNeeded(hpaStatusOriginal, hpa)
return fmt.Errorf("failed to query scale subresource for %s: %v", reference, err)
}
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededGetScale", "the HPA controller was able to get the target's current scale")
Expand Down Expand Up @@ -412,7 +419,10 @@ func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.Ho
} else {
metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(hpa, scale, hpa.Spec.Metrics)
if err != nil {
a.updateCurrentReplicasInStatus(hpa, currentReplicas)
a.setCurrentReplicasInStatus(hpa, currentReplicas)
if err := a.updateStatusIfNeeded(hpaStatusOriginal, hpa); err != nil {
utilruntime.HandleError(err)
}
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedComputeMetricsReplicas", err.Error())
return fmt.Errorf("failed to compute desired number of replicas based on listed metrics for %s: %v", reference, err)
}
Expand Down Expand Up @@ -489,7 +499,10 @@ func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.Ho
if err != nil {
a.eventRecorder.Eventf(hpa, v1.EventTypeWarning, "FailedRescale", "New size: %d; reason: %s; error: %v", desiredReplicas, rescaleReason, err.Error())
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedUpdateScale", "the HPA controller was unable to update the target scale: %v", err)
a.updateCurrentReplicasInStatus(hpa, currentReplicas)
a.setCurrentReplicasInStatus(hpa, currentReplicas)
if err := a.updateStatusIfNeeded(hpaStatusOriginal, hpa); err != nil {
utilruntime.HandleError(err)
}
return fmt.Errorf("failed to rescale %s: %v", reference, err)
}
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededRescale", "the HPA controller was able to update the target scale to %d", desiredReplicas)
Expand All @@ -501,7 +514,8 @@ func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.Ho
desiredReplicas = currentReplicas
}

return a.updateStatusWithReplicas(hpa, currentReplicas, desiredReplicas, metricStatuses, rescale)
a.setStatus(hpa, currentReplicas, desiredReplicas, metricStatuses, rescale)
return a.updateStatusIfNeeded(hpaStatusOriginal, hpa)
}

func (a *HorizontalController) shouldScale(hpa *autoscalingv2.HorizontalPodAutoscaler, currentReplicas, desiredReplicas int32, timestamp time.Time) bool {
Expand All @@ -528,14 +542,14 @@ func (a *HorizontalController) shouldScale(hpa *autoscalingv2.HorizontalPodAutos
return false
}

func (a *HorizontalController) updateCurrentReplicasInStatus(hpa *autoscalingv2.HorizontalPodAutoscaler, currentReplicas int32) {
err := a.updateStatusWithReplicas(hpa, currentReplicas, hpa.Status.DesiredReplicas, hpa.Status.CurrentMetrics, false)
if err != nil {
utilruntime.HandleError(err)
}
// setCurrentReplicasInStatus sets the current replica count in the status of the HPA.
func (a *HorizontalController) setCurrentReplicasInStatus(hpa *autoscalingv2.HorizontalPodAutoscaler, currentReplicas int32) {
a.setStatus(hpa, currentReplicas, hpa.Status.DesiredReplicas, hpa.Status.CurrentMetrics, false)
}

func (a *HorizontalController) updateStatusWithReplicas(hpa *autoscalingv2.HorizontalPodAutoscaler, currentReplicas, desiredReplicas int32, metricStatuses []autoscalingv2.MetricStatus, rescale bool) error {
// setStatus recreates the status of the given HPA, updating the current and
// desired replicas, as well as the metric statuses
func (a *HorizontalController) setStatus(hpa *autoscalingv2.HorizontalPodAutoscaler, currentReplicas, desiredReplicas int32, metricStatuses []autoscalingv2.MetricStatus, rescale bool) {
hpa.Status = autoscalingv2.HorizontalPodAutoscalerStatus{
CurrentReplicas: currentReplicas,
DesiredReplicas: desiredReplicas,
Expand All @@ -548,11 +562,19 @@ func (a *HorizontalController) updateStatusWithReplicas(hpa *autoscalingv2.Horiz
now := metav1.NewTime(time.Now())
hpa.Status.LastScaleTime = &now
}
}

return a.update(hpa)
// updateStatusIfNeeded calls updateStatus only if the status of the new HPA is not the same as the old status
func (a *HorizontalController) updateStatusIfNeeded(oldStatus *autoscalingv2.HorizontalPodAutoscalerStatus, newHPA *autoscalingv2.HorizontalPodAutoscaler) error {
// skip a write if we wouldn't need to update
if apiequality.Semantic.DeepEqual(oldStatus, &newHPA.Status) {
return nil
}
return a.updateStatus(newHPA)
}

func (a *HorizontalController) update(hpa *autoscalingv2.HorizontalPodAutoscaler) error {
// updateStatus actually does the update request for the status of the given HPA
func (a *HorizontalController) updateStatus(hpa *autoscalingv2.HorizontalPodAutoscaler) error {
// convert back to autoscalingv1
hpaRaw, err := UnsafeConvertToVersionVia(hpa, autoscalingv1.SchemeGroupVersion)
if err != nil {
Expand Down
80 changes: 79 additions & 1 deletion pkg/controller/podautoscaler/horizontal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ func (tc *testCase) verifyResults(t *testing.T) {
}
}

func (tc *testCase) runTest(t *testing.T) {
func (tc *testCase) setupController(t *testing.T) (*HorizontalController, informers.SharedInformerFactory) {
testClient, testMetricsClient, testCMClient := tc.prepareTestClient(t)
if tc.testClient != nil {
testClient = tc.testClient
Expand Down Expand Up @@ -598,6 +598,10 @@ func (tc *testCase) runTest(t *testing.T) {
)
hpaController.hpaListerSynced = alwaysReady

return hpaController, informerFactory
}

func (tc *testCase) runTestWithController(t *testing.T, hpaController *HorizontalController, informerFactory informers.SharedInformerFactory) {
stop := make(chan struct{})
defer close(stop)
informerFactory.Start(stop)
Expand All @@ -616,6 +620,11 @@ func (tc *testCase) runTest(t *testing.T) {
tc.verifyResults(t)
}

func (tc *testCase) runTest(t *testing.T) {
hpaController, informerFactory := tc.setupController(t)
tc.runTestWithController(t, hpaController, informerFactory)
}

func TestScaleUp(t *testing.T) {
tc := testCase{
minReplicas: 2,
Expand Down Expand Up @@ -1594,4 +1603,73 @@ func TestScaleDownRCImmediately(t *testing.T) {
tc.runTest(t)
}

func TestAvoidUncessaryUpdates(t *testing.T) {
tc := testCase{
minReplicas: 2,
maxReplicas: 6,
initialReplicas: 3,
desiredReplicas: 3,
CPUTarget: 30,
CPUCurrent: 40,
verifyCPUCurrent: true,
reportedLevels: []uint64{400, 500, 700},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
reportedPodReadiness: []v1.ConditionStatus{v1.ConditionTrue, v1.ConditionFalse, v1.ConditionFalse},
useMetricsApi: true,
}
testClient, _, _ := tc.prepareTestClient(t)
tc.testClient = testClient
var savedHPA *autoscalingv1.HorizontalPodAutoscaler
testClient.PrependReactor("list", "horizontalpodautoscalers", func(action core.Action) (handled bool, ret runtime.Object, err error) {
tc.Lock()
defer tc.Unlock()

if savedHPA != nil {
// fake out the verification logic and mark that we're done processing
go func() {
// wait a tick and then mark that we're finished (otherwise, we have no
// way to indicate that we're finished, because the function decides not to do anything)
time.Sleep(1 * time.Second)
tc.statusUpdated = true
tc.processed <- "test-hpa"
}()
return true, &autoscalingv1.HorizontalPodAutoscalerList{
Items: []autoscalingv1.HorizontalPodAutoscaler{*savedHPA},
}, nil
}

// fallthrough
return false, nil, nil
})
testClient.PrependReactor("update", "horizontalpodautoscalers", func(action core.Action) (handled bool, ret runtime.Object, err error) {
tc.Lock()
defer tc.Unlock()

if savedHPA == nil {
// save the HPA and return it
savedHPA = action.(core.UpdateAction).GetObject().(*autoscalingv1.HorizontalPodAutoscaler)
return true, savedHPA, nil
}

assert.Fail(t, "should not have attempted to update the HPA when nothing changed")
// mark that we've processed this HPA
tc.processed <- ""
return true, nil, fmt.Errorf("unexpected call")
})

controller, informerFactory := tc.setupController(t)

// fake an initial processing loop to populate savedHPA
initialHPAs, err := testClient.Autoscaling().HorizontalPodAutoscalers("test-namespace").List(metav1.ListOptions{})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if err := controller.reconcileAutoscaler(&initialHPAs.Items[0]); err != nil {
t.Fatalf("unexpected error: %v", err)
}

// actually run the test
tc.runTestWithController(t, controller, informerFactory)
}

// TODO: add more tests

0 comments on commit 00a19f1

Please sign in to comment.