From 672b6a02ea0a4b60e366f1b8b5cc88f6c504eb53 Mon Sep 17 00:00:00 2001
From: Drew Bailey <2614075+drewbailey@users.noreply.github.com>
Date: Wed, 29 Jan 2020 11:22:43 -0500
Subject: [PATCH] Allow nomad monitor command to look up server UUID

Allows addressing servers with nomad monitor using the server's name or ID.

Also unifies the logic for addressing servers across client_agent_endpoint
commands and makes the addressing logic region aware.
---
 nomad/client_agent_endpoint.go      | 84 +++++++++--------------------
 nomad/client_agent_endpoint_test.go | 81 ++++++++++++++++++++++------
 nomad/rpc.go                        | 20 +++++++
 3 files changed, 110 insertions(+), 75 deletions(-)

diff --git a/nomad/client_agent_endpoint.go b/nomad/client_agent_endpoint.go
index 4d56d14aec5e..3f6dd4cd13f4 100644
--- a/nomad/client_agent_endpoint.go
+++ b/nomad/client_agent_endpoint.go
@@ -149,24 +149,30 @@ func (a *Agent) monitor(conn io.ReadWriteCloser) {
 		return
 	}

-	currentServer := a.srv.serf.LocalMember().Name
-	var forwardServer bool
-	// Targeting a remote server which is not the leader and not this server
-	if args.ServerID != "" && args.ServerID != "leader" && args.ServerID != currentServer {
-		forwardServer = true
+	region := args.RequestRegion()
+	if region == "" {
+		handleStreamResultError(fmt.Errorf("missing target RPC"), helper.Int64ToPtr(400), encoder)
+		return
 	}
-
-	// Targeting leader and this server is not current leader
-	if args.ServerID == "leader" && !a.srv.IsLeader() {
-		forwardServer = true
+	if region != a.srv.config.Region {
+		// Mark that we are forwarding
+		args.SetForwarded()
 	}

-	if forwardServer {
-		a.forwardMonitorServer(conn, args, encoder, decoder)
-		return
+	// Handle a ServerID that is not this server, including region forwarding
+	if args.ServerID != "" {
+		serverToFwd, err := a.forwardFor(args.ServerID, region)
+		if err != nil {
+			handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
+			return
+		}
+		if serverToFwd != nil {
+			a.forwardMonitorServer(conn, serverToFwd, args, encoder, decoder)
+			return
+		}
 	}

-	// NodeID was empty, so monitor this current server
+	// NodeID was empty and ServerID is this server, so monitor this server
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()

@@ -273,6 +279,7 @@ OUTER:
 // serverID and region so the request should not be forwarded.
 func (a *Agent) forwardFor(serverID, region string) (*serverParts, error) {
 	var target *serverParts
+	var err error

 	if serverID == "leader" {
 		isLeader, remoteLeader := a.srv.getLeader()
@@ -285,19 +292,9 @@ func (a *Agent) forwardFor(serverID, region string) (*serverParts, error) {
 			return nil, nil
 		}
 	} else {
-		members := a.srv.Members()
-		for _, mem := range members {
-			if mem.Name == serverID || mem.Tags["id"] == serverID {
-				if ok, srv := isNomadServer(mem); ok {
-					if srv.Region != region {
-						return nil,
-							fmt.Errorf(
-								"Requested server:%s region:%s does not exist in requested region: %s",
-								serverID, srv.Region, region)
-					}
-					target = srv
-				}
-			}
+		target, err = a.srv.getServer(region, serverID)
+		if err != nil {
+			return nil, err
 		}
 	}

@@ -384,42 +381,11 @@ func (a *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.Moni
 	return
 }

-func (a *Agent) forwardMonitorServer(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) {
-	var target *serverParts
-	serverID := args.ServerID
-
+func (a *Agent) forwardMonitorServer(conn io.ReadWriteCloser, server *serverParts, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) {
 	// empty ServerID to prevent forwarding loop
 	args.ServerID = ""

-	if serverID == "leader" {
-		isLeader, remoteServer := a.srv.getLeader()
-		if !isLeader && remoteServer != nil {
-			target = remoteServer
-		}
-		if !isLeader && remoteServer == nil {
-			handleStreamResultError(structs.ErrNoLeader, helper.Int64ToPtr(400), encoder)
-			return
-		}
-	} else {
-		// See if the server ID is a known member
-		serfMembers := a.srv.Members()
-		for _, mem := range serfMembers {
-			if mem.Name == serverID {
-				if ok, srv := isNomadServer(mem); ok {
-					target = srv
-				}
-			}
-		}
-	}
-
-	// Unable to find a server
-	if target == nil {
-		err := fmt.Errorf("unknown nomad server %s", serverID)
-		handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
-		return
-	}
-
-	serverConn, err := a.srv.streamingRpc(target, "Agent.Monitor")
+	serverConn, err := a.srv.streamingRpc(server, "Agent.Monitor")
 	if err != nil {
 		handleStreamResultError(err, helper.Int64ToPtr(500), encoder)
 		return
diff --git a/nomad/client_agent_endpoint_test.go b/nomad/client_agent_endpoint_test.go
index 931d03a60809..e6b8a53651c8 100644
--- a/nomad/client_agent_endpoint_test.go
+++ b/nomad/client_agent_endpoint_test.go
@@ -122,6 +122,7 @@ OUTER:

 func TestMonitor_Monitor_RemoteServer(t *testing.T) {
 	t.Parallel()
+	foreignRegion := "foo"

 	// start servers
 	s1, cleanupS1 := TestServer(t, nil)
@@ -130,9 +131,17 @@ func TestMonitor_Monitor_RemoteServer(t *testing.T) {
 		c.DevDisableBootstrap = true
 	})
 	defer cleanupS2()
-	TestJoin(t, s1, s2)
+
+	s3, cleanupS3 := TestServer(t, func(c *Config) {
+		c.DevDisableBootstrap = true
+		c.Region = foreignRegion
+	})
+	defer cleanupS3()
+
+	TestJoin(t, s1, s2, s3)
 	testutil.WaitForLeader(t, s1.RPC)
 	testutil.WaitForLeader(t, s2.RPC)
+	testutil.WaitForLeader(t, s3.RPC)

 	// determine leader and nonleader
 	servers := []*Server{s1, s2}
@@ -152,6 +161,8 @@ func TestMonitor_Monitor_RemoteServer(t *testing.T) {
 		expectedLog string
 		logger      hclog.InterceptLogger
 		origin      *Server
+		region      string
+		expectedErr string
 	}{
 		{
 			desc:        "remote leader",
 			serverID:    "leader",
 			expectedLog: "leader log",
 			logger:      leader.logger,
 			origin:      nonLeader,
+			region:      "global",
 		},
 		{
-			desc:        "remote server",
+			desc:        "remote server, server name",
 			serverID:    nonLeader.serf.LocalMember().Name,
 			expectedLog: "nonleader log",
 			logger:      nonLeader.logger,
 			origin:      leader,
+			region:      "global",
+		},
+		{
+			desc:        "remote server, server UUID",
+			serverID:    nonLeader.serf.LocalMember().Tags["id"],
+			expectedLog: "nonleader log",
+			logger:      nonLeader.logger,
+			origin:      leader,
+			region:      "global",
 		},
 		{
 			desc:        "serverID is current leader",
@@ -173,6 +194,7 @@ func TestMonitor_Monitor_RemoteServer(t *testing.T) {
 			expectedLog: "leader log",
 			logger:      leader.logger,
 			origin:      leader,
+			region:      "global",
 		},
 		{
 			desc:        "serverID is current server",
@@ -180,6 +202,24 @@ func TestMonitor_Monitor_RemoteServer(t *testing.T) {
 			expectedLog: "non leader log",
 			logger:      nonLeader.logger,
 			origin:      nonLeader,
+			region:      "global",
+		},
+		{
+			desc:        "remote server, different region",
+			serverID:    s3.serf.LocalMember().Name,
+			expectedLog: "remote region logger",
+			logger:      s3.logger,
+			origin:      nonLeader,
+			region:      foreignRegion,
+		},
+		{
+			desc:        "different region, region mismatch",
+			serverID:    s3.serf.LocalMember().Name,
+			expectedLog: "remote region logger",
+			logger:      s3.logger,
+			origin:      nonLeader,
+			region:      "bar",
+			expectedErr: "No path to region",
 		},
 	}

@@ -204,6 +244,9 @@ func TestMonitor_Monitor_RemoteServer(t *testing.T) {
 			req := cstructs.MonitorRequest{
 				LogLevel: "warn",
 				ServerID: tc.serverID,
+				QueryOptions: structs.QueryOptions{
+					Region: tc.region,
+				},
 			}

 			handler, err := tc.origin.StreamingRpcHandler("Agent.Monitor")
@@ -246,23 +289,29 @@ func TestMonitor_Monitor_RemoteServer(t *testing.T) {
 			for {
 				select {
 				case <-timeout:
-					t.Fatal("timeout waiting for logs")
+					require.Fail("timeout waiting for logs")
 				case err := <-errCh:
-					t.Fatal(err)
+					require.Fail(err.Error())
 				case msg := <-streamMsg:
 					if msg.Error != nil {
-						t.Fatalf("Got error: %v", msg.Error.Error())
-					}
-
-					var frame sframer.StreamFrame
-					err := json.Unmarshal(msg.Payload, &frame)
-					assert.NoError(t, err)
-
-					received += string(frame.Data)
-					if strings.Contains(received, tc.expectedLog) {
-						close(doneCh)
-						require.Nil(p2.Close())
-						break OUTER
+						if tc.expectedErr != "" {
+							require.Contains(msg.Error.Error(), tc.expectedErr)
+							// require.True(strings.Contains(msg.Error.Error(), tc.expectedErr))
+							break OUTER
+						} else {
+							require.Failf("Got error: %v", msg.Error.Error())
+						}
+					} else {
+						var frame sframer.StreamFrame
+						err := json.Unmarshal(msg.Payload, &frame)
+						assert.NoError(t, err)
+
+						received += string(frame.Data)
+						if strings.Contains(received, tc.expectedLog) {
+							close(doneCh)
+							require.Nil(p2.Close())
+							break OUTER
+						}
 					}
 				}
 			}
diff --git a/nomad/rpc.go b/nomad/rpc.go
index 41d47fe731a4..ee70f276628d 100644
--- a/nomad/rpc.go
+++ b/nomad/rpc.go
@@ -528,6 +528,26 @@ func (r *rpcHandler) forwardRegion(region, method string, args interface{}, repl
 	return r.connPool.RPC(region, server.Addr, server.MajorVersion, method, args, reply)
 }

+func (r *rpcHandler) getServer(region, serverID string) (*serverParts, error) {
+	// Bail if we can't find any servers
+	r.peerLock.RLock()
+	defer r.peerLock.RUnlock()
+	servers := r.peers[region]
+	if len(servers) == 0 {
+		r.logger.Warn("no path found to region", "region", region)
+		return nil, structs.ErrNoRegionPath
+	}
+
+	// Lookup server by id or name
+	for _, server := range servers {
+		if server.Name == serverID || server.ID == serverID {
+			return server, nil
+		}
+	}
+
+	return nil, fmt.Errorf("unknown nomad server %s", serverID)
+}
+
 // streamingRpc creates a connection to the given server and conducts the
 // initial handshake, returning the connection or an error. It is the callers
 // responsibility to close the connection if there is no returned error.
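
The heart of this change is the lookup rule in the new getServer helper in nomad/rpc.go: within the requested region's peer set, a server can be addressed by either its serf member name or the UUID stored in its "id" tag, with a "No path to region" error for an unknown region and an "unknown nomad server" error for an unknown ID. The standalone Go sketch below illustrates only that rule; the peer struct, the errNoRegionPath value, and the example names and UUIDs are invented stand-ins for the illustration, not Nomad's actual serverParts type or structs.ErrNoRegionPath.

package main

import (
	"errors"
	"fmt"
)

// peer is a stand-in for Nomad's serverParts; only the fields the lookup
// rule needs are modeled here.
type peer struct {
	Name string // serf member name, e.g. "node2.global"
	ID   string // server UUID taken from the member's "id" tag
}

// errNoRegionPath stands in for structs.ErrNoRegionPath.
var errNoRegionPath = errors.New("No path to region")

// getServer mirrors the rule added in nomad/rpc.go: within the requested
// region's peer list, match a server by its name or by its ID.
func getServer(peersByRegion map[string][]peer, region, serverID string) (*peer, error) {
	servers := peersByRegion[region]
	if len(servers) == 0 {
		return nil, errNoRegionPath
	}
	for i := range servers {
		if servers[i].Name == serverID || servers[i].ID == serverID {
			return &servers[i], nil
		}
	}
	return nil, fmt.Errorf("unknown nomad server %s", serverID)
}

func main() {
	peers := map[string][]peer{
		"global": {
			{Name: "node1.global", ID: "8c9797f1-0000-0000-0000-000000000001"},
			{Name: "node2.global", ID: "8c9797f1-0000-0000-0000-000000000002"},
		},
	}

	// The member name and the UUID resolve to the same server.
	byName, _ := getServer(peers, "global", "node2.global")
	byID, _ := getServer(peers, "global", "8c9797f1-0000-0000-0000-000000000002")
	fmt.Println(byName.Name, byID.Name) // node2.global node2.global

	// An unknown region surfaces the "No path to region" error that the
	// region-mismatch test case asserts on.
	if _, err := getServer(peers, "bar", "node2.global"); err != nil {
		fmt.Println(err) // No path to region
	}
}

In practice this is what lets the monitor command's server-id option (for example, nomad monitor -server-id=<name-or-uuid> -log-level=debug) resolve either form to the same remote server, with the target region taken from the request's query options.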