Skip to content

Commit

Permalink
sync2: dbset: use single connection for each sync session (#6446)
Browse files Browse the repository at this point in the history
-------

## Motivation

Using single connection for multiple SQL queries which are executed
during sync avoids noticeable overhead due to SQLite connection pool
delays.
  • Loading branch information
ivan4th committed Dec 4, 2024
1 parent 2ab1adf commit 06f74fa
Show file tree
Hide file tree
Showing 20 changed files with 574 additions and 508 deletions.
8 changes: 6 additions & 2 deletions sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"maps"
"math/rand/v2"
"net/url"
"os"
"strings"
Expand Down Expand Up @@ -223,8 +224,11 @@ type Opt func(c *conf)

// OpenInMemory creates an in-memory database.
func OpenInMemory(opts ...Opt) (*sqliteDatabase, error) {
opts = append(opts, WithConnections(1), withForceFresh())
return Open("file::memory:?mode=memory", opts...)
opts = append(opts, withForceFresh())
// Unique uri is needed to avoid sharing the same in-memory database,
// while allowing multiple connections to the same database.
uri := fmt.Sprintf("file:mem-%d?mode=memory&cache=shared", rand.Uint64())
return Open(uri, opts...)
}

// InMemory creates an in-memory database for testing and panics if
Expand Down
46 changes: 32 additions & 14 deletions sync2/dbset/dbset.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dbset

import (
"context"
"fmt"
"maps"
"sync"
Expand Down Expand Up @@ -49,7 +50,16 @@ func (d *DBSet) handleIDfromDB(stmt *sql.Statement) bool {
return true
}

// Loaded returns true if the DBSet is loaded.
// Implements rangesync.OrderedSet.
func (d *DBSet) Loaded() bool {
d.loadMtx.Lock()
defer d.loadMtx.Unlock()
return d.ft != nil
}

// EnsureLoaded ensures that the DBSet is loaded and ready to be used.
// Implements rangesync.OrderedSet.
func (d *DBSet) EnsureLoaded() error {
d.loadMtx.Lock()
defer d.loadMtx.Unlock()
Expand All @@ -65,7 +75,7 @@ func (d *DBSet) EnsureLoaded() error {
if err != nil {
return fmt.Errorf("error loading count: %w", err)
}
d.dbStore = fptree.NewDBBackedStore(d.db, d.snapshot, count, d.keyLen)
d.dbStore = fptree.NewDBBackedStore(d.db, d.snapshot, d.keyLen)
d.ft = fptree.NewFPTree(count, d.dbStore, d.keyLen, d.maxDepth)
return d.snapshot.Load(d.db, d.handleIDfromDB)
}
Expand Down Expand Up @@ -217,28 +227,39 @@ func (d *DBSet) Advance() error {
return d.snapshot.LoadSinceSnapshot(d.db, oldSnapshot, d.handleIDfromDB)
}

// Copy creates a copy of the DBSet.
// WithCopy invokes the specified function, passing it a temporary copy of the DBSet.
// Implements rangesync.OrderedSet.
func (d *DBSet) Copy(syncScope bool) rangesync.OrderedSet {
d.loadMtx.Lock()
defer d.loadMtx.Unlock()
if d.ft == nil {
// FIXME
panic("BUG: can't copy the DBItemStore before it's loaded")
func (d *DBSet) WithCopy(ctx context.Context, toCall func(rangesync.OrderedSet) error) error {
if err := d.EnsureLoaded(); err != nil {
return fmt.Errorf("loading DBSet: %w", err)
}
d.loadMtx.Lock()
ft := d.ft.Clone().(*fptree.FPTree)
return &DBSet{
ds := &DBSet{
db: d.db,
ft: ft,
st: d.st,
snapshot: d.snapshot,
keyLen: d.keyLen,
maxDepth: d.maxDepth,
dbStore: d.dbStore,
received: maps.Clone(d.received),
}
d.loadMtx.Unlock()
defer ds.release()
db, ok := d.db.(sql.Database)
if ok {
return db.WithConnection(ctx, func(ex sql.Executor) error {
ds.db = ex
return toCall(ds)
})
} else {
return toCall(ds)
}
}

// Has returns true if the DBSet contains the given item.
// Implements rangesync.OrderedSet.
func (d *DBSet) Has(k rangesync.KeyBytes) (bool, error) {
if err := d.EnsureLoaded(); err != nil {
return false, err
Expand All @@ -258,17 +279,14 @@ func (d *DBSet) Has(k rangesync.KeyBytes) (bool, error) {
}

// Recent returns a sequence of items that have been added to the DBSet since the given time.
// Implements rangesync.OrderedSet.
func (d *DBSet) Recent(since time.Time) (rangesync.SeqResult, int) {
return d.dbStore.Since(make(rangesync.KeyBytes, d.keyLen), since.UnixNano())
}

// Release releases resources associated with the DBSet.
func (d *DBSet) Release() error {
d.loadMtx.Lock()
defer d.loadMtx.Unlock()
func (d *DBSet) release() {
if d.ft != nil {
d.ft.Release()
d.ft = nil
}
return nil
}
189 changes: 107 additions & 82 deletions sync2/dbset/dbset_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package dbset_test

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/require"

"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sync2/dbset"
"github.com/spacemeshos/go-spacemesh/sync2/rangesync"
"github.com/spacemeshos/go-spacemesh/sync2/sqlstore"
Expand Down Expand Up @@ -36,7 +38,6 @@ func TestDBSet_Empty(t *testing.T) {
IDColumn: "id",
}
s := dbset.NewDBSet(db, st, testKeyLen, testDepth)
defer s.Release()
empty, err := s.Empty()
require.NoError(t, err)
require.True(t, empty)
Expand Down Expand Up @@ -80,7 +81,6 @@ func TestDBSet(t *testing.T) {
IDColumn: "id",
}
s := dbset.NewDBSet(db, st, testKeyLen, testDepth)
defer s.Release()
require.Equal(t, "0000000000000000000000000000000000000000000000000000000000000000",
firstKey(t, s.Items()).String())
has, err := s.Has(
Expand Down Expand Up @@ -184,7 +184,6 @@ func TestDBSet_Receive(t *testing.T) {
IDColumn: "id",
}
s := dbset.NewDBSet(db, st, testKeyLen, testDepth)
defer s.Release()
require.Equal(t, "0000000000000000000000000000000000000000000000000000000000000000",
firstKey(t, s.Items()).String())

Expand Down Expand Up @@ -216,40 +215,42 @@ func TestDBSet_Copy(t *testing.T) {
IDColumn: "id",
}
s := dbset.NewDBSet(db, st, testKeyLen, testDepth)
defer s.Release()
require.Equal(t, "0000000000000000000000000000000000000000000000000000000000000000",
firstKey(t, s.Items()).String())

copy := s.Copy(false)

info, err := copy.GetRangeInfo(ids[2], ids[0])
require.NoError(t, err)
require.Equal(t, 2, info.Count)
require.Equal(t, "dddddddddddddddddddddddd", info.Fingerprint.String())
require.Equal(t, ids[2], firstKey(t, info.Items))

newID := rangesync.MustParseHexKeyBytes("abcdef1234567890000000000000000000000000000000000000000000000000")
require.NoError(t, copy.Receive(newID))

info, err = s.GetRangeInfo(ids[2], ids[0])
require.NoError(t, err)
require.Equal(t, 2, info.Count)
require.Equal(t, "dddddddddddddddddddddddd", info.Fingerprint.String())
require.Equal(t, ids[2], firstKey(t, info.Items))

items, err := s.Received().FirstN(100)
require.NoError(t, err)
require.Empty(t, items)

info, err = s.GetRangeInfo(ids[2], ids[0])
require.NoError(t, err)
require.Equal(t, 2, info.Count)
require.Equal(t, "dddddddddddddddddddddddd", info.Fingerprint.String())
require.Equal(t, ids[2], firstKey(t, info.Items))

items, err = copy.(*dbset.DBSet).Received().FirstN(100)
require.NoError(t, err)
require.Equal(t, []rangesync.KeyBytes{newID}, items)
require.NoError(t, s.WithCopy(context.Background(), func(copy rangesync.OrderedSet) error {
info, err := copy.GetRangeInfo(ids[2], ids[0])
require.NoError(t, err)
require.Equal(t, 2, info.Count)
require.Equal(t, "dddddddddddddddddddddddd", info.Fingerprint.String())
require.Equal(t, ids[2], firstKey(t, info.Items))

newID := rangesync.MustParseHexKeyBytes(
"abcdef1234567890000000000000000000000000000000000000000000000000")
require.NoError(t, copy.Receive(newID))

info, err = s.GetRangeInfo(ids[2], ids[0])
require.NoError(t, err)
require.Equal(t, 2, info.Count)
require.Equal(t, "dddddddddddddddddddddddd", info.Fingerprint.String())
require.Equal(t, ids[2], firstKey(t, info.Items))

items, err := s.Received().FirstN(100)
require.NoError(t, err)
require.Empty(t, items)

info, err = s.GetRangeInfo(ids[2], ids[0])
require.NoError(t, err)
require.Equal(t, 2, info.Count)
require.Equal(t, "dddddddddddddddddddddddd", info.Fingerprint.String())
require.Equal(t, ids[2], firstKey(t, info.Items))

items, err = copy.(*dbset.DBSet).Received().FirstN(100)
require.NoError(t, err)
require.Equal(t, []rangesync.KeyBytes{newID}, items)

return nil
}))
}

func TestDBItemStore_Advance(t *testing.T) {
Expand All @@ -259,64 +260,86 @@ func TestDBItemStore_Advance(t *testing.T) {
rangesync.MustParseHexKeyBytes("5555555555555555555555555555555555555555555555555555555555555555"),
rangesync.MustParseHexKeyBytes("8888888888888888888888888888888888888888888888888888888888888888"),
}
db := sqlstore.PopulateDB(t, testKeyLen, ids)

st := &sqlstore.SyncedTable{
TableName: "foo",
IDColumn: "id",
}
s := dbset.NewDBSet(db, st, testKeyLen, testDepth)
defer s.Release()
require.NoError(t, s.EnsureLoaded())

copy := s.Copy(false)
verifyDS := func(db sql.Database, os rangesync.OrderedSet) {
require.NoError(t, os.EnsureLoaded())

info, err := s.GetRangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 4, info.Count)
require.Equal(t, "cfe98ba54761032ddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))
require.NoError(t, os.WithCopy(context.Background(), func(copy rangesync.OrderedSet) error {
info, err := os.GetRangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 4, info.Count)
require.Equal(t, "cfe98ba54761032ddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))

info, err = copy.GetRangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 4, info.Count)
require.Equal(t, "cfe98ba54761032ddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))
info, err = copy.GetRangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 4, info.Count)
require.Equal(t, "cfe98ba54761032ddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))

sqlstore.InsertDBItems(t, db, []rangesync.KeyBytes{
rangesync.MustParseHexKeyBytes("abcdef1234567890000000000000000000000000000000000000000000000000"),
})
sqlstore.InsertDBItems(t, db, []rangesync.KeyBytes{
rangesync.MustParseHexKeyBytes(
"abcdef1234567890000000000000000000000000000000000000000000000000"),
})

info, err = s.GetRangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 4, info.Count)
require.Equal(t, "cfe98ba54761032ddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))
info, err = os.GetRangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 4, info.Count)
require.Equal(t, "cfe98ba54761032ddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))

info, err = copy.GetRangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 4, info.Count)
require.Equal(t, "cfe98ba54761032ddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))
info, err = copy.GetRangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 4, info.Count)
require.Equal(t, "cfe98ba54761032ddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))

require.NoError(t, s.Advance())
require.NoError(t, os.Advance())

info, err = s.GetRangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 5, info.Count)
require.Equal(t, "642464b773377bbddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))
info, err = os.GetRangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 5, info.Count)
require.Equal(t, "642464b773377bbddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))

info, err = copy.GetRangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 4, info.Count)
require.Equal(t, "cfe98ba54761032ddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))
info, err = copy.GetRangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 4, info.Count)
require.Equal(t, "cfe98ba54761032ddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))

info, err = s.Copy(false).GetRangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 5, info.Count)
require.Equal(t, "642464b773377bbddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))
return nil
}))

require.NoError(t, os.WithCopy(context.Background(), func(copy rangesync.OrderedSet) error {
info, err := copy.GetRangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 5, info.Count)
require.Equal(t, "642464b773377bbddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))
return nil
}))
}

t.Run("original DBSet", func(t *testing.T) {
db := sqlstore.PopulateDB(t, testKeyLen, ids)
dbSet := dbset.NewDBSet(db, st, testKeyLen, testDepth)
verifyDS(db, dbSet)
})

t.Run("DBSet copy", func(t *testing.T) {
db := sqlstore.PopulateDB(t, testKeyLen, ids)
origSet := dbset.NewDBSet(db, st, testKeyLen, testDepth)
require.NoError(t, origSet.WithCopy(context.Background(), func(copy rangesync.OrderedSet) error {
verifyDS(db, copy)
return nil
}))
})
}

func TestDBSet_Added(t *testing.T) {
Expand All @@ -333,7 +356,6 @@ func TestDBSet_Added(t *testing.T) {
IDColumn: "id",
}
s := dbset.NewDBSet(db, st, testKeyLen, testDepth)
defer s.Release()
requireEmpty(t, s.Received())

add := []rangesync.KeyBytes{
Expand All @@ -353,7 +375,10 @@ func TestDBSet_Added(t *testing.T) {
rangesync.MustParseHexKeyBytes("4444444444444444444444444444444444444444444444444444444444444444"),
}, added)

added1, err := s.Copy(false).(*dbset.DBSet).Received().FirstN(3)
require.NoError(t, err)
require.ElementsMatch(t, added, added1)
require.NoError(t, s.WithCopy(context.Background(), func(copy rangesync.OrderedSet) error {
added1, err := copy.(*dbset.DBSet).Received().FirstN(3)
require.NoError(t, err)
require.ElementsMatch(t, added, added1)
return nil
}))
}
Loading

0 comments on commit 06f74fa

Please sign in to comment.