Skip to content

Commit

Permalink
apiserver/storage/watchcache: WaitUntilFreshAndList supports path prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
p0lyn0mial committed Jun 27, 2024
1 parent c259fe2 commit 2f9660d
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 15 deletions.
9 changes: 4 additions & 5 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
// to compute watcher.forget function (which has to happen under lock).
watcher := newCacheWatcher(
chanSize,
filterWithAttrsFunction(key, pred),
filterWithAttrsAndPrefixFunction(key, pred),
emptyFunc,
c.versioner,
deadline,
Expand Down Expand Up @@ -809,7 +809,7 @@ func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred
}
return nil, readResourceVersion, "", nil
}
return c.watchCache.WaitUntilFreshAndList(ctx, listRV, pred.MatcherIndex(ctx))
return c.watchCache.WaitUntilFreshAndList(ctx, listRV, key, pred.MatcherIndex(ctx))
}

// GetList implements storage.Interface
Expand Down Expand Up @@ -885,7 +885,6 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
if listVal.Kind() != reflect.Slice {
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
}
filter := filterWithAttrsFunction(preparedKey, pred)

objs, readResourceVersion, indexUsed, err := c.listItems(ctx, listRV, preparedKey, pred, recursive)
if err != nil {
Expand All @@ -905,7 +904,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
if !ok {
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
}
if filter(elem.Key, elem.Labels, elem.Fields) {
if pred.MatchesObjectAttributes(elem.Labels, elem.Fields) {
selectedObjects = append(selectedObjects, elem.Object)
lastSelectedObjectKey = elem.Key
}
Expand Down Expand Up @@ -1320,7 +1319,7 @@ func forgetWatcher(c *Cacher, w *cacheWatcher, index int, scope namespacedName,
}
}

func filterWithAttrsFunction(key string, p storage.SelectionPredicate) filterWithAttrsFunc {
func filterWithAttrsAndPrefixFunction(key string, p storage.SelectionPredicate) filterWithAttrsFunc {
filterFunc := func(objKey string, label labels.Set, field fields.Set) bool {
if !hasPathPrefix(objKey, key) {
return false
Expand Down
25 changes: 23 additions & 2 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,29 @@ func (s sortableStoreElements) Swap(i, j int) {

// WaitUntilFreshAndList returns list of pointers to `storeElement` objects along
// with their ResourceVersion and the name of the index, if any, that was used.
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, matchValues []storage.MatchValue) (result []interface{}, rv uint64, index string, err error) {
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, key string, matchValues []storage.MatchValue) ([]interface{}, uint64, string, error) {
items, rv, index, err := w.waitUntilFreshAndListItems(ctx, resourceVersion, key, matchValues)
if err != nil {
return nil, 0, "", err
}

var result []interface{}
for _, item := range items {
elem, ok := item.(*storeElement)
if !ok {
return nil, 0, "", fmt.Errorf("non *storeElement returned from storage: %v", item)
}
if !hasPathPrefix(elem.Key, key) {
continue
}
result = append(result, item)
}

sort.Sort(sortableStoreElements(result))
return result, rv, index, nil
}

func (w *watchCache) waitUntilFreshAndListItems(ctx context.Context, resourceVersion uint64, key string, matchValues []storage.MatchValue) (result []interface{}, rv uint64, index string, err error) {
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported && w.notFresh(resourceVersion) {
w.waitingUntilFresh.Add()
Expand All @@ -511,7 +533,6 @@ func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
}

defer func() { sort.Sort(sortableStoreElements(result)) }()
defer w.RUnlock()
if err != nil {
return result, rv, index, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
}()

// list by empty MatchValues.
list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 5, nil)
list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 5, "prefix/", nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand All @@ -481,7 +481,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
{IndexName: "l:label", Value: "value1"},
{IndexName: "f:spec.nodeName", Value: "node2"},
}
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, matchValues)
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand All @@ -500,7 +500,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
{IndexName: "l:not-exist-label", Value: "whatever"},
{IndexName: "f:spec.nodeName", Value: "node2"},
}
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, matchValues)
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand All @@ -518,7 +518,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
matchValues = []storage.MatchValue{
{IndexName: "l:not-exist-label", Value: "whatever"},
}
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, matchValues)
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -546,7 +546,7 @@ func TestWaitUntilFreshAndListFromCache(t *testing.T) {
}()

// list from future revision. Requires watch cache to request bookmark to get it.
list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 3, nil)
list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 3, "prefix/", nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -626,7 +626,7 @@ func TestWaitUntilFreshAndListTimeout(t *testing.T) {
store.Add(makeTestPod("bar", 4))
}()

_, _, _, err := store.WaitUntilFreshAndList(ctx, 4, nil)
_, _, _, err := store.WaitUntilFreshAndList(ctx, 4, "", nil)
if !errors.IsTimeout(err) {
t.Errorf("expected timeout error but got: %v", err)
}
Expand Down Expand Up @@ -655,7 +655,7 @@ func TestReflectorForWatchCache(t *testing.T) {
defer store.Stop()

{
_, version, _, err := store.WaitUntilFreshAndList(ctx, 0, nil)
_, version, _, err := store.WaitUntilFreshAndList(ctx, 0, "", nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand All @@ -678,7 +678,7 @@ func TestReflectorForWatchCache(t *testing.T) {
r.ListAndWatch(wait.NeverStop)

{
_, version, _, err := store.WaitUntilFreshAndList(ctx, 10, nil)
_, version, _, err := store.WaitUntilFreshAndList(ctx, 10, "", nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand Down

0 comments on commit 2f9660d

Please sign in to comment.