From c58eef0fa76b48f1aafabc04e50c8d5c83172529 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Wed, 1 Sep 2021 08:05:34 +0800 Subject: [PATCH] feat: add backlog quota command for topic (#429) Signed-off-by: Zixuan Liu ### Changes background from #246, the PR implements the following commands: - `pulsarctl topics get-backlog-quotas -a` - Get the backlog quota policy for a topic - `pulsarctl topics remove-backlog-quota --type ` - Remove a backlog quota policy from a topic - `pulsarctl topics set-backlog-quota --limit-size --limit-time --policy --type ` - Set a backlog quota policy for a topic ### TODO - [x] Add integration tests --- pkg/ctl/topic/backlog_quota_test.go | 70 ++++++++++++++ pkg/ctl/topic/get_backlog_quota.go | 72 ++++++++++++++ pkg/ctl/topic/remove_backlog_quota.go | 81 ++++++++++++++++ pkg/ctl/topic/set_backlog_quota.go | 133 ++++++++++++++++++++++++++ pkg/ctl/topic/topic.go | 3 + pkg/pulsar/topic.go | 43 +++++++++ pkg/pulsar/utils/backlog_quota.go | 28 +++++- 7 files changed, 426 insertions(+), 4 deletions(-) create mode 100644 pkg/ctl/topic/backlog_quota_test.go create mode 100644 pkg/ctl/topic/get_backlog_quota.go create mode 100644 pkg/ctl/topic/remove_backlog_quota.go create mode 100644 pkg/ctl/topic/set_backlog_quota.go diff --git a/pkg/ctl/topic/backlog_quota_test.go b/pkg/ctl/topic/backlog_quota_test.go new file mode 100644 index 00000000..53b94c9e --- /dev/null +++ b/pkg/ctl/topic/backlog_quota_test.go @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package topic + +import ( + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/streamnative/pulsarctl/pkg/pulsar/utils" + + "github.com/streamnative/pulsarctl/pkg/test" + "github.com/stretchr/testify/assert" +) + +func TestBacklogQuotaCmd(t *testing.T) { + topicName := fmt.Sprintf("persistent://public/default/test-backlog-quotas-topic-%s", test.RandomSuffix()) + createArgs := []string{"create", topicName, "1"} + _, execErr, _, _ := TestTopicCommands(CreateTopicCmd, createArgs) + assert.Nil(t, execErr) + + setArgs := []string{"set-backlog-quota", topicName, + "--limit-size", "1k", + "--limit-time", "20", + "-p", "producer_exception"} + out, execErr, _, _ := TestTopicCommands(SetBacklogQuotaCmd, setArgs) + assert.Nil(t, execErr) + assert.Equal(t, out.String(), fmt.Sprintf("Set backlog quota successfully for [%s]\n", topicName)) + + <-time.After(5 * time.Second) + + getArgs := []string{"get-backlog-quotas", topicName} + out, execErr, _, _ = TestTopicCommands(GetBacklogQuotaCmd, getArgs) + assert.Nil(t, execErr) + + var backlogQuotaMap map[string]utils.BacklogQuota + err := json.Unmarshal(out.Bytes(), &backlogQuotaMap) + assert.Nil(t, err) + assert.NotNil(t, backlogQuotaMap) + assert.NotNil(t, backlogQuotaMap["destination_storage"].LimitTime, int64(20)) + assert.NotNil(t, backlogQuotaMap["destination_storage"].LimitSize, int64(1024)) + assert.Equal(t, backlogQuotaMap["destination_storage"].Policy, utils.ProducerException) + + removeArgs := []string{"remove-backlog-quota", topicName} + out, execErr, _, _ = TestTopicCommands(RemoveBacklogQuotaCmd, removeArgs) + assert.Nil(t, execErr) + assert.Equal(t, out.String(), "Remove backlog quota successfully for ["+topicName+"]\n") + + <-time.After(5 * time.Second) + + out, execErr, _, _ = TestTopicCommands(GetBacklogQuotaCmd, getArgs) + assert.Nil(t, execErr) + assert.Equal(t, out.String(), "{}\n") +} diff --git a/pkg/ctl/topic/get_backlog_quota.go b/pkg/ctl/topic/get_backlog_quota.go new file mode 100644 index 00000000..130974ed --- /dev/null +++ b/pkg/ctl/topic/get_backlog_quota.go @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package topic + +import ( + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/pulsar/utils" +) + +func GetBacklogQuotaCmd(vc *cmdutils.VerbCmd) { + desc := cmdutils.LongDescription{} + desc.CommandUsedFor = "Get the backlog quota policy for a topic" + desc.CommandPermission = "This command requires tenant admin permissions." + + var examples []cmdutils.Example + getBacklog := cmdutils.Example{ + Desc: desc.CommandUsedFor, + Command: "pulsarctl topics get-backlog-quotas topic", + } + examples = append(examples, getBacklog) + desc.CommandExamples = examples + + vc.SetDescription( + "get-backlog-quotas", + desc.CommandUsedFor, + desc.ToString(), + desc.ExampleToString(), + "get-backlog-quotas", + ) + + var applied bool + vc.Command.Flags().BoolVarP( + &applied, + "applied", + "", + false, + "Get the applied policy for the topic") + + vc.SetRunFuncWithNameArg(func() error { + return doGetBacklogQuota(vc, applied) + }, "the topic name is not specified or the topic name is specified more than one") +} + +func doGetBacklogQuota(vc *cmdutils.VerbCmd, applied bool) error { + topic, err := utils.GetTopicName(vc.NameArg) + if err != nil { + return err + } + + admin := cmdutils.NewPulsarClient() + backlogQuotasMap, err := admin.Topics().GetBacklogQuotaMap(*topic, applied) + if err == nil { + cmdutils.PrintJSON(vc.Command.OutOrStdout(), &backlogQuotasMap) + } + + return err +} diff --git a/pkg/ctl/topic/remove_backlog_quota.go b/pkg/ctl/topic/remove_backlog_quota.go new file mode 100644 index 00000000..6808fad4 --- /dev/null +++ b/pkg/ctl/topic/remove_backlog_quota.go @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package topic + +import ( + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/pulsar/utils" +) + +func RemoveBacklogQuotaCmd(vc *cmdutils.VerbCmd) { + desc := cmdutils.LongDescription{} + desc.CommandUsedFor = "Remove a backlog quota policy from a topic" + desc.CommandPermission = "This command requires tenant admin permissions." + + var examples []cmdutils.Example + removeBacklog := cmdutils.Example{ + Desc: "Remove a backlog quota policy from a topic", + Command: "pulsarctl topics remove-backlog-quota topic -t ", + } + examples = append(examples, removeBacklog) + desc.CommandExamples = examples + + var out []cmdutils.Output + successOut := cmdutils.Output{ + Desc: "normal output", + Out: "Remove backlog quota successfully for [topic]", + } + out = append(out, successOut) + desc.CommandOutput = out + + vc.SetDescription( + "remove-backlog-quota", + desc.CommandUsedFor, + desc.ToString(), + desc.ExampleToString(), + "remove-backlog-quota", + ) + + var backlogQuotaType string + vc.Command.Flags().StringVarP( + &backlogQuotaType, + "type", + "t", + string(utils.DestinationStorage), + "Backlog quota type to remove", + ) + + vc.SetRunFuncWithNameArg(func() error { + return doRemoveBacklogQuota(vc, utils.BacklogQuotaType(backlogQuotaType)) + }, "the topic name is not specified or the topic name is specified more than one") +} + +func doRemoveBacklogQuota(vc *cmdutils.VerbCmd, backlogQuotaType utils.BacklogQuotaType) error { + topic, err := utils.GetTopicName(vc.NameArg) + if err != nil { + return err + } + + admin := cmdutils.NewPulsarClient() + err = admin.Topics().RemoveBacklogQuota(*topic, backlogQuotaType) + if err == nil { + vc.Command.Printf("Remove backlog quota successfully for [%s]\n", topic) + } + + return err +} diff --git a/pkg/ctl/topic/set_backlog_quota.go b/pkg/ctl/topic/set_backlog_quota.go new file mode 100644 index 00000000..f9b7db13 --- /dev/null +++ b/pkg/ctl/topic/set_backlog_quota.go @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package topic + +import ( + "github.com/streamnative/pulsarctl/pkg/cmdutils" + "github.com/streamnative/pulsarctl/pkg/ctl/utils" + util "github.com/streamnative/pulsarctl/pkg/pulsar/utils" +) + +type backlogQuota struct { + LimitSize string + LimitTime int64 + Policy string + Type string +} + +func SetBacklogQuotaCmd(vc *cmdutils.VerbCmd) { + desc := cmdutils.LongDescription{} + desc.CommandUsedFor = "Set a backlog quota policy for a topic" + desc.CommandPermission = "This command requires tenant admin permissions." + + var examples []cmdutils.Example + setBacklog := cmdutils.Example{ + Desc: desc.CommandUsedFor, + Command: "pulsarctl topics set-backlog-quota topic \n" + + "\t--limit-size 16G \n" + + "\t--limit-time -1 \n" + + "\t--policy \n" + + "\t--type ", + } + examples = append(examples, setBacklog) + desc.CommandExamples = examples + + var out []cmdutils.Output + successOut := cmdutils.Output{ + Desc: "normal output", + Out: "Set backlog quota successfully for [topic]", + } + out = append(out, successOut) + desc.CommandOutput = out + + vc.SetDescription( + "set-backlog-quota", + desc.CommandUsedFor, + desc.ToString(), + desc.ExampleToString(), + "set-backlog-quota", + ) + + backlogQuota := backlogQuota{} + vc.Command.Flags().StringVarP( + &backlogQuota.LimitSize, + "limit-size", + "", + "", + "Size limit (eg: 10M, 16G)") + + vc.Command.Flags().Int64VarP( + &backlogQuota.LimitTime, + "limit-time", + "", + -1, + "Time limit in seconds") + + vc.Command.Flags().StringVarP( + &backlogQuota.Policy, + "policy", + "p", + "", + "Retention policy to enforce when the limit is reached.\n"+ + "Valid options are: [producer_request_hold, producer_exception, consumer_backlog_eviction]") + + vc.Command.Flags().StringVarP(&backlogQuota.Type, + "type", + "t", + string(util.DestinationStorage), + "Backlog quota type to set.\n"+ + "Valid options are: [destination_storage, message_age]") + + _ = vc.Command.MarkFlagRequired("policy") + + vc.SetRunFuncWithNameArg(func() error { + return doSetBacklogQuota(vc, backlogQuota) + }, "the topic name is not specified or the topic name is specified more than one") + +} + +func doSetBacklogQuota(vc *cmdutils.VerbCmd, data backlogQuota) error { + topic, err := util.GetTopicName(vc.NameArg) + if err != nil { + return err + } + + admin := cmdutils.NewPulsarClient() + + sizeLimit, err := utils.ValidateSizeString(data.LimitSize) + if err != nil { + return err + } + + policy, err := util.ParseRetentionPolicy(data.Policy) + if err != nil { + return err + } + + backlogQuotaType, err := util.ParseBacklogQuotaType(data.Type) + if err != nil { + return err + } + + err = admin.Topics().SetBacklogQuota(*topic, util.NewBacklogQuota(sizeLimit, data.LimitTime, policy), backlogQuotaType) + if err == nil { + vc.Command.Printf("Set backlog quota successfully for [%s]\n", topic) + } + + return err +} diff --git a/pkg/ctl/topic/topic.go b/pkg/ctl/topic/topic.go index bb417c19..ff668032 100644 --- a/pkg/ctl/topic/topic.go +++ b/pkg/ctl/topic/topic.go @@ -81,6 +81,9 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command { GetRetentionCmd, RemoveRetentionCmd, SetRetentionCmd, + GetBacklogQuotaCmd, + SetBacklogQuotaCmd, + RemoveBacklogQuotaCmd, } cmdutils.AddVerbCmds(flagGrouping, resourceCmd, commands...) diff --git a/pkg/pulsar/topic.go b/pkg/pulsar/topic.go index 048b767d..8e780804 100644 --- a/pkg/pulsar/topic.go +++ b/pkg/pulsar/topic.go @@ -19,6 +19,7 @@ package pulsar import ( "fmt" + "net/url" "strconv" "github.com/streamnative/pulsarctl/pkg/pulsar/common" @@ -206,6 +207,15 @@ type Topics interface { // Remove compaction threshold for a topic RemoveCompactionThreshold(utils.TopicName) error + + // GetBacklogQuotaMap returns backlog quota map for a topic + GetBacklogQuotaMap(topic utils.TopicName, applied bool) (map[utils.BacklogQuotaType]utils.BacklogQuota, error) + + // SetBacklogQuota sets a backlog quota for a topic + SetBacklogQuota(utils.TopicName, utils.BacklogQuota, utils.BacklogQuotaType) error + + // RemoveBacklogQuota removes a backlog quota policy from a topic + RemoveBacklogQuota(utils.TopicName, utils.BacklogQuotaType) error } type topics struct { @@ -630,3 +640,36 @@ func (t *topics) RemoveCompactionThreshold(topic utils.TopicName) error { err := t.pulsar.Client.Delete(endpoint) return err } + +func (t *topics) GetBacklogQuotaMap(topic utils.TopicName, applied bool) (map[utils.BacklogQuotaType]utils.BacklogQuota, + error) { + var backlogQuotaMap map[utils.BacklogQuotaType]utils.BacklogQuota + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "backlogQuotaMap") + + queryParams := map[string]string{"applied": strconv.FormatBool(applied)} + _, err := t.pulsar.Client.GetWithQueryParams(endpoint, &backlogQuotaMap, queryParams, true) + + return backlogQuotaMap, err +} + +func (t *topics) SetBacklogQuota(topic utils.TopicName, backlogQuota utils.BacklogQuota, + backlogQuotaType utils.BacklogQuotaType) error { + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "backlogQuota") + + u, err := url.Parse(endpoint) + if err != nil { + return err + } + q := u.Query() + q.Add("backlogQuotaType", string(backlogQuotaType)) + u.RawQuery = q.Encode() + + return t.pulsar.Client.Post(u.String(), &backlogQuota) +} + +func (t *topics) RemoveBacklogQuota(topic utils.TopicName, backlogQuotaType utils.BacklogQuotaType) error { + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "backlogQuota") + return t.pulsar.Client.DeleteWithQueryParams(endpoint, map[string]string{ + "backlogQuotaType": string(backlogQuotaType), + }) +} diff --git a/pkg/pulsar/utils/backlog_quota.go b/pkg/pulsar/utils/backlog_quota.go index 2d14f809..3b78243b 100644 --- a/pkg/pulsar/utils/backlog_quota.go +++ b/pkg/pulsar/utils/backlog_quota.go @@ -35,10 +35,6 @@ func NewBacklogQuota(limitSize int64, limitTime int64, policy RetentionPolicy) B type RetentionPolicy string -type BacklogQuotaType string - -const DestinationStorage BacklogQuotaType = "destination_storage" - const ( ProducerRequestHold RetentionPolicy = "producer_request_hold" ProducerException RetentionPolicy = "producer_exception" @@ -47,6 +43,8 @@ const ( func ParseRetentionPolicy(str string) (RetentionPolicy, error) { switch str { + case ProducerRequestHold.String(): + return ProducerRequestHold, nil case ProducerException.String(): return ProducerException, nil case ConsumerBacklogEviction.String(): @@ -59,3 +57,25 @@ func ParseRetentionPolicy(str string) (RetentionPolicy, error) { func (s RetentionPolicy) String() string { return string(s) } + +type BacklogQuotaType string + +const ( + DestinationStorage BacklogQuotaType = "destination_storage" + MessageAge BacklogQuotaType = "message_age" +) + +func ParseBacklogQuotaType(str string) (BacklogQuotaType, error) { + switch str { + case DestinationStorage.String(): + return DestinationStorage, nil + case MessageAge.String(): + return MessageAge, nil + default: + return "", errors.Errorf("Invalid backlog quota type: %s", str) + } +} + +func (b BacklogQuotaType) String() string { + return string(b) +}