Skip to content

Commit

Permalink
Test that separation of streams work by using progress notifies
Browse files Browse the repository at this point in the history
Kubernetes-commit: b1e1d68cfb9acb1849423f7d0d67e87da3a359c2
  • Loading branch information
serathius authored and k8s-publishing-bot committed Feb 29, 2024
1 parent 88805ca commit b9037e3
Showing 1 changed file with 126 additions and 0 deletions.
126 changes: 126 additions & 0 deletions pkg/storage/cacher/cacher_whitebox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"time"

"github.com/stretchr/testify/require"
"google.golang.org/grpc/metadata"

apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -1864,3 +1865,128 @@ func TestForgetWatcher(t *testing.T) {
assertCacherInternalState(0, 0)
require.Equal(t, 2, forgetCounter)
}

func TestWatchStreamSeparation(t *testing.T) {
tcs := []struct {
name string
separateCacheWatchRPC bool
useWatchCacheContextMetadata bool
expectBookmarkOnWatchCache bool
expectBookmarkOnEtcd bool
}{
{
name: "common RPC > both get bookmarks",
separateCacheWatchRPC: false,
expectBookmarkOnEtcd: true,
expectBookmarkOnWatchCache: true,
},
{
name: "common RPC & watch cache context > both get bookmarks",
separateCacheWatchRPC: false,
useWatchCacheContextMetadata: true,
expectBookmarkOnEtcd: true,
expectBookmarkOnWatchCache: true,
},
{
name: "separate RPC > only etcd gets bookmarks",
separateCacheWatchRPC: true,
expectBookmarkOnEtcd: true,
expectBookmarkOnWatchCache: false,
},
{
name: "separate RPC & watch cache context > only watch cache gets bookmarks",
separateCacheWatchRPC: true,
useWatchCacheContextMetadata: true,
expectBookmarkOnEtcd: false,
expectBookmarkOnWatchCache: true,
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SeparateCacheWatchRPC, tc.separateCacheWatchRPC)()
_, cacher, _, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate)
if err := cacher.ready.wait(context.TODO()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}

getCacherRV := func() uint64 {
cacher.watchCache.RLock()
defer cacher.watchCache.RUnlock()
return cacher.watchCache.resourceVersion
}
waitContext, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
waitForEtcdBookmark := watchAndWaitForBookmark(t, waitContext, cacher.storage)

var out example.Pod
err := cacher.Create(context.Background(), "foo", &example.Pod{}, &out, 0)
if err != nil {
t.Fatal(err)
}
versioner := storage.APIObjectVersioner{}
var lastResourceVersion uint64
lastResourceVersion, err = versioner.ObjectResourceVersion(&out)
if err != nil {
t.Fatal(err)
}

var contextMetadata metadata.MD
if tc.useWatchCacheContextMetadata {
contextMetadata = cacher.watchCache.waitingUntilFresh.contextMetadata
}
// Wait before sending watch progress request to avoid https://github.com/etcd-io/etcd/issues/17507
// TODO(https://github.com/etcd-io/etcd/issues/17507): Remove sleep when etcd is upgraded to version with fix.
time.Sleep(time.Second)
err = cacher.storage.RequestWatchProgress(metadata.NewOutgoingContext(context.Background(), contextMetadata))
if err != nil {
t.Fatal(err)
}
// Give time for bookmark to arrive
time.Sleep(time.Second)

etcdWatchResourceVersion := waitForEtcdBookmark()
gotEtcdWatchBookmark := etcdWatchResourceVersion == lastResourceVersion
if gotEtcdWatchBookmark != tc.expectBookmarkOnEtcd {
t.Errorf("Unexpected etcd bookmark check result, rv: %d, got: %v, want: %v", etcdWatchResourceVersion, etcdWatchResourceVersion, tc.expectBookmarkOnEtcd)
}

watchCacheResourceVersion := getCacherRV()
cacherGotBookmark := watchCacheResourceVersion == lastResourceVersion
if cacherGotBookmark != tc.expectBookmarkOnWatchCache {
t.Errorf("Unexpected watch cache bookmark check result, rv: %d, got: %v, want: %v", watchCacheResourceVersion, cacherGotBookmark, tc.expectBookmarkOnWatchCache)
}
})
}
}

func watchAndWaitForBookmark(t *testing.T, ctx context.Context, etcdStorage storage.Interface) func() (resourceVersion uint64) {
opts := storage.ListOptions{ResourceVersion: "", Predicate: storage.Everything, Recursive: true}
opts.Predicate.AllowWatchBookmarks = true
w, err := etcdStorage.Watch(ctx, "/pods/", opts)
if err != nil {
t.Fatal(err)
}

versioner := storage.APIObjectVersioner{}
var rv uint64
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for event := range w.ResultChan() {
if event.Type == watch.Bookmark {
rv, err = versioner.ObjectResourceVersion(event.Object)
break
}
}
}()
return func() (resourceVersion uint64) {
defer w.Stop()
wg.Wait()
if err != nil {
t.Fatal(err)
}
return rv
}
}

0 comments on commit b9037e3

Please sign in to comment.