Skip to content

Commit

Permalink
Some updates based on code review
Browse files Browse the repository at this point in the history
- Added non-public stream and consumer configuration options to
achieve the "no subject" and "no interest" capabilities. Had
to implement custom FileStreamInfo and FileConsumerInfo marshal/
unmarshal methods so that those non public fields can be
persisted/recovered properly.
- Restored some of JS original code (since now can use config
instead of passing booleans to the functions).
- Use RLock for deliveryFormsCycle() check (unrelated to MQTT).
- Removed restriction on creating streams with MQTT prefix.
- Preventing API deletion of internal streams and their consumers.
- Added comment on Sublist's ReverseMatch method.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Dec 1, 2020
1 parent 718c995 commit 53d8020
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 108 deletions.
56 changes: 23 additions & 33 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ type ConsumerConfig struct {
SampleFrequency string `json:"sample_freq,omitempty"`
MaxWaiting int `json:"max_waiting,omitempty"`
MaxAckPending int `json:"max_ack_pending,omitempty"`

// These are non public configuration options.
// If you add new options, check fileConsumerInfoJSON in order for them to
// be properly persisted/recovered, if needed.
allowNoInterest bool
}

type CreateConsumerRequest struct {
Expand Down Expand Up @@ -213,13 +218,6 @@ const (
)

func (mset *Stream) AddConsumer(config *ConsumerConfig) (*Consumer, error) {
if name := mset.Name(); strings.HasPrefix(name, mqttStreamNamePrefix) {
return nil, fmt.Errorf("stream prefix %q is reserved for MQTT, unable to create consumer on %q", mqttStreamNamePrefix, name)
}
return mset.addConsumerCheckInterest(config, true)
}

func (mset *Stream) addConsumerCheckInterest(config *ConsumerConfig, checkInterest bool) (*Consumer, error) {
if config == nil {
return nil, fmt.Errorf("consumer config required")
}
Expand Down Expand Up @@ -272,18 +270,23 @@ func (mset *Stream) addConsumerCheckInterest(config *ConsumerConfig, checkIntere

// Make sure any partition subject is also a literal.
if config.FilterSubject != "" {
// If this is a direct match for the streams only subject clear the filter.
var checkSubject bool

mset.mu.RLock()
if len(mset.config.Subjects) == 1 && mset.config.Subjects[0] == config.FilterSubject {
config.FilterSubject = _EMPTY_
// If the stream was created with no subject, then skip the checks
if !mset.config.allowNoSubject {
// If this is a direct match for the streams only subject clear the filter.
if len(mset.config.Subjects) == 1 && mset.config.Subjects[0] == config.FilterSubject {
config.FilterSubject = _EMPTY_
} else {
checkSubject = true
}
}
mset.mu.RUnlock()

if config.FilterSubject != "" {
// Make sure this is a valid partition of the interest subjects.
if !mset.validSubject(config.FilterSubject) {
return nil, fmt.Errorf("consumer filter subject is not a valid subset of the interest subjects")
}
// Make sure this is a valid partition of the interest subjects.
if checkSubject && !mset.validSubject(config.FilterSubject) {
return nil, fmt.Errorf("consumer filter subject is not a valid subset of the interest subjects")
}
}

Expand Down Expand Up @@ -357,7 +360,7 @@ func (mset *Stream) addConsumerCheckInterest(config *ConsumerConfig, checkIntere
} else {
// If we are a push mode and not active and the only difference
// is deliver subject then update and return.
if configsEqualSansDelivery(ocfg, *config) && (!checkInterest || eo.hasNoLocalInterest()) {
if configsEqualSansDelivery(ocfg, *config) && (config.allowNoInterest || eo.hasNoLocalInterest()) {
eo.updateDeliverSubject(config.DeliverSubject)
return eo, nil
} else {
Expand Down Expand Up @@ -2173,8 +2176,8 @@ func (o *Consumer) stop(dflag, doSignal, advisory bool) error {
// Check that we do not form a cycle by delivering to a delivery subject
// that is part of the interest group.
func (mset *Stream) deliveryFormsCycle(deliverySubject string) bool {
mset.mu.Lock()
defer mset.mu.Unlock()
mset.mu.RLock()
defer mset.mu.RUnlock()

for _, subject := range mset.config.Subjects {
if subjectIsSubsetMatch(deliverySubject, subject) {
Expand All @@ -2184,22 +2187,9 @@ func (mset *Stream) deliveryFormsCycle(deliverySubject string) bool {
return false
}

// Check that the subject is a subset of the stream's configured subjects,
// or returns true if the stream has been created with no subject.
// This is same as check for delivery cycle.
func (mset *Stream) validSubject(partitionSubject string) bool {
mset.mu.RLock()
defer mset.mu.RUnlock()

if mset.nosubj && len(mset.config.Subjects) == 0 {
return true
}

for _, subject := range mset.config.Subjects {
if subjectIsSubsetMatch(partitionSubject, subject) {
return true
}
}
return false
return mset.deliveryFormsCycle(partitionSubject)
}

// SetInActiveDeleteThreshold sets the delete threshold for how long to wait
Expand Down
57 changes: 57 additions & 0 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,70 @@ type FileStreamInfo struct {
StreamConfig
}

// Need an alias (which does not have MarshalJSON/UnmarshalJSON) to avoid
// recursive calls which would lead to stack overflow.
type fileStreamInfoAlias FileStreamInfo

// We will use this struct definition to serialize/deserialize FileStreamInfo
// object. This embeds FileStreamInfo (the alias to prevent recursive calls)
// and makes the non-public options public so they can be persisted/recovered.
type fileStreamInfoJSON struct {
fileStreamInfoAlias
Internal bool `json:"internal,omitempty"`
AllowNoSubject bool `json:"allow_no_subject,omitempty"`
}

func (fsi FileStreamInfo) MarshalJSON() ([]byte, error) {
return json.Marshal(&fileStreamInfoJSON{
fileStreamInfoAlias(fsi),
fsi.internal,
fsi.allowNoSubject,
})
}

func (fsi *FileStreamInfo) UnmarshalJSON(b []byte) error {
fsiJSON := &fileStreamInfoJSON{}
if err := json.Unmarshal(b, &fsiJSON); err != nil {
return err
}
*fsi = FileStreamInfo(fsiJSON.fileStreamInfoAlias)
fsi.internal = fsiJSON.Internal
fsi.allowNoSubject = fsiJSON.AllowNoSubject
return nil
}

// File ConsumerInfo is used for creating consumer stores.
type FileConsumerInfo struct {
Created time.Time
Name string
ConsumerConfig
}

// See fileStreamInfoAlias, etc.. for details on how this all work.
type fileConsumerInfoAlias FileConsumerInfo

type fileConsumerInfoJSON struct {
fileConsumerInfoAlias
AllowNoInterest bool `json:"allow_no_interest,omitempty"`
}

func (fci FileConsumerInfo) MarshalJSON() ([]byte, error) {
return json.Marshal(&fileConsumerInfoJSON{
fileConsumerInfoAlias(fci),
fci.allowNoInterest,
})
}

func (fci *FileConsumerInfo) UnmarshalJSON(b []byte) error {
fciJSON := &fileConsumerInfoJSON{}
if err := json.Unmarshal(b, &fciJSON); err != nil {
return err
}
*fci = FileConsumerInfo(fciJSON.fileConsumerInfoAlias)
fci.allowNoInterest = fciJSON.AllowNoInterest
return nil
}

type fileStore struct {
mu sync.RWMutex
state StreamState
Expand Down
11 changes: 3 additions & 8 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,12 +533,7 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error {
s.Warnf(" Error adding Stream %q to Template %q: %v", cfg.Name, cfg.Template, err)
}
}
// TODO: We should not rely on the stream name.
// However, having a StreamConfig property, such as AllowNoSubject,
// was not accepted because it does not make sense outside of the
// MQTT use-case. So need to revisit this.
mqtt := cfg.StreamConfig.Name == mqttStreamName
mset, err := a.addStreamWithStore(&cfg.StreamConfig, nil, mqtt)
mset, err := a.AddStream(&cfg.StreamConfig)
if err != nil {
s.Warnf(" Error recreating Stream %q: %v", cfg.Name, err)
continue
Expand Down Expand Up @@ -583,7 +578,7 @@ func (a *Account) EnableJetStream(limits *JetStreamAccountLimits) error {
// the consumer can reconnect. We will create it as a durable and switch it.
cfg.ConsumerConfig.Durable = ofi.Name()
}
obs, err := mset.addConsumerCheckInterest(&cfg.ConsumerConfig, !mqtt)
obs, err := mset.AddConsumer(&cfg.ConsumerConfig)
if err != nil {
s.Warnf(" Error adding Consumer: %v", err)
continue
Expand Down Expand Up @@ -1065,7 +1060,7 @@ func (a *Account) AddStreamTemplate(tc *StreamTemplateConfig) (*StreamTemplate,
// FIXME(dlc) - Hacky
tcopy := tc.deepCopy()
tcopy.Config.Name = "_"
cfg, err := checkStreamCfg(tcopy.Config, false)
cfg, err := checkStreamCfg(tcopy.Config)
if err != nil {
return nil, err
}
Expand Down
18 changes: 14 additions & 4 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -953,10 +953,10 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, subject, repl
return
}
config := mset.Config()
// MQTT streams are created without subject, but "nats" tooling would then
// fail to display them since it uses validation and expect the config's
// Subjects to not be empty.
if strings.HasPrefix(name, mqttStreamNamePrefix) && len(config.Subjects) == 0 {
// Some streams are created without subject (for instance MQTT streams),
// but "nats" tooling would then fail to display them since it uses
// validation and expect the config's Subjects to not be empty.
if config.allowNoSubject && len(config.Subjects) == 0 {
config.Subjects = []string{">"}
}
resp.StreamInfo = &StreamInfo{Created: mset.Created(), State: mset.State(), Config: config}
Expand Down Expand Up @@ -1006,6 +1006,11 @@ func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, subject, re
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if mset.Config().internal {
resp.Error = &ApiError{Code: 403, Description: "not allowed to delete internal stream"}
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if err := mset.Delete(); err != nil {
resp.Error = jsError(err)
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
Expand Down Expand Up @@ -1713,6 +1718,11 @@ func (s *Server) jsConsumerDeleteRequest(sub *subscription, c *client, subject,
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if mset.Config().internal {
resp.Error = &ApiError{Code: 403, Description: "not allowed to delete consumer of internal stream"}
s.sendAPIResponse(c, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
consumer := consumerNameFromSubject(subject)
obs := mset.LookupConsumer(consumer)
if obs == nil {
Expand Down
61 changes: 34 additions & 27 deletions server/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,38 +631,44 @@ func (as *mqttAccountSessionManager) init(acc *Account, c *client) error {
// Start with sessions stream
as.sstream, err = acc.LookupStream(mqttSessionsStreamName)
if err != nil {
as.sstream, err = acc.addStreamWithStore(&StreamConfig{
Subjects: []string{},
Name: mqttSessionsStreamName,
Storage: FileStorage,
Retention: InterestPolicy,
}, nil, true)
as.sstream, err = acc.AddStream(&StreamConfig{
Subjects: []string{},
Name: mqttSessionsStreamName,
Storage: FileStorage,
Retention: InterestPolicy,
internal: true,
allowNoSubject: true,
})
if err != nil {
return fmt.Errorf("unable to create sessions stream for MQTT account %q: %v", acc.GetName(), err)
}
}
// Create the stream for the messages.
as.mstream, err = acc.LookupStream(mqttStreamName)
if err != nil {
as.mstream, err = acc.addStreamWithStore(&StreamConfig{
Subjects: []string{},
Name: mqttStreamName,
Storage: FileStorage,
Retention: InterestPolicy,
}, nil, true)
as.mstream, err = acc.AddStream(&StreamConfig{
Subjects: []string{},
Name: mqttStreamName,
Storage: FileStorage,
Retention: InterestPolicy,
internal: true,
allowNoSubject: true,
})
if err != nil {
return fmt.Errorf("unable to create messages stream for MQTT account %q: %v", acc.GetName(), err)
}
}
// Create the stream for retained messages.
as.rstream, err = acc.LookupStream(mqttRetainedMsgsStreamName)
if err != nil {
as.rstream, err = acc.addStreamWithStore(&StreamConfig{
Subjects: []string{},
Name: mqttRetainedMsgsStreamName,
Storage: FileStorage,
Retention: InterestPolicy,
}, nil, true)
as.rstream, err = acc.AddStream(&StreamConfig{
Subjects: []string{},
Name: mqttRetainedMsgsStreamName,
Storage: FileStorage,
Retention: InterestPolicy,
internal: true,
allowNoSubject: true,
})
if err != nil {
return fmt.Errorf("unable to create retained messages stream for MQTT account %q: %v", acc.GetName(), err)
}
Expand Down Expand Up @@ -2059,15 +2065,16 @@ func (c *client) mqttProcessJSConsumer(sess *mqttSession, stream *Stream, subjec
maxAckPending = mqttDefaultMaxAckPending
}
cc := &ConsumerConfig{
DeliverSubject: inbox,
Durable: durName,
AckPolicy: AckExplicit,
DeliverPolicy: DeliverNew,
FilterSubject: subject,
AckWait: ackWait,
MaxAckPending: int(maxAckPending),
}
cons, err = stream.addConsumerCheckInterest(cc, false)
DeliverSubject: inbox,
Durable: durName,
AckPolicy: AckExplicit,
DeliverPolicy: DeliverNew,
FilterSubject: subject,
AckWait: ackWait,
MaxAckPending: int(maxAckPending),
allowNoInterest: true,
}
cons, err = stream.AddConsumer(cc)
if err != nil {
c.Errorf("Unable to add JetStream consumer for subscription on %q: err=%v", subject, err)
return nil, nil, err
Expand Down
Loading

0 comments on commit 53d8020

Please sign in to comment.