Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only update litestream_seq if size is below WAL header size #573

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ func (db *DB) acquireReadLock() error {
}

// Execute read query to obtain read lock.
if _, err := tx.ExecContext(db.ctx, `SELECT COUNT(1) FROM _litestream_seq;`); err != nil {
if _, err := tx.ExecContext(context.Background(), `SELECT COUNT(1) FROM _litestream_seq;`); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this changed to background context? If the db context gets canceled we'd want the lock to fail fast as well, right?

_ = tx.Rollback()
return err
}
Expand All @@ -649,6 +649,10 @@ func (db *DB) releaseReadLock() error {
// Rollback & clear read transaction.
err := db.rtx.Rollback()
db.rtx = nil

if errors.Is(err, context.Canceled) {
err = nil
}
return err
}

Expand Down Expand Up @@ -693,7 +697,7 @@ func (db *DB) createGeneration() (string, error) {

// Atomically write generation name as current generation.
generationNamePath := db.GenerationNamePath()
mode := os.FileMode(0600)
mode := os.FileMode(0o600)
if db.fileInfo != nil {
mode = db.fileInfo.Mode()
}
Expand Down Expand Up @@ -986,7 +990,7 @@ func (db *DB) initShadowWALFile(filename string) (int64, error) {
}

// Write header to new WAL shadow file.
mode := os.FileMode(0600)
mode := os.FileMode(0o600)
if fi := db.fileInfo; fi != nil {
mode = fi.Mode()
}
Expand Down Expand Up @@ -1022,7 +1026,7 @@ func (db *DB) copyToShadowWAL(filename string) (origWalSize int64, newSize int64
}
origWalSize = frameAlign(fi.Size(), db.pageSize)

w, err := os.OpenFile(filename, os.O_RDWR, 0666)
w, err := os.OpenFile(filename, os.O_RDWR, 0o666)
if err != nil {
return 0, 0, err
}
Expand Down Expand Up @@ -1334,7 +1338,7 @@ func (db *DB) checkpoint(ctx context.Context, generation, mode string) error {
// a new page is written.
if err := db.execCheckpoint(mode); err != nil {
return err
} else if _, err = db.db.Exec(`INSERT INTO _litestream_seq (id, seq) VALUES (1, 1) ON CONFLICT (id) DO UPDATE SET seq = seq + 1`); err != nil {
} else if err := db.ensureWALExists(); err != nil {
return err
}

Expand Down Expand Up @@ -1424,7 +1428,7 @@ func (db *DB) execCheckpoint(mode string) (err error) {

// Reacquire the read lock immediately after the checkpoint.
if err := db.acquireReadLock(); err != nil {
return fmt.Errorf("release read lock: %w", err)
return fmt.Errorf("acquire read lock: %w", err)
}

return nil
Expand Down
16 changes: 2 additions & 14 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ func TestDB_Sync(t *testing.T) {
shadowWALPath := db.ShadowWALPath(pos0.Generation, pos0.Index)
if buf, err := os.ReadFile(shadowWALPath); err != nil {
t.Fatal(err)
} else if err := os.WriteFile(shadowWALPath, append(buf[:litestream.WALHeaderSize-8], 0, 0, 0, 0, 0, 0, 0, 0), 0600); err != nil {
} else if err := os.WriteFile(shadowWALPath, append(buf[:litestream.WALHeaderSize-8], 0, 0, 0, 0, 0, 0, 0, 0), 0o600); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -552,12 +552,7 @@ func TestDB_Sync(t *testing.T) {
t.Fatal(err)
}

// Ensure position is now on the second index.
if pos, err := db.Pos(); err != nil {
t.Fatal(err)
} else if got, want := pos.Index, 1; got != want {
t.Fatalf("Index=%v, want %v", got, want)
}
// NOTE: The minimum checkpoint may only do a PASSIVE checkpoint so we can't guarantee a rollover.
})

// Ensure DB checkpoints after interval.
Expand All @@ -581,13 +576,6 @@ func TestDB_Sync(t *testing.T) {
} else if err := db.Sync(context.Background()); err != nil {
t.Fatal(err)
}

// Ensure position is now on the second index.
if pos, err := db.Pos(); err != nil {
t.Fatal(err)
} else if got, want := pos.Index, 1; got != want {
t.Fatalf("Index=%v, want %v", got, want)
}
})
}

Expand Down
16 changes: 9 additions & 7 deletions replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func TestReplica_Sync(t *testing.T) {
t.Fatal(err)
}

if err := db.Checkpoint(context.Background(), litestream.CheckpointModeTruncate); err != nil {
t.Fatal(err)
}

c := file.NewReplicaClient(t.TempDir())
r := litestream.NewReplica(db, "")
c.Replica, r.Client = r, c
Expand Down Expand Up @@ -142,7 +146,7 @@ func TestReplica_Snapshot(t *testing.T) {
t.Fatal(err)
} else if info, err := r.Snapshot(context.Background()); err != nil {
t.Fatal(err)
} else if got, want := info.Pos(), nextIndex(pos0); got != want {
} else if got, want := info.Pos(), pos0.Truncate(); got != want {
t.Fatalf("pos=%s, want %s", got, want)
}

Expand All @@ -166,20 +170,18 @@ func TestReplica_Snapshot(t *testing.T) {
t.Fatal(err)
} else if info, err := r.Snapshot(context.Background()); err != nil {
t.Fatal(err)
} else if got, want := info.Pos(), nextIndex(pos1); got != want {
} else if got, want := info.Pos(), pos1.Truncate(); got != want {
t.Fatalf("pos=%v, want %v", got, want)
}

// Verify three snapshots exist.
// Verify snapshots exist.
if infos, err := r.Snapshots(context.Background()); err != nil {
t.Fatal(err)
} else if got, want := len(infos), 3; got != want {
} else if got, want := len(infos), 2; got != want {
t.Fatalf("len=%v, want %v", got, want)
} else if got, want := infos[0].Pos(), pos0.Truncate(); got != want {
t.Fatalf("info[0]=%s, want %s", got, want)
} else if got, want := infos[1].Pos(), nextIndex(pos0); got != want {
} else if got, want := infos[1].Pos(), pos1.Truncate(); got != want {
t.Fatalf("info[1]=%s, want %s", got, want)
} else if got, want := infos[2].Pos(), nextIndex(pos1); got != want {
t.Fatalf("info[2]=%s, want %s", got, want)
}
}
Loading