Skip to content

Commit

Permalink
use correct context to cancel list and watch & wait for all informers…
Browse files Browse the repository at this point in the history
… to complete

Signed-off-by: Tim Ramlot <42113979+inteon@users.noreply.github.com>
  • Loading branch information
inteon committed Jan 8, 2023
1 parent b756161 commit 11170e1
Showing 1 changed file with 39 additions and 22 deletions.
61 changes: 39 additions & 22 deletions pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,11 @@ type InformersMap struct {
// paramCodec is used by list and watch
paramCodec runtime.ParameterCodec

// stop is the stop channel to stop informers
stop <-chan struct{}
// ctx is the context to stop informers
ctx context.Context

// waitGroup is the wait group that is used to wait for all informers to stop
waitGroup sync.WaitGroup

// resync is the base frequency the informers are resynced
// a 10 percent jitter will be added to the resync period between informers
Expand Down Expand Up @@ -158,24 +161,38 @@ func (ip *InformersMap) Start(ctx context.Context) error {
defer ip.mu.Unlock()

// Set the stop channel so it can be passed to informers that are added later
ip.stop = ctx.Done()
ip.ctx = ctx

ip.waitGroup.Add(len(ip.informers.Structured) + len(ip.informers.Unstructured) + len(ip.informers.Metadata))

// Start each informer
for _, i := range ip.informers.Structured {
go i.Informer.Run(ctx.Done())
i := i
go func() {
defer ip.waitGroup.Done()
i.Informer.Run(ctx.Done())
}()
}
for _, i := range ip.informers.Unstructured {
go i.Informer.Run(ctx.Done())
i := i
go func() {
defer ip.waitGroup.Done()
i.Informer.Run(ctx.Done())
}()
}
for _, i := range ip.informers.Metadata {
go i.Informer.Run(ctx.Done())
i := i
go func() {
defer ip.waitGroup.Done()
i.Informer.Run(ctx.Done())
}()
}

// Set started to true so we immediately start any informers added later.
ip.started = true
close(ip.startWait)
}()
<-ctx.Done()
ip.waitGroup.Wait() // Block until all informers have stopped
return nil
}

Expand Down Expand Up @@ -311,16 +328,16 @@ func (ip *InformersMap) addInformerToMap(gvk schema.GroupVersionKind, obj runtim
// TODO(seans): write thorough tests and document what happens here - can you add indexers?
// can you add eventhandlers?
if ip.started {
go i.Informer.Run(ip.stop)
ip.waitGroup.Add(1)
go func() {
defer ip.waitGroup.Done()
i.Informer.Run(ip.ctx.Done())
}()
}
return i, ip.started, nil
}

func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime.Object) (*cache.ListWatch, error) {
// TODO(vincepri): Wire the context in here and don't use TODO().
// Can we use the context from the Get call?
ctx := context.TODO()

// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
// groupVersionKind to the Resource API we will use.
mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
Expand Down Expand Up @@ -351,16 +368,16 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
if namespace != "" {
return resources.Namespace(namespace).List(ctx, opts)
return resources.Namespace(namespace).List(ip.ctx, opts)
}
return resources.List(ctx, opts)
return resources.List(ip.ctx, opts)
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
if namespace != "" {
return resources.Namespace(namespace).Watch(ctx, opts)
return resources.Namespace(namespace).Watch(ip.ctx, opts)
}
return resources.Watch(ctx, opts)
return resources.Watch(ip.ctx, opts)
},
}, nil
//
Expand All @@ -386,9 +403,9 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
err error
)
if namespace != "" {
list, err = resources.Namespace(namespace).List(ctx, opts)
list, err = resources.Namespace(namespace).List(ip.ctx, opts)
} else {
list, err = resources.List(ctx, opts)
list, err = resources.List(ip.ctx, opts)
}
if list != nil {
for i := range list.Items {
Expand All @@ -400,9 +417,9 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watcher watch.Interface, err error) {
if namespace != "" {
watcher, err = resources.Namespace(namespace).Watch(ctx, opts)
watcher, err = resources.Namespace(namespace).Watch(ip.ctx, opts)
} else {
watcher, err = resources.Watch(ctx, opts)
watcher, err = resources.Watch(ip.ctx, opts)
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -433,7 +450,7 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime

// Create the resulting object, and execute the request.
res := listObj.DeepCopyObject()
if err := req.Do(ctx).Into(res); err != nil {
if err := req.Do(ip.ctx).Into(res); err != nil {
return nil, err
}
return res, nil
Expand All @@ -446,7 +463,7 @@ func (ip *InformersMap) makeListWatcher(gvk schema.GroupVersionKind, obj runtime
req.Namespace(namespace)
}
// Call the watch.
return req.Watch(ctx)
return req.Watch(ip.ctx)
},
}, nil
}
Expand Down

0 comments on commit 11170e1

Please sign in to comment.