From 468f9cd5d06670359458c565bbdc8c48adb8ce1e Mon Sep 17 00:00:00 2001 From: Hidde Beydals Date: Fri, 18 Oct 2024 17:19:00 +0200 Subject: [PATCH] refactor(promotion)!: remove priority queue Signed-off-by: Hidde Beydals --- internal/controller/promotions/promoqueues.go | 150 -------------- .../controller/promotions/promoqueues_test.go | 190 ------------------ internal/controller/promotions/promotions.go | 85 +------- .../controller/promotions/promotions_test.go | 53 +++-- internal/controller/promotions/watches.go | 132 ------------ internal/controller/runtime/queues.go | 177 ---------------- internal/controller/runtime/queues_test.go | 158 --------------- 7 files changed, 44 insertions(+), 901 deletions(-) delete mode 100644 internal/controller/promotions/promoqueues.go delete mode 100644 internal/controller/promotions/promoqueues_test.go delete mode 100644 internal/controller/runtime/queues.go delete mode 100644 internal/controller/runtime/queues_test.go diff --git a/internal/controller/promotions/promoqueues.go b/internal/controller/promotions/promoqueues.go deleted file mode 100644 index d56a2a716..000000000 --- a/internal/controller/promotions/promoqueues.go +++ /dev/null @@ -1,150 +0,0 @@ -package promotions - -import ( - "context" - "sync" - - "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/client" - - kargoapi "github.com/akuity/kargo/api/v1alpha1" - "github.com/akuity/kargo/internal/controller/runtime" - "github.com/akuity/kargo/internal/logging" -) - -// promoQueues is a data structure to hold priority queues of all Stages -// as well as the "active" promotion for each stage -type promoQueues struct { - // activePromoByStage holds the active promotion for a given stage (if any) - activePromoByStage map[types.NamespacedName]string - // pendingPromoQueuesByStage holds a priority queue of promotions, per Stage. We allow one - // promotion to run at a time, ordered by creationTimestamp. - pendingPromoQueuesByStage map[types.NamespacedName]runtime.PriorityQueue - // promoQueuesByStageMu protects access to the above maps - promoQueuesByStageMu sync.RWMutex -} - -func newPriorityQueue() runtime.PriorityQueue { - // We can safely ignore errors here because the only error that can happen - // involves initializing the queue with a nil priority function, which we - // know we aren't doing. - pq, _ := runtime.NewPriorityQueue(func(left, right client.Object) bool { - if left.GetCreationTimestamp().Time.Equal( - right.GetCreationTimestamp().Time, - ) { - return left.GetName() < right.GetName() - } - return left.GetCreationTimestamp().Time. - Before(right.GetCreationTimestamp().Time) - }) - return pq -} - -// initializeQueues adds the promotion list to relevant priority queues. -// This is intended to be invoked ONCE and the caller MUST ensure that. 
-func (pqs *promoQueues) initializeQueues(ctx context.Context, promos kargoapi.PromotionList) { - pqs.promoQueuesByStageMu.Lock() - defer pqs.promoQueuesByStageMu.Unlock() - logger := logging.LoggerFromContext(ctx) - for _, promo := range promos.Items { - if promo.Status.Phase.IsTerminal() || len(promo.Spec.Stage) == 0 { - continue - } - stage := types.NamespacedName{ - Namespace: promo.Namespace, - Name: promo.Spec.Stage, - } - pq, ok := pqs.pendingPromoQueuesByStage[stage] - if !ok { - pq = newPriorityQueue() - pqs.pendingPromoQueuesByStage[stage] = pq - } - if promo.Status.Phase == kargoapi.PromotionPhaseRunning { - if pqs.activePromoByStage[stage] == "" { - pqs.activePromoByStage[stage] = promo.Name - } - continue - } - pq.Push(&promo) - logger.Debug( - "pushed Promotion onto Stage-specific Promotion queue", - "promotion", promo.Name, - "namespace", promo.Namespace, - "stage", promo.Spec.Stage, - "phase", promo.Status.Phase, - ) - } - for stage, pq := range pqs.pendingPromoQueuesByStage { - logger.Debug( - "Stage-specific Promotion queue initialized", - "stage", stage.Name, - "namespace", stage.Namespace, - "depth", pq.Depth(), - ) - } -} - -// tryBegin tries to mark the given Pending promotion as the active one, so it can reconcile. -// Returns true if the promo is already active or became active as a result of this call. -// Returns false if it should not reconcile (another promo is active, or next in line). -func (pqs *promoQueues) tryBegin(ctx context.Context, promo *kargoapi.Promotion) bool { - if promo == nil || len(promo.Spec.Stage) == 0 { - return false - } - stageKey := types.NamespacedName{ - Namespace: promo.Namespace, - Name: promo.Spec.Stage, - } - logger := logging.LoggerFromContext(ctx) - - pqs.promoQueuesByStageMu.Lock() - defer pqs.promoQueuesByStageMu.Unlock() - - pq, ok := pqs.pendingPromoQueuesByStage[stageKey] - if !ok { - // PriorityQueue for the stage has not been initialized - pq = newPriorityQueue() - pqs.pendingPromoQueuesByStage[stageKey] = pq - } - - activePromoName := pqs.activePromoByStage[stageKey] - if activePromoName == promo.Name { - // This promo is already active - return true - } - - // Push this promo to the queue in case it doesn't exist in the queue. Note that we - // deduplicate pushes on the same object, so this is safe to call repeatedly - if pq.Push(promo) { - logger.Debug("promo added to priority queue") - } - if activePromoName == "" { - // If we get here, the Stage does not have any active Promotions Running against it. - // Now check if it is this promo is the one that should run next. - // NOTE: first will never be empty because of the push call above - first := pq.Peek() - if first.GetNamespace() == promo.Namespace && first.GetName() == promo.Name { - // This promo is the first in the queue. Mark it as active and pop it off the pending queue. - popped := pq.Pop() - pqs.activePromoByStage[stageKey] = popped.GetName() - logger.Debug("begin promo") - return true - } - } - return false -} - -// conclude removes the given active promotion entry for the given stage key. -// This should only be called after the active promotion has become terminal. 
-func (pqs *promoQueues) conclude(ctx context.Context, stageKey types.NamespacedName, promoName string) { - pqs.promoQueuesByStageMu.RLock() - defer pqs.promoQueuesByStageMu.RUnlock() - if pqs.activePromoByStage[stageKey] == promoName { - delete(pqs.activePromoByStage, stageKey) - logging.LoggerFromContext(ctx).Debug( - "concluded promo", - "namespace", stageKey.Namespace, - "promotion", promoName, - ) - } -} diff --git a/internal/controller/promotions/promoqueues_test.go b/internal/controller/promotions/promoqueues_test.go deleted file mode 100644 index 80b34450a..000000000 --- a/internal/controller/promotions/promoqueues_test.go +++ /dev/null @@ -1,190 +0,0 @@ -package promotions - -import ( - "context" - "fmt" - "testing" - "time" - - "github.com/stretchr/testify/require" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - - kargoapi "github.com/akuity/kargo/api/v1alpha1" - "github.com/akuity/kargo/internal/controller/runtime" -) - -var ( - now = metav1.Now() - before = metav1.Time{Time: now.Add(time.Second * -1)} - after = metav1.Time{Time: now.Add(time.Second)} - - fooStageKey = types.NamespacedName{Namespace: testNamespace, Name: "foo"} - barStageKey = types.NamespacedName{Namespace: testNamespace, Name: "bar"} - - testNamespace = "default" - testPromos = kargoapi.PromotionList{ - Items: []kargoapi.Promotion{ - // foo stage. two have same creation timestamp but different names - *newPromo(testNamespace, "d", "foo", "", after), - *newPromo(testNamespace, "b", "foo", "", now), - *newPromo(testNamespace, "c", "foo", "", now), - *newPromo(testNamespace, "a", "foo", "", before), - // bar stage. two are Running (possibly because of bad bookkeeping). - // one needs to be deduplicated. one promo is invalid - *newPromo(testNamespace, "x", "bar", "", before), - *newPromo(testNamespace, "x", "bar", "", before), - *newPromo(testNamespace, "y", "bar", kargoapi.PromotionPhaseRunning, now), - *newPromo(testNamespace, "z", "bar", "", after), - { - ObjectMeta: metav1.ObjectMeta{ - CreationTimestamp: now, - Name: "w", - Namespace: testNamespace, - }, - }, - }, - } -) - -func newPromo(namespace, name, stage string, - phase kargoapi.PromotionPhase, - creationTimestamp metav1.Time, -) *kargoapi.Promotion { - return &kargoapi.Promotion{ - ObjectMeta: metav1.ObjectMeta{ - CreationTimestamp: creationTimestamp, - Name: name, - Namespace: namespace, - }, - Spec: kargoapi.PromotionSpec{ - Stage: stage, - }, - Status: kargoapi.PromotionStatus{ - Phase: phase, - }, - } -} - -func TestInitializeQueues(t *testing.T) { - pqs := promoQueues{ - activePromoByStage: map[types.NamespacedName]string{}, - pendingPromoQueuesByStage: map[types.NamespacedName]runtime.PriorityQueue{}, - } - pqs.initializeQueues(context.Background(), testPromos) - - // foo stage checks - require.Equal(t, "", pqs.activePromoByStage[fooStageKey]) - require.Equal(t, 4, pqs.pendingPromoQueuesByStage[fooStageKey].Depth()) - require.Equal(t, "a", pqs.pendingPromoQueuesByStage[fooStageKey].Pop().GetName()) - require.Equal(t, "b", pqs.pendingPromoQueuesByStage[fooStageKey].Pop().GetName()) - require.Equal(t, "c", pqs.pendingPromoQueuesByStage[fooStageKey].Pop().GetName()) - require.Equal(t, "d", pqs.pendingPromoQueuesByStage[fooStageKey].Pop().GetName()) - require.Nil(t, pqs.pendingPromoQueuesByStage[fooStageKey].Pop()) - - // bar stage checks - require.Equal(t, "y", pqs.activePromoByStage[barStageKey]) - // We expect 2 instead of 4 (one was deduped, one went to activePromoByStage) - require.Equal(t, 2, 
pqs.pendingPromoQueuesByStage[barStageKey].Depth()) - require.Equal(t, "x", pqs.pendingPromoQueuesByStage[barStageKey].Pop().GetName()) - require.Equal(t, "z", pqs.pendingPromoQueuesByStage[barStageKey].Pop().GetName()) - require.Nil(t, pqs.pendingPromoQueuesByStage[barStageKey].Pop()) -} - -func TestNewPromotionsQueue(t *testing.T) { - // runtime.PriorityQueue is already tested pretty well, so what we mainly - // want to assert here is that our function for establishing relative priority - // is correct. - pq := newPriorityQueue() - - // The last added should be the first out if our priority logic is correct - now := time.Now() - for i := 0; i < 100; i++ { - added := pq.Push(&kargoapi.Promotion{ - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%d", i), - CreationTimestamp: metav1.NewTime( - now.Add(-1 * time.Duration(i) * time.Minute), - ), - }, - }) - require.True(t, added) - } - - // Verify objects are prioritized by creation time - var lastTime *time.Time - for { - object := pq.Pop() - if object == nil { - break - } - promo := object.(*kargoapi.Promotion) // nolint: forcetypeassert - if lastTime != nil { - require.Greater(t, promo.CreationTimestamp.Time, *lastTime) - } - lastTime = &promo.CreationTimestamp.Time - } -} - -func TestTryBegin(t *testing.T) { - pqs := promoQueues{ - activePromoByStage: map[types.NamespacedName]string{}, - pendingPromoQueuesByStage: map[types.NamespacedName]runtime.PriorityQueue{}, - } - pqs.initializeQueues(context.Background(), testPromos) - - ctx := context.TODO() - - // 1. nil promotion - require.False(t, pqs.tryBegin(ctx, nil)) - - // 2. invalid promotion - require.False(t, pqs.tryBegin(ctx, &kargoapi.Promotion{})) - - // 3. Try to begin promos not first in queue - for _, promoName := range []string{"b", "c", "d"} { - require.False(t, pqs.tryBegin(ctx, newPromo(testNamespace, promoName, "foo", "", now))) - require.Equal(t, "", pqs.activePromoByStage[fooStageKey]) - require.Equal(t, 4, pqs.pendingPromoQueuesByStage[fooStageKey].Depth()) - } - - // 4. Now try to begin highest priority. this should succeed - require.True(t, pqs.tryBegin(ctx, newPromo(testNamespace, "a", "foo", "", now))) - require.Equal(t, "a", pqs.activePromoByStage[fooStageKey]) - require.Equal(t, 3, pqs.pendingPromoQueuesByStage[fooStageKey].Depth()) - - // 5. Begin an already active promo, this should be a no-op - require.True(t, pqs.tryBegin(ctx, newPromo(testNamespace, "a", "foo", "", now))) - require.Equal(t, "a", pqs.activePromoByStage[fooStageKey]) - require.Equal(t, 3, pqs.pendingPromoQueuesByStage[fooStageKey].Depth()) - - // 5. Begin a promo with something else active, this should be a no-op - require.False(t, pqs.tryBegin(ctx, newPromo(testNamespace, "b", "foo", "", now))) - require.Equal(t, "a", pqs.activePromoByStage[fooStageKey]) - require.Equal(t, 3, pqs.pendingPromoQueuesByStage[fooStageKey].Depth()) -} - -func TestConclude(t *testing.T) { - pqs := promoQueues{ - activePromoByStage: map[types.NamespacedName]string{}, - pendingPromoQueuesByStage: map[types.NamespacedName]runtime.PriorityQueue{}, - } - pqs.initializeQueues(context.Background(), testPromos) - - ctx := context.TODO() - - // Test setup - require.True(t, pqs.tryBegin(ctx, newPromo(testNamespace, "a", "foo", "", now))) - - // 1. conclude something not even active. it should be a no-op - pqs.conclude(ctx, fooStageKey, "not-active") - require.Equal(t, "a", pqs.activePromoByStage[fooStageKey]) - - // 2. 
Conclude the active one - pqs.conclude(ctx, fooStageKey, "a") - require.Equal(t, "", pqs.activePromoByStage[fooStageKey]) - - // 3. Conclude the same key, should be a noop - pqs.conclude(ctx, fooStageKey, "a") - require.Equal(t, "", pqs.activePromoByStage[fooStageKey]) -} diff --git a/internal/controller/promotions/promotions.go b/internal/controller/promotions/promotions.go index 69726ec33..de719f124 100644 --- a/internal/controller/promotions/promotions.go +++ b/internal/controller/promotions/promotions.go @@ -6,7 +6,6 @@ import ( "os" "path/filepath" "strconv" - "sync" "time" "github.com/kelseyhightower/envconfig" @@ -26,7 +25,6 @@ import ( kargoapi "github.com/akuity/kargo/api/v1alpha1" "github.com/akuity/kargo/internal/controller" argocd "github.com/akuity/kargo/internal/controller/argocd/api/v1alpha1" - "github.com/akuity/kargo/internal/controller/runtime" "github.com/akuity/kargo/internal/directives" "github.com/akuity/kargo/internal/indexer" "github.com/akuity/kargo/internal/kargo" @@ -64,9 +62,6 @@ type reconciler struct { recorder record.EventRecorder - pqs *promoQueues - initializeOnce sync.Once - // The following behaviors are overridable for testing purposes: getStageFn func( @@ -168,25 +163,6 @@ func SetupReconcilerWithManager( } } - // Watch Promotions that complete and enqueue the next highest promotion key - priorityQueueHandler := &EnqueueHighestPriorityPromotionHandler[*kargoapi.Promotion]{ - ctx: ctx, - logger: logger, - kargoClient: reconciler.kargoClient, - pqs: reconciler.pqs, - } - promoWentTerminal := kargo.NewPromoWentTerminalPredicate(logger) - if err = c.Watch( - source.Kind( - kargoMgr.GetCache(), - &kargoapi.Promotion{}, - priorityQueueHandler, - promoWentTerminal, - ), - ); err != nil { - return fmt.Errorf("unable to watch Promotions: %w", err) - } - return nil } @@ -196,16 +172,11 @@ func newReconciler( directivesEngine directives.Engine, cfg ReconcilerConfig, ) *reconciler { - pqs := promoQueues{ - activePromoByStage: map[types.NamespacedName]string{}, - pendingPromoQueuesByStage: map[types.NamespacedName]runtime.PriorityQueue{}, - } r := &reconciler{ kargoClient: kargoClient, directivesEngine: directivesEngine, recorder: recorder, cfg: cfg, - pqs: &pqs, } r.getStageFn = kargoapi.GetStage r.promoteFn = r.promote @@ -225,25 +196,6 @@ func (r *reconciler) Reconcile( ctx = logging.ContextWithLogger(ctx, logger) logger.Debug("reconciling Promotion") - // Note that initialization occurs here because we basically know that the - // controller runtime client's cache is ready at this point. We cannot attempt - // to list Promotions prior to that point. - var err error - r.initializeOnce.Do(func() { - promos := kargoapi.PromotionList{} - if err = r.kargoClient.List(ctx, &promos); err != nil { - err = fmt.Errorf("error listing promotions: %w", err) - } else { - r.pqs.initializeQueues(ctx, promos) - logger.Debug( - "initialized Stage-specific Promotion queues from list of existing Promotions", - ) - } - }) - if err != nil { - return ctrl.Result{}, fmt.Errorf("error initializing Promotion queues: %w", err) - } - // Find the Promotion promo, err := kargoapi.GetPromotion(ctx, r.kargoClient, req.NamespacedName) if err != nil { @@ -286,19 +238,13 @@ func (r *reconciler) Reconcile( } // If the Promotion does not have a Phase, it must be new and (initially) - // pending. Mark it as such, and confirm we are actually allowed to start - // in case multiple are queued. 
- if isPending := promo.Status.Phase == kargoapi.PromotionPhasePending; isPending || promo.Status.Phase == "" { - if !isPending { - err = kubeclient.PatchStatus(ctx, r.kargoClient, promo, func(status *kargoapi.PromotionStatus) { - status.Phase = kargoapi.PromotionPhasePending - }) + // pending. Mark it as such. + if promo.Status.Phase == "" { + if err = kubeclient.PatchStatus(ctx, r.kargoClient, promo, func(status *kargoapi.PromotionStatus) { + status.Phase = kargoapi.PromotionPhasePending + }); err != nil { return ctrl.Result{}, err } - - if !r.pqs.tryBegin(ctx, promo) { - return ctrl.Result{}, nil - } } // Retrieve the Stage associated with the Promotion. @@ -324,24 +270,13 @@ func (r *reconciler) Reconcile( } // Confirm that the Stage is awaiting this Promotion. - // - // This is a temporary measure to ensure that the Promotion is only - // allowed to proceed if the Stage is expecting it. This is necessary - // to ensure we can derive Freight from the previous Promotion in the - // Stage's status to construct the Freight collection for the current - // Promotion. - // - // TODO(hidde): This adds tight coupling between the Promotion and the - // Stage (again, but without patching the Stage this time). We should - // explore a more loosely-coupled approach, perhaps by making the - // Freight self-aware of the Stages it has been promoted to, or even - // more radically, by making the Promotion self-aware of the Freight - // collection it is promoting. + // This effectively prevents the Promotion from running until the Stage + // decides it is the next Promotion to run. if stage.Status.CurrentPromotion == nil || stage.Status.CurrentPromotion.Name != promo.Name { + // The watch on the Stage will requeue the Promotion if the Stage + // acknowledges it. logger.Debug("Stage is not awaiting Promotion", "stage", stage.Name, "promotion", promo.Name) - // Our watch will catch this and requeue the Promotion when the Stage - // acknowledges it. Which typically should be faster. - return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil + return ctrl.Result{}, nil } // Update promo status as Running to give visibility in UI. 
Also, a promo which diff --git a/internal/controller/promotions/promotions_test.go b/internal/controller/promotions/promotions_test.go index 2dbd2a8d7..69d310ebe 100644 --- a/internal/controller/promotions/promotions_test.go +++ b/internal/controller/promotions/promotions_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "testing" + "time" "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -20,6 +21,11 @@ import ( fakeevent "github.com/akuity/kargo/internal/kubernetes/event/fake" ) +var ( + now = metav1.Now() + before = metav1.Time{Time: now.Add(time.Second * -1)} +) + func TestNewPromotionReconciler(t *testing.T) { kubeClient := fake.NewClientBuilder().Build() r := newReconciler( @@ -31,7 +37,6 @@ func TestNewPromotionReconciler(t *testing.T) { require.NotNil(t, r.kargoClient) require.NotNil(t, r.recorder) require.NotNil(t, r.directivesEngine) - require.NotNil(t, r.pqs.pendingPromoQueuesByStage) require.NotNil(t, r.getStageFn) require.NotNil(t, r.promoteFn) } @@ -132,6 +137,17 @@ func TestReconcile(t *testing.T) { promoToReconcile: &types.NamespacedName{Namespace: "fake-namespace", Name: "fake-promo2"}, expectedPhase: kargoapi.PromotionPhasePending, promos: []client.Object{ + &kargoapi.Stage{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fake-stage", + Namespace: "fake-namespace", + }, + Status: kargoapi.StageStatus{ + CurrentPromotion: &kargoapi.PromotionReference{ + Name: "other-fake-promo1", + }, + }, + }, newPromo("fake-namespace", "fake-promo1", "fake-stage", kargoapi.PromotionPhasePending, before), newPromo("fake-namespace", "fake-promo2", "fake-stage", "", now), // intentionally empty string phase }, @@ -447,23 +463,22 @@ func Test_reconciler_terminatePromotion(t *testing.T) { } } -// Tests that initializeQueues is called properly -func TestReconcileInitializeQueues(t *testing.T) { - ctx := context.TODO() - promos := []client.Object{ - newPromo("fake-namespace", "fake-promo1", "fake-stage", kargoapi.PromotionPhasePending, before), - newPromo("fake-namespace", "fake-promo2", "fake-stage", kargoapi.PromotionPhasePending, now), +// nolint: unparam +func newPromo(namespace, name, stage string, + phase kargoapi.PromotionPhase, + creationTimestamp metav1.Time, +) *kargoapi.Promotion { + return &kargoapi.Promotion{ + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: creationTimestamp, + Name: name, + Namespace: namespace, + }, + Spec: kargoapi.PromotionSpec{ + Stage: stage, + }, + Status: kargoapi.PromotionStatus{ + Phase: phase, + }, } - recorder := &fakeevent.EventRecorder{} - r := newFakeReconciler(t, recorder, promos...) 
- - // reconcile a non-existent promo to trigger initializeQueues - req := ctrl.Request{NamespacedName: types.NamespacedName{Namespace: "does-not-exist", Name: "does-not-exist"}} - - _, err := r.Reconcile(ctx, req) - require.NoError(t, err) - - // Verifies queues got set up - stageKey := types.NamespacedName{Namespace: "fake-namespace", Name: "fake-stage"} - require.Equal(t, 2, r.pqs.pendingPromoQueuesByStage[stageKey].Depth()) } diff --git a/internal/controller/promotions/watches.go b/internal/controller/promotions/watches.go index ce7df09d7..4eddca862 100644 --- a/internal/controller/promotions/watches.go +++ b/internal/controller/promotions/watches.go @@ -18,138 +18,6 @@ import ( "github.com/akuity/kargo/internal/logging" ) -// EnqueueHighestPriorityPromotionHandler is an event handler that enqueues the next -// highest priority Promotion for reconciliation when an active Promotion becomes terminal -type EnqueueHighestPriorityPromotionHandler[T any] struct { - logger *logging.Logger - ctx context.Context - pqs *promoQueues - kargoClient client.Client -} - -// Create implements TypedEventHandler. -func (e *EnqueueHighestPriorityPromotionHandler[T]) Create( - context.Context, - event.TypedCreateEvent[T], - workqueue.TypedRateLimitingInterface[reconcile.Request], -) { - // No-op -} - -// Delete implements TypedEventHandler. In case a Running promotion -// becomes deleted, we should enqueue the next one -func (e *EnqueueHighestPriorityPromotionHandler[T]) Delete( - _ context.Context, - evt event.TypedDeleteEvent[T], - wq workqueue.TypedRateLimitingInterface[reconcile.Request], -) { - if promo, ok := any(evt.Object).(*kargoapi.Promotion); ok { - stageKey := types.NamespacedName{ - Namespace: promo.Namespace, - Name: promo.Spec.Stage, - } - e.pqs.conclude(e.ctx, stageKey, promo.Name) - e.enqueueNext(stageKey, wq) - } -} - -// Generic implements TypedEventHandler. -func (e *EnqueueHighestPriorityPromotionHandler[T]) Generic( - context.Context, - event.TypedGenericEvent[T], - workqueue.TypedRateLimitingInterface[reconcile.Request], -) { - // No-op -} - -// Update implements TypedEventHandler. This should only be called with -// a promo that transitioned from non-terminal to terminal. -func (e *EnqueueHighestPriorityPromotionHandler[T]) Update( - _ context.Context, - evt event.TypedUpdateEvent[T], - wq workqueue.TypedRateLimitingInterface[reconcile.Request], -) { - promo := any(evt.ObjectNew).(*kargoapi.Promotion) // nolint: forcetypeassert - if promo == nil { - e.logger.Error( - nil, "Update event has no new object to update", - "event", evt, - ) - return - } - if promo.Status.Phase.IsTerminal() { - stageKey := types.NamespacedName{ - Namespace: promo.Namespace, - Name: promo.Spec.Stage, - } - // This promo just went terminal. Deactivate it and enqueue - // the next highest priority promo for reconciliation - e.pqs.conclude(e.ctx, stageKey, promo.Name) - e.enqueueNext(stageKey, wq) - } -} - -// enqueueNext enqueues the next highest priority promotion for reconciliation to the workqueue. -// Also discards pending promotions in the queue that no longer exist -func (e *EnqueueHighestPriorityPromotionHandler[T]) enqueueNext( - stageKey types.NamespacedName, - wq workqueue.TypedRateLimitingInterface[reconcile.Request], -) { - e.pqs.promoQueuesByStageMu.RLock() - defer e.pqs.promoQueuesByStageMu.RUnlock() - if e.pqs.activePromoByStage[stageKey] != "" { - // there's already an active promotion. 
don't need to enqueue the next one - return - } - pq, ok := e.pqs.pendingPromoQueuesByStage[stageKey] - if !ok { - return - } - - // NOTE: at first glance, this for loop appears to be expensive to do while holding - // the pqs mutex. But it isn't as bad as it looks, since we count on the fact that - // GetPromotion calls pull from the informer cache and do not involve an HTTP call. - // and in the common case, we only do a single iteration - for { - first := pq.Peek() - if first == nil { - // pending queue is empty - return - } - // Check if promo exists, and enqueue it if it does - firstKey := types.NamespacedName{Namespace: first.GetNamespace(), Name: first.GetName()} - promo, err := kargoapi.GetPromotion(e.ctx, e.kargoClient, firstKey) - if err != nil { - e.logger.Error( - err, "Failed to get next highest priority Promotion for enqueue", - "firstKey", firstKey, - ) - return - } - if promo == nil || promo.Status.Phase.IsTerminal() { - // Found a promotion in the pending queue that no longer exists - // or terminal. Pop it and loop to the next item in the queue - _ = pq.Pop() - continue - } - wq.AddRateLimited( - reconcile.Request{ - NamespacedName: types.NamespacedName{ - Namespace: promo.Namespace, - Name: promo.Name, - }, - }, - ) - e.logger.Debug( - "enqueued promo", - "promotion", promo.Name, - "namespace", promo.Namespace, - "stage", promo.Spec.Stage, - ) - return - } -} - // UpdatedArgoCDAppHandler is an event handler that enqueues Promotions for // reconciliation when an associated ArgoCD Application is updated. type UpdatedArgoCDAppHandler[T any] struct { diff --git a/internal/controller/runtime/queues.go b/internal/controller/runtime/queues.go deleted file mode 100644 index f8b7dae3a..000000000 --- a/internal/controller/runtime/queues.go +++ /dev/null @@ -1,177 +0,0 @@ -package runtime - -import ( - "container/heap" - "errors" - "sync" - - "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -// PriorityQueue is an interface for any priority queue containing -// runtime.Objects. -type PriorityQueue interface { - // Push adds a copy of the provided client.Object to the priority queue. - // Returns true if the item was added to the queue, false if it already existed - // Pushing of nil objects have no effect - Push(client.Object) bool - // Pop removes the highest priority client.Object from the priority queue and - // returns it. Implementations MUST return nil if the priority queue is empty. - Pop() client.Object - // Peek returns the highest priority client.Object from the priority queue - // without removing it. - Peek() client.Object - // Depth returns the depth of the PriorityQueue. - Depth() int -} - -// ObjectCompareFn is the signature for any function that can compare the -// relative priorities of two client.Objects. Implementations MUST return true -// when the first argument is of higher priority than the second and MUST return -// false otherwise. Implementors of such functions may safely assume that -// neither argument can ever be nil. -type ObjectPriorityFn func(client.Object, client.Object) bool - -// priorityQueue is an implementation of the PriorityQueue interface. This -// encapsulates the low-level details of the priority queue implementation found -// at https://pkg.go.dev/container/heap and is also safe for concurrent use by -// multiple goroutines. -type priorityQueue struct { - // internalQueue is priorityQueue's underlying data structure. It implements - // heap.Interface. 
- internalQueue *internalPriorityQueue - - // objectsByNamespaceName is used to deduplicate pushes of the same object - // to the queue, allowing Push() to be idempotent - objectsByNamespaceName map[types.NamespacedName]bool - - // mu is a mutex used to ensure only a single goroutine is executing critical - // sections of code at any time. - mu sync.RWMutex -} - -// NewPriorityQueue takes a function for comparing the relative priority of two -// client.Objects (which MUST return true when the first argument is of higher -// priority than the second and MUST return false otherwise) and, optionally, -// any number of client.Objects (which do NOT needs to be pre-ordered) and -// returns an implementation of the PriorityQueue interface that is safe for -// concurrent use by multiple goroutines. This function will also return an -// error if initialized with a nil comparison function or any nil -// runtime.Objects. -func NewPriorityQueue( - higherFn ObjectPriorityFn, - objects ...client.Object, -) (PriorityQueue, error) { - if higherFn == nil { - return nil, errors.New( - "the priority queue was initialized with a nil client.Object " + - "comparison function", - ) - } - filteredObjs := []client.Object{} - objectsByNamespaceName := make(map[types.NamespacedName]bool) - // filter out duplicates and nils - for i, object := range objects { - if object == nil { - continue - } - key := types.NamespacedName{ - Namespace: object.GetNamespace(), - Name: object.GetName(), - } - if objectsByNamespaceName[key] { - continue - } - objectsByNamespaceName[key] = true - filteredObjs = append(filteredObjs, objects[i]) - } - internalQueue := &internalPriorityQueue{ - objects: filteredObjs, - higherFn: higherFn, - } - heap.Init(internalQueue) - return &priorityQueue{ - objectsByNamespaceName: objectsByNamespaceName, - internalQueue: internalQueue, - }, nil -} - -func (p *priorityQueue) Push(item client.Object) bool { - p.mu.Lock() - defer p.mu.Unlock() - if item == nil { - return false - } - key := types.NamespacedName{ - Namespace: item.GetNamespace(), - Name: item.GetName(), - } - if p.objectsByNamespaceName[key] { - return false - } - heap.Push(p.internalQueue, item.DeepCopyObject()) - p.objectsByNamespaceName[key] = true - return true -} - -func (p *priorityQueue) Pop() client.Object { - p.mu.Lock() - defer p.mu.Unlock() - if p.internalQueue.Len() == 0 { - return nil - } - obj := heap.Pop(p.internalQueue).(client.Object) // nolint: forcetypeassert - key := types.NamespacedName{ - Namespace: obj.GetNamespace(), - Name: obj.GetName(), - } - delete(p.objectsByNamespaceName, key) - return obj -} - -func (p *priorityQueue) Peek() client.Object { - p.mu.RLock() - defer p.mu.RUnlock() - n := len(p.internalQueue.objects) - if n == 0 { - return nil - } - return p.internalQueue.objects[0] -} - -func (p *priorityQueue) Depth() int { - p.mu.RLock() - defer p.mu.RUnlock() - return p.internalQueue.Len() -} - -// internalPriorityQueue is the underlying data structure for priorityQueue. It -// implements heap.Interface, which allows priorityQueue to offload ordering of -// its client.Objects by priority to the heap package. 
-type internalPriorityQueue struct { - objects []client.Object - higherFn ObjectPriorityFn -} - -func (i *internalPriorityQueue) Len() int { return len(i.objects) } - -func (i *internalPriorityQueue) Less(n, m int) bool { - return i.higherFn(i.objects[n], i.objects[m]) -} - -func (i *internalPriorityQueue) Swap(n, m int) { - i.objects[n], i.objects[m] = i.objects[m], i.objects[n] -} - -func (i *internalPriorityQueue) Push(item any) { - i.objects = append(i.objects, item.(client.Object)) // nolint: forcetypeassert -} - -func (i *internalPriorityQueue) Pop() any { - n := len(i.objects) - item := i.objects[n-1] - i.objects[n-1] = nil // avoid memory leak - i.objects = i.objects[:n-1] - return item -} diff --git a/internal/controller/runtime/queues_test.go b/internal/controller/runtime/queues_test.go deleted file mode 100644 index 51a92b2f6..000000000 --- a/internal/controller/runtime/queues_test.go +++ /dev/null @@ -1,158 +0,0 @@ -package runtime - -import ( - "testing" - - "github.com/google/uuid" - "github.com/stretchr/testify/require" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "sigs.k8s.io/controller-runtime/pkg/client" - - kargoapi "github.com/akuity/kargo/api/v1alpha1" -) - -func TestPriorityQueue(t *testing.T) { - _, err := NewPriorityQueue(nil) - require.Error(t, err) - require.Equal( - t, - "the priority queue was initialized with a nil client.Object "+ - "comparison function", - err.Error(), - ) - - objects := make([]client.Object, 50) - for i := range objects { - objects[i] = &kargoapi.Promotion{ - ObjectMeta: metav1.ObjectMeta{ - // UUIDs contain enough randomness that we know we're not accidentally - // creating a list that's already ordered by name. - Name: uuid.New().String(), - }, - } - } - objects[0] = nil // This will be ignored - - pq, err := NewPriorityQueue( - func(client.Object, client.Object) bool { - return true // Implementation doesn't matter for this test - }, - objects..., - ) - require.NoError(t, err) - require.Equal(t, 49, pq.Depth()) // make sure the nil object was not added - - objects = objects[1:] // Remove the problematic 0 element - - pq, err = NewPriorityQueue( - func(lhs client.Object, rhs client.Object) bool { - // lhs has higher priority than rhs if lexically less than rhs - return lhs.GetName() < rhs.GetName() - }, - objects..., - ) - require.NoError(t, err) - - // Now push a bunch... 
- for i := 0; i < 50; i++ { - added := pq.Push( - &kargoapi.Promotion{ - ObjectMeta: metav1.ObjectMeta{ - Name: uuid.New().String(), - }, - }, - ) - require.True(t, added) - } - - // Now pop until we get a nil - objects = nil - for { - object := pq.Pop() - if object == nil { - break - } - objects = append(objects, object) - } - - // Verify objects are ordered lexically by object name - var lastName string - for _, object := range objects { - if lastName != "" { - require.GreaterOrEqual(t, object.GetName(), lastName) - } - lastName = object.GetName() - } -} - -func TestPeek(t *testing.T) { - objects := []client.Object{ - &kargoapi.Promotion{ - ObjectMeta: metav1.ObjectMeta{ - Name: "bbb", - Namespace: "default", - }, - }, - &kargoapi.Promotion{ - ObjectMeta: metav1.ObjectMeta{ - Name: "aaa", - Namespace: "default", - }, - }, - &kargoapi.Promotion{ - ObjectMeta: metav1.ObjectMeta{ - Name: "ccc", - Namespace: "default", - }, - }, - } - pq, err := NewPriorityQueue( - func(lhs client.Object, rhs client.Object) bool { - return lhs.GetName() < rhs.GetName() - }, - objects..., - ) - require.NoError(t, err) - require.Equal(t, 3, pq.Depth()) - - require.Equal(t, "aaa", pq.Peek().GetName()) - require.Equal(t, "aaa", pq.Peek().GetName()) - require.Equal(t, "aaa", pq.Pop().GetName()) - require.Equal(t, 2, pq.Depth()) - - require.Equal(t, "bbb", pq.Peek().GetName()) - require.Equal(t, "bbb", pq.Pop().GetName()) - require.Equal(t, 1, pq.Depth()) - - require.Equal(t, "ccc", pq.Peek().GetName()) - require.Equal(t, "ccc", pq.Pop().GetName()) - require.Equal(t, 0, pq.Depth()) - require.Nil(t, pq.Pop()) -} - -// TestDuplicatePush verifies when we push the same object, second one is a no-op -func TestDuplicatePush(t *testing.T) { - obj1 := &kargoapi.Promotion{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: "default", - }, - } - obj2 := obj1.DeepCopy() - pq, err := NewPriorityQueue( - func(lhs client.Object, rhs client.Object) bool { - return lhs.GetName() < rhs.GetName() - }, - ) - require.NoError(t, err) - - require.Equal(t, 0, pq.Depth()) - require.True(t, pq.Push(obj1)) - require.Equal(t, 1, pq.Depth()) - require.False(t, pq.Push(obj2)) - require.Equal(t, 1, pq.Depth()) - - require.Equal(t, "foo", pq.Pop().GetName()) - require.Equal(t, 0, pq.Depth()) - require.Nil(t, pq.Pop()) -}
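
Reviewer note (not part of the patch): before this change, the controller kept an in-memory, per-Stage priority queue ordered by creationTimestamp (ties broken by name) and gated reconciliation with tryBegin/conclude. After the change, the Promotion reconciler simply defers to the Stage, which records the Promotion it is awaiting in Status.CurrentPromotion; the watch on Stages requeues the Promotion once the Stage acknowledges it. The sketch below is a condensed illustration of that gating check only — the helper name is hypothetical, and the Stage-side logic that selects the next Promotion is not part of this diff.

package sketch

import (
	kargoapi "github.com/akuity/kargo/api/v1alpha1"
)

// stageAwaitsPromotion reports whether the Stage has acknowledged the given
// Promotion as the one that should run next. When this returns false, the
// reconciler returns early without requeueing, matching the behavior of the
// new check in promotions.go.
func stageAwaitsPromotion(stage *kargoapi.Stage, promo *kargoapi.Promotion) bool {
	return stage.Status.CurrentPromotion != nil &&
		stage.Status.CurrentPromotion.Name == promo.Name
}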