From 429f5311b5c6ef4ff7e942e1844d6d0f105fdac3 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Tue, 17 Mar 2020 16:14:11 -0400 Subject: [PATCH 1/2] server: node connections must not be forwarded This fixes a bug where a forwarded node update request may be assumed to be the actual direct client connection if the server just lost leadership. When a nomad non-leader server receives a Node.UpdateStatus request, it forwards the RPC request to the leader, and holds on the request Yamux connection in a cache to allow for server<->client forwarding. When the leader handles the request, it must differentiate between a forwarded connection vs the actual connection. This is done in https://github.com/hashicorp/nomad/blob/v0.10.4/nomad/node_endpoint.go#L412 Now, consider if the non-leader server forwards to the connection to a recently deposed nomad leader, which in turn forwards the RPC request to the new leader. Without this change, the deposed leader will mistake the forwarded connection for the actual client connection and cache it mapped to the client ID. If the server attempts to connect to that client, it will attempt to start a connection/session to the other server instead and the call will hang forever. This change ensures that we only add node connection mapping if the request is not a forwarded request, regardless of circumstances. --- nomad/node_endpoint.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index d4753027f274..f635abc2791b 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -82,7 +82,7 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp // We have a valid node connection since there is no error from the // forwarded server, so add the mapping to cache the // connection and allow the server to send RPCs to the client. - if err == nil && n.ctx != nil && n.ctx.NodeID == "" { + if err == nil && n.ctx != nil && n.ctx.NodeID == "" && !args.IsForwarded() { n.ctx.NodeID = args.Node.ID n.srv.addNodeConn(n.ctx) } @@ -374,7 +374,7 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct // We have a valid node connection since there is no error from the // forwarded server, so add the mapping to cache the // connection and allow the server to send RPCs to the client. - if err == nil && n.ctx != nil && n.ctx.NodeID == "" { + if err == nil && n.ctx != nil && n.ctx.NodeID == "" && !args.IsForwarded() { n.ctx.NodeID = args.NodeID n.srv.addNodeConn(n.ctx) } @@ -925,7 +925,7 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest, // We have a valid node connection since there is no error from the // forwarded server, so add the mapping to cache the // connection and allow the server to send RPCs to the client. - if err == nil && n.ctx != nil && n.ctx.NodeID == "" { + if err == nil && n.ctx != nil && n.ctx.NodeID == "" && !args.IsForwarded() { n.ctx.NodeID = args.NodeID n.srv.addNodeConn(n.ctx) } From 63b8e45a12452f8b5a753517036723e33e94d9f3 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Tue, 17 Mar 2020 21:35:56 -0400 Subject: [PATCH 2/2] Protect against args being modified --- nomad/node_endpoint.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index f635abc2791b..e26891349f65 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -78,11 +78,12 @@ type Node struct { // Register is used to upsert a client that is available for scheduling func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUpdateResponse) error { + isForwarded := args.IsForwarded() if done, err := n.srv.forward("Node.Register", args, args, reply); done { // We have a valid node connection since there is no error from the // forwarded server, so add the mapping to cache the // connection and allow the server to send RPCs to the client. - if err == nil && n.ctx != nil && n.ctx.NodeID == "" && !args.IsForwarded() { + if err == nil && n.ctx != nil && n.ctx.NodeID == "" && !isForwarded { n.ctx.NodeID = args.Node.ID n.srv.addNodeConn(n.ctx) } @@ -370,11 +371,12 @@ func (n *Node) deregister(args *structs.NodeBatchDeregisterRequest, // UpdateStatus is used to update the status of a client node func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *structs.NodeUpdateResponse) error { + isForwarded := args.IsForwarded() if done, err := n.srv.forward("Node.UpdateStatus", args, args, reply); done { // We have a valid node connection since there is no error from the // forwarded server, so add the mapping to cache the // connection and allow the server to send RPCs to the client. - if err == nil && n.ctx != nil && n.ctx.NodeID == "" && !args.IsForwarded() { + if err == nil && n.ctx != nil && n.ctx.NodeID == "" && !isForwarded { n.ctx.NodeID = args.NodeID n.srv.addNodeConn(n.ctx) } @@ -921,11 +923,12 @@ func (n *Node) GetAllocs(args *structs.NodeSpecificRequest, // per allocation. func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest, reply *structs.NodeClientAllocsResponse) error { + isForwarded := args.IsForwarded() if done, err := n.srv.forward("Node.GetClientAllocs", args, args, reply); done { // We have a valid node connection since there is no error from the // forwarded server, so add the mapping to cache the // connection and allow the server to send RPCs to the client. - if err == nil && n.ctx != nil && n.ctx.NodeID == "" && !args.IsForwarded() { + if err == nil && n.ctx != nil && n.ctx.NodeID == "" && !isForwarded { n.ctx.NodeID = args.NodeID n.srv.addNodeConn(n.ctx) }