diff --git a/server/jetstream_api.go b/server/jetstream_api.go
index 9d29776fb9..755f330555 100644
--- a/server/jetstream_api.go
+++ b/server/jetstream_api.go
@@ -2235,11 +2235,6 @@ func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, _ *
 		}
 		return
 	}
-	if !isEmptyRequest(msg) {
-		resp.Error = NewJSBadRequestError()
-		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
-		return
-	}
 
 	// Check to see if we are a member of the group and if the group has no leader.
 	if js.isGroupLeaderless(sa.Group) {
@@ -2266,14 +2261,33 @@ func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, _ *
 		return
 	}
 
+	node := mset.raftNode()
+	if node == nil {
+		resp.Success = true
+		s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
+		return
+	}
+
+	var preferredLeader string
+	if isJSONObjectOrArray(msg) {
+		var req JSApiLeaderStepdownRequest
+		if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
+			resp.Error = NewJSInvalidJSONError(err)
+			s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
+			return
+		}
+		if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(node, req.Placement); resp.Error != nil {
+			s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
+			return
+		}
+	}
+
 	// Call actual stepdown. Do this in a Go routine.
 	go func() {
-		if node := mset.raftNode(); node != nil {
-			mset.setLeader(false)
-			// TODO (mh) eventually make sure all go routines exited and all channels are cleared
-			time.Sleep(250 * time.Millisecond)
-			node.StepDown()
-		}
+		mset.setLeader(false)
+		// TODO (mh) eventually make sure all go routines exited and all channels are cleared
+		time.Sleep(250 * time.Millisecond)
+		node.StepDown(preferredLeader)
 
 		resp.Success = true
 		s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
@@ -2353,11 +2367,6 @@ func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, _
 		}
 		return
 	}
-	if !isEmptyRequest(msg) {
-		resp.Error = NewJSBadRequestError()
-		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
-		return
-	}
 
 	mset, err := acc.lookupStream(stream)
 	if err != nil {
@@ -2379,12 +2388,26 @@ func (s *Server) jsConsumerLeaderStepDownRequest(sub *subscription, c *client, _
 		}
 		return
 	}
 
+	var preferredLeader string
+	if isJSONObjectOrArray(msg) {
+		var req JSApiLeaderStepdownRequest
+		if err := s.unmarshalRequest(c, acc, subject, msg, &req); err != nil {
+			resp.Error = NewJSInvalidJSONError(err)
+			s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
+			return
+		}
+		if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(n, req.Placement); resp.Error != nil {
+			s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
+			return
+		}
+	}
+
 	// Call actual stepdown. Do this in a Go routine.
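+	// The preferred leader (if any) resolved from the request placement above is
+	// handed through to StepDown in the goroutine below.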
 	go func() {
 		o.setLeader(false)
 		// TODO (mh) eventually make sure all go routines exited and all channels are cleared
 		time.Sleep(250 * time.Millisecond)
-		n.StepDown()
+		n.StepDown(preferredLeader)
 
 		resp.Success = true
 		s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go
index c8edd7151b..b3322788b2 100644
--- a/server/jetstream_super_cluster_test.go
+++ b/server/jetstream_super_cluster_test.go
@@ -239,6 +239,473 @@ func TestJetStreamSuperClusterMetaStepDown(t *testing.T) {
 	})
 }
 
+func TestJetStreamSuperClusterStreamStepDown(t *testing.T) {
+	sc := createJetStreamTaggedSuperCluster(t)
+	defer sc.shutdown()
+
+	sc.waitOnLeader()
+	s := sc.randomServer()
+
+	nc, js := jsClientConnect(t, s)
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:      "foo",
+		Subjects:  []string{"foo.>"},
+		Replicas:  3,
+		Retention: nats.InterestPolicy,
+		Placement: &nats.Placement{
+			Cluster: "C1",
+			Tags:    []string{"cloud:aws"},
+		},
+	})
+	require_NoError(t, err)
+
+	stepdown := func(preferred, cn string, tags []string) *JSApiStreamLeaderStepDownResponse {
+		jreq, err := json.Marshal(&JSApiLeaderStepdownRequest{
+			Placement: &Placement{
+				Cluster:   cn,
+				Tags:      tags,
+				Preferred: preferred,
+			},
+		})
+		require_NoError(t, err)
+
+		resp, err := nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "foo"), jreq, time.Second)
+		require_NoError(t, err)
+
+		var sdr JSApiStreamLeaderStepDownResponse
+		require_NoError(t, json.Unmarshal(resp.Data, &sdr))
+		return &sdr
+	}
+
+	// Make sure we get correct errors for clusters we don't know about.
+	t.Run("UnknownCluster", func(t *testing.T) {
+		sdr := stepdown(_EMPTY_, "ThisClusterDoesntExist", nil)
+		require_NotNil(t, sdr.Error)
+		require_Equal(t, sdr.Error.Code, 400)
+		require_Equal(t, ErrorIdentifier(sdr.Error.ErrCode), JSClusterNoPeersErrF)
+	})
+
+	// Make sure we get correct errors for servers we don't know about.
+	t.Run("UnknownPreferredServer", func(t *testing.T) {
+		sdr := stepdown("ThisServerDoesntExist", _EMPTY_, nil)
+		require_NotNil(t, sdr.Error)
+		require_Equal(t, sdr.Error.Code, 400)
+		require_Equal(t, ErrorIdentifier(sdr.Error.ErrCode), JSClusterNoPeersErrF)
+	})
+
+	// Make sure we get correct errors for tags that don't match any servers.
+	t.Run("UnknownTag", func(t *testing.T) {
+		sdr := stepdown(_EMPTY_, _EMPTY_, []string{"thistag:doesntexist"})
+		require_NotNil(t, sdr.Error)
+		require_Equal(t, sdr.Error.Code, 400)
+		require_Equal(t, ErrorIdentifier(sdr.Error.ErrCode), JSClusterNoPeersErrF)
+	})
+
+	// Make sure we get correct errors for clusters that we aren't assigned to.
+	// The asset is in C1, not C2, so placement should fail.
+	t.Run("NonParticipantCluster", func(t *testing.T) {
+		sdr := stepdown(_EMPTY_, "C2", nil)
+		require_NotNil(t, sdr.Error)
+		require_Equal(t, sdr.Error.Code, 400)
+		require_Equal(t, ErrorIdentifier(sdr.Error.ErrCode), JSClusterNoPeersErrF)
+	})
+
+	// Make sure we get correct errors for servers that are already leader.
+	t.Run("PreferredServerAlreadyLeader", func(t *testing.T) {
+		sl := sc.clusterForName("C1").streamLeader(globalAccountName, "foo")
+		require_NotNil(t, sl)
+
+		sdr := stepdown(sl.Name(), _EMPTY_, nil)
+		require_NotNil(t, sdr.Error)
+		require_Equal(t, sdr.Error.Code, 400)
+		require_Equal(t, ErrorIdentifier(sdr.Error.ErrCode), JSClusterNoPeersErrF)
+	})
+
+	// Influence the placement by naming a specific preferred server.
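+	// A successful request should move leadership to the named server.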
+	t.Run("PlacementByPreferredServer", func(t *testing.T) {
+		c := sc.clusterForName("C1")
+		sl := c.streamLeader(globalAccountName, "foo")
+		require_NotNil(t, sl)
+
+		var preferredServer string
+		for _, s := range c.servers {
+			if s == sl {
+				continue
+			}
+			preferredServer = s.Name()
+			break
+		}
+
+		sdr := stepdown(preferredServer, _EMPTY_, nil)
+		require_Equal(t, sdr.Error, nil)
+
+		c.waitOnStreamLeader(globalAccountName, "foo")
+		sl = c.streamLeader(globalAccountName, "foo")
+		require_Equal(t, sl.Name(), preferredServer)
+	})
+
+	// Influence the placement by using the cluster name. For streams this doesn't really
+	// make sense, since the stream can only exist in the single cluster it was placed in,
+	// so this effectively works the same as a stepdown request without a cluster name.
+	// Let's just make sure it does the right thing in any case.
+	t.Run("PlacementByCluster", func(t *testing.T) {
+		c := sc.clusterForName("C1")
+		sl := c.streamLeader(globalAccountName, "foo")
+		require_NotNil(t, sl)
+
+		sdr := stepdown(_EMPTY_, "C1", nil)
+		require_Equal(t, sdr.Error, nil)
+
+		c.waitOnStreamLeader(globalAccountName, "foo")
+		sl = c.streamLeader(globalAccountName, "foo")
+		require_NotNil(t, sl)
+		require_Equal(t, sl.ClusterName(), "C1")
+	})
+
+	// Influence the placement by using tag names.
+	t.Run("PlacementByTag", func(t *testing.T) {
+		c := sc.clusterForName("C1")
+		sl := c.streamLeader(globalAccountName, "foo")
+		require_NotNil(t, sl)
+
+		// Work out what the tags are of the current leader, so we can pick
+		// different ones.
+		possibleTags := map[string]struct{}{
+			"node:1": {},
+			"node:2": {},
+			"node:3": {},
+		}
+		// Remove the current leader's tags from the list.
+		for _, tag := range sl.getOpts().Tags {
+			delete(possibleTags, tag)
+		}
+		// Now pick the first tag as our new chosen tag.
+		var chosenTag string
+		for tag := range possibleTags {
+			chosenTag = tag
+			break
+		}
+
+		sdr := stepdown(_EMPTY_, _EMPTY_, []string{chosenTag})
+		require_Equal(t, sdr.Error, nil)
+
+		c.waitOnStreamLeader(globalAccountName, "foo")
+		sl = c.streamLeader(globalAccountName, "foo")
+		require_NotNil(t, sl)
+		require_True(t, sl.getOpts().Tags.Contains(chosenTag))
+	})
+
+	// Influence the placement by using tag names; we need to match all of them.
+	t.Run("PlacementByMultipleTags", func(t *testing.T) {
+		c := sc.clusterForName("C1")
+		sl := c.streamLeader(globalAccountName, "foo")
+		require_NotNil(t, sl)
+
+		// Work out what the tags are of the current leader, so we can pick
+		// different ones.
+		possibleTags := map[string]struct{}{
+			"node:1": {},
+			"node:2": {},
+			"node:3": {},
+		}
+		// Remove the current leader's tags from the list.
+		for _, tag := range sl.getOpts().Tags {
+			delete(possibleTags, tag)
+		}
+		// Now pick the first tag as our new chosen tag.
+		var chosenTag string
+		for tag := range possibleTags {
+			chosenTag = tag
+			break
+		}
+
+		sdr := stepdown(_EMPTY_, _EMPTY_, []string{chosenTag, "cloud:aws"})
+		require_Equal(t, sdr.Error, nil)
+
+		c.waitOnStreamLeader(globalAccountName, "foo")
+		sl = c.streamLeader(globalAccountName, "foo")
+		require_NotNil(t, sl)
+		require_True(t, sl.getOpts().Tags.Contains(chosenTag))
+		require_True(t, sl.getOpts().Tags.Contains("cloud:aws"))
+	})
+
+	// Influence the placement by using the cluster name and a tag. Like with
+	// PlacementByCluster above, the cluster portion of this request is not really
+	// doing anything, but it's useful just to ensure the API behaves properly.
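+	// The tag portion, however, still constrains which of the cluster's servers can take over.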
+	t.Run("PlacementByClusterAndTag", func(t *testing.T) {
+		c := sc.clusterForName("C1")
+		sl := c.streamLeader(globalAccountName, "foo")
+		require_NotNil(t, sl)
+
+		// Work out what the tags are of the current leader, so we can pick
+		// different ones.
+		possibleTags := map[string]struct{}{
+			"node:1": {},
+			"node:2": {},
+			"node:3": {},
+		}
+		// Remove the current leader's tags from the list.
+		for _, tag := range sl.getOpts().Tags {
+			delete(possibleTags, tag)
+		}
+		// Now pick the first tag as our new chosen tag.
+		var chosenTag string
+		for tag := range possibleTags {
+			chosenTag = tag
+			break
+		}
+
+		sdr := stepdown(_EMPTY_, "C1", []string{chosenTag, "cloud:aws"})
+		require_Equal(t, sdr.Error, nil)
+
+		c.waitOnStreamLeader(globalAccountName, "foo")
+		sl = c.streamLeader(globalAccountName, "foo")
+		require_NotNil(t, sl)
+		require_True(t, sl.getOpts().Tags.Contains(chosenTag))
+		require_True(t, sl.getOpts().Tags.Contains("cloud:aws"))
+		require_Equal(t, sl.ClusterName(), "C1")
+	})
+}
+
+func TestJetStreamSuperClusterConsumerStepDown(t *testing.T) {
+	sc := createJetStreamTaggedSuperCluster(t)
+	defer sc.shutdown()
+
+	sc.waitOnLeader()
+	s := sc.randomServer()
+
+	nc, js := jsClientConnect(t, s)
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:      "foo",
+		Subjects:  []string{"foo.>"},
+		Replicas:  3,
+		Retention: nats.InterestPolicy,
+		Placement: &nats.Placement{
+			Cluster: "C1",
+			Tags:    []string{"cloud:aws"},
+		},
+	})
+	require_NoError(t, err)
+
+	_, err = js.AddConsumer("foo", &nats.ConsumerConfig{
+		Name: "consumer",
+	})
+	require_NoError(t, err)
+
+	stepdown := func(preferred, cn string, tags []string) *JSApiConsumerLeaderStepDownResponse {
+		jreq, err := json.Marshal(&JSApiLeaderStepdownRequest{
+			Placement: &Placement{
+				Cluster:   cn,
+				Tags:      tags,
+				Preferred: preferred,
+			},
+		})
+		require_NoError(t, err)
+
+		resp, err := nc.Request(fmt.Sprintf(JSApiConsumerLeaderStepDownT, "foo", "consumer"), jreq, time.Second)
+		require_NoError(t, err)
+
+		var sdr JSApiConsumerLeaderStepDownResponse
+		require_NoError(t, json.Unmarshal(resp.Data, &sdr))
+		return &sdr
+	}
+
+	// Make sure we get correct errors for clusters we don't know about.
+	t.Run("UnknownCluster", func(t *testing.T) {
+		sdr := stepdown(_EMPTY_, "ThisClusterDoesntExist", nil)
+		require_NotNil(t, sdr.Error)
+		require_Equal(t, sdr.Error.Code, 400)
+		require_Equal(t, ErrorIdentifier(sdr.Error.ErrCode), JSClusterNoPeersErrF)
+	})
+
+	// Make sure we get correct errors for servers we don't know about.
+	t.Run("UnknownPreferredServer", func(t *testing.T) {
+		sdr := stepdown("ThisServerDoesntExist", _EMPTY_, nil)
+		require_NotNil(t, sdr.Error)
+		require_Equal(t, sdr.Error.Code, 400)
+		require_Equal(t, ErrorIdentifier(sdr.Error.ErrCode), JSClusterNoPeersErrF)
+	})
+
+	// Make sure we get correct errors for tags that don't match any servers.
+	t.Run("UnknownTag", func(t *testing.T) {
+		sdr := stepdown(_EMPTY_, _EMPTY_, []string{"thistag:doesntexist"})
+		require_NotNil(t, sdr.Error)
+		require_Equal(t, sdr.Error.Code, 400)
+		require_Equal(t, ErrorIdentifier(sdr.Error.ErrCode), JSClusterNoPeersErrF)
+	})
+
+	// Make sure we get correct errors for clusters that we aren't assigned to.
+	// The asset is in C1, not C2, so placement should fail.
+	t.Run("NonParticipantCluster", func(t *testing.T) {
+		sdr := stepdown(_EMPTY_, "C2", nil)
+		require_NotNil(t, sdr.Error)
+		require_Equal(t, sdr.Error.Code, 400)
+		require_Equal(t, ErrorIdentifier(sdr.Error.ErrCode), JSClusterNoPeersErrF)
+	})
+
+	// Make sure we get correct errors for servers that are already leader.
+	t.Run("PreferredServerAlreadyLeader", func(t *testing.T) {
+		cl := sc.clusterForName("C1").consumerLeader(globalAccountName, "foo", "consumer")
+		require_NotNil(t, cl)
+
+		sdr := stepdown(cl.Name(), _EMPTY_, nil)
+		require_NotNil(t, sdr.Error)
+		require_Equal(t, sdr.Error.Code, 400)
+		require_Equal(t, ErrorIdentifier(sdr.Error.ErrCode), JSClusterNoPeersErrF)
+	})
+
+	// Influence the placement by naming a specific preferred server.
+	t.Run("PlacementByPreferredServer", func(t *testing.T) {
+		c := sc.clusterForName("C1")
+		cl := sc.clusterForName("C1").consumerLeader(globalAccountName, "foo", "consumer")
+		require_NotNil(t, cl)
+
+		var preferredServer string
+		for _, s := range c.servers {
+			if s == cl {
+				continue
+			}
+			preferredServer = s.Name()
+			break
+		}
+
+		sdr := stepdown(preferredServer, _EMPTY_, nil)
+		require_Equal(t, sdr.Error, nil)
+
+		c.waitOnConsumerLeader(globalAccountName, "foo", "consumer")
+		cl = sc.clusterForName("C1").consumerLeader(globalAccountName, "foo", "consumer")
+		require_Equal(t, cl.Name(), preferredServer)
+	})
+
+	// Influence the placement by using the cluster name. For consumers this doesn't really
+	// make sense, since the consumer can only exist in the single cluster it was placed in,
+	// so this effectively works the same as a stepdown request without a cluster name.
+	// Let's just make sure it does the right thing in any case.
+	t.Run("PlacementByCluster", func(t *testing.T) {
+		c := sc.clusterForName("C1")
+		cl := sc.clusterForName("C1").consumerLeader(globalAccountName, "foo", "consumer")
+		require_NotNil(t, cl)
+
+		sdr := stepdown(_EMPTY_, "C1", nil)
+		require_Equal(t, sdr.Error, nil)
+
+		c.waitOnConsumerLeader(globalAccountName, "foo", "consumer")
+		cl = sc.clusterForName("C1").consumerLeader(globalAccountName, "foo", "consumer")
+		require_NotNil(t, cl)
+		require_Equal(t, cl.ClusterName(), "C1")
+	})
+
+	// Influence the placement by using tag names.
+	t.Run("PlacementByTag", func(t *testing.T) {
+		c := sc.clusterForName("C1")
+		cl := sc.clusterForName("C1").consumerLeader(globalAccountName, "foo", "consumer")
+		require_NotNil(t, cl)
+
+		// Work out what the tags are of the current leader, so we can pick
+		// different ones.
+		possibleTags := map[string]struct{}{
+			"node:1": {},
+			"node:2": {},
+			"node:3": {},
+		}
+		// Remove the current leader's tags from the list.
+		for _, tag := range cl.getOpts().Tags {
+			delete(possibleTags, tag)
+		}
+		// Now pick the first tag as our new chosen tag.
+		var chosenTag string
+		for tag := range possibleTags {
+			chosenTag = tag
+			break
+		}
+
+		sdr := stepdown(_EMPTY_, _EMPTY_, []string{chosenTag})
+		require_Equal(t, sdr.Error, nil)
+
+		c.waitOnConsumerLeader(globalAccountName, "foo", "consumer")
+		cl = sc.clusterForName("C1").consumerLeader(globalAccountName, "foo", "consumer")
+		require_NotNil(t, cl)
+		require_True(t, cl.getOpts().Tags.Contains(chosenTag))
+	})
+
+	// Influence the placement by using tag names; we need to match all of them.
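+	// Here that means the chosen node tag plus the shared cloud:aws placement tag.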
+	t.Run("PlacementByMultipleTags", func(t *testing.T) {
+		c := sc.clusterForName("C1")
+		cl := sc.clusterForName("C1").consumerLeader(globalAccountName, "foo", "consumer")
+		require_NotNil(t, cl)
+
+		// Work out what the tags are of the current leader, so we can pick
+		// different ones.
+		possibleTags := map[string]struct{}{
+			"node:1": {},
+			"node:2": {},
+			"node:3": {},
+		}
+		// Remove the current leader's tags from the list.
+		for _, tag := range cl.getOpts().Tags {
+			delete(possibleTags, tag)
+		}
+		// Now pick the first tag as our new chosen tag.
+		var chosenTag string
+		for tag := range possibleTags {
+			chosenTag = tag
+			break
+		}
+
+		sdr := stepdown(_EMPTY_, _EMPTY_, []string{chosenTag, "cloud:aws"})
+		require_Equal(t, sdr.Error, nil)
+
+		c.waitOnConsumerLeader(globalAccountName, "foo", "consumer")
+		cl = sc.clusterForName("C1").consumerLeader(globalAccountName, "foo", "consumer")
+		require_NotNil(t, cl)
+		require_True(t, cl.getOpts().Tags.Contains(chosenTag))
+		require_True(t, cl.getOpts().Tags.Contains("cloud:aws"))
+	})
+
+	// Influence the placement by using the cluster name and a tag. Like with
+	// PlacementByCluster above, the cluster portion of this request is not really
+	// doing anything, but it's useful just to ensure the API behaves properly.
+	t.Run("PlacementByClusterAndTag", func(t *testing.T) {
+		c := sc.clusterForName("C1")
+		cl := sc.clusterForName("C1").consumerLeader(globalAccountName, "foo", "consumer")
+		require_NotNil(t, cl)
+
+		// Work out what the tags are of the current leader, so we can pick
+		// different ones.
+		possibleTags := map[string]struct{}{
+			"node:1": {},
+			"node:2": {},
+			"node:3": {},
+		}
+		// Remove the current leader's tags from the list.
+		for _, tag := range cl.getOpts().Tags {
+			delete(possibleTags, tag)
+		}
+		// Now pick the first tag as our new chosen tag.
+		var chosenTag string
+		for tag := range possibleTags {
+			chosenTag = tag
+			break
+		}
+
+		sdr := stepdown(_EMPTY_, "C1", []string{chosenTag, "cloud:aws"})
+		require_Equal(t, sdr.Error, nil)
+
+		c.waitOnConsumerLeader(globalAccountName, "foo", "consumer")
+		cl = sc.clusterForName("C1").consumerLeader(globalAccountName, "foo", "consumer")
+		require_NotNil(t, cl)
+		require_True(t, cl.getOpts().Tags.Contains(chosenTag))
+		require_True(t, cl.getOpts().Tags.Contains("cloud:aws"))
+		require_Equal(t, cl.ClusterName(), "C1")
+	})
+}
+
 func TestJetStreamSuperClusterUniquePlacementTag(t *testing.T) {
 	tmlp := `
 		listen: 127.0.0.1:-1