Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent deferred queue from growing #4017

Merged
merged 3 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pkg/pillar/cmd/zedagent/zedagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func queueInfoToDest(ctx *zedagentContext, dest destinationBitset,
devUUID, "info")
// Ignore errors for all the LOC info messages
const ignoreErr = true
zedcloudCtx.DeferredPeriodicCtx.SetDeferred(key, buf, size, url,
zedcloudCtx.DeferredLOCPeriodicCtx.SetDeferred(key, buf, size, url,
bailOnHTTPErr, withNetTracing, ignoreErr, itemType)
}
}
Expand Down Expand Up @@ -437,6 +437,9 @@ func Run(ps *pubsub.PubSub, loggerArg *logrus.Logger, logArg *base.LogObject, ar
zedcloudCtx.DeferredPeriodicCtx = zedcloud.CreateDeferredCtx(zedcloudCtx,
zedagentCtx.ps, agentName, "DeferredPeriodic",
warningTime, errorTime, nil)
zedcloudCtx.DeferredLOCPeriodicCtx = zedcloud.CreateDeferredCtx(zedcloudCtx,
zedagentCtx.ps, agentName, "DeferredLOCPeriodic",
warningTime, errorTime, nil)
// XXX defer this until we have some config from cloud or saved copy
getconfigCtx.pubAppInstanceConfig.SignalRestarted()

Expand Down
45 changes: 35 additions & 10 deletions pkg/pillar/zedcloud/deferred.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,33 @@ func (ctx *DeferredContext) processQueueTask(ps *pubsub.PubSub,
}
}

// mergeQueuesNoLock merges requests which were not sent (argument)
// with incoming requests, accumulated in the `ctx.deferredItems`.
// Context: `ctx.deferredItemsLock` held.
func (ctx *DeferredContext) mergeQueuesNoLock(notSentReqs []*deferredItem) {
if len(ctx.deferredItems) > 0 {
// During the send new items land into the `ctx.deferredItems`
// queue, which keys can exist in the `notSentReqs` queue.
// Traverse requests which were not sent, find items with same
// keys in the `ctx.deferredItems` and replace item in the
// `notSentReqs`.
for i, oldItem := range notSentReqs {
for j, newItem := range ctx.deferredItems {
if oldItem.key == newItem.key {
// Replace item in head
notSentReqs[i] = newItem
// Remove from tail
ctx.deferredItems =
append(ctx.deferredItems[:j], ctx.deferredItems[j+1:]...)
break
eriknordmark marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}
// Merge the rest adding new items to the tail
ctx.deferredItems = append(notSentReqs, ctx.deferredItems...)
}

// handleDeferred try to send all deferred items
func (ctx *DeferredContext) handleDeferred() bool {
ctx.deferredItemsLock.Lock()
Expand Down Expand Up @@ -236,8 +263,7 @@ func (ctx *DeferredContext) handleDeferred() bool {
}

ctx.deferredItemsLock.Lock()
// Merge with the incoming requests, recently added are in the tail
ctx.deferredItems = append(notSentReqs, ctx.deferredItems...)
ctx.mergeQueuesNoLock(notSentReqs)
if len(ctx.deferredItems) == 0 {
stopTimer(log, ctx)
}
Expand All @@ -249,13 +275,12 @@ func (ctx *DeferredContext) handleDeferred() bool {
}

// SetDeferred sets or replaces any item for the specified key and
// starts the timer. Key and url are used for identifying the
// channel. Please note that for deviceUUID key is used for attestUrl,
// which is not the same for other Urls, where in other case, the key
// is very specific for the object. If @ignoreErr is true the queue
// processing is not stopped on any error and will continue, although
// all errors will be passed to @sentHandler callback (see the
// CreateDeferredCtx()).
// starts the timer. Key is used for identifying the channel. Please
// note that for deviceUUID key is used for attestUrl, which is not the
// same for other Urls, where in other case, the key is very specific
// for the object. If @ignoreErr is true the queue processing is not
// stopped on any error and will continue, although all errors will be
// passed to @sentHandler callback (see the CreateDeferredCtx()).
func (ctx *DeferredContext) SetDeferred(
key string, buf *bytes.Buffer, size int64, url string, bailOnHTTPErr,
withNetTracing, ignoreErr bool, itemType interface{}) {
Expand All @@ -282,7 +307,7 @@ func (ctx *DeferredContext) SetDeferred(
ind := 0
var itemList *deferredItem
for ind, itemList = range ctx.deferredItems {
if itemList.key == key && itemList.url == url {
if itemList.key == key {
found = true
break
}
Expand Down
19 changes: 12 additions & 7 deletions pkg/pillar/zedcloud/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,20 @@ type ZedCloudContext struct {
serverSigningCertHash []byte
onBoardCertBytes []byte
log *base.LogObject
// All HTTP requests which can't be dropped and send should be
// repeated in case of a transmission error are added to this
// queue.
// All controller HTTP requests which can't be dropped and send
// should be repeated in case of a transmission error are added to
// this queue.
DeferredEventCtx *DeferredContext
// All periodic HTTP requests are added to this queue, sending
// errors of which can be ignored. This means even the request has
// failed, it will be removed from the queue, so there is no need
// to `kick` this queue once connectivity has restored.
// All periodic controller HTTP requests are added to this queue,
// sending errors of which can be ignored. This means even the
// request has failed, it will be removed from the queue, so there
// is no need to `kick` this queue once connectivity has restored.
DeferredPeriodicCtx *DeferredContext
// All periodic LOC HTTP requests are added to this queue,
// sending errors of which can be ignored. This means even the
// request has failed, it will be removed from the queue, so there
// is no need to `kick` this queue once connectivity has restored.
DeferredLOCPeriodicCtx *DeferredContext
}

// ContextOptions - options to be passed at NewContext
Expand Down
Loading