Skip to content

Commit

Permalink
Merge pull request #260 from ripienaar/stream_permissions
Browse files Browse the repository at this point in the history
support stream permissions
  • Loading branch information
ripienaar authored Oct 7, 2021
2 parents 764302f + 5af28ce commit 102d011
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 22 deletions.
26 changes: 25 additions & 1 deletion nats/nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func TestCLIStreamCreate(t *testing.T) {
srv, _, mgr := setupJStreamTest(t)
defer srv.Shutdown()

runNatsCli(t, fmt.Sprintf("--server='%s' str create mem1 --subjects 'js.mem.>,js.other' --storage m --max-msgs-per-subject=10 --max-msgs=-1 --max-age=-1 --max-bytes=-1 --ack --retention limits --max-msg-size=1024 --discard new --dupe-window 1h --replicas 1 --description 'test suite'", srv.ClientURL()))
runNatsCli(t, fmt.Sprintf("--server='%s' str create mem1 --subjects 'js.mem.>,js.other' --storage m --max-msgs-per-subject=10 --max-msgs=-1 --max-age=-1 --max-bytes=-1 --ack --retention limits --max-msg-size=1024 --discard new --dupe-window 1h --replicas 1 --description 'test suite' --allow-rollup --deny-delete --deny-purge", srv.ClientURL()))
streamShouldExist(t, mgr, "mem1")
info := streamInfo(t, mgr, "mem1")

Expand Down Expand Up @@ -222,6 +222,18 @@ func TestCLIStreamCreate(t *testing.T) {
t.Fatalf("expected max messages per subject to be 10 got %d", info.Config.MaxMsgsPer)
}

if !info.Config.RollupAllowed {
t.Fatalf("expected rollups to be allowed")
}

if !info.Config.DenyPurge {
t.Fatalf("expected purge to be denied")
}

if !info.Config.DenyDelete {
t.Fatalf("expected delete to be denied")
}

runNatsCli(t, fmt.Sprintf("--server='%s' str create ORDERS --config testdata/ORDERS_config.json", srv.ClientURL()))
streamShouldExist(t, mgr, "ORDERS")
info = streamInfo(t, mgr, "ORDERS")
Expand All @@ -241,6 +253,18 @@ func TestCLIStreamCreate(t *testing.T) {
if info.Config.Duplicates != time.Hour {
t.Fatalf("expected duplicate window of 1 hour got %v", info.Config.Duplicates)
}

if info.Config.RollupAllowed {
t.Fatalf("expected rollups to be denied")
}

if info.Config.DenyPurge {
t.Fatalf("expected purge to be allowed")
}

if info.Config.DenyDelete {
t.Fatalf("expected delete to be allowed")
}
}

func TestCLIStreamInfo(t *testing.T) {
Expand Down
85 changes: 64 additions & 21 deletions nats/stream_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ type streamCmd struct {
purgeSubject string
purgeSequence uint64
description string
allowRollup bool
denyDelete bool
denyPurge bool

vwStartId int
vwStartDelta time.Duration
Expand Down Expand Up @@ -120,7 +123,7 @@ func configureStreamCommand(app *kingpin.Application) {
c := &streamCmd{msgID: -1}

addCreateFlags := func(f *kingpin.CmdClause) {
f.Flag("ack", "Acknowledge publishes").Default("true").BoolVar(&c.ack)
f.Flag("ack", "(--no-ack) Acknowledge publishes").Default("true").BoolVar(&c.ack)
f.Flag("cluster", "Place the stream on a specific cluster").StringVar(&c.placementCluster)
f.Flag("description", "Sets a contextual description for the stream").StringVar(&c.description)
f.Flag("discard", "Defines the discard policy (new, old)").EnumVar(&c.discardPolicy, "new", "old")
Expand All @@ -139,6 +142,9 @@ func configureStreamCommand(app *kingpin.Application) {
f.Flag("storage", "Storage backend to use (file, memory)").EnumVar(&c.storage, "file", "f", "memory", "m")
f.Flag("subjects", "Subjects that are consumed by the Stream").Default().StringsVar(&c.subjects)
f.Flag("tags", "Place the stream on servers that has specific tags").StringsVar(&c.placementTags)
OptionalBoolean(f.Flag("allow-rollup", "(--no-allow-rollup) Allows roll-ups to be done by publishing messages with special headers"))
OptionalBoolean(f.Flag("deny-delete", "(--no-deny-delete) Deny messages from being deleted via the API"))
OptionalBoolean(f.Flag("deny-purge", "(--no-deny-purge) Deny entire stream or subject purges via the API"))
}

str := app.Command("stream", "JetStream Stream management").Alias("str").Alias("st").Alias("ms").Alias("s")
Expand Down Expand Up @@ -655,7 +661,7 @@ func (c *streamCmd) streamTemplateRm(_ *kingpin.ParseContext) error {
}

func (c *streamCmd) streamTemplateAdd(pc *kingpin.ParseContext) (err error) {
cfg := c.prepareConfig()
cfg := c.prepareConfig(pc)

if c.maxStreams == -1 {
err = survey.AskOne(&survey.Input{
Expand Down Expand Up @@ -1052,6 +1058,16 @@ func (c *streamCmd) copyAndEditStream(cfg api.StreamConfig) (api.StreamConfig, e
cfg.Description = c.description
}

if c.allowRollup {
cfg.RollupAllowed = true
}
if c.denyPurge {
cfg.DenyPurge = true
}
if c.denyDelete {
cfg.DenyDelete = true
}

return cfg, nil
}

Expand Down Expand Up @@ -1191,6 +1207,9 @@ func (c *streamCmd) showStreamConfig(cfg api.StreamConfig) {
fmt.Printf(" Replicas: %d\n", cfg.Replicas)
fmt.Printf(" Discard Policy: %s\n", cfg.Discard.String())
fmt.Printf(" Duplicate Window: %v\n", cfg.Duplicates)
fmt.Printf(" Allows Msg Delete: %v\n", !cfg.DenyDelete)
fmt.Printf(" Allows Purge: %v\n", !cfg.DenyPurge)
fmt.Printf(" Allows Rollups: %v\n", cfg.RollupAllowed)
if cfg.MaxMsgs == -1 {
fmt.Println(" Maximum Messages: unlimited")
} else {
Expand All @@ -1204,7 +1223,7 @@ func (c *streamCmd) showStreamConfig(cfg api.StreamConfig) {
} else {
fmt.Printf(" Maximum Bytes: %s\n", humanize.IBytes(uint64(cfg.MaxBytes)))
}
if cfg.MaxAge == -1 {
if cfg.MaxAge <= 0 {
fmt.Println(" Maximum Age: unlimited")
} else {
fmt.Printf(" Maximum Age: %s\n", humanizeDuration(cfg.MaxAge))
Expand Down Expand Up @@ -1459,7 +1478,7 @@ func (c *streamCmd) retentionPolicyFromString() api.RetentionPolicy {
}
}

func (c *streamCmd) prepareConfig() api.StreamConfig {
func (c *streamCmd) prepareConfig(pc *kingpin.ParseContext) api.StreamConfig {
var err error

if c.inputFile != "" {
Expand Down Expand Up @@ -1606,6 +1625,27 @@ func (c *streamCmd) prepareConfig() api.StreamConfig {
kingpin.FatalIfError(err, "invalid duplicate window format")
}

allowRollup := pc.SelectedCommand.GetFlag("allow-rollup").Model().Value.(*OptionalBoolValue)
if !allowRollup.IsSetByUser() {
allow, err := askConfirmation("Allow message Roll-ups", false)
kingpin.FatalIfError(err, "invalid input")
allowRollup.SetBool(allow)
}

denyDelete := pc.SelectedCommand.GetFlag("deny-delete").Model().Value.(*OptionalBoolValue)
if !denyDelete.IsSetByUser() {
allow, err := askConfirmation("Allow message deletion", true)
kingpin.FatalIfError(err, "invalid input")
denyDelete.SetBool(!allow)
}

denyPurge := pc.SelectedCommand.GetFlag("deny-purge").Model().Value.(*OptionalBoolValue)
if !denyPurge.IsSetByUser() {
allow, err := askConfirmation("Allow purging subjects or the entire stream", true)
kingpin.FatalIfError(err, "invalid input")
denyPurge.SetBool(!allow)
}

if c.replicas == 0 {
c.replicas, err = askOneInt("Replicas", "1", "When clustered, defines how many replicas of the data to store. Settable using --replicas.")
kingpin.FatalIfError(err, "invalid input")
Expand All @@ -1615,21 +1655,24 @@ func (c *streamCmd) prepareConfig() api.StreamConfig {
}

cfg := api.StreamConfig{
Name: c.stream,
Description: c.description,
Subjects: c.subjects,
MaxMsgs: c.maxMsgLimit,
MaxMsgsPer: c.maxMsgPerSubjectLimit,
MaxBytes: c.maxBytesLimit,
MaxMsgSize: int32(c.maxMsgSize),
Duplicates: dupeWindow,
MaxAge: maxAge,
Storage: storage,
NoAck: !c.ack,
Retention: c.retentionPolicyFromString(),
Discard: c.discardPolicyFromString(),
MaxConsumers: c.maxConsumers,
Replicas: int(c.replicas),
Name: c.stream,
Description: c.description,
Subjects: c.subjects,
MaxMsgs: c.maxMsgLimit,
MaxMsgsPer: c.maxMsgPerSubjectLimit,
MaxBytes: c.maxBytesLimit,
MaxMsgSize: int32(c.maxMsgSize),
Duplicates: dupeWindow,
MaxAge: maxAge,
Storage: storage,
NoAck: !c.ack,
Retention: c.retentionPolicyFromString(),
Discard: c.discardPolicyFromString(),
MaxConsumers: c.maxConsumers,
Replicas: int(c.replicas),
RollupAllowed: allowRollup.Value(),
DenyPurge: denyPurge.Value(),
DenyDelete: denyDelete.Value(),
}

if c.placementCluster != "" || len(c.placementTags) > 0 {
Expand Down Expand Up @@ -1840,8 +1883,8 @@ func (c *streamCmd) validateCfg(cfg *api.StreamConfig) (bool, []byte, []string,
return valid, j, errs, nil
}

func (c *streamCmd) addAction(_ *kingpin.ParseContext) (err error) {
cfg := c.prepareConfig()
func (c *streamCmd) addAction(pc *kingpin.ParseContext) (err error) {
cfg := c.prepareConfig(pc)

switch {
case c.validateOnly:
Expand Down

0 comments on commit 102d011

Please sign in to comment.