-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
executor v2 #4656
executor v2 #4656
Conversation
e0afea5
to
f646852
Compare
ba95b44
to
2b03e8b
Compare
60980f3
to
000a67e
Compare
client/driver/docker.go
Outdated
Task: task, | ||
Driver: "docker", | ||
LogDir: ctx.TaskDir.LogDir, | ||
/*executorCtx := &executor.ExecutorContext{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
client/driver/docker.go
Outdated
|
||
// The user hasn't specified any logging options so launch our own syslog | ||
// server if possible. | ||
syslogAddr := "" | ||
if len(d.driverConfig.Logging) == 0 { | ||
/*if len(d.driverConfig.Logging) == 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
101ccf4
to
245f7b0
Compare
client/driver/docker.go
Outdated
if err := h.executor.UpdateTask(task); err != nil { | ||
h.logger.Printf("[DEBUG] driver.docker: failed to update log config: %v", err) | ||
} | ||
//if err := h.executor.UpdateTask(task); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
"unique.cgroup.mountpoint": "/sys/fs/cgroup", | ||
}, | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fix or remove
client/driver/executor/executor.go
Outdated
@@ -172,101 +188,65 @@ type UniversalExecutor struct { | |||
processExited chan interface{} | |||
fsIsolationEnforced bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need this anymore
client/driver/executor/executor.go
Outdated
lre *logRotatorWrapper | ||
lro *logRotatorWrapper | ||
rotatorLock sync.Mutex | ||
|
||
syslogServer *logging.SyslogServer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove syslog
userProcExited chan struct{} | ||
exitState *ProcessState | ||
|
||
syslogServer *logging.SyslogServer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove syslog
fa8865a
to
f77dc59
Compare
} | ||
|
||
func NewExecutorWithIsolation(logger hclog.Logger) Executor { | ||
logger = logger.Named("executor") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
named "isolated_executor"
|
||
// Launch creates a new container in libcontainer and starts a new process with it | ||
func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, error) { | ||
// Find the nomad executable to launch the executor process with |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use logger.WithFields() to set alloc_id and task_name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we actually don't have the task_name or alloc_id available.
|
||
cfg.Devices = lconfigs.DefaultAutoCreatedDevices | ||
cfg.Mounts = []*lconfigs.Mount{ | ||
/*{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
Flags: defaultMountFlags, | ||
}, | ||
{ | ||
Source: "sysfs", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets think about this.
return nil, err | ||
} | ||
|
||
pidStats, err := l.pidCollector.pidStats() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use cgroups pid tracker instead of host pid
} | ||
|
||
// Kill | ||
func (l *LibcontainerExecutor) Kill() error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove from interface
|
||
} | ||
|
||
func (l *LibcontainerExecutor) getIsolationConfig() *dstructs.IsolationConfig { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove from ProcessState
client/logmon/cmd/main.go
Outdated
@@ -0,0 +1,75 @@ | |||
package main |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
@@ -106,15 +71,29 @@ type ExecCommand struct { | |||
// Args is the args of the command that the user wants to run. | |||
Args []string | |||
|
|||
// Resources defined by the task | |||
Resources *Resources |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NoOp comment - not convinced replicating resources struct is the right idea. Lets get further into it before we decide
_ "github.com/opencontainers/runc/libcontainer/nsenter" | ||
) | ||
|
||
func init() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Leave a comment on what this does
) | ||
|
||
const ( | ||
defaultCgroupParent = "nomad" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment on this
defaultCgroupParent = "nomad" | ||
) | ||
|
||
var allCaps []string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment on this and prefer to keep them all in the same var block
var allCaps []string | ||
|
||
var ( | ||
// The statistics the executor exposes when using cgroups |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comments should start with the method/struct/variable name
client/logmon/logmon.go
Outdated
Stop() error | ||
} | ||
|
||
func LaunchLogMon(logger hclog.Logger) (LogMon, *plugin.Client, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is fine for now but we should probably add it to the plugin-launcher eventually
client/logmon/cmd/main.go
Outdated
HandshakeConfig: logmon.Handshake, | ||
Plugins: map[string]plugin.Plugin{ | ||
"logmon": logmon.NewPlugin(logmon.NewLogMon(hclog.Default().Named("logmon.test"))), | ||
}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Get rid of .test
client/logmon/logmon.go
Outdated
Stop() error | ||
} | ||
|
||
func LaunchLogMon(logger hclog.Logger) (LogMon, *plugin.Client, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be I the plugin.go file
client/logmon/logmon.go
Outdated
} | ||
|
||
func (tl *TaskLogger) Close() { | ||
tl.lro.Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if nil guard these?
l.container = container | ||
|
||
combined := append([]string{command.Cmd}, command.Args...) | ||
stdout, err := command.Stdout() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have to close these?
0cd25ba
to
5472381
Compare
@dadgar comments addressed |
5472381
to
28695b0
Compare
client/driver/exec.go
Outdated
@@ -297,7 +298,7 @@ func (h *execHandle) run() { | |||
close(h.doneCh) | |||
|
|||
// Destroy the executor | |||
if err := h.executor.Destroy(); err != nil { | |||
if err := h.executor.Shutdown(h.taskShutdownSignal, h.killTimeout); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be a force as well. The main process has exited
client/driver/java.go
Outdated
@@ -437,7 +443,7 @@ func (h *javaHandle) run() { | |||
} | |||
|
|||
// Destroy the executor | |||
h.executor.Destroy() | |||
h.executor.Shutdown(h.shutdownSignal, h.killTimeout) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comments apply to all drivers
client/driver/executor/executor.go
Outdated
@@ -45,7 +49,7 @@ var ( | |||
type Executor interface { | |||
Launch(*ExecCommand) (*ProcessState, error) | |||
Wait() (*ProcessState, error) | |||
Destroy() error | |||
Shutdown(string, time.Duration) error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would be nice to have parameter names and comments in the interface
client/driver/executor/executor.go
Outdated
return fmt.Errorf("executor failed to shutdown error: no process found") | ||
} | ||
|
||
if signal == "" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should 444-460 be put in an if grace != 0
block
signal = "SIGINT" | ||
} | ||
|
||
sig, ok := signals.SignalLookup[signal] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar comment for handling a force kill
@@ -499,3 +577,115 @@ func newLibcontainerConfig(command *ExecCommand) *lconfigs.Config { | |||
configureCgroups(cfg, command) | |||
return cfg | |||
} | |||
|
|||
// configureResourceContainer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment
@@ -499,3 +577,115 @@ func newLibcontainerConfig(command *ExecCommand) *lconfigs.Config { | |||
configureCgroups(cfg, command) | |||
return cfg | |||
} | |||
|
|||
// configureResourceContainer | |||
func (e *UniversalExecutor) configureResourceContainer(pid int) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm I don't like mixing the universal and libcontainer code in one file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I agree. executor_universal_linux.go
?
cgroupConfig "github.com/opencontainers/runc/libcontainer/configs" | ||
) | ||
|
||
// resourceContainerContext is a platform-specific struct for managing a |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make it clear that it is only used by the universal executor
This will be so good! good job so far @nickethier |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still need to review logmon but wanted to post these comments first. Overall looks good! Feel free to ignore nitpicks!
@@ -375,6 +384,17 @@ func (tr *TaskRunner) shouldRestart() (bool, time.Duration) { | |||
} | |||
} | |||
|
|||
func (tr *TaskRunner) launchLogmon() error { | |||
l, c, err := logmon.LaunchLogMon(tr.logger) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] stick to LogMon or logmon
if runtime.GOOS == "windows" { | ||
id := uuid.Generate()[:8] | ||
stdoutFifo = fmt.Sprintf("//./pipe/%s.stdout.%s", id, tr.task.Name) | ||
stderrFifo = fmt.Sprintf("//./pipe/%s.stderr.%s", id, tr.task.Name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Putting the task name last seems a bit strange. I think we use this style elsewhere:
{task}-{uuid}.stdout
stderrFifo = fmt.Sprintf("//./pipe/%s.stderr.%s", id, tr.task.Name) | ||
} else { | ||
stdoutFifo = filepath.Join(tr.taskDir.LogDir, fmt.Sprintf("%s.stdout", tr.task.Name)) | ||
stderrFifo = filepath.Join(tr.taskDir.LogDir, fmt.Sprintf("%s.stderr", tr.task.Name)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should make these hidden files and add the name FIFO to them to discourage people trying to use them directly via the FS API.
err = tr.logmon.Start(&logmon.LogConfig{ | ||
LogDir: tr.taskDir.LogDir, | ||
StdoutLogFile: fmt.Sprintf("%s.stdout", tr.task.Name), | ||
StderrLogFile: fmt.Sprintf("%s.stderr", tr.task.Name), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, won't this get appended to LogDir and be the same file as the FIFO? The answer must be "no" because otherwise I don't think it would work at all! 😅
Changing the FIFO filename above will make this configuration less confusing, so I think this can stay put.
client/driver/executor/executor.go
Outdated
@@ -163,110 +197,86 @@ func (v *ExecutorVersion) GoString() string { | |||
// and file system isolation | |||
type UniversalExecutor struct { | |||
cmd exec.Cmd | |||
ctx *ExecutorContext | |||
command *ExecCommand |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While you're in here can you come up with a saner naming scheme for these two fields that both read "command exec command" in English? 😅
I always get confused in the rest of the executor which is which.
return l.userProc.Signal(s) | ||
} | ||
|
||
// Exec starts an additional process inside the executor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
...inside the container
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logmon needs tests -- but feel free to add them in a followup PR since this once is huge so close to done!
client/logmon/logmon.go
Outdated
UID int | ||
|
||
// GID is id for the desried group to write log files as | ||
GID int |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we just run the logmon process as this user and group? We wouldn't have to plumb these through or chown files on creation.
client/logmon/logmon.go
Outdated
} | ||
|
||
// newLogRotatorWrapper takes a rotator and returns a wrapper that has the | ||
// processOutWriter to attach to the processes stdout or stderr. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
processes -> process's
client/logmon/logmon.go
Outdated
return wrap, nil | ||
} | ||
|
||
// start starts a go-routine that copies from the pipe into the rotator. This is |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
goroutine
RE: tmpfs/shm Memory does count towards the total cgroup limit and is tracked in the Do we want to add it to the limit? This would mean we're setting the limit to more than what was scheduled for. |
req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error { | ||
|
||
// Launch logmon instance for the task. | ||
err := h.launchLogMon() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if err := h.launchLogMon(); err != nil {
|
||
func (h *logmonHook) Stop(context.Context, *interfaces.TaskStopRequest, *interfaces.TaskStopResponse) error { | ||
|
||
h.logmon.Stop() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nil guard both
|
||
// taskLoggingFifoGetter is used to return the paths to the stdout and stderr | ||
// fifos to be passed to the driver for task logging | ||
taskLoggingFifoGetter func() (stdoutFifo, stderrFifo string) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See comment on hook
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry! I should have given you more guidance with the hooks! Definitely need to document them somewhere...
logger: logger, | ||
} | ||
|
||
tr.taskLoggingFifoGetter = hook.getFifos |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should invert this relationship to avoid taskLoggingFifoGetter
ever being a nil func. Put a setter on TaskRunner to receive the FIFOs:
nomad/client/allocrunnerv2/taskrunner/task_runner.go
Lines 689 to 697 in 22fa055
// UpdateStats updates and emits the latest stats from the driver. | |
func (tr *TaskRunner) UpdateStats(ru *cstructs.TaskResourceUsage) { | |
tr.resourceUsageLock.Lock() | |
tr.resourceUsage = ru | |
tr.resourceUsageLock.Unlock() | |
if ru != nil { | |
tr.emitStats(ru) | |
} | |
} |
You can ever pass it via an interface to make hook testing easy:
nomad/client/allocrunnerv2/taskrunner/stats_hook_test.go
Lines 32 to 36 in 22fa055
func (m *mockStatsUpdater) UpdateStats(ru *cstructs.TaskResourceUsage) { | |
if m.Ch != nil { | |
m.Ch <- ru | |
} | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(I'm not tied to this approach. "Polluting" our runners with a bunch of callbacks has its own downsides, so I'm open to ideas.)
return nil | ||
} | ||
|
||
func (h *logmonHook) Stop(context.Context, *interfaces.TaskStopRequest, *interfaces.TaskStopResponse) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to be Exited to destroy the logger after each run of a task.
Stop is called when the task is permanently stopped and should probably be called Destroy?
* client/executor: refactor client to remove interpolation * executor: POC libcontainer based executor * vendor: use hashicorp libcontainer fork * vendor: add libcontainer/nsenter dep * executor: updated executor interface to simplify operations * executor: implement logging pipe * logmon: new logmon plugin to manage task logs * driver/executor: use logmon for log management * executor: fix tests and windows build * executor: fix logging key names * executor: fix test failures * executor: add config field to toggle between using libcontainer and standard executors * logmon: use discover utility to discover nomad executable * executor: only call libcontainer-shim on main in linux * logmon: use seperate path configs for stdout/stderr fifos * executor: windows fixes * executor: created reusable pid stats collection utility that can be used in an executor * executor: update fifo.Open calls * executor: fix build * remove executor from docker driver * executor: Shutdown func to kill and cleanup executor and its children * executor: move linux specific universal executor funcs to seperate file * move logmon initialization to a task runner hook * client: doc fixes and renaming from code review * taskrunner: use shared config struct for logmon fifo fields * taskrunner: logmon only needs to be started once per task
* client/executor: refactor client to remove interpolation * executor: POC libcontainer based executor * vendor: use hashicorp libcontainer fork * vendor: add libcontainer/nsenter dep * executor: updated executor interface to simplify operations * executor: implement logging pipe * logmon: new logmon plugin to manage task logs * driver/executor: use logmon for log management * executor: fix tests and windows build * executor: fix logging key names * executor: fix test failures * executor: add config field to toggle between using libcontainer and standard executors * logmon: use discover utility to discover nomad executable * executor: only call libcontainer-shim on main in linux * logmon: use seperate path configs for stdout/stderr fifos * executor: windows fixes * executor: created reusable pid stats collection utility that can be used in an executor * executor: update fifo.Open calls * executor: fix build * remove executor from docker driver * executor: Shutdown func to kill and cleanup executor and its children * executor: move linux specific universal executor funcs to seperate file * move logmon initialization to a task runner hook * client: doc fixes and renaming from code review * taskrunner: use shared config struct for logmon fifo fields * taskrunner: logmon only needs to be started once per task
I'm going to lock this pull request because it has been closed for 120 days ⏳. This helps our maintainers find and focus on the active contributions. |
Dependent on #4655 so as to separate the vendor changes from interface/implementation.
Some cleanup is still needed as well as plumbing logging in.