Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/les: import header chains in batches #19456

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type ChainReader interface {
// GetHeader retrieves a block header from the database by hash and number.
GetHeader(hash common.Hash, number uint64) *types.Header

// HasHeader checks presence of header in the database by hash and number.
HasHeader(hash common.Hash, number uint64) bool

// GetHeaderByNumber retrieves a block header from the database by number.
GetHeaderByNumber(number uint64) *types.Header

Expand Down
25 changes: 17 additions & 8 deletions consensus/ethash/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (ethash *Ethash) VerifyHeader(chain consensus.ChainReader, header *types.He
}
// Short circuit if the header is known, or it's parent not
number := header.Number.Uint64()
if chain.GetHeader(header.Hash(), number) != nil {
if chain.HasHeader(header.Hash(), number) {
return nil
}
parent := chain.GetHeader(header.ParentHash, number-1)
Expand Down Expand Up @@ -174,7 +174,7 @@ func (ethash *Ethash) verifyHeaderWorker(chain consensus.ChainReader, headers []
if parent == nil {
return consensus.ErrUnknownAncestor
}
if chain.GetHeader(headers[index].Hash(), headers[index].Number.Uint64()) != nil {
if chain.HasHeader(headers[index].Hash(), headers[index].Number.Uint64()) {
return nil // known block
}
return ethash.verifyHeader(chain, headers[index], parent, false, seals[index])
Expand All @@ -199,15 +199,24 @@ func (ethash *Ethash) VerifyUncles(chain consensus.ChainReader, block *types.Blo

number, parent := block.NumberU64()-1, block.ParentHash()
for i := 0; i < 7; i++ {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may not be the worst thing to turn this into a constant while you're here, such as maxKin.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not really a fan of that. The way it's written, it's very clear upon inspection what the limit is, without having to look up the constant in some params-file. Also, since we're not likely to change it ever, I don't see what the benefit would be in parameterizing it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No worries. It was not about parameterizing it, but about making it clear what the 7 means as some people might not be able to immediately know the kin relation for ommers.

ancestor := chain.GetBlock(parent, number)
if ancestor == nil {
ancestorHeader := chain.GetHeader(parent, number)
if ancestorHeader == nil {
break
}
ancestors[ancestor.Hash()] = ancestor.Header()
for _, uncle := range ancestor.Uncles() {
uncles.Add(uncle.Hash())
ancestors[parent] = ancestorHeader

// If the ancestor doesn't have any uncles, we don't have to iterate them
holiman marked this conversation as resolved.
Show resolved Hide resolved
if ancestorHeader.UncleHash != types.EmptyUncleHash {
// Need to add those uncles to the blacklist too
ancestorBlock := chain.GetBlock(parent, number)
if ancestorBlock == nil {
break
}
for _, uncle := range ancestorBlock.Uncles() {
uncles.Add(uncle.Hash())
}
}
parent, number = ancestor.ParentHash(), number-1
parent, number = ancestorHeader.ParentHash, number-1
}
ancestors[block.Hash()] = block.Header()
uncles.Add(block.Hash())
Expand Down
29 changes: 18 additions & 11 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -977,9 +977,10 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
}

var (
stats = struct{ processed, ignored int32 }{}
start = time.Now()
size = 0
stats = struct{ processed, ignored int32 }{}
start = time.Now()
size = 0
checkBodyPresent = true
)
// updateHead updates the head fast sync block if the inserted blocks are better
// and returns a indicator whether the inserted blocks are canonical.
Expand Down Expand Up @@ -1161,9 +1162,18 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
if !bc.HasHeader(block.Hash(), block.NumberU64()) {
return i, fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4])
}
if bc.HasBlock(block.Hash(), block.NumberU64()) {
stats.ignored++
continue

// Ignore if the entire data is already known
if checkBodyPresent {
if bc.HasBlock(block.Hash(), block.NumberU64()) {
stats.ignored++
continue
} else {
// If block N is unavailable, so are the next blocks.
// This should hold, but if it does not, the shortcut
// here will only cause overwriting some existing data
checkBodyPresent = false
}
}
// Write all the data out into the database
rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body())
Expand Down Expand Up @@ -2103,11 +2113,8 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i
bc.wg.Add(1)
defer bc.wg.Done()

whFunc := func(header *types.Header) error {
_, err := bc.hc.WriteHeader(header)
return err
}
return bc.hc.InsertHeaderChain(chain, whFunc, start)
errIndex, _, err := bc.hc.InsertHeaderChain(chain, start, nil)
rjl493456442 marked this conversation as resolved.
Show resolved Hide resolved
return errIndex, err
}

// CurrentHeader retrieves the current head header of the canonical chain. The
Expand Down
1 change: 1 addition & 0 deletions core/chain_makers.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,3 +297,4 @@ func (cr *fakeChainReader) GetHeaderByNumber(number uint64) *types.Header
func (cr *fakeChainReader) GetHeaderByHash(hash common.Hash) *types.Header { return nil }
func (cr *fakeChainReader) GetHeader(hash common.Hash, number uint64) *types.Header { return nil }
func (cr *fakeChainReader) GetBlock(hash common.Hash, number uint64) *types.Block { return nil }
func (cr *fakeChainReader) HasHeader(hash common.Hash, number uint64) bool { return false }
183 changes: 95 additions & 88 deletions core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
const (
headerCacheLimit = 512
tdCacheLimit = 1024
numberCacheLimit = 2048
numberCacheLimit = 4096
)

// HeaderChain implements the basic block header chain logic that is shared by
Expand Down Expand Up @@ -123,87 +123,118 @@ func (hc *HeaderChain) GetBlockNumber(hash common.Hash) *uint64 {
return number
}

// WriteHeader writes a header into the local chain, given that its parent is
// already known. If the total difficulty of the newly inserted header becomes
// greater than the current known TD, the canonical chain is re-routed.
// WriteHeaders writes a chain of headers into the local chain, given that the parents
// are already known. If the total difficulty of the newly inserted chain becomes
holiman marked this conversation as resolved.
Show resolved Hide resolved
// greater than the current known TD, the canonical chain is reorged.
//
// Note: This method is not concurrent-safe with inserting blocks simultaneously
// into the chain, as side effects caused by reorganisations cannot be emulated
// without the real blocks. Hence, writing headers directly should only be done
// in two scenarios: pure-header mode of operation (light clients), or properly
// separated header/block phases (non-archive clients).
func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, err error) {
// Cache some values to prevent constant recalculation
func (hc *HeaderChain) WriteHeaders(headers []*types.Header, pwCallbackFn PWCallback) (ignored, imported int, status WriteStatus, lastHash common.Hash, lastHeader *types.Header, err error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this feels like a lot of vales to return from a single function. It makes me wonder if it is trying to do too much or there is another way to split this up.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it's a bit silly. I'll see if I can shave off some of it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The easy way out is to combine all of these into a WriteHeadersResult since you've already given all the fields name. I'd still leave error separate if you decide to go that route.

if len(headers) == 0 {
return ignored, imported, NonStatTy, lastHash, nil, err
}
ptd := hc.GetTd(headers[0].ParentHash, headers[0].Number.Uint64()-1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: it was not immediately clear to me that ptd is parentTD just by looking at its name. You may want to consider a name change.

if ptd == nil {
return ignored, imported, NonStatTy, lastHash, nil, consensus.ErrUnknownAncestor
}
type numberHash struct {
number uint64
hash common.Hash
}
var (
hash = header.Hash()
number = header.Number.Uint64()
lastNumber uint64 // Last successfully imported number
externTd *big.Int // TD of successfully imported chain
inserted []numberHash // Quick lookup of number/hash for the chain
)
// Calculate the total difficulty of the header
ptd := hc.GetTd(header.ParentHash, number-1)
if ptd == nil {
return NonStatTy, consensus.ErrUnknownAncestor
lastHash, lastNumber = headers[0].ParentHash, headers[0].Number.Uint64()-1 // Already validated above
batch := hc.chainDb.NewBatch()
for _, header := range headers {
hash, number := header.Hash(), header.Number.Uint64()
if header.ParentHash != lastHash {
log.Warn("Non-contiguous header insertion", "header.parent", header.ParentHash, "expected", hash, "number", number)
break
}
externTd = new(big.Int).Add(header.Difficulty, ptd)
holiman marked this conversation as resolved.
Show resolved Hide resolved
// If the header's already known, skip it, otherwise store
if !hc.HasHeader(hash, number) {
// Irrelevant of the canonical status, write the td and header to the database
rawdb.WriteTd(batch, hash, number, externTd)
hc.tdCache.Add(hash, new(big.Int).Set(externTd))

rawdb.WriteHeader(batch, header)
inserted = append(inserted, numberHash{number, hash})
hc.headerCache.Add(hash, header)
hc.numberCache.Add(hash, number)
}
lastHeader, lastHash, lastNumber, ptd = header, hash, number, externTd
}
batch.Write()
batch.Reset()
localTd := hc.GetTd(hc.currentHeaderHash, hc.CurrentHeader().Number.Uint64())
externTd := new(big.Int).Add(header.Difficulty, ptd)

// Irrelevant of the canonical status, write the td and header to the database
if err := hc.WriteTd(hash, number, externTd); err != nil {
log.Crit("Failed to write header total difficulty", "err", err)
}
rawdb.WriteHeader(hc.chainDb, header)

// If the total difficulty is higher than our known, add it to the canonical chain
// Second clause in the if statement reduces the vulnerability to selfish mining.
// Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf
if externTd.Cmp(localTd) > 0 || (externTd.Cmp(localTd) == 0 && mrand.Float64() < 0.5) {
// Delete any canonical number assignments above the new head
batch := hc.chainDb.NewBatch()
for i := number + 1; ; i++ {
for i := lastNumber + 1; ; i++ {
hash := rawdb.ReadCanonicalHash(hc.chainDb, i)
if hash == (common.Hash{}) {
break
}
rawdb.DeleteCanonicalHash(batch, i)
}
batch.Write()
batch.Reset()

// Overwrite any stale canonical number assignments
// Overwrite any stale canonical number assignments, work backwards
// from the first inserted header
var (
headHash = header.ParentHash
headNumber = header.Number.Uint64() - 1
headHeader = hc.GetHeader(headHash, headNumber)
headHash = headers[0].ParentHash
headNumber = headers[0].Number.Uint64() - 1
)
for rawdb.ReadCanonicalHash(hc.chainDb, headNumber) != headHash {
rawdb.WriteCanonicalHash(hc.chainDb, headHash, headNumber)

rawdb.WriteCanonicalHash(batch, headHash, headNumber)
headHeader := hc.GetHeader(headHash, headNumber)
headHash = headHeader.ParentHash
headNumber = headHeader.Number.Uint64() - 1
headHeader = hc.GetHeader(headHash, headNumber)
}
// Extend the canonical chain with the new header
rawdb.WriteCanonicalHash(hc.chainDb, hash, number)
rawdb.WriteHeadHeaderHash(hc.chainDb, hash)

hc.currentHeaderHash = hash
hc.currentHeader.Store(types.CopyHeader(header))
headHeaderGauge.Update(header.Number.Int64())

// Now write the current chain in a batch
for _, hdr := range inserted {
rawdb.WriteCanonicalHash(batch, hdr.hash, hdr.number)
}
rawdb.WriteHeadHeaderHash(batch, lastHash)
batch.Write()
batch.Reset()
hc.currentHeaderHash = lastHash
hc.currentHeader.Store(types.CopyHeader(lastHeader))
headHeaderGauge.Update(lastHeader.Number.Int64())
status = CanonStatTy
} else {
status = SideStatTy
}
hc.headerCache.Add(hash, header)
hc.numberCache.Add(hash, number)

return
// Execute any post-write callback function
if pwCallbackFn != nil {
for _, header := range headers {
holiman marked this conversation as resolved.
Show resolved Hide resolved
if header.Number.Uint64() > lastNumber {
break
}
pwCallbackFn(header, status)
}
}
imported = len(inserted)
ignored = len(headers) - imported
return ignored, imported, status, lastHash, lastHeader, nil
}

// WhCallback is a callback function for inserting individual headers.
// PWCallback (Post-Write Callback) is a callback function for inserting
// that is called after each header is inserted.
// A callback is used for two reasons: first, in a LightChain, status should be
// processed and light chain events sent, while in a BlockChain this is not
// necessary since chain events are sent after inserting blocks. Second, the
// header writes should be protected by the parent chain mutex individually.
type WhCallback func(*types.Header) error
// necessary since chain events are sent after inserting blocks.
type PWCallback func(header *types.Header, status WriteStatus)

func (hc *HeaderChain) ValidateHeaderChain(chain []*types.Header, checkFreq int) (int, error) {
// Do a sanity check that the provided chain is actually ordered and linked
Expand Down Expand Up @@ -256,55 +287,31 @@ func (hc *HeaderChain) ValidateHeaderChain(chain []*types.Header, checkFreq int)
return 0, nil
}

// InsertHeaderChain attempts to insert the given header chain in to the local
// chain, possibly creating a reorg. If an error is returned, it will return the
// index number of the failing header as well an error describing what went wrong.
//
// The verify parameter can be used to fine tune whether nonce verification
// should be done or not. The reason behind the optional check is because some
// of the header retrieval mechanisms already need to verfy nonces, as well as
// because nonces can be verified sparsely, not needing to check each.
func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, writeHeader WhCallback, start time.Time) (int, error) {
// InsertHeaderChain inserts the given headers, and returns the
// index at which something went wrong, the status for the last imported block, and
// the error (if any)
// The caller should hold applicable mutexes
func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, start time.Time, pwCallBackFn PWCallback) (int, WriteStatus, error) {
// Collect some import statistics to report on
stats := struct{ processed, ignored int }{}
// All headers passed verification, import them into the database
for i, header := range chain {
// Short circuit insertion if shutting down
if hc.procInterrupt() {
log.Debug("Premature abort during headers import")
return i, errors.New("aborted")
}
// If the header's already known, skip it, otherwise store
hash := header.Hash()
if hc.HasHeader(hash, header.Number.Uint64()) {
externTd := hc.GetTd(hash, header.Number.Uint64())
localTd := hc.GetTd(hc.currentHeaderHash, hc.CurrentHeader().Number.Uint64())
if externTd == nil || externTd.Cmp(localTd) <= 0 {
stats.ignored++
continue
}
}
if err := writeHeader(header); err != nil {
return i, err
}
stats.processed++
}
// Report some public statistics so the user has a clue what's going on
last := chain[len(chain)-1]

ignored, imported, status, lastHash, last, err := hc.WriteHeaders(chain, pwCallBackFn)
context := []interface{}{
"count", stats.processed, "elapsed", common.PrettyDuration(time.Since(start)),
"number", last.Number, "hash", last.Hash(),
"count", imported,
"elapsed", common.PrettyDuration(time.Since(start)),
}
if timestamp := time.Unix(int64(last.Time), 0); time.Since(timestamp) > time.Minute {
context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
if err != nil {
context = append(context, "err", err)
}
if last != nil {
context = append(context, "number", last.Number, "hash", lastHash)
if timestamp := time.Unix(int64(last.Time), 0); time.Since(timestamp) > time.Minute {
context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
}
}
if stats.ignored > 0 {
context = append(context, []interface{}{"ignored", stats.ignored}...)
if ignored > 0 {
context = append(context, []interface{}{"ignored", ignored}...)
}
log.Info("Imported new block headers", context...)

return 0, nil
return 0, status, nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also return the last index of the inserted header instead of returning 0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way I understood the return parameters (which are poorly documented) is that the first return value is the "index of the erroring header".

}

// GetBlockHashesFromHash retrieves a number of block hashes starting at a given
Expand Down
7 changes: 2 additions & 5 deletions light/lightchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,9 +371,7 @@ func (lc *LightChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i
defer lc.wg.Done()

var events []interface{}
whFunc := func(header *types.Header) error {
status, err := lc.hc.WriteHeader(header)

postWriteCallback := func(header *types.Header, status core.WriteStatus) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider moving this function declaration outside this function and naming it to something that describes that it's doing.

switch status {
case core.CanonStatTy:
log.Debug("Inserted new header", "number", header.Number, "hash", header.Hash())
Expand All @@ -383,9 +381,8 @@ func (lc *LightChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i
log.Debug("Inserted forked header", "number", header.Number, "hash", header.Hash())
events = append(events, core.ChainSideEvent{Block: types.NewBlockWithHeader(header)})
}
return err
}
i, err := lc.hc.InsertHeaderChain(chain, whFunc, start)
i, _, err := lc.hc.InsertHeaderChain(chain, start, postWriteCallback)
lc.postChainEvents(events)
return i, err
}
Expand Down