Use shared informer for await logic for deployments (#1639)
* Use SharedInformers instead of watch clients (see the sketch below)

* Use shared informer for each deployment and downstream resource

* Update changelog

* Fix tests and linting

* Limit to the same namespace

* Handle default namespace

* Fix imports

* Remove informer factory from await config

* Add a common resync period to allow catch-up polling

* Remove watchers for older GVR variations

Co-authored-by: Levi Blackstone <levi@pulumi.com>
Vivek Lakshmanan and lblackstone authored Jun 30, 2021
1 parent 1864a08 commit 059df9f
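
For context, here is the pattern this commit adopts, as a minimal standalone sketch rather than code from this repository: a namespace-scoped dynamic shared informer factory replaces the per-resource watch clients, and each informer's callbacks are adapted into watch.Event values on a channel. The kubeconfig lookup, the hard-coded "default" namespace, the channel buffer size, and the printing loop are illustrative assumptions.

package main

import (
	"fmt"
	"os"
	"time"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Illustrative assumption: credentials come from a kubeconfig path in $KUBECONFIG.
	config, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		panic(err)
	}
	client, err := dynamic.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	stopper := make(chan struct{})
	defer close(stopper)

	// One namespace-scoped factory; the 60s resync periodically replays the
	// cache so an event that was missed is eventually re-delivered.
	factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(
		client, 60*time.Second, "default", nil)

	events := make(chan watch.Event, 16)
	informer := factory.ForResource(schema.GroupVersionResource{
		Group: "apps", Version: "v1", Resource: "deployments",
	})
	// Adapt informer callbacks into the watch.Event shape that channel-based
	// consumers already understand.
	informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			events <- watch.Event{Type: watch.Added, Object: obj.(*unstructured.Unstructured)}
		},
		UpdateFunc: func(_, newObj interface{}) {
			events <- watch.Event{Type: watch.Modified, Object: newObj.(*unstructured.Unstructured)}
		},
	})

	factory.Start(stopper)
	factory.WaitForCacheSync(stopper)

	for event := range events {
		fmt.Printf("%s: %s\n", event.Type, event.Object.(*unstructured.Unstructured).GetName())
	}
}

The resync interval is what enables the "catch-up polling" mentioned in the commit message: on each resync the informer re-delivers its cached objects as update notifications, so an await loop that dropped or missed a watch event still converges on the current state.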
Showing 7 changed files with 155 additions and 87 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@
- Handle different namespaces for server-side diff (https://github.com/pulumi/pulumi-kubernetes/pull/1631)
- Handle auto-named namespaces for server-side diff (https://github.com/pulumi/pulumi-kubernetes/pull/1633)
- *Revert* Fix hanging updates for deployment await logic (https://github.com/pulumi/pulumi-kubernetes/pull/1596)
- Use shared informer for await logic for deployments (https://github.com/pulumi/pulumi-kubernetes/pull/1639)

## 3.4.1 (June 24, 2021)

1 change: 0 additions & 1 deletion provider/pkg/await/await.go
@@ -17,7 +17,6 @@ package await
import (
"context"
"fmt"

"github.com/pulumi/pulumi-kubernetes/provider/v3/pkg/clients"
"github.com/pulumi/pulumi-kubernetes/provider/v3/pkg/cluster"
"github.com/pulumi/pulumi-kubernetes/provider/v3/pkg/kinds"
192 changes: 129 additions & 63 deletions provider/pkg/await/deployment.go
@@ -3,12 +3,10 @@ package await
import (
"context"
"fmt"
"reflect"
"k8s.io/client-go/dynamic/dynamicinformer"
"strings"
"time"

"github.com/pulumi/pulumi/sdk/v3/go/common/util/cmdutil"

"github.com/pkg/errors"
"github.com/pulumi/pulumi-kubernetes/provider/v3/pkg/await/states"
"github.com/pulumi/pulumi-kubernetes/provider/v3/pkg/clients"
@@ -17,12 +15,16 @@ import (
"github.com/pulumi/pulumi-kubernetes/provider/v3/pkg/metadata"
"github.com/pulumi/pulumi-kubernetes/provider/v3/pkg/openapi"
"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/cmdutil"
logger "github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)

// ------------------------------------------------------------------------------------------------
@@ -144,53 +146,72 @@ func (dia *deploymentInitAwaiter) Await() error {
// because it doesn't do a rollout (i.e., it simply creates the Deployment and
// corresponding ReplicaSet), and therefore there is no rollout to mark as "Progressing".
//

deploymentClient, replicaSetClient, podClient, pvcClient, err := dia.makeClients()
if err != nil {
return err
}

// Create Deployment watcher.
deploymentWatcher, err := deploymentClient.Watch(context.TODO(), metav1.ListOptions{})
if err != nil {
return errors.Wrapf(err, "could not set up watch for Deployment object %q",
dia.config.currentInputs.GetName())
}
defer deploymentWatcher.Stop()

// Create ReplicaSet watcher.
replicaSetWatcher, err := replicaSetClient.Watch(context.TODO(), metav1.ListOptions{})
if err != nil {
return errors.Wrapf(err,
"Could not create watcher for ReplicaSet objects associated with Deployment %q",
dia.config.currentInputs.GetName())
}
defer replicaSetWatcher.Stop()

// Create Pod watcher.
podWatcher, err := podClient.Watch(context.TODO(), metav1.ListOptions{})
if err != nil {
return errors.Wrapf(err,
"Could not create watcher for Pods objects associated with Deployment %q",
dia.config.currentInputs.GetName())
}
defer podWatcher.Stop()

// Create PersistentVolumeClaims watcher.
pvcWatcher, err := pvcClient.Watch(context.TODO(), metav1.ListOptions{})
if err != nil {
return errors.Wrapf(err,
"Could not create watcher for PersistentVolumeClaims objects associated with Deployment %q",
dia.config.currentInputs.GetName())
}
defer pvcWatcher.Stop()
stopper := make(chan struct{})
defer close(stopper)

namespace := dia.deployment.GetNamespace()
if namespace == "" {
namespace = metav1.NamespaceDefault
}
informerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dia.config.clientSet.GenericClient, 60*time.Second, namespace, nil)
// For now, limit the factory's lifetime to a single deployment await; the informers could be shared more broadly later.
informerFactory.Start(stopper)

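// Create an informer for each resource kind the rollout involves (the Deployment
// itself, its ReplicaSets, their Pods, and any PersistentVolumeClaims), each
// forwarding events to its own channel.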
deploymentEvents := make(chan watch.Event)
deploymentV1Informer := dia.makeInformer(
informerFactory,
schema.GroupVersionResource{
Group: "apps",
Version: "v1",
Resource: "deployments",
}, deploymentEvents)
go deploymentV1Informer.Informer().Run(stopper)

replicaSetEvents := make(chan watch.Event)
replicaSetV1Informer := dia.makeInformer(
informerFactory,
schema.GroupVersionResource{
Group: "apps",
Version: "v1",
Resource: "replicasets",
}, replicaSetEvents)
go replicaSetV1Informer.Informer().Run(stopper)

podEvents := make(chan watch.Event)
podV1Informer := dia.makeInformer(
informerFactory,
schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "pods",
}, podEvents)
go podV1Informer.Informer().Run(stopper)

pvcEvents := make(chan watch.Event)
pvcV1Informer := dia.makeInformer(
informerFactory,
schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "persistentvolumeclaims",
}, pvcEvents)
go pvcV1Informer.Informer().Run(stopper)

// Wait for the informer caches to sync
informerFactory.WaitForCacheSync(stopper)

aggregateErrorTicker := time.NewTicker(10 * time.Second)
defer aggregateErrorTicker.Stop()

timeout := metadata.TimeoutDuration(dia.config.timeout, dia.config.currentInputs, DefaultDeploymentTimeoutMins*60)

return dia.await(
deploymentWatcher, replicaSetWatcher, podWatcher, pvcWatcher, time.After(timeout), aggregateErrorTicker.C)
deploymentEvents,
replicaSetEvents,
podEvents,
pvcEvents,
time.After(timeout),
aggregateErrorTicker.C)
}

func (dia *deploymentInitAwaiter) Read() error {
@@ -284,8 +305,12 @@ func (dia *deploymentInitAwaiter) read(

// await is a helper companion to `Await` designed to make it easy to test this module.
func (dia *deploymentInitAwaiter) await(
deploymentWatcher, replicaSetWatcher, podWatcher, pvcWatcher watch.Interface,
timeout, aggregateErrorTicker <-chan time.Time,
deploymentEvents <-chan watch.Event,
replicaSetEvents <-chan watch.Event,
podEvents <-chan watch.Event,
pvcEvents <-chan watch.Event,
timeout,
aggregateErrorTicker <-chan time.Time,
) error {
dia.config.logStatus(diag.Info, "[1/2] Waiting for app ReplicaSet to be marked available")

@@ -311,13 +336,13 @@ func (dia *deploymentInitAwaiter) await(
for _, message := range messages {
dia.config.logMessage(message)
}
case event := <-deploymentWatcher.ResultChan():
case event := <-deploymentEvents:
dia.processDeploymentEvent(event)
case event := <-replicaSetWatcher.ResultChan():
case event := <-replicaSetEvents:
dia.processReplicaSetEvent(event)
case event := <-podWatcher.ResultChan():
case event := <-podEvents:
dia.processPodEvent(event)
case event := <-pvcWatcher.ResultChan():
case event := <-pvcEvents:
dia.processPersistentVolumeClaimsEvent(event)
}
}
@@ -372,8 +397,8 @@ func (dia *deploymentInitAwaiter) processDeploymentEvent(event watch.Event) {

deployment, isUnstructured := event.Object.(*unstructured.Unstructured)
if !isUnstructured {
logger.V(3).Infof("Deployment watch received unknown object type %q",
reflect.TypeOf(deployment))
logger.V(3).Infof("Deployment watch received unknown object type %T",
event.Object)
return
}

@@ -487,8 +512,8 @@
func (dia *deploymentInitAwaiter) processReplicaSetEvent(event watch.Event) {
rs, isUnstructured := event.Object.(*unstructured.Unstructured)
if !isUnstructured {
logger.V(3).Infof("ReplicaSet watch received unknown object type %q",
reflect.TypeOf(rs))
logger.V(3).Infof("ReplicaSet watch received unknown object type %T",
event.Object)
return
}

@@ -524,12 +549,12 @@ func (dia *deploymentInitAwaiter) checkReplicaSetStatus() {
logger.V(3).Infof("Deployment %q has generation %q, which corresponds to ReplicaSet %q",
inputs.GetName(), dia.replicaSetGeneration, rs.GetName())

var lastGeneration string
var lastRevision string
if outputs := dia.config.lastOutputs; outputs != nil {
lastGeneration = outputs.GetAnnotations()[revision]
lastRevision = outputs.GetAnnotations()[revision]
}

logger.V(3).Infof("The last generation of Deployment %q was %q", inputs.GetName(), lastGeneration)
logger.V(3).Infof("The last revision of Deployment %q was %q", inputs.GetName(), lastRevision)

// NOTE: Check `.spec.replicas` in the live `ReplicaSet` instead of the last input `Deployment`,
// since this is the plan of record. This protects against (e.g.) a user running `kubectl scale`
@@ -588,7 +613,7 @@ func (dia *deploymentInitAwaiter) checkReplicaSetStatus() {
}

if dia.changeTriggeredRollout() {
dia.updatedReplicaSetReady = lastGeneration != dia.replicaSetGeneration && updatedReplicaSetCreated &&
dia.updatedReplicaSetReady = lastRevision != dia.replicaSetGeneration && updatedReplicaSetCreated &&
doneWaitingOnReplicas() && !unavailableReplicasPresent && !tooManyReplicas &&
expectedNumberOfUpdatedReplicas
} else {
@@ -610,7 +635,12 @@ func (dia *deploymentInitAwaiter) checkReplicaSetStatus() {
rs.GetName(), specReplicas, readyReplicas)

if dia.changeTriggeredRollout() {
dia.updatedReplicaSetReady = lastGeneration != dia.replicaSetGeneration && updatedReplicaSetCreated &&
logger.V(9).Infof("Template change detected for ReplicaSet %q - last revision: %q, current revision: %q",
rs.GetName(), lastRevision, dia.replicaSetGeneration)
dia.updatedReplicaSetReady = lastRevision != dia.replicaSetGeneration && updatedReplicaSetCreated &&
doneWaitingOnReplicas()
} else {
dia.updatedReplicaSetReady = updatedReplicaSetCreated &&
@@ -679,8 +709,8 @@ func (dia *deploymentInitAwaiter) checkPersistentVolumeClaimStatus() {
func (dia *deploymentInitAwaiter) processPodEvent(event watch.Event) {
pod, isUnstructured := event.Object.(*unstructured.Unstructured)
if !isUnstructured {
logger.V(3).Infof("Pod watch received unknown object type %q",
reflect.TypeOf(pod))
logger.V(3).Infof("Pod watch received unknown object type %T",
event.Object)
return
}

@@ -703,8 +733,8 @@ func (dia *deploymentInitAwaiter) processPodEvent(event watch.Event) {
func (dia *deploymentInitAwaiter) processPersistentVolumeClaimsEvent(event watch.Event) {
pvc, isUnstructured := event.Object.(*unstructured.Unstructured)
if !isUnstructured {
logger.V(3).Infof("PersistentVolumeClaim watch received unknown object type %q",
reflect.TypeOf(pvc))
logger.V(3).Infof("PersistentVolumeClaim watch received unknown object type %T",
event.Object)
return
}

@@ -858,3 +888,39 @@ func (dia *deploymentInitAwaiter) makeClients() (

return
}

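// makeInformer obtains a shared informer for the given GVR from the factory and
// adapts its add, update, and delete notifications into watch.Events on
// informChan, so the await loop reads informer output through the same
// channel-based interface it previously used for watch clients.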
func (dia *deploymentInitAwaiter) makeInformer(
informerFactory dynamicinformer.DynamicSharedInformerFactory,
gvr schema.GroupVersionResource,
informChan chan<- watch.Event) informers.GenericInformer {

informer := informerFactory.ForResource(gvr)
informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
informChan <- watch.Event{
Object: obj.(*unstructured.Unstructured),
Type: watch.Added,
}
},
UpdateFunc: func(_, newObj interface{}) {
informChan <- watch.Event{
Object: newObj.(*unstructured.Unstructured),
Type: watch.Modified,
}
},
DeleteFunc: func(obj interface{}) {
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok {
informChan <- watch.Event{
Object: unknown.Obj.(*unstructured.Unstructured),
Type: watch.Deleted,
}
} else {
informChan <- watch.Event{
Object: obj.(*unstructured.Unstructured),
Type: watch.Deleted,
}
}
},
})
return informer
}
18 changes: 8 additions & 10 deletions provider/pkg/await/deployment_test.go
@@ -568,14 +568,14 @@ func Test_Apps_Deployment(t *testing.T) {
deployments := make(chan watch.Event)
replicaSets := make(chan watch.Event)
pods := make(chan watch.Event)
pvcs := make(chan watch.Event)

timeout := make(chan time.Time)
period := make(chan time.Time)
go test.do(deployments, replicaSets, pods, timeout)

err := awaiter.await(
&chanWatcher{results: deployments}, &chanWatcher{results: replicaSets}, &chanWatcher{results: pods},
&chanWatcher{}, timeout, period)
deployments, replicaSets, pods, pvcs, timeout, period)
assert.Equal(t, test.expectedError, err, test.description)
}
}
@@ -630,8 +630,7 @@ func Test_Apps_Deployment_With_PersistentVolumeClaims(t *testing.T) {
period := make(chan time.Time)
go test.do(deployments, replicaSets, pods, pvcs, timeout)

err := awaiter.await(&chanWatcher{results: deployments}, &chanWatcher{results: replicaSets},
&chanWatcher{results: pods}, &chanWatcher{results: pvcs}, timeout, period)
err := awaiter.await(deployments, replicaSets, pods, pvcs, timeout, period)
assert.Equal(t, test.expectedError, err, test.description)
}
}
@@ -684,8 +683,7 @@ func Test_Apps_Deployment_Without_PersistentVolumeClaims(t *testing.T) {
period := make(chan time.Time)
go test.do(deployments, replicaSets, pods, pvcs, timeout)

err := awaiter.await(&chanWatcher{results: deployments}, &chanWatcher{results: replicaSets},
&chanWatcher{results: pods}, &chanWatcher{results: pvcs}, timeout, period)
err := awaiter.await(deployments, replicaSets, pods, pvcs, timeout, period)
assert.Equal(t, test.expectedError, err, test.description)
}
}
@@ -734,6 +732,7 @@ func Test_Apps_Deployment_MultipleUpdates(t *testing.T) {
deployments := make(chan watch.Event)
replicaSets := make(chan watch.Event)
pods := make(chan watch.Event)
pvcs := make(chan watch.Event)

timeout := make(chan time.Time)
period := make(chan time.Time)
@@ -742,20 +741,19 @@
awaiter.config.lastInputs = obj
})

err := awaiter.await(&chanWatcher{results: deployments}, &chanWatcher{results: replicaSets},
&chanWatcher{results: pods}, &chanWatcher{}, timeout, period)
err := awaiter.await(deployments, replicaSets, pods, pvcs, timeout, period)
assert.Nil(t, err, test.description)

deployments = make(chan watch.Event)
replicaSets = make(chan watch.Event)
pods = make(chan watch.Event)
pvcs = make(chan watch.Event)

timeout = make(chan time.Time)
period = make(chan time.Time)
go test.secondUpdate(deployments, replicaSets, pods, timeout)

err = awaiter.await(&chanWatcher{results: deployments}, &chanWatcher{results: replicaSets},
&chanWatcher{results: pods}, &chanWatcher{}, timeout, period)
err = awaiter.await(deployments, replicaSets, pods, pvcs, timeout, period)
assert.Equal(t, test.expectedError, err, test.description)
}
}