Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PWX-37601: Using informer cache's event handling instead of native k8s watchers for pod monitoring and extender metrics. #1795

Merged
merged 2 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
"sync"

storkv1alpha1 "github.com/libopenstorage/stork/pkg/apis/stork/v1alpha1"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/client-go/rest"
clientCache "k8s.io/client-go/tools/cache"
controllercache "sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand Down Expand Up @@ -36,6 +38,9 @@ type SharedInformerCache interface {

// ListTransformedPods lists the all the Pods from the cache after applying TransformFunc
ListTransformedPods() (*corev1.PodList, error)

// WatchPods registers the pod event handlers with the informer cache
WatchPods(fn func(object interface{})) error
}

type cache struct {
Expand Down Expand Up @@ -84,6 +89,8 @@ func CreateSharedInformerCache(mgr manager.Manager) error {
}
}

currPod.ObjectMeta.DeletionTimestamp = podResource.ObjectMeta.DeletionTimestamp

currPod.Spec.Containers = podResource.Spec.Containers
currPod.Spec.NodeName = podResource.Spec.NodeName

Expand Down Expand Up @@ -204,3 +211,23 @@ func (c *cache) ListTransformedPods() (*corev1.PodList, error) {
}
return podList, nil
}

// WatchPods uses handlers for different pod events with shared informers.
func (c *cache) WatchPods(fn func(object interface{})) error {
informer, err := c.controllerCache.GetInformer(context.Background(), &corev1.Pod{})
if err != nil {
logrus.WithError(err).Error("error getting the informer for pods")
return err
}

informer.AddEventHandler(clientCache.ResourceEventHandlerFuncs{
AddFunc: fn,
UpdateFunc: func(oldObj, newObj interface{}) {
// Only considering the new pod object
fn(newObj)
},
DeleteFunc: fn,
})

return nil
}
26 changes: 23 additions & 3 deletions pkg/extender/extender.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"time"

"github.com/libopenstorage/stork/drivers/volume"
storkcache "github.com/libopenstorage/stork/pkg/cache"
storklog "github.com/libopenstorage/stork/pkg/log"
restore "github.com/libopenstorage/stork/pkg/snapshot/controllers"
"github.com/portworx/sched-ops/k8s/core"
Expand Down Expand Up @@ -483,9 +484,28 @@
return nil
}

if err := core.Instance().WatchPods("", fn, metav1.ListOptions{}); err != nil {
log.Errorf("failed to watch pods due to: %v", err)
return err
podHandler := func(object interface{}) {
pod, ok := object.(*v1.Pod)
if !ok {
log.Errorf("invalid object type on pod watch from cache: %v", object)
} else {
fn(pod)

Check warning on line 492 in pkg/extender/extender.go

View check run for this annotation

Codecov / codecov/patch

pkg/extender/extender.go#L488-L492

Added lines #L488 - L492 were not covered by tests
}
}

if storkcache.Instance() != nil {
log.Debugf("Shared informer cache has been initialized, using it for extender metrics.")
err := storkcache.Instance().WatchPods(podHandler)
if err != nil {
log.Errorf("failed to watch pods with informer cache for health monitoring, err: %v", err)

Check warning on line 500 in pkg/extender/extender.go

View check run for this annotation

Codecov / codecov/patch

pkg/extender/extender.go#L497-L500

Added lines #L497 - L500 were not covered by tests
}
log.Errorf("failed to watch pods with informer cache for metrics, err: %v", err)

Check warning on line 502 in pkg/extender/extender.go

View check run for this annotation

Codecov / codecov/patch

pkg/extender/extender.go#L502

Added line #L502 was not covered by tests
} else {
log.Warnf("Shared informer cache has not been initialized, using watch for extender metrics.")
if err := core.Instance().WatchPods("", fn, metav1.ListOptions{}); err != nil {
log.Errorf("failed to watch pods for metrics due to: %v", err)
return err

Check warning on line 507 in pkg/extender/extender.go

View check run for this annotation

Codecov / codecov/patch

pkg/extender/extender.go#L506-L507

Added lines #L506 - L507 were not covered by tests
}
}
return nil
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/mock/cache/cache.mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 21 additions & 3 deletions pkg/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,27 @@
return nil
}

if err := core.Instance().WatchPods("", fn, metav1.ListOptions{}); err != nil {
log.Errorf("failed to watch pods due to: %v", err)
return err
podHandler := func(object interface{}) {
pod, ok := object.(*v1.Pod)
if !ok {
log.Errorf("invalid object type on pod watch from cache: %v", object)
} else {
fn(pod)

Check warning on line 191 in pkg/monitor/monitor.go

View check run for this annotation

Codecov / codecov/patch

pkg/monitor/monitor.go#L187-L191

Added lines #L187 - L191 were not covered by tests
}
}

if storkcache.Instance() != nil {
log.Debugf("Shared informer cache has been initialized, using it for pod monitor.")
err := storkcache.Instance().WatchPods(podHandler)
if err != nil {
log.Errorf("failed to watch pods with informer cache for health monitoring, err: %v", err)

Check warning on line 199 in pkg/monitor/monitor.go

View check run for this annotation

Codecov / codecov/patch

pkg/monitor/monitor.go#L196-L199

Added lines #L196 - L199 were not covered by tests
}
} else {
log.Warnf("Shared informer cache has not been initialized, using watch for pod monitor.")
if err := core.Instance().WatchPods("", fn, metav1.ListOptions{}); err != nil {
log.Errorf("failed to watch pods for health monitoring due to: %v", err)
return err

Check warning on line 205 in pkg/monitor/monitor.go

View check run for this annotation

Codecov / codecov/patch

pkg/monitor/monitor.go#L204-L205

Added lines #L204 - L205 were not covered by tests
}
Comment on lines +202 to +206
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering in what scenario will we enter inside this function? I don't think we ever will? It's still ok to keep it as is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly not required, I also think as a failsafe , let's keep it.

}

return nil
Expand Down