Skip to content

Commit

Permalink
virtual: add multi-level hierarchy support
Browse files Browse the repository at this point in the history
  • Loading branch information
sttts committed Mar 21, 2022
1 parent d898aab commit fdae37f
Show file tree
Hide file tree
Showing 12 changed files with 649 additions and 542 deletions.
4 changes: 2 additions & 2 deletions pkg/virtual/workspaces/authorization/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

// Package auth provides mechanisms for enforcing authorization to Workspace resources in KCP
// This package is largely insired from openshift/openshift-apiserver/pkg/project/auth
// Package authorization provides mechanisms for enforcing authorization to Workspace resources in KCP
// This package is largely inspired from openshift/openshift-apiserver/pkg/project/auth
// https://github.com/openshift/openshift-apiserver/blob/9271466bfd02a9eb02fb5a43c8b9ff1ced76aca9/pkg/project/auth
package authorization
13 changes: 13 additions & 0 deletions pkg/virtual/workspaces/authorization/reviewer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package authorization

import (
"context"

rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/authentication/user"
Expand Down Expand Up @@ -54,6 +56,9 @@ func NewReviewer(subjectLocator rbac.SubjectLocator) *Reviewer {

// Review returns a Review for attributes.
func (r *Reviewer) Review(attributes kauthorizer.Attributes) Review {
if r.subjectLocater == nil {
return Review{}
}
subjects, err := r.subjectLocater.AllowedSubjects(attributes)
review := Review{
EvaluationError: err,
Expand All @@ -62,6 +67,14 @@ func (r *Reviewer) Review(attributes kauthorizer.Attributes) Review {
return review
}

func (r *Reviewer) Authorize(ctx context.Context, attributes kauthorizer.Attributes) (authorized kauthorizer.Decision, reason string, err error) {
review := r.Review(attributes)
if review.Allows(attributes.GetUser()) {
return kauthorizer.DecisionAllow, "", nil
}
return kauthorizer.DecisionNoOpinion, "SubjectNotAllowed", nil
}

func rbacSubjectsToUsersAndGroups(subjects []rbacv1.Subject) (users []string, groups []string) {
for _, subject := range subjects {
switch {
Expand Down
11 changes: 9 additions & 2 deletions pkg/virtual/workspaces/authorization/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,15 @@ func NewUserWorkspaceWatcher(user user.Info, lclusterName logicalcluster.Logical
func (w *userWorkspaceWatcher) GroupMembershipChanged(workspaceName string, users, groups sets.String) {
hasAccess := users.Has(w.user.GetName()) || groups.HasAny(w.user.GetGroups()...)
_, known := w.knownWorkspaces[workspaceName]

var workspace workspaceapibeta1.Workspace
projection.ProjectClusterWorkspaceToWorkspace(&workspaceapi.ClusterWorkspace{ObjectMeta: metav1.ObjectMeta{Name: workspaceName, ClusterName: w.lclusterName.String()}}, &workspace)
projection.ProjectClusterWorkspaceToWorkspace(&workspaceapi.ClusterWorkspace{
ObjectMeta: metav1.ObjectMeta{
Name: workspaceName,
ClusterName: w.lclusterName.String(),
},
}, &workspace)

switch {
// this means that we were removed from the workspace
case !hasAccess && known:
Expand All @@ -158,7 +165,7 @@ func (w *userWorkspaceWatcher) GroupMembershipChanged(workspaceName string, user
}

case hasAccess:
clusterWorkspace, err := w.clusterWorkspaceCache.GetWorkspace(w.lclusterName, workspaceName)
clusterWorkspace, err := w.clusterWorkspaceCache.Get(w.lclusterName, workspaceName)
if err != nil {
utilruntime.HandleError(err)
return
Expand Down
20 changes: 6 additions & 14 deletions pkg/virtual/workspaces/authorization/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (m *mockClusterClient) Cluster(name logicalcluster.LogicalCluster) kcpclien

var _ kcpclient.ClusterInterface = (*mockClusterClient)(nil)

func newTestWatcher(username string, groups []string, predicate storage.SelectionPredicate, workspaces ...*workspaceapi.ClusterWorkspace) (*userWorkspaceWatcher, *fakeAuthCache, chan struct{}) {
func newTestWatcher(username string, groups []string, predicate storage.SelectionPredicate, workspaces ...*workspaceapi.ClusterWorkspace) (*userWorkspaceWatcher, *fakeAuthCache) {
objects := []runtime.Object{}
for i := range workspaces {
objects = append(objects, workspaces[i])
Expand All @@ -63,14 +63,10 @@ func newTestWatcher(username string, groups []string, predicate storage.Selectio
workspaceCache := workspacecache.NewClusterWorkspaceCache(
informers.Tenancy().V1alpha1().ClusterWorkspaces().Informer(),
&mockClusterClient{mockClient: mockClient},
"",
)
fakeAuthCache := &fakeAuthCache{}

stopCh := make(chan struct{})
go workspaceCache.Run(stopCh)

return NewUserWorkspaceWatcher(&user.DefaultInfo{Name: username, Groups: groups}, "lclusterName", workspaceCache, fakeAuthCache, false, predicate), fakeAuthCache, stopCh
return NewUserWorkspaceWatcher(&user.DefaultInfo{Name: username, Groups: groups}, "lclusterName", workspaceCache, fakeAuthCache, false, predicate), fakeAuthCache
}

type fakeAuthCache struct {
Expand All @@ -95,8 +91,7 @@ func (w *fakeAuthCache) List(userInfo user.Info, labelSelector labels.Selector,
}

func TestFullIncoming(t *testing.T) {
watcher, fakeAuthCache, stopCh := newTestWatcher("bob", nil, matchAllPredicate(), newClusterWorkspaces("ns-01")...)
defer close(stopCh)
watcher, fakeAuthCache := newTestWatcher("bob", nil, matchAllPredicate(), newClusterWorkspaces("ns-01")...)
watcher.cacheIncoming = make(chan watch.Event)

go watcher.Watch()
Expand Down Expand Up @@ -144,8 +139,7 @@ func TestFullIncoming(t *testing.T) {
}

func TestAddModifyDeleteEventsByUser(t *testing.T) {
watcher, _, stopCh := newTestWatcher("bob", nil, matchAllPredicate(), newClusterWorkspaces("ns-01")...)
defer close(stopCh)
watcher, _ := newTestWatcher("bob", nil, matchAllPredicate(), newClusterWorkspaces("ns-01")...)
go watcher.Watch()

watcher.GroupMembershipChanged("ns-01", sets.NewString("bob"), sets.String{})
Expand Down Expand Up @@ -187,8 +181,7 @@ func TestWorkspaceSelectionPredicate(t *testing.T) {
field := fields.ParseSelectorOrDie("metadata.name=ns-03")
m := workspaceutil.MatchWorkspace(labels.Everything(), field)

watcher, _, stopCh := newTestWatcher("bob", nil, m, newClusterWorkspaces("ns-01", "ns-02", "ns-03")...)
defer close(stopCh)
watcher, _ := newTestWatcher("bob", nil, m, newClusterWorkspaces("ns-01", "ns-02", "ns-03")...)

if watcher.emit == nil {
t.Fatalf("unset emit function")
Expand Down Expand Up @@ -249,8 +242,7 @@ func TestWorkspaceSelectionPredicate(t *testing.T) {
}

func TestAddModifyDeleteEventsByGroup(t *testing.T) {
watcher, _, stopCh := newTestWatcher("bob", []string{"group-one"}, matchAllPredicate(), newClusterWorkspaces("ns-01")...)
defer close(stopCh)
watcher, _ := newTestWatcher("bob", []string{"group-one"}, matchAllPredicate(), newClusterWorkspaces("ns-01")...)
go watcher.Watch()

watcher.GroupMembershipChanged("ns-01", sets.String{}, sets.NewString("group-one"))
Expand Down
53 changes: 12 additions & 41 deletions pkg/virtual/workspaces/builder/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/kcp-dev/apimachinery/pkg/logicalcluster"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/registry/rest"
genericapiserver "k8s.io/apiserver/pkg/server"
rbacinformers "k8s.io/client-go/informers/rbac/v1"
Expand All @@ -43,21 +42,20 @@ import (
tenancywrapper "github.com/kcp-dev/kcp/pkg/virtual/framework/wrappers/tenancy"
workspaceauth "github.com/kcp-dev/kcp/pkg/virtual/workspaces/authorization"
workspacecache "github.com/kcp-dev/kcp/pkg/virtual/workspaces/cache"
virtualworkspacesregistry "github.com/kcp-dev/kcp/pkg/virtual/workspaces/registry"
"github.com/kcp-dev/kcp/pkg/virtual/workspaces/registry"
)

const WorkspacesVirtualWorkspaceName string = "workspaces"

func BuildVirtualWorkspace(rootPathPrefix string, wildcardsClusterWorkspaces workspaceinformer.ClusterWorkspaceInformer, wildcardsRbacInformers rbacinformers.Interface, kubeClusterClient kubernetes.ClusterInterface, kcpClusterClient kcpclient.ClusterInterface) framework.VirtualWorkspace {
crbInformer := wildcardsRbacInformers.ClusterRoleBindings()
_ = virtualworkspacesregistry.AddNameIndexers(crbInformer)
_ = registry.AddNameIndexers(crbInformer)

if !strings.HasSuffix(rootPathPrefix, "/") {
rootPathPrefix += "/"
}
var rootWorkspaceAuthorizationCache *workspaceauth.AuthorizationCache
var globalClusterWorkspaceCache *workspacecache.ClusterWorkspaceCache
var orgListener *orgListener

return &fixedgvs.FixedGroupVersionsVirtualWorkspace{
Name: WorkspacesVirtualWorkspaceName,
Expand All @@ -70,9 +68,6 @@ func BuildVirtualWorkspace(rootPathPrefix string, wildcardsClusterWorkspaces wor
return errors.New("WorkspaceAuthorizationCache is not ready for access")
}

if orgListener == nil || !orgListener.Ready() {
return errors.New("Organization listener is not ready for access")
}
return nil
},
RootPathResolver: func(urlPath string, requestContext context.Context) (accepted bool, prefixToStrip string, completedContext context.Context) {
Expand All @@ -84,14 +79,14 @@ func BuildVirtualWorkspace(rootPathPrefix string, wildcardsClusterWorkspaces wor
return
}
org, scope := segments[0], segments[1]
if !virtualworkspacesregistry.ScopeSet.Has(scope) {
if !registry.ScopeSet.Has(scope) {
return
}

return true, rootPathPrefix + strings.Join(segments[:2], "/"),
context.WithValue(
context.WithValue(requestContext, virtualworkspacesregistry.WorkspacesScopeKey, scope),
virtualworkspacesregistry.WorkspacesOrgKey, logicalcluster.LogicalCluster(org),
context.WithValue(requestContext, registry.WorkspacesScopeKey, scope),
registry.WorkspacesOrgKey, logicalcluster.LogicalCluster(org),
)
}
return
Expand All @@ -102,21 +97,13 @@ func BuildVirtualWorkspace(rootPathPrefix string, wildcardsClusterWorkspaces wor
AddToScheme: tenancyv1beta1.AddToScheme,
OpenAPIDefinitions: kcpopenapi.GetOpenAPIDefinitions,
BootstrapRestResources: func(mainConfig genericapiserver.CompletedConfig) (map[string]fixedgvs.RestStorageBuilder, error) {

rootTenancyClient := kcpClusterClient.Cluster(tenancyv1alpha1.RootCluster).TenancyV1alpha1()
rootRBACClient := kubeClusterClient.Cluster(tenancyv1alpha1.RootCluster).RbacV1()

globalClusterWorkspaceCache = workspacecache.NewClusterWorkspaceCache(
wildcardsClusterWorkspaces.Informer(),
kcpClusterClient,
"")

rootRBACInformers := rbacwrapper.FilterInformers(tenancyv1alpha1.RootCluster, wildcardsRbacInformers)
rootSubjectLocator := frameworkrbac.NewSubjectLocator(rootRBACInformers)
rootReviewer := workspaceauth.NewReviewer(rootSubjectLocator)

rootClusterWorkspaceInformer := tenancywrapper.FilterClusterWorkspaceInformer(tenancyv1alpha1.RootCluster, wildcardsClusterWorkspaces)

globalClusterWorkspaceCache = workspacecache.NewClusterWorkspaceCache(wildcardsClusterWorkspaces.Informer(), kcpClusterClient)

rootWorkspaceAuthorizationCache = workspaceauth.NewAuthorizationCache(
rootClusterWorkspaceInformer.Lister(),
rootClusterWorkspaceInformer.Informer(),
Expand All @@ -128,23 +115,13 @@ func BuildVirtualWorkspace(rootPathPrefix string, wildcardsClusterWorkspaces wor
rootRBACInformers,
)

rootOrg := virtualworkspacesregistry.NewRootOrg(rootRBACClient, rootRBACInformers.ClusterRoleBindings(), rootReviewer, rootTenancyClient.ClusterWorkspaces(), rootWorkspaceAuthorizationCache)

orgListener = NewOrgListener(globalClusterWorkspaceCache, rootOrg, func(orgClusterName logicalcluster.LogicalCluster) *virtualworkspacesregistry.Org {
return virtualworkspacesregistry.CreateAndStartOrg(
kubeClusterClient.Cluster(orgClusterName).RbacV1(),
kcpClusterClient.Cluster(orgClusterName).TenancyV1alpha1().ClusterWorkspaces(),
orgListener := NewOrgListener(wildcardsClusterWorkspaces, func(orgClusterName logicalcluster.LogicalCluster, initialWatchers []workspaceauth.CacheWatcher) registry.FilteredClusterWorkspaces {
return CreateAndStartOrg(
rbacwrapper.FilterInformers(orgClusterName, wildcardsRbacInformers),
rbacwrapper.FilterClusterRoleBindingInformer(orgClusterName, crbInformer),
tenancywrapper.FilterClusterWorkspaceInformer(orgClusterName, wildcardsClusterWorkspaces))
tenancywrapper.FilterClusterWorkspaceInformer(orgClusterName, wildcardsClusterWorkspaces),
initialWatchers)
})

if err := mainConfig.AddPostStartHook("clusterworkspaces.kcp.dev-workspacecache", func(context genericapiserver.PostStartHookContext) error {
go globalClusterWorkspaceCache.Run(context.StopCh)
return nil
}); err != nil {
return nil, err
}
if err := mainConfig.AddPostStartHook("clusterworkspaces.kcp.dev-workspaceauthorizationcache", func(context genericapiserver.PostStartHookContext) error {
for _, informer := range []cache.SharedIndexInformer{
wildcardsClusterWorkspaces.Informer(),
Expand All @@ -158,18 +135,12 @@ func BuildVirtualWorkspace(rootPathPrefix string, wildcardsClusterWorkspaces wor
}
}
rootWorkspaceAuthorizationCache.Run(1*time.Second, context.StopCh)
go func() {
_ = wait.PollImmediateUntil(100*time.Millisecond, func() (done bool, err error) {
return rootWorkspaceAuthorizationCache.ReadyForAccess(), nil
}, context.StopCh)
orgListener.Initialize(rootWorkspaceAuthorizationCache)
}()
return nil
}); err != nil {
return nil, err
}

workspacesRest, kubeconfigSubresourceRest := virtualworkspacesregistry.NewREST(kcpClusterClient.Cluster(tenancyv1alpha1.RootCluster).TenancyV1alpha1(), kubeClusterClient, globalClusterWorkspaceCache, crbInformer, orgListener.GetOrg, rootReviewer)
workspacesRest, kubeconfigSubresourceRest := registry.NewREST(kcpClusterClient.Cluster(tenancyv1alpha1.RootCluster).TenancyV1alpha1(), kubeClusterClient, kcpClusterClient, globalClusterWorkspaceCache, crbInformer, orgListener.FilteredClusterWorkspaces)
return map[string]fixedgvs.RestStorageBuilder{
"workspaces": func(apiGroupAPIServerConfig genericapiserver.CompletedConfig) (rest.Storage, error) {
return workspacesRest, nil
Expand Down
98 changes: 98 additions & 0 deletions pkg/virtual/workspaces/builder/clusterworkspaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
Copyright 2022 The KCP Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package builder

import (
"time"

"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apiserver/pkg/authentication/user"
rbacinformers "k8s.io/client-go/informers/rbac/v1"

tenancyv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/tenancy/v1alpha1"
workspaceinformer "github.com/kcp-dev/kcp/pkg/client/informers/externalversions/tenancy/v1alpha1"
frameworkrbac "github.com/kcp-dev/kcp/pkg/virtual/framework/rbac"
workspaceauth "github.com/kcp-dev/kcp/pkg/virtual/workspaces/authorization"
"github.com/kcp-dev/kcp/pkg/virtual/workspaces/registry"
)

var _ registry.FilteredClusterWorkspaces = &authCacheClusterWorkspaces{}

// authCacheClusterWorkspaces implement registry.FilteredClusterWorkspaces using an
// authorization cache.
type authCacheClusterWorkspaces struct {
// workspaceLister can enumerate workspace lists that enforce policy
clusterWorkspaceLister workspaceauth.Lister
// authCache is a cache of cluster workspaces and associated subjects for a given org.
authCache *workspaceauth.AuthorizationCache
// stopCh allows stopping the authCache for this org.
stopCh chan struct{}
}

// CreateAndStartOrg creates an Org that contains all the required clients and caches to retrieve user workspaces inside an org
// As part of an Org, a WorkspaceAuthCache is created and ensured to be started.
func CreateAndStartOrg(
rbacInformers rbacinformers.Interface,
clusterWorkspaceInformer workspaceinformer.ClusterWorkspaceInformer,
initialWatchers []workspaceauth.CacheWatcher,
) *authCacheClusterWorkspaces {
authCache := workspaceauth.NewAuthorizationCache(
clusterWorkspaceInformer.Lister(),
clusterWorkspaceInformer.Informer(),
workspaceauth.NewReviewer(frameworkrbac.NewSubjectLocator(rbacInformers)),
*workspaceauth.NewAttributesBuilder().
Verb("get").
Resource(tenancyv1alpha1.SchemeGroupVersion.WithResource("clusterworkspaces"), "workspace").
AttributesRecord,
rbacInformers,
)

cws := &authCacheClusterWorkspaces{
clusterWorkspaceLister: authCache,
stopCh: make(chan struct{}),
authCache: authCache,
}

for _, watcher := range initialWatchers {
authCache.AddWatcher(watcher)
}

cws.authCache.Run(1*time.Second, cws.stopCh)

return cws
}

func (o *authCacheClusterWorkspaces) List(user user.Info, labelSelector labels.Selector, fieldSelector fields.Selector) (*tenancyv1alpha1.ClusterWorkspaceList, error) {
return o.clusterWorkspaceLister.List(user, labelSelector, fieldSelector)
}

func (o *authCacheClusterWorkspaces) RemoveWatcher(watcher workspaceauth.CacheWatcher) {
o.authCache.RemoveWatcher(watcher)
}

func (o *authCacheClusterWorkspaces) AddWatcher(watcher workspaceauth.CacheWatcher) {
o.authCache.AddWatcher(watcher)
}

func (o *authCacheClusterWorkspaces) Ready() bool {
return o.authCache.ReadyForAccess()
}

func (o *authCacheClusterWorkspaces) Stop() {
o.stopCh <- struct{}{}
}
Loading

0 comments on commit fdae37f

Please sign in to comment.