Skip to content

Commit

Permalink
Add client scheduling eligibility to heartbeat
Browse files Browse the repository at this point in the history
  • Loading branch information
angrycub committed Sep 7, 2022
1 parent 11496d1 commit eb6a261
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 4 deletions.
5 changes: 5 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1954,6 +1954,11 @@ func (c *Client) updateNodeStatus() error {
"req_latency", end.Sub(start), "heartbeat_ttl", oldTTL, "since_last_heartbeat", time.Since(last))
}
}
// Check heartbeat response for information about the server-side scheduling
// state of this node
c.UpdateConfig(func(c *config.Config) {
c.Node.SchedulingEligibility = resp.ClientStatus.SchedulingEligibility
})

// Update the number of nodes in the cluster so we can adjust our server
// rebalance rate.
Expand Down
14 changes: 10 additions & 4 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp

n.srv.peerLock.RLock()
defer n.srv.peerLock.RUnlock()
if err := n.constructNodeServerInfoResponse(snap, reply); err != nil {
if err := n.constructNodeServerInfoResponse(args.Node.ID, snap, reply); err != nil {
n.logger.Error("failed to populate NodeUpdateResponse", "error", err)
return err
}
Expand Down Expand Up @@ -258,7 +258,7 @@ func equalDevices(n1, n2 *structs.Node) bool {
}

// constructNodeServerInfoResponse assumes the n.srv.peerLock is held for reading.
func (n *Node) constructNodeServerInfoResponse(snap *state.StateSnapshot, reply *structs.NodeUpdateResponse) error {
func (n *Node) constructNodeServerInfoResponse(nodeID string, snap *state.StateSnapshot, reply *structs.NodeUpdateResponse) error {
reply.LeaderRPCAddr = string(n.srv.raft.Leader())

// Reply with config information required for future RPC requests
Expand All @@ -271,6 +271,12 @@ func (n *Node) constructNodeServerInfoResponse(snap *state.StateSnapshot, reply
})
}

// Add ClientStatus information to heartbeat response.
node, _ := snap.NodeByID(nil, nodeID)
reply.ClientStatus = &structs.ClientStatus{
SchedulingEligibility: node.SchedulingEligibility,
}

// TODO(sean@): Use an indexed node count instead
//
// Snapshot is used only to iterate over all nodes to create a node
Expand Down Expand Up @@ -564,7 +570,7 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
reply.Index = index
n.srv.peerLock.RLock()
defer n.srv.peerLock.RUnlock()
if err := n.constructNodeServerInfoResponse(snap, reply); err != nil {
if err := n.constructNodeServerInfoResponse(node.GetID(), snap, reply); err != nil {
n.logger.Error("failed to populate NodeUpdateResponse", "error", err)
return err
}
Expand Down Expand Up @@ -821,7 +827,7 @@ func (n *Node) Evaluate(args *structs.NodeEvaluateRequest, reply *structs.NodeUp

n.srv.peerLock.RLock()
defer n.srv.peerLock.RUnlock()
if err := n.constructNodeServerInfoResponse(snap, reply); err != nil {
if err := n.constructNodeServerInfoResponse(node.GetID(), snap, reply); err != nil {
n.logger.Error("failed to populate NodeUpdateResponse", "error", err)
return err
}
Expand Down
10 changes: 10 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1361,9 +1361,19 @@ type NodeUpdateResponse struct {
// region.
Servers []*NodeServerInfo

// ClientStatus tells the client which scheduling status the server
// has recorded for it; it is included in heartbeat responses.
ClientStatus *ClientStatus

QueryMeta
}

// ClientStatus carries the server's view of a client node back to that
// client. It is attached to NodeUpdateResponse so that, during heartbeats,
// the client can learn the scheduling status the server has recorded for it.
type ClientStatus struct {
// SchedulingEligibility is the node's scheduling eligibility as recorded
// on the server side.
// NOTE(review): presumably one of the node scheduling eligibility values
// (eligible / ineligible) — confirm against the Node struct.
SchedulingEligibility string
}

// NodeDrainUpdateResponse is used to respond to a node drain update
type NodeDrainUpdateResponse struct {
NodeModifyIndex uint64
Expand Down

0 comments on commit eb6a261

Please sign in to comment.