Skip to content

Commit

Permalink
drivers: Add/Use go-getter to fetch remote binaries
Browse files Browse the repository at this point in the history
Updates Qemu, Java drivers to use go-getter to fetch binaries
Adds remote artifact support for Exec, Raw Exec drivers
  • Loading branch information
catsby committed Oct 22, 2015
1 parent 1e9416a commit f422ef0
Show file tree
Hide file tree
Showing 11 changed files with 216 additions and 70 deletions.
39 changes: 38 additions & 1 deletion client/driver/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@ package driver

import (
"fmt"
"log"
"path"
"path/filepath"
"runtime"
"syscall"
"time"

"github.com/hashicorp/go-getter"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/executor"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -41,12 +46,44 @@ func (d *ExecDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool,
}

func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
// Get the command
// Get the command to be ran, or if omitted, download an artifact for
// execution. Currently a supplied command takes precedence, and an artifact
// is only downloaded if no command is supplied
command, ok := task.Config["command"]
if !ok || command == "" {
return nil, fmt.Errorf("missing command for exec driver")
}

source, ok := task.Config["artifact_source"]
if ok && source != "" {
// Proceed to download an artifact to be executed.
// We use go-getter to support a variety of protocols, but need to change
// file permissions of the resulted download to be executable

// Create a location to download the artifact.
taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName]
if !ok {
return nil, fmt.Errorf("[Err] driver.Exec: Could not find task directory for task: %v", d.DriverContext.taskName)
}
destDir := filepath.Join(taskDir, allocdir.TaskLocal)

artifactName := path.Base(source)
artifactFile := filepath.Join(destDir, artifactName)
if err := getter.GetFile(artifactFile, source); err != nil {
return nil, fmt.Errorf("[Err] driver.Exec: Error downloading source for Exec driver: %s", err)
}

// Add execution permissions to the newly downloaded artifact
if runtime.GOOS != "windows" {
if err := syscall.Chmod(artifactFile, 0655); err != nil {
log.Printf("[Err] driver.Exec: Error making artifact executable: %s", err)
}
}

// re-assign the command to be the local execution path
command = filepath.Join(allocdir.TaskLocal, command)
}

// Get the environment variables.
envVars := TaskEnvironmentVariables(ctx, task)

Expand Down
50 changes: 50 additions & 0 deletions client/driver/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io/ioutil"
"path/filepath"
"reflect"
"runtime"
"testing"
"time"

Expand Down Expand Up @@ -120,6 +121,55 @@ func TestExecDriver_Start_Wait(t *testing.T) {
}
}

func TestExecDriver_Start_Artifact_Wait(t *testing.T) {
ctestutils.ExecCompatible(t)
var file string
switch runtime.GOOS {
case "darwin":
file = "hi_darwin_amd64"
default:
file = "hi_linux_amd64"
}

task := &structs.Task{
Name: "sleep",
Config: map[string]string{
"artifact_source": fmt.Sprintf("https://dl.dropboxusercontent.com/u/47675/jar_thing/%s", file),
"command": file,
},
Resources: basicResources,
}

driverCtx := testDriverContext(task.Name)
ctx := testDriverExecContext(task, driverCtx)
defer ctx.AllocDir.Destroy()
d := NewExecDriver(driverCtx)

handle, err := d.Start(ctx, task)
if err != nil {
t.Fatalf("err: %v", err)
}
if handle == nil {
t.Fatalf("missing handle")
}

// Update should be a no-op
err = handle.Update(task)
if err != nil {
t.Fatalf("err: %v", err)
}

// Task should terminate quickly
select {
case err := <-handle.WaitCh():
if err != nil {
t.Fatalf("err: %v", err)
}
case <-time.After(5 * time.Second):
t.Fatalf("timeout")
}
}

func TestExecDriver_Start_Wait_AllocDir(t *testing.T) {
ctestutils.ExecCompatible(t)

Expand Down
39 changes: 8 additions & 31 deletions client/driver/java.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ package driver
import (
"bytes"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"path"
"path/filepath"
Expand All @@ -14,6 +11,7 @@ import (
"syscall"
"time"

"github.com/hashicorp/go-getter"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/executor"
Expand Down Expand Up @@ -97,37 +95,18 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return nil, fmt.Errorf("missing jar source for Java Jar driver")
}

// Attempt to download the thing
// Should be extracted to some kind of Http Fetcher
// Right now, assume publicly accessible HTTP url
resp, err := http.Get(source)
if err != nil {
return nil, fmt.Errorf("Error downloading source for Java driver: %s", err)
}

// Get the tasks local directory.
taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName]
if !ok {
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
}
taskLocal := filepath.Join(taskDir, allocdir.TaskLocal)

// Create a location to download the binary.
fName := path.Base(source)
fPath := filepath.Join(taskLocal, fName)
f, err := os.OpenFile(fPath, os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
return nil, fmt.Errorf("Error opening file to download to: %s", err)
}

defer f.Close()
defer resp.Body.Close()
destDir := filepath.Join(taskDir, allocdir.TaskLocal)

// Copy remote file to local directory for execution
// TODO: a retry of sort if io.Copy fails, for large binaries
_, ioErr := io.Copy(f, resp.Body)
if ioErr != nil {
return nil, fmt.Errorf("Error copying jar from source: %s", ioErr)
// Create a location to download the binary.
jarName := path.Base(source)
jarPath := filepath.Join(destDir, jarName)
if err := getter.GetFile(jarPath, source); err != nil {
return nil, fmt.Errorf("Error downloading source for Java driver: %s", err)
}

// Get the environment variables.
Expand All @@ -141,10 +120,8 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
args = append(args, jvm_options)
}

// Build the argument list
args = append(args, "-jar", filepath.Join(allocdir.TaskLocal, fName))

// Build the argument list.
args = append(args, "-jar", filepath.Join(allocdir.TaskLocal, jarName))
if argRaw, ok := task.Config["args"]; ok {
args = append(args, argRaw)
}
Expand Down
38 changes: 9 additions & 29 deletions client/driver/qemu.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"
"io"
"log"
"net/http"
"os"
"os/exec"
"path/filepath"
Expand All @@ -19,6 +18,7 @@ import (
"syscall"
"time"

"github.com/hashicorp/go-getter"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -94,45 +94,25 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return nil, fmt.Errorf("Missing required Task Resource: Memory")
}

// Attempt to download the thing
// Should be extracted to some kind of Http Fetcher
// Right now, assume publicly accessible HTTP url
resp, err := http.Get(source)
if err != nil {
return nil, fmt.Errorf("Error downloading source for Qemu driver: %s", err)
}

// Get the tasks local directory.
taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName]
if !ok {
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
}
taskLocal := filepath.Join(taskDir, allocdir.TaskLocal)

// Create a location in the local directory to download and store the image.
// TODO: Caching
// Create a location to download the binary.
destDir := filepath.Join(taskDir, allocdir.TaskLocal)
vmID := fmt.Sprintf("qemu-vm-%s-%s", structs.GenerateUUID(), filepath.Base(source))
fPath := filepath.Join(taskLocal, vmID)
vmPath, err := os.OpenFile(fPath, os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
return nil, fmt.Errorf("Error opening file to download to: %s", err)
}

defer vmPath.Close()
defer resp.Body.Close()

// Copy remote file to local AllocDir for execution
// TODO: a retry of sort if io.Copy fails, for large binaries
_, ioErr := io.Copy(vmPath, resp.Body)
if ioErr != nil {
return nil, fmt.Errorf("Error copying Qemu image from source: %s", ioErr)
vmPath := filepath.Join(destDir, vmID)
if err := getter.GetFile(vmPath, source); err != nil {
return nil, fmt.Errorf("Error downloading source for Java driver: %s", err)
}

// compute and check checksum
if check, ok := task.Config["checksum"]; ok {
d.logger.Printf("[DEBUG] Running checksum on (%s)", vmID)
hasher := sha256.New()
file, err := os.Open(vmPath.Name())
file, err := os.Open(vmPath)
if err != nil {
return nil, fmt.Errorf("Failed to open file for checksum")
}
Expand Down Expand Up @@ -163,7 +143,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
"-machine", "type=pc,accel=" + accelerator,
"-name", vmID,
"-m", mem,
"-drive", "file=" + vmPath.Name(),
"-drive", "file=" + vmPath,
"-nodefconfig",
"-nodefaults",
"-nographic",
Expand Down Expand Up @@ -240,7 +220,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
// Create and Return Handle
h := &qemuHandle{
proc: cmd.Process,
vmID: vmPath.Name(),
vmID: vmPath,
doneCh: make(chan struct{}),
waitCh: make(chan error, 1),
}
Expand Down
44 changes: 38 additions & 6 deletions client/driver/raw_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ package driver

import (
"fmt"
"log"
"os"
"os/exec"
"path"
"path/filepath"
"runtime"
"strconv"
"strings"
"syscall"
"time"

"github.com/hashicorp/go-getter"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/args"
Expand Down Expand Up @@ -61,12 +65,6 @@ func (d *RawExecDriver) Fingerprint(cfg *config.Config, node *structs.Node) (boo
}

func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
// Get the command
command, ok := task.Config["command"]
if !ok || command == "" {
return nil, fmt.Errorf("missing command for raw_exec driver")
}

// Get the tasks local directory.
taskName := d.DriverContext.taskName
taskDir, ok := ctx.AllocDir.TaskDirs[taskName]
Expand All @@ -75,6 +73,40 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
}
taskLocal := filepath.Join(taskDir, allocdir.TaskLocal)

// Get the command to be ran, or if omitted, download an artifact for
// execution. Currently a supplied command takes precedence, and an artifact
// is only downloaded if no command is supplied
command, ok := task.Config["command"]
if !ok || command == "" {
return nil, fmt.Errorf("missing command for exec driver")
}

source, ok := task.Config["artifact_source"]
if ok && source != "" {
// Proceed to download an artifact to be executed.
// We use go-getter to support a variety of protocols, but need to change
// file permissions of the resulted download to be executable

// Create a location to download the artifact.
destDir := filepath.Join(taskDir, allocdir.TaskLocal)

artifactName := path.Base(source)
artifactFile := filepath.Join(destDir, artifactName)
if err := getter.GetFile(artifactFile, source); err != nil {
return nil, fmt.Errorf("[Err] driver.Exec: Error downloading source for Exec driver: %s", err)
}

// Add execution permissions to the newly downloaded artifact
if runtime.GOOS != "windows" {
if err := syscall.Chmod(artifactFile, 0655); err != nil {
log.Printf("[Err] driver.Exec: Error making artifact executable: %s", err)
}
}

// re-assign the command to be the local execution path
command = filepath.Join(allocdir.TaskLocal, command)
}

// Get the environment variables.
envVars := TaskEnvironmentVariables(ctx, task)

Expand Down
Loading

0 comments on commit f422ef0

Please sign in to comment.