Skip to content

Commit

Permalink
Merge pull request #2121 from inteon/informer_map_context
Browse files Browse the repository at this point in the history
✨Use correct context to cancel "list and watch" & wait for all informers to complete
  • Loading branch information
k8s-ci-robot committed Jan 16, 2023
2 parents 8499b67 + 45e7aa7 commit 5028a59
Showing 1 changed file with 48 additions and 28 deletions.
76 changes: 48 additions & 28 deletions pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,6 @@ type InformersMap struct {
// paramCodec is used by list and watch
paramCodec runtime.ParameterCodec

// stop is the stop channel to stop informers
stop <-chan struct{}

// resync is the base frequency the informers are resynced
// a 10 percent jitter will be added to the resync period between informers
// so that all informers will not send list requests simultaneously.
Expand All @@ -128,13 +125,22 @@ type InformersMap struct {
// mu guards access to the map
mu sync.RWMutex

// start is true if the informers have been started
// started is true if the informers have been started
started bool

// startWait is a channel that is closed after the
// informer has been started.
startWait chan struct{}

// waitGroup is the wait group that is used to wait for all informers to stop
waitGroup sync.WaitGroup

// stopped is true if the informers have been stopped
stopped bool

// ctx is the context to stop informers
ctx context.Context

// namespace is the namespace that all ListWatches are restricted to
// default or empty string means all namespaces
namespace string
Expand All @@ -157,28 +163,47 @@ func (ip *InformersMap) Start(ctx context.Context) error {
ip.mu.Lock()
defer ip.mu.Unlock()

// Set the stop channel so it can be passed to informers that are added later
ip.stop = ctx.Done()
// Set the context so it can be passed to informers that are added later
ip.ctx = ctx

// Start each informer
for _, i := range ip.informers.Structured {
go i.Informer.Run(ctx.Done())
ip.startInformerLocked(i.Informer)
}
for _, i := range ip.informers.Unstructured {
go i.Informer.Run(ctx.Done())
ip.startInformerLocked(i.Informer)
}
for _, i := range ip.informers.Metadata {
go i.Informer.Run(ctx.Done())
ip.startInformerLocked(i.Informer)
}

// Set started to true so we immediately start any informers added later.
ip.started = true
close(ip.startWait)
}()
<-ctx.Done()
<-ctx.Done() // Block until the context is done
ip.mu.Lock()
ip.stopped = true // Set stopped to true so we don't start any new informers
ip.mu.Unlock()
ip.waitGroup.Wait() // Block until all informers have stopped
return nil
}

func (ip *InformersMap) startInformerLocked(informer cache.SharedIndexInformer) {
// Don't start the informer in case we are already waiting for the items in
// the waitGroup to finish, since waitGroups don't support waiting and adding
// at the same time.
if ip.stopped {
return
}

ip.waitGroup.Add(1)
go func() {
defer ip.waitGroup.Done()
informer.Run(ip.ctx.Done())
}()
}

func (ip *InformersMap) waitForStarted(ctx context.Context) bool {
select {
case <-ip.startWait:
Expand Down Expand Up @@ -307,20 +332,15 @@ func (ip *InformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtim
}
ip.informersByType(obj)[gvk] = i

// Start the Informer if need by
// TODO(seans): write thorough tests and document what happens here - can you add indexers?
// can you add eventhandlers?
// Start the informer in case the InformersMap has started, otherwise it will be
// started when the InformersMap starts.
if ip.started {
go i.Informer.Run(ip.stop)
ip.startInformerLocked(i.Informer)
}
return i, ip.started, nil
}

func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime.Object) (*cache.ListWatch, error) {
// TODO(vincepri): Wire the context in here and don't use TODO().
// Can we use the context from the Get call?
ctx := context.TODO()

// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
// groupVersionKind to the Resource API we will use.
mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
Expand Down Expand Up @@ -351,16 +371,16 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
if namespace != "" {
return resources.Namespace(namespace).List(ctx, opts)
return resources.Namespace(namespace).List(ip.ctx, opts)
}
return resources.List(ctx, opts)
return resources.List(ip.ctx, opts)
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
if namespace != "" {
return resources.Namespace(namespace).Watch(ctx, opts)
return resources.Namespace(namespace).Watch(ip.ctx, opts)
}
return resources.Watch(ctx, opts)
return resources.Watch(ip.ctx, opts)
},
}, nil
//
Expand All @@ -386,9 +406,9 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
err error
)
if namespace != "" {
list, err = resources.Namespace(namespace).List(ctx, opts)
list, err = resources.Namespace(namespace).List(ip.ctx, opts)
} else {
list, err = resources.List(ctx, opts)
list, err = resources.List(ip.ctx, opts)
}
if list != nil {
for i := range list.Items {
Expand All @@ -400,9 +420,9 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watcher watch.Interface, err error) {
if namespace != "" {
watcher, err = resources.Namespace(namespace).Watch(ctx, opts)
watcher, err = resources.Namespace(namespace).Watch(ip.ctx, opts)
} else {
watcher, err = resources.Watch(ctx, opts)
watcher, err = resources.Watch(ip.ctx, opts)
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -433,7 +453,7 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime

// Create the resulting object, and execute the request.
res := listObj.DeepCopyObject()
if err := req.Do(ctx).Into(res); err != nil {
if err := req.Do(ip.ctx).Into(res); err != nil {
return nil, err
}
return res, nil
Expand All @@ -446,7 +466,7 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
req.Namespace(namespace)
}
// Call the watch.
return req.Watch(ctx)
return req.Watch(ip.ctx)
},
}, nil
}
Expand Down

0 comments on commit 5028a59

Please sign in to comment.