Skip to content

Commit

Permalink
add nomad streaming exec core data structures and interfaces
Browse files Browse the repository at this point in the history
In this commit, we add two driver interfaces for supporting `nomad exec`
invocation:

* A high level `ExecTaskStreamingDriver`, that operates on io reader/writers.
  Drivers should prefer using this interface
* A low level `ExecTaskStreamingRawDriver` that operates on the raw stream of
  input structs; useful when a driver delegates handling to driver backend (e.g.
  across RPC/grpc).

The interfaces are optional for a driver, as `nomad exec` support is opt-in.
Existing drivers continue to compile without exec support, until their
maintainer add such support.

Furthermore, we create protobuf structures to represent exec stream entities:
`ExecTaskStreamingRequest` and `ExecTaskStreamingResponse`.  We aim to reuse the
protobuf generated code as much as possible, without translation to avoid
conversion overhead.

`ExecTaskStream` abstract fetching and sending stream entities.  It's influenced
by the grpc bi-directional stream interface, to avoid needing any adapter.  I
considered using channels, but the asynchronisity and concurrency makes buffer
reuse too complicated, which would put more pressure on GC and slows exec operation.
  • Loading branch information
Mahmood Ali committed Apr 30, 2019
1 parent 4f7bd68 commit 6d711d0
Show file tree
Hide file tree
Showing 3 changed files with 620 additions and 246 deletions.
65 changes: 65 additions & 0 deletions plugins/drivers/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/drivers/proto"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
pstructs "github.com/hashicorp/nomad/plugins/shared/structs"
"github.com/zclconf/go-cty/cty"
Expand Down Expand Up @@ -56,6 +57,28 @@ type DriverPlugin interface {
ExecTask(taskID string, cmd []string, timeout time.Duration) (*ExecTaskResult, error)
}

// ExecTaskStreamingDriver marks that a driver supports streaming exec task. This represents a user friendly
// interface to implement, as an alternative to the ExecTaskStreamingRawDriver, the low level interface.
type ExecTaskStreamingDriver interface {
ExecTaskStreaming(ctx context.Context, taskID string, execOptions *ExecOptions) (*ExitResult, error)
}

type ExecOptions struct {
// Command is command to run
Command []string

// Tty indicates whether pseudo-terminal is to be allocated
Tty bool

// streams
Stdin io.ReadCloser
Stdout io.WriteCloser
Stderr io.WriteCloser

// terminal size channel
ResizeCh <-chan TerminalSize
}

// InternalDriverPlugin is an interface that exposes functions that are only
// implemented by internal driver plugins.
type InternalDriverPlugin interface {
Expand Down Expand Up @@ -127,6 +150,11 @@ type Capabilities struct {
FSIsolation FSIsolation
}

type TerminalSize struct {
Height int
Width int
}

type TaskConfig struct {
ID string
JobName string
Expand Down Expand Up @@ -406,3 +434,40 @@ func (d *DriverNetwork) Hash() []byte {
}
return h.Sum(nil)
}

//// helper types for operating on raw exec operation
// we alias proto instances as much as possible to avoid conversion overhead

// ExecTaskStreamingRawDriver represents a low-level interface for executing a streaming exec
// call, and is intended to be used when driver instance is to delegate exec handling to another
// backend, e.g. to a executor or a driver behind a grpc/rpc protocol
//
// Nomad client would prefer this interface method over `ExecTaskStreaming` if driver implements it.
type ExecTaskStreamingRawDriver interface {
ExecTaskStreamingRaw(
ctx context.Context,
taskID string,
command []string,
tty bool,
stream ExecTaskStream) error
}

// ExecTaskStream represents a stream of exec streaming messages,
// and is a handle to get stdin and tty size and send back
// stdout/stderr and exit operations.
//
// The methods are not concurrent safe; callers must ensure that methods are called
// from at most one goroutine.
type ExecTaskStream interface {
// Send relays response message back to API.
//
// The call is synchronous and no references to message is held: once
// method call completes, the message reference can be reused or freed.
Send(*ExecTaskStreamingResponseMsg) error

// Receive exec streaming messages from API. Returns `io.EOF` on completion of stream.
Recv() (*ExecTaskStreamingRequestMsg, error)
}

type ExecTaskStreamingRequestMsg = proto.ExecTaskStreamingRequest
type ExecTaskStreamingResponseMsg = proto.ExecTaskStreamingResponse
Loading

0 comments on commit 6d711d0

Please sign in to comment.