Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - sync2: multipeer: fix edge cases #6447

Closed
wants to merge 14 commits
Closed
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"maps"
"math/rand/v2"
"net/url"
"os"
"strings"
Expand Down Expand Up @@ -223,8 +224,12 @@ type Opt func(c *conf)

// OpenInMemory creates an in-memory database.
func OpenInMemory(opts ...Opt) (*sqliteDatabase, error) {
	opts = append(opts, withForceFresh())
	// A unique URI is needed to avoid unrelated opens sharing the same
	// in-memory database, while cache=shared still allows multiple
	// connections to this particular database.
	uri := fmt.Sprintf("file:mem-%d-%d?mode=memory&cache=shared",
		rand.Uint64(), rand.Uint64())
	return Open(uri, opts...)
}

// InMemory creates an in-memory database for testing and panics if
Expand Down
46 changes: 32 additions & 14 deletions sync2/dbset/dbset.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dbset

import (
"context"
"fmt"
"maps"
"sync"
Expand Down Expand Up @@ -49,7 +50,16 @@ func (d *DBSet) handleIDfromDB(stmt *sql.Statement) bool {
return true
}

// Loaded reports whether the DBSet has been loaded into memory.
// Implements rangesync.OrderedSet.
func (d *DBSet) Loaded() bool {
	d.loadMtx.Lock()
	loaded := d.ft != nil
	d.loadMtx.Unlock()
	return loaded
}

// EnsureLoaded ensures that the DBSet is loaded and ready to be used.
// Implements rangesync.OrderedSet.
func (d *DBSet) EnsureLoaded() error {
d.loadMtx.Lock()
defer d.loadMtx.Unlock()
Expand All @@ -65,7 +75,7 @@ func (d *DBSet) EnsureLoaded() error {
if err != nil {
return fmt.Errorf("error loading count: %w", err)
}
d.dbStore = fptree.NewDBBackedStore(d.db, d.snapshot, count, d.keyLen)
d.dbStore = fptree.NewDBBackedStore(d.db, d.snapshot, d.keyLen)
d.ft = fptree.NewFPTree(count, d.dbStore, d.keyLen, d.maxDepth)
return d.snapshot.Load(d.db, d.handleIDfromDB)
}
Expand Down Expand Up @@ -217,28 +227,39 @@ func (d *DBSet) Advance() error {
return d.snapshot.LoadSinceSnapshot(d.db, oldSnapshot, d.handleIDfromDB)
}

// Copy creates a copy of the DBSet.
// WithCopy invokes the specified function, passing it a temporary copy of the DBSet.
// Implements rangesync.OrderedSet.
func (d *DBSet) Copy(syncScope bool) rangesync.OrderedSet {
d.loadMtx.Lock()
defer d.loadMtx.Unlock()
if d.ft == nil {
// FIXME
panic("BUG: can't copy the DBItemStore before it's loaded")
func (d *DBSet) WithCopy(ctx context.Context, toCall func(rangesync.OrderedSet) error) error {
if err := d.EnsureLoaded(); err != nil {
return fmt.Errorf("loading DBSet: %w", err)
}
d.loadMtx.Lock()
ft := d.ft.Clone().(*fptree.FPTree)
return &DBSet{
ds := &DBSet{
db: d.db,
ft: ft,
st: d.st,
snapshot: d.snapshot,
keyLen: d.keyLen,
maxDepth: d.maxDepth,
dbStore: d.dbStore,
received: maps.Clone(d.received),
}
d.loadMtx.Unlock()
defer ds.release()
db, ok := d.db.(sql.Database)
if ok {
return db.WithConnection(ctx, func(ex sql.Executor) error {
ds.db = ex
return toCall(ds)
})
} else {
return toCall(ds)
}
}

// Has returns true if the DBSet contains the given item.
// Implements rangesync.OrderedSet.
func (d *DBSet) Has(k rangesync.KeyBytes) (bool, error) {
if err := d.EnsureLoaded(); err != nil {
return false, err
Expand All @@ -258,17 +279,14 @@ func (d *DBSet) Has(k rangesync.KeyBytes) (bool, error) {
}

// Recent returns a sequence of items that have been added to the DBSet since the given time.
// Implements rangesync.OrderedSet.
func (d *DBSet) Recent(since time.Time) (rangesync.SeqResult, int) {
	// Start scanning from the zero key of the configured key length.
	startKey := make(rangesync.KeyBytes, d.keyLen)
	return d.dbStore.Since(startKey, since.UnixNano())
}

// Release releases resources associated with the DBSet.
func (d *DBSet) Release() error {
d.loadMtx.Lock()
defer d.loadMtx.Unlock()
func (d *DBSet) release() {
if d.ft != nil {
d.ft.Release()
d.ft = nil
}
return nil
}
189 changes: 107 additions & 82 deletions sync2/dbset/dbset_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package dbset_test

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/require"

"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sync2/dbset"
"github.com/spacemeshos/go-spacemesh/sync2/rangesync"
"github.com/spacemeshos/go-spacemesh/sync2/sqlstore"
Expand Down Expand Up @@ -36,7 +38,6 @@ func TestDBSet_Empty(t *testing.T) {
IDColumn: "id",
}
s := dbset.NewDBSet(db, st, testKeyLen, testDepth)
defer s.Release()
empty, err := s.Empty()
require.NoError(t, err)
require.True(t, empty)
Expand Down Expand Up @@ -80,7 +81,6 @@ func TestDBSet(t *testing.T) {
IDColumn: "id",
}
s := dbset.NewDBSet(db, st, testKeyLen, testDepth)
defer s.Release()
require.Equal(t, "0000000000000000000000000000000000000000000000000000000000000000",
firstKey(t, s.Items()).String())
has, err := s.Has(
Expand Down Expand Up @@ -184,7 +184,6 @@ func TestDBSet_Receive(t *testing.T) {
IDColumn: "id",
}
s := dbset.NewDBSet(db, st, testKeyLen, testDepth)
defer s.Release()
require.Equal(t, "0000000000000000000000000000000000000000000000000000000000000000",
firstKey(t, s.Items()).String())

Expand Down Expand Up @@ -216,40 +215,42 @@ func TestDBSet_Copy(t *testing.T) {
IDColumn: "id",
}
s := dbset.NewDBSet(db, st, testKeyLen, testDepth)
defer s.Release()
require.Equal(t, "0000000000000000000000000000000000000000000000000000000000000000",
firstKey(t, s.Items()).String())

copy := s.Copy(false)

info, err := copy.GetRangeInfo(ids[2], ids[0])
require.NoError(t, err)
require.Equal(t, 2, info.Count)
require.Equal(t, "dddddddddddddddddddddddd", info.Fingerprint.String())
require.Equal(t, ids[2], firstKey(t, info.Items))

newID := rangesync.MustParseHexKeyBytes("abcdef1234567890000000000000000000000000000000000000000000000000")
require.NoError(t, copy.Receive(newID))

info, err = s.GetRangeInfo(ids[2], ids[0])
require.NoError(t, err)
require.Equal(t, 2, info.Count)
require.Equal(t, "dddddddddddddddddddddddd", info.Fingerprint.String())
require.Equal(t, ids[2], firstKey(t, info.Items))

items, err := s.Received().FirstN(100)
require.NoError(t, err)
require.Empty(t, items)

info, err = s.GetRangeInfo(ids[2], ids[0])
require.NoError(t, err)
require.Equal(t, 2, info.Count)
require.Equal(t, "dddddddddddddddddddddddd", info.Fingerprint.String())
require.Equal(t, ids[2], firstKey(t, info.Items))

items, err = copy.(*dbset.DBSet).Received().FirstN(100)
require.NoError(t, err)
require.Equal(t, []rangesync.KeyBytes{newID}, items)
require.NoError(t, s.WithCopy(context.Background(), func(copy rangesync.OrderedSet) error {
info, err := copy.GetRangeInfo(ids[2], ids[0])
require.NoError(t, err)
require.Equal(t, 2, info.Count)
require.Equal(t, "dddddddddddddddddddddddd", info.Fingerprint.String())
require.Equal(t, ids[2], firstKey(t, info.Items))

newID := rangesync.MustParseHexKeyBytes(
"abcdef1234567890000000000000000000000000000000000000000000000000")
require.NoError(t, copy.Receive(newID))

info, err = s.GetRangeInfo(ids[2], ids[0])
require.NoError(t, err)
require.Equal(t, 2, info.Count)
require.Equal(t, "dddddddddddddddddddddddd", info.Fingerprint.String())
require.Equal(t, ids[2], firstKey(t, info.Items))

items, err := s.Received().FirstN(100)
require.NoError(t, err)
require.Empty(t, items)

info, err = s.GetRangeInfo(ids[2], ids[0])
require.NoError(t, err)
require.Equal(t, 2, info.Count)
require.Equal(t, "dddddddddddddddddddddddd", info.Fingerprint.String())
require.Equal(t, ids[2], firstKey(t, info.Items))

items, err = copy.(*dbset.DBSet).Received().FirstN(100)
require.NoError(t, err)
require.Equal(t, []rangesync.KeyBytes{newID}, items)

return nil
}))
}

func TestDBItemStore_Advance(t *testing.T) {
Expand All @@ -259,64 +260,86 @@ func TestDBItemStore_Advance(t *testing.T) {
rangesync.MustParseHexKeyBytes("5555555555555555555555555555555555555555555555555555555555555555"),
rangesync.MustParseHexKeyBytes("8888888888888888888888888888888888888888888888888888888888888888"),
}
db := sqlstore.PopulateDB(t, testKeyLen, ids)

st := &sqlstore.SyncedTable{
TableName: "foo",
IDColumn: "id",
}
s := dbset.NewDBSet(db, st, testKeyLen, testDepth)
defer s.Release()
require.NoError(t, s.EnsureLoaded())

copy := s.Copy(false)
verifyDS := func(db sql.Database, os rangesync.OrderedSet) {
require.NoError(t, os.EnsureLoaded())

info, err := s.GetRangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 4, info.Count)
require.Equal(t, "cfe98ba54761032ddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))
require.NoError(t, os.WithCopy(context.Background(), func(copy rangesync.OrderedSet) error {
info, err := os.GetRangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 4, info.Count)
require.Equal(t, "cfe98ba54761032ddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))

info, err = copy.GetRangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 4, info.Count)
require.Equal(t, "cfe98ba54761032ddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))
info, err = copy.GetRangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 4, info.Count)
require.Equal(t, "cfe98ba54761032ddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))

sqlstore.InsertDBItems(t, db, []rangesync.KeyBytes{
rangesync.MustParseHexKeyBytes("abcdef1234567890000000000000000000000000000000000000000000000000"),
})
sqlstore.InsertDBItems(t, db, []rangesync.KeyBytes{
rangesync.MustParseHexKeyBytes(
"abcdef1234567890000000000000000000000000000000000000000000000000"),
})

info, err = s.GetRangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 4, info.Count)
require.Equal(t, "cfe98ba54761032ddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))
info, err = os.GetRangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 4, info.Count)
require.Equal(t, "cfe98ba54761032ddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))

info, err = copy.GetRangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 4, info.Count)
require.Equal(t, "cfe98ba54761032ddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))
info, err = copy.GetRangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 4, info.Count)
require.Equal(t, "cfe98ba54761032ddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))

require.NoError(t, s.Advance())
require.NoError(t, os.Advance())

info, err = s.GetRangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 5, info.Count)
require.Equal(t, "642464b773377bbddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))
info, err = os.GetRangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 5, info.Count)
require.Equal(t, "642464b773377bbddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))

info, err = copy.GetRangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 4, info.Count)
require.Equal(t, "cfe98ba54761032ddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))
info, err = copy.GetRangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 4, info.Count)
require.Equal(t, "cfe98ba54761032ddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))

info, err = s.Copy(false).GetRangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 5, info.Count)
require.Equal(t, "642464b773377bbddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))
return nil
}))

require.NoError(t, os.WithCopy(context.Background(), func(copy rangesync.OrderedSet) error {
info, err := copy.GetRangeInfo(ids[0], ids[0])
require.NoError(t, err)
require.Equal(t, 5, info.Count)
require.Equal(t, "642464b773377bbddddddddd", info.Fingerprint.String())
require.Equal(t, ids[0], firstKey(t, info.Items))
return nil
}))
}

t.Run("original DBSet", func(t *testing.T) {
db := sqlstore.PopulateDB(t, testKeyLen, ids)
dbSet := dbset.NewDBSet(db, st, testKeyLen, testDepth)
verifyDS(db, dbSet)
})

t.Run("DBSet copy", func(t *testing.T) {
db := sqlstore.PopulateDB(t, testKeyLen, ids)
origSet := dbset.NewDBSet(db, st, testKeyLen, testDepth)
require.NoError(t, origSet.WithCopy(context.Background(), func(copy rangesync.OrderedSet) error {
verifyDS(db, copy)
return nil
}))
})
}

func TestDBSet_Added(t *testing.T) {
Expand All @@ -333,7 +356,6 @@ func TestDBSet_Added(t *testing.T) {
IDColumn: "id",
}
s := dbset.NewDBSet(db, st, testKeyLen, testDepth)
defer s.Release()
requireEmpty(t, s.Received())

add := []rangesync.KeyBytes{
Expand All @@ -353,7 +375,10 @@ func TestDBSet_Added(t *testing.T) {
rangesync.MustParseHexKeyBytes("4444444444444444444444444444444444444444444444444444444444444444"),
}, added)

added1, err := s.Copy(false).(*dbset.DBSet).Received().FirstN(3)
require.NoError(t, err)
require.ElementsMatch(t, added, added1)
require.NoError(t, s.WithCopy(context.Background(), func(copy rangesync.OrderedSet) error {
added1, err := copy.(*dbset.DBSet).Received().FirstN(3)
require.NoError(t, err)
require.ElementsMatch(t, added, added1)
return nil
}))
}
Loading
Loading