Skip to content

Commit

Permalink
Subject delete markers on purges and removes for filestore (#6428)
Browse files Browse the repository at this point in the history
This PR adds subject delete markers when doing purge/compact/remove
operations for the filestore.

Memstore will be a separate PR but expect it to look a lot like this
one.

Remaining questions before I mark for review:

- [ ] Are we happy with the approach?
- [ ] Does the name `Nats-Applied-Limit` make sense or do we want a more
generic name like `Nats-Marker-Reason`?
- [ ] ... or do we want a separate header for administrative reasons
like `Purge` and `Remove`?

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
derekcollison authored Jan 30, 2025
2 parents f81f131 + a647835 commit 5fd5409
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 48 deletions.
122 changes: 83 additions & 39 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ type fileStore struct {
firstMoved bool
ttls *thw.HashWheel
ttlseq uint64 // How up-to-date is the `ttls` THW?
markers []string
}

// Represents a message store block and its data.
Expand Down Expand Up @@ -2119,7 +2120,7 @@ func (fs *fileStore) expireMsgsOnRecover() error {
mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool {
subj := bytesToString(bsubj)
for i := uint64(0); i < ss.Msgs; i++ {
fs.removePerSubject(subj)
fs.removePerSubject(subj, false)
}
return true
})
Expand Down Expand Up @@ -2213,17 +2214,7 @@ func (fs *fileStore) expireMsgsOnRecover() error {
// Update fss
// Make sure we have fss loaded.
mb.removeSeqPerSubject(sm.subj, seq)
if fs.removePerSubject(sm.subj) && fs.cfg.SubjectDeleteMarkers {
// Need to release the mb lock here in case we need to write a new
// tombstone into the same mb in subjectDeleteMarkerIfNeeded. However
// at this point fs.mu is held, so nothing else should happen here.
// No need to process the callbacks from subjectDeleteMarkerIfNeeded
// here as none will have been registered yet (we haven't yet returned
// from newFileStore*).
mb.mu.Unlock()
fs.subjectDeleteMarkerIfNeeded(sm, JSAppliedLimitMaxAge)
mb.mu.Lock()
}
fs.removePerSubject(sm.subj, fs.cfg.SubjectDeleteMarkers && len(getHeader(JSMarkerReason, sm.hdr)) == 0)
}
// Make sure we have a proper next first sequence.
if needNextFirst {
Expand Down Expand Up @@ -2280,6 +2271,9 @@ func (fs *fileStore) expireMsgsOnRecover() error {
fs.psim, fs.tsl = fs.psim.Empty(), 0
}

// If we have pending markers, then create them.
fs.subjectDeleteMarkersAfterOperation(JSMarkerReasonMaxAge)

// If we purged anything, make sure we kick flush state loop.
if purged > 0 {
fs.dirty++
Expand Down Expand Up @@ -4362,7 +4356,7 @@ func (fs *fileStore) EraseMsg(seq uint64) (bool, error) {

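// Convenience function to remove per subject tracking at the filestore level.
// Lock should be held. Returns true if we deleted the last message on the subject.
// If marker is true in that case, the subject is also appended to fs.markers so
// that a subject delete marker can be written for it later.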
func (fs *fileStore) removePerSubject(subj string) bool {
func (fs *fileStore) removePerSubject(subj string, marker bool) bool {
if len(subj) == 0 || fs.psim == nil {
return false
}
Expand All @@ -4375,6 +4369,9 @@ func (fs *fileStore) removePerSubject(subj string) bool {
} else if info.total == 0 {
if _, ok = fs.psim.Delete(bsubj); ok {
fs.tsl -= len(subj)
if marker {
fs.markers = append(fs.markers, subj)
}
return true
}
}
Expand Down Expand Up @@ -4491,7 +4488,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (

// If we are tracking multiple subjects here make sure we update that accounting.
mb.removeSeqPerSubject(sm.subj, seq)
fs.removePerSubject(sm.subj)
wasLast := fs.removePerSubject(sm.subj, false)

if secure {
// Grab record info.
Expand Down Expand Up @@ -4547,6 +4544,17 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
}
mb.mu.Unlock()

// If the deleted message was itself a delete marker then
// don't write out more of them or we'll churn endlessly.
var sdmcb func()
if wasLast && len(getHeader(JSMarkerReason, sm.hdr)) == 0 { // Not a marker.
if viaLimits {
sdmcb = fs.subjectDeleteMarkerIfNeeded(sm.subj, JSMarkerReasonMaxAge)
} else {
sdmcb = fs.subjectDeleteMarkerIfNeeded(sm.subj, JSMarkerReasonRemove)
}
}

// If we emptied the current message block and the seq was state.FirstSeq
// then we need to jump message blocks. We will also write the index so
// we don't lose track of the first sequence.
Expand All @@ -4563,16 +4571,21 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
fs.writeTombstone(sm.seq, sm.ts)
}

if cb := fs.scb; cb != nil {
if cb := fs.scb; cb != nil || sdmcb != nil {
// If we have a callback registered we need to release lock regardless since cb might need it to lookup msg, etc.
fs.mu.Unlock()
// Storage updates.
var subj string
if sm != nil {
subj = sm.subj
if cb != nil {
var subj string
if sm != nil {
subj = sm.subj
}
delta := int64(msz)
cb(-1, -delta, seq, subj)
}
if sdmcb != nil {
sdmcb()
}
delta := int64(msz)
cb(-1, -delta, seq, subj)

if !needFSLock {
fs.mu.Lock()
Expand Down Expand Up @@ -5338,16 +5351,11 @@ func (fs *fileStore) cancelAgeChk() {
// delete marker. If the delete marker is written successfully then
// this function returns a callback func to call scb and sdmcb after
// the lock has been released.
func (fs *fileStore) subjectDeleteMarkerIfNeeded(sm *StoreMsg, reason string) func() {
// If the deleted message was itself a delete marker then
// don't write out more of them or we'll churn endlessly.
if len(getHeader(JSAppliedLimit, sm.hdr)) != 0 {
return nil
}
func (fs *fileStore) subjectDeleteMarkerIfNeeded(subj string, reason string) func() {
if !fs.cfg.SubjectDeleteMarkers {
return nil
}
if _, ok := fs.psim.Find(stringToBytes(sm.subj)); ok {
if _, ok := fs.psim.Find(stringToBytes(subj)); ok {
// There are still messages left with this subject,
// therefore it wasn't the last message deleted.
return nil
Expand All @@ -5361,20 +5369,41 @@ func (fs *fileStore) subjectDeleteMarkerIfNeeded(sm *StoreMsg, reason string) fu
return nil
}
var _hdr [128]byte
hdr := fmt.Appendf(_hdr[:0], "NATS/1.0\r\n%s: %s\r\n%s: %s\r\n\r\n", JSAppliedLimit, reason, JSMessageTTL, time.Duration(ttl)*time.Second)
hdr := fmt.Appendf(_hdr[:0], "NATS/1.0\r\n%s: %s\r\n%s: %s\r\n\r\n", JSMarkerReason, reason, JSMessageTTL, time.Duration(ttl)*time.Second)
seq, ts := fs.state.LastSeq+1, time.Now().UnixNano()
// Store it in the stream and then prepare the callbacks
// to return to the caller.
if err := fs.storeRawMsg(sm.subj, hdr, nil, seq, ts, ttl); err != nil {
if err := fs.storeRawMsg(subj, hdr, nil, seq, ts, ttl); err != nil {
return nil
}
cb, tcb := fs.scb, fs.sdmcb
return func() {
if cb != nil {
cb(1, int64(fileStoreMsgSize(sm.subj, hdr, nil)), seq, sm.subj)
cb(1, int64(fileStoreMsgSize(subj, hdr, nil)), seq, subj)
}
if tcb != nil {
tcb(seq, sm.subj)
tcb(seq, subj)
}
}
}

// subjectDeleteMarkersAfterOperation drains the subjects queued in fs.markers,
// calling subjectDeleteMarkerIfNeeded for each one with the supplied reason.
//
// The filestore lock must be held by the caller, while no message block lock
// may be held. Nothing is done, and nil is returned, when subject delete
// markers are disabled in the stream config or when no subjects are queued.
// Otherwise fs.markers is reset to nil and a function is returned that runs,
// in queue order, every non-nil callback that was produced. The caller should
// invoke that function, if non-nil, only after releasing the filestore lock.
func (fs *fileStore) subjectDeleteMarkersAfterOperation(reason string) func() {
	if len(fs.markers) == 0 || !fs.cfg.SubjectDeleteMarkers {
		return nil
	}
	pending := fs.markers
	callbacks := make([]func(), 0, len(pending))
	for i := range pending {
		cb := fs.subjectDeleteMarkerIfNeeded(pending[i], reason)
		if cb == nil {
			// No callback was produced for this subject.
			continue
		}
		callbacks = append(callbacks, cb)
	}
	// The queue has been consumed; start fresh for the next operation.
	fs.markers = nil
	return func() {
		for i := range callbacks {
			callbacks[i]()
		}
	}
}
Expand Down Expand Up @@ -5404,11 +5433,7 @@ func (fs *fileStore) expireMsgs() {
// if it was the last message of that particular subject that we just deleted.
fs.mu.Lock()
fs.removeMsgViaLimits(sm.seq)
cbs := fs.subjectDeleteMarkerIfNeeded(sm, JSAppliedLimitMaxAge)
fs.mu.Unlock()
if cbs != nil {
cbs()
}
// Recalculate in case we are expiring a bunch.
minAge = time.Now().UnixNano() - maxAge
}
Expand Down Expand Up @@ -7542,7 +7567,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
}
// PSIM and FSS updates.
mb.removeSeqPerSubject(sm.subj, seq)
fs.removePerSubject(sm.subj)
fs.removePerSubject(sm.subj, fs.cfg.SubjectDeleteMarkers)
// Track tombstones we need to write.
tombs = append(tombs, msgId{sm.seq, sm.ts})

Expand Down Expand Up @@ -7595,11 +7620,15 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile))
fs.dirty++
cb := fs.scb
sdmcb := fs.subjectDeleteMarkersAfterOperation(JSMarkerReasonPurge)
fs.mu.Unlock()

if cb != nil {
cb(-int64(purged), -int64(bytes), 0, _EMPTY_)
}
if sdmcb != nil {
sdmcb()
}

return purged, nil
}
Expand Down Expand Up @@ -7633,6 +7662,13 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) {
fs.blks = nil
fs.lmb = nil
fs.bim = make(map[uint32]*msgBlock)
// Subject delete markers if needed.
if fs.cfg.SubjectDeleteMarkers {
fs.psim.Iter(func(subject []byte, _ *psi) bool {
fs.markers = append(fs.markers, string(subject))
return true
})
}
// Clear any per subject tracking.
fs.psim, fs.tsl = fs.psim.Empty(), 0
// Mark dirty.
Expand Down Expand Up @@ -7689,6 +7725,7 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) {
}

cb := fs.scb
sdmcb := fs.subjectDeleteMarkersAfterOperation(JSMarkerReasonPurge)
fs.mu.Unlock()

// Force a new index.db to be written.
Expand All @@ -7699,6 +7736,9 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) {
if cb != nil {
cb(-int64(purged), -rbytes, 0, _EMPTY_)
}
if sdmcb != nil {
sdmcb()
}

return purged, nil
}
Expand Down Expand Up @@ -7740,7 +7780,7 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
mb.fss.Iter(func(bsubj []byte, ss *SimpleState) bool {
subj := bytesToString(bsubj)
for i := uint64(0); i < ss.Msgs; i++ {
fs.removePerSubject(subj)
fs.removePerSubject(subj, fs.cfg.SubjectDeleteMarkers)
}
return true
})
Expand Down Expand Up @@ -7786,7 +7826,7 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
}
// Update fss
smb.removeSeqPerSubject(sm.subj, mseq)
fs.removePerSubject(sm.subj)
fs.removePerSubject(sm.subj, fs.cfg.SubjectDeleteMarkers)
}
}

Expand Down Expand Up @@ -7894,7 +7934,8 @@ SKIP:
// after we release the lock.
os.Remove(filepath.Join(fs.fcfg.StoreDir, msgDir, streamStreamStateFile))
fs.dirty++

// Subject delete markers if needed.
sdmcb := fs.subjectDeleteMarkersAfterOperation(JSMarkerReasonPurge)
cb := fs.scb
fs.mu.Unlock()

Expand All @@ -7906,6 +7947,9 @@ SKIP:
if cb != nil && purged > 0 {
cb(-int64(purged), -int64(bytes), 0, _EMPTY_)
}
if sdmcb != nil {
sdmcb()
}

return purged, err
}
Expand Down
Loading

0 comments on commit 5fd5409

Please sign in to comment.