diff --git a/server/filestore.go b/server/filestore.go index 99ea07413c..0aaa1643ab 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -67,6 +67,8 @@ type FileStoreConfig struct { Cipher StoreCipher // Compression is the algorithm to use when compressing. Compression StoreCompression + // HandleStreamCorruption calls the stream corruption handler when internal stream state becomes inconsistent + HandleStreamCorruption bool // Internal reference to our server. srv *Server @@ -172,6 +174,7 @@ type fileStore struct { tombs []uint64 ld *LostStreamData scb StorageUpdateHandler + sscb StreamStateCorruptionHandler ageChk *time.Timer syncTmr *time.Timer cfg FileStreamInfo @@ -197,6 +200,7 @@ type fileStore struct { fip bool receivedAny bool firstMoved bool + corrupt bool } // Represents a message store block and its data. @@ -3502,6 +3506,13 @@ func (fs *fileStore) RegisterStorageUpdates(cb StorageUpdateHandler) { } } +// RegisterStreamStateCorruptionCB registers a callback for stream state corruption +func (fs *fileStore) RegisterStreamStateCorruptionCB(cb StreamStateCorruptionHandler) { + fs.mu.Lock() + fs.sscb = cb + fs.mu.Unlock() +} + // Helper to get hash key for specific message block. // Lock should be held func (fs *fileStore) hashKeyForBlock(index uint32) []byte { @@ -8276,10 +8287,25 @@ func (fs *fileStore) flushStreamStateLoop(qch, done chan struct{}) { t := time.NewTicker(writeThreshold + writeJitter) defer t.Stop() + stopStream := func() { + fs.warn("Stopping stream due to internal stream state inconsistency") + // mark fs as corrupt + fs.mu.Lock() + fs.corrupt = true + fs.mu.Unlock() + // Call StreamStateCorruptionHandler + fs.sscb() + } + for { select { case <-t.C: - fs.writeFullState() + err := fs.writeFullState() + if err == errCorruptState { + if fs.fcfg.HandleStreamCorruption { + stopStream() + } + } case <-qch: return } @@ -8320,6 +8346,11 @@ func (fs *fileStore) _writeFullState(force bool) error { return nil } + if fs.corrupt { + fs.mu.Unlock() + return errCorruptState + } + // For calculating size and checking time costs for non forced calls. numSubjects := fs.numSubjects() diff --git a/server/memstore.go b/server/memstore.go index fcee4ba615..32628c4c08 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -36,6 +36,7 @@ type memStore struct { dmap avl.SequenceSet maxp int64 scb StorageUpdateHandler + sscb StreamStateCorruptionHandler ageChk *time.Timer consumers int receivedAny bool @@ -309,6 +310,13 @@ func (ms *memStore) RegisterStorageUpdates(cb StorageUpdateHandler) { ms.mu.Unlock() } +// RegisterStreamStateCorruptionCB registers a callback for stream state corruption +func (ms *memStore) RegisterStreamStateCorruptionCB(cb StreamStateCorruptionHandler) { + ms.mu.Lock() + ms.sscb = cb + ms.mu.Unlock() +} + // GetSeqFromTime looks for the first sequence number that has the message // with >= timestamp. // FIXME(dlc) - inefficient. diff --git a/server/opts.go b/server/opts.go index 172377253e..caccbdc57e 100644 --- a/server/opts.go +++ b/server/opts.go @@ -339,6 +339,7 @@ type Options struct { JetStreamRequestQueueLimit int64 StreamMaxBufferedMsgs int `json:"-"` StreamMaxBufferedSize int64 `json:"-"` + StreamStopOnCorruption bool `json:"-"` StoreDir string `json:"-"` SyncInterval time.Duration `json:"-"` SyncAlways bool `json:"-"` @@ -2433,6 +2434,10 @@ func parseJetStream(v any, opts *Options, errors *[]error, warnings *[]error) er return &configErr{tk, fmt.Sprintf("%s %s", strings.ToLower(mk), err)} } opts.StreamMaxBufferedSize = s + case "stream_stop_on_corruption": + if v, ok := mv.(bool); ok { + opts.StreamStopOnCorruption = v + } case "max_buffered_msgs": mlen, ok := mv.(int64) if !ok { diff --git a/server/store.go b/server/store.go index ed972922ff..652e21f25b 100644 --- a/server/store.go +++ b/server/store.go @@ -83,6 +83,9 @@ type StoreMsg struct { // For the cases where its a single message we will also supply sequence number and subject. type StorageUpdateHandler func(msgs, bytes int64, seq uint64, subj string) +// Callback to stop stream when stream state is corrupted +type StreamStateCorruptionHandler func() + type StreamStore interface { StoreMsg(subject string, hdr, msg []byte) (uint64, int64, error) StoreRawMsg(subject string, hdr, msg []byte, seq uint64, ts int64) error @@ -111,6 +114,7 @@ type StreamStore interface { SyncDeleted(dbs DeleteBlocks) Type() StorageType RegisterStorageUpdates(StorageUpdateHandler) + RegisterStreamStateCorruptionCB(StreamStateCorruptionHandler) UpdateConfig(cfg *StreamConfig) error Delete() error Stop() error diff --git a/server/stream.go b/server/stream.go index 84a8a05a42..83bf97bfa0 100644 --- a/server/stream.go +++ b/server/stream.go @@ -711,6 +711,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt fsCfg.SyncInterval = s.getOpts().SyncInterval fsCfg.SyncAlways = s.getOpts().SyncAlways fsCfg.Compression = config.Compression + fsCfg.HandleStreamCorruption = s.getOpts().StreamStopOnCorruption if err := mset.setupStore(fsCfg); err != nil { mset.stop(true, false) @@ -4020,6 +4021,27 @@ func (mset *stream) setupStore(fsCfg *FileStoreConfig) error { } // This will fire the callback but we do not require the lock since md will be 0 here. mset.store.RegisterStorageUpdates(mset.storeUpdates) + // Register the callback to be called when stream state inconsistency is detected during flush + mset.store.RegisterStreamStateCorruptionCB(func() { + ch := make(chan struct{}) + + // Start server go routine to stop stream + s := mset.srv + started := s.startGoRoutine(func() { + defer s.grWG.Done() + select { + case <-ch: + mset.stop(false, false) + case <-s.quitCh: + return + } + }) + // Call server go routine + if started { + ch <- struct{}{} + } + + }) mset.mu.Unlock() return nil