Skip to content

Commit

Permalink
Always snapshot stream when at compaction minimum
Browse files Browse the repository at this point in the history
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
MauriceVanVeen committed Dec 20, 2024
1 parent 470a7ac commit ffe8f1c
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 43 deletions.
7 changes: 3 additions & 4 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2439,16 +2439,15 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// a complete and detailed state which could be costly in terms of memory, cpu and GC.
// This only entails how many messages, and the first and last sequence of the stream.
// This is all that is needed to detect a change, and we can get this from FilteredState()
// with and empty filter.
// with an empty filter.
var lastState SimpleState
var lastSnapTime time.Time

// Don't allow the upper layer to install snapshots until we have
// fully recovered from disk.
isRecovering := true

doSnapshot := func() {
if mset == nil || isRecovering || isRestore || time.Since(lastSnapTime) < minSnapDelta {
if mset == nil || isRecovering || isRestore {
return
}

Expand All @@ -2466,7 +2465,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
}

if err := n.InstallSnapshot(mset.stateSnapshot()); err == nil {
lastState, lastSnapTime = curState, time.Now()
lastState = curState
} else if err != errNoSnapAvailable && err != errNodeClosed && err != errCatchupsRunning {
s.RateLimitWarnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err)
}
Expand Down
39 changes: 0 additions & 39 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6945,45 +6945,6 @@ func TestJetStreamClusterConsumerInfoAfterCreate(t *testing.T) {
require_NoError(t, err)
}

func TestJetStreamClusterDontSnapshotTooOften(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)

// We force the snapshot compact size to hit multiple times.
// But, we should not be making snapshots too often since that would degrade performance.
data := make([]byte, 1024*1024) // 1MB payload
_, err = crand.Read(data)
require_NoError(t, err)
for i := 0; i < 50; i++ {
// We do synchronous publishes so we're more likely to have entries pass through the apply queue.
_, err = js.Publish("foo", data)
require_NoError(t, err)
}

for _, s := range c.servers {
acc, err := s.lookupAccount(globalAccountName)
require_NoError(t, err)
mset, err := acc.lookupStream("TEST")
require_NoError(t, err)
snap, err := mset.node.(*raft).loadLastSnapshot()
require_NoError(t, err)
// This measure is not exact and more of a side effect.
// We expect one snapshot to be made pretty soon and to be on cooldown after.
// So no snapshots should be made after that.
require_LessThan(t, snap.lastIndex, 20)
}
}

//
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
Expand Down

0 comments on commit ffe8f1c

Please sign in to comment.