Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track all client connections #4222

Merged
merged 2 commits into from
Apr 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 40 additions & 9 deletions nomad/client_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,16 @@ type nodeConnState struct {
func (s *Server) getNodeConn(nodeID string) (*nodeConnState, bool) {
s.nodeConnsLock.RLock()
defer s.nodeConnsLock.RUnlock()
state, ok := s.nodeConns[nodeID]
conns, ok := s.nodeConns[nodeID]

// Return the latest conn
var state *nodeConnState
for _, conn := range conns {
if state == nil || state.Established.Before(conn.Established) {
state = conn
}
}

return state, ok
}

Expand All @@ -39,8 +48,12 @@ func (s *Server) connectedNodes() map[string]time.Time {
s.nodeConnsLock.RLock()
defer s.nodeConnsLock.RUnlock()
nodes := make(map[string]time.Time, len(s.nodeConns))
for nodeID, state := range s.nodeConns {
nodes[nodeID] = state.Established
for nodeID, conns := range s.nodeConns {
for _, conn := range conns {
if nodes[nodeID].Before(conn.Established) {
nodes[nodeID] = conn.Established
}
}
}
return nodes
}
Expand All @@ -54,11 +67,26 @@ func (s *Server) addNodeConn(ctx *RPCContext) {

s.nodeConnsLock.Lock()
defer s.nodeConnsLock.Unlock()
s.nodeConns[ctx.NodeID] = &nodeConnState{

// Capture the tracked connections so far
currentConns := s.nodeConns[ctx.NodeID]

// Check if we already have the connection. If we do, just update the
// establish time.
for _, c := range currentConns {
if c.Ctx.Conn.LocalAddr().String() == ctx.Conn.LocalAddr().String() &&
c.Ctx.Conn.RemoteAddr().String() == ctx.Conn.RemoteAddr().String() {
c.Established = time.Now()
return
}
}

// Add the new conn
s.nodeConns[ctx.NodeID] = append(s.nodeConns[ctx.NodeID], &nodeConnState{
Session: ctx.Session,
Established: time.Now(),
Ctx: ctx,
}
})
}

// removeNodeConn removes the mapping between a node and its session.
Expand All @@ -70,7 +98,7 @@ func (s *Server) removeNodeConn(ctx *RPCContext) {

s.nodeConnsLock.Lock()
defer s.nodeConnsLock.Unlock()
state, ok := s.nodeConns[ctx.NodeID]
conns, ok := s.nodeConns[ctx.NodeID]
if !ok {
return
}
Expand All @@ -80,9 +108,12 @@ func (s *Server) removeNodeConn(ctx *RPCContext) {
// dial various addresses that all route to the same server. The most common
// case for this is the original address the client uses to connect to the
// server differs from the advertised address sent by the heartbeat.
if state.Ctx.Conn.LocalAddr().String() == ctx.Conn.LocalAddr().String() &&
state.Ctx.Conn.RemoteAddr().String() == ctx.Conn.RemoteAddr().String() {
delete(s.nodeConns, ctx.NodeID)
for i, conn := range conns {
if conn.Ctx.Conn.LocalAddr().String() == ctx.Conn.LocalAddr().String() &&
conn.Ctx.Conn.RemoteAddr().String() == ctx.Conn.RemoteAddr().String() {
s.nodeConns[ctx.NodeID] = append(s.nodeConns[ctx.NodeID][:i], s.nodeConns[ctx.NodeID][i+1:]...)
return
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions nomad/client_rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func TestServer_removeNodeConn_differentAddrs(t *testing.T) {
s1.addNodeConn(ctx1)
s1.addNodeConn(ctx2)
require.Len(s1.connectedNodes(), 1)
require.Len(s1.nodeConns[nodeID], 2)

// Check that the value is the second conn.
state, ok := s1.getNodeConn(nodeID)
Expand All @@ -66,6 +67,7 @@ func TestServer_removeNodeConn_differentAddrs(t *testing.T) {
// Delete the first
s1.removeNodeConn(ctx1)
require.Len(s1.connectedNodes(), 1)
require.Len(s1.nodeConns[nodeID], 1)

// Check that the value is the second conn.
state, ok = s1.getNodeConn(nodeID)
Expand Down
4 changes: 2 additions & 2 deletions nomad/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ type Server struct {

// nodeConns is the set of multiplexed node connections we have keyed by
// NodeID
nodeConns map[string]*nodeConnState
nodeConns map[string][]*nodeConnState
nodeConnsLock sync.RWMutex

// peers is used to track the known Nomad servers. This is
Expand Down Expand Up @@ -294,7 +294,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, logger *log.Logg
tlsWrap: tlsWrap,
rpcServer: rpc.NewServer(),
streamingRpcs: structs.NewStreamingRpcRegistry(),
nodeConns: make(map[string]*nodeConnState),
nodeConns: make(map[string][]*nodeConnState),
peers: make(map[string][]*serverParts),
localPeers: make(map[raft.ServerAddress]*serverParts),
reconcileCh: make(chan serf.Member, 32),
Expand Down