Skip to content

Commit

Permalink
Merge pull request #1651 from deads2k/fix-ordering-bug
Browse files Browse the repository at this point in the history
Fix ordering bug
  • Loading branch information
openshift-merge-bot[bot] authored Jan 8, 2024
2 parents 77eeebc + a21cc97 commit 5674ec6
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 31 deletions.
89 changes: 59 additions & 30 deletions pkg/operator/revisioncontroller/revision_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,14 @@ func (c RevisionController) createRevisionIfNeeded(ctx context.Context, recorder

// check to make sure that the latestRevision has the exact content we expect. No mutation here, so we start creating the next Revision only when it is required
if isLatestRevisionCurrent {
klog.V(4).Infof("Returning early, %d triggered and up to date", latestAvailableRevision)
return false, nil
}

nextRevision := latestAvailableRevision + 1
recorder.Eventf("RevisionTriggered", "new revision %d triggered by %q", nextRevision, reason)
if err := c.createNewRevision(ctx, recorder, nextRevision, reason); err != nil {
recorder.Eventf("StartingNewRevision", "new revision %d triggered by %q", nextRevision, reason)
createdNewRevision, err := c.createNewRevision(ctx, recorder, nextRevision, reason)
if err != nil {
cond := operatorv1.OperatorCondition{
Type: "RevisionControllerDegraded",
Status: operatorv1.ConditionTrue,
Expand All @@ -108,6 +110,12 @@ func (c RevisionController) createRevisionIfNeeded(ctx context.Context, recorder
return true, nil
}

if !createdNewRevision {
klog.V(4).Infof("Revision %v not created", nextRevision)
return false, nil
}
recorder.Eventf("RevisionTriggered", "new revision %d triggered by %q", nextRevision, reason)

cond := operatorv1.OperatorCondition{
Type: "RevisionControllerDegraded",
Status: operatorv1.ConditionFalse,
Expand Down Expand Up @@ -208,55 +216,80 @@ func (c RevisionController) isLatestRevisionCurrent(ctx context.Context, revisio
return true, ""
}

func (c RevisionController) createNewRevision(ctx context.Context, recorder events.Recorder, revision int32, reason string) error {
// returns true if we created a revision
func (c RevisionController) createNewRevision(ctx context.Context, recorder events.Recorder, revision int32, reason string) (bool, error) {
// Create a new InProgress status configmap
statusConfigMap := &corev1.ConfigMap{
desiredStatusConfigMap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: c.targetNamespace,
Name: nameFor("revision-status", revision),
Annotations: map[string]string{
"operator.openshift.io/revision-ready": "false",
},
},
Data: map[string]string{
"revision": fmt.Sprintf("%d", revision),
"reason": reason,
},
}
statusConfigMap, _, err := resourceapply.ApplyConfigMap(ctx, c.configMapGetter, recorder, statusConfigMap)
if err != nil {
return err
createdStatus, err := c.configMapGetter.ConfigMaps(desiredStatusConfigMap.Namespace).Create(ctx, desiredStatusConfigMap, metav1.CreateOptions{})
switch {
case apierrors.IsAlreadyExists(err):
if createdStatus == nil || len(createdStatus.UID) == 0 {
createdStatus, err = c.configMapGetter.ConfigMaps(desiredStatusConfigMap.Namespace).Get(ctx, desiredStatusConfigMap.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
}
// take a live GET here to get current status to check the annotation
if createdStatus.Annotations["operator.openshift.io/revision-ready"] == "true" {
// no work to do because our cache is out of date and when we're updated, we will be able to see the result
klog.Infof("down the branch indicating that our cache was out of date and we're trying to recreate a revision.")
return false, nil
}
// update the sync and continue
case err != nil:
return false, err
}

ownerRefs := []metav1.OwnerReference{{
APIVersion: "v1",
Kind: "ConfigMap",
Name: statusConfigMap.Name,
UID: statusConfigMap.UID,
Name: createdStatus.Name,
UID: createdStatus.UID,
}}

for _, cm := range c.configMaps {
obj, _, err := resourceapply.SyncConfigMap(ctx, c.configMapGetter, recorder, c.targetNamespace, cm.Name, c.targetNamespace, nameFor(cm.Name, revision), ownerRefs)
if err != nil {
return err
return false, err
}
if obj == nil && !cm.Optional {
return apierrors.NewNotFound(corev1.Resource("configmaps"), cm.Name)
return false, apierrors.NewNotFound(corev1.Resource("configmaps"), cm.Name)
}
}
for _, s := range c.secrets {
obj, _, err := resourceapply.SyncSecret(ctx, c.secretGetter, recorder, c.targetNamespace, s.Name, c.targetNamespace, nameFor(s.Name, revision), ownerRefs)
if err != nil {
return err
return false, err
}
if obj == nil && !s.Optional {
return apierrors.NewNotFound(corev1.Resource("secrets"), s.Name)
return false, apierrors.NewNotFound(corev1.Resource("secrets"), s.Name)
}
}

return nil
createdStatus.Annotations["operator.openshift.io/revision-ready"] = "true"
if _, err := c.configMapGetter.ConfigMaps(createdStatus.Namespace).Update(ctx, createdStatus, metav1.UpdateOptions{}); err != nil {
return false, err
}

return true, nil
}

// getLatestAvailableRevision returns the latest known revision to the operator
// This is determined by checking revision status configmaps.
func (c RevisionController) getLatestAvailableRevision(ctx context.Context) (int32, error) {
// this appears to use a cached getter. I concede that past-David should have explicitly used Listers
configMaps, err := c.configMapGetter.ConfigMaps(c.targetNamespace).List(ctx, metav1.ListOptions{})
if err != nil {
return 0, err
Expand All @@ -281,7 +314,7 @@ func (c RevisionController) getLatestAvailableRevision(ctx context.Context) (int
}

func (c RevisionController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
operatorSpec, _, latestAvailableRevision, resourceVersion, err := c.operatorClient.GetLatestRevisionState()
operatorSpec, _, latestAvailableRevisionSeenByOperator, resourceVersion, err := c.operatorClient.GetLatestRevisionState()
if err != nil {
return err
}
Expand All @@ -290,26 +323,22 @@ func (c RevisionController) sync(ctx context.Context, syncCtx factory.SyncContex
return nil
}

// If the operator status has 0 as its latest available revision, this is either the first revision
// or possibly the operator resource was deleted and reset back to 0, which is not what we want so check configmaps
if latestAvailableRevision == 0 {
// Check to see if current revision is accurate and if not, search through configmaps for latest revision
latestRevision, err := c.getLatestAvailableRevision(ctx)
// If the operator status's latest available revision is not the same as the observed latest revision, update the operator.
latestObservedRevision, err := c.getLatestAvailableRevision(ctx)
if err != nil {
return err
}
if latestObservedRevision != 0 && latestAvailableRevisionSeenByOperator != latestObservedRevision {
// Then make sure that revision number is what's in the operator status
_, _, err := c.operatorClient.UpdateLatestRevisionOperatorStatus(ctx, latestObservedRevision)
if err != nil {
return err
}
if latestRevision != 0 {
// Then make sure that revision number is what's in the operator status
_, _, err := c.operatorClient.UpdateLatestRevisionOperatorStatus(ctx, latestRevision)
if err != nil {
return err
}
// regardless of whether we made a change, requeue to rerun the sync with updated status
return factory.SyntheticRequeueError
}
// regardless of whether we made a change, requeue to rerun the sync with updated status
return factory.SyntheticRequeueError
}

requeue, syncErr := c.createRevisionIfNeeded(ctx, syncCtx.Recorder(), latestAvailableRevision, resourceVersion)
requeue, syncErr := c.createRevisionIfNeeded(ctx, syncCtx.Recorder(), latestAvailableRevisionSeenByOperator, resourceVersion)
if requeue && syncErr == nil {
return factory.SyntheticRequeueError
}
Expand Down
117 changes: 116 additions & 1 deletion pkg/operator/revisioncontroller/revision_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package revisioncontroller

import (
"context"
"fmt"
"reflect"
"strings"
"testing"
"time"
Expand All @@ -15,6 +17,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"
"k8s.io/klog/v2"

operatorv1 "github.com/openshift/api/operator/v1"
"github.com/openshift/library-go/pkg/operator/events"
Expand All @@ -27,6 +30,11 @@ func filterCreateActions(actions []clienttesting.Action) []runtime.Object {
if !isCreate {
continue
}
// Filter out Update actions as both clienttesting.CreateAction
// and clienttesting.UpdateAction implement the same interface
if reflect.TypeOf(a) == reflect.TypeOf(clienttesting.UpdateActionImpl{}) {
continue
}
_, isEvent := createAction.GetObject().(*v1.Event)
if isEvent {
continue
Expand Down Expand Up @@ -101,7 +109,7 @@ func TestRevisionController(t *testing.T) {
},
validateActions: func(t *testing.T, actions []clienttesting.Action, kclient *fake.Clientset) {
updatedObjects := filterUpdateActions(actions)
if len(updatedObjects) != 3 {
if len(updatedObjects) != 4 {
t.Errorf("expected 4 updated objects, but got %v", len(updatedObjects))
}
_, err := kclient.CoreV1().ConfigMaps(targetNamespace).Get(context.TODO(), "revision-status-2", metav1.GetOptions{})
Expand Down Expand Up @@ -532,3 +540,110 @@ func TestRevisionController(t *testing.T) {
})
}
}

// fakeStaticPodLatestRevisionClient is a test double for LatestRevisionClient.
// It forwards every call to the wrapped client, except that
// UpdateLatestRevisionOperatorStatus can be switched to fail on demand.
type fakeStaticPodLatestRevisionClient struct {
	v1helpers.StaticPodOperatorClient

	// client is the delegate that receives the forwarded calls.
	client *StaticPodLatestRevisionClient

	// updateLatestRevisionOperatorStatusErrs, when true, makes
	// UpdateLatestRevisionOperatorStatus return an error without
	// calling the delegate.
	updateLatestRevisionOperatorStatusErrs bool
}

// Compile-time check that the fake satisfies LatestRevisionClient.
var _ LatestRevisionClient = &fakeStaticPodLatestRevisionClient{}

// GetLatestRevisionState forwards to the wrapped client and returns its
// results unchanged.
func (c fakeStaticPodLatestRevisionClient) GetLatestRevisionState() (*operatorv1.OperatorSpec, *operatorv1.OperatorStatus, int32, string, error) {
	spec, status, latestRevision, resourceVersion, err := c.client.GetLatestRevisionState()
	return spec, status, latestRevision, resourceVersion, err
}

// UpdateLatestRevisionOperatorStatus forwards to the wrapped client unless
// updateLatestRevisionOperatorStatusErrs is set, in which case it returns
// (nil, false, err) with a fixed "object has been modified" error message
// and leaves the delegate untouched.
func (c fakeStaticPodLatestRevisionClient) UpdateLatestRevisionOperatorStatus(ctx context.Context, latestAvailableRevision int32, updateFuncs ...v1helpers.UpdateStatusFunc) (*operatorv1.OperatorStatus, bool, error) {
	if !c.updateLatestRevisionOperatorStatusErrs {
		return c.client.UpdateLatestRevisionOperatorStatus(ctx, latestAvailableRevision, updateFuncs...)
	}
	return nil, false, fmt.Errorf("Operation cannot be fulfilled on kubeapiservers.operator.openshift.io \"cluster\": the object has been modified; please apply your changes to the latest version and try again")
}

// TestRevisionControllerRevisionCreatedFailedStatusUpdate runs Sync twice
// against the same fake cluster state. The first run has
// UpdateLatestRevisionOperatorStatus forced to fail and must return an error
// while leaving status.LatestAvailableRevision at 2. The second run lets the
// update succeed and must leave status.LatestAvailableRevision at 3.
func TestRevisionControllerRevisionCreatedFailedStatusUpdate(t *testing.T) {
	// Source secrets and configmaps plus a not-yet-ready status configmap for revision 2.
	initialObjects := []runtime.Object{
		&v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "test-secret", Namespace: targetNamespace}},
		&v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "test-secret-2", Namespace: targetNamespace}},
		&v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "test-config", Namespace: targetNamespace}, Data: map[string]string{"key": "value", "key2": "value"}},
		&v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "test-config-2", Namespace: targetNamespace}, Data: map[string]string{"key": "value"}},
		&v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "revision-status-2", Namespace: targetNamespace, Annotations: map[string]string{"operator.openshift.io/revision-ready": "false"}}},
	}

	revisionConfigMaps := []RevisionResource{{Name: "test-config"}, {Name: "test-config-opt", Optional: true}}
	revisionSecrets := []RevisionResource{{Name: "test-secret"}, {Name: "test-secret-opt", Optional: true}}

	operatorClient := v1helpers.NewFakeStaticPodOperatorClient(
		&operatorv1.StaticPodOperatorSpec{
			OperatorSpec: operatorv1.OperatorSpec{
				ManagementState: operatorv1.Managed,
			},
		},
		&operatorv1.StaticPodOperatorStatus{
			LatestAvailableRevision: 2,
			NodeStatuses: []operatorv1.NodeStatus{
				{
					NodeName:        "test-node-1",
					CurrentRevision: 0,
					TargetRevision:  0,
				},
			},
		},
		nil,
		nil,
	)

	// Wrap the operator client so the status update can be failed on demand.
	revisionClient := &fakeStaticPodLatestRevisionClient{
		StaticPodOperatorClient: operatorClient,
		client:                  &StaticPodLatestRevisionClient{StaticPodOperatorClient: operatorClient},
	}

	kubeClient := fake.NewSimpleClientset(initialObjects...)
	eventRecorder := events.NewRecorder(kubeClient.CoreV1().Events("test"), "test-operator", &v1.ObjectReference{})

	controller := NewRevisionController(
		targetNamespace,
		revisionConfigMaps,
		revisionSecrets,
		informers.NewSharedInformerFactoryWithOptions(kubeClient, 1*time.Minute, informers.WithNamespace(targetNamespace)),
		revisionClient,
		kubeClient.CoreV1(),
		kubeClient.CoreV1(),
		eventRecorder,
	)

	// currentStatus reads the operator status back from the fake operator
	// client and aborts the test if that read fails.
	currentStatus := func() *operatorv1.StaticPodOperatorStatus {
		t.Helper()
		_, status, _, err := operatorClient.GetStaticPodOperatorState()
		if err != nil {
			t.Fatalf("unexpected status err: %v", err)
		}
		return status
	}

	klog.Infof("Running NewRevisionController.Sync with UpdateLatestRevisionOperatorStatus returning an error")
	// First pass: the status update is forced to fail, so Sync must report an error.
	revisionClient.updateLatestRevisionOperatorStatusErrs = true
	syncErr := controller.Sync(context.TODO(), factory.NewSyncContext("RevisionController", eventRecorder))
	klog.Infof("Validating NewRevisionController.Sync returned an error: %v", syncErr)
	if syncErr == nil {
		t.Fatalf("expected error after running NewRevisionController.Sync, got nil")
	}
	status := currentStatus()
	klog.Infof("Validating status.LatestAvailableRevision (%v) has not changed", status.LatestAvailableRevision)
	if status.LatestAvailableRevision != 2 {
		t.Fatalf("unexpected status.LatestAvailableRevision: %v, expected 2", status.LatestAvailableRevision)
	}

	klog.Infof("Running NewRevisionController.Sync with UpdateLatestRevisionOperatorStatus succeeding")
	// Second pass: the status update is allowed through; a synthetic requeue counts as success.
	revisionClient.updateLatestRevisionOperatorStatusErrs = false
	syncErr = controller.Sync(context.TODO(), factory.NewSyncContext("RevisionController", eventRecorder))
	syncOK := syncErr == nil || syncErr == factory.SyntheticRequeueError
	if !syncOK {
		t.Fatalf("unexpected error after running NewRevisionController.Sync: %v", syncErr)
	}
	status = currentStatus()
	klog.Infof("Validating status.LatestAvailableRevision (%v) changed", status.LatestAvailableRevision)
	if status.LatestAvailableRevision != 3 {
		t.Fatalf("unexpected status.LatestAvailableRevision: %v, expected 3", status.LatestAvailableRevision)
	}
}

0 comments on commit 5674ec6

Please sign in to comment.