Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Post restore reset #545

Merged
merged 11 commits into from
Mar 17, 2023
8 changes: 8 additions & 0 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,14 @@ func (r *Raft) restoreSnapshot() error {
r.setCommittedConfiguration(conf, index)
r.setLatestConfiguration(conf, index)

// Remove old logs if r.logs is a MonotonicLogStore. Log any errors and
// continue.
if logs, ok := r.logs.(MonotonicLogStore); ok && logs.IsMonotonic() {
if err := r.removeOldLogs(); err != nil {
r.logger.Error("failed to reset logs", "error", err)
}
}

banks marked this conversation as resolved.
Show resolved Hide resolved
// Success!
return nil
}
Expand Down
15 changes: 14 additions & 1 deletion log.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,26 @@ type LogStore interface {
// StoreLog stores a log entry.
StoreLog(log *Log) error

// StoreLogs stores multiple log entries.
// StoreLogs stores multiple log entries. Implementers of StoreLogs with
// guarantees of monotonically increasing sequential indexes should make use
// of the MonotonicLogStore interface.
jmurret marked this conversation as resolved.
Show resolved Hide resolved
StoreLogs(logs []*Log) error

// DeleteRange deletes a range of log entries. The range is inclusive.
DeleteRange(min, max uint64) error
}

// MonotonicLogStore is an optional interface for LogStore implementations with
// gapless index requirements. If they return true, the LogStore must have an
// efficient implementation of DeleteLogs, as this called after every snapshot
// restore when gaps are not allowed. We avoid deleting all records for
// LogStores that do not implement MonotonicLogStore because this has a major
// negative performance impact on the BoltDB store that is currently the most
// widely used.
jmurret marked this conversation as resolved.
Show resolved Hide resolved
type MonotonicLogStore interface {
IsMonotonic() bool
}

func oldestLog(s LogStore) (Log, error) {
var l Log

Expand Down
10 changes: 10 additions & 0 deletions log_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ func NewLogCache(capacity int, store LogStore) (*LogCache, error) {
return c, nil
}

// IsMonotonic implements the MonotonicLogStore interface. This is a shim to
// expose the underyling store as monotonically indexed or not.
func (c *LogCache) IsMonotonic() bool {
if store, ok := c.store.(MonotonicLogStore); ok {
return store.IsMonotonic()
}

return false
}

func (c *LogCache) GetLog(idx uint64, log *Log) error {
// Check the buffer for an entry
c.l.RLock()
Expand Down
17 changes: 14 additions & 3 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1122,6 +1122,13 @@ func (r *Raft) restoreUserSnapshot(meta *SnapshotMeta, reader io.Reader) error {
r.setLastApplied(lastIndex)
r.setLastSnapshot(lastIndex, term)

// Remove old logs if r.logs is a MonotonicLogStore. Log any errors and continue.
if logs, ok := r.logs.(MonotonicLogStore); ok && logs.IsMonotonic() {
if err := r.removeOldLogs(); err != nil {
r.logger.Error("failed to remove old logs", "error", err)
}
}

r.logger.Info("restored user snapshot", "index", latestIndex)
return nil
}
Expand Down Expand Up @@ -1790,15 +1797,19 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
r.setLatestConfiguration(reqConfiguration, reqConfigurationIndex)
r.setCommittedConfiguration(reqConfiguration, reqConfigurationIndex)

// Compact logs, continue even if this fails
if err := r.compactLogs(req.LastLogIndex); err != nil {
// Clear old logs if r.logs is a MonotonicLogStore. Otherwise compact the
// logs. In both cases, log any errors and continue.
if mlogs, ok := r.logs.(MonotonicLogStore); ok && mlogs.IsMonotonic() {
if err := r.removeOldLogs(); err != nil {
r.logger.Error("failed to reset logs", "error", err)
}
} else if err := r.compactLogs(req.LastLogIndex); err != nil {
jmurret marked this conversation as resolved.
Show resolved Hide resolved
r.logger.Error("failed to compact logs", "error", err)
}

r.logger.Info("Installed remote snapshot")
resp.Success = true
r.setLastContact()
return
}

// setLastContact is used to set the last contact time to now
Expand Down
130 changes: 127 additions & 3 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -1019,6 +1020,79 @@ func TestRaft_SnapshotRestore(t *testing.T) {
}
}

func TestRaft_SnapshotRestore_Monotonic(t *testing.T) {
jmurret marked this conversation as resolved.
Show resolved Hide resolved
// Make the cluster
conf := inmemConfig(t)
conf.TrailingLogs = 10
opts := &MakeClusterOpts{
Peers: 1,
Bootstrap: true,
Conf: conf,
MonotonicLogs: true,
}
c := MakeClusterCustom(t, opts)
defer c.Close()

leader := c.Leader()

// Commit a lot of things
var future Future
for i := 0; i < 100; i++ {
future = leader.Apply([]byte(fmt.Sprintf("test%d", i)), 0)
}

// Wait for the last future to apply
if err := future.Error(); err != nil {
t.Fatalf("err: %v", err)
}

// Take a snapshot
snapFuture := leader.Snapshot()
if err := snapFuture.Error(); err != nil {
t.Fatalf("err: %v", err)
}

// Check for snapshot
snaps, _ := leader.snapshots.List()
if len(snaps) != 1 {
t.Fatalf("should have a snapshot")
}
snap := snaps[0]

// Logs should be trimmed
if idx, _ := leader.logs.FirstIndex(); idx != snap.Index-conf.TrailingLogs+1 {
t.Fatalf("should trim logs to %d: but is %d", snap.Index-conf.TrailingLogs+1, idx)
}

// Shutdown
shutdown := leader.Shutdown()
if err := shutdown.Error(); err != nil {
t.Fatalf("err: %v", err)
}

// Restart the Raft
r := leader
// Can't just reuse the old transport as it will be closed
_, trans2 := NewInmemTransport(r.trans.LocalAddr())
cfg := r.config()
r, err := NewRaft(&cfg, r.fsm, r.logs, r.stable, r.snapshots, trans2)
if err != nil {
t.Fatalf("err: %v", err)
}
c.rafts[0] = r

// We should have restored from the snapshot!
if last := r.getLastApplied(); last != snap.Index {
t.Fatalf("bad last index: %d, expecting %d", last, snap.Index)
}

// Verify that logs have been reset
first, _ := r.logs.FirstIndex()
last, _ := r.logs.LastIndex()
assert.Zero(t, first)
assert.Zero(t, last)
banks marked this conversation as resolved.
Show resolved Hide resolved
}

func TestRaft_SnapshotRestore_Progress(t *testing.T) {
// Make the cluster
conf := inmemConfig(t)
Expand Down Expand Up @@ -1342,7 +1416,7 @@ func TestRaft_UserSnapshot(t *testing.T) {

// snapshotAndRestore does a snapshot and restore sequence and applies the given
// offset to the snapshot index, so we can try out different situations.
func snapshotAndRestore(t *testing.T, offset uint64) {
func snapshotAndRestore(t *testing.T, offset uint64, monotonicLogStore bool) {
// Make the cluster.
conf := inmemConfig(t)

Expand All @@ -1352,7 +1426,18 @@ func snapshotAndRestore(t *testing.T, offset uint64) {
conf.ElectionTimeout = 500 * time.Millisecond
conf.LeaderLeaseTimeout = 500 * time.Millisecond

c := MakeCluster(3, t, conf)
var c *cluster
if monotonicLogStore {
opts := &MakeClusterOpts{
Peers: 3,
Bootstrap: true,
Conf: conf,
MonotonicLogs: true,
}
c = MakeClusterCustom(t, opts)
} else {
c = MakeCluster(3, t, conf)
}
defer c.Close()

// Wait for things to get stable and commit some things.
Expand Down Expand Up @@ -1448,7 +1533,10 @@ func TestRaft_UserRestore(t *testing.T) {

for _, c := range cases {
t.Run(fmt.Sprintf("case %v", c), func(t *testing.T) {
snapshotAndRestore(t, c)
snapshotAndRestore(t, c, false)
})
t.Run(fmt.Sprintf("monotonic case %v", c), func(t *testing.T) {
snapshotAndRestore(t, c, true)
mpalmi marked this conversation as resolved.
Show resolved Hide resolved
})
}
}
Expand Down Expand Up @@ -2380,6 +2468,7 @@ func TestRaft_LeadershipTransferStopRightAway(t *testing.T) {
t.Errorf("leadership shouldn't have started, but instead it error with: %v", err)
}
}

func TestRaft_GetConfigurationNoBootstrap(t *testing.T) {
c := MakeCluster(2, t, nil)
defer c.Close()
Expand Down Expand Up @@ -2417,6 +2506,41 @@ func TestRaft_GetConfigurationNoBootstrap(t *testing.T) {
}
}

func TestRaft_LogStoreIsMonotonic(t *testing.T) {
c := MakeCluster(1, t, nil)
defer c.Close()

// Should be one leader
leader := c.Leader()
c.EnsureLeader(t, leader.localAddr)

// Test the monotonic type assertion on the InmemStore.
_, ok := leader.logs.(MonotonicLogStore)
assert.False(t, ok)

var log LogStore

// Wrapping the non-monotonic store as a LogCache should make it pass the
// type assertion, but the underlying store is still non-monotonic.
log, _ = NewLogCache(100, leader.logs)
mcast, ok := log.(MonotonicLogStore)
require.True(t, ok)
assert.False(t, mcast.IsMonotonic())

// Now create a new MockMonotonicLogStore using the leader logs and expect
// it to work.
log = &MockMonotonicLogStore{s: leader.logs}
mcast, ok = log.(MonotonicLogStore)
require.True(t, ok)
assert.True(t, mcast.IsMonotonic())

// Wrap the mock logstore in a LogCache and check again.
log, _ = NewLogCache(100, log)
mcast, ok = log.(MonotonicLogStore)
require.True(t, ok)
assert.True(t, mcast.IsMonotonic())
}

func TestRaft_CacheLogWithStoreError(t *testing.T) {
c := MakeCluster(2, t, nil)
defer c.Close()
Expand Down
26 changes: 26 additions & 0 deletions snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,3 +249,29 @@ func (r *Raft) compactLogs(snapIdx uint64) error {
}
return nil
}

// removeOldLogs removes all old logs from the store. This is used for
// MonotonicLogStores after restore. Callers should verify that the store
// implementation is monotonic prior to calling.
func (r *Raft) removeOldLogs() error {
defer metrics.MeasureSince([]string{"raft", "removeOldLogs"}, time.Now())

// Determine log ranges to truncate
firstLogIdx, err := r.logs.FirstIndex()
if err != nil {
return fmt.Errorf("failed to get first log index: %w", err)
}

lastLogIdx, err := r.logs.LastIndex()
if err != nil {
return fmt.Errorf("failed to get last log index: %w", err)
}

r.logger.Info("removing all old logs from log store", "first", firstLogIdx, "last", lastLogIdx)

if err := r.logs.DeleteRange(firstLogIdx, lastLogIdx); err != nil {
return fmt.Errorf("log truncation failed: %v", err)
}

return nil
}
49 changes: 48 additions & 1 deletion testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,47 @@ func (m *MockSnapshot) Persist(sink SnapshotSink) error {
func (m *MockSnapshot) Release() {
}

// MockMonotonicLogStore is a LogStore wrapper for testing the
// MonotonicLogStore interface.
type MockMonotonicLogStore struct {
s LogStore
}

// IsMonotonic implements the MonotonicLogStore interface.
func (m *MockMonotonicLogStore) IsMonotonic() bool {
return true
}

// FirstIndex implements the LogStore interface.
func (m *MockMonotonicLogStore) FirstIndex() (uint64, error) {
return m.s.FirstIndex()
}

// LastIndex implements the LogStore interface.
func (m *MockMonotonicLogStore) LastIndex() (uint64, error) {
return m.s.LastIndex()
}

// GetLog implements the LogStore interface.
func (m *MockMonotonicLogStore) GetLog(index uint64, log *Log) error {
return m.s.GetLog(index, log)
}

// StoreLog implements the LogStore interface.
func (m *MockMonotonicLogStore) StoreLog(log *Log) error {
return m.s.StoreLog(log)
}

// StoreLogs implements the LogStore interface.
func (m *MockMonotonicLogStore) StoreLogs(logs []*Log) error {
return m.s.StoreLogs(logs)
}

// DeleteRange implements the LogStore interface.
func (m *MockMonotonicLogStore) DeleteRange(min uint64, max uint64) error {
return m.s.DeleteRange(min, max)
}

// This can be used as the destination for a logger and it'll
// map them into calls to testing.T.Log, so that you only see
// the logging for failed tests.
Expand Down Expand Up @@ -673,6 +714,7 @@ type MakeClusterOpts struct {
ConfigStoreFSM bool
MakeFSMFunc func() FSM
LongstopTimeout time.Duration
MonotonicLogs bool
}

// makeCluster will return a cluster with the given config and number of peers.
Expand Down Expand Up @@ -748,11 +790,16 @@ func makeCluster(t *testing.T, opts *MakeClusterOpts) *cluster {
// Create all the rafts
c.startTime = time.Now()
for i := 0; i < opts.Peers; i++ {
logs := c.stores[i]
var logs LogStore
logs = c.stores[i]
store := c.stores[i]
snap := c.snaps[i]
trans := c.trans[i]

if opts.MonotonicLogs {
logs = &MockMonotonicLogStore{s: logs}
}

peerConf := opts.Conf
peerConf.LocalID = configuration.Servers[i].ID
peerConf.Logger = newTestLoggerWithPrefix(t, string(configuration.Servers[i].ID))
Expand Down