diff --git a/nodebuilder/store_test.go b/nodebuilder/store_test.go index 5f2d1bed83..8a39849060 100644 --- a/nodebuilder/store_test.go +++ b/nodebuilder/store_test.go @@ -4,9 +4,8 @@ import ( "context" "strconv" "testing" + "time" - "github.com/ipfs/go-datastore" - ds_sync "github.com/ipfs/go-datastore/sync" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -68,30 +67,13 @@ func BenchmarkStore(b *testing.B) { ctx, cancel := context.WithCancel(context.Background()) b.Cleanup(cancel) - tmpDir := b.TempDir() - ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) - edsStore, err := eds.NewStore(tmpDir, ds) - require.NoError(b, err) - err = edsStore.Start(ctx) - require.NoError(b, err) - // BenchmarkStore/bench_read_128-10 14 78970661 ns/op (~70ms) b.Run("bench put 128", func(b *testing.B) { - b.ResetTimer() dir := b.TempDir() - err := Init(*DefaultConfig(node.Full), dir, node.Full) require.NoError(b, err) - store, err := OpenStore(dir, nil) - require.NoError(b, err) - ds, err := store.Datastore() - require.NoError(b, err) - edsStore, err := eds.NewStore(dir, ds) - require.NoError(b, err) - err = edsStore.Start(ctx) - require.NoError(b, err) - + store := newStore(ctx, b, dir) size := 128 b.Run("enabled eds proof caching", func(b *testing.B) { b.StopTimer() @@ -111,7 +93,7 @@ func BenchmarkStore(b *testing.B) { ctx := ipld.CtxWithProofsAdder(ctx, adder) b.StartTimer() - err = edsStore.Put(ctx, dah.Hash(), eds) + err = store.edsStore.Put(ctx, dah.Hash(), eds) b.StopTimer() require.NoError(b, err) } @@ -126,10 +108,76 @@ func BenchmarkStore(b *testing.B) { require.NoError(b, err) b.StartTimer() - err = edsStore.Put(ctx, dah.Hash(), eds) + err = store.edsStore.Put(ctx, dah.Hash(), eds) b.StopTimer() require.NoError(b, err) } }) }) } + +func TestStoreRestart(t *testing.T) { + const ( + blocks = 5 + size = 32 + ) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + dir := t.TempDir() + err := Init(*DefaultConfig(node.Full), dir, node.Full) + require.NoError(t, err) + + store := newStore(ctx, t, dir) + + hashes := make([][]byte, blocks) + for i := range hashes { + edss := edstest.RandEDS(t, size) + require.NoError(t, err) + dah, err := da.NewDataAvailabilityHeader(edss) + require.NoError(t, err) + err = store.edsStore.Put(ctx, dah.Hash(), edss) + require.NoError(t, err) + + // store hashes for read loop later + hashes[i] = dah.Hash() + } + + // restart store + store.stop(ctx, t) + store = newStore(ctx, t, dir) + + for _, h := range hashes { + edsReader, err := store.edsStore.GetCAR(ctx, h) + require.NoError(t, err) + odsReader, err := eds.ODSReader(edsReader) + require.NoError(t, err) + _, err = eds.ReadEDS(ctx, odsReader, h) + require.NoError(t, err) + } +} + +type store struct { + s Store + edsStore *eds.Store +} + +func newStore(ctx context.Context, t require.TestingT, dir string) store { + s, err := OpenStore(dir, nil) + require.NoError(t, err) + ds, err := s.Datastore() + require.NoError(t, err) + edsStore, err := eds.NewStore(dir, ds) + require.NoError(t, err) + err = edsStore.Start(ctx) + require.NoError(t, err) + return store{ + s: s, + edsStore: edsStore, + } +} + +func (s *store) stop(ctx context.Context, t *testing.T) { + require.NoError(t, s.edsStore.Stop(ctx)) + require.NoError(t, s.s.Close()) +} diff --git a/share/eds/store.go b/share/eds/store.go index 69e3c6b4a4..5a85b9ab55 100644 --- a/share/eds/store.go +++ b/share/eds/store.go @@ -75,7 +75,7 @@ func NewStore(basepath string, ds datastore.Batching) (*Store, error) { } r := mount.NewRegistry() - err = r.Register("fs", &inMemoryOnceMount{Mount: &mount.FileMount{}}) + err = r.Register("fs", &inMemoryOnceMount{}) if err != nil { return nil, fmt.Errorf("failed to register memory mount on the registry: %w", err) } @@ -212,8 +212,8 @@ func (s *Store) put(ctx context.Context, root share.DataHash, square *rsmt2d.Ext // save encoded eds into buffer mount := &inMemoryOnceMount{ // TODO: buffer could be pre-allocated with capacity calculated based on eds size. - buf: bytes.NewBuffer(nil), - Mount: &mount.FileMount{Path: s.basepath + blocksPath + key}, + buf: bytes.NewBuffer(nil), + FileMount: mount.FileMount{Path: s.basepath + blocksPath + key}, } err = WriteEDS(ctx, square, mount) if err != nil { @@ -565,7 +565,7 @@ type inMemoryOnceMount struct { buf *bytes.Buffer readOnce atomic.Bool - mount.Mount + mount.FileMount } func (m *inMemoryOnceMount) Fetch(ctx context.Context) (mount.Reader, error) { @@ -575,7 +575,7 @@ func (m *inMemoryOnceMount) Fetch(ctx context.Context) (mount.Reader, error) { m.buf = nil return reader, nil } - return m.Mount.Fetch(ctx) + return m.FileMount.Fetch(ctx) } func (m *inMemoryOnceMount) Write(b []byte) (int, error) {