diff --git a/core/core.go b/core/core.go index 2e981299a..78de322f9 100644 --- a/core/core.go +++ b/core/core.go @@ -43,6 +43,7 @@ import ( ) const ( + blockRetentionLimit = 20 ethereumRPCRequestTimeout = 30 * time.Second peerConnectTimeout = 60 * time.Second checkNewAddrInterval = 20 * time.Second @@ -333,7 +334,7 @@ func newWithPrivateConfig(ctx context.Context, config Config, pConfig privateCon Topics: topics, Client: blockWatcherClient, } - blockWatcher := blockwatch.New(blockWatcherConfig) + blockWatcher := blockwatch.New(blockRetentionLimit, blockWatcherConfig) // Initialize the order validator orderValidator, err := ordervalidator.New( diff --git a/db/common.go b/db/common.go index 775cfa1c2..15f41ef89 100644 --- a/db/common.go +++ b/db/common.go @@ -43,6 +43,7 @@ type Database interface { FindMiniHeaders(opts *MiniHeaderQuery) ([]*types.MiniHeader, error) DeleteMiniHeader(hash common.Hash) error DeleteMiniHeaders(opts *MiniHeaderQuery) ([]*types.MiniHeader, error) + ResetMiniHeaders(miniHeaders []*types.MiniHeader) error GetMetadata() (*types.Metadata, error) SaveMetadata(metadata *types.Metadata) error UpdateMetadata(updateFunc func(oldmetadata *types.Metadata) (newMetadata *types.Metadata)) error diff --git a/db/dexie_implementation.go b/db/dexie_implementation.go index 6118540dc..9b1286852 100644 --- a/db/dexie_implementation.go +++ b/db/dexie_implementation.go @@ -241,6 +241,22 @@ func (db *DB) AddMiniHeaders(miniHeaders []*types.MiniHeader) (added []*types.Mi return dexietypes.MiniHeadersToCommonType(jsAdded), dexietypes.MiniHeadersToCommonType(jsRemoved), nil } +// ResetMiniHeaders deletes all of the existing miniheaders and then stores new +// miniheaders in the database. +func (db *DB) ResetMiniHeaders(newMiniHeaders []*types.MiniHeader) (err error) { + defer func() { + if r := recover(); r != nil { + err = recoverError(r) + } + }() + jsNewMiniHeaders := dexietypes.MiniHeadersFromCommonType(newMiniHeaders) + _, err = jsutil.AwaitPromiseContext(db.ctx, db.dexie.Call("resetMiniHeadersAsync", jsNewMiniHeaders)) + if err != nil { + return convertJSError(err) + } + return nil +} + func (db *DB) GetMiniHeader(hash common.Hash) (miniHeader *types.MiniHeader, err error) { defer func() { if r := recover(); r != nil { diff --git a/db/sql_implementation.go b/db/sql_implementation.go index e4310bda2..01fd26425 100644 --- a/db/sql_implementation.go +++ b/db/sql_implementation.go @@ -567,6 +567,29 @@ func (db *DB) AddMiniHeaders(miniHeaders []*types.MiniHeader) (added []*types.Mi return added, removed, nil } +// ResetMiniHeaders deletes all of the existing miniheaders and then stores new +// miniheaders in the database. +func (db *DB) ResetMiniHeaders(newMiniHeaders []*types.MiniHeader) (err error) { + defer func() { + err = convertErr(err) + }() + + err = db.sqldb.TransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error { + _, err := txn.DeleteFrom("miniHeaders").ExecContext(db.ctx) + if err != nil { + return err + } + for _, newMiniHeader := range newMiniHeaders { + _, err := txn.NamedExecContext(db.ctx, insertMiniHeaderQuery, sqltypes.MiniHeaderFromCommonType(newMiniHeader)) + if err != nil { + return err + } + } + return nil + }) + return nil +} + func (db *DB) GetMiniHeader(hash common.Hash) (miniHeader *types.MiniHeader, err error) { defer func() { err = convertErr(err) diff --git a/ethereum/blockwatch/block_watcher.go b/ethereum/blockwatch/block_watcher.go index efd30c79e..15e26bf7f 100644 --- a/ethereum/blockwatch/block_watcher.go +++ b/ethereum/blockwatch/block_watcher.go @@ -12,6 +12,7 @@ import ( "github.com/0xProject/0x-mesh/common/types" "github.com/0xProject/0x-mesh/constants" "github.com/0xProject/0x-mesh/db" + "github.com/0xProject/0x-mesh/ethereum/simplestack" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" ethtypes "github.com/ethereum/go-ethereum/core/types" @@ -29,7 +30,7 @@ const rpcClientNotFoundError = "not found" var maxBlocksInGetLogsQuery = 60 // warningLevelErrorMessages are certain blockwatch.Watch errors that we want to report as warnings -// because they do not represent a bug or issue with Mesh and are expected to happen from time to time +// because they do not represent a bug or issue with Mesh and are expected to happen from time to time. var warningLevelErrorMessages = []string{ constants.GethFilterUnknownBlock, rpcClientNotFoundError, @@ -77,7 +78,8 @@ type Config struct { // supplied stack) handling block re-orgs and network disruptions gracefully. It can be started from // any arbitrary block height, and will emit both block added and removed events. type Watcher struct { - stack *Stack + stack *simplestack.SimpleStack + db *db.DB client Client blockFeed event.Feed blockScope event.SubscriptionScope // Subscription scope tracking current live listeners @@ -90,10 +92,11 @@ type Watcher struct { } // New creates a new Watcher instance. -func New(config Config) *Watcher { +func New(retentionLimit int, config Config) *Watcher { return &Watcher{ pollingInterval: config.PollingInterval, - stack: NewStack(config.DB), + db: config.DB, + stack: simplestack.New(retentionLimit, []*types.MiniHeader{}), client: config.Client, withLogs: config.WithLogs, topics: config.Topics, @@ -114,10 +117,8 @@ func (w *Watcher) FastSyncToLatestBlock(ctx context.Context) (blocksElapsed int, } w.mu.Unlock() - latestBlockProcessed, err := w.stack.Peek() - if err != nil { - return 0, err - } + latestBlockProcessed := w.stack.Peek() + // No previously stored block so no blocks have elapsed if latestBlockProcessed == nil { return 0, nil @@ -139,13 +140,19 @@ func (w *Watcher) FastSyncToLatestBlock(ctx context.Context) (blocksElapsed int, return blocksElapsed, err } if len(events) > 0 { + newMiniHeaders := w.stack.PeekAll() + if err := w.db.ResetMiniHeaders(newMiniHeaders); err != nil { + return blocksElapsed, err + } w.blockFeed.Send(events) } } else { - // Clear all block headers from stack so BlockWatcher starts again from latest block - if err := w.stack.Clear(); err != nil { + // Clear all block headers from stack and database so BlockWatcher + // starts again from latest block + if _, err := w.db.DeleteMiniHeaders(nil); err != nil { return blocksElapsed, err } + w.stack.Clear() } return blocksElapsed, nil @@ -224,25 +231,19 @@ func (w *Watcher) Subscribe(sink chan<- []*Event) event.Subscription { } // SyncToLatestBlock syncs our local state of the chain to the latest block found via -// Ethereum RPC +// Ethereum RPC. func (w *Watcher) SyncToLatestBlock() error { w.syncToLatestBlockMu.Lock() defer w.syncToLatestBlockMu.Unlock() - existingMiniHeaders, err := w.stack.PeekAll() - if err != nil { - return err - } + checkpoint := w.stack.Checkpoint() latestHeader, err := w.client.HeaderByNumber(nil) if err != nil { return err } latestBlockNumber := latestHeader.Number.Int64() - lastStoredHeader, err := w.stack.Peek() - if err != nil { - return err - } + lastStoredHeader := w.stack.Peek() var lastStoredBlockNumber int64 if lastStoredHeader != nil { lastStoredBlockNumber = lastStoredHeader.Number.Int64() @@ -277,11 +278,7 @@ func (w *Watcher) SyncToLatestBlock() error { // stored and fetch it nextHeader := latestHeader if numBlocksToFetch != 1 { - lastStoredHeader, err := w.stack.Peek() - if err != nil { - syncErr = err - break - } + lastStoredHeader := w.stack.Peek() nextBlockNumber := big.NewInt(0).Add(lastStoredHeader.Number, big.NewInt(1)) nextHeader, err = w.client.HeaderByNumber(nextBlockNumber) if err != nil { @@ -302,14 +299,15 @@ func (w *Watcher) SyncToLatestBlock() error { return syncErr } if w.shouldRevertChanges(lastStoredHeader, allEvents) { - // TODO(albrow): Take another look at this. Maybe extract to method. - if err := w.stack.Clear(); err != nil { + if err := w.stack.Reset(checkpoint); err != nil { return err } - if _, _, err := w.stack.db.AddMiniHeaders(existingMiniHeaders); err != nil { + } else { + w.stack.Checkpoint() + newMiniHeaders := w.stack.PeekAll() + if err := w.db.ResetMiniHeaders(newMiniHeaders); err != nil { return err } - } else { w.blockFeed.Send(allEvents) } @@ -328,10 +326,7 @@ func (w *Watcher) shouldRevertChanges(lastStoredHeader *types.MiniHeader, events } func (w *Watcher) buildCanonicalChain(nextHeader *types.MiniHeader, events []*Event) ([]*Event, error) { - latestHeader, err := w.stack.Peek() - if err != nil { - return nil, err - } + latestHeader := w.stack.Peek() // Is the stack empty or is it the next block? if latestHeader == nil || nextHeader.Parent == latestHeader.Hash { nextHeader, err := w.addLogs(nextHeader) @@ -350,9 +345,7 @@ func (w *Watcher) buildCanonicalChain(nextHeader *types.MiniHeader, events []*Ev } // Pop latestHeader from the stack. We already have a reference to it. - if _, err := w.stack.Pop(); err != nil { - return events, err - } + w.stack.Pop() events = append(events, &Event{ Type: Removed, BlockHeader: latestHeader, @@ -397,7 +390,7 @@ func (w *Watcher) addLogs(header *types.MiniHeader) (*types.MiniHeader, error) { return header, nil } -// getMissedEventsToBackfill finds missed events that might have occured while the Mesh node was +// getMissedEventsToBackfill finds missed events that might have occurred while the Mesh node was // offline. It does this by comparing the last block stored with the latest block discoverable via RPC. // If the stored block is older then the latest block, it batch fetches the events for missing blocks, // re-sets the stored blocks and returns the block events found. @@ -411,9 +404,8 @@ func (w *Watcher) getMissedEventsToBackfill(ctx context.Context, blocksElapsed i // If we have processed blocks further then the latestRetainedBlock in the DB, we // want to remove all blocks from the DB and insert the furthestBlockProcessed // Doing so will cause the BlockWatcher to start from that furthestBlockProcessed. - if err := w.stack.Clear(); err != nil { - return events, err - } + w.stack.Clear() + // Add furthest block processed into the DB latestHeader, err := w.client.HeaderByNumber(big.NewInt(int64(furthestBlockProcessed))) if err != nil { @@ -470,7 +462,7 @@ type logRequestResult struct { Err error } -// getLogsRequestChunkSize is the number of `eth_getLogs` JSON RPC to send concurrently in each batch fetch +// getLogsRequestChunkSize is the number of `eth_getLogs` JSON RPC to send concurrently in each batch fetch. const getLogsRequestChunkSize = 3 // getLogsInBlockRange attempts to fetch all logs in the block range supplied. It implements a @@ -486,7 +478,7 @@ func (w *Watcher) getLogsInBlockRange(ctx context.Context, from, to int) ([]etht for len(blockRanges) != 0 { var chunk []*blockRange if len(blockRanges) < getLogsRequestChunkSize { - chunk = blockRanges[:len(blockRanges)] + chunk = blockRanges } else { chunk = blockRanges[:getLogsRequestChunkSize] } @@ -586,38 +578,16 @@ type blockRange struct { // blocks' logs twice. func (w *Watcher) getSubBlockRanges(from, to, rangeSize int) []*blockRange { chunks := []*blockRange{} - numBlocksLeft := to - from - if numBlocksLeft < rangeSize { + for ; from+rangeSize-1 < to; from += rangeSize { chunks = append(chunks, &blockRange{ FromBlock: from, - ToBlock: to, + ToBlock: from + rangeSize - 1, }) - } else { - blocks := []int{} - for i := 0; i <= numBlocksLeft; i++ { - blocks = append(blocks, from+i) - } - numChunks := len(blocks) / rangeSize - remainder := len(blocks) % rangeSize - if remainder > 0 { - numChunks = numChunks + 1 - } - - for i := 0; i < numChunks; i = i + 1 { - fromIndex := i * rangeSize - toIndex := fromIndex + rangeSize - if toIndex > len(blocks) { - toIndex = len(blocks) - } - bs := blocks[fromIndex:toIndex] - blockRange := &blockRange{ - FromBlock: bs[0], - ToBlock: bs[len(bs)-1], - } - chunks = append(chunks, blockRange) - } } - return chunks + return append(chunks, &blockRange{ + FromBlock: from, + ToBlock: to, + }) } const infuraTooManyResultsErrMsg = "query returned more than 10000 results" @@ -627,7 +597,7 @@ func (w *Watcher) filterLogsRecurisively(from, to int, allLogs []ethtypes.Log) ( "from": from, "to": to, }).Trace("Fetching block logs") - numBlocks := to - from + numBlocks := to - from + 1 topics := [][]common.Hash{} if len(w.topics) > 0 { topics = append(topics, w.topics) @@ -650,14 +620,7 @@ func (w *Watcher) filterLogsRecurisively(from, to int, allLogs []ethtypes.Log) ( return allLogs, fmt.Errorf("Unable to get the logs for block #%d, because it contains too many logs", from) } - r := numBlocks % 2 - firstBatchSize := numBlocks / 2 - secondBatchSize := firstBatchSize - if r == 1 { - secondBatchSize = secondBatchSize + 1 - } - - endFirstHalf := from + firstBatchSize + endFirstHalf := from + numBlocks/2 startSecondHalf := endFirstHalf + 1 allLogs, err := w.filterLogsRecurisively(from, endFirstHalf, allLogs) if err != nil { @@ -676,11 +639,6 @@ func (w *Watcher) filterLogsRecurisively(from, to int, allLogs []ethtypes.Log) ( return allLogs, nil } -// getAllRetainedBlocks returns the blocks retained in-memory by the Watcher. -func (w *Watcher) getAllRetainedBlocks() ([]*types.MiniHeader, error) { - return w.stack.PeekAll() -} - func isWarning(err error) bool { message := err.Error() for _, warningLevelErrorMessage := range warningLevelErrorMessages { diff --git a/ethereum/blockwatch/block_watcher_test.go b/ethereum/blockwatch/block_watcher_test.go index db3fe925e..647c4fc3f 100644 --- a/ethereum/blockwatch/block_watcher_test.go +++ b/ethereum/blockwatch/block_watcher_test.go @@ -25,30 +25,20 @@ var config = Config{ Topics: []common.Hash{}, } -var ( +const ( basicFakeClientFixture = "testdata/fake_client_basic_fixture.json" + blockRetentionLimit = 10 ) func dbOptions() *db.Options { options := db.TestOptions() - // For the block watcher tests we set MaxMiniHeaders to 10. - options.MaxMiniHeaders = 10 return options } func TestWatcher(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - database, err := db.New(ctx, dbOptions()) - require.NoError(t, err) - fakeClient, err := newFakeClient("testdata/fake_client_block_poller_fixtures.json") - require.NoError(t, err) - - // Polling interval unused because we hijack the ticker for this test - require.NoError(t, err) - config.Client = fakeClient - config.DB = database - watcher := New(config) + fakeClient, watcher := setupFakeClientAndOrderWatcher(t, ctx, "testdata/fake_client_block_poller_fixtures.json") // Having a buffer of 1 unblocks the below for-loop without resorting to a goroutine events := make(chan []*Event, 1) @@ -57,15 +47,14 @@ func TestWatcher(t *testing.T) { for i := 0; i < fakeClient.NumberOfTimesteps(); i++ { scenarioLabel := fakeClient.GetScenarioLabel() - err = watcher.SyncToLatestBlock() + err := watcher.SyncToLatestBlock() if strings.HasPrefix(scenarioLabel, "ERROR") { require.Error(t, err) } else { require.NoError(t, err) } - retainedBlocks, err := watcher.getAllRetainedBlocks() - require.NoError(t, err) + retainedBlocks := watcher.stack.PeekAll() expectedRetainedBlocks := fakeClient.ExpectedRetainedBlocks() assert.Equal(t, expectedRetainedBlocks, retainedBlocks, fmt.Sprintf("%s (timestep: %d)", scenarioLabel, i)) @@ -91,15 +80,7 @@ func TestWatcher(t *testing.T) { func TestWatcherStartStop(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - database, err := db.New(ctx, dbOptions()) - require.NoError(t, err) - fakeClient, err := newFakeClient(basicFakeClientFixture) - require.NoError(t, err) - - require.NoError(t, err) - config.Client = fakeClient - config.DB = database - watcher := New(config) + _, watcher := setupFakeClientAndOrderWatcher(t, ctx, basicFakeClientFixture) // Start the watcher in a goroutine. We use the done channel to signal when // watcher.Watch returns. @@ -196,13 +177,7 @@ func TestGetSubBlockRanges(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - database, err := db.New(ctx, dbOptions()) - require.NoError(t, err) - fakeClient, err := newFakeClient(basicFakeClientFixture) - require.NoError(t, err) - config.Client = fakeClient - config.DB = database - watcher := New(config) + _, watcher := setupFakeClientAndOrderWatcher(t, ctx, basicFakeClientFixture) for _, testCase := range testCases { blockRanges := watcher.getSubBlockRanges(testCase.from, testCase.to, rangeSize) @@ -213,13 +188,10 @@ func TestGetSubBlockRanges(t *testing.T) { func TestFastSyncToLatestBlockLessThan128Missed(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - database, err := db.New(ctx, dbOptions()) - require.NoError(t, err) + // Fixture will return block 132 as the tip of the chain (127 blocks from block 5) - fakeClient, err := newFakeClient("testdata/fake_client_fast_sync_fixture.json") - require.NoError(t, err) + _, watcher := setupFakeClientAndOrderWatcher(t, ctx, "testdata/fake_client_fast_sync_fixture.json") - require.NoError(t, err) // Add block number 5 as the last block seen by BlockWatcher lastBlockSeen := &types.MiniHeader{ Number: big.NewInt(5), @@ -227,11 +199,7 @@ func TestFastSyncToLatestBlockLessThan128Missed(t *testing.T) { Parent: common.HexToHash("0x26b13ac89500f7fcdd141b7d1b30f3a82178431eca325d1cf10998f9d68ff5ba"), Timestamp: time.Now(), } - - config.DB = database - config.Client = fakeClient - watcher := New(config) - err = watcher.stack.Push(lastBlockSeen) + err := watcher.stack.Push(lastBlockSeen) require.NoError(t, err) blocksElapsed, err := watcher.FastSyncToLatestBlock(ctx) @@ -239,8 +207,7 @@ func TestFastSyncToLatestBlockLessThan128Missed(t *testing.T) { assert.Equal(t, 127, blocksElapsed) // Check that block 132 is now in the DB, and block 5 was removed. - headers, err := watcher.stack.PeekAll() - require.NoError(t, err) + headers := watcher.stack.PeekAll() require.Len(t, headers, 1) assert.Equal(t, big.NewInt(132), headers[0].Number) } @@ -248,13 +215,10 @@ func TestFastSyncToLatestBlockLessThan128Missed(t *testing.T) { func TestFastSyncToLatestBlockMoreThanOrExactly128Missed(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - database, err := db.New(ctx, dbOptions()) - require.NoError(t, err) + // Fixture will return block 133 as the tip of the chain (128 blocks from block 5) - fakeClient, err := newFakeClient("testdata/fake_client_reset_fixture.json") - require.NoError(t, err) + _, watcher := setupFakeClientAndOrderWatcher(t, ctx, "testdata/fake_client_reset_fixture.json") - require.NoError(t, err) // Add block number 5 as the last block seen by BlockWatcher lastBlockSeen := &types.MiniHeader{ Number: big.NewInt(5), @@ -262,11 +226,7 @@ func TestFastSyncToLatestBlockMoreThanOrExactly128Missed(t *testing.T) { Parent: common.HexToHash("0x26b13ac89500f7fcdd141b7d1b30f3a82178431eca325d1cf10998f9d68ff5ba"), Timestamp: time.Now(), } - - config.DB = database - config.Client = fakeClient - watcher := New(config) - err = watcher.stack.Push(lastBlockSeen) + err := watcher.stack.Push(lastBlockSeen) require.NoError(t, err) blocksElapsed, err := watcher.FastSyncToLatestBlock(ctx) @@ -274,21 +234,17 @@ func TestFastSyncToLatestBlockMoreThanOrExactly128Missed(t *testing.T) { assert.Equal(t, 128, blocksElapsed) // Check that all blocks have been removed from BlockWatcher - headers, err := watcher.stack.PeekAll() - require.NoError(t, err) + headers := watcher.stack.PeekAll() require.Len(t, headers, 0) } func TestFastSyncToLatestBlockNoneMissed(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - database, err := db.New(ctx, dbOptions()) - require.NoError(t, err) + // Fixture will return block 5 as the tip of the chain - fakeClient, err := newFakeClient("testdata/fake_client_basic_fixture.json") - require.NoError(t, err) + _, watcher := setupFakeClientAndOrderWatcher(t, ctx, "testdata/fake_client_basic_fixture.json") - require.NoError(t, err) // Add block number 5 as the last block seen by BlockWatcher lastBlockSeen := &types.MiniHeader{ Number: big.NewInt(5), @@ -296,12 +252,7 @@ func TestFastSyncToLatestBlockNoneMissed(t *testing.T) { Parent: common.HexToHash("0x26b13ac89500f7fcdd141b7d1b30f3a82178431eca325d1cf10998f9d68ff5ba"), Timestamp: time.Now(), } - - config.DB = database - config.Client = fakeClient - watcher := New(config) - - err = watcher.stack.Push(lastBlockSeen) + err := watcher.stack.Push(lastBlockSeen) require.NoError(t, err) blocksElapsed, err := watcher.FastSyncToLatestBlock(ctx) @@ -309,8 +260,7 @@ func TestFastSyncToLatestBlockNoneMissed(t *testing.T) { assert.Equal(t, blocksElapsed, 0) // Check that block 5 is still in the DB - headers, err := watcher.stack.PeekAll() - require.NoError(t, err) + headers := watcher.stack.PeekAll() require.Len(t, headers, 1) assert.Equal(t, big.NewInt(5), headers[0].Number) } @@ -347,7 +297,7 @@ func TestFilterLogsRecursively(t *testing.T) { { Label: "HAPPY_PATH", rangeToFilterLogsResponse: map[string]filterLogsResponse{ - "10-20": filterLogsResponse{ + "10-20": { Logs: []ethtypes.Log{ logStub, }, @@ -432,13 +382,9 @@ func TestFilterLogsRecursively(t *testing.T) { for _, testCase := range testCases { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - database, err := db.New(ctx, dbOptions()) - require.NoError(t, err) fakeLogClient, err := newFakeLogClient(testCase.rangeToFilterLogsResponse) require.NoError(t, err) - config.Client = fakeLogClient - config.DB = database - watcher := New(config) + watcher := setupOrderWatcher(t, ctx, fakeLogClient) logs, err := watcher.filterLogsRecurisively(from, to, []ethtypes.Log{}) require.Equal(t, testCase.Err, err, testCase.Label) @@ -460,12 +406,12 @@ func TestGetLogsInBlockRange(t *testing.T) { from := 10 to := 20 testCases := []logsInBlockRangeTestCase{ - logsInBlockRangeTestCase{ + { Label: "HAPPY_PATH", From: from, To: to, RangeToFilterLogsResponse: map[string]filterLogsResponse{ - aRange(from, to): filterLogsResponse{ + aRange(from, to): { Logs: []ethtypes.Log{ logStub, }, @@ -474,17 +420,17 @@ func TestGetLogsInBlockRange(t *testing.T) { Logs: []ethtypes.Log{logStub}, FurthestBlockProcessed: to, }, - logsInBlockRangeTestCase{ + { Label: "SPLIT_REQUEST_BY_MAX_BLOCKS_IN_QUERY", From: from, To: from + maxBlocksInGetLogsQuery + 10, RangeToFilterLogsResponse: map[string]filterLogsResponse{ - aRange(from, from+maxBlocksInGetLogsQuery-1): filterLogsResponse{ + aRange(from, from+maxBlocksInGetLogsQuery-1): { Logs: []ethtypes.Log{ logStub, }, }, - aRange(from+maxBlocksInGetLogsQuery, from+maxBlocksInGetLogsQuery+10): filterLogsResponse{ + aRange(from+maxBlocksInGetLogsQuery, from+maxBlocksInGetLogsQuery+10): { Logs: []ethtypes.Log{ logStub, }, @@ -493,22 +439,22 @@ func TestGetLogsInBlockRange(t *testing.T) { Logs: []ethtypes.Log{logStub, logStub}, FurthestBlockProcessed: from + maxBlocksInGetLogsQuery + 10, }, - logsInBlockRangeTestCase{ + { Label: "SHORT_CIRCUIT_SEMAPHORE_BLOCKED_REQUESTS_ON_ERROR", From: from, To: from + (maxBlocksInGetLogsQuery * (getLogsRequestChunkSize + 1)), RangeToFilterLogsResponse: map[string]filterLogsResponse{ // Same number of responses as the getLogsRequestChunkSize since the // error response will stop any further requests. - aRange(from, from+maxBlocksInGetLogsQuery-1): filterLogsResponse{ + aRange(from, from+maxBlocksInGetLogsQuery-1): { Err: errUnexpected, }, - aRange(from+maxBlocksInGetLogsQuery, from+(maxBlocksInGetLogsQuery*2)-1): filterLogsResponse{ + aRange(from+maxBlocksInGetLogsQuery, from+(maxBlocksInGetLogsQuery*2)-1): { Logs: []ethtypes.Log{ logStub, }, }, - aRange(from+(maxBlocksInGetLogsQuery*2), from+(maxBlocksInGetLogsQuery*3)-1): filterLogsResponse{ + aRange(from+(maxBlocksInGetLogsQuery*2), from+(maxBlocksInGetLogsQuery*3)-1): { Logs: []ethtypes.Log{ logStub, }, @@ -517,17 +463,17 @@ func TestGetLogsInBlockRange(t *testing.T) { Logs: []ethtypes.Log{}, FurthestBlockProcessed: from - 1, }, - logsInBlockRangeTestCase{ + { Label: "CORRECT_FURTHEST_BLOCK_PROCESSED_ON_ERROR", From: from, To: from + maxBlocksInGetLogsQuery + 10, RangeToFilterLogsResponse: map[string]filterLogsResponse{ - aRange(from, from+maxBlocksInGetLogsQuery-1): filterLogsResponse{ + aRange(from, from+maxBlocksInGetLogsQuery-1): { Logs: []ethtypes.Log{ logStub, }, }, - aRange(from+maxBlocksInGetLogsQuery, from+maxBlocksInGetLogsQuery+10): filterLogsResponse{ + aRange(from+maxBlocksInGetLogsQuery, from+maxBlocksInGetLogsQuery+10): { Err: errUnexpected, }}, Logs: []ethtypes.Log{logStub}, @@ -538,13 +484,9 @@ func TestGetLogsInBlockRange(t *testing.T) { for _, testCase := range testCases { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - database, err := db.New(ctx, dbOptions()) - require.NoError(t, err) fakeLogClient, err := newFakeLogClient(testCase.RangeToFilterLogsResponse) require.NoError(t, err) - config.DB = database - config.Client = fakeLogClient - watcher := New(config) + watcher := setupOrderWatcher(t, ctx, fakeLogClient) logs, furthestBlockProcessed := watcher.getLogsInBlockRange(ctx, testCase.From, testCase.To) require.Equal(t, testCase.FurthestBlockProcessed, furthestBlockProcessed, testCase.Label) @@ -566,6 +508,20 @@ func TestIsWarning(t *testing.T) { } } +func setupFakeClientAndOrderWatcher(t *testing.T, ctx context.Context, testdataPath string) (*fakeClient, *Watcher) { + fakeClient, err := newFakeClient(testdataPath) + require.NoError(t, err) + return fakeClient, setupOrderWatcher(t, ctx, fakeClient) +} + +func setupOrderWatcher(t *testing.T, ctx context.Context, client Client) *Watcher { + database, err := db.New(ctx, dbOptions()) + require.NoError(t, err) + config.Client = client + config.DB = database + return New(blockRetentionLimit, config) +} + func aRange(from, to int) string { r := fmt.Sprintf("%d-%d", from, to) return r diff --git a/ethereum/blockwatch/client.go b/ethereum/blockwatch/client.go index a323cbad5..38bd26ac8 100644 --- a/ethereum/blockwatch/client.go +++ b/ethereum/blockwatch/client.go @@ -30,6 +30,9 @@ type Client interface { FilterLogs(q ethereum.FilterQuery) ([]ethtypes.Log, error) } +// Ensure that RpcClient is compliant with the Client interface. +var _ Client = &RpcClient{} + // RpcClient is a Client for fetching Ethereum blocks from a specific JSON-RPC endpoint. type RpcClient struct { ethRPCClient ethrpcclient.Client @@ -51,7 +54,7 @@ type GetBlockByNumberResponse struct { } // UnknownBlockNumberError is the error returned from a filter logs RPC call when the block number -// specified is not recognized +// specified is not recognized. type UnknownBlockNumberError struct { Message string BlockNumber *big.Int @@ -110,8 +113,8 @@ func (rc *RpcClient) HeaderByNumber(number *big.Int) (*types.MiniHeader, error) return miniHeader, nil } -// UnknownBlockHashError is the error returned from a filter logs RPC call when the blockHash -// specified is not recognized +// UnknownBlockHashError is the error returned from a filter logs RPC call when +// the blockHash specified is not recognized. type UnknownBlockHashError struct { BlockHash common.Hash } @@ -144,8 +147,8 @@ func (rc *RpcClient) HeaderByHash(hash common.Hash) (*types.MiniHeader, error) { return miniHeader, nil } -// FilterUnknownBlockError is the error returned from a filter logs RPC call when the blockHash -// specified is not recognized +// FilterUnknownBlockError is the error returned from a filter logs RPC call when +// the blockHash specified is not recognized. type FilterUnknownBlockError struct { Message string FilterQuery ethereum.FilterQuery diff --git a/ethereum/blockwatch/stack.go b/ethereum/blockwatch/stack.go deleted file mode 100644 index 905dc6355..000000000 --- a/ethereum/blockwatch/stack.go +++ /dev/null @@ -1,81 +0,0 @@ -package blockwatch - -import ( - "fmt" - - "github.com/0xProject/0x-mesh/common/types" - "github.com/0xProject/0x-mesh/db" -) - -type MiniHeaderAlreadyExistsError struct { - miniHeader *types.MiniHeader -} - -func (e MiniHeaderAlreadyExistsError) Error() string { - return fmt.Sprintf("cannot add miniHeader with the same number (%s) or hash (%s) as an existing miniHeader", e.miniHeader.Number.String(), e.miniHeader.Hash.Hex()) -} - -// Stack is a simple in-memory stack used in tests -type Stack struct { - db *db.DB -} - -// New instantiates a new Stack -func NewStack(db *db.DB) *Stack { - return &Stack{ - db: db, - } -} - -// Peek returns the top of the stack -func (s *Stack) Peek() (*types.MiniHeader, error) { - latestMiniHeader, err := s.db.GetLatestMiniHeader() - if err != nil { - if err == db.ErrNotFound { - return nil, nil - } - return nil, err - } - return latestMiniHeader, nil -} - -// Pop returns the top of the stack and removes it from the stack -func (s *Stack) Pop() (*types.MiniHeader, error) { - removed, err := s.db.DeleteMiniHeaders(&db.MiniHeaderQuery{ - Limit: 1, - Sort: []db.MiniHeaderSort{ - { - Field: db.MFNumber, - Direction: db.Descending, - }, - }, - }) - if err != nil { - return nil, err - } else if len(removed) == 0 { - return nil, nil - } - return removed[0], nil -} - -// Push adds a db.MiniHeader to the stack. It returns an error if -// the stack already contains a miniHeader with the same number or -// hash. -func (s *Stack) Push(miniHeader *types.MiniHeader) error { - added, _, err := s.db.AddMiniHeaders([]*types.MiniHeader{miniHeader}) - if len(added) == 0 { - return MiniHeaderAlreadyExistsError{miniHeader: miniHeader} - } - return err -} - -// PeekAll returns all the miniHeaders currently in the stack -func (s *Stack) PeekAll() ([]*types.MiniHeader, error) { - return s.db.FindMiniHeaders(nil) -} - -// Clear removes all items from the stack and clears any set checkpoint -func (s *Stack) Clear() error { - _, err := s.db.DeleteMiniHeaders(nil) - return err -} diff --git a/ethereum/blockwatch/stack_test.go b/ethereum/blockwatch/stack_test.go deleted file mode 100644 index f2abf626d..000000000 --- a/ethereum/blockwatch/stack_test.go +++ /dev/null @@ -1,92 +0,0 @@ -package blockwatch - -import ( - "context" - "math/big" - "testing" - "time" - - "github.com/0xProject/0x-mesh/common/types" - "github.com/0xProject/0x-mesh/db" - "github.com/ethereum/go-ethereum/common" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -var ( - miniHeaderOne = &types.MiniHeader{ - Number: big.NewInt(1), - Hash: common.HexToHash("0x1"), - Parent: common.HexToHash("0x0"), - Timestamp: time.Now().UTC(), - } -) - -func newTestStack(t *testing.T, ctx context.Context) *Stack { - database, err := db.New(ctx, db.TestOptions()) - require.NoError(t, err) - return NewStack(database) -} - -func TestStackPushPeekPop(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - stack := newTestStack(t, ctx) - - err := stack.Push(miniHeaderOne) - require.NoError(t, err) - expectedMiniHeader := miniHeaderOne - - actualMiniHeaders, err := stack.PeekAll() - require.NoError(t, err) - require.Len(t, actualMiniHeaders, 1) - assert.Equal(t, expectedMiniHeader, actualMiniHeaders[0]) - - actualMiniHeader, err := stack.Peek() - require.NoError(t, err) - assert.Equal(t, expectedMiniHeader, actualMiniHeader) - - actualMiniHeaders, err = stack.PeekAll() - require.NoError(t, err) - assert.Len(t, actualMiniHeaders, 1) - - actualMiniHeader, err = stack.Pop() - require.NoError(t, err) - assert.Equal(t, expectedMiniHeader, actualMiniHeader) - - actualMiniHeaders, err = stack.PeekAll() - require.NoError(t, err) - assert.Len(t, actualMiniHeaders, 0) -} - -func TestStackErrorIfPushTwoHeadersWithSameNumber(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - stack := newTestStack(t, ctx) - // Push miniHeaderOne - err := stack.Push(miniHeaderOne) - require.NoError(t, err) - // Push miniHeaderOne again - err = stack.Push(miniHeaderOne) - assert.Error(t, err) -} - -func TestStackClear(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - stack := newTestStack(t, ctx) - - err := stack.Push(miniHeaderOne) - require.NoError(t, err) - - miniHeader, err := stack.Peek() - require.NoError(t, err) - assert.Equal(t, miniHeaderOne, miniHeader) - - err = stack.Clear() - require.NoError(t, err) - - miniHeader, err = stack.Peek() - require.NoError(t, err) - require.Nil(t, miniHeader) -} diff --git a/ethereum/simplestack/simple_stack.go b/ethereum/simplestack/simple_stack.go new file mode 100644 index 000000000..255853fbc --- /dev/null +++ b/ethereum/simplestack/simple_stack.go @@ -0,0 +1,173 @@ +package simplestack + +import ( + "fmt" + "sync" + + "github.com/0xProject/0x-mesh/common/types" +) + +// UpdateType is the type of update applied to the in-memory stack. +type UpdateType int + +const ( + Pop UpdateType = iota + Push +) + +// Update represents one update to the stack, either a pop or push of a miniHeader. +type Update struct { + Type UpdateType + MiniHeader *types.MiniHeader +} + +// SimpleStack is a simple in-memory stack used in tests. +type SimpleStack struct { + limit int + miniHeaders []*types.MiniHeader + updates []*Update + mu sync.RWMutex + latestCheckpointID int +} + +// New instantiates a new SimpleStack. +func New(retentionLimit int, miniHeaders []*types.MiniHeader) *SimpleStack { + return &SimpleStack{ + limit: retentionLimit, + miniHeaders: miniHeaders, + updates: []*Update{}, + } +} + +// Peek returns the top of the stack. +func (s *SimpleStack) Peek() *types.MiniHeader { + s.mu.RLock() + defer s.mu.RUnlock() + + if len(s.miniHeaders) == 0 { + return nil + } + return s.miniHeaders[len(s.miniHeaders)-1] +} + +// Pop returns the top of the stack and removes it from the stack. +func (s *SimpleStack) Pop() *types.MiniHeader { + s.mu.Lock() + defer s.mu.Unlock() + + return s.pop() +} + +// you MUST acquire a lock on the mutex `mu` before calling `pop()`. +func (s *SimpleStack) pop() *types.MiniHeader { + if len(s.miniHeaders) == 0 { + return nil + } + top := s.miniHeaders[len(s.miniHeaders)-1] + s.miniHeaders = s.miniHeaders[:len(s.miniHeaders)-1] + s.updates = append(s.updates, &Update{ + Type: Pop, + MiniHeader: top, + }) + return top +} + +// Push adds a types.MiniHeader to the stack. +func (s *SimpleStack) Push(miniHeader *types.MiniHeader) error { + s.mu.Lock() + defer s.mu.Unlock() + + return s.push(miniHeader) +} + +// you MUST acquire a lock on the mutex `mu` before calling `push()`. +func (s *SimpleStack) push(miniHeader *types.MiniHeader) error { + for _, h := range s.miniHeaders { + if h.Number.Int64() == miniHeader.Number.Int64() { + return fmt.Errorf("attempted to push multiple blocks with block number %d to the stack", miniHeader.Number.Int64()) + } + } + + if len(s.miniHeaders) == s.limit { + s.miniHeaders = s.miniHeaders[1:] + } + s.miniHeaders = append(s.miniHeaders, miniHeader) + s.updates = append(s.updates, &Update{ + Type: Push, + MiniHeader: miniHeader, + }) + return nil +} + +// PeekAll returns all the miniHeaders currently in the stack. +func (s *SimpleStack) PeekAll() []*types.MiniHeader { + s.mu.RLock() + defer s.mu.RUnlock() + + // Return copy of miniHeaders array + m := make([]*types.MiniHeader, len(s.miniHeaders)) + copy(m, s.miniHeaders) + + return m +} + +// Clear removes all items from the stack and clears any set checkpoint. +func (s *SimpleStack) Clear() { + s.mu.Lock() + defer s.mu.Unlock() + s.miniHeaders = []*types.MiniHeader{} + s.updates = []*Update{} + s.latestCheckpointID = 0 +} + +// Checkpoint checkpoints the changes to the stack such that a subsequent +// call to `Reset(checkpointID)` with the checkpointID returned from this +// call will reset any subsequent changes back to the state of the stack +// at the time of this checkpoint. +func (s *SimpleStack) Checkpoint() int { + s.mu.Lock() + defer s.mu.Unlock() + + s.updates = []*Update{} + s.latestCheckpointID++ + return s.latestCheckpointID +} + +// Reset resets the in-memory stack with the contents from the latest checkpoint. +func (s *SimpleStack) Reset(checkpointID int) error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.latestCheckpointID == 0 { + return fmt.Errorf("Checkpoint() must be called before Reset() since without it the checkpointID is unspecified") + } else if checkpointID != s.latestCheckpointID { + return fmt.Errorf("Attempted to reset the stack to checkpoint %d but the latest checkpoint has ID %d", checkpointID, s.latestCheckpointID) + } + + for i := len(s.updates) - 1; i >= 0; i-- { + u := s.updates[i] + switch u.Type { + case Pop: + if err := s.push(u.MiniHeader); err != nil { + return err + } + case Push: + _ = s.pop() + default: + return fmt.Errorf("Unrecognized update type encountered: %d", u.Type) + } + } + s.updates = []*Update{} + return nil +} + +// GetUpdates returns the updates applied since the last checkpoint. +func (s *SimpleStack) GetUpdates() []*Update { + s.mu.RLock() + defer s.mu.RUnlock() + + // Return copy of updates array + u := make([]*Update, len(s.updates)) + copy(u, s.updates) + return u +} diff --git a/ethereum/simplestack/simple_stack_test.go b/ethereum/simplestack/simple_stack_test.go new file mode 100644 index 000000000..b80d1b1fd --- /dev/null +++ b/ethereum/simplestack/simple_stack_test.go @@ -0,0 +1,202 @@ +package simplestack + +import ( + "math/big" + "testing" + "time" + + "github.com/0xProject/0x-mesh/common/types" + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const limit = 10 + +var ( + miniHeaderOne = &types.MiniHeader{ + Number: big.NewInt(1), + Hash: common.HexToHash("0x1"), + Parent: common.HexToHash("0x0"), + Timestamp: time.Now().UTC(), + } + miniHeaderTwo = &types.MiniHeader{ + Number: big.NewInt(2), + Hash: common.HexToHash("0x2"), + Parent: common.HexToHash("0x1"), + Timestamp: time.Now().UTC(), + } +) + +func TestSimpleStackPushPeekPop(t *testing.T) { + stack := New(10, []*types.MiniHeader{}) + err := stack.Push(miniHeaderOne) + require.NoError(t, err) + + expectedLen := 1 + miniHeaders := stack.PeekAll() + assert.Len(t, miniHeaders, expectedLen) + + miniHeader := stack.Peek() + assert.Equal(t, miniHeaders[0], miniHeader) + + expectedLen = 1 + miniHeaders = stack.PeekAll() + assert.Len(t, miniHeaders, expectedLen) + + miniHeader = stack.Pop() + assert.Equal(t, miniHeaders[0], miniHeader) + + expectedLen = 0 + miniHeaders = stack.PeekAll() + assert.Len(t, miniHeaders, expectedLen) +} + +func TestSimpleStackErrorIfPushTwoHeadersWithSameNumber(t *testing.T) { + stack := New(10, []*types.MiniHeader{}) + // Push miniHeaderOne + err := stack.Push(miniHeaderOne) + require.NoError(t, err) + // Push miniHeaderOne again + err = stack.Push(miniHeaderOne) + assert.Error(t, err) +} + +func TestSimpleStackErrorIfResetWithoutCheckpointFirst(t *testing.T) { + stack := New(10, []*types.MiniHeader{}) + + checkpointID := 123 + err := stack.Reset(checkpointID) + require.Error(t, err) +} + +func TestSimpleStackClear(t *testing.T) { + stack := New(10, []*types.MiniHeader{}) + + err := stack.Push(miniHeaderOne) + require.NoError(t, err) + + miniHeader := stack.Peek() + assert.Equal(t, miniHeaderOne, miniHeader) + + stack.Clear() + + miniHeader = stack.Peek() + require.Nil(t, miniHeader) +} + +func TestSimpleStackErrorIfResetWithOldCheckpoint(t *testing.T) { + stack := New(10, []*types.MiniHeader{}) + + checkpointIDOne := stack.Checkpoint() + checkpointIDTwo := stack.Checkpoint() + + err := stack.Reset(checkpointIDOne) + require.Error(t, err) + + err = stack.Reset(checkpointIDTwo) + require.NoError(t, err) +} + +func TestSimpleStackCheckpoint(t *testing.T) { + stack := New(10, []*types.MiniHeader{}) + err := stack.Push(miniHeaderOne) + require.NoError(t, err) + err = stack.Push(miniHeaderTwo) + require.NoError(t, err) + + assert.Len(t, stack.updates, 2) + + stack.Checkpoint() + + assert.Len(t, stack.updates, 0) + + miniHeader := stack.Pop() + assert.Equal(t, miniHeaderTwo, miniHeader) + + miniHeader = stack.Pop() + assert.Equal(t, miniHeaderOne, miniHeader) + + assert.Len(t, stack.updates, 2) + + stack.Checkpoint() + + assert.Len(t, stack.updates, 0) +} + +func TestSimpleStackCheckpointAfterSameHeaderPushedAndPopped(t *testing.T) { + stack := New(10, []*types.MiniHeader{}) + // Push miniHeaderOne + err := stack.Push(miniHeaderOne) + require.NoError(t, err) + // Pop miniHeaderOne + miniHeader := stack.Pop() + assert.Equal(t, miniHeaderOne, miniHeader) + + assert.Len(t, stack.miniHeaders, 0) + assert.Len(t, stack.updates, 2) + + stack.Checkpoint() + + assert.Len(t, stack.updates, 0) +} + +func TestSimpleStackCheckpointAfterSameHeaderPushedThenPoppedThenPushed(t *testing.T) { + stack := New(10, []*types.MiniHeader{}) + // Push miniHeaderOne + err := stack.Push(miniHeaderOne) + require.NoError(t, err) + // Pop miniHeaderOne + miniHeader := stack.Pop() + assert.Equal(t, miniHeaderOne, miniHeader) + // Push miniHeaderOne again + err = stack.Push(miniHeaderOne) + require.NoError(t, err) + + assert.Len(t, stack.miniHeaders, 1) + assert.Len(t, stack.updates, 3) + + stack.Checkpoint() + + assert.Len(t, stack.updates, 0) +} + +func TestSimpleStackCheckpointThenReset(t *testing.T) { + stack := New(10, []*types.MiniHeader{}) + + checkpointID := stack.Checkpoint() + + err := stack.Push(miniHeaderOne) + require.NoError(t, err) + + assert.Len(t, stack.miniHeaders, 1) + assert.Len(t, stack.updates, 1) + + err = stack.Reset(checkpointID) + require.NoError(t, err) + + assert.Len(t, stack.miniHeaders, 0) + assert.Len(t, stack.updates, 0) + + err = stack.Push(miniHeaderTwo) + require.NoError(t, err) + + assert.Len(t, stack.miniHeaders, 1) + assert.Len(t, stack.updates, 1) + + checkpointID = stack.Checkpoint() + + assert.Len(t, stack.miniHeaders, 1) + assert.Len(t, stack.updates, 0) + + miniHeader := stack.Pop() + assert.Equal(t, miniHeader, miniHeaderTwo) + + assert.Len(t, stack.miniHeaders, 0) + assert.Len(t, stack.updates, 1) + + checkpointID = stack.Checkpoint() + + assert.Len(t, stack.miniHeaders, 0) + assert.Len(t, stack.updates, 0) +} diff --git a/packages/mesh-browser-lite/src/database.ts b/packages/mesh-browser-lite/src/database.ts index acd09b12d..17e8b91a4 100644 --- a/packages/mesh-browser-lite/src/database.ts +++ b/packages/mesh-browser-lite/src/database.ts @@ -317,6 +317,27 @@ export class Database { }; } + // ResetMiniHeaders(newMiniHeaders []*types.MiniHeader) (err error) + public async resetMiniHeadersAsync(newMiniHeaders: MiniHeader[]): Promise { + await this._db.transaction('rw!', this._miniHeaders, async () => { + // Remove all of the existing miniheaders + await this._miniHeaders.clear(); + + for (const newMiniHeader of newMiniHeaders) { + try { + await this._miniHeaders.add(newMiniHeader); + } catch (e) { + if (e.name === 'ConstraintError') { + // A miniHeader with this hash already exists. This is + // fine based on the semantics of addMiniHeaders. + continue; + } + throw e; + } + } + }); + } + // GetMiniHeader(hash common.Hash) (*types.MiniHeader, error) public async getMiniHeaderAsync(hash: string): Promise { return this._db.transaction('rw!', this._miniHeaders, async () => { diff --git a/zeroex/orderwatch/order_watcher_test.go b/zeroex/orderwatch/order_watcher_test.go index 3e11a7f2e..b612318e6 100644 --- a/zeroex/orderwatch/order_watcher_test.go +++ b/zeroex/orderwatch/order_watcher_test.go @@ -33,6 +33,7 @@ import ( ) const ( + blockRetentionLimit = 20 ethereumRPCRequestTimeout = 30 * time.Second miniHeaderRetentionLimit = 2 blockPollingInterval = 1 * time.Second @@ -1657,7 +1658,7 @@ func setupOrderWatcher(ctx context.Context, t *testing.T, ethRPCClient ethrpccli Topics: topics, Client: blockWatcherClient, } - blockWatcher := blockwatch.New(blockWatcherConfig) + blockWatcher := blockwatch.New(blockRetentionLimit, blockWatcherConfig) orderValidator, err := ordervalidator.New(ethRPCClient, constants.TestChainID, ethereumRPCMaxContentLength, ganacheAddresses) require.NoError(t, err) orderWatcher, err := New(Config{