This repository has been archived by the owner on Oct 11, 2024. It is now read-only.

In-Memory Blockwatcher Syncing #857

Merged · 9 commits · Jul 15, 2020
3 changes: 2 additions & 1 deletion core/core.go
@@ -43,6 +43,7 @@ import (
)

const (
blockRetentionLimit = 20
ethereumRPCRequestTimeout = 30 * time.Second
peerConnectTimeout = 60 * time.Second
checkNewAddrInterval = 20 * time.Second
@@ -333,7 +334,7 @@ func newWithPrivateConfig(ctx context.Context, config Config, pConfig privateCon
Topics: topics,
Client: blockWatcherClient,
}
blockWatcher := blockwatch.New(blockWatcherConfig)
blockWatcher := blockwatch.New(blockRetentionLimit, blockWatcherConfig)

// Initialize the order validator
orderValidator, err := ordervalidator.New(
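The core.go change just threads the new retention-limit argument into the constructor. Below is a hedged sketch of the updated call written as a standalone helper; the helper, its parameter names, and the polling-interval value are illustrative only, and the Config fields shown are the ones referenced around blockwatch.New elsewhere in this diff.

package example

import (
	"time"

	"github.com/0xProject/0x-mesh/db"
	"github.com/0xProject/0x-mesh/ethereum/blockwatch"
	"github.com/ethereum/go-ethereum/common"
)

// newBlockWatcher shows the updated two-argument blockwatch.New signature.
// It is an illustration-only helper, not part of Mesh.
func newBlockWatcher(database *db.DB, client blockwatch.Client, topics []common.Hash) *blockwatch.Watcher {
	const blockRetentionLimit = 20 // mirrors the constant added to core.go above
	return blockwatch.New(blockRetentionLimit, blockwatch.Config{
		DB:              database,
		PollingInterval: 5 * time.Second, // placeholder value
		WithLogs:        true,
		Topics:          topics,
		Client:          client,
	})
}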
1 change: 1 addition & 0 deletions db/common.go
@@ -43,6 +43,7 @@ type Database interface {
FindMiniHeaders(opts *MiniHeaderQuery) ([]*types.MiniHeader, error)
DeleteMiniHeader(hash common.Hash) error
DeleteMiniHeaders(opts *MiniHeaderQuery) ([]*types.MiniHeader, error)
ResetMiniHeaders(miniHeaders []*types.MiniHeader) error
GetMetadata() (*types.Metadata, error)
SaveMetadata(metadata *types.Metadata) error
UpdateMetadata(updateFunc func(oldmetadata *types.Metadata) (newMetadata *types.Metadata)) error
16 changes: 16 additions & 0 deletions db/dexie_implementation.go
@@ -241,6 +241,22 @@ func (db *DB) AddMiniHeaders(miniHeaders []*types.MiniHeader) (added []*types.Mi
return dexietypes.MiniHeadersToCommonType(jsAdded), dexietypes.MiniHeadersToCommonType(jsRemoved), nil
}

// ResetMiniHeaders deletes all of the existing miniheaders and then stores new
// miniheaders in the database.
func (db *DB) ResetMiniHeaders(miniHeaders []*types.MiniHeader) (err error) {
defer func() {
if r := recover(); r != nil {
err = recoverError(r)
}
}()
jsMiniHeaders := dexietypes.MiniHeadersFromCommonType(miniHeaders)
_, err = jsutil.AwaitPromiseContext(db.ctx, db.dexie.Call("resetMiniHeadersAsync", jsMiniHeaders))
if err != nil {
return convertJSError(err)
}
return nil
}

func (db *DB) GetMiniHeader(hash common.Hash) (miniHeader *types.MiniHeader, err error) {
defer func() {
if r := recover(); r != nil {
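The doc comment above fully specifies the behavior. For reference, a minimal in-memory sketch of the same contract, delete everything that was stored and then persist exactly the supplied set, could look like the following; the memoryMiniHeaderStore type is hypothetical and exists only for illustration.

package example

import (
	"sync"

	"github.com/0xProject/0x-mesh/common/types"
)

// memoryMiniHeaderStore is an illustration-only store, not part of Mesh. It exists
// just to spell out the ResetMiniHeaders contract: drop every stored header, then
// store the new set.
type memoryMiniHeaderStore struct {
	mu          sync.Mutex
	miniHeaders []*types.MiniHeader
}

func (s *memoryMiniHeaderStore) ResetMiniHeaders(miniHeaders []*types.MiniHeader) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	// Delete all of the existing miniheaders...
	s.miniHeaders = nil
	// ...then store the new ones, copying so later caller mutations don't leak in.
	s.miniHeaders = append(s.miniHeaders, miniHeaders...)
	return nil
}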
23 changes: 23 additions & 0 deletions db/sql_implementation.go
@@ -567,6 +567,29 @@ func (db *DB) AddMiniHeaders(miniHeaders []*types.MiniHeader) (added []*types.Mi
return added, removed, nil
}

// ResetMiniHeaders deletes all of the existing miniheaders and then stores new
// miniheaders in the database.
func (db *DB) ResetMiniHeaders(miniHeaders []*types.MiniHeader) (err error) {
defer func() {
err = convertErr(err)
}()

err = db.sqldb.TransactionalContext(db.ctx, nil, func(txn *sqlz.Tx) error {
_, err := txn.DeleteFrom("miniHeaders").ExecContext(db.ctx)
if err != nil {
return err
}
for _, miniHeader := range miniHeaders {
_, err := txn.NamedExecContext(db.ctx, insertMiniHeaderQuery, sqltypes.MiniHeaderFromCommonType(miniHeader))
if err != nil {
return err
}
}
return nil
})
	return err
}

func (db *DB) GetMiniHeader(hash common.Hash) (miniHeader *types.MiniHeader, err error) {
defer func() {
err = convertErr(err)
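Both database implementations above expose the same delete-then-store contract, and the block watcher changes below use it to mirror the in-memory header stack into persistent storage only after a sync is accepted. The following is a condensed, illustration-only sketch of that pattern; persistAfterSync and syncStep are hypothetical names, while the stack methods are the ones used in block_watcher.go below.

package example

import (
	"github.com/0xProject/0x-mesh/db"
	"github.com/0xProject/0x-mesh/ethereum/simplestack"
)

// persistAfterSync is an illustration-only helper, not part of Mesh. It shows the
// checkpoint / reset / ResetMiniHeaders pattern that SyncToLatestBlock follows below:
// sync into the in-memory stack first, and only mirror it into storage once the new
// state is accepted.
func persistAfterSync(database db.Database, stack *simplestack.SimpleStack, syncStep func() error) error {
	checkpoint := stack.Checkpoint() // remember the pre-sync in-memory state
	if err := syncStep(); err != nil {
		// Roll the in-memory stack back to the checkpoint; the database was never touched.
		if resetErr := stack.Reset(checkpoint); resetErr != nil {
			return resetErr
		}
		return err
	}
	stack.Checkpoint() // commit the new in-memory state
	// Replace the persisted window with the retained in-memory headers in one shot.
	return database.ResetMiniHeaders(stack.PeekAll())
}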
125 changes: 42 additions & 83 deletions ethereum/blockwatch/block_watcher.go
@@ -12,6 +12,7 @@ import (
"github.com/0xProject/0x-mesh/common/types"
"github.com/0xProject/0x-mesh/constants"
"github.com/0xProject/0x-mesh/db"
"github.com/0xProject/0x-mesh/ethereum/simplestack"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
ethtypes "github.com/ethereum/go-ethereum/core/types"
@@ -29,7 +30,7 @@ const rpcClientNotFoundError = "not found"
var maxBlocksInGetLogsQuery = 60

// warningLevelErrorMessages are certain blockwatch.Watch errors that we want to report as warnings
// because they do not represent a bug or issue with Mesh and are expected to happen from time to time
// because they do not represent a bug or issue with Mesh and are expected to happen from time to time.
var warningLevelErrorMessages = []string{
constants.GethFilterUnknownBlock,
rpcClientNotFoundError,
@@ -77,7 +78,8 @@ type Config struct {
// supplied stack) handling block re-orgs and network disruptions gracefully. It can be started from
// any arbitrary block height, and will emit both block added and removed events.
type Watcher struct {
stack *Stack
stack *simplestack.SimpleStack
db *db.DB
client Client
blockFeed event.Feed
blockScope event.SubscriptionScope // Subscription scope tracking current live listeners
@@ -90,10 +92,11 @@ type Watcher struct {
}
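As the doc comment above notes, the watcher emits both block-added and block-removed events. A minimal, illustration-only consumer might look like the sketch below; it assumes the watcher has already been constructed (see core.go above) and started, and relies only on the Subscribe signature, the Event fields, and the MiniHeader.Hash usage visible elsewhere in this diff.

package example

import (
	"context"
	"log"

	"github.com/0xProject/0x-mesh/ethereum/blockwatch"
)

// consumeBlockEvents is an illustration-only consumer of block events, not part of Mesh.
func consumeBlockEvents(ctx context.Context, watcher *blockwatch.Watcher) {
	sink := make(chan []*blockwatch.Event, 10)
	sub := watcher.Subscribe(sink)
	defer sub.Unsubscribe()
	for {
		select {
		case <-ctx.Done():
			return
		case err := <-sub.Err():
			log.Printf("block subscription error: %v", err)
			return
		case events := <-sink:
			for _, event := range events {
				switch event.Type {
				case blockwatch.Removed:
					// Emitted when a re-org removes a block from the canonical chain.
					log.Printf("block removed: %s", event.BlockHeader.Hash.Hex())
				default:
					log.Printf("block added: %s", event.BlockHeader.Hash.Hex())
				}
			}
		}
	}
}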

// New creates a new Watcher instance.
func New(config Config) *Watcher {
func New(retentionLimit int, config Config) *Watcher {
return &Watcher{
pollingInterval: config.PollingInterval,
stack: NewStack(config.DB),
db: config.DB,
stack: simplestack.New(retentionLimit, []*types.MiniHeader{}),
client: config.Client,
withLogs: config.WithLogs,
topics: config.Topics,
@@ -114,10 +117,8 @@ func (w *Watcher) FastSyncToLatestBlock(ctx context.Context) (blocksElapsed int,
}
w.mu.Unlock()

latestBlockProcessed, err := w.stack.Peek()
if err != nil {
return 0, err
}
latestBlockProcessed := w.stack.Peek()

// No previously stored block so no blocks have elapsed
if latestBlockProcessed == nil {
return 0, nil
@@ -139,13 +140,19 @@
return blocksElapsed, err
}
if len(events) > 0 {
newMiniHeaders := w.stack.PeekAll()
if err := w.db.ResetMiniHeaders(newMiniHeaders); err != nil {
return blocksElapsed, err
}
w.blockFeed.Send(events)
}
} else {
// Clear all block headers from stack so BlockWatcher starts again from latest block
if err := w.stack.Clear(); err != nil {
// Clear all block headers from stack and database so BlockWatcher
// starts again from latest block
if _, err := w.db.DeleteMiniHeaders(nil); err != nil {
return blocksElapsed, err
}
w.stack.Clear()
}

return blocksElapsed, nil
@@ -224,25 +231,19 @@ func (w *Watcher) Subscribe(sink chan<- []*Event) event.Subscription {
}

// SyncToLatestBlock syncs our local state of the chain to the latest block found via
// Ethereum RPC
// Ethereum RPC.
func (w *Watcher) SyncToLatestBlock() error {
w.syncToLatestBlockMu.Lock()
defer w.syncToLatestBlockMu.Unlock()

existingMiniHeaders, err := w.stack.PeekAll()
if err != nil {
return err
}
checkpoint := w.stack.Checkpoint()

latestHeader, err := w.client.HeaderByNumber(nil)
if err != nil {
return err
}
latestBlockNumber := latestHeader.Number.Int64()
lastStoredHeader, err := w.stack.Peek()
if err != nil {
return err
}
lastStoredHeader := w.stack.Peek()
var lastStoredBlockNumber int64
if lastStoredHeader != nil {
lastStoredBlockNumber = lastStoredHeader.Number.Int64()
@@ -277,11 +278,7 @@ func (w *Watcher) SyncToLatestBlock() error {
// stored and fetch it
nextHeader := latestHeader
if numBlocksToFetch != 1 {
lastStoredHeader, err := w.stack.Peek()
if err != nil {
syncErr = err
break
}
lastStoredHeader := w.stack.Peek()
nextBlockNumber := big.NewInt(0).Add(lastStoredHeader.Number, big.NewInt(1))
nextHeader, err = w.client.HeaderByNumber(nextBlockNumber)
if err != nil {
@@ -302,14 +299,15 @@
return syncErr
}
if w.shouldRevertChanges(lastStoredHeader, allEvents) {
Contributor (review comment):

This function returns false if len(events) == 0 || lastStoredHeader == nil, but len(events) can't be 0, since we would have returned in line 301 if so. I think that if lastStoredHeader is nil in line 242 (old) / 256 (new), then it will still be nil here, so maybe we should just do the revert there?

jalextowle (Contributor, Author) commented on Jul 14, 2020:

This logic doesn't make sense to me. If the function returns false, then we don't want to revert the changes. Am I missing something here?

As a follow-up, we don't know nextLatestHeader until after the for-loop (because events is added to by buildCanonicalChain). Having said this, I don't think that we can do the revert earlier unless we make a different refactor first.

Contributor (review comment):

You're right. For some reason, I thought shouldRevertChanges would return true instead of false. The len(events) == 0 check in the function is still dead code, though.

jalextowle (Contributor, Author):

No worries, this happens to me all the time. I'll check out the dead code. If the event length is 0, then we really don't want to revert changes, so I want to be careful that this change wouldn't introduce problems if the function were reused elsewhere.

jalextowle (Contributor, Author):

I think I'm going to leave this in for now. Let's address this in a refactor, which may entail completely eliminating this function.

My rationale is that removing the dead code makes the function more context-specific (you have to check len(events) == 0 elsewhere for it to work properly). This is okay in this case because it's only called once, but I don't really like that, since we are encapsulating logic in a function that should be reusable.
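For context while reading the thread above: the body of shouldRevertChanges is collapsed in this diff, so the following is only a hypothetical reconstruction of the guard under discussion, with the real comparison logic elided. It makes the dead-code point concrete: at this call site an empty events slice has already caused an early return, so only the lastStoredHeader == nil half of the guard can ever fire here.

// Hypothetical sketch of the guard discussed above; not the actual Mesh body.
func (w *Watcher) shouldRevertChanges(lastStoredHeader *types.MiniHeader, events []*Event) bool {
	if len(events) == 0 || lastStoredHeader == nil {
		// Nothing to compare against, so keep whatever the sync produced.
		return false
	}
	// ... comparison between lastStoredHeader and the newly synced chain (elided) ...
	return false
}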

// TODO(albrow): Take another look at this. Maybe extract to method.
if err := w.stack.Clear(); err != nil {
if err := w.stack.Reset(checkpoint); err != nil {
return err
}
if _, _, err := w.stack.db.AddMiniHeaders(existingMiniHeaders); err != nil {
} else {
w.stack.Checkpoint()
newMiniHeaders := w.stack.PeekAll()
if err := w.db.ResetMiniHeaders(newMiniHeaders); err != nil {
return err
}
} else {
w.blockFeed.Send(allEvents)
}

Expand All @@ -328,10 +326,7 @@ func (w *Watcher) shouldRevertChanges(lastStoredHeader *types.MiniHeader, events
}

func (w *Watcher) buildCanonicalChain(nextHeader *types.MiniHeader, events []*Event) ([]*Event, error) {
latestHeader, err := w.stack.Peek()
if err != nil {
return nil, err
}
latestHeader := w.stack.Peek()
// Is the stack empty or is it the next block?
if latestHeader == nil || nextHeader.Parent == latestHeader.Hash {
nextHeader, err := w.addLogs(nextHeader)
@@ -350,9 +345,7 @@ }
}

// Pop latestHeader from the stack. We already have a reference to it.
if _, err := w.stack.Pop(); err != nil {
return events, err
}
w.stack.Pop()
events = append(events, &Event{
Type: Removed,
BlockHeader: latestHeader,
@@ -397,7 +390,7 @@ func (w *Watcher) addLogs(header *types.MiniHeader) (*types.MiniHeader, error) {
return header, nil
}

// getMissedEventsToBackfill finds missed events that might have occured while the Mesh node was
// getMissedEventsToBackfill finds missed events that might have occurred while the Mesh node was
// offline. It does this by comparing the last block stored with the latest block discoverable via RPC.
// If the stored block is older then the latest block, it batch fetches the events for missing blocks,
// re-sets the stored blocks and returns the block events found.
@@ -411,9 +404,8 @@ func (w *Watcher) getMissedEventsToBackfill(ctx context.Context, blocksElapsed i
// If we have processed blocks further then the latestRetainedBlock in the DB, we
// want to remove all blocks from the DB and insert the furthestBlockProcessed
// Doing so will cause the BlockWatcher to start from that furthestBlockProcessed.
if err := w.stack.Clear(); err != nil {
return events, err
}
w.stack.Clear()

// Add furthest block processed into the DB
latestHeader, err := w.client.HeaderByNumber(big.NewInt(int64(furthestBlockProcessed)))
if err != nil {
@@ -470,7 +462,7 @@ type logRequestResult struct {
Err error
}

// getLogsRequestChunkSize is the number of `eth_getLogs` JSON RPC to send concurrently in each batch fetch
// getLogsRequestChunkSize is the number of `eth_getLogs` JSON RPC to send concurrently in each batch fetch.
const getLogsRequestChunkSize = 3

// getLogsInBlockRange attempts to fetch all logs in the block range supplied. It implements a
@@ -486,7 +478,7 @@ func (w *Watcher) getLogsInBlockRange(ctx context.Context, from, to int) ([]etht
for len(blockRanges) != 0 {
var chunk []*blockRange
if len(blockRanges) < getLogsRequestChunkSize {
chunk = blockRanges[:len(blockRanges)]
chunk = blockRanges
} else {
chunk = blockRanges[:getLogsRequestChunkSize]
}
@@ -586,38 +578,17 @@ type blockRange struct {
// blocks' logs twice.
func (w *Watcher) getSubBlockRanges(from, to, rangeSize int) []*blockRange {
chunks := []*blockRange{}
numBlocksLeft := to - from
if numBlocksLeft < rangeSize {
for from+rangeSize-1 < to {
chunks = append(chunks, &blockRange{
FromBlock: from,
ToBlock: to,
ToBlock: from + rangeSize - 1,
})
} else {
blocks := []int{}
for i := 0; i <= numBlocksLeft; i++ {
blocks = append(blocks, from+i)
}
numChunks := len(blocks) / rangeSize
remainder := len(blocks) % rangeSize
if remainder > 0 {
numChunks = numChunks + 1
}

for i := 0; i < numChunks; i = i + 1 {
fromIndex := i * rangeSize
toIndex := fromIndex + rangeSize
if toIndex > len(blocks) {
toIndex = len(blocks)
}
bs := blocks[fromIndex:toIndex]
blockRange := &blockRange{
FromBlock: bs[0],
ToBlock: bs[len(bs)-1],
}
chunks = append(chunks, blockRange)
}
from += rangeSize
}
return chunks
return append(chunks, &blockRange{
FromBlock: from,
ToBlock: to,
})
}
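The rewritten getSubBlockRanges above walks the range in fixed-size windows and lets the final window absorb the remainder. The snippet below is an illustration-only check of the expected output; it assumes it sits in the blockwatch package, since it touches unexported names, and that "fmt" is imported.

// printSubBlockRanges demonstrates the chunking above with hypothetical block numbers.
func printSubBlockRanges() {
	w := &Watcher{} // getSubBlockRanges does not read any Watcher fields
	for _, r := range w.getSubBlockRanges(100, 109, 4) {
		fmt.Printf("%d-%d\n", r.FromBlock, r.ToBlock)
	}
	// Prints:
	// 100-103
	// 104-107
	// 108-109
	// A range no larger than rangeSize, e.g. (100, 103, 4), yields the single chunk 100-103.
}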

const infuraTooManyResultsErrMsg = "query returned more than 10000 results"
@@ -627,7 +598,7 @@ func (w *Watcher) filterLogsRecurisively(from, to int, allLogs []ethtypes.Log) (
"from": from,
"to": to,
}).Trace("Fetching block logs")
numBlocks := to - from
numBlocks := to - from + 1
topics := [][]common.Hash{}
if len(w.topics) > 0 {
topics = append(topics, w.topics)
@@ -650,14 +621,7 @@ func (w *Watcher) filterLogsRecurisively(from, to int, allLogs []ethtypes.Log) (
return allLogs, fmt.Errorf("Unable to get the logs for block #%d, because it contains too many logs", from)
}

r := numBlocks % 2
firstBatchSize := numBlocks / 2
secondBatchSize := firstBatchSize
if r == 1 {
secondBatchSize = secondBatchSize + 1
}

endFirstHalf := from + firstBatchSize
endFirstHalf := from + numBlocks/2
startSecondHalf := endFirstHalf + 1
allLogs, err := w.filterLogsRecurisively(from, endFirstHalf, allLogs)
if err != nil {
@@ -676,11 +640,6 @@ func (w *Watcher) filterLogsRecurisively(from, to int, allLogs []ethtypes.Log) (
return allLogs, nil
}

// getAllRetainedBlocks returns the blocks retained in-memory by the Watcher.
func (w *Watcher) getAllRetainedBlocks() ([]*types.MiniHeader, error) {
return w.stack.PeekAll()
}

func isWarning(err error) bool {
message := err.Error()
for _, warningLevelErrorMessage := range warningLevelErrorMessages {