Skip to content

Commit

Permalink
driver: allow disabling log collection
Browse files Browse the repository at this point in the history
Operators commonly have docker logs aggregated using various tools and
don't need nomad to manage their docker logs.  Worse, Nomad uses a
somewhat heavy docker api call to collect them and it seems to cause
problems when a client runs hundreds of log collections.

Here we add a knob to disable log aggregation completely for nomad.
When log collection is disabled, we avoid running logmon and
docker_logger for the docker tasks in this implementation.

The downside here is that, once disabled, `nomad logs ...` commands and API
no longer return logs and operators must correlate alloc-ids with their
aggregated log info.

This is meant as a stopgap measure.  Ideally, we'd follow up with at
least two changes:

First, we should optimize behavior when we can such that operators don't
need to disable docker log collection.  Potentially by reverting to
using pre-0.9 syslog aggregation in linux environments, though with
different trade-offs.

Second, when/if logs are disabled, nomad logs endpoints should look up
the docker logs API on demand.  This ensures that the cost of log collection
is paid sparingly.
  • Loading branch information
Mahmood Ali committed Dec 8, 2019
1 parent ad2af25 commit 9438544
Show file tree
Hide file tree
Showing 9 changed files with 141 additions and 39 deletions.
23 changes: 21 additions & 2 deletions client/allocrunner/taskrunner/logmon_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
bstructs "github.com/hashicorp/nomad/plugins/base/structs"
"github.com/hashicorp/nomad/plugins/drivers"
pstructs "github.com/hashicorp/nomad/plugins/shared/structs"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand All @@ -28,6 +29,8 @@ const (

// logmonHook launches logmon and manages task logging
type logmonHook struct {
runner *TaskRunner

// logmon is the handle to the log monitor process for the task.
logmon logmon.LogMon
logmonPluginClient *plugin.Client
Expand All @@ -43,9 +46,10 @@ type logmonHookConfig struct {
stderrFifo string
}

func newLogMonHook(cfg *logmonHookConfig, logger hclog.Logger) *logmonHook {
func newLogMonHook(tr *TaskRunner, logger hclog.Logger) *logmonHook {
hook := &logmonHook{
config: cfg,
runner: tr,
config: tr.logmonHookConfig,
logger: logger,
}

Expand Down Expand Up @@ -99,6 +103,11 @@ func reattachConfigFromHookData(data map[string]string) (*plugin.ReattachConfig,
func (h *logmonHook) Prestart(ctx context.Context,
req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {

if h.isLoggingDisabled() {
h.logger.Info("logging is disabled by driver")
return nil
}

attempts := 0
for {
err := h.prestartOneLoop(ctx, req)
Expand Down Expand Up @@ -130,6 +139,16 @@ func (h *logmonHook) Prestart(ctx context.Context,
}
}

// isLoggingDisabled reports whether the task's driver has asked Nomad not to
// collect logs. Drivers that do not implement
// drivers.InternalCapabilitiesDriver are treated as having log collection
// enabled.
func (h *logmonHook) isLoggingDisabled() bool {
	if capsDriver, implemented := h.runner.driver.(drivers.InternalCapabilitiesDriver); implemented {
		return capsDriver.InternalCapabilities().DisableLogCollection
	}

	return false
}

func (h *logmonHook) prestartOneLoop(ctx context.Context, req *interfaces.TaskPrestartRequest) error {
// attach to a running logmon if state indicates one
if h.logmonPluginClient == nil {
Expand Down
3 changes: 2 additions & 1 deletion client/allocrunner/taskrunner/logmon_hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ func TestTaskRunner_LogmonHook_StartStop(t *testing.T) {
}()

hookConf := newLogMonHookConfig(task.Name, dir)
hook := newLogMonHook(hookConf, testlog.HCLogger(t))
runner := &TaskRunner{logmonHookConfig: hookConf}
hook := newLogMonHook(runner, testlog.HCLogger(t))

req := interfaces.TaskPrestartRequest{
Task: task,
Expand Down
6 changes: 4 additions & 2 deletions client/allocrunner/taskrunner/logmon_hook_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ func TestTaskRunner_LogmonHook_StartCrashStop(t *testing.T) {
}()

hookConf := newLogMonHookConfig(task.Name, dir)
hook := newLogMonHook(hookConf, testlog.HCLogger(t))
runner := &TaskRunner{logmonHookConfig: hookConf}
hook := newLogMonHook(runner, testlog.HCLogger(t))

req := interfaces.TaskPrestartRequest{
Task: task,
Expand Down Expand Up @@ -104,7 +105,8 @@ func TestTaskRunner_LogmonHook_ShutdownMidStart(t *testing.T) {
}()

hookConf := newLogMonHookConfig(task.Name, dir)
hook := newLogMonHook(hookConf, testlog.HCLogger(t))
runner := &TaskRunner{logmonHookConfig: hookConf}
hook := newLogMonHook(runner, testlog.HCLogger(t))

req := interfaces.TaskPrestartRequest{
Task: task,
Expand Down
2 changes: 1 addition & 1 deletion client/allocrunner/taskrunner/task_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (tr *TaskRunner) initHooks() {
tr.runnerHooks = []interfaces.TaskHook{
newValidateHook(tr.clientConfig, hookLogger),
newTaskDirHook(tr, hookLogger),
newLogMonHook(tr.logmonHookConfig, hookLogger),
newLogMonHook(tr, hookLogger),
newDispatchHook(alloc, hookLogger),
newVolumeHook(tr, hookLogger),
newArtifactHook(tr, hookLogger),
Expand Down
29 changes: 20 additions & 9 deletions drivers/docker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ var (
hclspec.NewAttr("infra_image", "string", false),
hclspec.NewLiteral(`"gcr.io/google_containers/pause-amd64:3.0"`),
),

"disable_log_collection": hclspec.NewAttr("disable_log_collection", "bool", false),
})

// taskConfigSpec is the hcl specification for the driver config section of
Expand Down Expand Up @@ -549,15 +551,16 @@ type ContainerGCConfig struct {
}

type DriverConfig struct {
Endpoint string `codec:"endpoint"`
Auth AuthConfig `codec:"auth"`
TLS TLSConfig `codec:"tls"`
GC GCConfig `codec:"gc"`
Volumes VolumeConfig `codec:"volumes"`
AllowPrivileged bool `codec:"allow_privileged"`
AllowCaps []string `codec:"allow_caps"`
GPURuntimeName string `codec:"nvidia_runtime"`
InfraImage string `codec:"infra_image"`
Endpoint string `codec:"endpoint"`
Auth AuthConfig `codec:"auth"`
TLS TLSConfig `codec:"tls"`
GC GCConfig `codec:"gc"`
Volumes VolumeConfig `codec:"volumes"`
AllowPrivileged bool `codec:"allow_privileged"`
AllowCaps []string `codec:"allow_caps"`
GPURuntimeName string `codec:"nvidia_runtime"`
InfraImage string `codec:"infra_image"`
DisableLogCollection bool `codec:"disable_log_collection"`
}

type AuthConfig struct {
Expand Down Expand Up @@ -660,3 +663,11 @@ func (d *Driver) TaskConfigSchema() (*hclspec.Spec, error) {
// Capabilities returns the `capabilities` value (declared outside this view)
// together with a nil error; the receiver's state is not consulted.
func (d *Driver) Capabilities() (*drivers.Capabilities, error) {
return capabilities, nil
}

var _ drivers.InternalCapabilitiesDriver = (*Driver)(nil)

// InternalCapabilities reports which Nomad-managed functionality this driver
// has turned off. Log collection is flagged as disabled only when the driver
// has a config and that config sets DisableLogCollection; with no config the
// zero value (nothing disabled) is returned.
func (d *Driver) InternalCapabilities() drivers.InternalCapabilities {
	var caps drivers.InternalCapabilities
	if d.config != nil {
		caps.DisableLogCollection = d.config.DisableLogCollection
	}
	return caps
}
36 changes: 36 additions & 0 deletions drivers/docker/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

"github.com/hashicorp/nomad/helper/pluginutils/hclutils"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -488,3 +489,38 @@ func TestConfig_DriverConfig_DanglingContainers(t *testing.T) {
})
}
}

// TestConfig_InternalCapabilities checks that the disable_log_collection
// plugin option, whether omitted, true, or false, is reflected in the
// InternalCapabilities reported by the driver.
func TestConfig_InternalCapabilities(t *testing.T) {
	// Named expectations: the zero value means nothing is disabled.
	nothingDisabled := drivers.InternalCapabilities{}
	logsDisabled := drivers.InternalCapabilities{DisableLogCollection: true}

	scenarios := []struct {
		name     string
		config   string
		expected drivers.InternalCapabilities
	}{
		{
			name:     "pure default",
			config:   `{}`,
			expected: nothingDisabled,
		},
		{
			name:     "disabled",
			config:   `{ disable_log_collection = true }`,
			expected: logsDisabled,
		},
		{
			name:     "enabled explicitly",
			config:   `{ disable_log_collection = false }`,
			expected: nothingDisabled,
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			var parsed DriverConfig
			parser := hclutils.NewConfigParser(configSpec)
			parser.ParseHCL(t, "config "+sc.config, &parsed)

			driver := &Driver{config: &parsed}
			require.Equal(t, sc.expected, driver.InternalCapabilities())
		})
	}
}
51 changes: 31 additions & 20 deletions drivers/docker/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,23 +209,25 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
net: handleState.DriverNetwork,
}

h.dlogger, h.dloggerPluginClient, err = d.reattachToDockerLogger(handleState.ReattachConfig)
if err != nil {
d.logger.Warn("failed to reattach to docker logger process", "error", err)

h.dlogger, h.dloggerPluginClient, err = d.setupNewDockerLogger(container, handle.Config, time.Now())
if !d.config.DisableLogCollection {
h.dlogger, h.dloggerPluginClient, err = d.reattachToDockerLogger(handleState.ReattachConfig)
if err != nil {
if err := client.StopContainer(handleState.ContainerID, 0); err != nil {
d.logger.Warn("failed to stop container during cleanup", "container_id", handleState.ContainerID, "error", err)
d.logger.Warn("failed to reattach to docker logger process", "error", err)

h.dlogger, h.dloggerPluginClient, err = d.setupNewDockerLogger(container, handle.Config, time.Now())
if err != nil {
if err := client.StopContainer(handleState.ContainerID, 0); err != nil {
d.logger.Warn("failed to stop container during cleanup", "container_id", handleState.ContainerID, "error", err)
}
return fmt.Errorf("failed to setup replacement docker logger: %v", err)
}
return fmt.Errorf("failed to setup replacement docker logger: %v", err)
}

if err := handle.SetDriverState(h.buildState()); err != nil {
if err := client.StopContainer(handleState.ContainerID, 0); err != nil {
d.logger.Warn("failed to stop container during cleanup", "container_id", handleState.ContainerID, "error", err)
if err := handle.SetDriverState(h.buildState()); err != nil {
if err := client.StopContainer(handleState.ContainerID, 0); err != nil {
d.logger.Warn("failed to stop container during cleanup", "container_id", handleState.ContainerID, "error", err)
}
return fmt.Errorf("failed to store driver state: %v", err)
}
return fmt.Errorf("failed to store driver state: %v", err)
}
}

Expand Down Expand Up @@ -334,11 +336,18 @@ CREATE:
container.ID, "container_state", container.State.String())
}

dlogger, pluginClient, err := d.setupNewDockerLogger(container, cfg, time.Unix(0, 0))
if err != nil {
d.logger.Error("an error occurred after container startup, terminating container", "container_id", container.ID)
client.RemoveContainer(docker.RemoveContainerOptions{ID: container.ID, Force: true})
return nil, nil, err
collectingLogs := !d.config.DisableLogCollection

var dlogger docklog.DockerLogger
var pluginClient *plugin.Client

if collectingLogs {
dlogger, pluginClient, err = d.setupNewDockerLogger(container, cfg, time.Unix(0, 0))
if err != nil {
d.logger.Error("an error occurred after container startup, terminating container", "container_id", container.ID)
client.RemoveContainer(docker.RemoveContainerOptions{ID: container.ID, Force: true})
return nil, nil, err
}
}

// Detect container address
Expand Down Expand Up @@ -368,8 +377,10 @@ CREATE:

if err := handle.SetDriverState(h.buildState()); err != nil {
d.logger.Error("error encoding container occurred after startup, terminating container", "container_id", container.ID, "error", err)
dlogger.Stop()
pluginClient.Kill()
if collectingLogs {
dlogger.Stop()
pluginClient.Kill()
}
client.RemoveContainer(docker.RemoveContainerOptions{ID: container.ID, Force: true})
return nil, nil, err
}
Expand Down
15 changes: 11 additions & 4 deletions drivers/docker/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,14 @@ type taskHandleState struct {
}

func (h *taskHandle) buildState() *taskHandleState {
return &taskHandleState{
ReattachConfig: pstructs.ReattachConfigFromGoPlugin(h.dloggerPluginClient.ReattachConfig()),
ContainerID: h.containerID,
DriverNetwork: h.net,
s := &taskHandleState{
ContainerID: h.containerID,
DriverNetwork: h.net,
}
if h.dloggerPluginClient != nil {
s.ReattachConfig = pstructs.ReattachConfigFromGoPlugin(h.dloggerPluginClient.ReattachConfig())
}
return s
}

func (h *taskHandle) Exec(ctx context.Context, cmd string, args []string) (*drivers.ExecTaskResult, error) {
Expand Down Expand Up @@ -171,6 +174,10 @@ func (h *taskHandle) Kill(killTimeout time.Duration, signal os.Signal) error {
}

func (h *taskHandle) shutdownLogger() {
if h.dlogger == nil {
return
}

if err := h.dlogger.Stop(); err != nil {
h.logger.Error("failed to stop docker logger process during StopTask",
"error", err, "logger_pid", h.dloggerPluginClient.ReattachConfig().Pid)
Expand Down
15 changes: 15 additions & 0 deletions plugins/drivers/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,3 +527,18 @@ type ExecTaskStream interface {

type ExecTaskStreamingRequestMsg = proto.ExecTaskStreamingRequest
type ExecTaskStreamingResponseMsg = proto.ExecTaskStreamingResponse

// InternalCapabilitiesDriver is an experimental interface enabling a driver
// to disable some nomad functionality (e.g. logs or metrics).
//
// Intended for internal drivers only while the interface is stabilized.
type InternalCapabilitiesDriver interface {
// InternalCapabilities returns the set of Nomad features the driver has
// opted out of.
InternalCapabilities() InternalCapabilities
}

// InternalCapabilities flags disabled functionality.
// Zero value means all is supported.
type InternalCapabilities struct {
// DisableLogCollection, when true, signals that Nomad should not collect
// the task's logs.
DisableLogCollection bool
// DisableMetricsCollection is the analogous flag for metrics.
// NOTE(review): no consumer of this flag is visible in this change —
// confirm where it is honored.
DisableMetricsCollection bool
}

0 comments on commit 9438544

Please sign in to comment.