Skip to content

Commit

Permalink
[R4R]db: freezer batch compatible offline prunblock command (#1005)
Browse files Browse the repository at this point in the history
[R4R]db: freezer batch compatible offline prunblock command (#1005)
  • Loading branch information
joeylichang authored Jul 22, 2022
1 parent 403b19c commit d93211b
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 48 deletions.
48 changes: 39 additions & 9 deletions cmd/geth/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,18 +330,48 @@ func accessDb(ctx *cli.Context, stack *node.Node) (ethdb.Database, error) {
}

func pruneBlock(ctx *cli.Context) error {
stack, config := makeConfigNode(ctx)
var (
stack *node.Node
config gethConfig
chaindb ethdb.Database
err error

oldAncientPath string
newAncientPath string
blockAmountReserved uint64
blockpruner *pruner.BlockPruner
)

stack, config = makeConfigNode(ctx)
defer stack.Close()
blockAmountReserved := ctx.GlobalUint64(utils.BlockAmountReserved.Name)
chaindb, err := accessDb(ctx, stack)
blockAmountReserved = ctx.GlobalUint64(utils.BlockAmountReserved.Name)
chaindb, err = accessDb(ctx, stack)
if err != nil {
return err
}
var newAncientPath string
oldAncientPath := ctx.GlobalString(utils.AncientFlag.Name)
if !filepath.IsAbs(oldAncientPath) {
// force absolute paths, which often fail due to the splicing of relative paths
return errors.New("datadir.ancient not abs path")

// Most of the problems reported by users when first using the prune-block
// tool are due to incorrect directory settings.Here, the default directory
// and relative directory are canceled, and the user is forced to formulate
// an absolute path to guide users to run the prune-block command correctly.
if !ctx.GlobalIsSet(utils.DataDirFlag.Name) {
return errors.New("datadir must be set")
} else {
datadir := ctx.GlobalString(utils.DataDirFlag.Name)
if !filepath.IsAbs(datadir) {
// force absolute paths, which often fail due to the splicing of relative paths
return errors.New("datadir not abs path")
}
}

if !ctx.GlobalIsSet(utils.AncientFlag.Name) {
return errors.New("datadir.ancient must be set")
} else {
oldAncientPath = ctx.GlobalString(utils.AncientFlag.Name)
if !filepath.IsAbs(oldAncientPath) {
// force absolute paths, which often fail due to the splicing of relative paths
return errors.New("datadir.ancient not abs path")
}
}

path, _ := filepath.Split(oldAncientPath)
Expand All @@ -350,7 +380,7 @@ func pruneBlock(ctx *cli.Context) error {
}
newAncientPath = filepath.Join(path, "ancient_back")

blockpruner := pruner.NewBlockPruner(chaindb, stack, oldAncientPath, newAncientPath, blockAmountReserved)
blockpruner = pruner.NewBlockPruner(chaindb, stack, oldAncientPath, newAncientPath, blockAmountReserved)

lock, exist, err := fileutil.Flock(filepath.Join(oldAncientPath, "PRUNEFLOCK"))
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ var (
}
AncientFlag = DirectoryFlag{
Name: "datadir.ancient",
Usage: "Data directory for ancient chain segments (default = inside chaindata)",
Usage: "Data directory for ancient chain segments (default = inside chaindata, '${datadir}/geth/chaindata/ancient/')",
}
DiffFlag = DirectoryFlag{
Name: "datadir.diff",
Expand Down
21 changes: 7 additions & 14 deletions core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,10 @@ func NewDatabase(db ethdb.KeyValueStore) ethdb.Database {
// NewFreezerDb only create a freezer without statedb.
func NewFreezerDb(db ethdb.KeyValueStore, frz, namespace string, readonly bool, newOffSet uint64) (*freezer, error) {
// Create the idle freezer instance, this operation should be atomic to avoid mismatch between offset and acientDB.
frdb, err := newFreezer(frz, namespace, readonly, freezerTableSize, FreezerNoSnappy)
frdb, err := newFreezer(frz, namespace, readonly, newOffSet, freezerTableSize, FreezerNoSnappy)
if err != nil {
return nil, err
}
frdb.offset = newOffSet
frdb.frozen += newOffSet
return frdb, nil
}

Expand Down Expand Up @@ -213,12 +211,7 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace st

if ReadAncientType(db) == PruneFreezerType {
log.Warn("prune ancinet flag is set, may start fail, can add pruneancient parameter resolve")
}

// Create the idle freezer instance
frdb, err := newFreezer(freezer, namespace, readonly, freezerTableSize, FreezerNoSnappy)
if err != nil {
return nil, err
return nil, errors.New("pruneancient was set, please add pruneancient parameter")
}

var offset uint64
Expand All @@ -229,11 +222,11 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, freezer string, namespace st
offset = ReadOffSetOfCurrentAncientFreezer(db)
}

frdb.offset = offset

// Some blocks in ancientDB may have already been frozen and been pruned, so adding the offset to
// reprensent the absolute number of blocks already frozen.
frdb.frozen += offset
// Create the idle freezer instance
frdb, err := newFreezer(freezer, namespace, readonly, offset, freezerTableSize, FreezerNoSnappy)
if err != nil {
return nil, err
}

// Since the freezer can be stored separately from the user's key-value database,
// there's a fairly high probability that the user requests invalid combinations
Expand Down
10 changes: 8 additions & 2 deletions core/rawdb/freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ type freezer struct {
//
// The 'tables' argument defines the data tables. If the value of a map
// entry is true, snappy compression is disabled for the table.
func newFreezer(datadir string, namespace string, readonly bool, maxTableSize uint32, tables map[string]bool) (*freezer, error) {
func newFreezer(datadir string, namespace string, readonly bool, offset uint64, maxTableSize uint32, tables map[string]bool) (*freezer, error) {
// Create the initial freezer object
var (
readMeter = metrics.NewRegisteredMeter(namespace+"ancient/read", nil)
Expand Down Expand Up @@ -131,6 +131,7 @@ func newFreezer(datadir string, namespace string, readonly bool, maxTableSize ui
instanceLock: lock,
trigger: make(chan chan struct{}),
quit: make(chan struct{}),
offset: offset,
}

// Create the tables.
Expand Down Expand Up @@ -162,10 +163,14 @@ func newFreezer(datadir string, namespace string, readonly bool, maxTableSize ui
return nil, err
}

// Some blocks in ancientDB may have already been frozen and been pruned, so adding the offset to
// reprensent the absolute number of blocks already frozen.
freezer.frozen += offset

// Create the write batch.
freezer.writeBatch = newFreezerBatch(freezer)

log.Info("Opened ancient database", "database", datadir, "readonly", readonly)
log.Info("Opened ancient database", "database", datadir, "readonly", readonly, "frozen", freezer.frozen)
return freezer, nil
}

Expand Down Expand Up @@ -368,6 +373,7 @@ func (f *freezer) repair() error {
return err
}
}
log.Info("AncientDB item count", "items", min)
atomic.StoreUint64(&f.frozen, min)
return nil
}
Expand Down
16 changes: 11 additions & 5 deletions core/rawdb/freezer_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type freezerBatch struct {
func newFreezerBatch(f *freezer) *freezerBatch {
batch := &freezerBatch{tables: make(map[string]*freezerTableBatch, len(f.tables))}
for kind, table := range f.tables {
batch.tables[kind] = table.newBatch()
batch.tables[kind] = table.newBatch(f.offset)
}
return batch
}
Expand Down Expand Up @@ -91,11 +91,15 @@ type freezerTableBatch struct {
indexBuffer []byte
curItem uint64 // expected index of next append
totalBytes int64 // counts written bytes since reset
offset uint64
}

// newBatch creates a new batch for the freezer table.
func (t *freezerTable) newBatch() *freezerTableBatch {
batch := &freezerTableBatch{t: t}
func (t *freezerTable) newBatch(offset uint64) *freezerTableBatch {
var batch = &freezerTableBatch{
t: t,
offset: offset,
}
if !t.noCompression {
batch.sb = new(snappyBuffer)
}
Expand All @@ -107,7 +111,8 @@ func (t *freezerTable) newBatch() *freezerTableBatch {
func (batch *freezerTableBatch) reset() {
batch.dataBuffer = batch.dataBuffer[:0]
batch.indexBuffer = batch.indexBuffer[:0]
batch.curItem = atomic.LoadUint64(&batch.t.items)
curItem := batch.t.items + batch.offset
batch.curItem = atomic.LoadUint64(&curItem)
batch.totalBytes = 0
}

Expand Down Expand Up @@ -201,7 +206,8 @@ func (batch *freezerTableBatch) commit() error {

// Update headBytes of table.
batch.t.headBytes += dataSize
atomic.StoreUint64(&batch.t.items, batch.curItem)
items := batch.curItem - batch.offset
atomic.StoreUint64(&batch.t.items, items)

// Update metrics.
batch.t.sizeGauge.Inc(dataSize + indexSize)
Expand Down
18 changes: 9 additions & 9 deletions core/rawdb/freezer_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestFreezerBasicsClosing(t *testing.T) {
// In-between writes, the table is closed and re-opened.
for x := 0; x < 255; x++ {
data := getChunk(15, x)
batch := f.newBatch()
batch := f.newBatch(0)
require.NoError(t, batch.AppendRaw(uint64(x), data))
require.NoError(t, batch.commit())
f.Close()
Expand Down Expand Up @@ -222,7 +222,7 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) {
t.Errorf("Expected error for missing index entry")
}
// We should now be able to store items again, from item = 1
batch := f.newBatch()
batch := f.newBatch(0)
for x := 1; x < 0xff; x++ {
require.NoError(t, batch.AppendRaw(uint64(x), getChunk(15, ^x)))
}
Expand Down Expand Up @@ -412,7 +412,7 @@ func TestFreezerRepairFirstFile(t *testing.T) {
t.Fatal(err)
}
// Write 80 bytes, splitting out into two files
batch := f.newBatch()
batch := f.newBatch(0)
require.NoError(t, batch.AppendRaw(0, getChunk(40, 0xFF)))
require.NoError(t, batch.AppendRaw(1, getChunk(40, 0xEE)))
require.NoError(t, batch.commit())
Expand Down Expand Up @@ -450,7 +450,7 @@ func TestFreezerRepairFirstFile(t *testing.T) {
}

// Write 40 bytes
batch := f.newBatch()
batch := f.newBatch(0)
require.NoError(t, batch.AppendRaw(1, getChunk(40, 0xDD)))
require.NoError(t, batch.commit())

Expand Down Expand Up @@ -507,7 +507,7 @@ func TestFreezerReadAndTruncate(t *testing.T) {
f.truncate(0)

// Write the data again
batch := f.newBatch()
batch := f.newBatch(0)
for x := 0; x < 30; x++ {
require.NoError(t, batch.AppendRaw(uint64(x), getChunk(15, ^x)))
}
Expand All @@ -529,7 +529,7 @@ func TestFreezerOffset(t *testing.T) {
}

// Write 6 x 20 bytes, splitting out into three files
batch := f.newBatch()
batch := f.newBatch(0)
require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF)))
require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE)))

Expand Down Expand Up @@ -592,7 +592,7 @@ func TestFreezerOffset(t *testing.T) {
t.Log(f.dumpIndexString(0, 100))

// It should allow writing item 6.
batch := f.newBatch()
batch := f.newBatch(0)
require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x99)))
require.NoError(t, batch.commit())

Expand Down Expand Up @@ -710,7 +710,7 @@ func getChunk(size int, b int) []byte {
func writeChunks(t *testing.T, ft *freezerTable, n int, length int) {
t.Helper()

batch := ft.newBatch()
batch := ft.newBatch(0)
for i := 0; i < n; i++ {
if err := batch.AppendRaw(uint64(i), getChunk(length, i)); err != nil {
t.Fatalf("AppendRaw(%d, ...) returned error: %v", i, err)
Expand Down Expand Up @@ -906,7 +906,7 @@ func TestFreezerReadonly(t *testing.T) {

// Case 5: Now write some data via a batch.
// This should fail either during AppendRaw or Commit
batch := f.newBatch()
batch := f.newBatch(0)
writeErr := batch.AppendRaw(32, make([]byte, 1))
if writeErr == nil {
writeErr = batch.commit()
Expand Down
12 changes: 6 additions & 6 deletions core/rawdb/freezer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func TestFreezerModifyRollback(t *testing.T) {

// Reopen and check that the rolled-back data doesn't reappear.
tables := map[string]bool{"test": true}
f2, err := newFreezer(dir, "", false, 2049, tables)
f2, err := newFreezer(dir, "", false, 0, 2049, tables)
if err != nil {
t.Fatalf("can't reopen freezer after failed ModifyAncients: %v", err)
}
Expand Down Expand Up @@ -262,17 +262,17 @@ func TestFreezerReadonlyValidate(t *testing.T) {
defer os.RemoveAll(dir)
// Open non-readonly freezer and fill individual tables
// with different amount of data.
f, err := newFreezer(dir, "", false, 2049, tables)
f, err := newFreezer(dir, "", false, 0, 2049, tables)
if err != nil {
t.Fatal("can't open freezer", err)
}
var item = make([]byte, 1024)
aBatch := f.tables["a"].newBatch()
aBatch := f.tables["a"].newBatch(0)
require.NoError(t, aBatch.AppendRaw(0, item))
require.NoError(t, aBatch.AppendRaw(1, item))
require.NoError(t, aBatch.AppendRaw(2, item))
require.NoError(t, aBatch.commit())
bBatch := f.tables["b"].newBatch()
bBatch := f.tables["b"].newBatch(0)
require.NoError(t, bBatch.AppendRaw(0, item))
require.NoError(t, bBatch.commit())
if f.tables["a"].items != 3 {
Expand All @@ -285,7 +285,7 @@ func TestFreezerReadonlyValidate(t *testing.T) {

// Re-openening as readonly should fail when validating
// table lengths.
f, err = newFreezer(dir, "", true, 2049, tables)
f, err = newFreezer(dir, "", true, 0, 2049, tables)
if err == nil {
t.Fatal("readonly freezer should fail with differing table lengths")
}
Expand All @@ -300,7 +300,7 @@ func newFreezerForTesting(t *testing.T, tables map[string]bool) (*freezer, strin
}
// note: using low max table size here to ensure the tests actually
// switch between multiple files.
f, err := newFreezer(dir, "", false, 2049, tables)
f, err := newFreezer(dir, "", false, 0, 2049, tables)
if err != nil {
t.Fatal("can't open freezer", err)
}
Expand Down
4 changes: 3 additions & 1 deletion core/rawdb/prunedfreezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,12 @@ func newPrunedFreezer(datadir string, db ethdb.KeyValueStore) (*prunedfreezer, e
func (f *prunedfreezer) repair(datadir string) error {
// compatible prune-block-tool
offset := ReadOffSetOfCurrentAncientFreezer(f.db)
log.Info("Read last offline prune-block start block number", "offset", offset)

// compatible freezer
min := uint64(math.MaxUint64)
for name, disableSnappy := range FreezerNoSnappy {
table, err := NewFreezerTable(datadir, name, disableSnappy, true)
table, err := NewFreezerTable(datadir, name, disableSnappy, false)
if err != nil {
return err
}
Expand All @@ -76,6 +77,7 @@ func (f *prunedfreezer) repair(datadir string) error {
}
table.Close()
}
log.Info("Read ancientdb item counts", "items", min)
offset += min

if frozen := ReadFrozenOfAncientFreezer(f.db); frozen > offset {
Expand Down
5 changes: 4 additions & 1 deletion core/state/pruner/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,10 @@ func (p *BlockPruner) backUpOldDb(name string, cache, handles int, namespace str
return consensus.ErrUnknownAncestor
}
// Write into new ancient_back db.
rawdb.WriteAncientBlocks(frdbBack, []*types.Block{block}, []types.Receipts{receipts}, td)
if _, err := rawdb.WriteAncientBlocks(frdbBack, []*types.Block{block}, []types.Receipts{receipts}, td); err != nil {
log.Error("failed to write new ancient", "error", err)
return err
}
// Print the log every 5s for better trace.
if common.PrettyDuration(time.Since(start)) > common.PrettyDuration(5*time.Second) {
log.Info("block backup process running successfully", "current blockNumber for backup", blockNumber)
Expand Down

0 comments on commit d93211b

Please sign in to comment.