Skip to content

Commit

Permalink
client: prevent watching stale alloc state (#18612)
Browse files Browse the repository at this point in the history
When waiting on a previous alloc, we must first query the leader before
switching to stale queries with a minimum query index set.

Also, as in #18269, check that the response is fresh before using it.
  • Loading branch information
schmichael committed Sep 29, 2023
1 parent d2cd6db commit e531bf1
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 6 deletions.
34 changes: 29 additions & 5 deletions client/allocwatcher/alloc_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,11 @@ func (p *remotePrevAlloc) Wait(ctx context.Context) error {
req := structs.AllocSpecificRequest{
AllocID: p.prevAllocID,
QueryOptions: structs.QueryOptions{
Region: p.config.Region,
AllowStale: true,
AuthToken: p.config.Node.SecretID,
Region: p.config.Region,
AuthToken: p.config.Node.SecretID,

// Initially get response from leader, then switch to stale
AllowStale: false,
},
}

Expand All @@ -369,15 +371,36 @@ func (p *remotePrevAlloc) Wait(ctx context.Context) error {
resp := structs.SingleAllocResponse{}
err := p.rpc.RPC("Alloc.GetAlloc", &req, &resp)
if err != nil {
p.logger.Error("error querying previous alloc", "error", err)
retry := getRemoteRetryIntv + helper.RandomStagger(getRemoteRetryIntv)
timer, stop := helper.NewSafeTimer(retry)
p.logger.Error("error querying previous alloc", "error", err, "wait", retry)
select {
case <-time.After(retry):
case <-timer.C:
continue
case <-ctx.Done():
stop()
return ctx.Err()
}
}

// Ensure that we didn't receive a stale response
if req.AllowStale && resp.Index < req.MinQueryIndex {
retry := getRemoteRetryIntv + helper.RandomStagger(getRemoteRetryIntv)
timer, stop := helper.NewSafeTimer(retry)
p.logger.Warn("received stale alloc; retrying",
"req_index", req.MinQueryIndex,
"resp_index", resp.Index,
"wait", retry,
)
select {
case <-timer.C:
continue
case <-ctx.Done():
stop()
return ctx.Err()
}
}

if resp.Alloc == nil {
p.logger.Debug("blocking alloc was GC'd")
return nil
Expand All @@ -389,6 +412,7 @@ func (p *remotePrevAlloc) Wait(ctx context.Context) error {

// Update the query index and requery.
if resp.Index > req.MinQueryIndex {
req.AllowStale = true
req.MinQueryIndex = resp.Index
}
}
Expand Down
4 changes: 3 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2242,16 +2242,18 @@ OUTER:
// Node.GetClientAllocs which returns older results.
if allocsResp.Index <= allocsReq.MinQueryIndex {
retry := c.retryIntv(getAllocRetryIntv)
timer, stop := helper.NewSafeTimer(retry)
c.logger.Warn("failed to retrieve updated allocs; retrying",
"req_index", allocsReq.MinQueryIndex,
"resp_index", allocsResp.Index,
"num_allocs", len(pull),
"wait", retry,
)
select {
case <-time.After(retry):
case <-timer.C:
continue
case <-c.shutdownCh:
stop()
return
}
}
Expand Down

0 comments on commit e531bf1

Please sign in to comment.