From a9cc5589253b53ada4611fc56fbc14562884fb6c Mon Sep 17 00:00:00 2001 From: Levi Blackstone Date: Fri, 6 Sep 2019 14:03:42 -0600 Subject: [PATCH] Add PodAggregator for use by resource awaiters The PodAggregator encapsulates the logic required to aggregate warning/error messages from Pods related to a parent resource such as a Deployment or Job. This will make it easier to collect relevant intermediate status in a consistent way. --- CHANGELOG.md | 2 + pkg/await/await.go | 24 ++++++ pkg/await/watchers.go | 174 +++++++++++++++++++++++++++++++++++++++++ pkg/clients/clients.go | 6 +- pkg/logging/types.go | 14 ++++ 5 files changed, 217 insertions(+), 3 deletions(-) create mode 100644 pkg/await/watchers.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 4434cde09d..531f94beec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,8 @@ - Warn for deprecated apiVersions. (https://github.com/pulumi/pulumi-kubernetes/pull/779). +- Add PodAggregator for use by resource awaiters + (https://github.com/pulumi/pulumi-kubernetes/pull/785). ### Bug fixes diff --git a/pkg/await/await.go b/pkg/await/await.go index 668d6c3d8d..1b68f91e31 100644 --- a/pkg/await/await.go +++ b/pkg/await/await.go @@ -32,6 +32,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" ) @@ -83,6 +84,29 @@ type DeleteConfig struct { Timeout float64 } +type ResourceId struct { + Name string + Namespace string // Namespace should never be "" (use "default" instead). + GVK schema.GroupVersionKind + Generation int64 +} + +func (r ResourceId) String() string { + if len(r.Namespace) > 0 { + return r.Namespace + "/" + r.Name + } + return r.Name +} + +func ResourceIdFromUnstructured(uns *unstructured.Unstructured) ResourceId { + return ResourceId{ + Namespace: clients.NamespaceOrDefault(uns.GetNamespace()), + Name: uns.GetName(), + GVK: uns.GroupVersionKind(), + Generation: uns.GetGeneration(), + } +} + // Creation (as the usage, `await.Creation`, implies) will block until one of the following is true: // (1) the Kubernetes resource is reported to be initialized; (2) the initialization timeout has // occurred; or (3) an error has occurred while the resource was being initialized. diff --git a/pkg/await/watchers.go b/pkg/await/watchers.go new file mode 100644 index 0000000000..a4d6e579d6 --- /dev/null +++ b/pkg/await/watchers.go @@ -0,0 +1,174 @@ +// Copyright 2016-2019, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package await + +import ( + "sync" + + "github.com/golang/glog" + "github.com/pulumi/pulumi-kubernetes/pkg/await/states" + "github.com/pulumi/pulumi-kubernetes/pkg/clients" + "github.com/pulumi/pulumi-kubernetes/pkg/kinds" + "github.com/pulumi/pulumi-kubernetes/pkg/logging" + "github.com/pulumi/pulumi/pkg/diag" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/dynamic" +) + +// PodAggregator tracks status for any Pods related to the owner resource, and writes +// warning/error messages to a channel that can be consumed by a resource awaiter. +type PodAggregator struct { + // Synchronization + sync.Mutex + stopped bool + + // Owner identity + owner ResourceId + + // Pod checker + checker *states.StateChecker + + // Clients + client dynamic.ResourceInterface + watcher watch.Interface + + // Messages + messages chan logging.Messages +} + +// NewPodAggregator returns an initialized PodAggregator. +func NewPodAggregator(owner ResourceId, clientset *clients.DynamicClientSet) (*PodAggregator, error) { + client, err := clients.ResourceClient(kinds.Pod, owner.Namespace, clientset) + if err != nil { + return nil, err + } + + watcher, err := client.Watch(metav1.ListOptions{}) + if err != nil { + return nil, err + } + + pa := &PodAggregator{ + stopped: false, + owner: owner, + checker: states.NewPodChecker(), + client: client, + watcher: watcher, + messages: make(chan logging.Messages), + } + go pa.run() + + return pa, nil +} + +// run contains the aggregation logic and is executed as a goroutine when a PodAggregator +// is initialized. +func (pa *PodAggregator) run() { + defer close(pa.messages) + + /* + Example ownerReference: + { + "apiVersion": "batch/v1", + "blockOwnerDeletion": true, + "controller": true, + "kind": "Job", + "name": "foo", + "uid": "14ba58cc-cf83-11e9-8c3a-025000000001" + } + + Note that the ownerReference does not include namespace. Instead, Pods are implicitly + filtered by namespace by the resource client. + */ + matchingOwner := func(pod *v1.Pod) bool { + for _, ref := range pod.ObjectMeta.OwnerReferences { + if ref.APIVersion != pa.owner.GVK.GroupVersion().String() { + continue + } + if ref.Kind != pa.owner.GVK.Kind { + continue + } + if ref.Name != pa.owner.Name { + continue + } + return true + } + return false + } + + checkPod := func(object runtime.Object) { + pod, err := clients.PodFromUnstructured(object.(*unstructured.Unstructured)) + if err != nil { + glog.V(3).Infof("Failed to unmarshal Pod event: %v", err) + return + } + if pod.GetGeneration() != pa.owner.Generation { + return + } + if matchingOwner(pod) { + pa.messages <- pa.checker.Update(pod).MessagesWithSeverity(diag.Warning, diag.Error) + } + } + + // Get existing Pods. + pods, err := pa.client.List(metav1.ListOptions{}) + if err != nil { + glog.V(3).Infof("Failed to list existing Pods: %v", err) + } else { + // Log errors and move on. + _ = pods.EachListItem(func(object runtime.Object) error { + checkPod(object) + return nil + }) + } + + for { + if pa.stopping() { + return + } + event := <-pa.watcher.ResultChan() + if event.Object == nil { + continue + } + checkPod(event.Object) + } +} + +// Stop safely stops a PodAggregator and underlying watch client. +func (pa *PodAggregator) Stop() { + pa.Lock() + defer pa.Unlock() + if !pa.stopped { + pa.stopped = true + pa.watcher.Stop() + } +} + +// stopping returns true if Stop() was called previously. +func (pa *PodAggregator) stopping() bool { + pa.Lock() + defer pa.Unlock() + return pa.stopped +} + +// ResultChan returns a reference to the message channel used by the PodAggregator to +// communicate warning/error messages to a resource awaiter. +func (pa *PodAggregator) ResultChan() <-chan logging.Messages { + return pa.messages +} diff --git a/pkg/clients/clients.go b/pkg/clients/clients.go index 7e10b8b1bf..b9ccd89351 100644 --- a/pkg/clients/clients.go +++ b/pkg/clients/clients.go @@ -93,7 +93,7 @@ func (dcs *DynamicClientSet) ResourceClient(gvk schema.GroupVersionKind, namespa } if namespaced { - return dcs.GenericClient.Resource(m.Resource).Namespace(namespaceOrDefault(namespace)), nil + return dcs.GenericClient.Resource(m.Resource).Namespace(NamespaceOrDefault(namespace)), nil } else { return dcs.GenericClient.Resource(m.Resource), nil } @@ -194,8 +194,8 @@ func IsNoNamespaceInfoErr(err error) bool { } } -// namespaceOrDefault returns `ns` or the the default namespace `"default"` if `ns` is empty. -func namespaceOrDefault(ns string) string { +// NamespaceOrDefault returns `ns` or the the default namespace `"default"` if `ns` is empty. +func NamespaceOrDefault(ns string) string { if ns == "" { return "default" } diff --git a/pkg/logging/types.go b/pkg/logging/types.go index 8950bb6130..f0556d343f 100644 --- a/pkg/logging/types.go +++ b/pkg/logging/types.go @@ -86,3 +86,17 @@ func (m Messages) Errors() Messages { return messages } + +// MessagesWithSeverity returns Messages matching any of the provided Severity levels. +func (m Messages) MessagesWithSeverity(sev ...diag.Severity) Messages { + var messages Messages + for _, message := range m { + for _, s := range sev { + if message.Severity == s { + messages = append(messages, message) + } + } + } + + return messages +}