Merge pull request #7015 from hashicorp/b-allow-monitor-by-server-id
Allow nomad monitor command to lookup server UUID
drewbailey committed Jan 29, 2020
2 parents 8b6a8c0 + 2dbcad3 commit 07df966
Showing 7 changed files with 146 additions and 81 deletions.
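Editor's note (not part of the commit): the user-facing effect of this merge is that the `server_id` accepted by the agent monitor endpoint — the API behind `nomad monitor -server-id` — may now be a server's UUID as well as its name (or the literal `leader`). A minimal sketch of exercising that over the HTTP API; the agent address and UUID below are placeholders for a local dev agent:

```go
package main

import (
	"io"
	"net/http"
	"net/url"
	"os"
)

func main() {
	// Placeholder: a local Nomad agent on the default port. After this PR,
	// server_id may be a server's UUID as well as its name, or "leader".
	q := url.Values{}
	q.Set("log_level", "debug")
	q.Set("server_id", "4fb76ab4-aaaa-bbbb-cccc-ddddeeee0001") // hypothetical UUID

	resp, err := http.Get("http://127.0.0.1:4646/v1/agent/monitor?" + q.Encode())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// The endpoint streams log frames until the client disconnects.
	io.Copy(os.Stdout, resp.Body)
}
```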
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -18,9 +18,10 @@ IMPROVEMENTS:
BUG FIXES:

* agent: Fixed race condition in logging when using `nomad monitor` command [[GH-6872](https://github.com/hashicorp/nomad/issues/6872)]
+* agent: Fixed a bug where `nomad monitor -server-id` only worked with a server's name, not its UUID. [[GH-7015](https://github.com/hashicorp/nomad/issues/7015)]
* cli: Fixed a bug where `nomad monitor -node-id` would cause a CLI panic when no nodes were found. [[GH-6828](https://github.com/hashicorp/nomad/issues/6828)]
* config: Fixed a bug where agent startup would fail if the `consul.timeout` configuration was set. [[GH-6907](https://github.com/hashicorp/nomad/issues/6907)]
* consul: Fixed a bug where script-based health checks would fail if the service configuration included interpolation. [[GH-6916](https://github.com/hashicorp/nomad/issues/6916)]
* consul/connect: Fixed a bug where Connect-enabled jobs failed to validate when service names used interpolation. [[GH-6855](https://github.com/hashicorp/nomad/issues/6855)]
* scheduler: Fixed a bug that caused evicted allocs on a lost node to be stuck in running. [[GH-6902](https://github.com/hashicorp/nomad/issues/6902)]
* scheduler: Fixed a bug where `nomad job plan/apply` returned errors instead of a partial placement warning for ineligible nodes. [[GH-6968](https://github.com/hashicorp/nomad/issues/6968)]
2 changes: 1 addition & 1 deletion api/agent_test.go
@@ -407,7 +407,7 @@ func TestAgentCPUProfile(t *testing.T) {
}
resp, err := agent.CPUProfile(opts, q)
require.Error(t, err)
-    require.Contains(t, err.Error(), "500 (unknown nomad server unknown.global)")
+    require.Contains(t, err.Error(), "500 (unknown Nomad server unknown.global)")
require.Nil(t, resp)
}

4 changes: 2 additions & 2 deletions e2e/connect/input/multi-service.nomad
@@ -25,7 +25,7 @@ job "multi-service" {

config {
image = "hashicorp/http-echo"
args = ["-listen=:9001", "-text=echo1"]
}
}

@@ -43,7 +43,7 @@ job "multi-service" {

config {
image = "hashicorp/http-echo"
args = ["-listen=:9002", "-text=echo2"]
}
}
}
84 changes: 25 additions & 59 deletions nomad/client_agent_endpoint.go
@@ -149,24 +149,30 @@ func (a *Agent) monitor(conn io.ReadWriteCloser) {
         return
     }

-    currentServer := a.srv.serf.LocalMember().Name
-    var forwardServer bool
-    // Targeting a remote server which is not the leader and not this server
-    if args.ServerID != "" && args.ServerID != "leader" && args.ServerID != currentServer {
-        forwardServer = true
+    region := args.RequestRegion()
+    if region == "" {
+        handleStreamResultError(fmt.Errorf("missing target RPC"), helper.Int64ToPtr(400), encoder)
+        return
     }

-    // Targeting leader and this server is not current leader
-    if args.ServerID == "leader" && !a.srv.IsLeader() {
-        forwardServer = true
+    if region != a.srv.config.Region {
+        // Mark that we are forwarding
+        args.SetForwarded()
     }

-    if forwardServer {
-        a.forwardMonitorServer(conn, args, encoder, decoder)
-        return
+    // Try to forward request to remote region/server
+    if args.ServerID != "" {
+        serverToFwd, err := a.forwardFor(args.ServerID, region)
+        if err != nil {
+            handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
+            return
+        }
+        if serverToFwd != nil {
+            a.forwardMonitorServer(conn, serverToFwd, args, encoder, decoder)
+            return
+        }
     }

-    // NodeID was empty, so monitor this current server
+    // NodeID was empty, ServerID was equal to this server, monitor this server
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()

@@ -273,6 +279,7 @@ OUTER:
 // serverID and region so the request should not be forwarded.
 func (a *Agent) forwardFor(serverID, region string) (*serverParts, error) {
     var target *serverParts
+    var err error

     if serverID == "leader" {
         isLeader, remoteLeader := a.srv.getLeader()
@@ -285,19 +292,9 @@ func (a *Agent) forwardFor(serverID, region string) (*serverParts, error) {
             return nil, nil
         }
     } else {
-        members := a.srv.Members()
-        for _, mem := range members {
-            if mem.Name == serverID || mem.Tags["id"] == serverID {
-                if ok, srv := isNomadServer(mem); ok {
-                    if srv.Region != region {
-                        return nil,
-                            fmt.Errorf(
-                                "Requested server:%s region:%s does not exist in requested region: %s",
-                                serverID, srv.Region, region)
-                    }
-                    target = srv
-                }
-            }
+        target, err = a.srv.getServer(region, serverID)
+        if err != nil {
+            return nil, err
         }
     }

@@ -384,42 +381,11 @@ func (a *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) {
     return
 }

-func (a *Agent) forwardMonitorServer(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) {
-    var target *serverParts
-    serverID := args.ServerID
-
+func (a *Agent) forwardMonitorServer(conn io.ReadWriteCloser, server *serverParts, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) {
     // empty ServerID to prevent forwarding loop
     args.ServerID = ""

-    if serverID == "leader" {
-        isLeader, remoteServer := a.srv.getLeader()
-        if !isLeader && remoteServer != nil {
-            target = remoteServer
-        }
-        if !isLeader && remoteServer == nil {
-            handleStreamResultError(structs.ErrNoLeader, helper.Int64ToPtr(400), encoder)
-            return
-        }
-    } else {
-        // See if the server ID is a known member
-        serfMembers := a.srv.Members()
-        for _, mem := range serfMembers {
-            if mem.Name == serverID {
-                if ok, srv := isNomadServer(mem); ok {
-                    target = srv
-                }
-            }
-        }
-    }
-
-    // Unable to find a server
-    if target == nil {
-        err := fmt.Errorf("unknown nomad server %s", serverID)
-        handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
-        return
-    }
-
-    serverConn, err := a.srv.streamingRpc(target, "Agent.Monitor")
+    serverConn, err := a.srv.streamingRpc(server, "Agent.Monitor")
     if err != nil {
         handleStreamResultError(err, helper.Int64ToPtr(500), encoder)
         return
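Editor's note on the refactor above: `monitor()` no longer duplicates the serf member scan inside `forwardMonitorServer`; it resolves the target once via `forwardFor` and hands the resolved `*serverParts` down. A self-contained model of that decision, with trimmed stand-in types (the `server` struct and this `forwardFor` signature are illustrative, not Nomad's):

```go
package main

import (
	"errors"
	"fmt"
)

// server is a trimmed stand-in for nomad's internal serverParts.
type server struct {
	Name, ID string
	IsLeader bool
}

// forwardFor models the new resolution logic: nil means "serve locally"
// (we are the requested leader); otherwise the target to forward to, found
// by exact name or UUID match; otherwise an error.
func forwardFor(self server, peers []server, serverID string) (*server, error) {
	if serverID == "leader" {
		if self.IsLeader {
			return nil, nil // this server is the leader: serve locally
		}
		for i := range peers {
			if peers[i].IsLeader {
				return &peers[i], nil
			}
		}
		return nil, errors.New("no cluster leader")
	}
	for i := range peers {
		if peers[i].Name == serverID || peers[i].ID == serverID {
			return &peers[i], nil
		}
	}
	return nil, fmt.Errorf("unknown Nomad server %s", serverID)
}

func main() {
	self := server{Name: "s1.global", ID: "aaaa-1111"}
	peers := []server{{Name: "s2.global", ID: "bbbb-2222", IsLeader: true}}

	target, _ := forwardFor(self, peers, "bbbb-2222") // lookup by UUID now works
	fmt.Println("forward to:", target.Name)           // forward to: s2.global
}
```

In the real code the resolved target may even be the local server itself; `forwardMonitorServer` blanks `args.ServerID` before re-dispatching, which is what prevents a forwarding loop.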
87 changes: 69 additions & 18 deletions nomad/client_agent_endpoint_test.go
@@ -122,6 +122,7 @@ OUTER:

 func TestMonitor_Monitor_RemoteServer(t *testing.T) {
     t.Parallel()
+    foreignRegion := "foo"

     // start servers
     s1, cleanupS1 := TestServer(t, nil)
@@ -130,9 +131,17 @@ func TestMonitor_Monitor_RemoteServer(t *testing.T) {
         c.DevDisableBootstrap = true
     })
     defer cleanupS2()
-    TestJoin(t, s1, s2)
+
+    s3, cleanupS3 := TestServer(t, func(c *Config) {
+        c.DevDisableBootstrap = true
+        c.Region = foreignRegion
+    })
+    defer cleanupS3()
+
+    TestJoin(t, s1, s2, s3)
     testutil.WaitForLeader(t, s1.RPC)
     testutil.WaitForLeader(t, s2.RPC)
+    testutil.WaitForLeader(t, s3.RPC)

     // determine leader and nonleader
     servers := []*Server{s1, s2}
@@ -152,34 +161,65 @@ func TestMonitor_Monitor_RemoteServer(t *testing.T) {
         expectedLog string
         logger      hclog.InterceptLogger
         origin      *Server
+        region      string
+        expectedErr string
     }{
         {
             desc:        "remote leader",
             serverID:    "leader",
             expectedLog: "leader log",
             logger:      leader.logger,
             origin:      nonLeader,
+            region:      "global",
         },
         {
-            desc:        "remote server",
+            desc:        "remote server, server name",
             serverID:    nonLeader.serf.LocalMember().Name,
             expectedLog: "nonleader log",
             logger:      nonLeader.logger,
             origin:      leader,
+            region:      "global",
         },
+        {
+            desc:        "remote server, server UUID",
+            serverID:    nonLeader.serf.LocalMember().Tags["id"],
+            expectedLog: "nonleader log",
+            logger:      nonLeader.logger,
+            origin:      leader,
+            region:      "global",
+        },
         {
             desc:        "serverID is current leader",
             serverID:    "leader",
             expectedLog: "leader log",
             logger:      leader.logger,
             origin:      leader,
+            region:      "global",
         },
         {
             desc:        "serverID is current server",
             serverID:    nonLeader.serf.LocalMember().Name,
             expectedLog: "non leader log",
             logger:      nonLeader.logger,
             origin:      nonLeader,
+            region:      "global",
         },
+        {
+            desc:        "remote server, different region",
+            serverID:    s3.serf.LocalMember().Name,
+            expectedLog: "remote region logger",
+            logger:      s3.logger,
+            origin:      nonLeader,
+            region:      foreignRegion,
+        },
+        {
+            desc:        "different region, region mismatch",
+            serverID:    s3.serf.LocalMember().Name,
+            expectedLog: "remote region logger",
+            logger:      s3.logger,
+            origin:      nonLeader,
+            region:      "bar",
+            expectedErr: "No path to region",
+        },
     }

Expand All @@ -204,6 +244,9 @@ func TestMonitor_Monitor_RemoteServer(t *testing.T) {
             req := cstructs.MonitorRequest{
                 LogLevel: "warn",
                 ServerID: tc.serverID,
+                QueryOptions: structs.QueryOptions{
+                    Region: tc.region,
+                },
             }

handler, err := tc.origin.StreamingRpcHandler("Agent.Monitor")
@@ -246,23 +289,28 @@ func TestMonitor_Monitor_RemoteServer(t *testing.T) {
             for {
                 select {
                 case <-timeout:
-                    t.Fatal("timeout waiting for logs")
+                    require.Fail("timeout waiting for logs")
                 case err := <-errCh:
-                    t.Fatal(err)
+                    require.Fail(err.Error())
                 case msg := <-streamMsg:
                     if msg.Error != nil {
-                        t.Fatalf("Got error: %v", msg.Error.Error())
-                    }
-
-                    var frame sframer.StreamFrame
-                    err := json.Unmarshal(msg.Payload, &frame)
-                    assert.NoError(t, err)
-
-                    received += string(frame.Data)
-                    if strings.Contains(received, tc.expectedLog) {
-                        close(doneCh)
-                        require.Nil(p2.Close())
-                        break OUTER
+                        if tc.expectedErr != "" {
+                            require.Contains(msg.Error.Error(), tc.expectedErr)
+                            break OUTER
+                        } else {
+                            require.Failf("Got error: %v", msg.Error.Error())
+                        }
+                    } else {
+                        var frame sframer.StreamFrame
+                        err := json.Unmarshal(msg.Payload, &frame)
+                        assert.NoError(t, err)
+
+                        received += string(frame.Data)
+                        if strings.Contains(received, tc.expectedLog) {
+                            close(doneCh)
+                            require.Nil(p2.Close())
+                            break OUTER
+                        }
                     }
                 }
             }
@@ -282,6 +330,9 @@ func TestMonitor_MonitorServer(t *testing.T) {
     // No node ID to monitor the remote server
     req := cstructs.MonitorRequest{
         LogLevel: "debug",
+        QueryOptions: structs.QueryOptions{
+            Region: "global",
+        },
     }

handler, err := s.StreamingRpcHandler("Agent.Monitor")
@@ -541,7 +592,7 @@ func TestAgentProfile_RemoteRegionMisMatch(t *testing.T) {
     reply := structs.AgentPprofResponse{}

     err := s1.RPC("Agent.Profile", &req, &reply)
-    require.Contains(err.Error(), "does not exist in requested region")
+    require.Contains(err.Error(), "unknown Nomad server")
     require.Nil(reply.Payload)
 }

@@ -656,7 +707,7 @@ func TestAgentProfile_Server(t *testing.T) {
             serverID:        uuid.Generate(),
             origin:          nonLeader,
             reqType:         pprof.CmdReq,
-            expectedErr:     "unknown nomad server",
+            expectedErr:     "unknown Nomad server",
             expectedAgentID: "",
         },
     }
21 changes: 21 additions & 0 deletions nomad/rpc.go
@@ -528,6 +528,27 @@ func (r *rpcHandler) forwardRegion(region, method string, args interface{}, reply interface{}) error {
return r.connPool.RPC(region, server.Addr, server.MajorVersion, method, args, reply)
}

func (r *rpcHandler) getServer(region, serverID string) (*serverParts, error) {
    // Bail if we can't find any servers
    r.peerLock.RLock()
    defer r.peerLock.RUnlock()

    servers := r.peers[region]
    if len(servers) == 0 {
        r.logger.Warn("no path found to region", "region", region)
        return nil, structs.ErrNoRegionPath
    }

    // Lookup server by id or name
    for _, server := range servers {
        if server.Name == serverID || server.ID == serverID {
            return server, nil
        }
    }

    return nil, fmt.Errorf("unknown Nomad server %s", serverID)
}

// streamingRpc creates a connection to the given server and conducts the
// initial handshake, returning the connection or an error. It is the callers
// responsibility to close the connection if there is no returned error.
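The two error paths of `getServer` are exactly what the updated tests assert on. A standalone sketch of the same lookup over a plain map — `structs.ErrNoRegionPath` is stood in for by a local error value, and the peer data is made up:

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-in for structs.ErrNoRegionPath; the message matches what the tests expect.
var errNoRegionPath = errors.New("No path to region")

// serverParts is a trimmed stand-in for nomad's internal type.
type serverParts struct{ Name, ID string }

func getServer(peers map[string][]*serverParts, region, serverID string) (*serverParts, error) {
	servers := peers[region]
	if len(servers) == 0 {
		return nil, errNoRegionPath // region unknown or unreachable
	}
	// Match by exact name or UUID, as in the helper above.
	for _, s := range servers {
		if s.Name == serverID || s.ID == serverID {
			return s, nil
		}
	}
	return nil, fmt.Errorf("unknown Nomad server %s", serverID)
}

func main() {
	peers := map[string][]*serverParts{
		"global": {{Name: "s2.global", ID: "bbbb-2222"}},
	}

	_, err := getServer(peers, "bar", "s2.global")
	fmt.Println(err) // "No path to region": no servers known for region "bar"

	_, err = getServer(peers, "global", "cccc-3333")
	fmt.Println(err) // "unknown Nomad server cccc-3333": region known, ID is not
}
```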
26 changes: 26 additions & 0 deletions nomad/rpc_test.go
@@ -145,6 +145,32 @@ func TestRPC_forwardRegion(t *testing.T) {
}
}

func TestRPC_getServer(t *testing.T) {
    t.Parallel()

    s1, cleanupS1 := TestServer(t, nil)
    defer cleanupS1()
    s2, cleanupS2 := TestServer(t, func(c *Config) {
        c.Region = "global"
    })
    defer cleanupS2()
    TestJoin(t, s1, s2)
    testutil.WaitForLeader(t, s1.RPC)
    testutil.WaitForLeader(t, s2.RPC)

    // Lookup by name
    srv, err := s1.getServer("global", s2.serf.LocalMember().Name)
    require.NoError(t, err)

    require.Equal(t, srv.Name, s2.serf.LocalMember().Name)

    // Lookup by id
    srv, err = s2.getServer("global", s1.serf.LocalMember().Tags["id"])
    require.NoError(t, err)

    require.Equal(t, srv.Name, s1.serf.LocalMember().Name)
}

func TestRPC_PlaintextRPCSucceedsWhenInUpgradeMode(t *testing.T) {
t.Parallel()
assert := assert.New(t)
