diff --git a/client/allocrunner/taskrunner/logmon_hook.go b/client/allocrunner/taskrunner/logmon_hook.go index 1f58d7e33644..77770d6b50ec 100644 --- a/client/allocrunner/taskrunner/logmon_hook.go +++ b/client/allocrunner/taskrunner/logmon_hook.go @@ -6,6 +6,7 @@ import ( "fmt" "path/filepath" "runtime" + "time" hclog "github.com/hashicorp/go-hclog" plugin "github.com/hashicorp/go-plugin" @@ -13,7 +14,10 @@ import ( "github.com/hashicorp/nomad/client/logmon" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/structs" + bstructs "github.com/hashicorp/nomad/plugins/base/structs" pstructs "github.com/hashicorp/nomad/plugins/shared/structs" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" ) const ( @@ -95,7 +99,39 @@ func reattachConfigFromHookData(data map[string]string) (*plugin.ReattachConfig, func (h *logmonHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error { - // Attempt to reattach to logmon + attempts := 0 + for { + err := h.prestartOneLoop(ctx, req) + if err == bstructs.ErrPluginShutdown || grpc.Code(err) == codes.Unavailable { + h.logger.Warn("logmon shutdown while making request", "error", err) + + if attempts > 3 { + h.logger.Warn("logmon shutdown while making request; giving up", "attempts", attempts, "error", err) + return err + } + + // retry after killing process and ensure we start a new logmon process + attempts++ + h.logger.Warn("logmon shutdown while making request; retrying", "attempts", attempts, "error", err) + h.logmonPluginClient.Kill() + time.Sleep(1 * time.Second) + continue + } else if err != nil { + return err + } + + rCfg := pstructs.ReattachConfigFromGoPlugin(h.logmonPluginClient.ReattachConfig()) + jsonCfg, err := json.Marshal(rCfg) + if err != nil { + return err + } + resp.State = map[string]string{logmonReattachKey: string(jsonCfg)} + return nil + } +} + +func (h *logmonHook) prestartOneLoop(ctx context.Context, req *interfaces.TaskPrestartRequest) error { + // attach to a running logmon if state indicates one if h.logmonPluginClient == nil { reattachConfig, err := reattachConfigFromHookData(req.PreviousState) if err != nil { @@ -105,12 +141,13 @@ func (h *logmonHook) Prestart(ctx context.Context, if reattachConfig != nil { if err := h.launchLogMon(reattachConfig); err != nil { h.logger.Warn("failed to reattach to logmon process", "error", err) + // if we failed to launch logmon, try again below } } } - // We did not reattach to a plugin and one is still not running. + // create a new client in initial starts, failed reattachment, or if we detect exits if h.logmonPluginClient == nil || h.logmonPluginClient.Exited() { if err := h.launchLogMon(nil); err != nil { // Retry errors launching logmon as logmon may have crashed on start and @@ -134,12 +171,6 @@ func (h *logmonHook) Prestart(ctx context.Context, return err } - rCfg := pstructs.ReattachConfigFromGoPlugin(h.logmonPluginClient.ReattachConfig()) - jsonCfg, err := json.Marshal(rCfg) - if err != nil { - return err - } - resp.State = map[string]string{logmonReattachKey: string(jsonCfg)} return nil } diff --git a/client/allocrunner/taskrunner/logmon_hook_unix_test.go b/client/allocrunner/taskrunner/logmon_hook_unix_test.go index 582692a27bde..4a8a461536ad 100644 --- a/client/allocrunner/taskrunner/logmon_hook_unix_test.go +++ b/client/allocrunner/taskrunner/logmon_hook_unix_test.go @@ -10,11 +10,13 @@ import ( "os" "syscall" "testing" + "time" "github.com/hashicorp/nomad/client/allocrunner/interfaces" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/testutil" + "github.com/shirou/gopsutil/process" "github.com/stretchr/testify/require" ) @@ -86,3 +88,89 @@ func TestTaskRunner_LogmonHook_StartCrashStop(t *testing.T) { // Running stop should shutdown logmon require.NoError(t, hook.Stop(context.Background(), nil, nil)) } + +// TestTaskRunner_LogmonHook_ShutdownMidStart simulates logmon crashing while the +// Nomad client is calling Start() and asserts that we recover and spawn a new logmon. +func TestTaskRunner_LogmonHook_ShutdownMidStart(t *testing.T) { + t.Parallel() + + alloc := mock.BatchAlloc() + task := alloc.Job.TaskGroups[0].Tasks[0] + + dir, err := ioutil.TempDir("", "nomadtest") + require.NoError(t, err) + defer func() { + require.NoError(t, os.RemoveAll(dir)) + }() + + hookConf := newLogMonHookConfig(task.Name, dir) + hook := newLogMonHook(hookConf, testlog.HCLogger(t)) + + req := interfaces.TaskPrestartRequest{ + Task: task, + } + resp := interfaces.TaskPrestartResponse{} + + // First start + require.NoError(t, hook.Prestart(context.Background(), &req, &resp)) + defer hook.Stop(context.Background(), nil, nil) + + origState := resp.State + origHookData := resp.State[logmonReattachKey] + require.NotEmpty(t, origHookData) + + // Pluck PID out of reattach synthesize a crash + reattach := struct { + Pid int + }{} + require.NoError(t, json.Unmarshal([]byte(origHookData), &reattach)) + pid := reattach.Pid + require.NotZero(t, pid) + + proc, err := process.NewProcess(int32(pid)) + require.NoError(t, err) + + // Assert logmon is running + require.NoError(t, proc.SendSignal(syscall.Signal(0))) + + // SIGSTOP would freeze process without it being considered + // exited; so this causes process to be non-exited at beginning of call + // then we kill process while Start call is running + require.NoError(t, proc.SendSignal(syscall.SIGSTOP)) + testutil.WaitForResult(func() (bool, error) { + status, err := proc.Status() + if err != nil { + return false, err + } + + if status != "T" && status != "T+" { + return false, fmt.Errorf("process is not asleep yet: %v", status) + } + + return true, nil + }, func(err error) { + require.NoError(t, err) + }) + + go func() { + time.Sleep(2 * time.Second) + + proc.SendSignal(syscall.SIGCONT) + proc.Kill() + }() + + req.PreviousState = map[string]string{ + logmonReattachKey: origHookData, + } + + initLogmon, initClient := hook.logmon, hook.logmonPluginClient + + resp = interfaces.TaskPrestartResponse{} + err = hook.Prestart(context.Background(), &req, &resp) + require.NoError(t, err) + require.NotEqual(t, origState, resp.State) + + // assert that we got a new client and logmon + require.True(t, initLogmon != hook.logmon) + require.True(t, initClient != hook.logmonPluginClient) +} diff --git a/client/logmon/client.go b/client/logmon/client.go index ed9cef160183..80b691842040 100644 --- a/client/logmon/client.go +++ b/client/logmon/client.go @@ -4,10 +4,14 @@ import ( "context" "github.com/hashicorp/nomad/client/logmon/proto" + "github.com/hashicorp/nomad/helper/pluginutils/grpcutils" ) type logmonClient struct { client proto.LogMonClient + + // doneCtx is closed when the plugin exits + doneCtx context.Context } func (c *logmonClient) Start(cfg *LogConfig) error { @@ -21,11 +25,11 @@ func (c *logmonClient) Start(cfg *LogConfig) error { StderrFifo: cfg.StderrFifo, } _, err := c.client.Start(context.Background(), req) - return err + return grpcutils.HandleGrpcErr(err, c.doneCtx) } func (c *logmonClient) Stop() error { req := &proto.StopRequest{} _, err := c.client.Stop(context.Background(), req) - return err + return grpcutils.HandleGrpcErr(err, c.doneCtx) } diff --git a/client/logmon/plugin.go b/client/logmon/plugin.go index 07c8dfbf1ba4..3234806c7614 100644 --- a/client/logmon/plugin.go +++ b/client/logmon/plugin.go @@ -73,5 +73,8 @@ func (p *Plugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error { } func (p *Plugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) { - return &logmonClient{client: proto.NewLogMonClient(c)}, nil + return &logmonClient{ + doneCtx: ctx, + client: proto.NewLogMonClient(c), + }, nil }