Skip to content

Commit

Permalink
Add command topic Deduplication(#246) (#408)
Browse files Browse the repository at this point in the history
* Add command topic Deduplication(#246)

- pulsarctl topics get-deduplication [topic]
- pulsarctl topics set-deduplication [topic] -e
- pulsarctl topics remove-deduplication [topic]

* Modify prompt

Signed-off-by: limingnihao <limingnihao@live.com>
  • Loading branch information
limingnihao authored Aug 4, 2021
1 parent 36d2297 commit a529093
Show file tree
Hide file tree
Showing 6 changed files with 364 additions and 0 deletions.
71 changes: 71 additions & 0 deletions pkg/ctl/topic/deduplication_status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package topic

import (
"testing"
"time"

"github.com/streamnative/pulsarctl/pkg/test"
"github.com/stretchr/testify/assert"
)

func TestDeduplicationStatus(t *testing.T) {
topicName := "persistent://public/default/test-deduplication-status-topic-" + test.RandomSuffix()
args := []string{"create", topicName, "1"}
_, execErr, _, _ := TestTopicCommands(CreateTopicCmd, args)
assert.Nil(t, execErr)

args = []string{"set-deduplication", topicName, "-e"}
out, execErr, _, _ := TestTopicCommands(SetDeduplicationStatusCmd, args)
assert.Nil(t, execErr)
assert.Equal(t, out.String(), "Enable the deduplication policy successfully for ["+topicName+"]\n")

time.Sleep(time.Duration(1) * time.Second)
args = []string{"get-deduplication", topicName}
out, execErr, _, _ = TestTopicCommands(GetDeduplicationStatusCmd, args)
assert.Nil(t, execErr)
assert.Equal(t, out.String(), "true")

args = []string{"remove-deduplication", topicName}
out, execErr, _, _ = TestTopicCommands(RemoveDeduplicationStatusCmd, args)
assert.Nil(t, execErr)
assert.Equal(t, out.String(), "Remove the deduplication policy successfully for ["+topicName+"]\n")

time.Sleep(time.Duration(1) * time.Second)
args = []string{"get-deduplication", topicName}
out, execErr, _, _ = TestTopicCommands(GetDeduplicationStatusCmd, args)
assert.Nil(t, execErr)
assert.Equal(t, out.String(), "false")

args = []string{"set-deduplication", topicName, "-d"}
out, execErr, _, _ = TestTopicCommands(SetDeduplicationStatusCmd, args)
assert.Nil(t, execErr)
assert.Equal(t, out.String(), "Disable the deduplication policy successfully for ["+topicName+"]\n")

time.Sleep(time.Duration(1) * time.Second)
args = []string{"get-deduplication", topicName}
out, execErr, _, _ = TestTopicCommands(GetDeduplicationStatusCmd, args)
assert.Nil(t, execErr)
assert.Equal(t, out.String(), "false")

args = []string{"set-deduplication", topicName, "-e", "-d"}
_, execErr, _, _ = TestTopicCommands(SetDeduplicationStatusCmd, args)
assert.NotNil(t, execErr)
assert.Equal(t, execErr.Error(), "Need to specify either --enable or --disable")
}
81 changes: 81 additions & 0 deletions pkg/ctl/topic/get_deduplication_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package topic

import (
"github.com/streamnative/pulsarctl/pkg/cmdutils"
"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
)

func GetDeduplicationStatusCmd(vc *cmdutils.VerbCmd) {
desc := cmdutils.LongDescription{}
desc.CommandUsedFor = "Get the deduplication policy for a topic"
desc.CommandPermission = "This command requires tenant admin permissions."

var examples []cmdutils.Example
msg := cmdutils.Example{
Desc: "Get the deduplication policy for a topic",
Command: "pulsarctl topics get-deduplication topic",
}
examples = append(examples, msg)
desc.CommandExamples = examples

var out []cmdutils.Output
successOut := cmdutils.Output{
Desc: "normal output",
Out: "Get the deduplication policy successfully for [topic]",
}
out = append(out, successOut, ArgError)
out = append(out, TopicNameErrors...)
out = append(out, NamespaceErrors...)
desc.CommandOutput = out

vc.SetDescription(
"get-deduplication",
"Get the deduplication policy for a topic",
desc.ToString(),
desc.ExampleToString(),
"get-deduplication",
)

vc.SetRunFuncWithNameArg(func() error {
return doGetDeduplicationStatus(vc)
}, "the topic name is not specified or the topic name is specified more than one")

vc.EnableOutputFlagSet()
}

func doGetDeduplicationStatus(vc *cmdutils.VerbCmd) error {
// for testing
if vc.NameError != nil {
return vc.NameError
}

topic, err := utils.GetTopicName(vc.NameArg)
if err != nil {
return err
}

admin := cmdutils.NewPulsarClient()
deduplicationData, err := admin.Topics().GetDeduplicationStatus(*topic)
if err == nil {
oc := cmdutils.NewOutputContent().WithObject(deduplicationData)
err = vc.OutputConfig.WriteOutput(vc.Command.OutOrStdout(), oc)
}
return err
}
78 changes: 78 additions & 0 deletions pkg/ctl/topic/remove_deduplication_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package topic

import (
"github.com/streamnative/pulsarctl/pkg/cmdutils"
"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
)

func RemoveDeduplicationStatusCmd(vc *cmdutils.VerbCmd) {
desc := cmdutils.LongDescription{}
desc.CommandUsedFor = "Remove the deduplication policy for a topic"
desc.CommandPermission = "This command requires tenant admin permissions."

var examples []cmdutils.Example
msg := cmdutils.Example{
Desc: "Remove the deduplication policy for a topic",
Command: "pulsarctl topics remove-deduplication topic",
}
examples = append(examples, msg)
desc.CommandExamples = examples

var out []cmdutils.Output
successOut := cmdutils.Output{
Desc: "normal output",
Out: "Remove the deduplication policy successfully for [topic]",
}
out = append(out, successOut, ArgError)
out = append(out, TopicNameErrors...)
out = append(out, NamespaceErrors...)
desc.CommandOutput = out

vc.SetDescription(
"remove-deduplication",
"Remove the deduplication policy for a topic",
desc.ToString(),
desc.ExampleToString(),
"remove-deduplication",
)

vc.SetRunFuncWithNameArg(func() error {
return doRemoveDeduplicationStatus(vc)
}, "the topic name is not specified or the topic name is specified more than one")
}

func doRemoveDeduplicationStatus(vc *cmdutils.VerbCmd) error {
// for testing
if vc.NameError != nil {
return vc.NameError
}

topic, err := utils.GetTopicName(vc.NameArg)
if err != nil {
return err
}

admin := cmdutils.NewPulsarClient()
err = admin.Topics().RemoveDeduplicationStatus(*topic)
if err == nil {
vc.Command.Printf("Remove the deduplication policy successfully for [%s]\n", topic.String())
}
return err
}
106 changes: 106 additions & 0 deletions pkg/ctl/topic/set_deduplication_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package topic

import (
"github.com/pkg/errors"
"github.com/spf13/pflag"
"github.com/streamnative/pulsarctl/pkg/cmdutils"
"github.com/streamnative/pulsarctl/pkg/pulsar/utils"
)

func SetDeduplicationStatusCmd(vc *cmdutils.VerbCmd) {
desc := cmdutils.LongDescription{}
desc.CommandUsedFor = "Set the deduplication policy for a topic"
desc.CommandPermission = "This command requires tenant admin permissions."

var examples []cmdutils.Example
msg := cmdutils.Example{
Desc: "Set the deduplication policy for a topic",
Command: "pulsarctl topics set-deduplication topic -e",
}
examples = append(examples, msg)
desc.CommandExamples = examples

var out []cmdutils.Output
successOut := cmdutils.Output{
Desc: "normal output",
Out: "Set the deduplication policy successfully for [topic]",
}
out = append(out, successOut, ArgError)
out = append(out, TopicNameErrors...)
out = append(out, NamespaceErrors...)
desc.CommandOutput = out

vc.SetDescription(
"set-deduplication",
"Set the deduplication policy for a topic",
desc.ToString(),
desc.ExampleToString(),
"set-deduplication",
)
var enable, disable bool
vc.SetRunFuncWithNameArg(func() error {
return doSetDeduplicationStatus(vc, enable, disable)
}, "the topic name is not specified or the topic name is specified more than one")

vc.FlagSetGroup.InFlagSet("Deduplication", func(set *pflag.FlagSet) {
set.BoolVarP(
&enable,
"enable",
"e",
false,
"Enable deduplication")
set.BoolVarP(
&disable,
"disable",
"d",
false,
"Disable deduplication")
})
vc.EnableOutputFlagSet()
}

func doSetDeduplicationStatus(vc *cmdutils.VerbCmd, enable bool, disable bool) error {
// for testing
if vc.NameError != nil {
return vc.NameError
}

topic, err := utils.GetTopicName(vc.NameArg)
if err != nil {
return err
}
if enable == disable {
msg := "Need to specify either --enable or --disable"
vc.Command.Printf(msg)
return errors.Errorf(msg)
}
var typeStr string
if enable {
typeStr = "Enable"
} else {
typeStr = "Disable"
}
admin := cmdutils.NewPulsarClient()
err = admin.Topics().SetDeduplicationStatus(*topic, enable)
if err == nil {
vc.Command.Printf("%s the deduplication policy successfully for [%s]\n", typeStr, topic.String())
}
return err
}
3 changes: 3 additions & 0 deletions pkg/ctl/topic/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command {
GetDispatchRateCmd,
SetDispatchRateCmd,
RemoveDispatchRateCmd,
GetDeduplicationStatusCmd,
SetDeduplicationStatusCmd,
RemoveDeduplicationStatusCmd,
}

cmdutils.AddVerbCmds(flagGrouping, resourceCmd, commands...)
Expand Down
25 changes: 25 additions & 0 deletions pkg/pulsar/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,15 @@ type Topics interface {

// RemoveDispatchRate Remove message dispatch rate for a topic
RemoveDispatchRate(utils.TopicName) error

// GetDeduplicationStatus Get the deduplication policy for a topic
GetDeduplicationStatus(utils.TopicName) (bool, error)

// SetDeduplicationStatus Set the deduplication policy for a topic
SetDeduplicationStatus(utils.TopicName, bool) error

// RemoveDeduplicationStatus Remove the deduplication policy for a topic
RemoveDeduplicationStatus(utils.TopicName) error
}

type topics struct {
Expand Down Expand Up @@ -519,3 +528,19 @@ func (t *topics) RemoveDispatchRate(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "dispatchRate")
return t.pulsar.Client.Delete(endpoint)
}

func (t *topics) GetDeduplicationStatus(topic utils.TopicName) (bool, error) {
var enabled bool
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "deduplicationEnabled")
err := t.pulsar.Client.Get(endpoint, &enabled)
return enabled, err
}

func (t *topics) SetDeduplicationStatus(topic utils.TopicName, enabled bool) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "deduplicationEnabled")
return t.pulsar.Client.Post(endpoint, enabled)
}
func (t *topics) RemoveDeduplicationStatus(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "deduplicationEnabled")
return t.pulsar.Client.Delete(endpoint)
}

0 comments on commit a529093

Please sign in to comment.