Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop stream when stream state inconsistency is detected during flush #6237

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ type FileStoreConfig struct {
Cipher StoreCipher
// Compression is the algorithm to use when compressing.
Compression StoreCompression
// HandleStreamCorruption, when true, invokes the registered stream state corruption handler if a periodic full state write reports corrupt state.
HandleStreamCorruption bool

// Internal reference to our server.
srv *Server
Expand Down Expand Up @@ -172,6 +174,7 @@ type fileStore struct {
tombs []uint64
ld *LostStreamData
scb StorageUpdateHandler
sscb StreamStateCorruptionHandler
ageChk *time.Timer
syncTmr *time.Timer
cfg FileStreamInfo
Expand All @@ -197,6 +200,7 @@ type fileStore struct {
fip bool
receivedAny bool
firstMoved bool
corrupt bool
}

// Represents a message store block and its data.
Expand Down Expand Up @@ -3502,6 +3506,13 @@ func (fs *fileStore) RegisterStorageUpdates(cb StorageUpdateHandler) {
}
}

// RegisterStreamStateCorruptionCB installs cb as the handler to be invoked
// when the file store detects that its stream state is corrupt.
// The handler is stored under the store lock; a later registration replaces
// any earlier one.
func (fs *fileStore) RegisterStreamStateCorruptionCB(cb StreamStateCorruptionHandler) {
	fs.mu.Lock()
	defer fs.mu.Unlock()
	fs.sscb = cb
}

// Helper to get hash key for specific message block.
// Lock should be held
func (fs *fileStore) hashKeyForBlock(index uint32) []byte {
Expand Down Expand Up @@ -8276,10 +8287,25 @@ func (fs *fileStore) flushStreamStateLoop(qch, done chan struct{}) {
t := time.NewTicker(writeThreshold + writeJitter)
defer t.Stop()

// stopStream flags the store as corrupt and notifies the registered
// stream state corruption handler, if one has been registered.
stopStream := func() {
	fs.warn("Stopping stream due to internal stream state inconsistency")
	// Mark fs as corrupt and snapshot the handler while holding the lock,
	// since RegisterStreamStateCorruptionCB writes fs.sscb under fs.mu.
	fs.mu.Lock()
	fs.corrupt = true
	cb := fs.sscb
	fs.mu.Unlock()
	// A handler may never have been registered; calling a nil func panics.
	// The handler is invoked outside the lock so it is free to call back
	// into the store.
	if cb != nil {
		cb()
	}
}

for {
select {
case <-t.C:
fs.writeFullState()
err := fs.writeFullState()
if err == errCorruptState {
if fs.fcfg.HandleStreamCorruption {
stopStream()
}
}
case <-qch:
return
}
Expand Down Expand Up @@ -8320,6 +8346,11 @@ func (fs *fileStore) _writeFullState(force bool) error {
return nil
}

if fs.corrupt {
fs.mu.Unlock()
return errCorruptState
}

// For calculating size and checking time costs for non forced calls.
numSubjects := fs.numSubjects()

Expand Down
8 changes: 8 additions & 0 deletions server/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type memStore struct {
dmap avl.SequenceSet
maxp int64
scb StorageUpdateHandler
sscb StreamStateCorruptionHandler
ageChk *time.Timer
consumers int
receivedAny bool
Expand Down Expand Up @@ -309,6 +310,13 @@ func (ms *memStore) RegisterStorageUpdates(cb StorageUpdateHandler) {
ms.mu.Unlock()
}

// RegisterStreamStateCorruptionCB installs cb as the stream state corruption
// handler for this memory store. The handler is stored under the store lock;
// a later registration replaces any earlier one.
func (ms *memStore) RegisterStreamStateCorruptionCB(cb StreamStateCorruptionHandler) {
	ms.mu.Lock()
	defer ms.mu.Unlock()
	ms.sscb = cb
}

// GetSeqFromTime looks for the first sequence number that has the message
// with >= timestamp.
// FIXME(dlc) - inefficient.
Expand Down
5 changes: 5 additions & 0 deletions server/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ type Options struct {
JetStreamRequestQueueLimit int64
StreamMaxBufferedMsgs int `json:"-"`
StreamMaxBufferedSize int64 `json:"-"`
StreamStopOnCorruption bool `json:"-"`
StoreDir string `json:"-"`
SyncInterval time.Duration `json:"-"`
SyncAlways bool `json:"-"`
Expand Down Expand Up @@ -2433,6 +2434,10 @@ func parseJetStream(v any, opts *Options, errors *[]error, warnings *[]error) er
return &configErr{tk, fmt.Sprintf("%s %s", strings.ToLower(mk), err)}
}
opts.StreamMaxBufferedSize = s
case "stream_stop_on_corruption":
if v, ok := mv.(bool); ok {
opts.StreamStopOnCorruption = v
}
case "max_buffered_msgs":
mlen, ok := mv.(int64)
if !ok {
Expand Down
4 changes: 4 additions & 0 deletions server/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ type StoreMsg struct {
// For the cases where its a single message we will also supply sequence number and subject.
type StorageUpdateHandler func(msgs, bytes int64, seq uint64, subj string)

// StreamStateCorruptionHandler is a callback used to stop a stream when its
// stream state is found to be corrupted. It takes no arguments and returns
// nothing; it is registered on a store via RegisterStreamStateCorruptionCB.
type StreamStateCorruptionHandler func()

type StreamStore interface {
StoreMsg(subject string, hdr, msg []byte) (uint64, int64, error)
StoreRawMsg(subject string, hdr, msg []byte, seq uint64, ts int64) error
Expand Down Expand Up @@ -111,6 +114,7 @@ type StreamStore interface {
SyncDeleted(dbs DeleteBlocks)
Type() StorageType
RegisterStorageUpdates(StorageUpdateHandler)
RegisterStreamStateCorruptionCB(StreamStateCorruptionHandler)
UpdateConfig(cfg *StreamConfig) error
Delete() error
Stop() error
Expand Down
22 changes: 22 additions & 0 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
fsCfg.SyncInterval = s.getOpts().SyncInterval
fsCfg.SyncAlways = s.getOpts().SyncAlways
fsCfg.Compression = config.Compression
fsCfg.HandleStreamCorruption = s.getOpts().StreamStopOnCorruption

if err := mset.setupStore(fsCfg); err != nil {
mset.stop(true, false)
Expand Down Expand Up @@ -4020,6 +4021,27 @@ func (mset *stream) setupStore(fsCfg *FileStoreConfig) error {
}
// This will fire the callback but we do not require the lock since md will be 0 here.
mset.store.RegisterStorageUpdates(mset.storeUpdates)
// Register the callback to be called when stream state inconsistency is detected during flush.
// The stop is handed off to a server-tracked go routine instead of running inline on the
// caller of the callback (the store's state flush loop).
mset.store.RegisterStreamStateCorruptionCB(func() {
	s := mset.srv
	// startGoRoutine reports whether the go routine was started; when it was not,
	// there is nothing further to do, so the result is intentionally unused.
	s.startGoRoutine(func() {
		defer s.grWG.Done()
		// Do not attempt the stop if the server is already shutting down.
		select {
		case <-s.quitCh:
			return
		default:
		}
		// No signalling channel is needed: the previous unbuffered handshake could
		// block the caller forever if this go routine exited via quitCh before
		// receiving from it.
		mset.stop(false, false)
	})
})
mset.mu.Unlock()

return nil
Expand Down
Loading