Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Qemu driver: graceful shutdown feature #3411

Merged
merged 12 commits into from
Nov 3, 2017
Merged
149 changes: 127 additions & 22 deletions client/driver/qemu.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package driver
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"net"
"os"
"os/exec"
"path/filepath"
Expand All @@ -13,7 +15,8 @@ import (
"strings"
"time"

"github.com/hashicorp/go-plugin"
"github.com/coreos/go-semver/semver"
plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/executor"
dstructs "github.com/hashicorp/nomad/client/driver/structs"
Expand All @@ -26,12 +29,27 @@ import (

var (
reQemuVersion = regexp.MustCompile(`version (\d[\.\d+]+)`)

// Prior to qemu 2.10.1, monitor socket paths are truncated to 108 bytes.
// We should consider this if driver.qemu.version is < 2.10.1 and the
// generated monitor path is too long.

//
// Relevant fix is here:
// https://github.com/qemu/qemu/commit/ad9579aaa16d5b385922d49edac2c96c79bcfb6
qemuVersionLongSocketPathFix = semver.New("2.10.1")
)

const (
// The key populated in Node Attributes to indicate presence of the Qemu
// driver
qemuDriverAttr = "driver.qemu"
// The key populated in Node Attributes to indicate presence of the Qemu driver
qemuDriverAttr = "driver.qemu"
qemuDriverVersionAttr = "driver.qemu.version"
// Represents an ACPI shutdown request to the VM (emulates pressing a physical power button)
// Reference: https://en.wikibooks.org/wiki/QEMU/Monitor
qemuGracefulShutdownMsg = "system_powerdown\n"
qemuMonitorSocketName = "qemu-monitor.sock"
// Maximum socket path length prior to qemu 2.10.1
qemuLegacyMaxMonitorPathLen = 108
)

// QemuDriver is a driver for running images via Qemu
Expand All @@ -45,17 +63,19 @@ type QemuDriver struct {
}

type QemuDriverConfig struct {
ImagePath string `mapstructure:"image_path"`
Accelerator string `mapstructure:"accelerator"`
PortMap []map[string]int `mapstructure:"port_map"` // A map of host port labels and to guest ports.
Args []string `mapstructure:"args"` // extra arguments to qemu executable
ImagePath string `mapstructure:"image_path"`
Accelerator string `mapstructure:"accelerator"`
GracefulShutdown bool `mapstructure:"graceful_shutdown"`
PortMap []map[string]int `mapstructure:"port_map"` // A map of host port labels and to guest ports.
Args []string `mapstructure:"args"` // extra arguments to qemu executable
}

// qemuHandle is returned from Start/Open as a handle to the PID
type qemuHandle struct {
pluginClient *plugin.Client
userPid int
executor executor.Executor
monitorPath string
killTimeout time.Duration
maxKillTimeout time.Duration
logger *log.Logger
Expand All @@ -64,6 +84,29 @@ type qemuHandle struct {
doneCh chan struct{}
}

// getMonitorPath is used to determine whether a qemu monitor socket can be
// safely created and accessed in the task directory by the version of qemu
// present on the host. If it is safe to use, the socket's full path is
// returned along with a nil error. Otherwise, an empty string is returned
// along with a descriptive error.
func (d *QemuDriver) getMonitorPath(dir string) (string, error) {
var longPathSupport bool
currentQemuVer := d.DriverContext.node.Attributes[qemuDriverVersionAttr]
currentQemuSemver := semver.New(currentQemuVer)
if currentQemuSemver.LessThan(*qemuVersionLongSocketPathFix) {
longPathSupport = false
d.logger.Printf("[DEBUG] driver.qemu: long socket paths are not available in this version of QEMU (%s)", currentQemuVer)
} else {
longPathSupport = true
d.logger.Printf("[DEBUG] driver.qemu: long socket paths available in this version of QEMU (%s)", currentQemuVer)
}
fullSocketPath := fmt.Sprintf("%s/%s", dir, qemuMonitorSocketName)
if len(fullSocketPath) > qemuLegacyMaxMonitorPathLen && longPathSupport == false {
return "", fmt.Errorf("monitor path is too long for this version of qemu")
}
return fullSocketPath, nil
}

// NewQemuDriver is used to create a new exec driver
func NewQemuDriver(ctx *DriverContext) Driver {
return &QemuDriver{DriverContext: *ctx}
Expand All @@ -81,6 +124,10 @@ func (d *QemuDriver) Validate(config map[string]interface{}) error {
"accelerator": {
Type: fields.TypeString,
},
"graceful_shutdown": {
Type: fields.TypeBool,
Required: false,
},
"port_map": {
Type: fields.TypeArray,
},
Expand Down Expand Up @@ -127,9 +174,11 @@ func (d *QemuDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool,
delete(node.Attributes, qemuDriverAttr)
return false, fmt.Errorf("Unable to parse Qemu version string: %#v", matches)
}
currentQemuVersion := matches[1]

node.Attributes[qemuDriverAttr] = "1"
node.Attributes["driver.qemu.version"] = matches[1]
node.Attributes[qemuDriverVersionAttr] = currentQemuVersion

return true, nil
}

Expand Down Expand Up @@ -190,6 +239,22 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse
"-nographic",
}

var monitorPath string
if d.driverConfig.GracefulShutdown {
if runtime.GOOS == "windows" {
return nil, errors.New("QEMU graceful shutdown is unsupported on the Windows platform")
}
// This socket will be used to manage the virtual machine (for example,
// to perform graceful shutdowns)
monitorPath, err := d.getMonitorPath(ctx.TaskDir.Dir)
if err != nil {
d.logger.Printf("[ERR] driver.qemu: could not get qemu monitor path: %s", err)
return nil, err
}
d.logger.Printf("[DEBUG] driver.qemu: got monitor path OK: %s", monitorPath)
args = append(args, "-monitor", fmt.Sprintf("unix:%s,server,nowait", monitorPath))
}

// Add pass through arguments to qemu executable. A user can specify
// these arguments in driver task configuration. These arguments are
// passed directly to the qemu driver as command line options.
Expand Down Expand Up @@ -231,6 +296,9 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse

// If using KVM, add optimization args
if accelerator == "kvm" {
if runtime.GOOS == "windows" {
return nil, errors.New("KVM accelerator is unsupported on the Windows platform")
}
args = append(args,
"-enable-kvm",
"-cpu", "host",
Expand All @@ -239,7 +307,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse
)
}

d.logger.Printf("[DEBUG] Starting QemuVM command: %q", strings.Join(args, " "))
d.logger.Printf("[DEBUG] driver.qemu: starting QemuVM command: %q", strings.Join(args, " "))
pluginLogFile := filepath.Join(ctx.TaskDir.Dir, "executor.out")
executorConfig := &dstructs.ExecutorConfig{
LogFile: pluginLogFile,
Expand Down Expand Up @@ -272,7 +340,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse
pluginClient.Kill()
return nil, err
}
d.logger.Printf("[INFO] Started new QemuVM: %s", vmID)
d.logger.Printf("[INFO] driver.qemu: started new QemuVM: %s", vmID)

// Create and Return Handle
maxKill := d.DriverContext.config.MaxKillTimeout
Expand All @@ -282,6 +350,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse
userPid: ps.Pid,
killTimeout: GetKillTimeout(task.KillTimeout, maxKill),
maxKillTimeout: maxKill,
monitorPath: monitorPath,
version: d.config.Version.VersionNumber(),
logger: d.logger,
doneCh: make(chan struct{}),
Expand All @@ -308,7 +377,7 @@ type qemuId struct {
func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
id := &qemuId{}
if err := json.Unmarshal([]byte(handleID), id); err != nil {
return nil, fmt.Errorf("Failed to parse handle '%s': %v", handleID, err)
return nil, fmt.Errorf("Failed to parse handle %q: %v", handleID, err)
}

pluginConfig := &plugin.ClientConfig{
Expand All @@ -317,9 +386,9 @@ func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro

exec, pluginClient, err := createExecutorWithConfig(pluginConfig, d.config.LogOutput)
if err != nil {
d.logger.Println("[ERR] driver.qemu: error connecting to plugin so destroying plugin pid and user pid")
d.logger.Printf("[ERR] driver.qemu: error connecting to plugin so destroying plugin pid %d and user pid %d", id.PluginConfig.Pid, id.UserPid)
if e := destroyPlugin(id.PluginConfig.Pid, id.UserPid); e != nil {
d.logger.Printf("[ERR] driver.qemu: error destroying plugin and userpid: %v", e)
d.logger.Printf("[ERR] driver.qemu: error destroying plugin pid %d and userpid %d: %v", id.PluginConfig.Pid, id.UserPid, e)
}
return nil, fmt.Errorf("error connecting to plugin: %v", err)
}
Expand Down Expand Up @@ -381,27 +450,43 @@ func (h *qemuHandle) Signal(s os.Signal) error {
return fmt.Errorf("Qemu driver can't send signals")
}

// TODO: allow a 'shutdown_command' that can be executed over a ssh connection
// to the VM
func (h *qemuHandle) Kill() error {
if err := h.executor.ShutDown(); err != nil {
if h.pluginClient.Exited() {
return nil
gracefulShutdownSent := false
// Attempt a graceful shutdown only if it was configured in the job
if h.monitorPath != "" {
if err := sendQemuShutdown(h.logger, h.monitorPath, h.userPid); err == nil {
gracefulShutdownSent = true
} else {
h.logger.Printf("[DEBUG] driver.qemu: error sending graceful shutdown for user process pid %d: %s", h.userPid, err)
}
return fmt.Errorf("executor Shutdown failed: %v", err)
}

// If Nomad did not send a graceful shutdown signal, issue an interrupt to
// the qemu process as a last resort
if gracefulShutdownSent == false {
if err := h.executor.ShutDown(); err != nil {
if h.pluginClient.Exited() {
return nil
}
return fmt.Errorf("executor Shutdown failed: %v", err)
}
}

// If the qemu process exits before the kill timeout is reached, doneChan
// will close and we'll exit without an error. If it takes too long, the
// timer will fire and we'll attempt to kill the process.
select {
case <-h.doneCh:
return nil
case <-time.After(h.killTimeout):
h.logger.Printf("[DEBUG] driver.qemu: kill timeout of %s exceeded for user process pid %d", h.killTimeout.String(), h.userPid)

if h.pluginClient.Exited() {
return nil
}
if err := h.executor.Exit(); err != nil {
return fmt.Errorf("executor Exit failed: %v", err)
}

return nil
}
}
Expand All @@ -414,7 +499,7 @@ func (h *qemuHandle) run() {
ps, werr := h.executor.Wait()
if ps.ExitCode == 0 && werr != nil {
if e := killProcess(h.userPid); e != nil {
h.logger.Printf("[ERR] driver.qemu: error killing user process: %v", e)
h.logger.Printf("[ERR] driver.qemu: error killing user process pid %d: %v", h.userPid, e)
}
}
close(h.doneCh)
Expand All @@ -427,3 +512,23 @@ func (h *qemuHandle) run() {
h.waitCh <- &dstructs.WaitResult{ExitCode: ps.ExitCode, Signal: ps.Signal, Err: werr}
close(h.waitCh)
}

// sendQemuShutdown attempts to issue an ACPI power-off command via the qemu
// monitor
func sendQemuShutdown(logger *log.Logger, monitorPath string, userPid int) error {
if monitorPath == "" {
return errors.New("monitorPath not set")
}
monitorSocket, err := net.Dial("unix", monitorPath)
if err != nil {
logger.Printf("[WARN] driver.qemu: could not connect to qemu monitor %q for user process pid %d: %s", monitorPath, userPid, err)
return err
}
defer monitorSocket.Close()
logger.Printf("[DEBUG] driver.qemu: sending graceful shutdown command to qemu monitor socket %q for user process pid %d", monitorPath, userPid)
_, err = monitorSocket.Write([]byte(qemuGracefulShutdownMsg))
if err != nil {
logger.Printf("[WARN] driver.qemu: failed to send shutdown message %q to monitor socket %q for user process pid %d: %s", qemuGracefulShutdownMsg, monitorPath, userPid, err)
}
return err
}
Loading