Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[10.4 stable] Prevent deferred queue from growing #4030

Merged
merged 2 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pkg/pillar/cmd/zedagent/zedagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func queueInfoToDest(ctx *zedagentContext, dest destinationBitset,
devUUID, "info")
// Ignore errors for all the LOC info messages
const ignoreErr = true
zedcloudCtx.DeferredPeriodicCtx.SetDeferred(key, buf, size, url,
zedcloudCtx.DeferredLOCPeriodicCtx.SetDeferred(key, buf, size, url,
bailOnHTTPErr, withNetTracing, ignoreErr, itemType)
}
}
Expand Down Expand Up @@ -426,6 +426,9 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
zedcloudCtx.DeferredPeriodicCtx = zedcloud.CreateDeferredCtx(zedcloudCtx,
zedagentCtx.ps, agentName, "DeferredPeriodic",
warningTime, errorTime, nil)
zedcloudCtx.DeferredLOCPeriodicCtx = zedcloud.CreateDeferredCtx(zedcloudCtx,
zedagentCtx.ps, agentName, "DeferredLOCPeriodic",
warningTime, errorTime, nil)
// XXX defer this until we have some config from cloud or saved copy
getconfigCtx.pubAppInstanceConfig.SignalRestarted()

Expand Down
30 changes: 28 additions & 2 deletions pkg/pillar/zedcloud/deferred.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,33 @@ func (ctx *DeferredContext) processQueueTask(ps *pubsub.PubSub,
}
}

// mergeQueuesNoLock puts the requests that could not be sent
// (`notSentReqs`) back in front of the requests accumulated in
// `ctx.deferredItems` while the send was in progress. When both queues
// hold an item with the same key, the newer item from
// `ctx.deferredItems` takes the slot of the stale one in `notSentReqs`
// and is removed from its original position.
// Context: `ctx.lock` held.
func (ctx *DeferredContext) mergeQueuesNoLock(notSentReqs []*deferredItem) {
	// Walk the unsent requests and, for each one, look for the first
	// freshly queued item carrying the same key.
	for head := range notSentReqs {
		pending := ctx.deferredItems
		for tail := 0; tail < len(pending); tail++ {
			if pending[tail].key != notSentReqs[head].key {
				continue
			}
			// The newer request supersedes the unsent one in the head...
			notSentReqs[head] = pending[tail]
			// ...and must no longer be present in the tail.
			ctx.deferredItems = append(pending[:tail], pending[tail+1:]...)
			break
		}
	}
	// Whatever is left in the tail is appended after the unsent requests.
	ctx.deferredItems = append(notSentReqs, ctx.deferredItems...)
}

// handleDeferred tries to send all deferred items
func (ctx *DeferredContext) handleDeferred() bool {
ctx.lock.Lock()
Expand Down Expand Up @@ -236,8 +263,7 @@ func (ctx *DeferredContext) handleDeferred() bool {
}

ctx.lock.Lock()
// Merge with the incoming requests, recently added are in the tail
ctx.deferredItems = append(notSentReqs, ctx.deferredItems...)
ctx.mergeQueuesNoLock(notSentReqs)
if len(ctx.deferredItems) == 0 {
stopTimer(log, ctx)
}
Expand Down
19 changes: 12 additions & 7 deletions pkg/pillar/zedcloud/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,20 @@ type ZedCloudContext struct {
serverSigningCertHash []byte
onBoardCertBytes []byte
log *base.LogObject
// All HTTP requests which can't be dropped and send should be
// repeated in case of a transmission error are added to this
// queue.
// All controller HTTP requests which can't be dropped, and whose
// sending should be repeated in case of a transmission error, are
// added to this queue.
DeferredEventCtx *DeferredContext
// All periodic HTTP requests are added to this queue, sending
// errors of which can be ignored. This means even the request has
// failed, it will be removed from the queue, so there is no need
// to `kick` this queue once connectivity has restored.
// All periodic controller HTTP requests, whose sending errors can be
// ignored, are added to this queue. This means that even if a
// request has failed, it will be removed from the queue, so there is
// no need to `kick` this queue once connectivity has been restored.
DeferredPeriodicCtx *DeferredContext
// All periodic LOC HTTP requests, whose sending errors can be
// ignored, are added to this queue. This means that even if a
// request has failed, it will be removed from the queue, so there is
// no need to `kick` this queue once connectivity has been restored.
DeferredLOCPeriodicCtx *DeferredContext
}

// ContextOptions - options to be passed at NewContext
Expand Down
Loading