diff --git a/pkg/storage/cacher/cacher_whitebox_test.go b/pkg/storage/cacher/cacher_whitebox_test.go index b6d59af07..4a0115b78 100644 --- a/pkg/storage/cacher/cacher_whitebox_test.go +++ b/pkg/storage/cacher/cacher_whitebox_test.go @@ -29,6 +29,7 @@ import ( "time" "github.com/stretchr/testify/require" + "google.golang.org/grpc/metadata" apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -1864,3 +1865,128 @@ func TestForgetWatcher(t *testing.T) { assertCacherInternalState(0, 0) require.Equal(t, 2, forgetCounter) } + +func TestWatchStreamSeparation(t *testing.T) { + tcs := []struct { + name string + separateCacheWatchRPC bool + useWatchCacheContextMetadata bool + expectBookmarkOnWatchCache bool + expectBookmarkOnEtcd bool + }{ + { + name: "common RPC > both get bookmarks", + separateCacheWatchRPC: false, + expectBookmarkOnEtcd: true, + expectBookmarkOnWatchCache: true, + }, + { + name: "common RPC & watch cache context > both get bookmarks", + separateCacheWatchRPC: false, + useWatchCacheContextMetadata: true, + expectBookmarkOnEtcd: true, + expectBookmarkOnWatchCache: true, + }, + { + name: "separate RPC > only etcd gets bookmarks", + separateCacheWatchRPC: true, + expectBookmarkOnEtcd: true, + expectBookmarkOnWatchCache: false, + }, + { + name: "separate RPC & watch cache context > only watch cache gets bookmarks", + separateCacheWatchRPC: true, + useWatchCacheContextMetadata: true, + expectBookmarkOnEtcd: false, + expectBookmarkOnWatchCache: true, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SeparateCacheWatchRPC, tc.separateCacheWatchRPC)() + _, cacher, _, terminate := testSetupWithEtcdServer(t) + t.Cleanup(terminate) + if err := cacher.ready.wait(context.TODO()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } + + getCacherRV := func() uint64 { + cacher.watchCache.RLock() + defer cacher.watchCache.RUnlock() + return cacher.watchCache.resourceVersion + } + waitContext, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + waitForEtcdBookmark := watchAndWaitForBookmark(t, waitContext, cacher.storage) + + var out example.Pod + err := cacher.Create(context.Background(), "foo", &example.Pod{}, &out, 0) + if err != nil { + t.Fatal(err) + } + versioner := storage.APIObjectVersioner{} + var lastResourceVersion uint64 + lastResourceVersion, err = versioner.ObjectResourceVersion(&out) + if err != nil { + t.Fatal(err) + } + + var contextMetadata metadata.MD + if tc.useWatchCacheContextMetadata { + contextMetadata = cacher.watchCache.waitingUntilFresh.contextMetadata + } + // Wait before sending watch progress request to avoid https://github.com/etcd-io/etcd/issues/17507 + // TODO(https://github.com/etcd-io/etcd/issues/17507): Remove sleep when etcd is upgraded to version with fix. + time.Sleep(time.Second) + err = cacher.storage.RequestWatchProgress(metadata.NewOutgoingContext(context.Background(), contextMetadata)) + if err != nil { + t.Fatal(err) + } + // Give time for bookmark to arrive + time.Sleep(time.Second) + + etcdWatchResourceVersion := waitForEtcdBookmark() + gotEtcdWatchBookmark := etcdWatchResourceVersion == lastResourceVersion + if gotEtcdWatchBookmark != tc.expectBookmarkOnEtcd { + t.Errorf("Unexpected etcd bookmark check result, rv: %d, got: %v, want: %v", etcdWatchResourceVersion, etcdWatchResourceVersion, tc.expectBookmarkOnEtcd) + } + + watchCacheResourceVersion := getCacherRV() + cacherGotBookmark := watchCacheResourceVersion == lastResourceVersion + if cacherGotBookmark != tc.expectBookmarkOnWatchCache { + t.Errorf("Unexpected watch cache bookmark check result, rv: %d, got: %v, want: %v", watchCacheResourceVersion, cacherGotBookmark, tc.expectBookmarkOnWatchCache) + } + }) + } +} + +func watchAndWaitForBookmark(t *testing.T, ctx context.Context, etcdStorage storage.Interface) func() (resourceVersion uint64) { + opts := storage.ListOptions{ResourceVersion: "", Predicate: storage.Everything, Recursive: true} + opts.Predicate.AllowWatchBookmarks = true + w, err := etcdStorage.Watch(ctx, "/pods/", opts) + if err != nil { + t.Fatal(err) + } + + versioner := storage.APIObjectVersioner{} + var rv uint64 + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for event := range w.ResultChan() { + if event.Type == watch.Bookmark { + rv, err = versioner.ObjectResourceVersion(event.Object) + break + } + } + }() + return func() (resourceVersion uint64) { + defer w.Stop() + wg.Wait() + if err != nil { + t.Fatal(err) + } + return rv + } +}