Skip to content

Commit

Permalink
Add command topic max unacked messages per consumer. (streamnative/pu…
Browse files Browse the repository at this point in the history
…lsarctl#246) (apache#358)

Add command topic max unacked messages per consumer:

* pulsarctl topics get-max-unacked-messages-per-consumer [topic]
* pulsarctl topics set-max-unacked-messages-per-consumer [topic] -m [max]
* pulsarctl topics remove-max-unacked-messages-per-consumer [topic]
  • Loading branch information
limingnihao authored and tisonkun committed Aug 15, 2023
1 parent 42e66cd commit 961a513
Showing 1 changed file with 26 additions and 0 deletions.
26 changes: 26 additions & 0 deletions pulsaradmin/pkg/pulsar/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,15 @@ type Topics interface {

// RemoveMaxConsumers Remove max number of consumers for a topic
RemoveMaxConsumers(utils.TopicName) error

// GetMaxUnackMessagesPerConsumer Get max unacked messages policy on consumer for a topic
GetMaxUnackMessagesPerConsumer(utils.TopicName) (int, error)

// SetMaxUnackMessagesPerConsumer Set max unacked messages policy on consumer for a topic
SetMaxUnackMessagesPerConsumer(utils.TopicName, int) error

// RemoveMaxUnackMessagesPerConsumer Remove max unacked messages policy on consumer for a topic
RemoveMaxUnackMessagesPerConsumer(utils.TopicName) error
}

type topics struct {
Expand Down Expand Up @@ -389,3 +398,20 @@ func (t *topics) RemoveMaxConsumers(topic utils.TopicName) error {
err := t.pulsar.Client.Delete(endpoint)
return err
}

func (t *topics) GetMaxUnackMessagesPerConsumer(topic utils.TopicName) (int, error) {
var maxNum int
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxUnackedMessagesOnConsumer")
err := t.pulsar.Client.Get(endpoint, &maxNum)
return maxNum, err
}

func (t *topics) SetMaxUnackMessagesPerConsumer(topic utils.TopicName, maxUnackedNum int) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxUnackedMessagesOnConsumer")
return t.pulsar.Client.Post(endpoint, &maxUnackedNum)
}

func (t *topics) RemoveMaxUnackMessagesPerConsumer(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxUnackedMessagesOnConsumer")
return t.pulsar.Client.Delete(endpoint)
}

0 comments on commit 961a513

Please sign in to comment.