From c23d673b7e2c551c137827b4bc33e48341fa1f2e Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Thu, 25 Apr 2019 14:32:24 -0400 Subject: [PATCH 1/7] logmon client to handle grpc closing errors --- client/logmon/client.go | 8 ++++++-- client/logmon/plugin.go | 5 ++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/client/logmon/client.go b/client/logmon/client.go index ed9cef160183..80b691842040 100644 --- a/client/logmon/client.go +++ b/client/logmon/client.go @@ -4,10 +4,14 @@ import ( "context" "github.com/hashicorp/nomad/client/logmon/proto" + "github.com/hashicorp/nomad/helper/pluginutils/grpcutils" ) type logmonClient struct { client proto.LogMonClient + + // doneCtx is closed when the plugin exits + doneCtx context.Context } func (c *logmonClient) Start(cfg *LogConfig) error { @@ -21,11 +25,11 @@ func (c *logmonClient) Start(cfg *LogConfig) error { StderrFifo: cfg.StderrFifo, } _, err := c.client.Start(context.Background(), req) - return err + return grpcutils.HandleGrpcErr(err, c.doneCtx) } func (c *logmonClient) Stop() error { req := &proto.StopRequest{} _, err := c.client.Stop(context.Background(), req) - return err + return grpcutils.HandleGrpcErr(err, c.doneCtx) } diff --git a/client/logmon/plugin.go b/client/logmon/plugin.go index 07c8dfbf1ba4..3234806c7614 100644 --- a/client/logmon/plugin.go +++ b/client/logmon/plugin.go @@ -73,5 +73,8 @@ func (p *Plugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error { } func (p *Plugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) { - return &logmonClient{client: proto.NewLogMonClient(c)}, nil + return &logmonClient{ + doneCtx: ctx, + client: proto.NewLogMonClient(c), + }, nil } From b21849cb028707037a07b788f946d452991151b1 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Thu, 25 Apr 2019 15:07:08 -0400 Subject: [PATCH 2/7] logmon: retry starting logmon if it exits Retry if we detect shutting down during Start() api call is started, locally. --- client/allocrunner/taskrunner/logmon_hook.go | 43 ++++++++++++++++---- 1 file changed, 35 insertions(+), 8 deletions(-) diff --git a/client/allocrunner/taskrunner/logmon_hook.go b/client/allocrunner/taskrunner/logmon_hook.go index 1f58d7e33644..b3e146d6d273 100644 --- a/client/allocrunner/taskrunner/logmon_hook.go +++ b/client/allocrunner/taskrunner/logmon_hook.go @@ -6,6 +6,7 @@ import ( "fmt" "path/filepath" "runtime" + "time" hclog "github.com/hashicorp/go-hclog" plugin "github.com/hashicorp/go-plugin" @@ -13,6 +14,7 @@ import ( "github.com/hashicorp/nomad/client/logmon" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/structs" + bstructs "github.com/hashicorp/nomad/plugins/base/structs" pstructs "github.com/hashicorp/nomad/plugins/shared/structs" ) @@ -95,7 +97,37 @@ func reattachConfigFromHookData(data map[string]string) (*plugin.ReattachConfig, func (h *logmonHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error { - // Attempt to reattach to logmon + tries := 0 + for { + err := h.prestartOneLoop(ctx, req) + if err == bstructs.ErrPluginShutdown { + h.logger.Warn("logmon shutdown while making request", "error", err) + + if tries > 3 { + return err + } + + // retry after killing process and ensure we start a new logmon process + tries++ + h.logmonPluginClient.Kill() + time.Sleep(1 * time.Second) + continue + } else if err != nil { + return err + } + + rCfg := pstructs.ReattachConfigFromGoPlugin(h.logmonPluginClient.ReattachConfig()) + jsonCfg, err := json.Marshal(rCfg) + if err != nil { + return err + } + resp.State = map[string]string{logmonReattachKey: string(jsonCfg)} + return nil + } +} + +func (h *logmonHook) prestartOneLoop(ctx context.Context, req *interfaces.TaskPrestartRequest) error { + // attach to a running logmon if state indicates one if h.logmonPluginClient == nil { reattachConfig, err := reattachConfigFromHookData(req.PreviousState) if err != nil { @@ -105,12 +137,13 @@ func (h *logmonHook) Prestart(ctx context.Context, if reattachConfig != nil { if err := h.launchLogMon(reattachConfig); err != nil { h.logger.Warn("failed to reattach to logmon process", "error", err) + // if we failed to launch logmon, try again below } } } - // We did not reattach to a plugin and one is still not running. + // create a new client in initial starts, failed reattachment, or if we detect exits if h.logmonPluginClient == nil || h.logmonPluginClient.Exited() { if err := h.launchLogMon(nil); err != nil { // Retry errors launching logmon as logmon may have crashed on start and @@ -134,12 +167,6 @@ func (h *logmonHook) Prestart(ctx context.Context, return err } - rCfg := pstructs.ReattachConfigFromGoPlugin(h.logmonPluginClient.ReattachConfig()) - jsonCfg, err := json.Marshal(rCfg) - if err != nil { - return err - } - resp.State = map[string]string{logmonReattachKey: string(jsonCfg)} return nil } From 978fc65a2b07f8ab49856368225a4111f71c8ebd Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Thu, 25 Apr 2019 16:26:01 -0400 Subject: [PATCH 3/7] add a test that simulates logmon dying during Start() call --- .../taskrunner/logmon_hook_unix_test.go | 72 +++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/client/allocrunner/taskrunner/logmon_hook_unix_test.go b/client/allocrunner/taskrunner/logmon_hook_unix_test.go index 582692a27bde..1d9e66c1656b 100644 --- a/client/allocrunner/taskrunner/logmon_hook_unix_test.go +++ b/client/allocrunner/taskrunner/logmon_hook_unix_test.go @@ -10,6 +10,7 @@ import ( "os" "syscall" "testing" + "time" "github.com/hashicorp/nomad/client/allocrunner/interfaces" "github.com/hashicorp/nomad/helper/testlog" @@ -86,3 +87,74 @@ func TestTaskRunner_LogmonHook_StartCrashStop(t *testing.T) { // Running stop should shutdown logmon require.NoError(t, hook.Stop(context.Background(), nil, nil)) } + +// TestTaskRunner_LogmonHook_ShutdownMidStart simulates logmon crashing while the +// Nomad client is calling Start() and asserts that we recover and spawn a new logmon. +func TestTaskRunner_LogmonHook_ShutdownMidStart(t *testing.T) { + t.Parallel() + + alloc := mock.BatchAlloc() + task := alloc.Job.TaskGroups[0].Tasks[0] + + dir, err := ioutil.TempDir("", "nomadtest") + require.NoError(t, err) + defer func() { + require.NoError(t, os.RemoveAll(dir)) + }() + + hookConf := newLogMonHookConfig(task.Name, dir) + hook := newLogMonHook(hookConf, testlog.HCLogger(t)) + + req := interfaces.TaskPrestartRequest{ + Task: task, + } + resp := interfaces.TaskPrestartResponse{} + + // First start + require.NoError(t, hook.Prestart(context.Background(), &req, &resp)) + defer hook.Stop(context.Background(), nil, nil) + + origState := resp.State + origHookData := resp.State[logmonReattachKey] + require.NotEmpty(t, origHookData) + + // Pluck PID out of reattach synthesize a crash + reattach := struct { + Pid int + }{} + require.NoError(t, json.Unmarshal([]byte(origHookData), &reattach)) + pid := reattach.Pid + require.NotZero(t, pid) + + proc, _ := os.FindProcess(pid) + + // Assert logmon is running + require.NoError(t, proc.Signal(syscall.Signal(0))) + + // SIGSTOP would freeze process without it being considered + // exited; so this causes process to be non-exited at beginning of call + // then we kill process while Start call is running + require.NoError(t, proc.Signal(syscall.SIGSTOP)) + + go func() { + time.Sleep(2 * time.Second) + + proc.Signal(syscall.SIGCONT) + proc.Signal(os.Kill) + }() + + req.PreviousState = map[string]string{ + logmonReattachKey: origHookData, + } + + initLogmon, initClient := hook.logmon, hook.logmonPluginClient + + resp = interfaces.TaskPrestartResponse{} + err = hook.Prestart(context.Background(), &req, &resp) + require.NoError(t, err) + require.NotEqual(t, origState, resp.State) + + // assert that we got a new client and logmon + require.True(t, initLogmon != hook.logmon) + require.True(t, initClient != hook.logmonPluginClient) +} From ba373fee2aa4be5b09aa3bc3c1440aad9ab1346b Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Thu, 25 Apr 2019 17:16:29 -0400 Subject: [PATCH 4/7] try sleeping for stop signal to take effect --- client/allocrunner/taskrunner/logmon_hook_unix_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/client/allocrunner/taskrunner/logmon_hook_unix_test.go b/client/allocrunner/taskrunner/logmon_hook_unix_test.go index 1d9e66c1656b..890b675014c6 100644 --- a/client/allocrunner/taskrunner/logmon_hook_unix_test.go +++ b/client/allocrunner/taskrunner/logmon_hook_unix_test.go @@ -135,6 +135,8 @@ func TestTaskRunner_LogmonHook_ShutdownMidStart(t *testing.T) { // exited; so this causes process to be non-exited at beginning of call // then we kill process while Start call is running require.NoError(t, proc.Signal(syscall.SIGSTOP)) + // sleep for the signal to take effect + time.Sleep(1 * time.Second) go func() { time.Sleep(2 * time.Second) From 1f1551a4aecd1de2cda377dba864bb5cd3a8dc9c Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Thu, 25 Apr 2019 18:09:36 -0400 Subject: [PATCH 5/7] add logging about attempts --- client/allocrunner/taskrunner/logmon_hook.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/client/allocrunner/taskrunner/logmon_hook.go b/client/allocrunner/taskrunner/logmon_hook.go index b3e146d6d273..9efc6221850b 100644 --- a/client/allocrunner/taskrunner/logmon_hook.go +++ b/client/allocrunner/taskrunner/logmon_hook.go @@ -97,18 +97,20 @@ func reattachConfigFromHookData(data map[string]string) (*plugin.ReattachConfig, func (h *logmonHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error { - tries := 0 + attempts := 0 for { err := h.prestartOneLoop(ctx, req) if err == bstructs.ErrPluginShutdown { h.logger.Warn("logmon shutdown while making request", "error", err) - if tries > 3 { + if attempts > 3 { + h.logger.Warn("logmon shutdown while making request; giving up", "attempts", attempts, "error", err) return err } // retry after killing process and ensure we start a new logmon process - tries++ + attempts++ + h.logger.Warn("logmon shutdown while making request; retrying", "attempts", attempts, "error", err) h.logmonPluginClient.Kill() time.Sleep(1 * time.Second) continue From 658a734912e4355675729becb86dd149ac80b067 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Thu, 25 Apr 2019 18:16:13 -0400 Subject: [PATCH 6/7] try checking process status --- .../taskrunner/logmon_hook_unix_test.go | 28 ++++++++++++++----- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/client/allocrunner/taskrunner/logmon_hook_unix_test.go b/client/allocrunner/taskrunner/logmon_hook_unix_test.go index 890b675014c6..4a8a461536ad 100644 --- a/client/allocrunner/taskrunner/logmon_hook_unix_test.go +++ b/client/allocrunner/taskrunner/logmon_hook_unix_test.go @@ -16,6 +16,7 @@ import ( "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/testutil" + "github.com/shirou/gopsutil/process" "github.com/stretchr/testify/require" ) @@ -126,23 +127,36 @@ func TestTaskRunner_LogmonHook_ShutdownMidStart(t *testing.T) { pid := reattach.Pid require.NotZero(t, pid) - proc, _ := os.FindProcess(pid) + proc, err := process.NewProcess(int32(pid)) + require.NoError(t, err) // Assert logmon is running - require.NoError(t, proc.Signal(syscall.Signal(0))) + require.NoError(t, proc.SendSignal(syscall.Signal(0))) // SIGSTOP would freeze process without it being considered // exited; so this causes process to be non-exited at beginning of call // then we kill process while Start call is running - require.NoError(t, proc.Signal(syscall.SIGSTOP)) - // sleep for the signal to take effect - time.Sleep(1 * time.Second) + require.NoError(t, proc.SendSignal(syscall.SIGSTOP)) + testutil.WaitForResult(func() (bool, error) { + status, err := proc.Status() + if err != nil { + return false, err + } + + if status != "T" && status != "T+" { + return false, fmt.Errorf("process is not asleep yet: %v", status) + } + + return true, nil + }, func(err error) { + require.NoError(t, err) + }) go func() { time.Sleep(2 * time.Second) - proc.Signal(syscall.SIGCONT) - proc.Signal(os.Kill) + proc.SendSignal(syscall.SIGCONT) + proc.Kill() }() req.PreviousState = map[string]string{ From a321901ad89281b6edbbbc069b79231e0790fcbe Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Thu, 25 Apr 2019 18:39:17 -0400 Subject: [PATCH 7/7] retry grpc unavailable errors even if not shutting down --- client/allocrunner/taskrunner/logmon_hook.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/client/allocrunner/taskrunner/logmon_hook.go b/client/allocrunner/taskrunner/logmon_hook.go index 9efc6221850b..77770d6b50ec 100644 --- a/client/allocrunner/taskrunner/logmon_hook.go +++ b/client/allocrunner/taskrunner/logmon_hook.go @@ -16,6 +16,8 @@ import ( "github.com/hashicorp/nomad/nomad/structs" bstructs "github.com/hashicorp/nomad/plugins/base/structs" pstructs "github.com/hashicorp/nomad/plugins/shared/structs" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" ) const ( @@ -100,7 +102,7 @@ func (h *logmonHook) Prestart(ctx context.Context, attempts := 0 for { err := h.prestartOneLoop(ctx, req) - if err == bstructs.ErrPluginShutdown { + if err == bstructs.ErrPluginShutdown || grpc.Code(err) == codes.Unavailable { h.logger.Warn("logmon shutdown while making request", "error", err) if attempts > 3 {