Skip to content

Commit

Permalink
Allow nomad monitor command to lookup server UUID
Browse files Browse the repository at this point in the history
Allows addressing servers with nomad monitor using the server's name or
ID.

Also unifies logic for addressing servers for client_agent_endpoint
commands and makes addressing logic region aware.
  • Loading branch information
drewbailey committed Jan 29, 2020
1 parent b789b50 commit 672b6a0
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 75 deletions.
84 changes: 25 additions & 59 deletions nomad/client_agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,24 +149,30 @@ func (a *Agent) monitor(conn io.ReadWriteCloser) {
return
}

currentServer := a.srv.serf.LocalMember().Name
var forwardServer bool
// Targeting a remote server which is not the leader and not this server
if args.ServerID != "" && args.ServerID != "leader" && args.ServerID != currentServer {
forwardServer = true
region := args.RequestRegion()
if region == "" {
handleStreamResultError(fmt.Errorf("missing target RPC"), helper.Int64ToPtr(400), encoder)
return
}

// Targeting leader and this server is not current leader
if args.ServerID == "leader" && !a.srv.IsLeader() {
forwardServer = true
if region != a.srv.config.Region {
// Mark that we are forwarding
args.SetForwarded()
}

if forwardServer {
a.forwardMonitorServer(conn, args, encoder, decoder)
return
// Handle serverID not equal to ours including region forwarding
if args.ServerID != "" {
serverToFwd, err := a.forwardFor(args.ServerID, region)
if err != nil {
handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
return
}
if serverToFwd != nil {
a.forwardMonitorServer(conn, serverToFwd, args, encoder, decoder)
return
}
}

// NodeID was empty, so monitor this current server
// NodeID was empty, ServerID was equal to this server, monitor this server
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -273,6 +279,7 @@ OUTER:
// serverID and region so the request should not be forwarded.
func (a *Agent) forwardFor(serverID, region string) (*serverParts, error) {
var target *serverParts
var err error

if serverID == "leader" {
isLeader, remoteLeader := a.srv.getLeader()
Expand All @@ -285,19 +292,9 @@ func (a *Agent) forwardFor(serverID, region string) (*serverParts, error) {
return nil, nil
}
} else {
members := a.srv.Members()
for _, mem := range members {
if mem.Name == serverID || mem.Tags["id"] == serverID {
if ok, srv := isNomadServer(mem); ok {
if srv.Region != region {
return nil,
fmt.Errorf(
"Requested server:%s region:%s does not exist in requested region: %s",
serverID, srv.Region, region)
}
target = srv
}
}
target, err = a.srv.getServer(region, serverID)
if err != nil {
return nil, err
}
}

Expand Down Expand Up @@ -384,42 +381,11 @@ func (a *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.Moni
return
}

func (a *Agent) forwardMonitorServer(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) {
var target *serverParts
serverID := args.ServerID

func (a *Agent) forwardMonitorServer(conn io.ReadWriteCloser, server *serverParts, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) {
// empty ServerID to prevent forwarding loop
args.ServerID = ""

if serverID == "leader" {
isLeader, remoteServer := a.srv.getLeader()
if !isLeader && remoteServer != nil {
target = remoteServer
}
if !isLeader && remoteServer == nil {
handleStreamResultError(structs.ErrNoLeader, helper.Int64ToPtr(400), encoder)
return
}
} else {
// See if the server ID is a known member
serfMembers := a.srv.Members()
for _, mem := range serfMembers {
if mem.Name == serverID {
if ok, srv := isNomadServer(mem); ok {
target = srv
}
}
}
}

// Unable to find a server
if target == nil {
err := fmt.Errorf("unknown nomad server %s", serverID)
handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
return
}

serverConn, err := a.srv.streamingRpc(target, "Agent.Monitor")
serverConn, err := a.srv.streamingRpc(server, "Agent.Monitor")
if err != nil {
handleStreamResultError(err, helper.Int64ToPtr(500), encoder)
return
Expand Down
81 changes: 65 additions & 16 deletions nomad/client_agent_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ OUTER:

func TestMonitor_Monitor_RemoteServer(t *testing.T) {
t.Parallel()
foreignRegion := "foo"

// start servers
s1, cleanupS1 := TestServer(t, nil)
Expand All @@ -130,9 +131,17 @@ func TestMonitor_Monitor_RemoteServer(t *testing.T) {
c.DevDisableBootstrap = true
})
defer cleanupS2()
TestJoin(t, s1, s2)

s3, cleanupS3 := TestServer(t, func(c *Config) {
c.DevDisableBootstrap = true
c.Region = foreignRegion
})
defer cleanupS3()

TestJoin(t, s1, s2, s3)
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForLeader(t, s2.RPC)
testutil.WaitForLeader(t, s3.RPC)

// determine leader and nonleader
servers := []*Server{s1, s2}
Expand All @@ -152,34 +161,65 @@ func TestMonitor_Monitor_RemoteServer(t *testing.T) {
expectedLog string
logger hclog.InterceptLogger
origin *Server
region string
expectedErr string
}{
{
desc: "remote leader",
serverID: "leader",
expectedLog: "leader log",
logger: leader.logger,
origin: nonLeader,
region: "global",
},
{
desc: "remote server",
desc: "remote server, server name",
serverID: nonLeader.serf.LocalMember().Name,
expectedLog: "nonleader log",
logger: nonLeader.logger,
origin: leader,
region: "global",
},
{
desc: "remote server, server UUID",
serverID: nonLeader.serf.LocalMember().Tags["id"],
expectedLog: "nonleader log",
logger: nonLeader.logger,
origin: leader,
region: "global",
},
{
desc: "serverID is current leader",
serverID: "leader",
expectedLog: "leader log",
logger: leader.logger,
origin: leader,
region: "global",
},
{
desc: "serverID is current server",
serverID: nonLeader.serf.LocalMember().Name,
expectedLog: "non leader log",
logger: nonLeader.logger,
origin: nonLeader,
region: "global",
},
{
desc: "remote server, different region",
serverID: s3.serf.LocalMember().Name,
expectedLog: "remote region logger",
logger: s3.logger,
origin: nonLeader,
region: foreignRegion,
},
{
desc: "different region, region mismatch",
serverID: s3.serf.LocalMember().Name,
expectedLog: "remote region logger",
logger: s3.logger,
origin: nonLeader,
region: "bar",
expectedErr: "No path to region",
},
}

Expand All @@ -204,6 +244,9 @@ func TestMonitor_Monitor_RemoteServer(t *testing.T) {
req := cstructs.MonitorRequest{
LogLevel: "warn",
ServerID: tc.serverID,
QueryOptions: structs.QueryOptions{
Region: tc.region,
},
}

handler, err := tc.origin.StreamingRpcHandler("Agent.Monitor")
Expand Down Expand Up @@ -246,23 +289,29 @@ func TestMonitor_Monitor_RemoteServer(t *testing.T) {
for {
select {
case <-timeout:
t.Fatal("timeout waiting for logs")
require.Fail("timeout waiting for logs")
case err := <-errCh:
t.Fatal(err)
require.Fail(err.Error())
case msg := <-streamMsg:
if msg.Error != nil {
t.Fatalf("Got error: %v", msg.Error.Error())
}

var frame sframer.StreamFrame
err := json.Unmarshal(msg.Payload, &frame)
assert.NoError(t, err)

received += string(frame.Data)
if strings.Contains(received, tc.expectedLog) {
close(doneCh)
require.Nil(p2.Close())
break OUTER
if tc.expectedErr != "" {
require.Contains(msg.Error.Error(), tc.expectedErr)
// require.True(strings.Contains(msg.Error.Error(), tc.expectedErr))
break OUTER
} else {
require.Failf("Got error: %v", msg.Error.Error())
}
} else {
var frame sframer.StreamFrame
err := json.Unmarshal(msg.Payload, &frame)
assert.NoError(t, err)

received += string(frame.Data)
if strings.Contains(received, tc.expectedLog) {
close(doneCh)
require.Nil(p2.Close())
break OUTER
}
}
}
}
Expand Down
20 changes: 20 additions & 0 deletions nomad/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,26 @@ func (r *rpcHandler) forwardRegion(region, method string, args interface{}, repl
return r.connPool.RPC(region, server.Addr, server.MajorVersion, method, args, reply)
}

// getServer looks up a known server in the given region whose name or ID
// equals serverID.
//
// It returns structs.ErrNoRegionPath when no servers are recorded for the
// region, and an "unknown nomad server" error when the region has servers but
// none of them matches serverID.
func (r *rpcHandler) getServer(region, serverID string) (*serverParts, error) {
	// Hold the read lock for the whole lookup and release it on every return
	// path; previously only the "no servers" path released it, leaking the
	// read lock on both the match and the not-found returns.
	r.peerLock.RLock()
	defer r.peerLock.RUnlock()

	// Bail if we can't find any servers
	servers := r.peers[region]
	if len(servers) == 0 {
		r.logger.Warn("no path found to region", "region", region)
		return nil, structs.ErrNoRegionPath
	}

	// Lookup server by ID or name
	for _, server := range servers {
		if server.Name == serverID || server.ID == serverID {
			return server, nil
		}
	}

	return nil, fmt.Errorf("unknown nomad server %s", serverID)
}

// streamingRpc creates a connection to the given server and conducts the
// initial handshake, returning the connection or an error. It is the callers
// responsibility to close the connection if there is no returned error.
Expand Down

0 comments on commit 672b6a0

Please sign in to comment.