Skip to content

Commit

Permalink
Remove reflector metrics as they currently cause a memory leak
Browse files Browse the repository at this point in the history
Kubernetes-commit: ca096f8069aff73b774c8ef38900dca898c61938
  • Loading branch information
logicalhan authored and k8s-publishing-bot committed Feb 27, 2019
1 parent 269fa37 commit a1320a3
Showing 1 changed file with 1 addition and 26 deletions.
27 changes: 1 addition & 26 deletions tools/cache/reflector.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@ import (
"net"
"net/url"
"reflect"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

Expand Down Expand Up @@ -96,17 +94,10 @@ func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyn
return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}

// reflectorDisambiguator is used to disambiguate started reflectors.
// initialized to an unstable value to ensure meaning isn't attributed to the suffix.
var reflectorDisambiguator = int64(time.Now().UnixNano() % 12345)

// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
reflectorSuffix := atomic.AddInt64(&reflectorDisambiguator, 1)
r := &Reflector{
name: name,
// we need this to be unique per process (some names are still the same) but obvious who it belongs to
metrics: newReflectorMetrics(makeValidPrometheusMetricLabel(fmt.Sprintf("reflector_"+name+"_%d", reflectorSuffix))),
name: name,
listerWatcher: lw,
store: store,
expectedType: reflect.TypeOf(expectedType),
Expand Down Expand Up @@ -174,8 +165,6 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
// to be served from cache and potentially be delayed relative to
// etcd contents. Reflector framework will catch up via Watch() eventually.
options := metav1.ListOptions{ResourceVersion: "0"}
r.metrics.numberOfLists.Inc()
start := r.clock.Now()

if err := func() error {
initTrace := trace.New("Reflector " + r.name + " ListAndWatch")
Expand Down Expand Up @@ -204,7 +193,6 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
}
initTrace.Step("Objects listed")
r.metrics.listDuration.Observe(time.Since(start).Seconds())
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
Expand All @@ -216,7 +204,6 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
}
initTrace.Step("Objects extracted")
r.metrics.numberOfItemsInList.Observe(float64(len(items)))
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
}
Expand Down Expand Up @@ -272,7 +259,6 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
TimeoutSeconds: &timeoutSeconds,
}

r.metrics.numberOfWatches.Inc()
w, err := r.listerWatcher.Watch(options)
if err != nil {
switch err {
Expand Down Expand Up @@ -324,11 +310,6 @@ func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, err
// Stopping the watcher should be idempotent and if we return from this function there's no way
// we're coming back in with the same watch interface.
defer w.Stop()
// update metrics
defer func() {
r.metrics.numberOfItemsInWatch.Observe(float64(eventCount))
r.metrics.watchDuration.Observe(time.Since(start).Seconds())
}()

loop:
for {
Expand Down Expand Up @@ -384,7 +365,6 @@ loop:

watchDuration := r.clock.Now().Sub(start)
if watchDuration < 1*time.Second && eventCount == 0 {
r.metrics.numberOfShortWatches.Inc()
return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
}
klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
Expand All @@ -403,9 +383,4 @@ func (r *Reflector) setLastSyncResourceVersion(v string) {
r.lastSyncResourceVersionMutex.Lock()
defer r.lastSyncResourceVersionMutex.Unlock()
r.lastSyncResourceVersion = v

rv, err := strconv.Atoi(v)
if err == nil {
r.metrics.lastResourceVersion.Set(float64(rv))
}
}

0 comments on commit a1320a3

Please sign in to comment.