Skip to content

Commit

Permalink
feat: add compaction threshold command for topic (streamnative/pulsar…
Browse files Browse the repository at this point in the history
…ctl#430)

### Changes

background from streamnative/pulsarctl#246,  the PR implements the following commands:

- `pulsarctl topics get-compaction-threshold <topic> --applied <bool>` - Get the compaction threshold for a topic
- `pulsarctl topics remove-compaction-threshold <topic>` - Remove the compaction threshold for a topic
- `pulsarctl topics set-compaction-threshold <topic> --threshold <string>` Set the compaction threshold for a topic
  • Loading branch information
nodece authored and tisonkun committed Aug 16, 2023
1 parent 440de49 commit 87aacec
Showing 1 changed file with 30 additions and 0 deletions.
30 changes: 30 additions & 0 deletions pulsaradmin/pkg/pulsar/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,15 @@ type Topics interface {

// SetRetention sets the retention policy for a topic
SetRetention(utils.TopicName, utils.RetentionPolicies) error

// Get the compaction threshold for a topic
GetCompactionThreshold(topic utils.TopicName, applied bool) (int64, error)

// Set the compaction threshold for a topic
SetCompactionThreshold(topic utils.TopicName, threshold int64) error

// Remove compaction threshold for a topic
RemoveCompactionThreshold(utils.TopicName) error
}

type topics struct {
Expand Down Expand Up @@ -600,3 +609,24 @@ func (t *topics) SetRetention(topic utils.TopicName, data utils.RetentionPolicie
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "retention")
return t.pulsar.Client.Post(endpoint, data)
}

func (t *topics) GetCompactionThreshold(topic utils.TopicName, applied bool) (int64, error) {
var threshold int64
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "compactionThreshold")
_, err := t.pulsar.Client.GetWithQueryParams(endpoint, &threshold, map[string]string{
"applied": strconv.FormatBool(applied),
}, true)
return threshold, err
}

func (t *topics) SetCompactionThreshold(topic utils.TopicName, threshold int64) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "compactionThreshold")
err := t.pulsar.Client.Post(endpoint, threshold)
return err
}

func (t *topics) RemoveCompactionThreshold(topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "compactionThreshold")
err := t.pulsar.Client.Delete(endpoint)
return err
}

0 comments on commit 87aacec

Please sign in to comment.