Skip to content
This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

Commit

Permalink
Addressed review feedback from @albrow
Browse files Browse the repository at this point in the history
  • Loading branch information
jalextowle committed Jul 10, 2020
1 parent a794d50 commit 8b4f762
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 191 deletions.
16 changes: 16 additions & 0 deletions db/dexie_implementation.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,22 @@ func (db *DB) AddMiniHeaders(miniHeaders []*types.MiniHeader) (added []*types.Mi
return dexietypes.MiniHeadersToCommonType(jsAdded), dexietypes.MiniHeadersToCommonType(jsRemoved), nil
}

func (db *DB) ResetMiniHeaders(miniHeaders []*types.MiniHeader) (added []*types.MiniHeader, removed []*types.MiniHeader, err error) {
defer func() {
if r := recover(); r != nil {
err = recoverError(r)
}
}()
jsMiniHeaders := dexietypes.MiniHeadersFromCommonType(miniHeaders)
jsResult, err := jsutil.AwaitPromiseContext(db.ctx, db.dexie.Call("resetMiniHeadersAsync", jsMiniHeaders))
if err != nil {
return nil, nil, convertJSError(err)
}
jsAdded := jsResult.Get("added")
jsRemoved := jsResult.Get("removed")
return dexietypes.MiniHeadersToCommonType(jsAdded), dexietypes.MiniHeadersToCommonType(jsRemoved), nil
}

func (db *DB) GetMiniHeader(hash common.Hash) (miniHeader *types.MiniHeader, err error) {
defer func() {
if r := recover(); r != nil {
Expand Down
48 changes: 48 additions & 0 deletions db/sql_implementation.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,54 @@ func (db *DB) AddMiniHeaders(miniHeaders []*types.MiniHeader) (added []*types.Mi
return added, removed, nil
}

func (db *DB) ResetMiniHeaders(miniHeaders []*types.MiniHeader) (added []*types.MiniHeader, removed []*types.MiniHeader, err error) {
defer func() {
err = convertErr(err)
}()

removedMap := map[common.Hash]*types.MiniHeader{}
err = db.sqldb.TransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
removeQuery := txn.Select("*").From("miniHeaders")
var miniHeadersToRemove []*sqltypes.MiniHeader
if err := removeQuery.GetAllContext(db.ctx, &miniHeadersToRemove); err != nil {
return err
}
for _, miniHeader := range miniHeadersToRemove {
_, err := txn.DeleteFrom("miniHeaders").Where(sqlz.Eq(string(MFHash), miniHeader.Hash)).ExecContext(db.ctx)
if err != nil {
return err
}
removedMap[miniHeader.Hash] = sqltypes.MiniHeaderToCommonType(miniHeader)
}

for _, miniHeader := range miniHeaders {
result, err := txn.NamedExecContext(db.ctx, insertMiniHeaderQuery, sqltypes.MiniHeaderFromCommonType(miniHeader))
if err != nil {
return err
}
affected, err := result.RowsAffected()
if err != nil {
return err
}
if _, found := removedMap[miniHeader.Hash]; found && affected > 0 {
// If the miniHeader was previously removed, remove it from
// the removed set and don't add it to the added set.
delete(removedMap, miniHeader.Hash)
} else {
added = append(added, miniHeader)
}
}
return nil
})
for _, miniHeader := range removedMap {
removed = append(removed, miniHeader)
}
if err != nil {
return nil, nil, err
}
return added, removed, nil
}

func (db *DB) GetMiniHeader(hash common.Hash) (miniHeader *types.MiniHeader, err error) {
defer func() {
err = convertErr(err)
Expand Down
28 changes: 16 additions & 12 deletions ethereum/blockwatch/block_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ func (e TooMayBlocksBehindError) Error() string {

// Config holds some configuration options for an instance of BlockWatcher.
type Config struct {
MaxMiniHeaders int
DB *db.DB
PollingInterval time.Duration
WithLogs bool
Expand Down Expand Up @@ -143,10 +142,21 @@ func (w *Watcher) FastSyncToLatestBlock(ctx context.Context) (blocksElapsed int,
return blocksElapsed, err
}
if len(events) > 0 {
newMiniHeaders, err := w.stack.PeekAll()
if err != nil {
return blocksElapsed, err
}
if _, _, err := w.db.ResetMiniHeaders(newMiniHeaders); err != nil {
return blocksElapsed, err
}
w.blockFeed.Send(events)
}
} else {
// Clear all block headers from stack so BlockWatcher starts again from latest block
// Clear all block headers from stack and database so BlockWatcher
// starts again from latest block
if _, err := w.db.DeleteMiniHeaders(nil); err != nil {
return blocksElapsed, err
}
if err := w.stack.Clear(); err != nil {
return blocksElapsed, err
}
Expand Down Expand Up @@ -314,14 +324,8 @@ func (w *Watcher) SyncToLatestBlock() error {
if err != nil {
return err
}
if _, err := w.db.DeleteMiniHeaders(nil); err != nil {
return err
}
newMiniheaders, err := w.stack.PeekAll()
if err != nil {
return err
}
if _, _, err := w.db.AddMiniHeaders(newMiniheaders); err != nil {
newMiniHeaders, err := w.stack.PeekAll()
if _, _, err := w.db.ResetMiniHeaders(newMiniHeaders); err != nil {
return err
}
w.blockFeed.Send(allEvents)
Expand Down Expand Up @@ -411,7 +415,7 @@ func (w *Watcher) addLogs(header *types.MiniHeader) (*types.MiniHeader, error) {
return header, nil
}

// getMissedEventsToBackfill finds missed events that might have occured while the Mesh node was
// getMissedEventsToBackfill finds missed events that might have occurred while the Mesh node was
// offline. It does this by comparing the last block stored with the latest block discoverable via RPC.
// If the stored block is older then the latest block, it batch fetches the events for missing blocks,
// re-sets the stored blocks and returns the block events found.
Expand Down Expand Up @@ -500,7 +504,7 @@ func (w *Watcher) getLogsInBlockRange(ctx context.Context, from, to int) ([]etht
for len(blockRanges) != 0 {
var chunk []*blockRange
if len(blockRanges) < getLogsRequestChunkSize {
chunk = blockRanges[:len(blockRanges)]
chunk = blockRanges
} else {
chunk = blockRanges[:getLogsRequestChunkSize]
}
Expand Down
3 changes: 0 additions & 3 deletions ethereum/blockwatch/block_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,9 +521,6 @@ func setupFakeClientAndOrderWatcher(t *testing.T, ctx context.Context, testdataP
func setupOrderWatcher(t *testing.T, ctx context.Context, client Client) *Watcher {
database, err := db.New(ctx, dbOptions())
require.NoError(t, err)

// Polling interval unused because we hijack the ticker for this test
require.NoError(t, err)
config.Client = client
config.DB = database
return New(blockRetentionLimit, config)
Expand Down
81 changes: 0 additions & 81 deletions ethereum/blockwatch/stack.go

This file was deleted.

92 changes: 0 additions & 92 deletions ethereum/blockwatch/stack_test.go

This file was deleted.

3 changes: 0 additions & 3 deletions ethereum/simplestack/simple_stack.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"sync"

"github.com/0xProject/0x-mesh/common/types"
// FIXME Remove this
log "github.com/sirupsen/logrus"
)

// UpdateType is the type of update applied to the in-memory stack
Expand Down Expand Up @@ -92,7 +90,6 @@ func (s *SimpleStack) push(miniHeader *types.MiniHeader) error {
}

if len(s.miniHeaders) == s.limit {
log.Info(s.limit)
s.miniHeaders = s.miniHeaders[1:]
}
s.miniHeaders = append(s.miniHeaders, miniHeader)
Expand Down
Loading

0 comments on commit 8b4f762

Please sign in to comment.