From df221885ac908210f995459bb4c4db826505bbd4 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Thu, 11 Apr 2019 19:41:24 +0200 Subject: [PATCH] core: import header chains in batches --- consensus/consensus.go | 3 + consensus/ethash/consensus.go | 25 +++-- core/blockchain.go | 29 ++++-- core/chain_makers.go | 1 + core/headerchain.go | 183 ++++++++++++++++++---------------- light/lightchain.go | 7 +- 6 files changed, 136 insertions(+), 112 deletions(-) diff --git a/consensus/consensus.go b/consensus/consensus.go index f753af550ca0..f8d29fe15cd8 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -39,6 +39,9 @@ type ChainReader interface { // GetHeader retrieves a block header from the database by hash and number. GetHeader(hash common.Hash, number uint64) *types.Header + // HasHeader checks presence of header in the database by hash and number. + HasHeader(hash common.Hash, number uint64) bool + // GetHeaderByNumber retrieves a block header from the database by number. GetHeaderByNumber(number uint64) *types.Header diff --git a/consensus/ethash/consensus.go b/consensus/ethash/consensus.go index d271518f4fed..58e13a1bfa7e 100644 --- a/consensus/ethash/consensus.go +++ b/consensus/ethash/consensus.go @@ -88,7 +88,7 @@ func (ethash *Ethash) VerifyHeader(chain consensus.ChainReader, header *types.He } // Short circuit if the header is known, or it's parent not number := header.Number.Uint64() - if chain.GetHeader(header.Hash(), number) != nil { + if chain.HasHeader(header.Hash(), number) { return nil } parent := chain.GetHeader(header.ParentHash, number-1) @@ -174,7 +174,7 @@ func (ethash *Ethash) verifyHeaderWorker(chain consensus.ChainReader, headers [] if parent == nil { return consensus.ErrUnknownAncestor } - if chain.GetHeader(headers[index].Hash(), headers[index].Number.Uint64()) != nil { + if chain.HasHeader(headers[index].Hash(), headers[index].Number.Uint64()) { return nil // known block } return ethash.verifyHeader(chain, headers[index], parent, false, seals[index]) @@ -199,15 +199,24 @@ func (ethash *Ethash) VerifyUncles(chain consensus.ChainReader, block *types.Blo number, parent := block.NumberU64()-1, block.ParentHash() for i := 0; i < 7; i++ { - ancestor := chain.GetBlock(parent, number) - if ancestor == nil { + ancestorHeader := chain.GetHeader(parent, number) + if ancestorHeader == nil { break } - ancestors[ancestor.Hash()] = ancestor.Header() - for _, uncle := range ancestor.Uncles() { - uncles.Add(uncle.Hash()) + ancestors[parent] = ancestorHeader + + // If the ancestor doesn't have any uncles, we don't have to iterate them + if ancestorHeader.UncleHash != types.EmptyUncleHash { + // Need to add those uncles to the blacklist too + ancestorBlock := chain.GetBlock(parent, number) + if ancestorBlock == nil { + break + } + for _, uncle := range ancestorBlock.Uncles() { + uncles.Add(uncle.Hash()) + } } - parent, number = ancestor.ParentHash(), number-1 + parent, number = ancestorHeader.ParentHash, number-1 } ancestors[block.Hash()] = block.Header() uncles.Add(block.Hash()) diff --git a/core/blockchain.go b/core/blockchain.go index 9fb02b1482d1..752e85ae520d 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -977,9 +977,10 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ } var ( - stats = struct{ processed, ignored int32 }{} - start = time.Now() - size = 0 + stats = struct{ processed, ignored int32 }{} + start = time.Now() + size = 0 + checkBodyPresent = true ) // updateHead updates the head fast sync block if the inserted blocks are better // and returns a indicator whether the inserted blocks are canonical. @@ -1161,9 +1162,18 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ if !bc.HasHeader(block.Hash(), block.NumberU64()) { return i, fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4]) } - if bc.HasBlock(block.Hash(), block.NumberU64()) { - stats.ignored++ - continue + + // Ignore if the entire data is already known + if checkBodyPresent { + if bc.HasBlock(block.Hash(), block.NumberU64()) { + stats.ignored++ + continue + } else { + // If block N is unavailable, so are the next blocks. + // This should hold, but if it does not, the shortcut + // here will only cause overwriting some existing data + checkBodyPresent = false + } } // Write all the data out into the database rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body()) @@ -2103,11 +2113,8 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i bc.wg.Add(1) defer bc.wg.Done() - whFunc := func(header *types.Header) error { - _, err := bc.hc.WriteHeader(header) - return err - } - return bc.hc.InsertHeaderChain(chain, whFunc, start) + errIndex, _, err := bc.hc.InsertHeaderChain(chain, start, nil) + return errIndex, err } // CurrentHeader retrieves the current head header of the canonical chain. The diff --git a/core/chain_makers.go b/core/chain_makers.go index 0b0fcdb4aa39..e72bb4a6e940 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -297,3 +297,4 @@ func (cr *fakeChainReader) GetHeaderByNumber(number uint64) *types.Header func (cr *fakeChainReader) GetHeaderByHash(hash common.Hash) *types.Header { return nil } func (cr *fakeChainReader) GetHeader(hash common.Hash, number uint64) *types.Header { return nil } func (cr *fakeChainReader) GetBlock(hash common.Hash, number uint64) *types.Block { return nil } +func (cr *fakeChainReader) HasHeader(hash common.Hash, number uint64) bool { return false } diff --git a/core/headerchain.go b/core/headerchain.go index 4682069cff0c..0083a9691a56 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -39,7 +39,7 @@ import ( const ( headerCacheLimit = 512 tdCacheLimit = 1024 - numberCacheLimit = 2048 + numberCacheLimit = 4096 ) // HeaderChain implements the basic block header chain logic that is shared by @@ -123,42 +123,63 @@ func (hc *HeaderChain) GetBlockNumber(hash common.Hash) *uint64 { return number } -// WriteHeader writes a header into the local chain, given that its parent is -// already known. If the total difficulty of the newly inserted header becomes -// greater than the current known TD, the canonical chain is re-routed. +// WriteHeaders writes a chain of headers into the local chain, given that the parents +// are already known. If the total difficulty of the newly inserted chain becomes +// greater than the current known TD, the canonical chain is reorged. // // Note: This method is not concurrent-safe with inserting blocks simultaneously // into the chain, as side effects caused by reorganisations cannot be emulated // without the real blocks. Hence, writing headers directly should only be done // in two scenarios: pure-header mode of operation (light clients), or properly // separated header/block phases (non-archive clients). -func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, err error) { - // Cache some values to prevent constant recalculation +func (hc *HeaderChain) WriteHeaders(headers []*types.Header, pwCallbackFn PWCallback) (ignored, imported int, status WriteStatus, lastHash common.Hash, lastHeader *types.Header, err error) { + if len(headers) == 0 { + return ignored, imported, NonStatTy, lastHash, nil, err + } + ptd := hc.GetTd(headers[0].ParentHash, headers[0].Number.Uint64()-1) + if ptd == nil { + return ignored, imported, NonStatTy, lastHash, nil, consensus.ErrUnknownAncestor + } + type numberHash struct { + number uint64 + hash common.Hash + } var ( - hash = header.Hash() - number = header.Number.Uint64() + lastNumber uint64 // Last successfully imported number + externTd *big.Int // TD of successfully imported chain + inserted []numberHash // Quick lookup of number/hash for the chain ) - // Calculate the total difficulty of the header - ptd := hc.GetTd(header.ParentHash, number-1) - if ptd == nil { - return NonStatTy, consensus.ErrUnknownAncestor + lastHash, lastNumber = headers[0].ParentHash, headers[0].Number.Uint64()-1 // Already validated above + batch := hc.chainDb.NewBatch() + for _, header := range headers { + hash, number := header.Hash(), header.Number.Uint64() + if header.ParentHash != lastHash { + log.Warn("Non-contiguous header insertion", "header.parent", header.ParentHash, "expected", hash, "number", number) + break + } + externTd = new(big.Int).Add(header.Difficulty, ptd) + // If the header's already known, skip it, otherwise store + if !hc.HasHeader(hash, number) { + // Irrelevant of the canonical status, write the td and header to the database + rawdb.WriteTd(batch, hash, number, externTd) + hc.tdCache.Add(hash, new(big.Int).Set(externTd)) + + rawdb.WriteHeader(batch, header) + inserted = append(inserted, numberHash{number, hash}) + hc.headerCache.Add(hash, header) + hc.numberCache.Add(hash, number) + } + lastHeader, lastHash, lastNumber, ptd = header, hash, number, externTd } + batch.Write() + batch.Reset() localTd := hc.GetTd(hc.currentHeaderHash, hc.CurrentHeader().Number.Uint64()) - externTd := new(big.Int).Add(header.Difficulty, ptd) - - // Irrelevant of the canonical status, write the td and header to the database - if err := hc.WriteTd(hash, number, externTd); err != nil { - log.Crit("Failed to write header total difficulty", "err", err) - } - rawdb.WriteHeader(hc.chainDb, header) - // If the total difficulty is higher than our known, add it to the canonical chain // Second clause in the if statement reduces the vulnerability to selfish mining. // Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf if externTd.Cmp(localTd) > 0 || (externTd.Cmp(localTd) == 0 && mrand.Float64() < 0.5) { // Delete any canonical number assignments above the new head - batch := hc.chainDb.NewBatch() - for i := number + 1; ; i++ { + for i := lastNumber + 1; ; i++ { hash := rawdb.ReadCanonicalHash(hc.chainDb, i) if hash == (common.Hash{}) { break @@ -166,44 +187,54 @@ func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, er rawdb.DeleteCanonicalHash(batch, i) } batch.Write() + batch.Reset() - // Overwrite any stale canonical number assignments + // Overwrite any stale canonical number assignments, work backwards + // from the first inserted header var ( - headHash = header.ParentHash - headNumber = header.Number.Uint64() - 1 - headHeader = hc.GetHeader(headHash, headNumber) + headHash = headers[0].ParentHash + headNumber = headers[0].Number.Uint64() - 1 ) for rawdb.ReadCanonicalHash(hc.chainDb, headNumber) != headHash { - rawdb.WriteCanonicalHash(hc.chainDb, headHash, headNumber) - + rawdb.WriteCanonicalHash(batch, headHash, headNumber) + headHeader := hc.GetHeader(headHash, headNumber) headHash = headHeader.ParentHash headNumber = headHeader.Number.Uint64() - 1 - headHeader = hc.GetHeader(headHash, headNumber) } - // Extend the canonical chain with the new header - rawdb.WriteCanonicalHash(hc.chainDb, hash, number) - rawdb.WriteHeadHeaderHash(hc.chainDb, hash) - - hc.currentHeaderHash = hash - hc.currentHeader.Store(types.CopyHeader(header)) - headHeaderGauge.Update(header.Number.Int64()) - + // Now write the current chain in a batch + for _, hdr := range inserted { + rawdb.WriteCanonicalHash(batch, hdr.hash, hdr.number) + } + rawdb.WriteHeadHeaderHash(batch, lastHash) + batch.Write() + batch.Reset() + hc.currentHeaderHash = lastHash + hc.currentHeader.Store(types.CopyHeader(lastHeader)) + headHeaderGauge.Update(lastHeader.Number.Int64()) status = CanonStatTy } else { status = SideStatTy } - hc.headerCache.Add(hash, header) - hc.numberCache.Add(hash, number) - - return + // Execute any post-write callback function + if pwCallbackFn != nil { + for _, header := range headers { + if header.Number.Uint64() > lastNumber { + break + } + pwCallbackFn(header, status) + } + } + imported = len(inserted) + ignored = len(headers) - imported + return ignored, imported, status, lastHash, lastHeader, nil } -// WhCallback is a callback function for inserting individual headers. +// PWCallback (Post-Write Callback) is a callback function for inserting +// that is called after each header is inserted. // A callback is used for two reasons: first, in a LightChain, status should be // processed and light chain events sent, while in a BlockChain this is not -// necessary since chain events are sent after inserting blocks. Second, the -// header writes should be protected by the parent chain mutex individually. -type WhCallback func(*types.Header) error +// necessary since chain events are sent after inserting blocks. +type PWCallback func(header *types.Header, status WriteStatus) func (hc *HeaderChain) ValidateHeaderChain(chain []*types.Header, checkFreq int) (int, error) { // Do a sanity check that the provided chain is actually ordered and linked @@ -256,55 +287,31 @@ func (hc *HeaderChain) ValidateHeaderChain(chain []*types.Header, checkFreq int) return 0, nil } -// InsertHeaderChain attempts to insert the given header chain in to the local -// chain, possibly creating a reorg. If an error is returned, it will return the -// index number of the failing header as well an error describing what went wrong. -// -// The verify parameter can be used to fine tune whether nonce verification -// should be done or not. The reason behind the optional check is because some -// of the header retrieval mechanisms already need to verfy nonces, as well as -// because nonces can be verified sparsely, not needing to check each. -func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, writeHeader WhCallback, start time.Time) (int, error) { +// InsertHeaderChain inserts the given headers, and returns the +// index at which something went wrong, the status for the last imported block, and +// the error (if any) +// The caller should hold applicable mutexes +func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, start time.Time, pwCallBackFn PWCallback) (int, WriteStatus, error) { // Collect some import statistics to report on - stats := struct{ processed, ignored int }{} - // All headers passed verification, import them into the database - for i, header := range chain { - // Short circuit insertion if shutting down - if hc.procInterrupt() { - log.Debug("Premature abort during headers import") - return i, errors.New("aborted") - } - // If the header's already known, skip it, otherwise store - hash := header.Hash() - if hc.HasHeader(hash, header.Number.Uint64()) { - externTd := hc.GetTd(hash, header.Number.Uint64()) - localTd := hc.GetTd(hc.currentHeaderHash, hc.CurrentHeader().Number.Uint64()) - if externTd == nil || externTd.Cmp(localTd) <= 0 { - stats.ignored++ - continue - } - } - if err := writeHeader(header); err != nil { - return i, err - } - stats.processed++ - } - // Report some public statistics so the user has a clue what's going on - last := chain[len(chain)-1] - + ignored, imported, status, lastHash, last, err := hc.WriteHeaders(chain, pwCallBackFn) context := []interface{}{ - "count", stats.processed, "elapsed", common.PrettyDuration(time.Since(start)), - "number", last.Number, "hash", last.Hash(), + "count", imported, + "elapsed", common.PrettyDuration(time.Since(start)), } - if timestamp := time.Unix(int64(last.Time), 0); time.Since(timestamp) > time.Minute { - context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...) + if err != nil { + context = append(context, "err", err) + } + if last != nil { + context = append(context, "number", last.Number, "hash", lastHash) + if timestamp := time.Unix(int64(last.Time), 0); time.Since(timestamp) > time.Minute { + context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...) + } } - if stats.ignored > 0 { - context = append(context, []interface{}{"ignored", stats.ignored}...) + if ignored > 0 { + context = append(context, []interface{}{"ignored", ignored}...) } log.Info("Imported new block headers", context...) - - return 0, nil + return 0, status, nil } // GetBlockHashesFromHash retrieves a number of block hashes starting at a given diff --git a/light/lightchain.go b/light/lightchain.go index 02b90138a2d5..ecc928a1e973 100644 --- a/light/lightchain.go +++ b/light/lightchain.go @@ -371,9 +371,7 @@ func (lc *LightChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i defer lc.wg.Done() var events []interface{} - whFunc := func(header *types.Header) error { - status, err := lc.hc.WriteHeader(header) - + postWriteCallback := func(header *types.Header, status core.WriteStatus) { switch status { case core.CanonStatTy: log.Debug("Inserted new header", "number", header.Number, "hash", header.Hash()) @@ -383,9 +381,8 @@ func (lc *LightChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i log.Debug("Inserted forked header", "number", header.Number, "hash", header.Hash()) events = append(events, core.ChainSideEvent{Block: types.NewBlockWithHeader(header)}) } - return err } - i, err := lc.hc.InsertHeaderChain(chain, whFunc, start) + i, _, err := lc.hc.InsertHeaderChain(chain, start, postWriteCallback) lc.postChainEvents(events) return i, err }