Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix pod watch namespace for nv-ipam-node #37

Merged
merged 2 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions cmd/ipam-node/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/google/renameio/v2"
"github.com/spf13/cobra"
"google.golang.org/grpc"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
Expand All @@ -39,6 +40,7 @@ import (
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/healthz"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

Expand Down Expand Up @@ -141,13 +143,22 @@ func RunNodeDaemon(ctx context.Context, config *rest.Config, opts *options.Optio
Scheme: scheme,
Metrics: metricsserver.Options{BindAddress: opts.MetricsAddr},
HealthProbeBindAddress: opts.ProbeAddr,
Cache: cache.Options{DefaultNamespaces: map[string]cache.Config{opts.PoolsNamespace: {}}},
Cache: cache.Options{
DefaultNamespaces: map[string]cache.Config{opts.PoolsNamespace: {}},
ByObject: map[client.Object]cache.ByObject{
&corev1.Pod{}: {Namespaces: map[string]cache.Config{cache.AllNamespaces: {}}}},
},
})
if err != nil {
logger.Error(err, "unable to initialize manager")
return err
}

k8sClient, err := client.New(config, client.Options{Scheme: mgr.GetScheme(), Mapper: mgr.GetRESTMapper()})
if err != nil {
logger.Error(err, "unable to direct k8s client")
}

if err = (&ippoolctrl.IPPoolReconciler{
PoolManager: poolManager,
Client: mgr.GetClient(),
Expand Down Expand Up @@ -252,7 +263,7 @@ func RunNodeDaemon(ctx context.Context, config *rest.Config, opts *options.Optio
}
return
}
c := cleaner.New(mgr.GetClient(), store, poolManager, time.Minute, 3)
c := cleaner.New(mgr.GetClient(), k8sClient, store, poolManager, time.Minute, 3)
c.Start(innerCtx)
logger.Info("cleaner stopped")
}()
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.12.0 // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.9.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/zapr v1.3.0 // indirect
Expand Down
56 changes: 41 additions & 15 deletions pkg/ipam-node/cleaner/cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ type Cleaner interface {
// New creates and initialize new cleaner instance
// "checkInterval" defines delay between checks for stale allocations.
// "checkCountBeforeRelease" defines how many checks to do before removing the allocation
func New(client client.Client, store storePkg.Store, poolConfReader pool.ConfigReader,
func New(cachedClient client.Client, directClient client.Client, store storePkg.Store, poolConfReader pool.ConfigReader,
checkInterval time.Duration,
checkCountBeforeRelease int) Cleaner {
return &cleaner{
client: client,
cachedClient: cachedClient,
directClient: directClient,
store: store,
poolConfReader: poolConfReader,
checkInterval: checkInterval,
Expand All @@ -59,7 +60,8 @@ func New(client client.Client, store storePkg.Store, poolConfReader pool.ConfigR
}

type cleaner struct {
client client.Client
cachedClient client.Client
directClient client.Client
store storePkg.Store
poolConfReader pool.ConfigReader
checkInterval time.Duration
Expand Down Expand Up @@ -109,22 +111,27 @@ func (c *cleaner) loop(ctx context.Context) error {
resLogger.V(2).Info("reservation has no required metadata fields, skip")
continue
}
pod := &corev1.Pod{}
err := c.client.Get(ctx, apiTypes.NamespacedName{
Namespace: reservation.Metadata.PodNamespace,
Name: reservation.Metadata.PodName,
}, pod)
if err != nil && !apiErrors.IsNotFound(err) {
// first check in the cache
found, err := c.checkPod(ctx, c.cachedClient, reservation)
if err != nil {
session.Cancel()
return fmt.Errorf("failed to read Pod info from the cache: %v", err)
}
if apiErrors.IsNotFound(err) ||
(reservation.Metadata.PodUUID != "" && reservation.Metadata.PodUUID != string(pod.UID)) {
c.staleAllocations[key]++
resLogger.V(2).Info("pod not found in the API, increase stale counter",
"value", c.staleAllocations[key])
} else {
if !found {
resLogger.V(2).Info("cache check failed for the Pod, check in the API")
// pod not found in the cache, try to query API directly to make sure
// that we use latest state
found, err = c.checkPod(ctx, c.directClient, reservation)
if err != nil {
session.Cancel()
return fmt.Errorf("failed to read Pod info from the API: %v", err)
}
}
if found {
delete(c.staleAllocations, key)
} else {
c.staleAllocations[key]++
resLogger.V(2).Info("pod not found, increase stale counter", "value", c.staleAllocations[key])
}
}
}
Expand Down Expand Up @@ -156,6 +163,25 @@ func (c *cleaner) loop(ctx context.Context) error {
return nil
}

// checkPod reports whether the Pod referenced by the reservation metadata
// still exists with the same UID (when the reservation recorded one).
// "Not found" and "recreated with a different UID" are both reported as
// (false, nil); a non-nil error is returned only for unexpected API failures.
func (c *cleaner) checkPod(ctx context.Context, k8sClient client.Client, reservation types.Reservation) (bool, error) {
	pod := &corev1.Pod{}
	getErr := k8sClient.Get(ctx, apiTypes.NamespacedName{
		Namespace: reservation.Metadata.PodNamespace,
		Name:      reservation.Metadata.PodName,
	}, pod)
	switch {
	case apiErrors.IsNotFound(getErr):
		// the Pod is gone
		return false, nil
	case getErr != nil:
		return false, fmt.Errorf("failed to read Pod info: %v", getErr)
	case reservation.Metadata.PodUUID != "" && reservation.Metadata.PodUUID != string(pod.UID):
		// a Pod with the same name exists, but it was recreated (different UID)
		return false, nil
	}
	// the Pod exists and matches the reservation
	return true, nil
}

// getStaleAllocKey builds the map key used to track stale-allocation counters
// for a reservation: "<poolName>|<containerID>|<interfaceName>".
// The pipe separator keeps the three components distinguishable in logs;
// NOTE(review): assumes none of the components contain '|' — collisions would
// merge counters for distinct reservations.
func (c *cleaner) getStaleAllocKey(poolName string, r types.Reservation) string {
	return fmt.Sprintf("%s|%s|%s", poolName, r.ContainerID, r.InterfaceName)
}
15 changes: 10 additions & 5 deletions pkg/ipam-node/cleaner/cleaner_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@ import (

"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/envtest"
)

var (
cfg *rest.Config
k8sClient client.Client
testEnv *envtest.Environment
cFunc context.CancelFunc
ctx context.Context
cfg *rest.Config
k8sClient client.Client
fakeClient client.Client
testEnv *envtest.Environment
cFunc context.CancelFunc
ctx context.Context
)

func TestCleaner(t *testing.T) {
Expand All @@ -56,6 +58,9 @@ var _ = BeforeSuite(func() {
k8sClient, err = client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
Expect(k8sClient).NotTo(BeNil())

fakeClient = fake.NewFakeClient()
Expect(fakeClient).NotTo(BeNil())
})

var _ = AfterSuite(func() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ipam-node/cleaner/cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ var _ = Describe("Cleaner", func() {
// this will create empty pool config
session.SetLastReservedIP(testPool3, net.ParseIP("192.168.33.100"))

cleaner := cleanerPkg.New(k8sClient, store, poolManager, time.Millisecond*100, 3)
cleaner := cleanerPkg.New(fakeClient, k8sClient, store, poolManager, time.Millisecond*100, 3)

pod1UID := createPod(testPodName1, testNamespace)
_ = createPod(testPodName2, testNamespace)
Expand Down
Loading