Skip to content

Commit

Permalink
core/rawdb: backport from bnb-chain/bsc#543
Browse files Browse the repository at this point in the history
Signed-off-by: Delweng <delweng@gmail.com>
  • Loading branch information
jsvisa committed Feb 21, 2023
1 parent ca30659 commit b450f81
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 23 deletions.
6 changes: 3 additions & 3 deletions core/rawdb/accessors_chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ func TestAncientStorage(t *testing.T) {
}
defer os.RemoveAll(frdir)

db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false)
db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false, false, false)
if err != nil {
t.Fatalf("failed to create database with ancient backend")
}
Expand Down Expand Up @@ -582,7 +582,7 @@ func BenchmarkWriteAncientBlocks(b *testing.B) {
b.Fatalf("failed to create temp freezer dir: %v", err)
}
defer os.RemoveAll(frdir)
db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false)
db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false, false, false)
if err != nil {
b.Fatalf("failed to create database with ancient backend")
}
Expand Down Expand Up @@ -892,7 +892,7 @@ func TestHeadersRLPStorage(t *testing.T) {
}
defer os.Remove(frdir)

db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false)
db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), frdir, "", false, false, false)
if err != nil {
t.Fatalf("failed to create database with ancient backend")
}
Expand Down
5 changes: 3 additions & 2 deletions core/rawdb/chain_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
// injects into the database the block hash->number mappings.
func InitDatabaseFromFreezer(db ethdb.Database) {
// If we can't access the freezer or it's empty, abort
frozen, err := db.Ancients()
frozen, err := db.ItemAmountInAncient()
if err != nil || frozen == 0 {
return
}
Expand All @@ -43,8 +43,9 @@ func InitDatabaseFromFreezer(db ethdb.Database) {
start = time.Now()
logged = start.Add(-7 * time.Second) // Unindex during import is fast, don't double log
hash common.Hash
offset = db.AncientOffSet()
)
for i := uint64(0); i < frozen; {
for i := uint64(0) + offset; i < frozen+offset; {
// We read 100K hashes at a time, for a total of 3.2M
count := uint64(100_000)
if i+count > frozen {
Expand Down
100 changes: 93 additions & 7 deletions core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"errors"
"fmt"
"math/big"
"os"
"sync/atomic"
"time"
Expand Down Expand Up @@ -124,6 +125,15 @@ func (db *nofreezedb) TruncateTail(items uint64) error {
return errNotSupported
}

// Ancients returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) ItemAmountInAncient() (uint64, error) {
return 0, errNotSupported
}

func (db *nofreezedb) AncientOffSet() uint64 {
return 0
}

// Sync returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) Sync() error {
return errNotSupported
Expand Down Expand Up @@ -157,15 +167,61 @@ func NewDatabase(db ethdb.KeyValueStore) ethdb.Database {
return &nofreezedb{KeyValueStore: db}
}

func ReadOffSetOfCurrentAncientFreezer(db ethdb.KeyValueReader) uint64 {
offset, _ := db.Get(offSetOfCurrentAncientFreezer)
if offset == nil {
return 0
}
return new(big.Int).SetBytes(offset).Uint64()
}

func ReadOffSetOfLastAncientFreezer(db ethdb.KeyValueReader) uint64 {
offset, _ := db.Get(offSetOfLastAncientFreezer)
if offset == nil {
return 0
}
return new(big.Int).SetBytes(offset).Uint64()
}

func WriteOffSetOfCurrentAncientFreezer(db ethdb.KeyValueWriter, offset uint64) {
if err := db.Put(offSetOfCurrentAncientFreezer, new(big.Int).SetUint64(offset).Bytes()); err != nil {
log.Crit("Failed to store offSetOfAncientFreezer", "err", err)
}
}
func WriteOffSetOfLastAncientFreezer(db ethdb.KeyValueWriter, offset uint64) {
if err := db.Put(offSetOfLastAncientFreezer, new(big.Int).SetUint64(offset).Bytes()); err != nil {
log.Crit("Failed to store offSetOfAncientFreezer", "err", err)
}
}

// NewFreezerDb only create a freezer without statedb.
func NewFreezerDb(db ethdb.KeyValueStore, frz, namespace string, readonly bool, newOffSet uint64) (*freezer, error) {
// Create the idle freezer instance, this operation should be atomic to avoid mismatch between offset and acientDB.
frdb, err := newFreezer(frz, namespace, readonly, newOffSet, freezerTableSize, FreezerNoSnappy)
if err != nil {
return nil, err
}
return frdb, nil
}

// NewDatabaseWithFreezer creates a high level database on top of a given key-
// value data store with a freezer moving immutable chain segments into cold
// storage.
func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace string, readonly bool) (ethdb.Database, error) {
func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace string, readonly, disableFreeze, isLastOffset bool) (ethdb.Database, error) {
var offset uint64
// The offset of ancientDB should be handled differently in different scenarios.
if isLastOffset {
offset = ReadOffSetOfLastAncientFreezer(db)
} else {
offset = ReadOffSetOfCurrentAncientFreezer(db)
}

// Create the idle freezer instance
frdb, err := newFreezer(freezer, namespace, readonly, freezerTableSize, FreezerNoSnappy)
frdb, err := newFreezer(freezer, namespace, readonly, offset, freezerTableSize, FreezerNoSnappy)
if err != nil {
return nil, err
}

// Since the freezer can be stored separately from the user's key-value database,
// there's a fairly high probability that the user requests invalid combinations
// of the freezer and database. Ensure that we don't shoot ourselves in the foot
Expand All @@ -188,7 +244,7 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace st
// If the genesis hash is empty, we have a new key-value store, so nothing to
// validate in this method. If, however, the genesis hash is not nil, compare
// it to the freezer content.
if kvgenesis, _ := db.Get(headerHashKey(0)); len(kvgenesis) > 0 {
if kvgenesis, _ := db.Get(headerHashKey(0)); offset == 0 && len(kvgenesis) > 0 {
if frozen, _ := frdb.Ancients(); frozen > 0 {
// If the freezer already contains something, ensure that the genesis blocks
// match, otherwise we might mix up freezers across chains and destroy both
Expand Down Expand Up @@ -231,7 +287,7 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace st
}
}
// Freezer is consistent with the key-value database, permit combining the two
if !frdb.readonly {
if !disableFreeze && !frdb.readonly {
frdb.wg.Add(1)
go func() {
frdb.freeze(db)
Expand Down Expand Up @@ -269,12 +325,12 @@ func NewLevelDBDatabase(file string, cache int, handles int, namespace string, r

// NewLevelDBDatabaseWithFreezer creates a persistent key-value database with a
// freezer moving immutable chain segments into cold storage.
func NewLevelDBDatabaseWithFreezer(file string, cache int, handles int, freezer string, namespace string, readonly bool) (ethdb.Database, error) {
func NewLevelDBDatabaseWithFreezer(file string, cache int, handles int, freezer string, namespace string, readonly, disableFreeze, isLastOffset bool) (ethdb.Database, error) {
kvdb, err := leveldb.New(file, cache, handles, namespace, readonly)
if err != nil {
return nil, err
}
frdb, err := NewDatabaseWithFreezer(kvdb, freezer, namespace, readonly)
frdb, err := NewDatabaseWithFreezer(kvdb, freezer, namespace, readonly, disableFreeze, isLastOffset)
if err != nil {
kvdb.Close()
return nil, err
Expand Down Expand Up @@ -312,6 +368,36 @@ func (s *stat) Count() string {
return s.count.String()
}

func AncientInspect(db ethdb.Database) error {
offset := counter(ReadOffSetOfCurrentAncientFreezer(db))
// Get number of ancient rows inside the freezer.
ancients := counter(0)
if count, err := db.ItemAmountInAncient(); err != nil {
log.Error("failed to get the items amount in ancientDB", "err", err)
return err
} else {
ancients = counter(count)
}
var endNumber counter
if offset+ancients <= 0 {
endNumber = 0
} else {
endNumber = offset + ancients - 1
}
stats := [][]string{
{"Offset/StartBlockNumber", "Offset/StartBlockNumber of ancientDB", offset.String()},
{"Amount of remained items in AncientStore", "Remaining items of ancientDB", ancients.String()},
{"The last BlockNumber within ancientDB", "The last BlockNumber", endNumber.String()},
}
table := tablewriter.NewWriter(os.Stdout)
table.SetHeader([]string{"Database", "Category", "Items"})
table.SetFooter([]string{"", "AncientStore information", ""})
table.AppendBulk(stats)
table.Render()

return nil
}

// InspectDatabase traverses the entire database and checks the size
// of all different categories of data.
func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
Expand Down Expand Up @@ -444,7 +530,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
}
// Get number of ancient rows inside the freezer
ancients := counter(0)
if count, err := db.Ancients(); err == nil {
if count, err := db.ItemAmountInAncient(); err == nil {
ancients = counter(count)
}
// Display the database statistic.
Expand Down
39 changes: 28 additions & 11 deletions core/rawdb/freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ const (
// freezer is a memory mapped append-only database to store immutable chain data
// into flat files:
//
// - The append only nature ensures that disk writes are minimized.
// - The memory mapping ensures we can max out system memory for caching without
// reserving it for go-ethereum. This would also reduce the memory requirements
// of Geth, and thus also GC overhead.
// - The append only nature ensures that disk writes are minimized.
// - The memory mapping ensures we can max out system memory for caching without
// reserving it for go-ethereum. This would also reduce the memory requirements
// of Geth, and thus also GC overhead.
type freezer struct {
// WARNING: The `frozen` field is accessed atomically. On 32 bit platforms, only
// 64-bit aligned fields can be atomic. The struct is guaranteed to be so aligned,
Expand All @@ -96,14 +96,16 @@ type freezer struct {
quit chan struct{}
wg sync.WaitGroup
closeOnce sync.Once

offset uint64 // Starting BlockNumber in current freezer
}

// newFreezer creates a chain freezer that moves ancient chain data into
// append-only flat file containers.
//
// The 'tables' argument defines the data tables. If the value of a map
// entry is true, snappy compression is disabled for the table.
func newFreezer(datadir string, namespace string, readonly bool, maxTableSize uint32, tables map[string]bool) (*freezer, error) {
func newFreezer(datadir string, namespace string, readonly bool, offset uint64, maxTableSize uint32, tables map[string]bool) (*freezer, error) {
// Create the initial freezer object
var (
readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil)
Expand Down Expand Up @@ -131,6 +133,7 @@ func newFreezer(datadir string, namespace string, readonly bool, maxTableSize ui
instanceLock: lock,
trigger: make(chan chan struct{}),
quit: make(chan struct{}),
offset: offset,
}

// Create the tables.
Expand Down Expand Up @@ -175,6 +178,10 @@ func newFreezer(datadir string, namespace string, readonly bool, maxTableSize ui
return nil, err
}

// Some blocks in ancientDB may have already been frozen and been pruned, so adding the offset to
// reprensent the absolute number of blocks already frozen.
freezer.frozen += offset

// Create the write batch.
freezer.writeBatch = newFreezerBatch(freezer)

Expand Down Expand Up @@ -211,24 +218,24 @@ func (f *freezer) Close() error {
// in the freezer.
func (f *freezer) HasAncient(kind string, number uint64) (bool, error) {
if table := f.tables[kind]; table != nil {
return table.has(number), nil
return table.has(number - f.offset), nil
}
return false, nil
}

// Ancient retrieves an ancient binary blob from the append-only immutable files.
func (f *freezer) Ancient(kind string, number uint64) ([]byte, error) {
if table := f.tables[kind]; table != nil {
return table.Retrieve(number)
return table.Retrieve(number - f.offset)
}
return nil, errUnknownTable
}

// AncientRange retrieves multiple items in sequence, starting from the index 'start'.
// It will return
// - at most 'max' items,
// - at least 1 item (even if exceeding the maxByteSize), but will otherwise
// return as many items as fit into maxByteSize.
// - at most 'max' items,
// - at least 1 item (even if exceeding the maxByteSize), but will otherwise
// return as many items as fit into maxByteSize.
func (f *freezer) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) {
if table := f.tables[kind]; table != nil {
return table.RetrieveItems(start, count, maxBytes)
Expand All @@ -241,6 +248,16 @@ func (f *freezer) Ancients() (uint64, error) {
return atomic.LoadUint64(&f.frozen), nil
}

// ItemAmountInAncient returns the actual length of current ancientDB.
func (f *freezer) ItemAmountInAncient() (uint64, error) {
return atomic.LoadUint64(&f.frozen) - atomic.LoadUint64(&f.offset), nil
}

// AncientOffSet returns the offset of current ancientDB.
func (f *freezer) AncientOffSet() uint64 {
return atomic.LoadUint64(&f.offset)
}

// Tail returns the number of first stored item in the freezer.
func (f *freezer) Tail() (uint64, error) {
return atomic.LoadUint64(&f.tail), nil
Expand Down Expand Up @@ -333,7 +350,7 @@ func (f *freezer) TruncateTail(tail uint64) error {
return nil
}
for _, table := range f.tables {
if err := table.truncateTail(tail); err != nil {
if err := table.truncateTail(tail - f.offset); err != nil {
return err
}
}
Expand Down
6 changes: 6 additions & 0 deletions core/rawdb/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ var (
// fastTxLookupLimitKey tracks the transaction lookup limit during fast sync.
fastTxLookupLimitKey = []byte("FastTransactionLookupLimit")

// offSet of new updated ancientDB.
offSetOfCurrentAncientFreezer = []byte("offSetOfCurrentAncientFreezer")

// offSet of the ancientDB before updated version.
offSetOfLastAncientFreezer = []byte("offSetOfLastAncientFreezer")

// badBlockKey tracks the list of bad blocks seen by local
badBlockKey = []byte("InvalidBlock")

Expand Down
10 changes: 10 additions & 0 deletions core/rawdb/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,16 @@ func (t *table) ReadAncients(fn func(reader ethdb.AncientReader) error) (err err
return t.db.ReadAncients(fn)
}

// ItemAmountInAncient returns the actual length of current ancientDB.
func (t *table) ItemAmountInAncient() (uint64, error) {
return t.db.ItemAmountInAncient()
}

// AncientOffSet returns the offset of current ancientDB.
func (t *table) AncientOffSet() uint64 {
return t.db.AncientOffSet()
}

// TruncateHead is a noop passthrough that just forwards the request to the underlying
// database.
func (t *table) TruncateHead(items uint64) error {
Expand Down

0 comments on commit b450f81

Please sign in to comment.