diff --git a/cmd/integration/commands/flags.go b/cmd/integration/commands/flags.go
index e642a081440..c61a99ffe77 100644
--- a/cmd/integration/commands/flags.go
+++ b/cmd/integration/commands/flags.go
@@ -1,6 +1,9 @@
 package commands
 
-import "github.com/spf13/cobra"
+import (
+	"github.com/ledgerwatch/turbo-geth/node"
+	"github.com/spf13/cobra"
+)
 
 var (
 	chaindata string
@@ -56,5 +59,5 @@ func withBucket(cmd *cobra.Command) {
 }
 
 func withDatadir(cmd *cobra.Command) {
-	cmd.Flags().StringVar(&datadir, "datadir", "", "data directory for temporary ELT files")
+	cmd.Flags().StringVar(&datadir, "datadir", node.DefaultDataDir(), "data directory for temporary ELT files")
 }
diff --git a/common/etl/collector.go b/common/etl/collector.go
index 5446e66e139..20242fca11d 100644
--- a/common/etl/collector.go
+++ b/common/etl/collector.go
@@ -114,27 +114,31 @@ func loadFilesIntoBucket(db ethdb.Database, bucket string, providers []dataProvi
 	}
 
 	var canUseAppend bool
-	putTimer := time.Now()
+	logEvery := time.NewTicker(30 * time.Second)
+	defer logEvery.Stop()
+
 	i := 0
 	loadNextFunc := func(originalK, k, v []byte) error {
 		if i == 0 {
 			isEndOfBucket := lastKey == nil || bytes.Compare(lastKey, k) == -1
 			canUseAppend = haveSortingGuaranties && isEndOfBucket
 		}
-		i++
-		i, putTimer = printProgressIfNeeded(i, putTimer, k, func(progress int) {
+
+		select {
+		default:
+		case <-logEvery.C:
+			logArs := []interface{}{"into", bucket}
+			if args.LogDetailsLoad != nil {
+				logArs = append(logArs, args.LogDetailsLoad(k, v)...)
+			} else {
+				logArs = append(logArs, "current key", makeCurrentKeyStr(k))
+			}
+
 			runtime.ReadMemStats(&m)
-			log.Info(
-				"ETL [2/2] Loading",
-				"into", bucket,
-				"size", common.StorageSize(batch.BatchSize()),
-				"keys", fmt.Sprintf("%.1fM", float64(i)/1_000_000),
-				"progress", progress+50, // loading is the second stage, from 50..100
-				"use append", canUseAppend,
-				"current key", makeCurrentKeyStr(originalK),
-				"alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys), "numGC", int(m.NumGC))
-		})
+			logArs = append(logArs, "alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys), "numGC", int(m.NumGC))
+			log.Info("ETL [2/2] Loading", logArs...)
+		}
 
 		if canUseAppend && len(v) == 0 {
 			return nil // nothing to delete after end of bucket
@@ -191,7 +195,7 @@ func loadFilesIntoBucket(db ethdb.Database, bucket string, providers []dataProvi
 			"Committed batch",
 			"bucket", bucket,
 			"commit", commitTook,
-			"size", common.StorageSize(batch.BatchSize()),
+			"records", i,
 			"current key", makeCurrentKeyStr(nil),
 			"alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys), "numGC", int(m.NumGC))
 
diff --git a/common/etl/dataprovider.go b/common/etl/dataprovider.go
index 908ecaf8356..21a401d6e3b 100644
--- a/common/etl/dataprovider.go
+++ b/common/etl/dataprovider.go
@@ -46,7 +46,6 @@ func FlushToDisk(encoder Encoder, currentKey []byte, b Buffer, datadir string) (
 		runtime.ReadMemStats(&m)
 		log.Info(
 			"Flushed buffer file",
-			"current key", makeCurrentKeyStr(currentKey),
 			"name", bufferFile.Name(),
 			"alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys), "numGC", int(m.NumGC))
 	}()
diff --git a/common/etl/etl.go b/common/etl/etl.go
index 7d40513e057..e8a12ad0e70 100644
--- a/common/etl/etl.go
+++ b/common/etl/etl.go
@@ -57,6 +57,7 @@ func NextKey(key []byte) ([]byte, error) {
 // * `key`: last commited key to the database (use etl.NextKey helper to use in LoadStartKey)
 // * `isDone`: true, if everything is processed
 type LoadCommitHandler func(db ethdb.Putter, key []byte, isDone bool) error
+type AdditionalLogArguments func(k, v []byte) (additionalLogArguments []interface{})
 
 type TransformArgs struct {
 	ExtractStartKey []byte
@@ -67,6 +68,9 @@ type TransformArgs struct {
 	Quit <-chan struct{}
 	OnLoadCommit LoadCommitHandler
 	loadBatchSize int // used in testing
+
+	LogDetailsExtract AdditionalLogArguments
+	LogDetailsLoad AdditionalLogArguments
 }
 
 func Transform(
@@ -86,7 +90,7 @@ func Transform(
 	collector := NewCollector(datadir, buffer)
 
 	t := time.Now()
-	if err := extractBucketIntoFiles(db, fromBucket, args.ExtractStartKey, args.ExtractEndKey, args.FixedBits, collector, extractFunc, args.Quit); err != nil {
+	if err := extractBucketIntoFiles(db, fromBucket, args.ExtractStartKey, args.ExtractEndKey, args.FixedBits, collector, extractFunc, args.Quit, args.LogDetailsExtract); err != nil {
 		disposeProviders(collector.dataProviders)
 		return err
 	}
@@ -105,28 +109,31 @@ func extractBucketIntoFiles(
 	collector *Collector,
 	extractFunc ExtractFunc,
 	quit <-chan struct{},
+	additionalLogArguments AdditionalLogArguments,
 ) error {
-
-	i := 0
-	putTimer := time.Now()
+	logEvery := time.NewTicker(30 * time.Second)
+	defer logEvery.Stop()
 	var m runtime.MemStats
 
 	if err := db.Walk(bucket, startkey, fixedBits, func(k, v []byte) (bool, error) {
 		if err := common.Stopped(quit); err != nil {
 			return false, err
 		}
-		i++
-		i, putTimer = printProgressIfNeeded(i, putTimer, k, func(progress int) {
+
+		select {
+		default:
+		case <-logEvery.C:
+			logArs := []interface{}{"from", bucket}
+			if additionalLogArguments != nil {
+				logArs = append(logArs, additionalLogArguments(k, v)...)
+			} else {
+				logArs = append(logArs, "current key", makeCurrentKeyStr(k))
+			}
+
 			runtime.ReadMemStats(&m)
-			log.Info(
-				"ETL [1/2] Extracting",
-				"from", bucket,
-				"keys", fmt.Sprintf("%.1fM", float64(i)/1_000_000),
-				"progress", progress, // extracting is the first stage, from 0..50
-				"current key", makeCurrentKeyStr(k),
-				"alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys), "numGC", int(m.NumGC),
-			)
-		})
+			logArs = append(logArs, "alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys), "numGC", int(m.NumGC))
+			log.Info("ETL [1/2] Extracting", logArs...)
+		}
 		if endkey != nil && bytes.Compare(k, endkey) > 0 {
 			return false, nil
 		}
diff --git a/common/etl/etl_test.go b/common/etl/etl_test.go
index b53947dca7f..fcf3f89c314 100644
--- a/common/etl/etl_test.go
+++ b/common/etl/etl_test.go
@@ -87,7 +87,7 @@ func TestFileDataProviders(t *testing.T) {
 
 	collector := NewCollector("", NewSortableBuffer(1))
 
-	err := extractBucketIntoFiles(db, sourceBucket, nil, nil, 0, collector, testExtractToMapFunc, nil)
+	err := extractBucketIntoFiles(db, sourceBucket, nil, nil, 0, collector, testExtractToMapFunc, nil, nil)
 	assert.NoError(t, err)
 
 	assert.Equal(t, 10, len(collector.dataProviders))
@@ -116,7 +116,7 @@ func TestRAMDataProviders(t *testing.T) {
 	generateTestData(t, db, sourceBucket, 10)
 
 	collector := NewCollector("", NewSortableBuffer(BufferOptimalSize))
-	err := extractBucketIntoFiles(db, sourceBucket, nil, nil, 0, collector, testExtractToMapFunc, nil)
+	err := extractBucketIntoFiles(db, sourceBucket, nil, nil, 0, collector, testExtractToMapFunc, nil, nil)
 	assert.NoError(t, err)
 
 	assert.Equal(t, 1, len(collector.dataProviders))
diff --git a/common/etl/progress.go b/common/etl/progress.go
index 85df153c5cb..f12ec7f9e83 100644
--- a/common/etl/progress.go
+++ b/common/etl/progress.go
@@ -1,18 +1,8 @@
 package etl
 
-import "time"
-
-func progressFromKey(k []byte) int {
+func ProgressFromKey(k []byte) int {
 	if len(k) < 1 {
 		return 0
 	}
 	return int(float64(k[0]>>4) * 3.3)
 }
-
-func printProgressIfNeeded(i int, t time.Time, k []byte, printFunc func(int)) (int, time.Time) {
-	if i%1_000_000 == 0 && time.Since(t) > 30*time.Second {
-		printFunc(progressFromKey(k))
-		return i + 1, time.Now()
-	}
-	return i + 1, t
-}
diff --git a/core/generate_index.go b/core/generate_index.go
index 111db5ae4eb..a4d471efae8 100644
--- a/core/generate_index.go
+++ b/core/generate_index.go
@@ -53,6 +53,12 @@ func (ig *IndexGenerator) GenerateIndex(startBlock, endBlock uint64, changeSetBu
 			BufferType: etl.SortableAppendBuffer,
 			BufferSize: ig.ChangeSetBufSize,
 			Quit: ig.quitCh,
+			LogDetailsExtract: func(k, v []byte) (additionalLogArguments []interface{}) {
+				return []interface{}{"progress", etl.ProgressFromKey(k)}
+			},
+			LogDetailsLoad: func(k, v []byte) (additionalLogArguments []interface{}) {
+				return []interface{}{"progress", etl.ProgressFromKey(k) + 50} // loading is the second stage, from 50..100
+			},
 		},
 	)
 	if err != nil {
diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go
index 58e10dc5dff..c94dfb54b8a 100644
--- a/eth/stagedsync/stage_execute.go
+++ b/eth/stagedsync/stage_execute.go
@@ -22,7 +22,7 @@ import (
 )
 
 const (
-	logInterval = 30 // seconds
+	logInterval = 30 * time.Second
 )
 
 type HasChangeSetWriter interface {
@@ -68,7 +68,9 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
 	engine := chainContext.Engine()
 
 	stageProgress := s.BlockNumber
-	logTime, logBlock := time.Now(), stageProgress
+	logEvery := time.NewTicker(logInterval)
+	defer logEvery.Stop()
+	logBlock := stageProgress
 
 	for blockNum := stageProgress + 1; blockNum <= to; blockNum++ {
 		if err := common.Stopped(quit); err != nil {
@@ -123,7 +125,11 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
 			}
 		}
 
-		logTime, logBlock = logProgress(logTime, logBlock, blockNum, batch)
+		select {
+		default:
+		case <-logEvery.C:
+			logBlock = logProgress(logBlock, blockNum, batch)
+		}
 	}
 
 	if err := s.Update(batch, stageProgress); err != nil {
@@ -137,12 +143,8 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
 	return nil
 }
 
-func logProgress(lastLogTime time.Time, prev, now uint64, batch ethdb.DbWithPendingMutations) (time.Time, uint64) {
-	if now%64 != 0 || time.Since(lastLogTime).Seconds() < logInterval {
-		return lastLogTime, prev // return old values because no logging happened
-	}
-
-	speed := float64(now-prev) / float64(logInterval)
+func logProgress(prev, now uint64, batch ethdb.DbWithPendingMutations) uint64 {
+	speed := float64(now-prev) / float64(logInterval/time.Second)
 	var m runtime.MemStats
 	runtime.ReadMemStats(&m)
 	log.Info("Executed blocks:",
@@ -153,7 +155,7 @@ func logProgress(lastLogTime time.Time, prev, now uint64, batch ethdb.DbWithPend
 		"sys", common.StorageSize(m.Sys),
 		"numGC", int(m.NumGC))
 
-	return time.Now(), now
+	return now
 }
 
 func UnwindExecutionStage(u *UnwindState, s *StageState, stateDB ethdb.Database, writeReceipts bool) error {
@@ -210,6 +212,9 @@ func UnwindExecutionStage(u *UnwindState, s *StageState, stateDB ethdb.Database,
 		}
 	}
 
+	logEvery := time.NewTicker(logInterval)
+	defer logEvery.Stop()
+
 	for i := s.BlockNumber; i > u.UnwindPoint; i-- {
 		if err = deleteChangeSets(batch, i, accountChangeSetBucket, storageChangeSetBucket); err != nil {
 			return err
@@ -218,6 +223,19 @@ func UnwindExecutionStage(u *UnwindState, s *StageState, stateDB ethdb.Database,
 			blockHash := rawdb.ReadCanonicalHash(batch, i)
 			rawdb.DeleteReceipts(batch, blockHash, i)
 		}
+
+		select {
+		default:
+		case <-logEvery.C:
+			var m runtime.MemStats
+			runtime.ReadMemStats(&m)
+			log.Info("Executed blocks:",
+				"currentBlock", i,
+				"batch", common.StorageSize(batch.BatchSize()),
+				"alloc", common.StorageSize(m.Alloc),
+				"sys", common.StorageSize(m.Sys),
+				"numGC", int(m.NumGC))
+		}
 	}
 
 	if err = u.Done(batch); err != nil {
diff --git a/eth/stagedsync/stage_hashstate.go b/eth/stagedsync/stage_hashstate.go
index 964b01c99fc..fc40e7a964e 100644
--- a/eth/stagedsync/stage_hashstate.go
+++ b/eth/stagedsync/stage_hashstate.go
@@ -393,6 +393,12 @@ func (p *Promoter) Unwind(s *StageState, u *UnwindState, storage bool, codes boo
 			BufferType: etl.SortableOldestAppearedBuffer,
 			ExtractStartKey: startkey,
 			Quit: p.quitCh,
+			LogDetailsExtract: func(k, v []byte) (additionalLogArguments []interface{}) {
+				return []interface{}{"progress", etl.ProgressFromKey(k)}
+			},
+			LogDetailsLoad: func(k, v []byte) (additionalLogArguments []interface{}) {
+				return []interface{}{"progress", etl.ProgressFromKey(k) + 50} // loading is the second stage, from 50..100
+			},
 		},
 	)
 }
diff --git a/eth/stagedsync/stage_interhashes.go b/eth/stagedsync/stage_interhashes.go
index b48edb54a5f..72945759022 100644
--- a/eth/stagedsync/stage_interhashes.go
+++ b/eth/stagedsync/stage_interhashes.go
@@ -234,7 +234,19 @@ func incrementIntermediateHashes(s *StageState, db ethdb.Database, to uint64, da
 		"gen IH", generationIHTook,
 	)
 
-	if err := collector.Load(db, dbutils.IntermediateTrieHashBucket, etl.IdentityLoadFunc, etl.TransformArgs{Quit: quit}); err != nil {
+	if err := collector.Load(db,
+		dbutils.IntermediateTrieHashBucket,
+		etl.IdentityLoadFunc,
+		etl.TransformArgs{
+			Quit: quit,
+			LogDetailsExtract: func(k, v []byte) (additionalLogArguments []interface{}) {
+				return []interface{}{"progress", etl.ProgressFromKey(k)}
+			},
+			LogDetailsLoad: func(k, v []byte) (additionalLogArguments []interface{}) {
+				return []interface{}{"progress", etl.ProgressFromKey(k) + 50} // loading is the second stage, from 50..100
+			},
+		},
+	); err != nil {
 		return err
 	}
 	return nil
@@ -301,7 +313,19 @@ func unwindIntermediateHashesStageImpl(u *UnwindState, s *StageState, db ethdb.D
 		"root hash", subTries.Hashes[0].Hex(),
 		"gen IH", generationIHTook,
 	)
-	if err := collector.Load(db, dbutils.IntermediateTrieHashBucket, etl.IdentityLoadFunc, etl.TransformArgs{Quit: quit}); err != nil {
+	if err := collector.Load(db,
+		dbutils.IntermediateTrieHashBucket,
+		etl.IdentityLoadFunc,
+		etl.TransformArgs{
+			Quit: quit,
+			LogDetailsExtract: func(k, v []byte) (additionalLogArguments []interface{}) {
+				return []interface{}{"progress", etl.ProgressFromKey(k)}
+			},
+			LogDetailsLoad: func(k, v []byte) (additionalLogArguments []interface{}) {
+				return []interface{}{"progress", etl.ProgressFromKey(k) + 50} // loading is the second stage, from 50..100
+			},
+		},
+	); err != nil {
 		return err
 	}
 	return nil
diff --git a/eth/stagedsync/stage_senders.go b/eth/stagedsync/stage_senders.go
index bc0970303b9..7b771c218e7 100644
--- a/eth/stagedsync/stage_senders.go
+++ b/eth/stagedsync/stage_senders.go
@@ -167,6 +167,8 @@ func SpawnRecoverSendersStage(cfg Stage3Config, s *StageState, db ethdb.Database
 	}()
 
 	collector := etl.NewCollector(datadir, etl.NewSortableBuffer(etl.BufferOptimalSize))
+	logEvery := time.NewTicker(30 * time.Second)
+	defer logEvery.Stop()
 	for j := range out {
 		if j.err != nil {
 			return j.err
@@ -175,6 +177,11 @@ func SpawnRecoverSendersStage(cfg Stage3Config, s *StageState, db ethdb.Database
 			return err
 		}
 		k := make([]byte, 4)
+		select {
+		default:
+		case <-logEvery.C:
+			log.Info("Senders recovery", "block", j.index)
+		}
 		binary.BigEndian.PutUint32(k, uint32(j.index))
 		if err := collector.Collect(k, j.senders); err != nil {
 			return err
@@ -184,7 +191,19 @@ func SpawnRecoverSendersStage(cfg Stage3Config, s *StageState, db ethdb.Database
 		index := int(binary.BigEndian.Uint32(k))
 		return next(k, dbutils.BlockBodyKey(s.BlockNumber+uint64(index)+1, canonical[index]), value)
 	}
-	if err := collector.Load(db, dbutils.Senders, loadFunc, etl.TransformArgs{Quit: quitCh}); err != nil {
+	if err := collector.Load(db,
+		dbutils.Senders,
+		loadFunc,
+		etl.TransformArgs{
+			Quit: quitCh,
+			LogDetailsExtract: func(k, v []byte) (additionalLogArguments []interface{}) {
+				return []interface{}{"block", binary.BigEndian.Uint64(k)}
+			},
+			LogDetailsLoad: func(k, v []byte) (additionalLogArguments []interface{}) {
+				return []interface{}{"block", binary.BigEndian.Uint64(k)}
+			},
+		},
+	); err != nil {
 		return err
 	}
 	return s.DoneAndUpdate(db, to)
diff --git a/eth/stagedsync/stage_txlookup.go b/eth/stagedsync/stage_txlookup.go
index 5c11796b85f..7ac18fa5ce5 100644
--- a/eth/stagedsync/stage_txlookup.go
+++ b/eth/stagedsync/stage_txlookup.go
@@ -59,6 +59,9 @@ func TxLookupTransform(db ethdb.Database, startKey, endKey []byte, quitCh <-chan
 		Quit: quitCh,
 		ExtractStartKey: startKey,
 		ExtractEndKey: endKey,
+		LogDetailsExtract: func(k, v []byte) (additionalLogArguments []interface{}) {
+			return []interface{}{"block", binary.BigEndian.Uint64(k)}
+		},
 	})
 }
 
diff --git a/ethdb/tx_db.go b/ethdb/tx_db.go
index 781e81040b5..bad00d685dc 100644
--- a/ethdb/tx_db.go
+++ b/ethdb/tx_db.go
@@ -131,7 +131,9 @@ func (m *TxDb) MultiPut(tuples ...[]byte) (uint64, error) {
 }
 
 func MultiPut(tx Tx, tuples ...[]byte) error {
-	putTimer := time.Now()
+	logEvery := time.NewTicker(30 * time.Second)
+	defer logEvery.Stop()
+
 	count := 0
 	total := float64(len(tuples)) / 3
 	for bucketStart := 0; bucketStart < len(tuples); {
@@ -174,10 +176,12 @@ func MultiPut(tx Tx, tuples ...[]byte) error {
 			}
 
 			count++
-			if count%100_000 == 0 && time.Since(putTimer) > 30*time.Second {
+
+			select {
+			default:
+			case <-logEvery.C:
 				progress := fmt.Sprintf("%.1fM/%.1fM", float64(count)/1_000_000, total/1_000_000)
 				log.Info("Write to db", "progress", progress)
-				putTimer = time.Now()
 			}
 		}
 
diff --git a/trie/flatdb_sub_trie_loader.go b/trie/flatdb_sub_trie_loader.go
index e8fc3aeca51..4f14912735c 100644
--- a/trie/flatdb_sub_trie_loader.go
+++ b/trie/flatdb_sub_trie_loader.go
@@ -611,21 +611,27 @@ func (fstl *FlatDbSubTrieLoader) LoadSubTries() (SubTries, error) {
 		if err := fstl.iteration(c, ih, true /* first */); err != nil {
 			return err
 		}
-		var counter uint64
-		t := time.Now()
+		logEvery := time.NewTicker(30 * time.Second)
+		defer logEvery.Stop()
+
 		for fstl.rangeIdx < len(fstl.dbPrefixes) {
 			for !fstl.itemPresent {
 				if err := fstl.iteration(c, ih, false /* first */); err != nil {
 					return err
 				}
-				counter++
-				t = fstl.logProgress(t, counter)
+
 			}
 			if fstl.itemPresent {
 				if err := fstl.receiver.Receive(fstl.itemType, fstl.accountKey, fstl.storageKey, &fstl.accountValue, fstl.storageValue, fstl.hashValue, fstl.streamCutoff); err != nil {
 					return err
 				}
 				fstl.itemPresent = false
+
+				select {
+				default:
+				case <-logEvery.C:
+					fstl.logProgress()
+				}
 			}
 		}
 		return nil
@@ -635,18 +641,14 @@ func (fstl *FlatDbSubTrieLoader) LoadSubTries() (SubTries, error) {
 	return fstl.receiver.Result(), nil
 }
 
-func (fstl *FlatDbSubTrieLoader) logProgress(lastLogTime time.Time, counter uint64) time.Time {
-	if counter%100_000 == 0 && time.Since(lastLogTime) > 30*time.Second {
-		var k string
-		if fstl.accountKey != nil {
-			k = makeCurrentKeyStr(fstl.accountKey)
-		} else {
-			k = makeCurrentKeyStr(fstl.ihK)
-		}
-		log.Info("Calculating Merkle root", "current key", k)
-		return time.Now()
+func (fstl *FlatDbSubTrieLoader) logProgress() {
+	var k string
+	if fstl.accountKey != nil {
+		k = makeCurrentKeyStr(fstl.accountKey)
+	} else {
+		k = makeCurrentKeyStr(fstl.ihK)
 	}
-	return lastLogTime
+	log.Info("Calculating Merkle root", "current key", k)
 }
 
 func makeCurrentKeyStr(k []byte) string {
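Reviewer note, outside the patch: every hot loop above replaces its counter-plus-timestamp throttling with a 30-second time.Ticker polled through a select that has an empty default, so progress logging stays non-blocking. A minimal sketch of that pattern, with hypothetical names (processAll, items) and nothing assumed beyond the standard library:

package main

import (
	"log"
	"time"
)

// processAll iterates items and reports progress at most once per tick.
// The empty default case keeps the select non-blocking, so iterations that
// fall between ticks pay no logging cost.
func processAll(items [][]byte) {
	logEvery := time.NewTicker(30 * time.Second)
	defer logEvery.Stop()

	for i := range items {
		// ... real per-item work would go here ...

		select {
		default: // ticker has not fired yet: skip logging
		case <-logEvery.C:
			log.Printf("progress: %d/%d items", i+1, len(items))
		}
	}
}

Compared with the removed printProgressIfNeeded and logProgress(lastLogTime, counter) helpers, the ticker needs no modulo guard or timestamp bookkeeping, and each loop owns and stops its own ticker.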