diff --git a/runtime_scan/pkg/orchestrator/common/queue.go b/runtime_scan/pkg/orchestrator/common/queue.go index 3df0f93fd..35e29fcf4 100644 --- a/runtime_scan/pkg/orchestrator/common/queue.go +++ b/runtime_scan/pkg/orchestrator/common/queue.go @@ -19,15 +19,18 @@ import ( "context" "fmt" "sync" + "time" ) type Enqueuer[T comparable] interface { Enqueue(item T) + EnqueueAfter(item T, d time.Duration) } type Dequeuer[T comparable] interface { Dequeue(ctx context.Context) (T, error) - Done(T) + Done(item T) + RequeueAfter(item T, d time.Duration) } type Queue[T comparable] struct { @@ -54,6 +57,11 @@ type Queue[T comparable] struct { // in the queue slice. processing map[T]struct{} + // A map to track items which have been scheduled to be queued at a + // later date. We keep track of these items to prevent them being + // enqueued earlier than their scheduled time through Enqueue. + waitingForEnqueue map[T]struct{} + // A mutex lock which protects the queue from simultaneous reads and // writes ensuring the queue can be used by multiple go routines safely. l sync.Mutex @@ -61,10 +69,11 @@ type Queue[T comparable] struct { func NewQueue[T comparable]() *Queue[T] { return &Queue[T]{ - itemAdded: make(chan struct{}), - queue: make([]T, 0), - inqueue: map[T]struct{}{}, - processing: map[T]struct{}{}, + itemAdded: make(chan struct{}), + queue: make([]T, 0), + inqueue: map[T]struct{}{}, + processing: map[T]struct{}{}, + waitingForEnqueue: map[T]struct{}{}, } } @@ -108,9 +117,49 @@ func (q *Queue[T]) Enqueue(item T) { q.l.Lock() defer q.l.Unlock() + q.enqueue(item) +} + +// EnqueueAfter will tell the queue to keep track of the item preventing it +// being enqueued before the specified duration through Enqueue. It will then +// Enqueue it after a defined duration. If item already in the queue, is +// processing or waiting to be enqueued, nothing will be done. +func (q *Queue[T]) EnqueueAfter(item T, d time.Duration) { + q.l.Lock() + defer q.l.Unlock() + + q.enqueueAfter(item, d) +} + +// Internal enqueueAfter function that it can be reused by the public +// EnqueueAfter and public RequeueAfter functions, these should not be called +// without obtaining a lock on Queue first. +func (q *Queue[T]) enqueueAfter(item T, d time.Duration) { + _, inQueue := q.inqueue[item] + _, isProcessing := q.processing[item] + _, isWaitingForEnqueue := q.waitingForEnqueue[item] + if inQueue || isProcessing || isWaitingForEnqueue { + // item is already known by the queue so there is nothing to do + return + } + + q.waitingForEnqueue[item] = struct{}{} + go func() { + <-time.After(d) + q.l.Lock() + defer q.l.Unlock() + delete(q.waitingForEnqueue, item) + q.enqueue(item) + }() +} + +// Internal enqueue function that it can be reused by public functions Enqueue +// and EnqueueAfter. +func (q *Queue[T]) enqueue(item T) { _, inQueue := q.inqueue[item] _, isProcessing := q.processing[item] - if !inQueue && !isProcessing { + _, isWaitingForEnqueue := q.waitingForEnqueue[item] + if !inQueue && !isProcessing && !isWaitingForEnqueue { q.queue = append(q.queue, item) q.inqueue[item] = struct{}{} @@ -140,7 +189,8 @@ func (q *Queue[T]) Has(item T) bool { _, inQueue := q.inqueue[item] _, isProcessing := q.processing[item] - return inQueue || isProcessing + _, isWaitingForEnqueue := q.waitingForEnqueue[item] + return inQueue || isProcessing || isWaitingForEnqueue } // Done will tell the queue that the processing for the item is completed and @@ -151,6 +201,18 @@ func (q *Queue[T]) Done(item T) { delete(q.processing, item) } +// RequeueAfter will mark a processing item as Done and then schedule it to be +// requeued after the specified duration. This should be used instead of +// calling Done and then EnqueueAfter to prevent someone Enqueuing the same +// item between Done and EnqueueAfter. +func (q *Queue[T]) RequeueAfter(item T, d time.Duration) { + q.l.Lock() + defer q.l.Unlock() + + delete(q.processing, item) + q.enqueueAfter(item, d) +} + // Returns the number of items currently being actively processed. func (q *Queue[T]) ProcessingCount() int { q.l.Lock() diff --git a/runtime_scan/pkg/orchestrator/common/reconciler.go b/runtime_scan/pkg/orchestrator/common/reconciler.go index d2744f0f6..bc5868ee7 100644 --- a/runtime_scan/pkg/orchestrator/common/reconciler.go +++ b/runtime_scan/pkg/orchestrator/common/reconciler.go @@ -17,11 +17,29 @@ package common import ( "context" + "errors" + "fmt" "time" log "github.com/sirupsen/logrus" ) +type RequeueAfterError struct { + d time.Duration + msg string +} + +func (rae RequeueAfterError) Error() string { + if rae.msg != "" { + return fmt.Sprintf("%v so requeuing after %v", rae.msg, rae.d) + } + return fmt.Sprintf("requeuing after %v", rae.d) +} + +func NewRequeueAfterError(d time.Duration, msg string) error { + return RequeueAfterError{d, msg} +} + type Reconciler[T comparable] struct { Logger *log.Entry @@ -50,8 +68,21 @@ func (r *Reconciler[T]) Start(ctx context.Context) { if err != nil { r.Logger.Errorf("Failed to reconcile item: %v", err) } + + // Make sure timeout context is canceled to + // prevent orphaned resources cancel() - r.Queue.Done(item) + + // If reconcile has requested that we requeue the item + // by returning a RequeueAfterError then requeue the + // item with the duration specified, otherwise mark the + // item as Done. + var requeueAfterError RequeueAfterError + if errors.As(err, &requeueAfterError) { + r.Queue.RequeueAfter(item, requeueAfterError.d) + } else { + r.Queue.Done(item) + } } // Check if the parent context done if so we also need