This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

In-Memory Blockwatcher Syncing (#857)
* Added simplestack back to Mesh

* Use an in-memory stack for blockwatcher syncing

* Addressed review feedback from @albrow

* Simplified `getSubBlockRanges` in `ethereum/blockwatcher`

* Light refactors of the blockwatcher

* Addressed review feedback from @z2trillion

* Addressed review feedback from @albrow

* Revert "Light refactors of the blockwatcher"

This reverts commit 63caae9.

* Addressed lingering review feedback from @albrow
jalextowle authored Jul 15, 2020
1 parent 62c92e9 commit 0acb901
Showing 13 changed files with 537 additions and 355 deletions.
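The diff below swaps the DB-backed `blockwatch.Stack` for the in-memory `simplestack.SimpleStack`, which is not itself part of this diff. The following is a rough sketch of the surface the blockwatcher relies on, reconstructed only from the call sites in `block_watcher.go` below; exact signatures in the real package may differ.

```go
package simplestacksketch

import "github.com/0xProject/0x-mesh/common/types"

// Stack sketches the in-memory stack methods used by the blockwatcher in
// this commit. It is an illustration inferred from call sites, not the real
// simplestack API.
type Stack interface {
	// Peek returns the latest retained MiniHeader, or nil if none (note:
	// no error return, unlike the old DB-backed stack).
	Peek() *types.MiniHeader
	// Pop removes and returns the latest retained MiniHeader.
	Pop() *types.MiniHeader
	// PeekAll returns every retained MiniHeader.
	PeekAll() []*types.MiniHeader
	// Clear drops all retained MiniHeaders.
	Clear()
	// Checkpoint snapshots the current state and returns an identifier
	// that can later be passed to Reset.
	Checkpoint() int
	// Reset rolls the stack back to a previously taken checkpoint.
	Reset(checkpoint int) error
}

// The constructor as it appears in block_watcher.go below:
//   stack := simplestack.New(retentionLimit, []*types.MiniHeader{})
```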
3 changes: 2 additions & 1 deletion core/core.go
@@ -43,6 +43,7 @@ import (
)

const (
blockRetentionLimit = 20
ethereumRPCRequestTimeout = 30 * time.Second
peerConnectTimeout = 60 * time.Second
checkNewAddrInterval = 20 * time.Second
@@ -333,7 +334,7 @@ func newWithPrivateConfig(ctx context.Context, config Config, pConfig privateCon
Topics: topics,
Client: blockWatcherClient,
}
blockWatcher := blockwatch.New(blockWatcherConfig)
blockWatcher := blockwatch.New(blockRetentionLimit, blockWatcherConfig)

// Initialize the order validator
orderValidator, err := ordervalidator.New(
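With the retention limit now a constructor argument, wiring the watcher up looks roughly like the sketch below. The Config field names are the ones read inside `blockwatch.New` in this diff; the helper, the timing value, and the variable names are illustrative only.

```go
package example

import (
	"time"

	"github.com/0xProject/0x-mesh/db"
	"github.com/0xProject/0x-mesh/ethereum/blockwatch"
	"github.com/ethereum/go-ethereum/common"
)

const blockRetentionLimit = 20 // mirrors the constant added to core.go above

// newBlockWatcher is a hypothetical helper showing the new constructor
// signature; meshDB, topics, and client would be supplied by the caller.
func newBlockWatcher(meshDB *db.DB, topics []common.Hash, client blockwatch.Client) *blockwatch.Watcher {
	cfg := blockwatch.Config{
		DB:              meshDB,
		PollingInterval: 5 * time.Second, // illustrative value
		WithLogs:        true,
		Topics:          topics,
		Client:          client,
	}
	// The retention limit is passed to New rather than being a property of
	// the old DB-backed stack.
	return blockwatch.New(blockRetentionLimit, cfg)
}
```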
1 change: 1 addition & 0 deletions db/common.go
@@ -43,6 +43,7 @@ type Database interface {
FindMiniHeaders(opts *MiniHeaderQuery) ([]*types.MiniHeader, error)
DeleteMiniHeader(hash common.Hash) error
DeleteMiniHeaders(opts *MiniHeaderQuery) ([]*types.MiniHeader, error)
ResetMiniHeaders(miniHeaders []*types.MiniHeader) error
GetMetadata() (*types.Metadata, error)
SaveMetadata(metadata *types.Metadata) error
UpdateMetadata(updateFunc func(oldmetadata *types.Metadata) (newMetadata *types.Metadata)) error
16 changes: 16 additions & 0 deletions db/dexie_implementation.go
@@ -241,6 +241,22 @@ func (db *DB) AddMiniHeaders(miniHeaders []*types.MiniHeader) (added []*types.Mi
return dexietypes.MiniHeadersToCommonType(jsAdded), dexietypes.MiniHeadersToCommonType(jsRemoved), nil
}

// ResetMiniHeaders deletes all of the existing miniheaders and then stores new
// miniheaders in the database.
func (db *DB) ResetMiniHeaders(newMiniHeaders []*types.MiniHeader) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = recoverError(r)
		}
	}()
	jsNewMiniHeaders := dexietypes.MiniHeadersFromCommonType(newMiniHeaders)
	_, err = jsutil.AwaitPromiseContext(db.ctx, db.dexie.Call("resetMiniHeadersAsync", jsNewMiniHeaders))
	if err != nil {
		return convertJSError(err)
	}
	return nil
}

func (db *DB) GetMiniHeader(hash common.Hash) (miniHeader *types.MiniHeader, err error) {
defer func() {
if r := recover(); r != nil {
23 changes: 23 additions & 0 deletions db/sql_implementation.go
@@ -567,6 +567,29 @@ func (db *DB) AddMiniHeaders(miniHeaders []*types.MiniHeader) (added []*types.Mi
return added, removed, nil
}

// ResetMiniHeaders deletes all of the existing miniheaders and then stores new
// miniheaders in the database.
func (db *DB) ResetMiniHeaders(newMiniHeaders []*types.MiniHeader) (err error) {
	defer func() {
		err = convertErr(err)
	}()

	err = db.sqldb.TransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
		_, err := txn.DeleteFrom("miniHeaders").ExecContext(db.ctx)
		if err != nil {
			return err
		}
		for _, newMiniHeader := range newMiniHeaders {
			_, err := txn.NamedExecContext(db.ctx, insertMiniHeaderQuery, sqltypes.MiniHeaderFromCommonType(newMiniHeader))
			if err != nil {
				return err
			}
		}
		return nil
	})
	return err
}

func (db *DB) GetMiniHeader(hash common.Hash) (miniHeader *types.MiniHeader, err error) {
defer func() {
err = convertErr(err)
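Both implementations give `ResetMiniHeaders` the same delete-everything-then-insert semantics (wrapped in a transaction in the SQL case). A hedged usage sketch, mirroring how the blockwatcher calls it below; the helper name is hypothetical:

```go
package example

import (
	"github.com/0xProject/0x-mesh/common/types"
	"github.com/0xProject/0x-mesh/db"
)

// persistRetainedHeaders treats the in-memory set of headers as the source
// of truth and overwrites whatever the database currently holds. This is a
// hypothetical caller for illustration; the real call sites are in
// block_watcher.go below.
func persistRetainedHeaders(database *db.DB, retained []*types.MiniHeader) error {
	// Deletes all stored miniheaders, then inserts `retained` in their place.
	return database.ResetMiniHeaders(retained)
}
```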
124 changes: 41 additions & 83 deletions ethereum/blockwatch/block_watcher.go
@@ -12,6 +12,7 @@ import (
"github.com/0xProject/0x-mesh/common/types"
"github.com/0xProject/0x-mesh/constants"
"github.com/0xProject/0x-mesh/db"
"github.com/0xProject/0x-mesh/ethereum/simplestack"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
ethtypes "github.com/ethereum/go-ethereum/core/types"
@@ -29,7 +30,7 @@ const rpcClientNotFoundError = "not found"
var maxBlocksInGetLogsQuery = 60

// warningLevelErrorMessages are certain blockwatch.Watch errors that we want to report as warnings
// because they do not represent a bug or issue with Mesh and are expected to happen from time to time
// because they do not represent a bug or issue with Mesh and are expected to happen from time to time.
var warningLevelErrorMessages = []string{
constants.GethFilterUnknownBlock,
rpcClientNotFoundError,
@@ -77,7 +78,8 @@ type Config struct {
// supplied stack) handling block re-orgs and network disruptions gracefully. It can be started from
// any arbitrary block height, and will emit both block added and removed events.
type Watcher struct {
stack *Stack
stack *simplestack.SimpleStack
db *db.DB
client Client
blockFeed event.Feed
blockScope event.SubscriptionScope // Subscription scope tracking current live listeners
@@ -90,10 +92,11 @@ type Watcher struct {
}

// New creates a new Watcher instance.
func New(config Config) *Watcher {
func New(retentionLimit int, config Config) *Watcher {
return &Watcher{
pollingInterval: config.PollingInterval,
stack: NewStack(config.DB),
db: config.DB,
stack: simplestack.New(retentionLimit, []*types.MiniHeader{}),
client: config.Client,
withLogs: config.WithLogs,
topics: config.Topics,
@@ -114,10 +117,8 @@ func (w *Watcher) FastSyncToLatestBlock(ctx context.Context) (blocksElapsed int,
}
w.mu.Unlock()

latestBlockProcessed, err := w.stack.Peek()
if err != nil {
return 0, err
}
latestBlockProcessed := w.stack.Peek()

// No previously stored block so no blocks have elapsed
if latestBlockProcessed == nil {
return 0, nil
@@ -139,13 +140,19 @@
return blocksElapsed, err
}
if len(events) > 0 {
newMiniHeaders := w.stack.PeekAll()
if err := w.db.ResetMiniHeaders(newMiniHeaders); err != nil {
return blocksElapsed, err
}
w.blockFeed.Send(events)
}
} else {
// Clear all block headers from stack so BlockWatcher starts again from latest block
if err := w.stack.Clear(); err != nil {
// Clear all block headers from stack and database so BlockWatcher
// starts again from latest block
if _, err := w.db.DeleteMiniHeaders(nil); err != nil {
return blocksElapsed, err
}
w.stack.Clear()
}

return blocksElapsed, nil
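The events assembled during fast-sync are still delivered through `w.blockFeed.Send`; a consumer receives them via `Subscribe` (referenced in the next hunk). A minimal subscriber sketch: only the `Subscribe` signature and the `Event` fields come from this diff, the `Added`/`Removed` constants follow the naming implied by the Watcher doc comment, and the handling logic is purely illustrative.

```go
package example

import "github.com/0xProject/0x-mesh/ethereum/blockwatch"

// consumeBlockEvents drains batches of block events emitted by the watcher.
func consumeBlockEvents(w *blockwatch.Watcher) {
	events := make(chan []*blockwatch.Event, 10)
	sub := w.Subscribe(events)
	defer sub.Unsubscribe()

	for batch := range events {
		for _, event := range batch {
			switch event.Type {
			case blockwatch.Added:
				// A block (and its logs) was appended to the canonical chain.
				_ = event.BlockHeader
			case blockwatch.Removed:
				// The block was re-orged out; unwind any state derived from it.
				_ = event.BlockHeader
			}
		}
	}
}
```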
@@ -224,25 +231,19 @@ func (w *Watcher) Subscribe(sink chan<- []*Event) event.Subscription {
}

// SyncToLatestBlock syncs our local state of the chain to the latest block found via
// Ethereum RPC
// Ethereum RPC.
func (w *Watcher) SyncToLatestBlock() error {
w.syncToLatestBlockMu.Lock()
defer w.syncToLatestBlockMu.Unlock()

existingMiniHeaders, err := w.stack.PeekAll()
if err != nil {
return err
}
checkpoint := w.stack.Checkpoint()

latestHeader, err := w.client.HeaderByNumber(nil)
if err != nil {
return err
}
latestBlockNumber := latestHeader.Number.Int64()
lastStoredHeader, err := w.stack.Peek()
if err != nil {
return err
}
lastStoredHeader := w.stack.Peek()
var lastStoredBlockNumber int64
if lastStoredHeader != nil {
lastStoredBlockNumber = lastStoredHeader.Number.Int64()
@@ -277,11 +278,7 @@ func (w *Watcher) SyncToLatestBlock() error {
// stored and fetch it
nextHeader := latestHeader
if numBlocksToFetch != 1 {
lastStoredHeader, err := w.stack.Peek()
if err != nil {
syncErr = err
break
}
lastStoredHeader := w.stack.Peek()
nextBlockNumber := big.NewInt(0).Add(lastStoredHeader.Number, big.NewInt(1))
nextHeader, err = w.client.HeaderByNumber(nextBlockNumber)
if err != nil {
@@ -302,14 +299,15 @@
return syncErr
}
if w.shouldRevertChanges(lastStoredHeader, allEvents) {
// TODO(albrow): Take another look at this. Maybe extract to method.
if err := w.stack.Clear(); err != nil {
if err := w.stack.Reset(checkpoint); err != nil {
return err
}
if _, _, err := w.stack.db.AddMiniHeaders(existingMiniHeaders); err != nil {
} else {
w.stack.Checkpoint()
newMiniHeaders := w.stack.PeekAll()
if err := w.db.ResetMiniHeaders(newMiniHeaders); err != nil {
return err
}
} else {
w.blockFeed.Send(allEvents)
}
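The revert path no longer re-adds previously stored headers; it snapshots the in-memory stack up front and either rolls back to that snapshot or flushes the stack to the database in one call. A reduced, self-contained sketch of that pattern (the stand-in types and function names are hypothetical, and the simplestack semantics are the ones inferred earlier):

```go
package example

// miniHeader stands in for types.MiniHeader so the sketch compiles on its own.
type miniHeader struct{}

// checkpointer captures just the stack behaviour the pattern relies on.
type checkpointer interface {
	Checkpoint() int
	Reset(checkpoint int) error
	PeekAll() []*miniHeader
}

// syncOnce mirrors the shape of SyncToLatestBlock above: snapshot, mutate the
// stack in memory, then either roll back or persist the whole retained set.
func syncOnce(
	stack checkpointer,
	resetDB func([]*miniHeader) error, // e.g. db.ResetMiniHeaders
	process func() (revert bool, err error), // fetch headers, push/pop, build events
) error {
	checkpoint := stack.Checkpoint() // snapshot before any mutation

	revert, err := process()
	if err != nil || revert {
		// Undo every stack mutation made since the snapshot.
		if resetErr := stack.Reset(checkpoint); resetErr != nil {
			return resetErr
		}
		return err
	}

	// Commit: take a fresh checkpoint and make the database mirror the stack.
	stack.Checkpoint()
	return resetDB(stack.PeekAll())
}
```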

@@ -328,10 +326,7 @@ func (w *Watcher) shouldRevertChanges(lastStoredHeader *types.MiniHeader, events
}

func (w *Watcher) buildCanonicalChain(nextHeader *types.MiniHeader, events []*Event) ([]*Event, error) {
latestHeader, err := w.stack.Peek()
if err != nil {
return nil, err
}
latestHeader := w.stack.Peek()
// Is the stack empty or is it the next block?
if latestHeader == nil || nextHeader.Parent == latestHeader.Hash {
nextHeader, err := w.addLogs(nextHeader)
@@ -350,9 +345,7 @@
}

// Pop latestHeader from the stack. We already have a reference to it.
if _, err := w.stack.Pop(); err != nil {
return events, err
}
w.stack.Pop()
events = append(events, &Event{
Type: Removed,
BlockHeader: latestHeader,
@@ -397,7 +390,7 @@ func (w *Watcher) addLogs(header *types.MiniHeader) (*types.MiniHeader, error) {
return header, nil
}

// getMissedEventsToBackfill finds missed events that might have occured while the Mesh node was
// getMissedEventsToBackfill finds missed events that might have occurred while the Mesh node was
// offline. It does this by comparing the last block stored with the latest block discoverable via RPC.
// If the stored block is older than the latest block, it batch fetches the events for missing blocks,
// re-sets the stored blocks and returns the block events found.
@@ -411,9 +404,8 @@ func (w *Watcher) getMissedEventsToBackfill(ctx context.Context, blocksElapsed i
// If we have processed blocks further than the latestRetainedBlock in the DB, we
// want to remove all blocks from the DB and insert the furthestBlockProcessed
// Doing so will cause the BlockWatcher to start from that furthestBlockProcessed.
if err := w.stack.Clear(); err != nil {
return events, err
}
w.stack.Clear()

// Add furthest block processed into the DB
latestHeader, err := w.client.HeaderByNumber(big.NewInt(int64(furthestBlockProcessed)))
if err != nil {
@@ -470,7 +462,7 @@ type logRequestResult struct {
Err error
}

// getLogsRequestChunkSize is the number of `eth_getLogs` JSON RPC to send concurrently in each batch fetch
// getLogsRequestChunkSize is the number of `eth_getLogs` JSON RPC to send concurrently in each batch fetch.
const getLogsRequestChunkSize = 3

// getLogsInBlockRange attempts to fetch all logs in the block range supplied. It implements a
@@ -486,7 +478,7 @@ func (w *Watcher) getLogsInBlockRange(ctx context.Context, from, to int) ([]etht
for len(blockRanges) != 0 {
var chunk []*blockRange
if len(blockRanges) < getLogsRequestChunkSize {
chunk = blockRanges[:len(blockRanges)]
chunk = blockRanges
} else {
chunk = blockRanges[:getLogsRequestChunkSize]
}
@@ -586,38 +578,16 @@ type blockRange struct {
// blocks' logs twice.
func (w *Watcher) getSubBlockRanges(from, to, rangeSize int) []*blockRange {
chunks := []*blockRange{}
numBlocksLeft := to - from
if numBlocksLeft < rangeSize {
for ; from+rangeSize-1 < to; from += rangeSize {
chunks = append(chunks, &blockRange{
FromBlock: from,
ToBlock: to,
ToBlock: from + rangeSize - 1,
})
} else {
blocks := []int{}
for i := 0; i <= numBlocksLeft; i++ {
blocks = append(blocks, from+i)
}
numChunks := len(blocks) / rangeSize
remainder := len(blocks) % rangeSize
if remainder > 0 {
numChunks = numChunks + 1
}

for i := 0; i < numChunks; i = i + 1 {
fromIndex := i * rangeSize
toIndex := fromIndex + rangeSize
if toIndex > len(blocks) {
toIndex = len(blocks)
}
bs := blocks[fromIndex:toIndex]
blockRange := &blockRange{
FromBlock: bs[0],
ToBlock: bs[len(bs)-1],
}
chunks = append(chunks, blockRange)
}
}
return chunks
return append(chunks, &blockRange{
FromBlock: from,
ToBlock: to,
})
}
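The simplified `getSubBlockRanges` now emits fixed-size inclusive ranges until fewer than `rangeSize` blocks remain, then one final range covering the remainder. Below is a standalone copy with a worked example; the receiver is dropped so it runs on its own, and the numbers are arbitrary.

```go
package main

import "fmt"

// blockRange mirrors the unexported struct in block_watcher.go.
type blockRange struct {
	FromBlock int
	ToBlock   int
}

// getSubBlockRanges is the simplified implementation above, minus the
// *Watcher receiver.
func getSubBlockRanges(from, to, rangeSize int) []*blockRange {
	chunks := []*blockRange{}
	for ; from+rangeSize-1 < to; from += rangeSize {
		chunks = append(chunks, &blockRange{FromBlock: from, ToBlock: from + rangeSize - 1})
	}
	return append(chunks, &blockRange{FromBlock: from, ToBlock: to})
}

func main() {
	for _, r := range getSubBlockRanges(100, 160, 25) {
		fmt.Printf("[%d, %d]\n", r.FromBlock, r.ToBlock)
	}
	// Output:
	// [100, 124]
	// [125, 149]
	// [150, 160]
}
```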

const infuraTooManyResultsErrMsg = "query returned more than 10000 results"
@@ -627,7 +597,7 @@ func (w *Watcher) filterLogsRecurisively(from, to int, allLogs []ethtypes.Log) (
"from": from,
"to": to,
}).Trace("Fetching block logs")
numBlocks := to - from
numBlocks := to - from + 1
topics := [][]common.Hash{}
if len(w.topics) > 0 {
topics = append(topics, w.topics)
@@ -650,14 +620,7 @@
return allLogs, fmt.Errorf("Unable to get the logs for block #%d, because it contains too many logs", from)
}

r := numBlocks % 2
firstBatchSize := numBlocks / 2
secondBatchSize := firstBatchSize
if r == 1 {
secondBatchSize = secondBatchSize + 1
}

endFirstHalf := from + firstBatchSize
endFirstHalf := from + numBlocks/2
startSecondHalf := endFirstHalf + 1
allLogs, err := w.filterLogsRecurisively(from, endFirstHalf, allLogs)
if err != nil {
@@ -676,11 +639,6 @@ func (w *Watcher) filterLogsRecurisively(from, to int, allLogs []ethtypes.Log) (
return allLogs, nil
}
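With inclusive block ranges the count is now `to - from + 1`, and a range that returns too many results is split roughly in half before recursing. A small standalone illustration of just the halving arithmetic (the values are arbitrary):

```go
package main

import "fmt"

// splitRange reproduces the halving arithmetic from filterLogsRecurisively
// above, without the RPC call or the recursion.
func splitRange(from, to int) (firstFrom, firstTo, secondFrom, secondTo int) {
	numBlocks := to - from + 1 // inclusive count, as corrected in this commit
	endFirstHalf := from + numBlocks/2
	startSecondHalf := endFirstHalf + 1
	return from, endFirstHalf, startSecondHalf, to
}

func main() {
	a, b, c, d := splitRange(100, 159) // a 60-block range
	fmt.Printf("first half: [%d, %d], second half: [%d, %d]\n", a, b, c, d)
	// Output: first half: [100, 130], second half: [131, 159]
}
```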

// getAllRetainedBlocks returns the blocks retained in-memory by the Watcher.
func (w *Watcher) getAllRetainedBlocks() ([]*types.MiniHeader, error) {
return w.stack.PeekAll()
}

func isWarning(err error) bool {
message := err.Error()
for _, warningLevelErrorMessage := range warningLevelErrorMessages {