diff --git a/db/dexie_implementation.go b/db/dexie_implementation.go index 6118540dc..1deee4274 100644 --- a/db/dexie_implementation.go +++ b/db/dexie_implementation.go @@ -241,6 +241,22 @@ func (db *DB) AddMiniHeaders(miniHeaders []*types.MiniHeader) (added []*types.Mi return dexietypes.MiniHeadersToCommonType(jsAdded), dexietypes.MiniHeadersToCommonType(jsRemoved), nil } +func (db *DB) ResetMiniHeaders(miniHeaders []*types.MiniHeader) (added []*types.MiniHeader, removed []*types.MiniHeader, err error) { + defer func() { + if r := recover(); r != nil { + err = recoverError(r) + } + }() + jsMiniHeaders := dexietypes.MiniHeadersFromCommonType(miniHeaders) + jsResult, err := jsutil.AwaitPromiseContext(db.ctx, db.dexie.Call("resetMiniHeadersAsync", jsMiniHeaders)) + if err != nil { + return nil, nil, convertJSError(err) + } + jsAdded := jsResult.Get("added") + jsRemoved := jsResult.Get("removed") + return dexietypes.MiniHeadersToCommonType(jsAdded), dexietypes.MiniHeadersToCommonType(jsRemoved), nil +} + func (db *DB) GetMiniHeader(hash common.Hash) (miniHeader *types.MiniHeader, err error) { defer func() { if r := recover(); r != nil { diff --git a/db/sql_implementation.go b/db/sql_implementation.go index e4310bda2..36a5f54ce 100644 --- a/db/sql_implementation.go +++ b/db/sql_implementation.go @@ -567,6 +567,54 @@ func (db *DB) AddMiniHeaders(miniHeaders []*types.MiniHeader) (added []*types.Mi return added, removed, nil } +func (db *DB) ResetMiniHeaders(miniHeaders []*types.MiniHeader) (added []*types.MiniHeader, removed []*types.MiniHeader, err error) { + defer func() { + err = convertErr(err) + }() + + removedMap := map[common.Hash]*types.MiniHeader{} + err = db.sqldb.TransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error { + removeQuery := txn.Select("*").From("miniHeaders") + var miniHeadersToRemove []*sqltypes.MiniHeader + if err := removeQuery.GetAllContext(db.ctx, &miniHeadersToRemove); err != nil { + return err + } + for _, miniHeader := range miniHeadersToRemove { + _, err := txn.DeleteFrom("miniHeaders").Where(sqlz.Eq(string(MFHash), miniHeader.Hash)).ExecContext(db.ctx) + if err != nil { + return err + } + removedMap[miniHeader.Hash] = sqltypes.MiniHeaderToCommonType(miniHeader) + } + + for _, miniHeader := range miniHeaders { + result, err := txn.NamedExecContext(db.ctx, insertMiniHeaderQuery, sqltypes.MiniHeaderFromCommonType(miniHeader)) + if err != nil { + return err + } + affected, err := result.RowsAffected() + if err != nil { + return err + } + if _, found := removedMap[miniHeader.Hash]; found && affected > 0 { + // If the miniHeader was previously removed, remove it from + // the removed set and don't add it to the added set. + delete(removedMap, miniHeader.Hash) + } else { + added = append(added, miniHeader) + } + } + return nil + }) + for _, miniHeader := range removedMap { + removed = append(removed, miniHeader) + } + if err != nil { + return nil, nil, err + } + return added, removed, nil +} + func (db *DB) GetMiniHeader(hash common.Hash) (miniHeader *types.MiniHeader, err error) { defer func() { err = convertErr(err) diff --git a/ethereum/blockwatch/block_watcher.go b/ethereum/blockwatch/block_watcher.go index eb2664d9c..1a608064e 100644 --- a/ethereum/blockwatch/block_watcher.go +++ b/ethereum/blockwatch/block_watcher.go @@ -67,7 +67,6 @@ func (e TooMayBlocksBehindError) Error() string { // Config holds some configuration options for an instance of BlockWatcher. type Config struct { - MaxMiniHeaders int DB *db.DB PollingInterval time.Duration WithLogs bool @@ -143,10 +142,21 @@ func (w *Watcher) FastSyncToLatestBlock(ctx context.Context) (blocksElapsed int, return blocksElapsed, err } if len(events) > 0 { + newMiniHeaders, err := w.stack.PeekAll() + if err != nil { + return blocksElapsed, err + } + if _, _, err := w.db.ResetMiniHeaders(newMiniHeaders); err != nil { + return blocksElapsed, err + } w.blockFeed.Send(events) } } else { - // Clear all block headers from stack so BlockWatcher starts again from latest block + // Clear all block headers from stack and database so BlockWatcher + // starts again from latest block + if _, err := w.db.DeleteMiniHeaders(nil); err != nil { + return blocksElapsed, err + } if err := w.stack.Clear(); err != nil { return blocksElapsed, err } @@ -314,14 +324,8 @@ func (w *Watcher) SyncToLatestBlock() error { if err != nil { return err } - if _, err := w.db.DeleteMiniHeaders(nil); err != nil { - return err - } - newMiniheaders, err := w.stack.PeekAll() - if err != nil { - return err - } - if _, _, err := w.db.AddMiniHeaders(newMiniheaders); err != nil { + newMiniHeaders, err := w.stack.PeekAll() + if _, _, err := w.db.ResetMiniHeaders(newMiniHeaders); err != nil { return err } w.blockFeed.Send(allEvents) @@ -411,7 +415,7 @@ func (w *Watcher) addLogs(header *types.MiniHeader) (*types.MiniHeader, error) { return header, nil } -// getMissedEventsToBackfill finds missed events that might have occured while the Mesh node was +// getMissedEventsToBackfill finds missed events that might have occurred while the Mesh node was // offline. It does this by comparing the last block stored with the latest block discoverable via RPC. // If the stored block is older then the latest block, it batch fetches the events for missing blocks, // re-sets the stored blocks and returns the block events found. @@ -500,7 +504,7 @@ func (w *Watcher) getLogsInBlockRange(ctx context.Context, from, to int) ([]etht for len(blockRanges) != 0 { var chunk []*blockRange if len(blockRanges) < getLogsRequestChunkSize { - chunk = blockRanges[:len(blockRanges)] + chunk = blockRanges } else { chunk = blockRanges[:getLogsRequestChunkSize] } diff --git a/ethereum/blockwatch/block_watcher_test.go b/ethereum/blockwatch/block_watcher_test.go index 75cfce4ee..9eba3b63e 100644 --- a/ethereum/blockwatch/block_watcher_test.go +++ b/ethereum/blockwatch/block_watcher_test.go @@ -521,9 +521,6 @@ func setupFakeClientAndOrderWatcher(t *testing.T, ctx context.Context, testdataP func setupOrderWatcher(t *testing.T, ctx context.Context, client Client) *Watcher { database, err := db.New(ctx, dbOptions()) require.NoError(t, err) - - // Polling interval unused because we hijack the ticker for this test - require.NoError(t, err) config.Client = client config.DB = database return New(blockRetentionLimit, config) diff --git a/ethereum/blockwatch/stack.go b/ethereum/blockwatch/stack.go deleted file mode 100644 index 905dc6355..000000000 --- a/ethereum/blockwatch/stack.go +++ /dev/null @@ -1,81 +0,0 @@ -package blockwatch - -import ( - "fmt" - - "github.com/0xProject/0x-mesh/common/types" - "github.com/0xProject/0x-mesh/db" -) - -type MiniHeaderAlreadyExistsError struct { - miniHeader *types.MiniHeader -} - -func (e MiniHeaderAlreadyExistsError) Error() string { - return fmt.Sprintf("cannot add miniHeader with the same number (%s) or hash (%s) as an existing miniHeader", e.miniHeader.Number.String(), e.miniHeader.Hash.Hex()) -} - -// Stack is a simple in-memory stack used in tests -type Stack struct { - db *db.DB -} - -// New instantiates a new Stack -func NewStack(db *db.DB) *Stack { - return &Stack{ - db: db, - } -} - -// Peek returns the top of the stack -func (s *Stack) Peek() (*types.MiniHeader, error) { - latestMiniHeader, err := s.db.GetLatestMiniHeader() - if err != nil { - if err == db.ErrNotFound { - return nil, nil - } - return nil, err - } - return latestMiniHeader, nil -} - -// Pop returns the top of the stack and removes it from the stack -func (s *Stack) Pop() (*types.MiniHeader, error) { - removed, err := s.db.DeleteMiniHeaders(&db.MiniHeaderQuery{ - Limit: 1, - Sort: []db.MiniHeaderSort{ - { - Field: db.MFNumber, - Direction: db.Descending, - }, - }, - }) - if err != nil { - return nil, err - } else if len(removed) == 0 { - return nil, nil - } - return removed[0], nil -} - -// Push adds a db.MiniHeader to the stack. It returns an error if -// the stack already contains a miniHeader with the same number or -// hash. -func (s *Stack) Push(miniHeader *types.MiniHeader) error { - added, _, err := s.db.AddMiniHeaders([]*types.MiniHeader{miniHeader}) - if len(added) == 0 { - return MiniHeaderAlreadyExistsError{miniHeader: miniHeader} - } - return err -} - -// PeekAll returns all the miniHeaders currently in the stack -func (s *Stack) PeekAll() ([]*types.MiniHeader, error) { - return s.db.FindMiniHeaders(nil) -} - -// Clear removes all items from the stack and clears any set checkpoint -func (s *Stack) Clear() error { - _, err := s.db.DeleteMiniHeaders(nil) - return err -} diff --git a/ethereum/blockwatch/stack_test.go b/ethereum/blockwatch/stack_test.go deleted file mode 100644 index f2abf626d..000000000 --- a/ethereum/blockwatch/stack_test.go +++ /dev/null @@ -1,92 +0,0 @@ -package blockwatch - -import ( - "context" - "math/big" - "testing" - "time" - - "github.com/0xProject/0x-mesh/common/types" - "github.com/0xProject/0x-mesh/db" - "github.com/ethereum/go-ethereum/common" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -var ( - miniHeaderOne = &types.MiniHeader{ - Number: big.NewInt(1), - Hash: common.HexToHash("0x1"), - Parent: common.HexToHash("0x0"), - Timestamp: time.Now().UTC(), - } -) - -func newTestStack(t *testing.T, ctx context.Context) *Stack { - database, err := db.New(ctx, db.TestOptions()) - require.NoError(t, err) - return NewStack(database) -} - -func TestStackPushPeekPop(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - stack := newTestStack(t, ctx) - - err := stack.Push(miniHeaderOne) - require.NoError(t, err) - expectedMiniHeader := miniHeaderOne - - actualMiniHeaders, err := stack.PeekAll() - require.NoError(t, err) - require.Len(t, actualMiniHeaders, 1) - assert.Equal(t, expectedMiniHeader, actualMiniHeaders[0]) - - actualMiniHeader, err := stack.Peek() - require.NoError(t, err) - assert.Equal(t, expectedMiniHeader, actualMiniHeader) - - actualMiniHeaders, err = stack.PeekAll() - require.NoError(t, err) - assert.Len(t, actualMiniHeaders, 1) - - actualMiniHeader, err = stack.Pop() - require.NoError(t, err) - assert.Equal(t, expectedMiniHeader, actualMiniHeader) - - actualMiniHeaders, err = stack.PeekAll() - require.NoError(t, err) - assert.Len(t, actualMiniHeaders, 0) -} - -func TestStackErrorIfPushTwoHeadersWithSameNumber(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - stack := newTestStack(t, ctx) - // Push miniHeaderOne - err := stack.Push(miniHeaderOne) - require.NoError(t, err) - // Push miniHeaderOne again - err = stack.Push(miniHeaderOne) - assert.Error(t, err) -} - -func TestStackClear(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - stack := newTestStack(t, ctx) - - err := stack.Push(miniHeaderOne) - require.NoError(t, err) - - miniHeader, err := stack.Peek() - require.NoError(t, err) - assert.Equal(t, miniHeaderOne, miniHeader) - - err = stack.Clear() - require.NoError(t, err) - - miniHeader, err = stack.Peek() - require.NoError(t, err) - require.Nil(t, miniHeader) -} diff --git a/ethereum/simplestack/simple_stack.go b/ethereum/simplestack/simple_stack.go index 1132cab6a..0e1e0c35b 100644 --- a/ethereum/simplestack/simple_stack.go +++ b/ethereum/simplestack/simple_stack.go @@ -5,8 +5,6 @@ import ( "sync" "github.com/0xProject/0x-mesh/common/types" - // FIXME Remove this - log "github.com/sirupsen/logrus" ) // UpdateType is the type of update applied to the in-memory stack @@ -92,7 +90,6 @@ func (s *SimpleStack) push(miniHeader *types.MiniHeader) error { } if len(s.miniHeaders) == s.limit { - log.Info(s.limit) s.miniHeaders = s.miniHeaders[1:] } s.miniHeaders = append(s.miniHeaders, miniHeader) diff --git a/packages/mesh-browser-lite/src/database.ts b/packages/mesh-browser-lite/src/database.ts index acd09b12d..da37893ad 100644 --- a/packages/mesh-browser-lite/src/database.ts +++ b/packages/mesh-browser-lite/src/database.ts @@ -116,6 +116,11 @@ export interface AddMiniHeadersResult { removed: MiniHeader[]; } +export interface ResetMiniHeadersResult { + added: MiniHeader[]; + removed: MiniHeader[]; +} + export interface Metadata { ethereumChainID: number; ethRPCRequestsSentInCurrentUTCDay: number; @@ -317,6 +322,45 @@ export class Database { }; } + // ResetMiniHeaders(miniHeaders []*types.MiniHeader) (added []*types.MiniHeader, removed []*types.MiniHeader, err error) + public async resetMiniHeadersAsync(miniHeaders: MiniHeader[]): Promise { + const removedMap = new Map(); + const added: MiniHeader[] = []; + await this._db.transaction('rw!', this._miniHeaders, async () => { + // Remove all of the existing miniheaders + const outdatedMiniHeaders = await this._miniHeaders.toArray(); + for (const outdated of outdatedMiniHeaders) { + await this._miniHeaders.delete(outdated.hash); + removedMap.set(outdated.hash, outdated); + } + + for (const miniHeader of miniHeaders) { + try { + await this._miniHeaders.add(miniHeader); + } catch (e) { + if (e.name === 'ConstraintError') { + // A miniHeader with this hash already exists. This is fine based on the semantics of + // addMiniHeaders. + continue; + } + throw e; + } + if (removedMap.has(miniHeader.hash)) { + // If the order was previously removed, remove it from + // the removed set and don't add it to the added set. + removedMap.delete(miniHeader.hash); + } else { + added.push(miniHeader); + } + } + }); + + return { + added, + removed: Array.from(removedMap.values()), + }; + } + // GetMiniHeader(hash common.Hash) (*types.MiniHeader, error) public async getMiniHeaderAsync(hash: string): Promise { return this._db.transaction('rw!', this._miniHeaders, async () => {