From d8a43b2066197a6b135b308438cf0ddf7053f5e6 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Thu, 11 Jul 2019 13:27:37 +0800 Subject: [PATCH 1/3] Signal plugin shutdown for driver.TaskStats The driver plugin stub client must call `grpcutils.HandleGrpcErr` to handle plugin shutdown similar to other functions. This ensures that TaskStats returns `ErrPluginShutdown` when the plugin shuts down. --- plugins/drivers/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/drivers/client.go b/plugins/drivers/client.go index ff284de6cc0a..3bb81a4f4b64 100644 --- a/plugins/drivers/client.go +++ b/plugins/drivers/client.go @@ -269,7 +269,7 @@ func (d *driverPluginClient) TaskStats(ctx context.Context, taskID string, inter return nil, structs.NewRecoverableError(err, rec.Recoverable) } } - return nil, err + return nil, grpcutils.HandleGrpcErr(err, d.doneCtx) } ch := make(chan *cstructs.TaskResourceUsage, 1) From bbf8f90ecb273c7d75bcd5b6bc98d31948db240f Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Thu, 11 Jul 2019 13:35:44 +0800 Subject: [PATCH 2/3] client/taskrunner: fix stats retry logic Previously, if the channel was closed, we retried the Stats call. But if that call failed, we went into a backoff loop without ever calling Stats again. Here, we use a utility function for calling driverHandle.Stats that retries as one expects. I aimed to preserve the logging formats but made small improvements as I saw fit. 
--- client/allocrunner/taskrunner/stats_hook.go | 100 +++++++++++--------- 1 file changed, 57 insertions(+), 43 deletions(-) diff --git a/client/allocrunner/taskrunner/stats_hook.go b/client/allocrunner/taskrunner/stats_hook.go index e227f9524db7..24dee30e5e95 100644 --- a/client/allocrunner/taskrunner/stats_hook.go +++ b/client/allocrunner/taskrunner/stats_hook.go @@ -88,56 +88,18 @@ func (h *statsHook) Exited(context.Context, *interfaces.TaskExitedRequest, *inte // Collection ends when the passed channel is closed func (h *statsHook) collectResourceUsageStats(ctx context.Context, handle interfaces.DriverStats) { - ch, err := handle.Stats(ctx, h.interval) +MAIN: + ch, err := h.callStatsWithRetry(ctx, handle) if err != nil { - // Check if the driver doesn't implement stats - if err.Error() == cstructs.DriverStatsNotImplemented.Error() { - h.logger.Debug("driver does not support stats") - return - } - h.logger.Error("failed to start stats collection for task", "error", err) + return } - var backoff time.Duration - var retry int - limit := time.Second * 5 for { - time.Sleep(backoff) select { case ru, ok := <-ch: - // Channel is closed + // if channel closes, re-establish a new one if !ok { - var re *structs.RecoverableError - ch, err = handle.Stats(ctx, h.interval) - if err == nil { - goto RETRY - } - - // We do not log when the plugin is shutdown since this is - // likely because the driver plugin has unexpectedly exited, - // in which case sleeping and trying again or returning based - // on the stop channel is the correct behavior - if err != bstructs.ErrPluginShutdown { - h.logger.Debug("error fetching stats of task", "error", err) - goto RETRY - } - // check if the error is terminal otherwise it's likely a - // transport error and we should retry - re, ok = err.(*structs.RecoverableError) - if ok && re.IsUnrecoverable() { - return - } - h.logger.Warn("stats collection for task failed", "error", err) - RETRY: - // Calculate the new backoff - backoff = (1 << (2 * 
uint64(retry))) * time.Second - if backoff > limit { - backoff = limit - } - // Increment retry counter - retry++ - - continue + goto MAIN } // Update stats on TaskRunner and emit them @@ -149,6 +111,58 @@ func (h *statsHook) collectResourceUsageStats(ctx context.Context, handle interf } } +// callStatsWithRetry invokes handle driver Stats() functions and retries until channel is established +// successfully. Returns an error if it encounters a permanent error. +// +// It logs the errors with appropriate log levels; don't log returned error +func (h *statsHook) callStatsWithRetry(ctx context.Context, handle interfaces.DriverStats) (<-chan *cstructs.TaskResourceUsage, error) { + var retry int + +MAIN: + if ctx.Err() != nil { + return nil, ctx.Err() + } + + ch, err := handle.Stats(ctx, h.interval) + if err == nil { + return ch, nil + } + + // Check if the driver doesn't implement stats + if err.Error() == cstructs.DriverStatsNotImplemented.Error() { + h.logger.Debug("driver does not support stats") + return nil, err + } + + // check if the error is terminal otherwise it's likely a + // transport error and we should retry + if re, ok := err.(*structs.RecoverableError); ok && re.IsUnrecoverable() { + return nil, err + } + + // We do not warn when the plugin is shutdown since this is + // likely because the driver plugin has unexpectedly exited, + // in which case sleeping and trying again or returning based + // on the stop channel is the correct behavior + if err == bstructs.ErrPluginShutdown { + h.logger.Debug("failed to fetching stats of task", "error", err) + } else { + h.logger.Error("failed to start stats collection for task", "error", err) + } + + limit := time.Second * 5 + backoff := 1 << (2 * uint64(retry)) * time.Second + if backoff > limit || retry > 5 { + backoff = limit + } + + // Increment retry counter + retry++ + + time.Sleep(backoff) + goto MAIN +} + func (h *statsHook) Shutdown() { h.mu.Lock() defer h.mu.Unlock() From 
66bef39dd564899a15dcaf0afd41204d7b0555f0 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Wed, 17 Jul 2019 11:01:59 +0700 Subject: [PATCH 3/3] log unrecoverable errors --- client/allocrunner/taskrunner/stats_hook.go | 1 + 1 file changed, 1 insertion(+) diff --git a/client/allocrunner/taskrunner/stats_hook.go b/client/allocrunner/taskrunner/stats_hook.go index 24dee30e5e95..4dee3315cf82 100644 --- a/client/allocrunner/taskrunner/stats_hook.go +++ b/client/allocrunner/taskrunner/stats_hook.go @@ -137,6 +137,7 @@ MAIN: // check if the error is terminal otherwise it's likely a // transport error and we should retry if re, ok := err.(*structs.RecoverableError); ok && re.IsUnrecoverable() { + h.logger.Error("failed to start stats collection for task with unrecoverable error", "error", err) return nil, err }