diff --git a/client/allocwatcher/alloc_watcher.go b/client/allocwatcher/alloc_watcher.go index 0ac5ed0770d4..337784eb3fd8 100644 --- a/client/allocwatcher/alloc_watcher.go +++ b/client/allocwatcher/alloc_watcher.go @@ -350,9 +350,11 @@ func (p *remotePrevAlloc) Wait(ctx context.Context) error { req := structs.AllocSpecificRequest{ AllocID: p.prevAllocID, QueryOptions: structs.QueryOptions{ - Region: p.config.Region, - AllowStale: true, - AuthToken: p.config.Node.SecretID, + Region: p.config.Region, + AuthToken: p.config.Node.SecretID, + + // Initially get response from leader, then switch to stale + AllowStale: false, }, } @@ -369,15 +371,36 @@ func (p *remotePrevAlloc) Wait(ctx context.Context) error { resp := structs.SingleAllocResponse{} err := p.rpc.RPC("Alloc.GetAlloc", &req, &resp) if err != nil { - p.logger.Error("error querying previous alloc", "error", err) retry := getRemoteRetryIntv + helper.RandomStagger(getRemoteRetryIntv) + timer, stop := helper.NewSafeTimer(retry) + p.logger.Error("error querying previous alloc", "error", err, "wait", retry) select { - case <-time.After(retry): + case <-timer.C: + continue + case <-ctx.Done(): + stop() + return ctx.Err() + } + } + + // Ensure that we didn't receive a stale response + if req.AllowStale && resp.Index < req.MinQueryIndex { + retry := getRemoteRetryIntv + helper.RandomStagger(getRemoteRetryIntv) + timer, stop := helper.NewSafeTimer(retry) + p.logger.Warn("received stale alloc; retrying", + "req_index", req.MinQueryIndex, + "resp_index", resp.Index, + "wait", retry, + ) + select { + case <-timer.C: continue case <-ctx.Done(): + stop() return ctx.Err() } } + if resp.Alloc == nil { p.logger.Debug("blocking alloc was GC'd") return nil @@ -389,6 +412,7 @@ func (p *remotePrevAlloc) Wait(ctx context.Context) error { // Update the query index and requery. if resp.Index > req.MinQueryIndex { + req.AllowStale = true req.MinQueryIndex = resp.Index } } diff --git a/client/client.go b/client/client.go index 4e165354d6d7..6a1e797b5651 100644 --- a/client/client.go +++ b/client/client.go @@ -2242,6 +2242,7 @@ OUTER: // Node.GetClientAllocs which returns older results. if allocsResp.Index <= allocsReq.MinQueryIndex { retry := c.retryIntv(getAllocRetryIntv) + timer, stop := helper.NewSafeTimer(retry) c.logger.Warn("failed to retrieve updated allocs; retrying", "req_index", allocsReq.MinQueryIndex, "resp_index", allocsResp.Index, @@ -2249,9 +2250,10 @@ OUTER: "wait", retry, ) select { - case <-time.After(retry): + case <-timer.C: continue case <-c.shutdownCh: + stop() return } }