Skip to content

Commit

Permalink
feat: add backlog quota command for topic (#429)
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <nodeces@gmail.com>

### Changes

background from #246,  the PR implements the following commands:

- `pulsarctl topics get-backlog-quotas <topic> -a` - Get the backlog quota policy for a topic
- `pulsarctl topics remove-backlog-quota <topic> --type <producer_request_hold|message_age>` - Remove a backlog quota policy from a topic
- `pulsarctl topics set-backlog-quota <topic> --limit-size <string> --limit-time <int> --policy <producer_request_hold|producer_exception|consumer_backlog_eviction> --type <producer_request_hold|message_age>` - Set a backlog quota policy for a topic

### TODO
- [x] Add integration tests
  • Loading branch information
nodece authored Sep 1, 2021
1 parent a8286d1 commit c58eef0
Show file tree
Hide file tree
Showing 7 changed files with 426 additions and 4 deletions.
70 changes: 70 additions & 0 deletions pkg/ctl/topic/backlog_quota_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package topic

import (
"encoding/json"
"fmt"
"testing"
"time"

"github.com/streamnative/pulsarctl/pkg/pulsar/utils"

"github.com/streamnative/pulsarctl/pkg/test"
"github.com/stretchr/testify/assert"
)

func TestBacklogQuotaCmd(t *testing.T) {
topicName := fmt.Sprintf("persistent://public/default/test-backlog-quotas-topic-%s", test.RandomSuffix())
createArgs := []string{"create", topicName, "1"}
_, execErr, _, _ := TestTopicCommands(CreateTopicCmd, createArgs)
assert.Nil(t, execErr)

setArgs := []string{"set-backlog-quota", topicName,
"--limit-size", "1k",
"--limit-time", "20",
"-p", "producer_exception"}
out, execErr, _, _ := TestTopicCommands(SetBacklogQuotaCmd, setArgs)
assert.Nil(t, execErr)
assert.Equal(t, out.String(), fmt.Sprintf("Set backlog quota successfully for [%s]\n", topicName))

<-time.After(5 * time.Second)

getArgs := []string{"get-backlog-quotas", topicName}
out, execErr, _, _ = TestTopicCommands(GetBacklogQuotaCmd, getArgs)
assert.Nil(t, execErr)

var backlogQuotaMap map[string]utils.BacklogQuota
err := json.Unmarshal(out.Bytes(), &backlogQuotaMap)
assert.Nil(t, err)
assert.NotNil(t, backlogQuotaMap)
assert.NotNil(t, backlogQuotaMap["destination_storage"].LimitTime, int64(20))
assert.NotNil(t, backlogQuotaMap["destination_storage"].LimitSize, int64(1024))
assert.Equal(t, backlogQuotaMap["destination_storage"].Policy, utils.ProducerException)

removeArgs := []string{"remove-backlog-quota", topicName}
out, execErr, _, _ = TestTopicCommands(RemoveBacklogQuotaCmd, removeArgs)
assert.Nil(t, execErr)
assert.Equal(t, out.String(), "Remove backlog quota successfully for ["+topicName+"]\n")

<-time.After(5 * time.Second)

out, execErr, _, _ = TestTopicCommands(GetBacklogQuotaCmd, getArgs)
assert.Nil(t, execErr)
assert.Equal(t, out.String(), "{}\n")
}
72 changes: 72 additions & 0 deletions pkg/ctl/topic/get_backlog_quota.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package topic

import (
"github.com/streamnative/pulsarctl/pkg/cmdutils"
"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
)

func GetBacklogQuotaCmd(vc *cmdutils.VerbCmd) {
desc := cmdutils.LongDescription{}
desc.CommandUsedFor = "Get the backlog quota policy for a topic"
desc.CommandPermission = "This command requires tenant admin permissions."

var examples []cmdutils.Example
getBacklog := cmdutils.Example{
Desc: desc.CommandUsedFor,
Command: "pulsarctl topics get-backlog-quotas topic",
}
examples = append(examples, getBacklog)
desc.CommandExamples = examples

vc.SetDescription(
"get-backlog-quotas",
desc.CommandUsedFor,
desc.ToString(),
desc.ExampleToString(),
"get-backlog-quotas",
)

var applied bool
vc.Command.Flags().BoolVarP(
&applied,
"applied",
"",
false,
"Get the applied policy for the topic")

vc.SetRunFuncWithNameArg(func() error {
return doGetBacklogQuota(vc, applied)
}, "the topic name is not specified or the topic name is specified more than one")
}

func doGetBacklogQuota(vc *cmdutils.VerbCmd, applied bool) error {
topic, err := utils.GetTopicName(vc.NameArg)
if err != nil {
return err
}

admin := cmdutils.NewPulsarClient()
backlogQuotasMap, err := admin.Topics().GetBacklogQuotaMap(*topic, applied)
if err == nil {
cmdutils.PrintJSON(vc.Command.OutOrStdout(), &backlogQuotasMap)
}

return err
}
81 changes: 81 additions & 0 deletions pkg/ctl/topic/remove_backlog_quota.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package topic

import (
"github.com/streamnative/pulsarctl/pkg/cmdutils"
"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
)

func RemoveBacklogQuotaCmd(vc *cmdutils.VerbCmd) {
desc := cmdutils.LongDescription{}
desc.CommandUsedFor = "Remove a backlog quota policy from a topic"
desc.CommandPermission = "This command requires tenant admin permissions."

var examples []cmdutils.Example
removeBacklog := cmdutils.Example{
Desc: "Remove a backlog quota policy from a topic",
Command: "pulsarctl topics remove-backlog-quota topic -t <destination_storage|message_age>",
}
examples = append(examples, removeBacklog)
desc.CommandExamples = examples

var out []cmdutils.Output
successOut := cmdutils.Output{
Desc: "normal output",
Out: "Remove backlog quota successfully for [topic]",
}
out = append(out, successOut)
desc.CommandOutput = out

vc.SetDescription(
"remove-backlog-quota",
desc.CommandUsedFor,
desc.ToString(),
desc.ExampleToString(),
"remove-backlog-quota",
)

var backlogQuotaType string
vc.Command.Flags().StringVarP(
&backlogQuotaType,
"type",
"t",
string(utils.DestinationStorage),
"Backlog quota type to remove",
)

vc.SetRunFuncWithNameArg(func() error {
return doRemoveBacklogQuota(vc, utils.BacklogQuotaType(backlogQuotaType))
}, "the topic name is not specified or the topic name is specified more than one")
}

func doRemoveBacklogQuota(vc *cmdutils.VerbCmd, backlogQuotaType utils.BacklogQuotaType) error {
topic, err := utils.GetTopicName(vc.NameArg)
if err != nil {
return err
}

admin := cmdutils.NewPulsarClient()
err = admin.Topics().RemoveBacklogQuota(*topic, backlogQuotaType)
if err == nil {
vc.Command.Printf("Remove backlog quota successfully for [%s]\n", topic)
}

return err
}
133 changes: 133 additions & 0 deletions pkg/ctl/topic/set_backlog_quota.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package topic

import (
"github.com/streamnative/pulsarctl/pkg/cmdutils"
"github.com/streamnative/pulsarctl/pkg/ctl/utils"
util "github.com/streamnative/pulsarctl/pkg/pulsar/utils"
)

type backlogQuota struct {
LimitSize string
LimitTime int64
Policy string
Type string
}

func SetBacklogQuotaCmd(vc *cmdutils.VerbCmd) {
desc := cmdutils.LongDescription{}
desc.CommandUsedFor = "Set a backlog quota policy for a topic"
desc.CommandPermission = "This command requires tenant admin permissions."

var examples []cmdutils.Example
setBacklog := cmdutils.Example{
Desc: desc.CommandUsedFor,
Command: "pulsarctl topics set-backlog-quota topic \n" +
"\t--limit-size 16G \n" +
"\t--limit-time -1 \n" +
"\t--policy <producer_request_hold|producer_exception|consumer_backlog_eviction> \n" +
"\t--type <destination_storage|message_age>",
}
examples = append(examples, setBacklog)
desc.CommandExamples = examples

var out []cmdutils.Output
successOut := cmdutils.Output{
Desc: "normal output",
Out: "Set backlog quota successfully for [topic]",
}
out = append(out, successOut)
desc.CommandOutput = out

vc.SetDescription(
"set-backlog-quota",
desc.CommandUsedFor,
desc.ToString(),
desc.ExampleToString(),
"set-backlog-quota",
)

backlogQuota := backlogQuota{}
vc.Command.Flags().StringVarP(
&backlogQuota.LimitSize,
"limit-size",
"",
"",
"Size limit (eg: 10M, 16G)")

vc.Command.Flags().Int64VarP(
&backlogQuota.LimitTime,
"limit-time",
"",
-1,
"Time limit in seconds")

vc.Command.Flags().StringVarP(
&backlogQuota.Policy,
"policy",
"p",
"",
"Retention policy to enforce when the limit is reached.\n"+
"Valid options are: [producer_request_hold, producer_exception, consumer_backlog_eviction]")

vc.Command.Flags().StringVarP(&backlogQuota.Type,
"type",
"t",
string(util.DestinationStorage),
"Backlog quota type to set.\n"+
"Valid options are: [destination_storage, message_age]")

_ = vc.Command.MarkFlagRequired("policy")

vc.SetRunFuncWithNameArg(func() error {
return doSetBacklogQuota(vc, backlogQuota)
}, "the topic name is not specified or the topic name is specified more than one")

}

func doSetBacklogQuota(vc *cmdutils.VerbCmd, data backlogQuota) error {
topic, err := util.GetTopicName(vc.NameArg)
if err != nil {
return err
}

admin := cmdutils.NewPulsarClient()

sizeLimit, err := utils.ValidateSizeString(data.LimitSize)
if err != nil {
return err
}

policy, err := util.ParseRetentionPolicy(data.Policy)
if err != nil {
return err
}

backlogQuotaType, err := util.ParseBacklogQuotaType(data.Type)
if err != nil {
return err
}

err = admin.Topics().SetBacklogQuota(*topic, util.NewBacklogQuota(sizeLimit, data.LimitTime, policy), backlogQuotaType)
if err == nil {
vc.Command.Printf("Set backlog quota successfully for [%s]\n", topic)
}

return err
}
3 changes: 3 additions & 0 deletions pkg/ctl/topic/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command {
GetRetentionCmd,
RemoveRetentionCmd,
SetRetentionCmd,
GetBacklogQuotaCmd,
SetBacklogQuotaCmd,
RemoveBacklogQuotaCmd,
}

cmdutils.AddVerbCmds(flagGrouping, resourceCmd, commands...)
Expand Down
Loading

0 comments on commit c58eef0

Please sign in to comment.