Skip to content

Commit

Permalink
Merge pull request #3892 from hashicorp/f-tunnel
Browse files Browse the repository at this point in the history
Client RPC Endpoints, Server Routing and Streaming RPCs
  • Loading branch information
dadgar committed Feb 21, 2018
2 parents ff7d3c3 + 42a920a commit 6a96b82
Show file tree
Hide file tree
Showing 104 changed files with 12,973 additions and 3,911 deletions.
8 changes: 2 additions & 6 deletions api/allocations.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,9 @@ func (a *Allocations) Info(allocID string, q *QueryOptions) (*Allocation, *Query
}

func (a *Allocations) Stats(alloc *Allocation, q *QueryOptions) (*AllocResourceUsage, error) {
nodeClient, err := a.client.GetNodeClient(alloc.NodeID, q)
if err != nil {
return nil, err
}

var resp AllocResourceUsage
_, err = nodeClient.query("/v1/client/allocation/"+alloc.ID+"/stats", &resp, nil)
path := fmt.Sprintf("/v1/client/allocation/%s/stats", alloc.ID)
_, err := a.client.query(path, &resp, q)
return &resp, err
}

Expand Down
54 changes: 52 additions & 2 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
Expand All @@ -18,6 +19,13 @@ import (
rootcerts "github.com/hashicorp/go-rootcerts"
)

var (
// ClientConnTimeout is the timeout applied when attempting to contact a
// client directly before switching to a connection through the Nomad
// server.
ClientConnTimeout = 1 * time.Second
)

// QueryOptions are used to parameterize a query
type QueryOptions struct {
// Providing a datacenter overwrites the region provided
Expand Down Expand Up @@ -145,6 +153,8 @@ func (c *Config) ClientConfig(region, address string, tlsEnabled bool) *Config {
WaitTime: c.WaitTime,
TLSConfig: c.TLSConfig.Copy(),
}

// Update the tls server name for connecting to a client
if tlsEnabled && config.TLSConfig != nil {
config.TLSConfig.TLSServerName = fmt.Sprintf("client.%s.nomad", region)
}
Expand Down Expand Up @@ -249,6 +259,34 @@ func DefaultConfig() *Config {
return config
}

// SetTimeout is used to place a timeout for connecting to Nomad. A negative
// duration is ignored, a duration of zero means no timeout, and any other value
// will add a timeout.
func (c *Config) SetTimeout(t time.Duration) error {
if c == nil {
return fmt.Errorf("nil config")
} else if c.httpClient == nil {
return fmt.Errorf("nil HTTP client")
} else if c.httpClient.Transport == nil {
return fmt.Errorf("nil HTTP client transport")
}

// Apply a timeout.
if t.Nanoseconds() >= 0 {
transport, ok := c.httpClient.Transport.(*http.Transport)
if !ok {
return fmt.Errorf("unexpected HTTP transport: %T", c.httpClient.Transport)
}

transport.DialContext = (&net.Dialer{
Timeout: t,
KeepAlive: 30 * time.Second,
}).DialContext
}

return nil
}

// ConfigureTLS applies a set of TLS configurations to the the HTTP client.
func (c *Config) ConfigureTLS() error {
if c.TLSConfig == nil {
Expand Down Expand Up @@ -343,7 +381,15 @@ func (c *Client) SetNamespace(namespace string) {
// GetNodeClient returns a new Client that will dial the specified node. If the
// QueryOptions is set, its region will be used.
func (c *Client) GetNodeClient(nodeID string, q *QueryOptions) (*Client, error) {
return c.getNodeClientImpl(nodeID, q, c.Nodes().Info)
return c.getNodeClientImpl(nodeID, -1, q, c.Nodes().Info)
}

// GetNodeClientWithTimeout returns a new Client that will dial the specified
// node using the specified timeout. If the QueryOptions is set, its region will
// be used.
func (c *Client) GetNodeClientWithTimeout(
nodeID string, timeout time.Duration, q *QueryOptions) (*Client, error) {
return c.getNodeClientImpl(nodeID, timeout, q, c.Nodes().Info)
}

// nodeLookup is the definition of a function used to lookup a node. This is
Expand All @@ -353,7 +399,7 @@ type nodeLookup func(nodeID string, q *QueryOptions) (*Node, *QueryMeta, error)
// getNodeClientImpl is the implementation of creating a API client for
// contacting a node. It takes a function to lookup the node such that it can be
// mocked during tests.
func (c *Client) getNodeClientImpl(nodeID string, q *QueryOptions, lookup nodeLookup) (*Client, error) {
func (c *Client) getNodeClientImpl(nodeID string, timeout time.Duration, q *QueryOptions, lookup nodeLookup) (*Client, error) {
node, _, err := lookup(nodeID, q)
if err != nil {
return nil, err
Expand All @@ -380,6 +426,10 @@ func (c *Client) getNodeClientImpl(nodeID string, q *QueryOptions, lookup nodeLo

// Get an API client for the node
conf := c.config.ClientConfig(region, node.HTTPAddr, node.TLSEnabled)

// Set the timeout
conf.SetTimeout(timeout)

return NewClient(conf)
}

Expand Down
2 changes: 1 addition & 1 deletion api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ func TestClient_NodeClient(t *testing.T) {
name := fmt.Sprintf("%s__%s__%s", c.ExpectedAddr, c.ExpectedRegion, c.ExpectedTLSServerName)
t.Run(name, func(t *testing.T) {
assert := assert.New(t)
nodeClient, err := c.Client.getNodeClientImpl("testID", c.QueryOptions, c.Node)
nodeClient, err := c.Client.getNodeClientImpl("testID", -1, c.QueryOptions, c.Node)
assert.Nil(err)
assert.Equal(c.ExpectedRegion, nodeClient.config.Region)
assert.Equal(c.ExpectedAddr, nodeClient.config.Address)
Expand Down
89 changes: 61 additions & 28 deletions api/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"io"
"net"
"strconv"
"sync"
"time"
Expand Down Expand Up @@ -51,22 +52,16 @@ func (c *Client) AllocFS() *AllocFS {

// List is used to list the files at a given path of an allocation directory
func (a *AllocFS) List(alloc *Allocation, path string, q *QueryOptions) ([]*AllocFileInfo, *QueryMeta, error) {
nodeClient, err := a.client.GetNodeClient(alloc.NodeID, q)
if err != nil {
return nil, nil, err
}

if q == nil {
q = &QueryOptions{}
}
if q.Params == nil {
q.Params = make(map[string]string)
}

q.Params["path"] = path

var resp []*AllocFileInfo
qm, err := nodeClient.query(fmt.Sprintf("/v1/client/fs/ls/%s", alloc.ID), &resp, q)
qm, err := a.client.query(fmt.Sprintf("/v1/client/fs/ls/%s", alloc.ID), &resp, q)
if err != nil {
return nil, nil, err
}
Expand All @@ -76,11 +71,6 @@ func (a *AllocFS) List(alloc *Allocation, path string, q *QueryOptions) ([]*Allo

// Stat is used to stat a file at a given path of an allocation directory
func (a *AllocFS) Stat(alloc *Allocation, path string, q *QueryOptions) (*AllocFileInfo, *QueryMeta, error) {
nodeClient, err := a.client.GetNodeClient(alloc.NodeID, q)
if err != nil {
return nil, nil, err
}

if q == nil {
q = &QueryOptions{}
}
Expand All @@ -91,7 +81,7 @@ func (a *AllocFS) Stat(alloc *Allocation, path string, q *QueryOptions) (*AllocF
q.Params["path"] = path

var resp AllocFileInfo
qm, err := nodeClient.query(fmt.Sprintf("/v1/client/fs/stat/%s", alloc.ID), &resp, q)
qm, err := a.client.query(fmt.Sprintf("/v1/client/fs/stat/%s", alloc.ID), &resp, q)
if err != nil {
return nil, nil, err
}
Expand All @@ -101,7 +91,7 @@ func (a *AllocFS) Stat(alloc *Allocation, path string, q *QueryOptions) (*AllocF
// ReadAt is used to read bytes at a given offset until limit at the given path
// in an allocation directory. If limit is <= 0, there is no limit.
func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int64, q *QueryOptions) (io.ReadCloser, error) {
nodeClient, err := a.client.GetNodeClient(alloc.NodeID, q)
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
if err != nil {
return nil, err
}
Expand All @@ -117,17 +107,28 @@ func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int
q.Params["offset"] = strconv.FormatInt(offset, 10)
q.Params["limit"] = strconv.FormatInt(limit, 10)

r, err := nodeClient.rawQuery(fmt.Sprintf("/v1/client/fs/readat/%s", alloc.ID), q)
reqPath := fmt.Sprintf("/v1/client/fs/readat/%s", alloc.ID)
r, err := nodeClient.rawQuery(reqPath, q)
if err != nil {
return nil, err
// There was a networking error when talking directly to the client.
if _, ok := err.(net.Error); !ok {
return nil, err
}

// Try via the server
r, err = a.client.rawQuery(reqPath, q)
if err != nil {
return nil, err
}
}

return r, nil
}

// Cat is used to read contents of a file at the given path in an allocation
// directory
func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.ReadCloser, error) {
nodeClient, err := a.client.GetNodeClient(alloc.NodeID, q)
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
if err != nil {
return nil, err
}
Expand All @@ -140,11 +141,21 @@ func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.ReadC
}

q.Params["path"] = path

r, err := nodeClient.rawQuery(fmt.Sprintf("/v1/client/fs/cat/%s", alloc.ID), q)
reqPath := fmt.Sprintf("/v1/client/fs/cat/%s", alloc.ID)
r, err := nodeClient.rawQuery(reqPath, q)
if err != nil {
return nil, err
// There was a networking error when talking directly to the client.
if _, ok := err.(net.Error); !ok {
return nil, err
}

// Try via the server
r, err = a.client.rawQuery(reqPath, q)
if err != nil {
return nil, err
}
}

return r, nil
}

Expand All @@ -160,7 +171,7 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {

errCh := make(chan error, 1)
nodeClient, err := a.client.GetNodeClient(alloc.NodeID, q)
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
if err != nil {
errCh <- err
return nil, errCh
Expand All @@ -177,10 +188,21 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
q.Params["offset"] = strconv.FormatInt(offset, 10)
q.Params["origin"] = origin

r, err := nodeClient.rawQuery(fmt.Sprintf("/v1/client/fs/stream/%s", alloc.ID), q)
reqPath := fmt.Sprintf("/v1/client/fs/stream/%s", alloc.ID)
r, err := nodeClient.rawQuery(reqPath, q)
if err != nil {
errCh <- err
return nil, errCh
// There was a networking error when talking directly to the client.
if _, ok := err.(net.Error); !ok {
errCh <- err
return nil, errCh
}

// Try via the server
r, err = a.client.rawQuery(reqPath, q)
if err != nil {
errCh <- err
return nil, errCh
}
}

// Create the output channel
Expand Down Expand Up @@ -236,7 +258,7 @@ func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin str
offset int64, cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {

errCh := make(chan error, 1)
nodeClient, err := a.client.GetNodeClient(alloc.NodeID, q)
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
if err != nil {
errCh <- err
return nil, errCh
Expand All @@ -255,10 +277,21 @@ func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin str
q.Params["origin"] = origin
q.Params["offset"] = strconv.FormatInt(offset, 10)

r, err := nodeClient.rawQuery(fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID), q)
reqPath := fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID)
r, err := nodeClient.rawQuery(reqPath, q)
if err != nil {
errCh <- err
return nil, errCh
// There was a networking error when talking directly to the client.
if _, ok := err.(net.Error); !ok {
errCh <- err
return nil, errCh
}

// Try via the server
r, err = a.client.rawQuery(reqPath, q)
if err != nil {
errCh <- err
return nil, errCh
}
}

// Create the output channel
Expand Down
22 changes: 12 additions & 10 deletions api/nodes.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package api

import (
"fmt"
"sort"
"strconv"
)
Expand Down Expand Up @@ -72,25 +73,26 @@ func (n *Nodes) ForceEvaluate(nodeID string, q *WriteOptions) (string, *WriteMet
}

func (n *Nodes) Stats(nodeID string, q *QueryOptions) (*HostStats, error) {
nodeClient, err := n.client.GetNodeClient(nodeID, q)
if err != nil {
return nil, err
}
var resp HostStats
if _, err := nodeClient.query("/v1/client/stats", &resp, nil); err != nil {
path := fmt.Sprintf("/v1/client/stats?node_id=%s", nodeID)
if _, err := n.client.query(path, &resp, q); err != nil {
return nil, err
}
return &resp, nil
}

func (n *Nodes) GC(nodeID string, q *QueryOptions) error {
nodeClient, err := n.client.GetNodeClient(nodeID, q)
if err != nil {
return err
}
var resp struct{}
path := fmt.Sprintf("/v1/client/gc?node_id=%s", nodeID)
_, err := n.client.query(path, &resp, q)
return err
}

// TODO Add tests
func (n *Nodes) GcAlloc(allocID string, q *QueryOptions) error {
var resp struct{}
_, err = nodeClient.query("/v1/client/gc", &resp, nil)
path := fmt.Sprintf("/v1/client/allocation/%s/gc", allocID)
_, err := n.client.query(path, &resp, q)
return err
}

Expand Down
27 changes: 27 additions & 0 deletions api/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import (
"testing"
"time"

"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)

func TestNodes_List(t *testing.T) {
Expand Down Expand Up @@ -275,3 +278,27 @@ func TestNodes_Sort(t *testing.T) {
t.Fatalf("\n\n%#v\n\n%#v", nodes, expect)
}
}

func TestNodes_GC(t *testing.T) {
t.Parallel()
require := require.New(t)
c, s := makeClient(t, nil, nil)
defer s.Stop()
nodes := c.Nodes()

err := nodes.GC(uuid.Generate(), nil)
require.NotNil(err)
require.True(structs.IsErrUnknownNode(err))
}

func TestNodes_GcAlloc(t *testing.T) {
t.Parallel()
require := require.New(t)
c, s := makeClient(t, nil, nil)
defer s.Stop()
nodes := c.Nodes()

err := nodes.GcAlloc(uuid.Generate(), nil)
require.NotNil(err)
require.True(structs.IsErrUnknownAllocation(err))
}
Loading

0 comments on commit 6a96b82

Please sign in to comment.