Skip to content

Commit

Permalink
feat: review feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Jesse Suen <jesse@akuity.io>
  • Loading branch information
jessesuen committed Oct 30, 2023
1 parent f370a6f commit 2f89d34
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 55 deletions.
50 changes: 23 additions & 27 deletions internal/controller/promotions/promoqueues.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ func (pqs *promoQueues) initializeQueues(ctx context.Context, promos kargoapi.Pr
}
}

// tryActivate tries to mark the given Pending promotion as the active one so it can reconcile.
// tryBegin tries to mark the given Pending promotion as the active one so it can reconcile.
// Returns true if the promo is already active or became active as a result of this call.
// Returns false if it should not reconcile (another promo is active, or next in line).
func (pqs *promoQueues) tryActivate(ctx context.Context, promo *kargoapi.Promotion) bool {
func (pqs *promoQueues) tryBegin(ctx context.Context, promo *kargoapi.Promotion) bool {
if promo == nil || promo.Spec == nil {
return false
}
Expand All @@ -109,40 +109,36 @@ func (pqs *promoQueues) tryActivate(ctx context.Context, promo *kargoapi.Promoti
pqs.pendingPromoQueuesByStage[stageKey] = pq
}

activePromoName := pqs.activePromoByStage[stageKey]
if activePromoName == promo.Name {
// This promo is already active
return true
}

// Push this promo to the queue in case it doesn't exist in the queue. Note that we
// deduplicate pushes on the same object, so this is safe to call repeatedly
if pq.Push(promo) {
logger.Debug("promo added to priority queue")
}

if activePromoName := pqs.activePromoByStage[stageKey]; activePromoName != "" {
// There is already an active promo. It's either this promo or someone else.
return activePromoName == promo.Name
}

// If we get here, the Stage does not have any Promotions Running against it.
// Now check if it this promo is the one that should run next.
first := pq.Peek()
if first == nil {
// This promo exists but nothing exists in the PriorityQueue. This should not happen.
// But since there appears to be no other promos, allow this one to become the active one.
pqs.activePromoByStage[stageKey] = promo.Name
logger.Debug("activated promo (empty queue)")
return true
}
if first.GetNamespace() == promo.Namespace && first.GetName() == promo.Name {
// This promo is the first in the queue. Mark it as active and pop it off the pending queue.
popped := pq.Pop()
pqs.activePromoByStage[stageKey] = popped.GetName()
logger.Debug("activated promo")
return true
if activePromoName == "" {
// If we get here, the Stage does not have any active Promotions Running against it.
// Now check if it this promo is the one that should run next.
// NOTE: first will never be empty because of the push call above
first := pq.Peek()
if first.GetNamespace() == promo.Namespace && first.GetName() == promo.Name {
// This promo is the first in the queue. Mark it as active and pop it off the pending queue.
popped := pq.Pop()
pqs.activePromoByStage[stageKey] = popped.GetName()
logger.Debug("begin promo")
return true
}
}
return false
}

// deactivate removes the active entry for the given stage key.
// tryConclude removes the active promotion entry for the given stage key.
// This should only be called after the active promotion has become terminal.
func (pqs *promoQueues) deactivate(ctx context.Context, stageKey types.NamespacedName, promoName string) {
func (pqs *promoQueues) tryConclude(ctx context.Context, stageKey types.NamespacedName, promoName string) {
pqs.promoQueuesByStageMu.RLock()
defer pqs.promoQueuesByStageMu.RUnlock()
if pqs.activePromoByStage[stageKey] == promoName {
Expand All @@ -151,6 +147,6 @@ func (pqs *promoQueues) deactivate(ctx context.Context, stageKey types.Namespace
"promotion": promoName,
})
delete(pqs.activePromoByStage, stageKey)
logger.Debug("deactivated promo")
logger.Debug("conclude promo")
}
}
55 changes: 34 additions & 21 deletions internal/controller/promotions/promoqueues_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -120,15 +119,15 @@ func TestNewPromotionsQueue(t *testing.T) {
if object == nil {
break
}
promo := object.(*kargoapi.Promotion) // nolint: forcetypeassert
promo := object.(*kargoapi.Promotion) // nolint: forcetyperequire
if lastTime != nil {
require.Greater(t, promo.CreationTimestamp.Time, *lastTime)
}
lastTime = &promo.CreationTimestamp.Time
}
}

func TestTryActivate(t *testing.T) {
func TestTryBegin(t *testing.T) {
pqs := promoQueues{
activePromoByStage: map[types.NamespacedName]string{},
pendingPromoQueuesByStage: map[types.NamespacedName]runtime.PriorityQueue{},
Expand All @@ -138,25 +137,35 @@ func TestTryActivate(t *testing.T) {
ctx := context.TODO()

// 1. nil promotion
assert.False(t, pqs.tryActivate(ctx, nil))
require.False(t, pqs.tryBegin(ctx, nil))

// 2. invalid promotion
assert.False(t, pqs.tryActivate(ctx, &kargoapi.Promotion{}))
require.False(t, pqs.tryBegin(ctx, &kargoapi.Promotion{}))

// 3. Try to activate promos not first in queue
// 3. Try to begin promos not first in queue
for _, promoName := range []string{"b", "c", "d"} {
assert.False(t, pqs.tryActivate(ctx, newPromo(testNamespace, promoName, "foo", "", now)))
assert.Equal(t, "", pqs.activePromoByStage[fooStageKey])
assert.Equal(t, 4, pqs.pendingPromoQueuesByStage[fooStageKey].Depth())
require.False(t, pqs.tryBegin(ctx, newPromo(testNamespace, promoName, "foo", "", now)))
require.Equal(t, "", pqs.activePromoByStage[fooStageKey])
require.Equal(t, 4, pqs.pendingPromoQueuesByStage[fooStageKey].Depth())
}

// 4. Now try to activate highest priority. this should succeed
assert.True(t, pqs.tryActivate(ctx, newPromo(testNamespace, "a", "foo", "", now)))
assert.Equal(t, "a", pqs.activePromoByStage[fooStageKey])
assert.Equal(t, 3, pqs.pendingPromoQueuesByStage[fooStageKey].Depth())
// 4. Now try to begin highest priority. this should succeed
require.True(t, pqs.tryBegin(ctx, newPromo(testNamespace, "a", "foo", "", now)))
require.Equal(t, "a", pqs.activePromoByStage[fooStageKey])
require.Equal(t, 3, pqs.pendingPromoQueuesByStage[fooStageKey].Depth())

// 5. Begin an already active promo, this should be a no-op
require.True(t, pqs.tryBegin(ctx, newPromo(testNamespace, "a", "foo", "", now)))
require.Equal(t, "a", pqs.activePromoByStage[fooStageKey])
require.Equal(t, 3, pqs.pendingPromoQueuesByStage[fooStageKey].Depth())

// 5. Begin a promo with something else active, this should be a no-op
require.False(t, pqs.tryBegin(ctx, newPromo(testNamespace, "b", "foo", "", now)))
require.Equal(t, "a", pqs.activePromoByStage[fooStageKey])
require.Equal(t, 3, pqs.pendingPromoQueuesByStage[fooStageKey].Depth())
}

func TestDeactivate(t *testing.T) {
func TestConclude(t *testing.T) {
pqs := promoQueues{
activePromoByStage: map[types.NamespacedName]string{},
pendingPromoQueuesByStage: map[types.NamespacedName]runtime.PriorityQueue{},
Expand All @@ -166,13 +175,17 @@ func TestDeactivate(t *testing.T) {
ctx := context.TODO()

// Test setup
assert.True(t, pqs.tryActivate(ctx, newPromo(testNamespace, "a", "foo", "", now)))
require.True(t, pqs.tryBegin(ctx, newPromo(testNamespace, "a", "foo", "", now)))

// 1. deactivate something not even active. it should be a no-op
pqs.deactivate(ctx, fooStageKey, "not-active")
assert.Equal(t, "a", pqs.activePromoByStage[fooStageKey])
// 1. conclude something not even active. it should be a no-op
pqs.tryConclude(ctx, fooStageKey, "not-active")
require.Equal(t, "a", pqs.activePromoByStage[fooStageKey])

// 2. Deactivate the active one
pqs.deactivate(ctx, fooStageKey, "a")
assert.Equal(t, "", pqs.activePromoByStage[fooStageKey])
// 2. Conclude the active one
pqs.tryConclude(ctx, fooStageKey, "a")
require.Equal(t, "", pqs.activePromoByStage[fooStageKey])

// 3. Conclude the same key, should be a noop
pqs.tryConclude(ctx, fooStageKey, "a")
require.Equal(t, "", pqs.activePromoByStage[fooStageKey])
}
8 changes: 6 additions & 2 deletions internal/controller/promotions/promotions.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (r *reconciler) Reconcile(
return result, nil
} else {
// promo is Pending. Try to activate it.
if !r.pqs.tryActivate(ctx, promo) {
if !r.pqs.tryBegin(ctx, promo) {
// It wasn't our turn. Mark this promo as pending (if it wasn't already)
if promo.Status.Phase != v1alpha1.PromotionPhasePending {
err = kubeclient.PatchStatus(ctx, r.kargoClient, promo, func(status *kargoapi.PromotionStatus) {
Expand All @@ -193,7 +193,8 @@ func (r *reconciler) Reconcile(
})
logger.Debug("executing Promotion")

// Update promo status as Running to give visibility in UI
// Update promo status as Running to give visibility in UI. Also, a promo which
// has already entered Running status will be allowed to continue to reconcile.
if promo.Status.Phase != v1alpha1.PromotionPhaseRunning {
if err = kubeclient.PatchStatus(ctx, r.kargoClient, promo, func(status *kargoapi.PromotionStatus) {
status.Phase = v1alpha1.PromotionPhaseRunning
Expand All @@ -207,6 +208,9 @@ func (r *reconciler) Reconcile(
phase := kargoapi.PromotionPhaseSucceeded
phaseError := ""

// Wrap the promoteFn() call in an anonymous function to recover() any panics, so
// we can update the promo's phase with Error if it does. This breaks an infinite
// cycle of a bad promo continuously failing to reconcile, and surfaces the error.
func() {
defer func() {
if err := recover(); err != nil {
Expand Down
10 changes: 5 additions & 5 deletions internal/controller/promotions/watches.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (e *EnqueueHighestPriorityPromotionHandler) Delete(
Namespace: promo.Namespace,
Name: promo.Spec.Stage,
}
e.pqs.deactivate(e.ctx, stageKey, promo.Name)
e.pqs.tryConclude(e.ctx, stageKey, promo.Name)
e.enqueueNext(stageKey, wq)
}
}
Expand Down Expand Up @@ -76,7 +76,7 @@ func (e *EnqueueHighestPriorityPromotionHandler) Update(
}
// This promo just went terminal. Deactivate it and enqueue
// the next highest priority promo for reconciliation
e.pqs.deactivate(e.ctx, stageKey, promo.Name)
e.pqs.tryConclude(e.ctx, stageKey, promo.Name)
e.enqueueNext(stageKey, wq)
}
}
Expand Down Expand Up @@ -115,9 +115,9 @@ func (e *EnqueueHighestPriorityPromotionHandler) enqueueNext(
e.logger.Errorf("Failed to get next highest priority Promotion (%s) for enqueue: %v", firstKey, err)
return
}
if promo == nil {
// Found a promotion in the pending queue that no longer exists.
// Pop it and loop to the next item in the queue
if promo == nil || promo.Status.Phase.IsTerminal() {
// Found a promotion in the pending queue that no longer exists
// or terminal. Pop it and loop to the next item in the queue
_ = pq.Pop()
continue
}
Expand Down

0 comments on commit 2f89d34

Please sign in to comment.