Mitigate memory leaks from long RequeueAfter periods (#1989)
* Ensure we don't RequeueAfter for more than 10 hours

We have an issue where the underlying timer used by the client-go work
queue implementation stays in memory until it expires.
Since one gets created at every reconciliation attempt, we end up with a
large number of timers in memory that, by default, only expire after 365
days (see the sketch below the commit description).

To mitigate the memory leak, let's requeue after no more than 10 hours.

This is done at the level of the aggregated results, to decouple this
workaround from any business logic such as certificate expiration.

For more details, see
#1984.

* Use aggregated results in the license controller
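
A rough sketch of the mechanism described above (not part of this commit; the queue item key is made up, and the comments only restate the commit message and kubernetes/client-go#701): controller-runtime turns a reconcile.Result{RequeueAfter: d} into an AddAfter call on the client-go work queue, and the delayed entry lives until the full duration elapses.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	q := workqueue.NewDelayingQueue()
	defer q.ShutDown()

	// Per the commit message, one such delayed add happens at every
	// reconciliation attempt, and its backing timer stays in memory until
	// the duration (365 days by default) expires.
	q.AddAfter("my-namespace/my-cluster", 365*24*time.Hour) // hypothetical request key
	fmt.Println("queued; the delayed entry now lives until its timer fires")
}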
sebgl committed Oct 14, 2019
1 parent e64c786 commit 6e6118b
Showing 4 changed files with 31 additions and 9 deletions.
13 changes: 13 additions & 0 deletions pkg/controller/common/reconciler/results.go
@@ -5,10 +5,15 @@
 package reconciler
 
 import (
+	"time"
+
 	k8serrors "k8s.io/apimachinery/pkg/util/errors"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 )
 
+// MaximumRequeueAfter is the maximum period of time in which we requeue a reconciliation.
+const MaximumRequeueAfter = 10 * time.Hour
+
 // Results collects intermediate results of a reconciliation run and any errors that occurred.
 type Results struct {
 	results []reconcile.Result
@@ -56,13 +61,21 @@ func (r *Results) Apply(step string, recoverableStep func() (reconcile.Result, e
 // Aggregate compares the collected results with each other and returns the most specific one.
 // Where specific means requeue at a given time is more specific than generic requeue which is more specific
 // than no requeue. It also returns any errors recorded.
+// The aggregated `result.RequeueAfter` period will not be larger than MaximumRequeueAfter.
 func (r *Results) Aggregate() (reconcile.Result, error) {
 	var current reconcile.Result
 	for _, next := range r.results {
 		if nextResultTakesPrecedence(current, next) {
 			current = next
 		}
 	}
+	if current.RequeueAfter > MaximumRequeueAfter {
+		// A client-go leaky timer issue will cause memory leaks for long requeue periods,
+		// see https://github.com/elastic/cloud-on-k8s/issues/1984.
+		// To prevent this from happening, let's restrict the requeue to a fixed short-term value.
+		// TODO: remove once https://github.com/kubernetes/client-go/issues/701 is fixed.
+		current.RequeueAfter = MaximumRequeueAfter
+	}
 	return current, k8serrors.NewAggregate(r.errors)
 }
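
A quick usage sketch of the new clamping behavior (not part of the diff; written as a throwaway test, and the import path for the reconciler package is assumed): any aggregated requeue period longer than MaximumRequeueAfter comes back reduced to 10 hours.

package reconciler_test

import (
	"testing"
	"time"

	"github.com/elastic/cloud-on-k8s/pkg/controller/common/reconciler"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func TestAggregateClampsLongRequeue(t *testing.T) {
	res := &reconciler.Results{}
	res.WithResult(reconcile.Result{RequeueAfter: 100 * time.Hour})

	aggregated, err := res.Aggregate()
	// The 100h request comes back as MaximumRequeueAfter (10h); the reconciliation
	// simply runs again sooner than strictly necessary.
	if err != nil || aggregated.RequeueAfter != reconciler.MaximumRequeueAfter {
		t.Errorf("got RequeueAfter=%s err=%v, want %s", aggregated.RequeueAfter, err, reconciler.MaximumRequeueAfter)
	}
}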

7 changes: 6 additions & 1 deletion pkg/controller/common/reconciler/results_test.go
@@ -62,7 +62,7 @@ func Test_nextTakesPrecedence(t *testing.T) {
 	}
 }
 
-func TestResults(t *testing.T) {
+func TestResults_Aggregate(t *testing.T) {
 	tests := []struct {
 		name string
 		args []reconcile.Result
@@ -83,6 +83,11 @@
 			args: []reconcile.Result{{}, {Requeue: true}, {RequeueAfter: 1 * time.Second}},
 			want: reconcile.Result{RequeueAfter: 1 * time.Second},
 		},
+		{
+			name: "multiple with large RequeueAfter: reduced to the maximum value",
+			args: []reconcile.Result{{}, {Requeue: true}, {RequeueAfter: 100 * time.Hour}},
+			want: reconcile.Result{RequeueAfter: MaximumRequeueAfter},
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
18 changes: 11 additions & 7 deletions pkg/controller/license/license_controller.go
@@ -48,7 +48,8 @@ var log = logf.Log.WithName(name)
 // This happens independently from any watch triggered reconcile request.
 func (r *ReconcileLicenses) Reconcile(request reconcile.Request) (reconcile.Result, error) {
 	defer common.LogReconciliationRun(log, request, &r.iteration)()
-	return r.reconcileInternal(request)
+	results := r.reconcileInternal(request)
+	return results.Aggregate()
 }
 
 // Add creates a new EnterpriseLicense Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller
@@ -221,26 +222,29 @@ func (r *ReconcileLicenses) reconcileClusterLicense(cluster v1beta1.Elasticsearc
 	return matchingSpec.ExpiryTime(), err
 }
 
-func (r *ReconcileLicenses) reconcileInternal(request reconcile.Request) (reconcile.Result, error) {
+func (r *ReconcileLicenses) reconcileInternal(request reconcile.Request) *reconciler.Results {
+	res := &reconciler.Results{}
+
 	// Fetch the cluster to ensure it still exists
 	cluster := v1beta1.Elasticsearch{}
 	err := r.Get(request.NamespacedName, &cluster)
 	if err != nil {
 		if errors.IsNotFound(err) {
 			// nothing to do no cluster
-			return reconcile.Result{}, nil
+			return res
 		}
-		return reconcile.Result{}, err
+		return res.WithError(err)
 	}
 
 	if !cluster.DeletionTimestamp.IsZero() {
 		// cluster is being deleted nothing to do
-		return reconcile.Result{}, nil
+		return res
 	}
 
 	newExpiry, err := r.reconcileClusterLicense(cluster)
 	if err != nil {
-		return reconcile.Result{Requeue: true}, err
+		return res.WithError(err)
 	}
-	return nextReconcile(newExpiry, defaultSafetyMargin), nil
+
+	return res.WithResult(nextReconcile(newExpiry, defaultSafetyMargin))
 }
2 changes: 1 addition & 1 deletion pkg/controller/license/license_controller_test.go
@@ -180,7 +180,7 @@ func TestReconcileLicenses_reconcileInternal(t *testing.T) {
 				checker: commonlicense.MockChecker{},
 			}
 			nsn := k8s.ExtractNamespacedName(tt.cluster)
-			res, err := r.reconcileInternal(reconcile.Request{NamespacedName: nsn})
+			res, err := r.reconcileInternal(reconcile.Request{NamespacedName: nsn}).Aggregate()
 			if tt.wantErr != "" {
 				require.EqualError(t, err, tt.wantErr)
 				return
