Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/state/snapshot: update generator marker in sync with flushes #21804

Merged
merged 1 commit into from
Nov 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/blockchain_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ func testSnapshot(t *testing.T, tt *snapshotTest) {
if _, err := chain.InsertChain(blocks[startPoint:]); err != nil {
t.Fatalf("Failed to import canonical chain tail: %v", err)
}
// Set the flag for writing legacy journal if ncessary
// Set the flag for writing legacy journal if necessary
if tt.legacy {
chain.writeLegacyJournal = true
}
Expand Down Expand Up @@ -708,7 +708,6 @@ func testSnapshot(t *testing.T, tt *snapshotTest) {
} else if tt.gapped > 0 {
// Insert blocks without enabling snapshot if gapping is required.
chain.Stop()

gappedBlocks, _ := GenerateChain(params.TestChainConfig, blocks[len(blocks)-1], engine, gendb, tt.gapped, func(i int, b *BlockGen) {})

// Insert a few more blocks without enabling snapshot
Expand Down Expand Up @@ -766,6 +765,7 @@ func testSnapshot(t *testing.T, tt *snapshotTest) {
defer chain.Stop()
} else {
chain.Stop()

// Restart the chain normally
chain, err = NewBlockChain(db, nil, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil)
if err != nil {
Expand Down
48 changes: 46 additions & 2 deletions core/state/snapshot/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package snapshot
import (
"bytes"
"encoding/binary"
"fmt"
"math/big"
"time"

Expand Down Expand Up @@ -116,6 +117,38 @@ func generateSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache i
return base
}

// journalProgress persists the generator stats into the database to resume later.
func journalProgress(db ethdb.KeyValueWriter, marker []byte, stats *generatorStats) {
// Write out the generator marker. Note it's a standalone disk layer generator
// which is not mixed with journal. It's ok if the generator is persisted while
// journal is not.
entry := journalGenerator{
Done: marker == nil,
Marker: marker,
}
if stats != nil {
entry.Wiping = (stats.wiping != nil)
entry.Accounts = stats.accounts
entry.Slots = stats.slots
entry.Storage = uint64(stats.storage)
}
blob, err := rlp.EncodeToBytes(entry)
if err != nil {
panic(err) // Cannot happen, here to catch dev errors
}
var logstr string
switch len(marker) {
case 0:
logstr = "done"
case common.HashLength:
logstr = fmt.Sprintf("%#x", marker)
default:
logstr = fmt.Sprintf("%#x:%#x", marker[:common.HashLength], marker[common.HashLength:])
}
log.Debug("Journalled generator progress", "progress", logstr)
rawdb.WriteSnapshotGenerator(db, blob)
}

// generate is a background thread that iterates over the state and storage tries,
// constructing the state snapshot. All the arguments are purely for statistics
// gethering and logging, since the method surfs the blocks as they arrive, often
Expand Down Expand Up @@ -187,11 +220,15 @@ func (dl *diskLayer) generate(stats *generatorStats) {
if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil {
// Only write and set the marker if we actually did something useful
if batch.ValueSize() > 0 {
// Ensure the generator entry is in sync with the data
marker := accountHash[:]
journalProgress(batch, marker, stats)

batch.Write()
batch.Reset()

dl.lock.Lock()
dl.genMarker = accountHash[:]
dl.genMarker = marker
dl.lock.Unlock()
}
if abort != nil {
Expand Down Expand Up @@ -228,11 +265,15 @@ func (dl *diskLayer) generate(stats *generatorStats) {
if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil {
// Only write and set the marker if we actually did something useful
if batch.ValueSize() > 0 {
// Ensure the generator entry is in sync with the data
marker := append(accountHash[:], storeIt.Key...)
journalProgress(batch, marker, stats)

batch.Write()
batch.Reset()

dl.lock.Lock()
dl.genMarker = append(accountHash[:], storeIt.Key...)
dl.genMarker = marker
dl.lock.Unlock()
}
if abort != nil {
Expand Down Expand Up @@ -264,6 +305,9 @@ func (dl *diskLayer) generate(stats *generatorStats) {
}
// Snapshot fully generated, set the marker to nil
if batch.ValueSize() > 0 {
// Ensure the generator entry is in sync with the data
journalProgress(batch, nil, stats)

batch.Write()
}
log.Info("Generated state snapshot", "accounts", stats.accounts, "slots", stats.slots,
Expand Down
30 changes: 8 additions & 22 deletions core/state/snapshot/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,8 @@ func loadDiffLayer(parent snapshot, r *rlp.Stream) (snapshot, error) {
return loadDiffLayer(newDiffLayer(parent, root, destructSet, accountData, storageData), r)
}

// Journal writes the persistent layer generator stats into a buffer to be stored
// in the database as the snapshot journal.
// Journal terminates any in-progress snapshot generation, also implicitly pushing
// the progress into the database.
func (dl *diskLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) {
// If the snapshot is currently being generated, abort it
var stats *generatorStats
Expand All @@ -296,25 +296,10 @@ func (dl *diskLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) {
if dl.stale {
return common.Hash{}, ErrSnapshotStale
}
// Write out the generator marker. Note it's a standalone disk layer generator
// which is not mixed with journal. It's ok if the generator is persisted while
// journal is not.
entry := journalGenerator{
Done: dl.genMarker == nil,
Marker: dl.genMarker,
}
if stats != nil {
entry.Wiping = (stats.wiping != nil)
entry.Accounts = stats.accounts
entry.Slots = stats.slots
entry.Storage = uint64(stats.storage)
}
blob, err := rlp.EncodeToBytes(entry)
if err != nil {
return common.Hash{}, err
}
log.Debug("Journalled disk layer", "root", dl.root, "complete", dl.genMarker == nil)
rawdb.WriteSnapshotGenerator(dl.diskdb, blob)
// Ensure the generator stats is written even if none was ran this cycle
journalProgress(dl.diskdb, dl.genMarker, stats)

log.Debug("Journalled disk layer", "root", dl.root)
return dl.root, nil
}

Expand Down Expand Up @@ -401,6 +386,7 @@ func (dl *diskLayer) LegacyJournal(buffer *bytes.Buffer) (common.Hash, error) {
entry.Slots = stats.slots
entry.Storage = uint64(stats.storage)
}
log.Debug("Legacy journalled disk layer", "root", dl.root)
if err := rlp.Encode(buffer, entry); err != nil {
return common.Hash{}, err
}
Expand Down Expand Up @@ -455,6 +441,6 @@ func (dl *diffLayer) LegacyJournal(buffer *bytes.Buffer) (common.Hash, error) {
if err := rlp.Encode(buffer, storage); err != nil {
return common.Hash{}, err
}
log.Debug("Journalled diff layer", "root", dl.root, "parent", dl.parent.Root())
log.Debug("Legacy journalled disk layer", "root", dl.root, "parent", dl.parent.Root())
return base, nil
}
18 changes: 2 additions & 16 deletions core/state/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,22 +512,8 @@ func diffToDisk(bottom *diffLayer) *diskLayer {
// Update the snapshot block marker and write any remainder data
rawdb.WriteSnapshotRoot(batch, bottom.root)

// Write out the generator marker
entry := journalGenerator{
Done: base.genMarker == nil,
Marker: base.genMarker,
}
if stats != nil {
entry.Wiping = (stats.wiping != nil)
entry.Accounts = stats.accounts
entry.Slots = stats.slots
entry.Storage = uint64(stats.storage)
}
blob, err := rlp.EncodeToBytes(entry)
if err != nil {
panic(fmt.Sprintf("Failed to RLP encode generator %v", err))
}
rawdb.WriteSnapshotGenerator(batch, blob)
// Write out the generator progress marker and report
journalProgress(batch, base.genMarker, stats)

// Flush all the updates in the single db operation. Ensure the
// disk layer transition is atomic.
Expand Down