4844: bugfix and improve (#2337)
* core: add debug log for CheckDataAvailableInBatch
* narrow the semantics of func resetItems
* freezer: refactor ResetTable & ResetItems;
* fix: fix some lint issues;
* only newSnapshot for genesis block
* freezer: opt reset blob table logic;
* fix: opt da check logic;
* freezer: opt reset blob table logic;
* fix: fix failed UTs;
* core/types: fix EmptyBody
* freezer: refactor write ancient blocks logic;
* code: update code owner file

---------

Co-authored-by: GalaIO <GalaIO@users.noreply.github.com>
Co-authored-by: zzzckck <152148891+zzzckck@users.noreply.github.com>
3 people authored Mar 28, 2024
1 parent e7c5ce2 commit 7c89c65
Showing 23 changed files with 250 additions and 163 deletions.
3 changes: 1 addition & 2 deletions .github/CODEOWNERS
@@ -1,4 +1,3 @@
 # Lines starting with '#' are comments.
 # Each line is a file pattern followed by one or more owners.
-* @zzzckck
-* @zjubfd
+* @zzzckck @zjubfd
14 changes: 5 additions & 9 deletions consensus/parlia/parlia.go
@@ -701,10 +701,8 @@ func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash
}
}

-// If we're at the genesis, snapshot the initial state. Alternatively if we have
-// piled up more headers than allowed to be reorged (chain reinit from a freezer),
-// consider the checkpoint trusted and snapshot it.
-if number == 0 || (number%p.config.Epoch == 0 && (len(headers) > int(params.FullImmutabilityThreshold)/10)) {
+// If we're at the genesis, snapshot the initial state.
+if number == 0 {
checkpoint := chain.GetHeaderByNumber(number)
if checkpoint != nil {
// get checkpoint data
@@ -718,12 +716,10 @@ func (p *Parlia) snapshot(chain consensus.ChainHeaderReader, number uint64, hash

// new snapshot
snap = newSnapshot(p.config, p.signatures, number, hash, validators, voteAddrs, p.ethAPI)
-if snap.Number%checkpointInterval == 0 { // snapshot will only be loaded when snap.Number%checkpointInterval == 0
-if err := snap.store(p.db); err != nil {
-return nil, err
-}
-log.Info("Stored checkpoint snapshot to disk", "number", number, "hash", hash)
+if err := snap.store(p.db); err != nil {
+return nil, err
+}
+log.Info("Stored checkpoint snapshot to disk", "number", number, "hash", hash)
break
}
}
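Note on the parlia.go hunks above: once the epoch-checkpoint branch is removed, only the genesis header reaches the newSnapshot/store path, and the dropped guard was redundant on that path. A one-step sketch of the reasoning (comment only, not code from this commit):

    // Why the guard around snap.store could be dropped:
    //   number == 0  =>  snap.Number % checkpointInterval == 0   (for any checkpointInterval > 0)
    // so on the surviving genesis-only path the snapshot was always stored anyway.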
5 changes: 3 additions & 2 deletions core/blockchain.go
@@ -973,6 +973,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
// The header, total difficulty and canonical hash will be
// removed in the hc.SetHead function.
rawdb.DeleteBody(db, hash, num)
+rawdb.DeleteBlobSidecars(db, hash, num)
rawdb.DeleteReceipts(db, hash, num)
}
// Todo(rjl493456442) txlookup, bloombits, etc
@@ -1340,6 +1341,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
lastBlk := blockChain[len(blockChain)-1]
if bc.chainConfig.Parlia != nil && bc.chainConfig.IsCancun(lastBlk.Number(), lastBlk.Time()) {
if _, err := CheckDataAvailableInBatch(bc, blockChain); err != nil {
log.Debug("CheckDataAvailableInBatch", "err", err)
return 0, err
}
}
@@ -1404,8 +1406,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [

// Write all chain data to ancients.
td := bc.GetTd(first.Hash(), first.NumberU64())
-writeSize, err := rawdb.WriteAncientBlocks(bc.db, blockChain, receiptChain, td)
-
+writeSize, err := rawdb.WriteAncientBlocksWithBlobs(bc.db, blockChain, receiptChain, td)
if err != nil {
log.Error("Error importing chain data to ancients", "err", err)
return 0, err
2 changes: 1 addition & 1 deletion core/blockchain_reader.go
@@ -256,7 +256,7 @@ func (bc *BlockChain) GetSidecarsByHash(hash common.Hash) types.BlobSidecars {
if number == nil {
return nil
}
-sidecars := rawdb.ReadRawBlobSidecars(bc.db, hash, *number)
+sidecars := rawdb.ReadBlobSidecars(bc.db, hash, *number)
if sidecars == nil {
return nil
}
15 changes: 6 additions & 9 deletions core/data_availability.go
@@ -11,7 +11,6 @@ import (
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
)

@@ -49,13 +48,10 @@ func validateBlobSidecar(hashes []common.Hash, sidecar *types.BlobSidecar) error
func IsDataAvailable(chain consensus.ChainHeaderReader, block *types.Block) (err error) {
// refer logic in ValidateBody
if !chain.Config().IsCancun(block.Number(), block.Time()) {
-if block.Sidecars() == nil {
-return nil
-} else {
+if block.Sidecars() != nil {
return errors.New("sidecars present in block body before cancun")
}
-} else if block.Sidecars() == nil {
-return errors.New("missing sidecars in block body after cancun")
+return nil
}

// only required to check within MinBlocksForBlobRequests block's DA
@@ -64,15 +60,16 @@ func IsDataAvailable(chain consensus.ChainHeaderReader, block *types.Block) (err
if highest == nil || highest.Number.Cmp(current.Number) < 0 {
highest = current
}
-defer func() {
-log.Info("IsDataAvailable", "block", block.Number(), "hash", block.Hash(), "highest", highest.Number, "sidecars", len(block.Sidecars()), "err", err)
-}()
if block.NumberU64()+params.MinBlocksForBlobRequests < highest.Number.Uint64() {
// if we needn't check DA of this block, just clean it
block.CleanSidecars()
return nil
}

+// if sidecar is nil, just clean it. And it will be used for saving in ancient.
+if block.Sidecars() == nil {
+block.CleanSidecars()
+}
sidecars := block.Sidecars()
for _, s := range sidecars {
if err := s.SanityCheck(block.Number(), block.Hash()); err != nil {
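Note on IsDataAvailable above: the removed defer/log.Info block was the only user of the dropped log import, and what remains is a window test on params.MinBlocksForBlobRequests — blocks too far behind the highest known header get their sidecars cleaned (block.CleanSidecars()) instead of verified. A rough standalone sketch of that window test (illustrative helper, not code from this commit; package and import lines added only for completeness):

    package sketch // illustrative only, not part of this commit

    import "github.com/ethereum/go-ethereum/params"

    // withinDAWindow reports whether a block is still inside the data-availability
    // window relative to the highest known header. Inside the window the sidecars
    // must be present and pass the sanity/KZG checks; outside it they are cleaned
    // and the checks are skipped, mirroring the early return above.
    func withinDAWindow(blockNumber, highestNumber uint64) bool {
        return blockNumber+params.MinBlocksForBlobRequests >= highestNumber
    }

This is also why the expectation in core/data_availability_test.go below flips from err: true to err: false: a block chasing a head more than MinBlocksForBlobRequests ahead no longer fails merely because it carries no sidecars.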
2 changes: 1 addition & 1 deletion core/data_availability_test.go
@@ -121,7 +121,7 @@ func TestIsDataAvailable(t *testing.T) {
}, nil),
chasingHead: params.MinBlocksForBlobRequests + 1,
withSidecar: false,
-err: true,
+err: false,
},
}

38 changes: 33 additions & 5 deletions core/rawdb/accessors_chain.go
@@ -798,8 +798,9 @@ func WriteBlock(db ethdb.KeyValueWriter, block *types.Block) {
WriteHeader(db, block.Header())
}

-// WriteAncientBlocks writes entire block data into ancient store and returns the total written size.
-func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts []types.Receipts, td *big.Int) (int64, error) {
+// WriteAncientBlocksWithBlobs writes entire block data with blobs into ancient store and returns the total written size.
+func WriteAncientBlocksWithBlobs(db ethdb.AncientWriter, blocks []*types.Block, receipts []types.Receipts, td *big.Int) (int64, error) {
+// find cancun index, it's used for new added blob ancient table
cancunIndex := -1
for i, block := range blocks {
if block.Sidecars() != nil {
@@ -808,12 +809,39 @@ func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts
}
}
log.Info("WriteAncientBlocks", "startAt", blocks[0].Number(), "cancunIndex", cancunIndex, "len", len(blocks))

var (
tdSum = new(big.Int).Set(td)
preSize int64
err error
)
if cancunIndex > 0 {
preSize, err = WriteAncientBlocks(db, blocks[:cancunIndex], receipts[:cancunIndex], td)
if err != nil {
return preSize, err
}
for i, block := range blocks[:cancunIndex] {
if i > 0 {
tdSum.Add(tdSum, block.Difficulty())
}
}
tdSum.Add(tdSum, blocks[cancunIndex].Difficulty())
}

// It will reset blob ancient table at cancunIndex
if cancunIndex >= 0 {
-if err := ResetEmptyBlobAncientTable(db, blocks[cancunIndex].NumberU64()); err != nil {
+if err = ResetEmptyBlobAncientTable(db, blocks[cancunIndex].NumberU64()); err != nil {
return 0, err
}
blocks = blocks[cancunIndex:]
receipts = receipts[cancunIndex:]
}
+postSize, err := WriteAncientBlocks(db, blocks, receipts, tdSum)
+return preSize + postSize, err
+}
+
+// WriteAncientBlocks writes entire block data into ancient store and returns the total written size.
+func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts []types.Receipts, td *big.Int) (int64, error) {
var (
tdSum = new(big.Int).Set(td)
stReceipts []*types.ReceiptForStorage
@@ -853,8 +881,8 @@ func ReadBlobSidecarsRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.R
return data
}

-// ReadRawBlobSidecars retrieves all the transaction blobs belonging to a block.
-func ReadRawBlobSidecars(db ethdb.Reader, hash common.Hash, number uint64) types.BlobSidecars {
+// ReadBlobSidecars retrieves all the transaction blobs belonging to a block.
+func ReadBlobSidecars(db ethdb.Reader, hash common.Hash, number uint64) types.BlobSidecars {
data := ReadBlobSidecarsRLP(db, hash, number)
if len(data) == 0 {
return nil
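Note on WriteAncientBlocksWithBlobs above: WriteAncientBlocks treats its td argument as the total difficulty of the first block it receives, so when the batch is split at cancunIndex the wrapper must advance td across the pre-Cancun blocks before the second call. A small sketch of that hand-off (hypothetical helper mirroring the loop above; package and import lines added only for completeness):

    package sketch // illustrative only, not part of this commit

    import (
        "math/big"

        "github.com/ethereum/go-ethereum/core/types"
    )

    // tdForPostCancunBatch reproduces the accumulation in WriteAncientBlocksWithBlobs:
    // given td = total difficulty of blocks[0], it returns the total difficulty of
    // blocks[cancunIndex], which is what the second WriteAncientBlocks call expects.
    func tdForPostCancunBatch(td *big.Int, blocks []*types.Block, cancunIndex int) *big.Int {
        tdSum := new(big.Int).Set(td)
        for i, block := range blocks[:cancunIndex] {
            if i > 0 { // blocks[0]'s difficulty is already folded into td
                tdSum.Add(tdSum, block.Difficulty())
            }
        }
        // tdSum now equals the total difficulty of blocks[cancunIndex-1];
        // one more difficulty reaches the first post-Cancun block.
        return tdSum.Add(tdSum, blocks[cancunIndex].Difficulty())
    }

For example, with blocks B0..B4 and cancunIndex = 3, the second call receives td(B0) + diff(B1) + diff(B2) + diff(B3), i.e. the total difficulty at B3, the first block written together with its blob sidecars.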
6 changes: 3 additions & 3 deletions core/rawdb/accessors_chain_test.go
@@ -455,13 +455,13 @@ func TestBlockBlobSidecarsStorage(t *testing.T) {
sidecars := types.BlobSidecars{types.NewBlobSidecarFromTx(tx1)}

// Check that no sidecars entries are in a pristine database
-if bs := ReadRawBlobSidecars(db, blkHash, 0); len(bs) != 0 {
+if bs := ReadBlobSidecars(db, blkHash, 0); len(bs) != 0 {
t.Fatalf("non existent sidecars returned: %v", bs)
}
WriteBody(db, blkHash, 0, body)
WriteBlobSidecars(db, blkHash, 0, sidecars)

-if bs := ReadRawBlobSidecars(db, blkHash, 0); len(bs) == 0 {
+if bs := ReadBlobSidecars(db, blkHash, 0); len(bs) == 0 {
t.Fatalf("no sidecars returned")
} else {
if err := checkBlobSidecarsRLP(bs, sidecars); err != nil {
Expand All @@ -470,7 +470,7 @@ func TestBlockBlobSidecarsStorage(t *testing.T) {
}

DeleteBlobSidecars(db, blkHash, 0)
-if bs := ReadRawBlobSidecars(db, blkHash, 0); len(bs) != 0 {
+if bs := ReadBlobSidecars(db, blkHash, 0); len(bs) != 0 {
t.Fatalf("deleted sidecars returned: %v", bs)
}
}
22 changes: 6 additions & 16 deletions core/rawdb/chain_freezer.go
@@ -252,7 +252,7 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) {
env, _ := f.freezeEnv.Load().(*ethdb.FreezerEnv)
// try prune blob data after cancun fork
if isCancun(env, head.Number, head.Time) {
-f.tryPruneBlobAncient(env, *number)
+f.tryPruneBlobAncientTable(env, *number)
}

// Avoid database thrashing with tiny writes
@@ -262,7 +262,7 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) {
}
}

-func (f *chainFreezer) tryPruneBlobAncient(env *ethdb.FreezerEnv, num uint64) {
+func (f *chainFreezer) tryPruneBlobAncientTable(env *ethdb.FreezerEnv, num uint64) {
extraReserve := getBlobExtraReserveFromEnv(env)
// It means that there is no need for pruning
if extraReserve == 0 {
@@ -273,13 +273,8 @@ func (f *chainFreezer) tryPruneBlobAncient(env *ethdb.FreezerEnv, num uint64) {
return
}
expectTail := num - reserveThreshold
-h, err := f.TableAncients(ChainFreezerBlobSidecarTable)
-if err != nil {
-log.Error("Cannot get blob ancient head when prune", "block", num)
-return
-}
start := time.Now()
-if err = f.ResetTable(ChainFreezerBlobSidecarTable, expectTail, h, false); err != nil {
+if _, err := f.TruncateTableTail(ChainFreezerBlobSidecarTable, expectTail); err != nil {
log.Error("Cannot prune blob ancient", "block", num, "expectTail", expectTail, "err", err)
return
}
@@ -312,9 +307,8 @@ func (f *chainFreezer) freezeRangeWithBlobs(nfdb *nofreezedb, number, limit uint

var (
cancunNumber uint64
-found bool
+preHashes []common.Hash
)

for i := number; i <= limit; i++ {
hash := ReadCanonicalHash(nfdb, i)
if hash == (common.Hash{}) {
@@ -326,16 +320,12 @@ func (f *chainFreezer) freezeRangeWithBlobs(nfdb *nofreezedb, number, limit uint
}
if isCancun(env, h.Number, h.Time) {
cancunNumber = i
-found = true
break
}
}
-if !found {
-return f.freezeRange(nfdb, number, limit)
-}

// freeze pre cancun
-preHashes, err := f.freezeRange(nfdb, number, cancunNumber-1)
+preHashes, err = f.freezeRange(nfdb, number, cancunNumber-1)
if err != nil {
return preHashes, err
}
@@ -432,5 +422,5 @@ func isCancun(env *ethdb.FreezerEnv, num *big.Int, time uint64) bool {
}

func ResetEmptyBlobAncientTable(db ethdb.AncientWriter, next uint64) error {
-return db.ResetTable(ChainFreezerBlobSidecarTable, next, next, true)
+return db.ResetTable(ChainFreezerBlobSidecarTable, next, true)
}
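Note on the pruning change above: tryPruneBlobAncientTable no longer needs the table head (the removed TableAncients lookup), because TruncateTableTail only moves the tail of the blob sidecar table forward to expectTail = num - reserveThreshold. A rough sketch of the decision, written as if inside core/rawdb so ChainFreezerBlobSidecarTable and params resolve; the exact derivation of reserveThreshold is outside this hunk, so the MinBlocksForBlobRequests + extraReserve form below is an assumption, and the interface is a local stand-in rather than the real freezer type:

    // blobTailTruncater is a local stand-in exposing only the method this commit adds.
    type blobTailTruncater interface {
        TruncateTableTail(kind string, tail uint64) (uint64, error)
    }

    func pruneBlobTail(f blobTailTruncater, num, extraReserve uint64) error {
        if extraReserve == 0 {
            return nil // pruning disabled, as in the guard above
        }
        // Assumption: keep the DA-request window plus the configured extra blocks.
        reserveThreshold := params.MinBlocksForBlobRequests + extraReserve
        if num <= reserveThreshold {
            return nil // nothing old enough to prune yet
        }
        expectTail := num - reserveThreshold
        // Only the blob sidecar table's tail moves; its head and all other tables stay untouched.
        _, err := f.TruncateTableTail(ChainFreezerBlobSidecarTable, expectTail)
        return err
    }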
14 changes: 10 additions & 4 deletions core/rawdb/database.go
@@ -177,10 +177,6 @@ func (db *nofreezedb) ModifyAncients(func(ethdb.AncientWriteOp) error) (int64, e
return 0, errNotSupported
}

-func (db *nofreezedb) ResetTable(kind string, tail uint64, head uint64, onlyEmpty bool) error {
-return errNotSupported
-}

// TruncateHead returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) TruncateHead(items uint64) (uint64, error) {
return 0, errNotSupported
@@ -191,6 +187,16 @@ func (db *nofreezedb) TruncateTail(items uint64) (uint64, error) {
return 0, errNotSupported
}

+// TruncateTableTail will truncate certain table to new tail
+func (db *nofreezedb) TruncateTableTail(kind string, tail uint64) (uint64, error) {
+return 0, errNotSupported
+}
+
+// ResetTable will reset certain table with new start point
+func (db *nofreezedb) ResetTable(kind string, startAt uint64, onlyEmpty bool) error {
+return errNotSupported
+}

// Sync returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) Sync() error {
return errNotSupported
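Note on the nofreezedb stubs above: the commit splits what used to be one ResetTable(kind, tail, head, onlyEmpty) call into two narrower operations — ResetTable(kind, startAt, onlyEmpty) to (re)initialise a table at a start point, and TruncateTableTail(kind, tail) to prune old items. A minimal sketch of how the two are used elsewhere in this commit, written as if inside core/rawdb so ChainFreezerBlobSidecarTable resolves; the interface below is a local illustration of the signatures shown in this diff, not the actual ethdb definition:

    // Local illustration of the two freezer operations this commit separates.
    type blobTableOps interface {
        // ResetTable re-creates a table so that it starts at startAt; onlyEmpty=true is
        // used for tables that hold no data yet (see ResetEmptyBlobAncientTable above).
        ResetTable(kind string, startAt uint64, onlyEmpty bool) error
        // TruncateTableTail advances a single table's tail, dropping older items
        // (see tryPruneBlobAncientTable above).
        TruncateTableTail(kind string, tail uint64) (uint64, error)
    }

    func initThenPruneBlobTable(db blobTableOps, cancunBlock, expectTail uint64) error {
        // Start the blob sidecar table at the first post-Cancun block.
        if err := db.ResetTable(ChainFreezerBlobSidecarTable, cancunBlock, true); err != nil {
            return err
        }
        // Later, prune sidecars that have fallen out of the reserve window.
        _, err := db.TruncateTableTail(ChainFreezerBlobSidecarTable, expectTail)
        return err
    }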