Skip to content

Commit

Permalink
Do not consider async ops that finalize *during* reconciliation as done
Browse files Browse the repository at this point in the history
Async ops that finalize *during* reconciliation are left to be processed
in the next reconciliation run.
This prevents race-conditions and simplifies the reconciliation algorithm.

Signed-off-by: Milan Lenco <milan@zededa.com>
  • Loading branch information
milan-zededa committed Jul 3, 2023
1 parent 00d3091 commit ea56afb
Showing 1 changed file with 15 additions and 4 deletions.
19 changes: 15 additions & 4 deletions libs/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,9 @@ func (r *reconciler) syncDeletedSubgraphs(
// reconcileItems reconciles the state of items.
// Intended state can be nil, meaning that all items from the current state should be removed.
// Create/Modify/Delete operations are performed in two stages:
// 1. First all Delete + Modify operations are executed (incl. the first half of the Recreate).
// 2. Next all (Re)Create operations are carried out.
// 1. First all Delete + Modify operations are executed (incl. the first half of the Recreate).
// 2. Next all (Re)Create operations are carried out.
//
// In both cases, the items are traversed using DFS and the operations are executed
// in the forward or reverse topological order with respect to the dependencies.
// In the first stage, Delete/Modify operations are run in the DFS post-order, while
Expand All @@ -213,6 +214,10 @@ func (r *reconciler) reconcileItems(ctx context.Context, asyncManager *asyncMana
for _, itemRef := range currentState.DiffItems(intendedState) {
stage1Stack.push(stackElem{itemRef: itemRef})
}
// Do not consider async ops that finalize *during* reconciliation as done.
// Let them be processed in the next reconciliation run.
// This prevents race-conditions and simplifies the reconciliation algorithm.
asyncEndLimit := time.Now()
for _, asyncOp := range asyncManager.listAllOps() {
if _, _, _, found := r.getItem(currentState, asyncOp.params.itemRef); found {
if asyncOp.status.done || asyncOp.status.cancelTimeout() {
Expand Down Expand Up @@ -340,7 +345,7 @@ func (r *reconciler) reconcileItems(ctx context.Context, asyncManager *asyncMana
// Check if there is an asynchronous operation still running for this item.
inProgress, asyncDeleted, err := r.checkAsyncOp(
currentFullState, intendedFullState,
itemRef, asyncManager, failed, stage2Stack, status)
itemRef, asyncManager, failed, stage2Stack, status, asyncEndLimit)
if err != nil {
globalErr = err
continue
Expand Down Expand Up @@ -761,7 +766,8 @@ func (r *reconciler) runOperation(ctx context.Context, graphName string,
// Function can also post-process and log completed async operation.
func (r *reconciler) checkAsyncOp(currentFullState dg.Graph, intendedFullState dg.GraphR,
itemRef dg.ItemRef, asyncManager *asyncManager, failed map[dg.ItemRef]struct{},
stage2Stack *stack, status *Status) (asyncInProgress, deleted bool, err error) {
stage2Stack *stack, status *Status, endLimit time.Time) (
asyncInProgress, deleted bool, err error) {

// Check if async operation is/was running.
item, stateData, path, found := r.getItem(currentFullState, itemRef)
Expand All @@ -780,6 +786,11 @@ func (r *reconciler) checkAsyncOp(currentFullState dg.Graph, intendedFullState d
// still running
return true, false, nil
}
if asyncOp.status.endTime.After(endLimit) {
// Finalized during reconciliation.
// Process it in the next reconciliation run instead to avoid race conditions.
return true, false, nil
}

// Async operation has finalized.
opErr := asyncOp.status.err
Expand Down

0 comments on commit ea56afb

Please sign in to comment.