Skip to content

Commit

Permalink
pillar/deferred: properly merge deferred requests
Browse files Browse the repository at this point in the history
This fixes the growing deferred queue issue caused by missing
request merges. Commit 9f2c9ad ("zedcloud/deferred: don't
take lock while processing the deferred queue") relaxes locks
and makes it possible to add new requests while queue processing
waits in send. New requests should land in the same `deferredItems`
list, which should be processed later at the end of the
`handleDeferred()` operation. The bug lies in the actual merging
of two queues: the one that has not been completed due to a possible
error and the other queue, which was populated with new requests
during the send.

Proper queue merging is not just concatenation of queues (which
would cause the resulting queue to always grow), but involves item
replacement by key, so the queue size stays the same, with the item
being replaced.

Signed-off-by: Roman Penyaev <r.peniaev@gmail.com>
Fixes: 9f2c9ad ("zedcloud/deferred: don't take lock while processing the deferred queue")
Reported-by: Milan Lenco <milan@zededa.com>
  • Loading branch information
rouming committed Jul 1, 2024
1 parent 0184977 commit 582d40f
Showing 1 changed file with 28 additions and 2 deletions.
30 changes: 28 additions & 2 deletions pkg/pillar/zedcloud/deferred.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,33 @@ func (ctx *DeferredContext) processQueueTask(ps *pubsub.PubSub,
}
}

// mergeQueuesNoLock merges requests which were not sent (argument)
// with incoming requests, accumulated in the `ctx.deferredItems`.
// Context: `ctx.lock` held.
func (ctx *DeferredContext) mergeQueuesNoLock(notSentReqs []*deferredItem) {
if len(ctx.deferredItems) > 0 {
// During the send new items land into the `ctx.deferredItems`
// queue, which keys can exist in the `notSentReqs` queue.
// Traverse requests which were not sent, find items with same
// keys in the `ctx.deferredItems` and replace item in the
// `notSentReqs`.
for i, oldItem := range notSentReqs {
for j, newItem := range ctx.deferredItems {
if oldItem.key == newItem.key {
// Replace item in head
notSentReqs[i] = newItem
// Remove from tail
ctx.deferredItems =
append(ctx.deferredItems[:j], ctx.deferredItems[j+1:]...)
break
}
}
}
}
// Merge the rest adding new items to the tail
ctx.deferredItems = append(notSentReqs, ctx.deferredItems...)
}

// handleDeferred try to send all deferred items
func (ctx *DeferredContext) handleDeferred() bool {
ctx.lock.Lock()
Expand Down Expand Up @@ -236,8 +263,7 @@ func (ctx *DeferredContext) handleDeferred() bool {
}

ctx.lock.Lock()
// Merge with the incoming requests, recently added are in the tail
ctx.deferredItems = append(notSentReqs, ctx.deferredItems...)
ctx.mergeQueuesNoLock(notSentReqs)
if len(ctx.deferredItems) == 0 {
stopTimer(log, ctx)
}
Expand Down

0 comments on commit 582d40f

Please sign in to comment.