Skip to content

Commit

Permalink
Add support for placement tags and preferred to stream and consumer…
Browse files Browse the repository at this point in the history
… stepdown requests (#6284)

Like #6282, this adds preferred placement support to stream and consumer
stepdowns too.

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
derekcollison authored Dec 19, 2024
2 parents 10c2971 + 41705de commit 470a7ac
Show file tree
Hide file tree
Showing 2 changed files with 507 additions and 17 deletions.
57 changes: 40 additions & 17 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2235,11 +2235,6 @@ func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, _ *
}
return
}
if !isEmptyRequest(msg) {
resp.Error = NewJSBadRequestError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

// Check to see if we are a member of the group and if the group has no leader.
if js.isGroupLeaderless(sa.Group) {
Expand All @@ -2266,14 +2261,33 @@ func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, _ *
return
}

node := mset.raftNode()
if node == nil {
resp.Success = true
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
return
}

var preferredLeader string
if isJSONObjectOrArray(msg) {
var req JSApiLeaderStepdownRequest
if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError(err)
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(node, req.Placement); resp.Error != nil {
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
}

// Call actual stepdown. Do this in a Go routine.
go func() {
if node := mset.raftNode(); node != nil {
mset.setLeader(false)
// TODO (mh) eventually make sure all go routines exited and all channels are cleared
time.Sleep(250 * time.Millisecond)
node.StepDown()
}
mset.setLeader(false)
// TODO (mh) eventually make sure all go routines exited and all channels are cleared
time.Sleep(250 * time.Millisecond)
node.StepDown(preferredLeader)

resp.Success = true
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
Expand Down Expand Up @@ -2353,11 +2367,6 @@ func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, _
}
return
}
if !isEmptyRequest(msg) {
resp.Error = NewJSBadRequestError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

mset, err := acc.lookupStream(stream)
if err != nil {
Expand All @@ -2379,12 +2388,26 @@ func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, _
return
}

var preferredLeader string
if isJSONObjectOrArray(msg) {
var req JSApiLeaderStepdownRequest
if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError(err)
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(n, req.Placement); resp.Error != nil {
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
}

// Call actual stepdown. Do this in a Go routine.
go func() {
o.setLeader(false)
// TODO (mh) eventually make sure all go routines exited and all channels are cleared
time.Sleep(250 * time.Millisecond)
n.StepDown()
n.StepDown(preferredLeader)

resp.Success = true
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
Expand Down
Loading

0 comments on commit 470a7ac

Please sign in to comment.