From f2938a8e51608eba691f4898a8127868dceb710d Mon Sep 17 00:00:00 2001 From: Levi Blackstone Date: Fri, 6 Sep 2019 14:03:42 -0600 Subject: [PATCH] Add PodAggregator for use by resource awaiters The PodAggregator encapsulates the logic required to aggregate warning/error messages from Pods related to a parent resource such as a Deployment or Job. This will make it easier to collect relevant intermediate status in a consistent way. --- CHANGELOG.md | 2 + pkg/await/await.go | 24 +++++ pkg/await/util.go | 1 + pkg/await/watchers.go | 174 +++++++++++++++++++++++++++++++++++++ pkg/await/watchers_test.go | 115 ++++++++++++++++++++++++ pkg/clients/clients.go | 6 +- pkg/logging/types.go | 14 +++ 7 files changed, 333 insertions(+), 3 deletions(-) create mode 100644 pkg/await/watchers.go create mode 100644 pkg/await/watchers_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 4434cde09d..531f94beec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,8 @@ - Warn for deprecated apiVersions. (https://github.com/pulumi/pulumi-kubernetes/pull/779). +- Add PodAggregator for use by resource awaiters + (https://github.com/pulumi/pulumi-kubernetes/pull/785). ### Bug fixes diff --git a/pkg/await/await.go b/pkg/await/await.go index 668d6c3d8d..1b68f91e31 100644 --- a/pkg/await/await.go +++ b/pkg/await/await.go @@ -32,6 +32,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" ) @@ -83,6 +84,29 @@ type DeleteConfig struct { Timeout float64 } +type ResourceId struct { + Name string + Namespace string // Namespace should never be "" (use "default" instead). + GVK schema.GroupVersionKind + Generation int64 +} + +func (r ResourceId) String() string { + if len(r.Namespace) > 0 { + return r.Namespace + "/" + r.Name + } + return r.Name +} + +func ResourceIdFromUnstructured(uns *unstructured.Unstructured) ResourceId { + return ResourceId{ + Namespace: clients.NamespaceOrDefault(uns.GetNamespace()), + Name: uns.GetName(), + GVK: uns.GroupVersionKind(), + Generation: uns.GetGeneration(), + } +} + // Creation (as the usage, `await.Creation`, implies) will block until one of the following is true: // (1) the Kubernetes resource is reported to be initialized; (2) the initialization timeout has // occurred; or (3) an error has occurred while the resource was being initialized. diff --git a/pkg/await/util.go b/pkg/await/util.go index 55646097f2..d186aa7b42 100644 --- a/pkg/await/util.go +++ b/pkg/await/util.go @@ -190,6 +190,7 @@ func is404(err error) bool { // -------------------------------------------------------------------------- +// TODO: Remove in favor of PodAggregator. func isOwnedBy(obj, possibleOwner *unstructured.Unstructured) bool { if possibleOwner == nil { return false diff --git a/pkg/await/watchers.go b/pkg/await/watchers.go new file mode 100644 index 0000000000..0da213e90f --- /dev/null +++ b/pkg/await/watchers.go @@ -0,0 +1,174 @@ +// Copyright 2016-2019, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package await + +import ( + "sync" + + "github.com/golang/glog" + "github.com/pulumi/pulumi-kubernetes/pkg/await/states" + "github.com/pulumi/pulumi-kubernetes/pkg/clients" + "github.com/pulumi/pulumi-kubernetes/pkg/kinds" + "github.com/pulumi/pulumi-kubernetes/pkg/logging" + "github.com/pulumi/pulumi/pkg/diag" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/dynamic" +) + +// PodAggregator tracks status for any Pods related to the owner resource, and writes +// warning/error messages to a channel that can be consumed by a resource awaiter. +type PodAggregator struct { + // Synchronization + sync.Mutex + stopped bool + + // Owner identity + owner ResourceId + + // Pod checker + checker *states.StateChecker + + // Clients + client dynamic.ResourceInterface + watcher watch.Interface + + // Messages + messages chan logging.Messages +} + +// NewPodAggregator returns an initialized PodAggregator. +func NewPodAggregator(owner ResourceId, clientset *clients.DynamicClientSet) (*PodAggregator, error) { + client, err := clients.ResourceClient(kinds.Pod, owner.Namespace, clientset) + if err != nil { + return nil, err + } + + watcher, err := client.Watch(metav1.ListOptions{}) + if err != nil { + return nil, err + } + + pa := &PodAggregator{ + stopped: false, + owner: owner, + checker: states.NewPodChecker(), + client: client, + watcher: watcher, + messages: make(chan logging.Messages), + } + go pa.run() + + return pa, nil +} + +// run contains the aggregation logic and is executed as a goroutine when a PodAggregator +// is initialized. +func (pa *PodAggregator) run() { + defer close(pa.messages) + + checkPod := func(object runtime.Object) { + pod, err := clients.PodFromUnstructured(object.(*unstructured.Unstructured)) + if err != nil { + glog.V(3).Infof("Failed to unmarshal Pod event: %v", err) + return + } + if relatedResource(pa.owner, pod) { + pa.messages <- pa.checker.Update(pod).MessagesWithSeverity(diag.Warning, diag.Error) + } + } + + // Get existing Pods. + pods, err := pa.client.List(metav1.ListOptions{}) + if err != nil { + glog.V(3).Infof("Failed to list existing Pods: %v", err) + } else { + // Log errors and move on. + _ = pods.EachListItem(func(object runtime.Object) error { + checkPod(object) + return nil + }) + } + + for { + if pa.stopping() { + return + } + event := <-pa.watcher.ResultChan() + if event.Object == nil { + continue + } + checkPod(event.Object) + } +} + +// Stop safely stops a PodAggregator and underlying watch client. +func (pa *PodAggregator) Stop() { + pa.Lock() + defer pa.Unlock() + if !pa.stopped { + pa.stopped = true + pa.watcher.Stop() + } +} + +// stopping returns true if Stop() was called previously. +func (pa *PodAggregator) stopping() bool { + pa.Lock() + defer pa.Unlock() + return pa.stopped +} + +// ResultChan returns a reference to the message channel used by the PodAggregator to +// communicate warning/error messages to a resource awaiter. +func (pa *PodAggregator) ResultChan() <-chan logging.Messages { + return pa.messages +} + +// relatedResource returns true if a resource ownerReference and metadata matches the provided owner +// ResourceId, false otherwise. +// +// Example ownerReference: +// { +// "apiVersion": "batch/v1", +// "blockOwnerDeletion": true, +// "controller": true, +// "kind": "Job", +// "name": "foo", +// "uid": "14ba58cc-cf83-11e9-8c3a-025000000001" +// } +func relatedResource(owner ResourceId, object metav1.Object) bool { + if owner.Namespace != object.GetNamespace() { + return false + } + if owner.Generation != object.GetGeneration() { + return false + } + for _, ref := range object.GetOwnerReferences() { + if ref.APIVersion != owner.GVK.GroupVersion().String() { + continue + } + if ref.Kind != owner.GVK.Kind { + continue + } + if ref.Name != owner.Name { + continue + } + return true + } + return false +} diff --git a/pkg/await/watchers_test.go b/pkg/await/watchers_test.go new file mode 100644 index 0000000000..315916d502 --- /dev/null +++ b/pkg/await/watchers_test.go @@ -0,0 +1,115 @@ +// Copyright 2016-2019, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package await + +import ( + "testing" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +func TestRelatedResource(t *testing.T) { + pod := &corev1.Pod{ + TypeMeta: v1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, + ObjectMeta: v1.ObjectMeta{ + Name: "foo", + Namespace: "bar", + Generation: 0, + OwnerReferences: []v1.OwnerReference{ + { + APIVersion: "batch/v1", + Kind: "Job", + Name: "baz", + UID: "14ba58cc-cf83-11e9-8c3a-025000000001", + }, + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "foo", + Image: "nginx", + }, + }, + }, + } + type args struct { + owner ResourceId + object v1.Object + } + tests := []struct { + name string + args args + want bool + }{ + {"Matching pod", args{ + owner: ResourceId{ + Name: "baz", + Namespace: "bar", + GVK: schema.GroupVersionKind{Group: "batch", Version: "v1", Kind: "Job"}, + Generation: 0, + }, + object: pod, + }, true}, + {"Different namespace", args{ + owner: ResourceId{ + Name: "baz", + Namespace: "default", + GVK: schema.GroupVersionKind{Group: "batch", Version: "v1", Kind: "Job"}, + Generation: 0, + }, + object: pod, + }, false}, + {"Different name", args{ + owner: ResourceId{ + Name: "different", + Namespace: "bar", + GVK: schema.GroupVersionKind{Group: "batch", Version: "v1", Kind: "Job"}, + Generation: 0, + }, + object: pod, + }, false}, + {"Different GVK", args{ + owner: ResourceId{ + Name: "baz", + Namespace: "bar", + GVK: schema.GroupVersionKind{Group: "core", Version: "v1", Kind: "Pod"}, + Generation: 0, + }, + object: pod, + }, false}, + {"Different generation", args{ + owner: ResourceId{ + Name: "baz", + Namespace: "bar", + GVK: schema.GroupVersionKind{Group: "batch", Version: "v1", Kind: "Job"}, + Generation: 1, + }, + object: pod, + }, false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := relatedResource(tt.args.owner, tt.args.object); got != tt.want { + t.Errorf("relatedResource() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/clients/clients.go b/pkg/clients/clients.go index 7e10b8b1bf..b9ccd89351 100644 --- a/pkg/clients/clients.go +++ b/pkg/clients/clients.go @@ -93,7 +93,7 @@ func (dcs *DynamicClientSet) ResourceClient(gvk schema.GroupVersionKind, namespa } if namespaced { - return dcs.GenericClient.Resource(m.Resource).Namespace(namespaceOrDefault(namespace)), nil + return dcs.GenericClient.Resource(m.Resource).Namespace(NamespaceOrDefault(namespace)), nil } else { return dcs.GenericClient.Resource(m.Resource), nil } @@ -194,8 +194,8 @@ func IsNoNamespaceInfoErr(err error) bool { } } -// namespaceOrDefault returns `ns` or the the default namespace `"default"` if `ns` is empty. -func namespaceOrDefault(ns string) string { +// NamespaceOrDefault returns `ns` or the the default namespace `"default"` if `ns` is empty. +func NamespaceOrDefault(ns string) string { if ns == "" { return "default" } diff --git a/pkg/logging/types.go b/pkg/logging/types.go index 8950bb6130..f0556d343f 100644 --- a/pkg/logging/types.go +++ b/pkg/logging/types.go @@ -86,3 +86,17 @@ func (m Messages) Errors() Messages { return messages } + +// MessagesWithSeverity returns Messages matching any of the provided Severity levels. +func (m Messages) MessagesWithSeverity(sev ...diag.Severity) Messages { + var messages Messages + for _, message := range m { + for _, s := range sev { + if message.Severity == s { + messages = append(messages, message) + } + } + } + + return messages +}