From 943854469db945631e79ccd0f8014cdb1e584963 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Fri, 6 Dec 2019 22:11:41 -0500 Subject: [PATCH 1/2] driver: allow disabling log collection Operators commonly have docker logs aggregated using various tools and don't need nomad to manage their docker logs. Worse, Nomad uses a somewhat heavy docker api call to collect them and it seems to cause problems when a client runs hundreds of log collections. Here we add a knob to disable log aggregation completely for nomad. When log collection is disabled, we avoid running logmon and docker_logger for the docker tasks in this implementation. The downside here is once disabled, `nomad logs ...` commands and API no longer return logs and operators must corrolate alloc-ids with their aggregated log info. This is meant as a stop gap measure. Ideally, we'd follow up with at least two changes: First, we should optimize behavior when we can such that operators don't need to disable docker log collection. Potentially by reverting to using pre-0.9 syslog aggregation in linux environments, though with different trade-offs. Second, when/if logs are disabled, nomad logs endpoints should lookup docker logs api on demand. This ensures that the cost of log collection is paid sparingly. --- client/allocrunner/taskrunner/logmon_hook.go | 23 ++++++++- .../taskrunner/logmon_hook_test.go | 3 +- .../taskrunner/logmon_hook_unix_test.go | 6 ++- .../taskrunner/task_runner_hooks.go | 2 +- drivers/docker/config.go | 29 +++++++---- drivers/docker/config_test.go | 36 +++++++++++++ drivers/docker/driver.go | 51 +++++++++++-------- drivers/docker/handle.go | 15 ++++-- plugins/drivers/driver.go | 15 ++++++ 9 files changed, 141 insertions(+), 39 deletions(-) diff --git a/client/allocrunner/taskrunner/logmon_hook.go b/client/allocrunner/taskrunner/logmon_hook.go index 77770d6b50ec..c4c77f002c86 100644 --- a/client/allocrunner/taskrunner/logmon_hook.go +++ b/client/allocrunner/taskrunner/logmon_hook.go @@ -15,6 +15,7 @@ import ( "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/structs" bstructs "github.com/hashicorp/nomad/plugins/base/structs" + "github.com/hashicorp/nomad/plugins/drivers" pstructs "github.com/hashicorp/nomad/plugins/shared/structs" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -28,6 +29,8 @@ const ( // logmonHook launches logmon and manages task logging type logmonHook struct { + runner *TaskRunner + // logmon is the handle to the log monitor process for the task. logmon logmon.LogMon logmonPluginClient *plugin.Client @@ -43,9 +46,10 @@ type logmonHookConfig struct { stderrFifo string } -func newLogMonHook(cfg *logmonHookConfig, logger hclog.Logger) *logmonHook { +func newLogMonHook(tr *TaskRunner, logger hclog.Logger) *logmonHook { hook := &logmonHook{ - config: cfg, + runner: tr, + config: tr.logmonHookConfig, logger: logger, } @@ -99,6 +103,11 @@ func reattachConfigFromHookData(data map[string]string) (*plugin.ReattachConfig, func (h *logmonHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error { + if h.isLoggingDisabled() { + h.logger.Info("logging is disabled by driver") + return nil + } + attempts := 0 for { err := h.prestartOneLoop(ctx, req) @@ -130,6 +139,16 @@ func (h *logmonHook) Prestart(ctx context.Context, } } +func (h *logmonHook) isLoggingDisabled() bool { + ic, ok := h.runner.driver.(drivers.InternalCapabilitiesDriver) + if !ok { + return false + } + + caps := ic.InternalCapabilities() + return caps.DisableLogCollection +} + func (h *logmonHook) prestartOneLoop(ctx context.Context, req *interfaces.TaskPrestartRequest) error { // attach to a running logmon if state indicates one if h.logmonPluginClient == nil { diff --git a/client/allocrunner/taskrunner/logmon_hook_test.go b/client/allocrunner/taskrunner/logmon_hook_test.go index 426a9ea2fd2d..b3a087995437 100644 --- a/client/allocrunner/taskrunner/logmon_hook_test.go +++ b/client/allocrunner/taskrunner/logmon_hook_test.go @@ -72,7 +72,8 @@ func TestTaskRunner_LogmonHook_StartStop(t *testing.T) { }() hookConf := newLogMonHookConfig(task.Name, dir) - hook := newLogMonHook(hookConf, testlog.HCLogger(t)) + runner := &TaskRunner{logmonHookConfig: hookConf} + hook := newLogMonHook(runner, testlog.HCLogger(t)) req := interfaces.TaskPrestartRequest{ Task: task, diff --git a/client/allocrunner/taskrunner/logmon_hook_unix_test.go b/client/allocrunner/taskrunner/logmon_hook_unix_test.go index 4a8a461536ad..09392d6d2bbf 100644 --- a/client/allocrunner/taskrunner/logmon_hook_unix_test.go +++ b/client/allocrunner/taskrunner/logmon_hook_unix_test.go @@ -36,7 +36,8 @@ func TestTaskRunner_LogmonHook_StartCrashStop(t *testing.T) { }() hookConf := newLogMonHookConfig(task.Name, dir) - hook := newLogMonHook(hookConf, testlog.HCLogger(t)) + runner := &TaskRunner{logmonHookConfig: hookConf} + hook := newLogMonHook(runner, testlog.HCLogger(t)) req := interfaces.TaskPrestartRequest{ Task: task, @@ -104,7 +105,8 @@ func TestTaskRunner_LogmonHook_ShutdownMidStart(t *testing.T) { }() hookConf := newLogMonHookConfig(task.Name, dir) - hook := newLogMonHook(hookConf, testlog.HCLogger(t)) + runner := &TaskRunner{logmonHookConfig: hookConf} + hook := newLogMonHook(runner, testlog.HCLogger(t)) req := interfaces.TaskPrestartRequest{ Task: task, diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index 374f29f42ede..2f5723197e51 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -61,7 +61,7 @@ func (tr *TaskRunner) initHooks() { tr.runnerHooks = []interfaces.TaskHook{ newValidateHook(tr.clientConfig, hookLogger), newTaskDirHook(tr, hookLogger), - newLogMonHook(tr.logmonHookConfig, hookLogger), + newLogMonHook(tr, hookLogger), newDispatchHook(alloc, hookLogger), newVolumeHook(tr, hookLogger), newArtifactHook(tr, hookLogger), diff --git a/drivers/docker/config.go b/drivers/docker/config.go index 3b31f0da6108..ec4ccf7b5ece 100644 --- a/drivers/docker/config.go +++ b/drivers/docker/config.go @@ -257,6 +257,8 @@ var ( hclspec.NewAttr("infra_image", "string", false), hclspec.NewLiteral(`"gcr.io/google_containers/pause-amd64:3.0"`), ), + + "disable_log_collection": hclspec.NewAttr("disable_log_collection", "bool", false), }) // taskConfigSpec is the hcl specification for the driver config section of @@ -549,15 +551,16 @@ type ContainerGCConfig struct { } type DriverConfig struct { - Endpoint string `codec:"endpoint"` - Auth AuthConfig `codec:"auth"` - TLS TLSConfig `codec:"tls"` - GC GCConfig `codec:"gc"` - Volumes VolumeConfig `codec:"volumes"` - AllowPrivileged bool `codec:"allow_privileged"` - AllowCaps []string `codec:"allow_caps"` - GPURuntimeName string `codec:"nvidia_runtime"` - InfraImage string `codec:"infra_image"` + Endpoint string `codec:"endpoint"` + Auth AuthConfig `codec:"auth"` + TLS TLSConfig `codec:"tls"` + GC GCConfig `codec:"gc"` + Volumes VolumeConfig `codec:"volumes"` + AllowPrivileged bool `codec:"allow_privileged"` + AllowCaps []string `codec:"allow_caps"` + GPURuntimeName string `codec:"nvidia_runtime"` + InfraImage string `codec:"infra_image"` + DisableLogCollection bool `codec:"disable_log_collection"` } type AuthConfig struct { @@ -660,3 +663,11 @@ func (d *Driver) TaskConfigSchema() (*hclspec.Spec, error) { func (d *Driver) Capabilities() (*drivers.Capabilities, error) { return capabilities, nil } + +var _ drivers.InternalCapabilitiesDriver = (*Driver)(nil) + +func (d *Driver) InternalCapabilities() drivers.InternalCapabilities { + return drivers.InternalCapabilities{ + DisableLogCollection: d.config != nil && d.config.DisableLogCollection, + } +} diff --git a/drivers/docker/config_test.go b/drivers/docker/config_test.go index 57f6725d8c95..65cf3f17ef7f 100644 --- a/drivers/docker/config_test.go +++ b/drivers/docker/config_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/hashicorp/nomad/helper/pluginutils/hclutils" + "github.com/hashicorp/nomad/plugins/drivers" "github.com/stretchr/testify/require" ) @@ -488,3 +489,38 @@ func TestConfig_DriverConfig_DanglingContainers(t *testing.T) { }) } } + +func TestConfig_InternalCapabilities(t *testing.T) { + cases := []struct { + name string + config string + expected drivers.InternalCapabilities + }{ + { + name: "pure default", + config: `{}`, + expected: drivers.InternalCapabilities{}, + }, + { + name: "disabled", + config: `{ disable_log_collection = true }`, + expected: drivers.InternalCapabilities{DisableLogCollection: true}, + }, + { + name: "enabled explicitly", + config: `{ disable_log_collection = false }`, + expected: drivers.InternalCapabilities{}, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + var tc DriverConfig + hclutils.NewConfigParser(configSpec).ParseHCL(t, "config "+c.config, &tc) + + d := &Driver{config: &tc} + require.Equal(t, c.expected, d.InternalCapabilities()) + }) + } + +} diff --git a/drivers/docker/driver.go b/drivers/docker/driver.go index c6d448619c80..e9ebeb7d91f6 100644 --- a/drivers/docker/driver.go +++ b/drivers/docker/driver.go @@ -209,23 +209,25 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { net: handleState.DriverNetwork, } - h.dlogger, h.dloggerPluginClient, err = d.reattachToDockerLogger(handleState.ReattachConfig) - if err != nil { - d.logger.Warn("failed to reattach to docker logger process", "error", err) - - h.dlogger, h.dloggerPluginClient, err = d.setupNewDockerLogger(container, handle.Config, time.Now()) + if !d.config.DisableLogCollection { + h.dlogger, h.dloggerPluginClient, err = d.reattachToDockerLogger(handleState.ReattachConfig) if err != nil { - if err := client.StopContainer(handleState.ContainerID, 0); err != nil { - d.logger.Warn("failed to stop container during cleanup", "container_id", handleState.ContainerID, "error", err) + d.logger.Warn("failed to reattach to docker logger process", "error", err) + + h.dlogger, h.dloggerPluginClient, err = d.setupNewDockerLogger(container, handle.Config, time.Now()) + if err != nil { + if err := client.StopContainer(handleState.ContainerID, 0); err != nil { + d.logger.Warn("failed to stop container during cleanup", "container_id", handleState.ContainerID, "error", err) + } + return fmt.Errorf("failed to setup replacement docker logger: %v", err) } - return fmt.Errorf("failed to setup replacement docker logger: %v", err) - } - if err := handle.SetDriverState(h.buildState()); err != nil { - if err := client.StopContainer(handleState.ContainerID, 0); err != nil { - d.logger.Warn("failed to stop container during cleanup", "container_id", handleState.ContainerID, "error", err) + if err := handle.SetDriverState(h.buildState()); err != nil { + if err := client.StopContainer(handleState.ContainerID, 0); err != nil { + d.logger.Warn("failed to stop container during cleanup", "container_id", handleState.ContainerID, "error", err) + } + return fmt.Errorf("failed to store driver state: %v", err) } - return fmt.Errorf("failed to store driver state: %v", err) } } @@ -334,11 +336,18 @@ CREATE: container.ID, "container_state", container.State.String()) } - dlogger, pluginClient, err := d.setupNewDockerLogger(container, cfg, time.Unix(0, 0)) - if err != nil { - d.logger.Error("an error occurred after container startup, terminating container", "container_id", container.ID) - client.RemoveContainer(docker.RemoveContainerOptions{ID: container.ID, Force: true}) - return nil, nil, err + collectingLogs := !d.config.DisableLogCollection + + var dlogger docklog.DockerLogger + var pluginClient *plugin.Client + + if collectingLogs { + dlogger, pluginClient, err = d.setupNewDockerLogger(container, cfg, time.Unix(0, 0)) + if err != nil { + d.logger.Error("an error occurred after container startup, terminating container", "container_id", container.ID) + client.RemoveContainer(docker.RemoveContainerOptions{ID: container.ID, Force: true}) + return nil, nil, err + } } // Detect container address @@ -368,8 +377,10 @@ CREATE: if err := handle.SetDriverState(h.buildState()); err != nil { d.logger.Error("error encoding container occurred after startup, terminating container", "container_id", container.ID, "error", err) - dlogger.Stop() - pluginClient.Kill() + if collectingLogs { + dlogger.Stop() + pluginClient.Kill() + } client.RemoveContainer(docker.RemoveContainerOptions{ID: container.ID, Force: true}) return nil, nil, err } diff --git a/drivers/docker/handle.go b/drivers/docker/handle.go index 398b53d74b89..3e1854a76572 100644 --- a/drivers/docker/handle.go +++ b/drivers/docker/handle.go @@ -51,11 +51,14 @@ type taskHandleState struct { } func (h *taskHandle) buildState() *taskHandleState { - return &taskHandleState{ - ReattachConfig: pstructs.ReattachConfigFromGoPlugin(h.dloggerPluginClient.ReattachConfig()), - ContainerID: h.containerID, - DriverNetwork: h.net, + s := &taskHandleState{ + ContainerID: h.containerID, + DriverNetwork: h.net, } + if h.dloggerPluginClient != nil { + s.ReattachConfig = pstructs.ReattachConfigFromGoPlugin(h.dloggerPluginClient.ReattachConfig()) + } + return s } func (h *taskHandle) Exec(ctx context.Context, cmd string, args []string) (*drivers.ExecTaskResult, error) { @@ -171,6 +174,10 @@ func (h *taskHandle) Kill(killTimeout time.Duration, signal os.Signal) error { } func (h *taskHandle) shutdownLogger() { + if h.dlogger == nil { + return + } + if err := h.dlogger.Stop(); err != nil { h.logger.Error("failed to stop docker logger process during StopTask", "error", err, "logger_pid", h.dloggerPluginClient.ReattachConfig().Pid) diff --git a/plugins/drivers/driver.go b/plugins/drivers/driver.go index a4259a6f8ee0..19e5650fffe8 100644 --- a/plugins/drivers/driver.go +++ b/plugins/drivers/driver.go @@ -527,3 +527,18 @@ type ExecTaskStream interface { type ExecTaskStreamingRequestMsg = proto.ExecTaskStreamingRequest type ExecTaskStreamingResponseMsg = proto.ExecTaskStreamingResponse + +// InternalCapabilitiesDriver is an experimental interface enabling a driver +// to disable some nomad functionality (e.g. logs or metrics). +// +// Intended for internal drivers only while the interface is stabalized. +type InternalCapabilitiesDriver interface { + InternalCapabilities() InternalCapabilities +} + +// InternalCapabilities flags disabled functionality. +// Zero value means all is supported. +type InternalCapabilities struct { + DisableLogCollection bool + DisableMetricsCollection bool +} From e82dad732beaa6e4113b84c25d35757639f2e333 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Fri, 13 Dec 2019 11:08:12 -0500 Subject: [PATCH 2/2] address review comments --- client/allocrunner/taskrunner/logmon_hook.go | 2 +- drivers/docker/config.go | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/client/allocrunner/taskrunner/logmon_hook.go b/client/allocrunner/taskrunner/logmon_hook.go index c4c77f002c86..b983929599dd 100644 --- a/client/allocrunner/taskrunner/logmon_hook.go +++ b/client/allocrunner/taskrunner/logmon_hook.go @@ -104,7 +104,7 @@ func (h *logmonHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error { if h.isLoggingDisabled() { - h.logger.Info("logging is disabled by driver") + h.logger.Debug("logging is disabled by driver") return nil } diff --git a/drivers/docker/config.go b/drivers/docker/config.go index ec4ccf7b5ece..33c8b5ff0a82 100644 --- a/drivers/docker/config.go +++ b/drivers/docker/config.go @@ -258,6 +258,8 @@ var ( hclspec.NewLiteral(`"gcr.io/google_containers/pause-amd64:3.0"`), ), + // disable_log_collection indicates whether docker driver should collect logs of docker + // task containers. If true, nomad doesn't start docker_logger/logmon processes "disable_log_collection": hclspec.NewAttr("disable_log_collection", "bool", false), })