This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit 47d75cf
Honor allowed namespaces in all cluster/git operations
Alfonso Acosta authored and 2opremio committed Mar 13, 2019
1 parent 3a0450f commit 47d75cf
Showing 10 changed files with 135 additions and 61 deletions.
1 change: 1 addition & 0 deletions cluster/cluster.go
@@ -27,6 +27,7 @@ type Cluster interface {
     // Get all of the services (optionally, from a specific namespace), excluding those
     AllWorkloads(maybeNamespace string) ([]Workload, error)
     SomeWorkloads([]flux.ResourceID) ([]Workload, error)
+    IsAllowedResource(flux.ResourceID) bool
     Ping() error
     Export() ([]byte, error)
     Sync(SyncSet) error
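With this method on the interface, any holder of a cluster.Cluster can screen resource IDs before acting on them. A minimal caller-side sketch, assuming only the interface above (the helper name is illustrative, not part of the commit):

```go
// filterAllowed keeps only the IDs that the cluster implementation
// reports as being in an allowed namespace. Hypothetical helper, shown
// to illustrate the new interface method.
func filterAllowed(c cluster.Cluster, ids []flux.ResourceID) []flux.ResourceID {
    var allowed []flux.ResourceID
    for _, id := range ids {
        if c.IsAllowedResource(id) {
            allowed = append(allowed, id)
        }
    }
    return allowed
}
```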
38 changes: 34 additions & 4 deletions cluster/kubernetes/kubernetes.go
@@ -17,6 +17,7 @@ import (
 
     "github.com/weaveworks/flux"
     "github.com/weaveworks/flux/cluster"
+    "github.com/weaveworks/flux/cluster/kubernetes/resource"
     fhrclient "github.com/weaveworks/flux/integrations/client/clientset/versioned"
     "github.com/weaveworks/flux/ssh"
 )
@@ -120,11 +121,14 @@ func NewCluster(client ExtendedClient, applier Applier, sshKeyRing ssh.KeyRing,
 // --- cluster.Cluster
 
 // SomeWorkloads returns the workloads named, missing out any that don't
-// exist in the cluster. They do not necessarily have to be returned
-// in the order requested.
+// exist in the cluster or aren't in an allowed namespace.
+// They do not necessarily have to be returned in the order requested.
 func (c *Cluster) SomeWorkloads(ids []flux.ResourceID) (res []cluster.Workload, err error) {
     var workloads []cluster.Workload
     for _, id := range ids {
+        if !c.IsAllowedResource(id) {
+            continue
+        }
         ns, kind, name := id.Components()
 
         resourceKind, ok := resourceKinds[kind]
@@ -147,7 +151,7 @@ func (c *Cluster) SomeWorkloads(ids []flux.ResourceID) (res []cluster.Workload,
     return workloads, nil
 }
 
-// AllWorkloads returns all workloads matching the criteria; that is, in
+// AllWorkloads returns all workloads in allowed namespaces matching the criteria; that is, in
 // the namespace (or any namespace if that argument is empty)
 func (c *Cluster) AllWorkloads(namespace string) (res []cluster.Workload, err error) {
     namespaces, err := c.getAllowedNamespaces()
@@ -282,7 +286,7 @@ func (c *Cluster) getAllowedNamespaces() ([]apiv1.Namespace, error) {
             nsList = append(nsList, *ns)
         case apierrors.IsUnauthorized(err) || apierrors.IsForbidden(err) || apierrors.IsNotFound(err):
             if !c.loggedAllowedNS[name] {
-                c.logger.Log("warning", "cannot access namespace set as allowed",
+                c.logger.Log("warning", "cannot access allowed namespace",
                     "namespace", name, "err", err)
                 c.loggedAllowedNS[name] = true
             }
@@ -300,6 +304,32 @@ func (c *Cluster) getAllowedNamespaces() ([]apiv1.Namespace, error) {
     return namespaces.Items, nil
 }
 
+func (c *Cluster) IsAllowedResource(id flux.ResourceID) bool {
+    if len(c.allowedNamespaces) == 0 {
+        // All resources are allowed when all namespaces are allowed
+        return true
+    }
+
+    namespace, kind, name := id.Components()
+    namespaceToCheck := namespace
+
+    if namespace == resource.ClusterScope {
+        // All cluster-scoped resources (not namespaced) are allowed ...
+        if kind != "namespace" {
+            return true
+        }
+        // ... except namespaces themselves, whose name needs to be explicitly allowed
+        namespaceToCheck = name
+    }
+
+    for _, allowedNS := range c.allowedNamespaces {
+        if namespaceToCheck == allowedNS {
+            return true
+        }
+    }
+    return false
+}
+
 // kind & apiVersion must be passed separately as the object's TypeMeta is not populated
 func appendYAML(buffer *bytes.Buffer, apiVersion, kind string, object interface{}) error {
     yamlBytes, err := k8syaml.Marshal(object)
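The upshot of IsAllowedResource: with no restriction configured, everything passes; otherwise a namespaced resource passes only when its namespace is allowed, cluster-scoped resources always pass, and namespace objects themselves pass only when allowed by name. A test-style sketch of those rules, assuming a *Cluster restricted to the "team-a" namespace, that flux's MustParseResourceID helper accepts the <namespace>:<kind>/<name> convention, and that resource.ClusterScope renders as "<cluster>" (all hedged assumptions):

```go
// Hypothetical expectations, not a test from this commit; c is a *Cluster
// whose allowedNamespaces is ["team-a"].
cases := map[string]bool{
    "team-a:deployment/app":       true,  // namespaced, namespace allowed
    "team-b:deployment/app":       false, // namespaced, namespace not allowed
    "<cluster>:clusterrole/admin": true,  // cluster-scoped: always allowed
    "<cluster>:namespace/team-a":  true,  // a namespace is checked by its own name
    "<cluster>:namespace/team-b":  false, // ...and this one is not on the list
}
for id, want := range cases {
    got := c.IsAllowedResource(flux.MustParseResourceID(id))
    fmt.Printf("%-30s got=%v want=%v\n", id, got, want)
}
```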
64 changes: 48 additions & 16 deletions cluster/kubernetes/sync.go
@@ -53,15 +53,19 @@ func (c *Cluster) Sync(syncSet cluster.SyncSet) error {
 
     // NB we get all resources, since we care about leaving unsynced,
     // _ignored_ resources alone.
-    clusterResources, err := c.getResourcesBySelector("")
+    clusterResources, err := c.getAllowedResourcesBySelector("")
     if err != nil {
         return errors.Wrap(err, "collating resources in cluster for sync")
     }
 
     cs := makeChangeSet()
     var errs cluster.SyncError
     for _, res := range syncSet.Resources {
-        id := res.ResourceID().String()
+        resID := res.ResourceID()
+        if !c.IsAllowedResource(resID) {
+            continue
+        }
+        id := resID.String()
         // make a record of the checksum, whether we stage it to
         // be applied or not, so that we don't delete it later.
         csum := sha1.Sum(res.Bytes())
@@ -121,7 +125,7 @@ func (c *Cluster) collectGarbage(
 
     orphanedResources := makeChangeSet()
 
-    clusterResources, err := c.getGCMarkedResourcesInSyncSet(syncSet.Name)
+    clusterResources, err := c.getAllowedGCMarkedResourcesInSyncSet(syncSet.Name)
     if err != nil {
         return nil, errors.Wrap(err, "collating resources in cluster for calculating garbage collection")
     }
@@ -187,7 +191,7 @@ func (r *kuberesource) GetGCMark() string {
     return r.obj.GetLabels()[gcMarkLabel]
 }
 
-func (c *Cluster) getResourcesBySelector(selector string) (map[string]*kuberesource, error) {
+func (c *Cluster) getAllowedResourcesBySelector(selector string) (map[string]*kuberesource, error) {
     listOptions := meta_v1.ListOptions{}
     if selector != "" {
         listOptions.LabelSelector = selector
Expand Down Expand Up @@ -215,19 +219,17 @@ func (c *Cluster) getResourcesBySelector(selector string) (map[string]*kuberesou
if !contains(verbs, "list") {
continue
}

groupVersion, err := schema.ParseGroupVersion(resource.GroupVersion)
if err != nil {
return nil, err
}

resourceClient := c.client.dynamicClient.Resource(groupVersion.WithResource(apiResource.Name))
data, err := resourceClient.List(listOptions)
gvr := groupVersion.WithResource(apiResource.Name)
list, err := c.listAllowedResources(apiResource.Namespaced, gvr, listOptions)
if err != nil {
return nil, err
}

for i, item := range data.Items {
for i, item := range list {
apiVersion := item.GetAPIVersion()
kind := item.GetKind()

@@ -238,7 +240,7 @@ func (c *Cluster) getResourcesBySelector(selector string) (map[string]*kuberesou
             }
             // TODO(michael) also exclude anything that has an ownerReference (that isn't "standard"?)
 
-            res := &kuberesource{obj: &data.Items[i], namespaced: apiResource.Namespaced}
+            res := &kuberesource{obj: &list[i], namespaced: apiResource.Namespaced}
             result[res.ResourceID().String()] = res
         }
     }
@@ -247,18 +249,48 @@ func (c *Cluster) getResourcesBySelector(selector string) (map[string]*kuberesou
 
     return result, nil
 }
 
-func (c *Cluster) getGCMarkedResourcesInSyncSet(syncSetName string) (map[string]*kuberesource, error) {
-    allGCMarkedResources, err := c.getResourcesBySelector(gcMarkLabel) // means "gcMarkLabel exists"
+func (c *Cluster) listAllowedResources(
+    namespaced bool, gvr schema.GroupVersionResource, options meta_v1.ListOptions) ([]unstructured.Unstructured, error) {
+    if !namespaced || len(c.allowedNamespaces) == 0 {
+        // The resource is not namespaced or all the namespaces are allowed
+        resourceClient := c.client.dynamicClient.Resource(gvr)
+        data, err := resourceClient.List(options)
+        if err != nil {
+            return nil, err
+        }
+        return data.Items, nil
+    }
+
+    // List resources only from the allowed namespaces
+    var result []unstructured.Unstructured
+    for _, ns := range c.allowedNamespaces {
+        data, err := c.client.dynamicClient.Resource(gvr).Namespace(ns).List(options)
+        if err != nil {
+            return result, err
+        }
+        result = append(result, data.Items...)
+    }
+    return result, nil
+}
+
+func (c *Cluster) getAllowedGCMarkedResourcesInSyncSet(syncSetName string) (map[string]*kuberesource, error) {
+    allGCMarkedResources, err := c.getAllowedResourcesBySelector(gcMarkLabel) // means "gcMarkLabel exists"
     if err != nil {
         return nil, err
     }
-    syncSetGCMarkedResources := map[string]*kuberesource{}
+    allowedSyncSetGCMarkedResources := map[string]*kuberesource{}
     for resID, kres := range allGCMarkedResources {
-        if kres.GetGCMark() == makeGCMark(syncSetName, resID) {
-            syncSetGCMarkedResources[resID] = kres
-        }
+        // Discard disallowed resources
+        if !c.IsAllowedResource(kres.ResourceID()) {
+            continue
+        }
+        // Discard resources out of the sync set
+        if kres.GetGCMark() != makeGCMark(syncSetName, resID) {
+            continue
+        }
+        allowedSyncSetGCMarkedResources[resID] = kres
     }
-    return syncSetGCMarkedResources, nil
+    return allowedSyncSetGCMarkedResources, nil
 }
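The design point in listAllowedResources: a cluster-wide List call needs cluster-scope read permission, which a namespace-restricted flux deliberately may not have, so the code instead issues one List per allowed namespace and concatenates the results. The same fan-out pattern shown standalone against the dynamic client (a sketch with hypothetical names; the context-free List signature matches the client-go vintage used in this diff):

```go
import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/dynamic"
)

// listPerNamespace lists one namespace at a time, so that only namespaced
// read permissions are required rather than cluster-wide ones.
func listPerNamespace(client dynamic.Interface, gvr schema.GroupVersionResource,
    namespaces []string, opts metav1.ListOptions) ([]unstructured.Unstructured, error) {
    var out []unstructured.Unstructured
    for _, ns := range namespaces {
        list, err := client.Resource(gvr).Namespace(ns).List(opts)
        if err != nil {
            return nil, err
        }
        out = append(out, list.Items...)
    }
    return out, nil
}
```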
6 changes: 3 additions & 3 deletions cluster/kubernetes/sync_test.go
@@ -290,8 +290,8 @@ metadata:
         panic(err)
     }
 
-    // Now check what resources remain in the sync set
-    actual, err := kube.getGCMarkedResourcesInSyncSet("testset")
+    // Now check that the resources were created
+    actual, err := kube.getAllowedGCMarkedResourcesInSyncSet("testset")
     if err != nil {
         t.Fatal(err)
     }
@@ -553,7 +553,7 @@ spec:
     assert.NoError(t, err)
 
     // Check that our resource-getting also sees the pre-existing resource
-    resources, err := kube.getResourcesBySelector("")
+    resources, err := kube.getAllowedResourcesBySelector("")
     assert.NoError(t, err)
     assert.Contains(t, resources, "foobar:deployment/dep1")
 
23 changes: 14 additions & 9 deletions cluster/mock.go
@@ -10,15 +10,16 @@ import (
 
 // Doubles as a cluster.Cluster and cluster.Manifests implementation
 type Mock struct {
-    AllWorkloadsFunc   func(maybeNamespace string) ([]Workload, error)
-    SomeWorkloadsFunc  func([]flux.ResourceID) ([]Workload, error)
-    PingFunc           func() error
-    ExportFunc         func() ([]byte, error)
-    SyncFunc           func(SyncSet) error
-    PublicSSHKeyFunc   func(regenerate bool) (ssh.PublicKey, error)
-    UpdateImageFunc    func(def []byte, id flux.ResourceID, container string, newImageID image.Ref) ([]byte, error)
-    LoadManifestsFunc  func(base string, paths []string) (map[string]resource.Resource, error)
-    UpdatePoliciesFunc func([]byte, flux.ResourceID, policy.Update) ([]byte, error)
+    AllWorkloadsFunc      func(maybeNamespace string) ([]Workload, error)
+    SomeWorkloadsFunc     func([]flux.ResourceID) ([]Workload, error)
+    IsAllowedResourceFunc func(flux.ResourceID) bool
+    PingFunc              func() error
+    ExportFunc            func() ([]byte, error)
+    SyncFunc              func(SyncSet) error
+    PublicSSHKeyFunc      func(regenerate bool) (ssh.PublicKey, error)
+    UpdateImageFunc       func(def []byte, id flux.ResourceID, container string, newImageID image.Ref) ([]byte, error)
+    LoadManifestsFunc     func(base string, paths []string) (map[string]resource.Resource, error)
+    UpdatePoliciesFunc    func([]byte, flux.ResourceID, policy.Update) ([]byte, error)
 }
 
func (m *Mock) AllWorkloads(maybeNamespace string) ([]Workload, error) {
Expand All @@ -29,6 +30,10 @@ func (m *Mock) SomeWorkloads(s []flux.ResourceID) ([]Workload, error) {
return m.SomeWorkloadsFunc(s)
}

func (m *Mock) IsAllowedResource(id flux.ResourceID) bool {
return m.IsAllowedResourceFunc(id)
}

func (m *Mock) Ping() error {
return m.PingFunc()
}
Expand Down
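Because the Mock dispatches straight through its function fields, any test that reaches the new code path must populate IsAllowedResourceFunc or the call panics on a nil function. A typical wiring, mirroring what daemon_test.go does further down:

```go
k8s := &cluster.Mock{}
// Allow everything by default; a test exercising namespace restrictions
// can swap in a closure that consults its own allow-list instead.
k8s.IsAllowedResourceFunc = func(flux.ResourceID) bool { return true }
```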
9 changes: 7 additions & 2 deletions daemon/daemon.go
@@ -377,6 +377,11 @@ func (d *Daemon) updatePolicy(spec update.Spec, updates policy.Updates) updateFu
     var anythingAutomated bool
 
     for workloadID, u := range updates {
+        if !d.Cluster.IsAllowedResource(workloadID) {
+            result.Result[workloadID] = update.WorkloadResult{
+                Status: update.ReleaseStatusSkipped,
+            }
+        }
         if policy.Set(u.Add).Has(policy.Automated) {
             anythingAutomated = true
         }
@@ -707,7 +712,7 @@ func policyEvents(us policy.Updates, now time.Time) map[string]event.Event {
 // policyEventTypes is a deduped list of all event types this update contains
 func policyEventTypes(u policy.Update) []string {
     types := map[string]struct{}{}
-    for p, _ := range u.Add {
+    for p := range u.Add {
         switch {
         case p == policy.Automated:
             types[event.EventAutomate] = struct{}{}
@@ -718,7 +723,7 @@ func policyEventTypes(u policy.Update) []string {
         }
     }
 
-    for p, _ := range u.Remove {
+    for p := range u.Remove {
         switch {
         case p == policy.Automated:
             types[event.EventDeautomate] = struct{}{}
1 change: 1 addition & 0 deletions daemon/daemon_test.go
@@ -693,6 +693,7 @@ func mockDaemon(t *testing.T) (*Daemon, func(), func(), *cluster.Mock, *mockEven
         }
         return []cluster.Workload{}, nil
     }
+    k8s.IsAllowedResourceFunc = func(flux.ResourceID) bool { return true }
     k8s.ExportFunc = func() ([]byte, error) { return testBytes, nil }
     k8s.PingFunc = func() error { return nil }
     k8s.SomeWorkloadsFunc = func([]flux.ResourceID) ([]cluster.Workload, error) {
4 changes: 2 additions & 2 deletions release/context.go
@@ -82,11 +82,11 @@ func (rc *ReleaseContext) SelectWorkloads(results update.Result, prefilters, pos
     for _, s := range allDefined {
         res := s.Filter(prefilters...)
         if res.Error == "" {
-            // Give these a default value, in case we don't find them
+            // Give these a default value, in case we cannot access them
             // in the cluster.
             results[s.ResourceID] = update.WorkloadResult{
                 Status: update.ReleaseStatusSkipped,
-                Error:  update.NotInCluster,
+                Error:  update.NotAccessibleInCluster,
             }
             toAskClusterAbout = append(toAskClusterAbout, s.ResourceID)
         } else {
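The release path thus follows a mark-then-overwrite pattern: every workload passing the prefilters is first recorded as skipped and not accessible, and only entries the cluster actually returns (which, per SomeWorkloads above, now excludes disallowed namespaces) get upgraded to a real result. A compressed sketch of that flow, with variable names assumed from the surrounding diff rather than quoted from it:

```go
// 1. Default every candidate to "skipped / not accessible".
for _, s := range prefiltered {
    results[s.ResourceID] = update.WorkloadResult{
        Status: update.ReleaseStatusSkipped,
        Error:  update.NotAccessibleInCluster,
    }
}
// 2. Ask the cluster; SomeWorkloads silently drops disallowed IDs.
workloads, err := rc.cluster.SomeWorkloads(toAskClusterAbout)
if err != nil {
    return err // hypothetical error path
}
// 3. Only workloads that came back are considered further; the rest
//    keep the default skipped-and-not-accessible result set in step 1.
```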