diff --git a/store/heightsub.go b/store/heightsub.go
index 2335001d..c312f4d2 100644
--- a/store/heightsub.go
+++ b/store/heightsub.go
@@ -3,6 +3,7 @@ package store
 import (
     "context"
     "errors"
+    "fmt"
     "sync"
     "sync/atomic"

@@ -28,14 +29,22 @@ func newHeightSub[H header.Header[H]]() *heightSub[H] {
     }
 }

-// Height reports current height.
-func (hs *heightSub[H]) Height() uint64 {
-    return hs.height.Load()
+func (hs *heightSub[H]) isInited() bool {
+    return hs.height.Load() != 0
 }

-// SetHeight sets the new head height for heightSub.
-func (hs *heightSub[H]) SetHeight(height uint64) {
-    hs.height.Store(height)
+// setHeight sets the new head height for heightSub.
+// Only a height higher than the current one can be set.
+func (hs *heightSub[H]) setHeight(height uint64) {
+    for {
+        curr := hs.height.Load()
+        if curr >= height {
+            return
+        }
+        if hs.height.CompareAndSwap(curr, height) {
+            return
+        }
+    }
 }

 // Sub subscribes for a header of a given height.
@@ -43,12 +52,12 @@ func (hs *heightSub[H]) SetHeight(height uint64) {
 // and caller should get it elsewhere.
 func (hs *heightSub[H]) Sub(ctx context.Context, height uint64) (H, error) {
     var zero H
-    if hs.Height() >= height {
+    if hs.height.Load() >= height {
         return zero, errElapsedHeight
     }

     hs.heightReqsLk.Lock()
-    if hs.Height() >= height {
+    if hs.height.Load() >= height {
         // This is a rare case we have to account for.
         // The lock above can park a goroutine long enough for hs.height to change for a requested height,
         // leaving the request never fulfilled and the goroutine deadlocked.
@@ -81,7 +90,7 @@ func (hs *heightSub[H]) Sub(ctx context.Context, height uint64) (H, error) {

 // Pub processes all the outstanding subscriptions matching the given headers.
 // Pub is only safe when called from one goroutine.
-// For Pub to work correctly, heightSub has to be initialized with SetHeight
+// For Pub to work correctly, heightSub has to be initialized with setHeight
 // so that given headers are contiguous to the height on heightSub.
 func (hs *heightSub[H]) Pub(headers ...H) {
     ln := len(headers)
@@ -89,13 +98,12 @@
         return
     }

-    height := hs.Height()
     from, to := headers[0].Height(), headers[ln-1].Height()
-    if height+1 != from && height != 0 { // height != 0 is needed to enable init from any height and not only 1
-        log.Fatalf("PLEASE FILE A BUG REPORT: headers given to the heightSub are in the wrong order: expected %d, got %d", height+1, from)
-        return
+    if from > to {
+        panic(fmt.Sprintf("from must not be higher than to, have: %d and %d", from, to))
     }
-    hs.SetHeight(to)
+
+    hs.setHeight(to)

     hs.heightReqsLk.Lock()
     defer hs.heightReqsLk.Unlock()
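The new setHeight above replaces a plain atomic store with a compare-and-swap loop, so the height can only move forward even when several goroutines publish concurrently. Below is a minimal, self-contained sketch of the same pattern; monotonicHeight and set are illustrative names, not part of the package.

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
    )

    // monotonicHeight only ever moves forward, mirroring heightSub.setHeight.
    type monotonicHeight struct {
        height atomic.Uint64
    }

    // set raises the stored height to h unless an equal or higher value is already present.
    func (m *monotonicHeight) set(h uint64) {
        for {
            curr := m.height.Load()
            if curr >= h {
                return // never move backwards
            }
            if m.height.CompareAndSwap(curr, h) {
                return
            }
            // another goroutine raced us; retry against the fresh value
        }
    }

    func main() {
        var m monotonicHeight
        var wg sync.WaitGroup
        for _, h := range []uint64{300, 120, 200} {
            wg.Add(1)
            go func(h uint64) {
                defer wg.Done()
                m.set(h)
            }(h)
        }
        wg.Wait()
        fmt.Println(m.height.Load()) // always 300, regardless of scheduling
    }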
diff --git a/store/heightsub_test.go b/store/heightsub_test.go
index 3a48d950..92eecec4 100644
--- a/store/heightsub_test.go
+++ b/store/heightsub_test.go
@@ -20,7 +20,7 @@ func TestHeightSub(t *testing.T) {
     {
         h := headertest.RandDummyHeader(t)
         h.HeightI = 100
-        hs.SetHeight(99)
+        hs.setHeight(99)
         hs.Pub(h)

         h, err := hs.Sub(ctx, 10)
@@ -47,6 +47,68 @@ func TestHeightSub(t *testing.T) {
     }
 }

+// Test that heightSub can accept non-adjacent headers without a problem.
+func TestHeightSubNonAdjacent(t *testing.T) {
+    ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+    defer cancel()
+
+    hs := newHeightSub[*headertest.DummyHeader]()
+
+    {
+        h := headertest.RandDummyHeader(t)
+        h.HeightI = 100
+        hs.setHeight(99)
+        hs.Pub(h)
+    }
+
+    {
+        go func() {
+            // fixes flakiness on CI
+            time.Sleep(time.Millisecond)
+
+            h1 := headertest.RandDummyHeader(t)
+            h1.HeightI = 200
+            h2 := headertest.RandDummyHeader(t)
+            h2.HeightI = 300
+            hs.Pub(h1, h2)
+        }()
+
+        h, err := hs.Sub(ctx, 200)
+        assert.NoError(t, err)
+        assert.NotNil(t, h)
+    }
+}
+
+func TestHeightSub_monotonicHeight(t *testing.T) {
+    hs := newHeightSub[*headertest.DummyHeader]()
+
+    {
+        h := headertest.RandDummyHeader(t)
+        h.HeightI = 100
+        hs.setHeight(99)
+        hs.Pub(h)
+    }
+
+    {
+        h1 := headertest.RandDummyHeader(t)
+        h1.HeightI = 200
+        h2 := headertest.RandDummyHeader(t)
+        h2.HeightI = 300
+        hs.Pub(h1, h2)
+    }
+
+    {
+
+        h1 := headertest.RandDummyHeader(t)
+        h1.HeightI = 120
+        h2 := headertest.RandDummyHeader(t)
+        h2.HeightI = 130
+        hs.Pub(h1, h2)
+    }
+
+    assert.Equal(t, hs.height.Load(), uint64(300))
+}
+
 func TestHeightSubCancellation(t *testing.T) {
     ctx, cancel := context.WithTimeout(context.Background(), time.Second)
     defer cancel()
diff --git a/store/store.go b/store/store.go
index 6ea126ff..0a0acaae 100644
--- a/store/store.go
+++ b/store/store.go
@@ -1,9 +1,11 @@
 package store

 import (
+    "cmp"
     "context"
     "errors"
     "fmt"
+    "slices"
     "sync/atomic"
     "time"

@@ -51,6 +53,7 @@ type Store[H header.Header[H]] struct {
     writesDn chan struct{}
     // writeHead maintains the current write head
     writeHead atomic.Pointer[H]
+
     // pending keeps headers pending to be written in one batch
     pending *batch[H]

@@ -112,7 +115,7 @@ func newStore[H header.Header[H]](ds datastore.Batching, opts ...Option) (*Store
 }

 func (s *Store[H]) Init(ctx context.Context, initial H) error {
-    if s.heightSub.Height() != 0 {
+    if s.heightSub.isInited() {
         return errors.New("store already initialized")
     }
     // trust the given header as the initial head
@@ -164,27 +167,37 @@ func (s *Store[H]) Stop(ctx context.Context) error {
 }

 func (s *Store[H]) Height() uint64 {
-    return s.heightSub.Height()
+    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+    defer cancel()
+
+    head, err := s.Head(ctx)
+    if err != nil {
+        if errors.Is(err, context.Canceled) ||
+            errors.Is(err, context.DeadlineExceeded) ||
+            errors.Is(err, datastore.ErrNotFound) {
+            return 0
+        }
+        panic(err)
+    }
+    return head.Height()
 }

+// Head returns the highest contiguous header written to the store.
 func (s *Store[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, error) {
-    head, err := s.GetByHeight(ctx, s.heightSub.Height())
-    if err == nil {
-        return head, nil
+    headPtr := s.writeHead.Load()
+    if headPtr != nil {
+        return *headPtr, nil
     }

-    var zero H
-    head, err = s.readHead(ctx)
-    switch {
-    default:
+    head, err := s.readHead(ctx)
+    if err != nil {
+        var zero H
         return zero, err
-    case errors.Is(err, datastore.ErrNotFound), errors.Is(err, header.ErrNotFound):
-        return zero, header.ErrNoHead
-    case err == nil:
-        s.heightSub.SetHeight(head.Height())
-        log.Infow("loaded head", "height", head.Height(), "hash", head.Hash())
-        return head, nil
     }
+
+    s.writeHead.CompareAndSwap(nil, &head)
+
+    return head, nil
 }

 func (s *Store[H]) Get(ctx context.Context, hash header.Hash) (H, error) {
@@ -231,12 +244,16 @@ func (s *Store[H]) GetByHeight(ctx context.Context, height uint64) (H, error) {
         return h, nil
     }

+    return s.getByHeight(ctx, height)
+}
+
+func (s *Store[H]) getByHeight(ctx context.Context, height uint64) (H, error) {
+    var zero H
     hash, err := s.heightIndex.HashByHeight(ctx, height)
     if err != nil {
         if errors.Is(err, datastore.ErrNotFound) {
             return zero, header.ErrNotFound
         }
-
         return zero, err
     }

@@ -300,29 +317,27 @@ func (s *Store[H]) HasAt(_ context.Context, height uint64) bool {
     return height != uint64(0) && s.Height() >= height
 }

+// Append the given headers to the store. The actual write to disk happens
+// asynchronously and may fail without returning an error (failures are only logged).
 func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
     lh := len(headers)
     if lh == 0 {
         return nil
     }

-    var err error
     // take current write head to verify headers against
-    var head H
-    headPtr := s.writeHead.Load()
-    if headPtr == nil {
-        head, err = s.Head(ctx)
-        if err != nil {
-            return err
-        }
-    } else {
-        head = *headPtr
+    head, err := s.Head(ctx)
+    if err != nil {
+        return err
     }

+    slices.SortFunc(headers, func(a, b H) int {
+        return cmp.Compare(a.Height(), b.Height())
+    })
+
     // collect valid headers
     verified := make([]H, 0, lh)
     for i, h := range headers {
-
         err = head.Verify(h)
         if err != nil {
             var verErr *header.VerifyError
@@ -346,27 +361,19 @@
         head = h
     }

-    onWrite := func() {
-        newHead := verified[len(verified)-1]
-        s.writeHead.Store(&newHead)
-        log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash())
-        s.metrics.newHead(newHead.Height())
-    }
-
     // queue headers to be written on disk
     select {
     case s.writes <- verified:
         // we return an error here after writing,
         // as there might be an invalid header in between of a given range
-        onWrite()
         return err
     default:
         s.metrics.writesQueueBlocked(ctx)
     }

+    // if the writes queue is full, we block until it is not
     select {
     case s.writes <- verified:
-        onWrite()
         return err
     case <-s.writesDn:
         return errStoppedStore
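Append now sorts the incoming batch by height before verifying it against the current head, using the standard slices and cmp packages. A small self-contained illustration of that sort, assuming a fakeHeader stand-in rather than the real header interface:

    package main

    import (
        "cmp"
        "fmt"
        "slices"
    )

    // fakeHeader is a stand-in for the header type; only its height matters here.
    type fakeHeader struct{ height uint64 }

    func (f fakeHeader) Height() uint64 { return f.height }

    func main() {
        headers := []fakeHeader{{height: 12}, {height: 10}, {height: 11}}

        // Order ascending by height, as Append does before verifying the batch
        // against the current head.
        slices.SortFunc(headers, func(a, b fakeHeader) int {
            return cmp.Compare(a.Height(), b.Height())
        })

        fmt.Println(headers) // [{10} {11} {12}]
    }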
@@ -413,6 +420,8 @@ func (s *Store[H]) flushLoop() {
             time.Sleep(sleep)
         }

+        s.tryAdvanceHead(ctx, toFlush...)
+
         s.metrics.flush(ctx, time.Since(startTime), s.pending.Len(), false)
         // reset pending
         s.pending.Reset()
@@ -501,6 +510,41 @@ func (s *Store[H]) get(ctx context.Context, hash header.Hash) ([]byte, error) {
     return data, nil
 }

+// tryAdvanceHead tries to advance the highest writeHead based on passed or already written headers.
+func (s *Store[H]) tryAdvanceHead(ctx context.Context, headers ...H) {
+    writeHead := s.writeHead.Load()
+    if writeHead == nil || len(headers) == 0 {
+        return
+    }
+
+    currHeight := (*writeHead).Height()
+
+    // advance based on passed headers.
+    for i := 0; i < len(headers); i++ {
+        if headers[i].Height() != currHeight+1 {
+            break
+        }
+        newHead := headers[i]
+        s.writeHead.Store(&newHead)
+        currHeight++
+    }
+
+    // TODO(cristaloleg): benchmark this timeout or make it dynamic.
+    ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+    defer cancel()
+
+    // advance based on already written headers.
+    for {
+        h, err := s.getByHeight(ctx, currHeight+1)
+        if err != nil {
+            break
+        }
+        newHead := h
+        s.writeHead.Store(&newHead)
+        currHeight++
+    }
+}
+
 // indexTo saves mapping between header Height and Hash to the given batch.
 func indexTo[H header.Header[H]](ctx context.Context, batch datastore.Batch, headers ...H) error {
     for _, h := range headers {
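tryAdvanceHead above only moves the write head across strictly consecutive heights and stops at the first gap; once a later flush fills the gap, its second loop picks the head back up from already written headers. A standalone sketch of that contiguity rule over plain heights; advanceContiguous is a hypothetical helper, not part of the store:

    package main

    import "fmt"

    // advanceContiguous returns the highest height reachable from curr by
    // consuming strictly consecutive heights from the sorted slice, mirroring
    // the first loop of tryAdvanceHead.
    func advanceContiguous(curr uint64, heights []uint64) uint64 {
        for _, h := range heights {
            if h != curr+1 {
                break // gap: the head must not jump over missing headers
            }
            curr = h
        }
        return curr
    }

    func main() {
        head := uint64(5)
        fmt.Println(advanceContiguous(head, []uint64{6, 7, 8}))     // 8: fully contiguous
        fmt.Println(advanceContiguous(head, []uint64{6, 7, 9, 10})) // 7: stops before the gap at 8
        fmt.Println(advanceContiguous(head, []uint64{7, 8}))        // 5: cannot start past a gap
    }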
diff --git a/store/store_test.go b/store/store_test.go
index 53d40d55..7d123cf9 100644
--- a/store/store_test.go
+++ b/store/store_test.go
@@ -1,7 +1,10 @@
 package store

 import (
+    "bytes"
     "context"
+    "math/rand"
+    stdsync "sync"
     "testing"
     "time"

@@ -20,7 +23,7 @@ func TestStore(t *testing.T) {
     suite := headertest.NewTestSuite(t)

     ds := sync.MutexWrap(datastore.NewMapDatastore())
-    store := NewTestStore(t, ctx, ds, suite.Head())
+    store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(5))

     head, err := store.Head(ctx)
     require.NoError(t, err)
@@ -36,9 +39,12 @@ func TestStore(t *testing.T) {
         assert.Equal(t, h.Hash(), out[i].Hash())
     }

-    head, err = store.Head(ctx)
-    require.NoError(t, err)
-    assert.Equal(t, out[len(out)-1].Hash(), head.Hash())
+    // we need to wait for a flush
+    assert.Eventually(t, func() bool {
+        head, err = store.Head(ctx)
+        require.NoError(t, err)
+        return bytes.Equal(out[len(out)-1].Hash(), head.Hash())
+    }, time.Second, 100*time.Millisecond)

     ok, err := store.Has(ctx, in[5].Hash())
     require.NoError(t, err)
@@ -141,6 +147,123 @@ func TestStore_Append_BadHeader(t *testing.T) {
     require.Error(t, err)
 }

+func TestStore_Append(t *testing.T) {
+    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+    t.Cleanup(cancel)
+
+    suite := headertest.NewTestSuite(t)
+
+    ds := sync.MutexWrap(datastore.NewMapDatastore())
+    store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(4))
+
+    head, err := store.Head(ctx)
+    require.NoError(t, err)
+    assert.Equal(t, head.Hash(), suite.Head().Hash())
+
+    const workers = 10
+    const chunk = 5
+    headers := suite.GenDummyHeaders(workers * chunk)
+
+    errCh := make(chan error, workers)
+    var wg stdsync.WaitGroup
+    wg.Add(workers)
+
+    for i := range workers {
+        go func() {
+            defer wg.Done()
+            // make every append happen in random order.
+            time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
+
+            err := store.Append(ctx, headers[i*chunk:(i+1)*chunk]...)
+            errCh <- err
+        }()
+    }
+
+    wg.Wait()
+    close(errCh)
+    for err := range errCh {
+        assert.NoError(t, err)
+    }
+
+    // wait for batch to be written.
+    time.Sleep(100 * time.Millisecond)
+
+    head, err = store.Head(ctx)
+    assert.NoError(t, err)
+    assert.Equal(t, head.Height(), headers[len(headers)-1].Height())
+    assert.Equal(t, head.Hash(), headers[len(headers)-1].Hash())
+}
+
+func TestStore_Append_stableHeadWhenGaps(t *testing.T) {
+    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+    t.Cleanup(cancel)
+
+    suite := headertest.NewTestSuite(t)
+
+    ds := sync.MutexWrap(datastore.NewMapDatastore())
+    store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(4))
+
+    head, err := store.Head(ctx)
+    require.NoError(t, err)
+    assert.Equal(t, head.Hash(), suite.Head().Hash())
+
+    firstChunk := suite.GenDummyHeaders(5)
+    missedChunk := suite.GenDummyHeaders(5)
+    lastChunk := suite.GenDummyHeaders(5)
+
+    wantHead := firstChunk[len(firstChunk)-1]
+    latestHead := lastChunk[len(lastChunk)-1]
+
+    {
+        err := store.Append(ctx, firstChunk...)
+        require.NoError(t, err)
+        // wait for batch to be written.
+        time.Sleep(100 * time.Millisecond)
+
+        // head is advanced to the last known header.
+        head, err := store.Head(ctx)
+        require.NoError(t, err)
+        assert.Equal(t, head.Height(), wantHead.Height())
+        assert.Equal(t, head.Hash(), wantHead.Hash())
+
+        // check that store height is aligned with the head.
+        height := store.Height()
+        assert.Equal(t, height, head.Height())
+    }
+    {
+        err := store.Append(ctx, lastChunk...)
+        require.NoError(t, err)
+        // wait for batch to be written.
+        time.Sleep(100 * time.Millisecond)
+
+        // head is not advanced due to a gap.
+        head, err := store.Head(ctx)
+        require.NoError(t, err)
+        assert.Equal(t, head.Height(), wantHead.Height())
+        assert.Equal(t, head.Hash(), wantHead.Hash())
+
+        // check that store height is aligned with the head.
+        height := store.Height()
+        assert.Equal(t, height, head.Height())
+    }
+    {
+        err := store.Append(ctx, missedChunk...)
+        require.NoError(t, err)
+        // wait for batch to be written.
+        time.Sleep(time.Second)
+
+        // after appending missing headers we're on the latest header.
+        head, err := store.Head(ctx)
+        require.NoError(t, err)
+        assert.Equal(t, head.Height(), latestHead.Height())
+        assert.Equal(t, head.Hash(), latestHead.Hash())
+
+        // check that store height is aligned with the head.
+        height := store.Height()
+        assert.Equal(t, height, head.Height())
+    }
+}
+
 // TestStore_GetRange tests possible combinations of requests and ensures that
 // the store can handle them adequately (even malformed requests)
 func TestStore_GetRange(t *testing.T) {
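Because Append now returns before the batch reaches disk, the tests above wait for the head with fixed time.Sleep calls or assert.Eventually. A hedged alternative is to poll the store's height until it reaches the expected value, as sketched below; waitForHeight, heightReporter, and fakeStore are illustrative names, not part of the package or its test suite.

    package main

    import (
        "context"
        "errors"
        "fmt"
        "time"
    )

    // heightReporter is the minimal surface the helper needs; in the real tests
    // it would be the store with its Height method.
    type heightReporter interface {
        Height() uint64
    }

    // waitForHeight polls until the reported height reaches want or ctx expires.
    func waitForHeight(ctx context.Context, s heightReporter, want uint64, tick time.Duration) error {
        for {
            if s.Height() >= want {
                return nil
            }
            select {
            case <-ctx.Done():
                return errors.New("timed out waiting for height")
            case <-time.After(tick):
            }
        }
    }

    // fakeStore simulates a store whose height advances asynchronously over time.
    type fakeStore struct{ start time.Time }

    func (f *fakeStore) Height() uint64 {
        return uint64(time.Since(f.start) / (50 * time.Millisecond))
    }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()

        s := &fakeStore{start: time.Now()}
        err := waitForHeight(ctx, s, 5, 10*time.Millisecond)
        fmt.Println("reached height 5:", err == nil) // true well before the deadline
    }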
diff --git a/sync/sync_head_test.go b/sync/sync_head_test.go
index cc60e481..eecd3a35 100644
--- a/sync/sync_head_test.go
+++ b/sync/sync_head_test.go
@@ -2,6 +2,7 @@ package sync

 import (
     "context"
+    "errors"
     "sync"
     "sync/atomic"
     "testing"
@@ -121,13 +122,11 @@ func (t *wrappedGetter) Head(ctx context.Context, options ...header.HeadOption[*
 }

 func (t *wrappedGetter) Get(ctx context.Context, hash header.Hash) (*headertest.DummyHeader, error) {
-    // TODO implement me
-    panic("implement me")
+    return nil, errors.New("implement me")
 }

 func (t *wrappedGetter) GetByHeight(ctx context.Context, u uint64) (*headertest.DummyHeader, error) {
-    // TODO implement me
-    panic("implement me")
+    return nil, errors.New("implement me")
 }

 func (t *wrappedGetter) GetRangeByHeight(
@@ -135,6 +134,5 @@ func (t *wrappedGetter) GetRangeByHeight(
     from *headertest.DummyHeader,
     to uint64,
 ) ([]*headertest.DummyHeader, error) {
-    // TODO implement me
-    panic("implement me")
+    return nil, errors.New("implement me")
 }
diff --git a/sync/sync_test.go b/sync/sync_test.go
index b9acb2d3..78d89425 100644
--- a/sync/sync_test.go
+++ b/sync/sync_test.go
@@ -23,14 +23,14 @@ func TestSyncSimpleRequestingHead(t *testing.T) {
     suite := headertest.NewTestSuite(t)
     head := suite.Head()

-    remoteStore := newTestStore(t, ctx, head)
+    remoteStore := newTestStore(t, ctx, head, store.WithWriteBatchSize(1))
     err := remoteStore.Append(ctx, suite.GenDummyHeaders(100)...)
     require.NoError(t, err)

     _, err = remoteStore.GetByHeight(ctx, 100)
     require.NoError(t, err)

-    localStore := newTestStore(t, ctx, head)
+    localStore := newTestStore(t, ctx, head, store.WithWriteBatchSize(1))
     syncer, err := NewSyncer(
         local.NewExchange(remoteStore),
         localStore,
@@ -47,19 +47,37 @@
     err = syncer.SyncWait(ctx)
     require.NoError(t, err)

-    exp, err := remoteStore.Head(ctx)
-    require.NoError(t, err)
-
-    have, err := localStore.Head(ctx)
-    require.NoError(t, err)
-    assert.Equal(t, exp.Height(), have.Height())
-    assert.Empty(t, syncer.pending.Head())
-
-    state := syncer.State()
-    assert.Equal(t, uint64(exp.Height()), state.Height)
-    assert.Equal(t, uint64(2), state.FromHeight)
-    assert.Equal(t, uint64(exp.Height()), state.ToHeight)
-    assert.True(t, state.Finished(), state)
+    // force sync to update underlying stores.
+    syncer.wantSync()
+
+    // we need to wait for a flush
+    assert.Eventually(t, func() bool {
+        exp, err := remoteStore.Head(ctx)
+        require.NoError(t, err)
+
+        have, err := localStore.Head(ctx)
+        require.NoError(t, err)
+
+        state := syncer.State()
+        switch {
+        case exp.Height() != have.Height():
+            return false
+        case syncer.pending.Head() != nil:
+            return false
+
+        case uint64(exp.Height()) != state.Height:
+            return false
+        case uint64(2) != state.FromHeight:
+            return false
+
+        case uint64(exp.Height()) != state.ToHeight:
+            return false
+        case !state.Finished():
+            return false
+        default:
+            return true
+        }
+    }, 2*time.Second, 100*time.Millisecond)
 }

 func TestDoSyncFullRangeFromExternalPeer(t *testing.T) {
@@ -108,8 +126,8 @@ func TestSyncCatchUp(t *testing.T) {
     suite := headertest.NewTestSuite(t)
     head := suite.Head()

-    remoteStore := newTestStore(t, ctx, head)
-    localStore := newTestStore(t, ctx, head)
+    remoteStore := newTestStore(t, ctx, head, store.WithWriteBatchSize(1))
+    localStore := newTestStore(t, ctx, head, store.WithWriteBatchSize(1))
     syncer, err := NewSyncer(
         local.NewExchange(remoteStore),
         localStore,
@@ -138,12 +156,22 @@
     require.NoError(t, err)

     // 4. assert syncer caught-up
-    have, err := localStore.Head(ctx)
-    require.NoError(t, err)
-
-    assert.Equal(t, have.Height(), incomingHead.Height())
-    assert.Equal(t, exp.Height()+1, have.Height()) // plus one as we didn't add last header to remoteStore
-    assert.Empty(t, syncer.pending.Head())
+    // we need to wait for a flush
+    assert.Eventually(t, func() bool {
+        have, err := localStore.Head(ctx)
+        require.NoError(t, err)
+
+        switch {
+        case have.Height() != incomingHead.Height():
+            return false
+        case exp.Height()+1 != have.Height(): // plus one as we didn't add last header to remoteStore
+            return false
+        case syncer.pending.Head() != nil:
+            return false
+        default:
+            return true
+        }
+    }, time.Second, 100*time.Millisecond)

     state := syncer.State()
     assert.Equal(t, uint64(exp.Height()+1), state.Height)
@@ -210,7 +238,7 @@
     require.NoError(t, err)

     assert.Equal(t, exp.Height(), have.Height())
-    assert.Empty(t, syncer.pending.Head()) // assert all cache from pending is used
+    assert.Nil(t, syncer.pending.Head()) // assert all cache from pending is used
 }

 // TestSyncer_FindHeadersReturnsCorrectRange ensures that `findHeaders` returns
@@ -303,7 +331,7 @@ func TestSync_InvalidSyncTarget(t *testing.T) {
     head := suite.Head()

     // create a local store which is initialised at genesis height
-    localStore := newTestStore(t, ctx, head)
+    localStore := newTestStore(t, ctx, head, store.WithWriteBatchSize(1))

     // create a peer which is already on height 100
     remoteStore := headertest.NewStore(t, suite, 100)
@@ -347,7 +375,14 @@
     // ensure syncer could only sync up to one header below the bad sync target
     h, err := localStore.Head(ctx)
     require.NoError(t, err)
-    require.Equal(t, maliciousHeader.Height()-1, h.Height())
+
+    // we need to wait for a flush
+    assert.Eventually(t, func() bool {
+        h, err = localStore.Head(ctx)
+        require.NoError(t, err)
+
+        return maliciousHeader.Height()-1 == h.Height()
+    }, time.Second, 100*time.Millisecond)

     // manually change bad sync target to a good header in remote peer
     // store so it can re-serve it to syncer once it re-requests the height
@@ -400,7 +435,7 @@ func (d *delayedGetter[H]) GetRangeByHeight(ctx context.Context, from H, to uint
 }

 // newTestStore creates initialized and started in memory header Store which is useful for testing.
-func newTestStore(tb testing.TB, ctx context.Context, head *headertest.DummyHeader) header.Store[*headertest.DummyHeader] {
+func newTestStore(tb testing.TB, ctx context.Context, head *headertest.DummyHeader, opts ...store.Option) header.Store[*headertest.DummyHeader] {
     ds := sync.MutexWrap(datastore.NewMapDatastore())
-    return store.NewTestStore(tb, ctx, ds, head)
+    return store.NewTestStore(tb, ctx, ds, head, opts...)
 }