diff --git a/share/eds/store.go b/share/eds/store.go index 960421c70c..a428424940 100644 --- a/share/eds/store.go +++ b/share/eds/store.go @@ -65,8 +65,8 @@ type Store struct { // lastGCResult is only stored on the store for testing purposes. lastGCResult atomic.Pointer[dagstore.GCResult] - // lock is used to lock parallel operations - lock sync.Map + // stripedLocks is used to synchronize parallel operations + stripedLocks [256]sync.Mutex metrics *metrics } @@ -201,11 +201,9 @@ func (s *Store) Put(ctx context.Context, root share.DataHash, square *rsmt2d.Ext } func (s *Store) put(ctx context.Context, root share.DataHash, square *rsmt2d.ExtendedDataSquare) (err error) { - m, _ := s.lock.LoadOrStore(root.String(), sync.Mutex{}) - lock := m.(sync.Mutex) - lock.Lock() - defer lock.Unlock() - defer s.lock.Delete(root.String()) + lk := &s.stripedLocks[root[len(root)-1]] + lk.Lock() + defer lk.Unlock() // if root already exists, short-circuit if has, _ := s.Has(ctx, root); has { diff --git a/share/eds/store_test.go b/share/eds/store_test.go index 4b263e7062..43116e22d0 100644 --- a/share/eds/store_test.go +++ b/share/eds/store_test.go @@ -2,7 +2,9 @@ package eds import ( "context" + "github.com/filecoin-project/dagstore" "os" + "sync" "testing" "time" @@ -171,6 +173,30 @@ func TestEDSStore(t *testing.T) { assert.Contains(t, hashesOut, hash) } }) + + t.Run("Parallel put", func(t *testing.T) { + const amount = 20 + eds, dah := randomEDS(t) + + wg := sync.WaitGroup{} + for i := 1; i < amount; i++ { + wg.Add(1) + go func() { + defer wg.Done() + err := edsStore.Put(ctx, dah.Hash(), eds) + if err != nil { + require.ErrorIs(t, err, dagstore.ErrShardExists) + } + }() + } + wg.Wait() + + eds, err := edsStore.Get(ctx, dah.Hash()) + require.NoError(t, err) + newDah, err := da.NewDataAvailabilityHeader(eds) + require.NoError(t, err) + require.Equal(t, dah.Hash(), newDah.Hash()) + }) } // TestEDSStore_GC verifies that unused transient shards are collected by the GC periodically.