From e7ff4e1c4c272d49dd70ed46dfc876dfe034b23f Mon Sep 17 00:00:00 2001
From: Vlad <13818348+walldiss@users.noreply.github.com>
Date: Mon, 14 Aug 2023 20:32:19 +0700
Subject: [PATCH] fix(share/eds): dagstore shard restore reflection workaround
 (#2559)

Turns out `dagstore` has a bug in its shard restore code. For each shard
it attempts to create a deep copy of the registered mount type, but it
copies only the basic struct fields. In our case that means it copies a
pointer, so all restored shards end up pointing to the same mount (the
same pointer). Until `dagstore` is fixed to account for pointers,
interfaces, and any other reference types in mount struct fields, we
embed `mount.FileMount` directly instead of holding the `mount.Mount`
interface; a minimal reproduction of the aliasing is sketched after the
diff.

(cherry picked from commit 003c2c4ab4e75406216982b01af7418182753340)
---
 nodebuilder/store_test.go | 92 +++++++++++++++++++++++++++++----------
 share/eds/store.go        | 10 ++---
 2 files changed, 75 insertions(+), 27 deletions(-)

diff --git a/nodebuilder/store_test.go b/nodebuilder/store_test.go
index 5f2d1bed83..8a39849060 100644
--- a/nodebuilder/store_test.go
+++ b/nodebuilder/store_test.go
@@ -4,9 +4,8 @@ import (
 	"context"
 	"strconv"
 	"testing"
+	"time"
 
-	"github.com/ipfs/go-datastore"
-	ds_sync "github.com/ipfs/go-datastore/sync"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
@@ -68,30 +67,13 @@ func BenchmarkStore(b *testing.B) {
 	ctx, cancel := context.WithCancel(context.Background())
 	b.Cleanup(cancel)
 
-	tmpDir := b.TempDir()
-	ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
-	edsStore, err := eds.NewStore(tmpDir, ds)
-	require.NoError(b, err)
-	err = edsStore.Start(ctx)
-	require.NoError(b, err)
-
 	// BenchmarkStore/bench_read_128-10    14    78970661 ns/op (~70ms)
 	b.Run("bench put 128", func(b *testing.B) {
-		b.ResetTimer()
 		dir := b.TempDir()
-
 		err := Init(*DefaultConfig(node.Full), dir, node.Full)
 		require.NoError(b, err)
-		store, err := OpenStore(dir, nil)
-		require.NoError(b, err)
-		ds, err := store.Datastore()
-		require.NoError(b, err)
-		edsStore, err := eds.NewStore(dir, ds)
-		require.NoError(b, err)
-		err = edsStore.Start(ctx)
-		require.NoError(b, err)
-
+		store := newStore(ctx, b, dir)
 		size := 128
 		b.Run("enabled eds proof caching", func(b *testing.B) {
 			b.StopTimer()
@@ -111,7 +93,7 @@ func BenchmarkStore(b *testing.B) {
 				ctx := ipld.CtxWithProofsAdder(ctx, adder)
 
 				b.StartTimer()
-				err = edsStore.Put(ctx, dah.Hash(), eds)
+				err = store.edsStore.Put(ctx, dah.Hash(), eds)
 				b.StopTimer()
 				require.NoError(b, err)
 			}
@@ -126,10 +108,76 @@ func BenchmarkStore(b *testing.B) {
 				require.NoError(b, err)
 
 				b.StartTimer()
-				err = edsStore.Put(ctx, dah.Hash(), eds)
+				err = store.edsStore.Put(ctx, dah.Hash(), eds)
 				b.StopTimer()
 				require.NoError(b, err)
 			}
 		})
 	})
 }
+
+func TestStoreRestart(t *testing.T) {
+	const (
+		blocks = 5
+		size   = 32
+	)
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+	t.Cleanup(cancel)
+
+	dir := t.TempDir()
+	err := Init(*DefaultConfig(node.Full), dir, node.Full)
+	require.NoError(t, err)
+
+	store := newStore(ctx, t, dir)
+
+	hashes := make([][]byte, blocks)
+	for i := range hashes {
+		edss := edstest.RandEDS(t, size)
+		require.NoError(t, err)
+		dah, err := da.NewDataAvailabilityHeader(edss)
+		require.NoError(t, err)
+		err = store.edsStore.Put(ctx, dah.Hash(), edss)
+		require.NoError(t, err)
+
+		// store hashes for read loop later
+		hashes[i] = dah.Hash()
+	}
+
+	// restart store
+	store.stop(ctx, t)
+	store = newStore(ctx, t, dir)
+
+	for _, h := range hashes {
+		edsReader, err
 := store.edsStore.GetCAR(ctx, h)
+		require.NoError(t, err)
+		odsReader, err := eds.ODSReader(edsReader)
+		require.NoError(t, err)
+		_, err = eds.ReadEDS(ctx, odsReader, h)
+		require.NoError(t, err)
+	}
+}
+
+type store struct {
+	s        Store
+	edsStore *eds.Store
+}
+
+func newStore(ctx context.Context, t require.TestingT, dir string) store {
+	s, err := OpenStore(dir, nil)
+	require.NoError(t, err)
+	ds, err := s.Datastore()
+	require.NoError(t, err)
+	edsStore, err := eds.NewStore(dir, ds)
+	require.NoError(t, err)
+	err = edsStore.Start(ctx)
+	require.NoError(t, err)
+	return store{
+		s:        s,
+		edsStore: edsStore,
+	}
+}
+
+func (s *store) stop(ctx context.Context, t *testing.T) {
+	require.NoError(t, s.edsStore.Stop(ctx))
+	require.NoError(t, s.s.Close())
+}
diff --git a/share/eds/store.go b/share/eds/store.go
index 69e3c6b4a4..5a85b9ab55 100644
--- a/share/eds/store.go
+++ b/share/eds/store.go
@@ -75,7 +75,7 @@ func NewStore(basepath string, ds datastore.Batching) (*Store, error) {
 	}
 
 	r := mount.NewRegistry()
-	err = r.Register("fs", &inMemoryOnceMount{Mount: &mount.FileMount{}})
+	err = r.Register("fs", &inMemoryOnceMount{})
 	if err != nil {
 		return nil, fmt.Errorf("failed to register memory mount on the registry: %w", err)
 	}
@@ -212,8 +212,8 @@ func (s *Store) put(ctx context.Context, root share.DataHash, square *rsmt2d.Ext
 	// save encoded eds into buffer
 	mount := &inMemoryOnceMount{
 		// TODO: buffer could be pre-allocated with capacity calculated based on eds size.
-		buf:   bytes.NewBuffer(nil),
-		Mount: &mount.FileMount{Path: s.basepath + blocksPath + key},
+		buf:       bytes.NewBuffer(nil),
+		FileMount: mount.FileMount{Path: s.basepath + blocksPath + key},
 	}
 	err = WriteEDS(ctx, square, mount)
 	if err != nil {
@@ -565,7 +565,7 @@ type inMemoryOnceMount struct {
 	buf *bytes.Buffer
 
 	readOnce atomic.Bool
-	mount.Mount
+	mount.FileMount
 }
 
 func (m *inMemoryOnceMount) Fetch(ctx context.Context) (mount.Reader, error) {
@@ -575,7 +575,7 @@ func (m *inMemoryOnceMount) Fetch(ctx context.Context) (mount.Reader, error) {
 		m.buf = nil
 		return reader, nil
 	}
-	return m.Mount.Fetch(ctx)
+	return m.FileMount.Fetch(ctx)
 }
 
 func (m *inMemoryOnceMount) Write(b []byte) (int, error) {
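
Below is a minimal, self-contained sketch of the failure mode described in the commit message. The `fileMount`/`ptrMount`/`valMount` types and the `restore` helper are hypothetical stand-ins, not dagstore's actual restore code: they only show why a reflection-based, field-by-field copy of a template struct aliases state when the template carries a pointer, and why embedding `mount.FileMount` by value avoids the aliasing.

```go
package main

import (
	"fmt"
	"reflect"
)

// fileMount stands in for dagstore's mount.FileMount: a concrete mount
// type carrying only plain struct fields.
type fileMount struct{ Path string }

// ptrMount mimics the old inMemoryOnceMount shape: the mount sits
// behind a pointer (or interface), so it is a reference-typed field.
type ptrMount struct{ Mount *fileMount }

// valMount mimics the fixed shape: the mount is embedded by value.
type valMount struct{ fileMount }

// restore imitates a reflection-based template copy: allocate a new
// struct and copy the template's fields into it.
func restore(tmpl *ptrMount) *ptrMount {
	v := reflect.New(reflect.TypeOf(tmpl).Elem())
	// Shallow copy: pointer fields are copied as pointers, so the
	// pointee is shared between the template and every copy.
	v.Elem().Set(reflect.ValueOf(tmpl).Elem())
	return v.Interface().(*ptrMount)
}

func main() {
	tmpl := &ptrMount{Mount: &fileMount{}}
	a, b := restore(tmpl), restore(tmpl)
	a.Mount.Path = "shard-a"
	b.Mount.Path = "shard-b"
	// Both "shards" observe the last write: they share one *fileMount.
	fmt.Println(a.Mount.Path) // shard-b

	// With the value-embedded variant, a plain struct copy duplicates
	// the embedded fileMount, so each copy owns its own Path.
	var tmpl2 valMount
	c, d := tmpl2, tmpl2
	c.Path = "shard-a"
	d.Path = "shard-b"
	fmt.Println(c.Path, d.Path) // shard-a shard-b
}
```

This is the rationale for the change in `share/eds/store.go`: with `mount.FileMount` embedded as a value rather than held behind the `mount.Mount` interface, the shallow copy dagstore performs during shard restore duplicates the mount's fields instead of sharing a single pointer across every shard.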