ticker-based logs (#954)
* timer-based logs

* timer-based logs

* delegate progress calculation to user

* delegate progress calculation to user

* delegate progress calculation to user

* clear

* add logs to senders recovery

* use default dir in integration

* more logs

* more logs
AskAlexSharov authored Aug 22, 2020
1 parent 18df6cd commit a6be18b
Showing 14 changed files with 161 additions and 76 deletions.
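The common thread across the files below is that the old per-iteration progress checks (the printProgressIfNeeded helper, which fired every millionth key and only if 30 seconds had passed) are replaced by a time.Ticker polled with a non-blocking select inside each hot loop. A minimal sketch of that throttling pattern — the work loop and log message here are illustrative, not code from this commit:

package main

import (
	"log"
	"time"
)

func main() {
	// Log at most once every 30 seconds, without ever blocking the loop.
	logEvery := time.NewTicker(30 * time.Second)
	defer logEvery.Stop()

	for i := 0; i < 100_000_000; i++ {
		// ... one unit of work per iteration ...

		select {
		default: // ticker has not fired yet: fall through immediately
		case <-logEvery.C:
			log.Printf("progress: %d keys processed", i)
		}
	}
}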
7 changes: 5 additions & 2 deletions cmd/integration/commands/flags.go
@@ -1,6 +1,9 @@
package commands

import "github.com/spf13/cobra"
import (
"github.com/ledgerwatch/turbo-geth/node"
"github.com/spf13/cobra"
)

var (
chaindata string
@@ -56,5 +59,5 @@ func withBucket(cmd *cobra.Command) {
}

func withDatadir(cmd *cobra.Command) {
cmd.Flags().StringVar(&datadir, "datadir", "", "data directory for temporary ELT files")
cmd.Flags().StringVar(&datadir, "datadir", node.DefaultDataDir(), "data directory for temporary ELT files")
}
32 changes: 18 additions & 14 deletions common/etl/collector.go
@@ -114,27 +114,31 @@ func loadFilesIntoBucket(db ethdb.Database, bucket string, providers []dataProvi
}
var canUseAppend bool

putTimer := time.Now()
logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()

i := 0
loadNextFunc := func(originalK, k, v []byte) error {
if i == 0 {
isEndOfBucket := lastKey == nil || bytes.Compare(lastKey, k) == -1
canUseAppend = haveSortingGuaranties && isEndOfBucket
}

i++
i, putTimer = printProgressIfNeeded(i, putTimer, k, func(progress int) {

select {
default:
case <-logEvery.C:
logArs := []interface{}{"into", bucket}
if args.LogDetailsLoad != nil {
logArs = append(logArs, args.LogDetailsLoad(k, v)...)
} else {
logArs = append(logArs, "current key", makeCurrentKeyStr(k))
}

runtime.ReadMemStats(&m)
log.Info(
"ETL [2/2] Loading",
"into", bucket,
"size", common.StorageSize(batch.BatchSize()),
"keys", fmt.Sprintf("%.1fM", float64(i)/1_000_000),
"progress", progress+50, // loading is the second stage, from 50..100
"use append", canUseAppend,
"current key", makeCurrentKeyStr(originalK),
"alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys), "numGC", int(m.NumGC))
})
logArs = append(logArs, "alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys), "numGC", int(m.NumGC))
log.Info("ETL [2/2] Loading", logArs...)
}

if canUseAppend && len(v) == 0 {
return nil // nothing to delete after end of bucket
@@ -191,7 +195,7 @@ func loadFilesIntoBucket(db ethdb.Database, bucket string, providers []dataProvi
"Committed batch",
"bucket", bucket,
"commit", commitTook,
"size", common.StorageSize(batch.BatchSize()),
"records", i,
"current key", makeCurrentKeyStr(nil),
"alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys), "numGC", int(m.NumGC))

1 change: 0 additions & 1 deletion common/etl/dataprovider.go
@@ -46,7 +46,6 @@ func FlushToDisk(encoder Encoder, currentKey []byte, b Buffer, datadir string) (
runtime.ReadMemStats(&m)
log.Info(
"Flushed buffer file",
"current key", makeCurrentKeyStr(currentKey),
"name", bufferFile.Name(),
"alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys), "numGC", int(m.NumGC))
}()
37 changes: 22 additions & 15 deletions common/etl/etl.go
@@ -57,6 +57,7 @@ func NextKey(key []byte) ([]byte, error) {
// * `key`: last commited key to the database (use etl.NextKey helper to use in LoadStartKey)
// * `isDone`: true, if everything is processed
type LoadCommitHandler func(db ethdb.Putter, key []byte, isDone bool) error
type AdditionalLogArguments func(k, v []byte) (additionalLogArguments []interface{})

type TransformArgs struct {
ExtractStartKey []byte
@@ -67,6 +68,9 @@ type TransformArgs struct {
Quit <-chan struct{}
OnLoadCommit LoadCommitHandler
loadBatchSize int // used in testing

LogDetailsExtract AdditionalLogArguments
LogDetailsLoad AdditionalLogArguments
}

func Transform(
@@ -86,7 +90,7 @@ func Transform(
collector := NewCollector(datadir, buffer)

t := time.Now()
if err := extractBucketIntoFiles(db, fromBucket, args.ExtractStartKey, args.ExtractEndKey, args.FixedBits, collector, extractFunc, args.Quit); err != nil {
if err := extractBucketIntoFiles(db, fromBucket, args.ExtractStartKey, args.ExtractEndKey, args.FixedBits, collector, extractFunc, args.Quit, args.LogDetailsExtract); err != nil {
disposeProviders(collector.dataProviders)
return err
}
@@ -105,28 +109,31 @@ func extractBucketIntoFiles(
collector *Collector,
extractFunc ExtractFunc,
quit <-chan struct{},
additionalLogArguments AdditionalLogArguments,
) error {

i := 0
putTimer := time.Now()
logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()
var m runtime.MemStats

if err := db.Walk(bucket, startkey, fixedBits, func(k, v []byte) (bool, error) {
if err := common.Stopped(quit); err != nil {
return false, err
}
i++
i, putTimer = printProgressIfNeeded(i, putTimer, k, func(progress int) {

select {
default:
case <-logEvery.C:
logArs := []interface{}{"from", bucket}
if additionalLogArguments != nil {
logArs = append(logArs, additionalLogArguments(k, v)...)
} else {
logArs = append(logArs, "current key", makeCurrentKeyStr(k))
}

runtime.ReadMemStats(&m)
log.Info(
"ETL [1/2] Extracting",
"from", bucket,
"keys", fmt.Sprintf("%.1fM", float64(i)/1_000_000),
"progress", progress, // extracting is the first stage, from 0..50
"current key", makeCurrentKeyStr(k),
"alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys), "numGC", int(m.NumGC),
)
})
logArs = append(logArs, "alloc", common.StorageSize(m.Alloc), "sys", common.StorageSize(m.Sys), "numGC", int(m.NumGC))
log.Info("ETL [1/2] Extracting", logArs...)
}
if endkey != nil && bytes.Compare(k, endkey) > 0 {
return false, nil
}
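The progress calculation is also delegated to the caller: when the new LogDetailsExtract / LogDetailsLoad callbacks on TransformArgs are nil, the ticker branch only prints the current key; otherwise whatever key/value-derived fields the callback returns are appended to the log line. A sketch of wiring the callbacks into a Collector.Load call, in the same shape as the stage_interhashes.go hunks further down — collector, db, quit and the bucket constant are assumed to already exist in the calling stage:

if err := collector.Load(db, dbutils.IntermediateTrieHashBucket, etl.IdentityLoadFunc,
	etl.TransformArgs{
		Quit: quit,
		// extraction is the first stage, reported as 0..50
		LogDetailsExtract: func(k, v []byte) (additionalLogArguments []interface{}) {
			return []interface{}{"progress", etl.ProgressFromKey(k)}
		},
		// loading is the second stage, reported as 50..100
		LogDetailsLoad: func(k, v []byte) (additionalLogArguments []interface{}) {
			return []interface{}{"progress", etl.ProgressFromKey(k) + 50}
		},
	},
); err != nil {
	return err
}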
4 changes: 2 additions & 2 deletions common/etl/etl_test.go
@@ -87,7 +87,7 @@ func TestFileDataProviders(t *testing.T) {

collector := NewCollector("", NewSortableBuffer(1))

err := extractBucketIntoFiles(db, sourceBucket, nil, nil, 0, collector, testExtractToMapFunc, nil)
err := extractBucketIntoFiles(db, sourceBucket, nil, nil, 0, collector, testExtractToMapFunc, nil, nil)
assert.NoError(t, err)

assert.Equal(t, 10, len(collector.dataProviders))
@@ -116,7 +116,7 @@ func TestRAMDataProviders(t *testing.T) {
generateTestData(t, db, sourceBucket, 10)

collector := NewCollector("", NewSortableBuffer(BufferOptimalSize))
err := extractBucketIntoFiles(db, sourceBucket, nil, nil, 0, collector, testExtractToMapFunc, nil)
err := extractBucketIntoFiles(db, sourceBucket, nil, nil, 0, collector, testExtractToMapFunc, nil, nil)
assert.NoError(t, err)

assert.Equal(t, 1, len(collector.dataProviders))
12 changes: 1 addition & 11 deletions common/etl/progress.go
@@ -1,18 +1,8 @@
package etl

import "time"

func progressFromKey(k []byte) int {
func ProgressFromKey(k []byte) int {
if len(k) < 1 {
return 0
}
return int(float64(k[0]>>4) * 3.3)
}

func printProgressIfNeeded(i int, t time.Time, k []byte, printFunc func(int)) (int, time.Time) {
if i%1_000_000 == 0 && time.Since(t) > 30*time.Second {
printFunc(progressFromKey(k))
return i + 1, time.Now()
}
return i + 1, t
}
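ProgressFromKey, now exported so that stages can compute their own percentages, scales the high nibble of the first key byte (0..15) by 3.3, giving a rough 0..50 figure for keys that are uniformly distributed over the bucket; the load stage then adds 50 on top. A few assumed example values, not taken from the commit:

package main

import (
	"fmt"

	"github.com/ledgerwatch/turbo-geth/common/etl"
)

func main() {
	fmt.Println(etl.ProgressFromKey([]byte{0x00})) // 0  : start of the key space
	fmt.Println(etl.ProgressFromKey([]byte{0x80})) // 26 : int(float64(8) * 3.3)
	fmt.Println(etl.ProgressFromKey([]byte{0xff})) // 49 : int(float64(15) * 3.3)
}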
6 changes: 6 additions & 0 deletions core/generate_index.go
@@ -53,6 +53,12 @@ func (ig *IndexGenerator) GenerateIndex(startBlock, endBlock uint64, changeSetBu
BufferType: etl.SortableAppendBuffer,
BufferSize: ig.ChangeSetBufSize,
Quit: ig.quitCh,
LogDetailsExtract: func(k, v []byte) (additionalLogArguments []interface{}) {
return []interface{}{"progress", etl.ProgressFromKey(k)}
},
LogDetailsLoad: func(k, v []byte) (additionalLogArguments []interface{}) {
return []interface{}{"progress", etl.ProgressFromKey(k) + 50} // loading is the second stage, from 50..100
},
},
)
if err != nil {
38 changes: 28 additions & 10 deletions eth/stagedsync/stage_execute.go
@@ -22,7 +22,7 @@ import (
)

const (
logInterval = 30 // seconds
logInterval = 30 * time.Second
)

type HasChangeSetWriter interface {
@@ -68,7 +68,9 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
engine := chainContext.Engine()

stageProgress := s.BlockNumber
logTime, logBlock := time.Now(), stageProgress
logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()
logBlock := stageProgress

for blockNum := stageProgress + 1; blockNum <= to; blockNum++ {
if err := common.Stopped(quit); err != nil {
@@ -123,7 +125,11 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
}
}

logTime, logBlock = logProgress(logTime, logBlock, blockNum, batch)
select {
default:
case <-logEvery.C:
logBlock = logProgress(logBlock, blockNum, batch)
}
}

if err := s.Update(batch, stageProgress); err != nil {
@@ -137,12 +143,8 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
return nil
}

func logProgress(lastLogTime time.Time, prev, now uint64, batch ethdb.DbWithPendingMutations) (time.Time, uint64) {
if now%64 != 0 || time.Since(lastLogTime).Seconds() < logInterval {
return lastLogTime, prev // return old values because no logging happened
}

speed := float64(now-prev) / float64(logInterval)
func logProgress(prev, now uint64, batch ethdb.DbWithPendingMutations) uint64 {
speed := float64(now-prev) / float64(logInterval/time.Second)
var m runtime.MemStats
runtime.ReadMemStats(&m)
log.Info("Executed blocks:",
@@ -153,7 +155,7 @@ func logProgress(lastLogTime time.Time, prev, now uint64, batch ethdb.DbWithPend
"sys", common.StorageSize(m.Sys),
"numGC", int(m.NumGC))

return time.Now(), now
return now
}

func UnwindExecutionStage(u *UnwindState, s *StageState, stateDB ethdb.Database, writeReceipts bool) error {
@@ -210,6 +212,9 @@ func UnwindExecutionStage(u *UnwindState, s *StageState, stateDB ethdb.Database,
}
}

logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()

for i := s.BlockNumber; i > u.UnwindPoint; i-- {
if err = deleteChangeSets(batch, i, accountChangeSetBucket, storageChangeSetBucket); err != nil {
return err
@@ -218,6 +223,19 @@ func UnwindExecutionStage(u *UnwindState, s *StageState, stateDB ethdb.Database,
blockHash := rawdb.ReadCanonicalHash(batch, i)
rawdb.DeleteReceipts(batch, blockHash, i)
}

select {
default:
case <-logEvery.C:
var m runtime.MemStats
runtime.ReadMemStats(&m)
log.Info("Executed blocks:",
"currentBlock", i,
"batch", common.StorageSize(batch.BatchSize()),
"alloc", common.StorageSize(m.Alloc),
"sys", common.StorageSize(m.Sys),
"numGC", int(m.NumGC))
}
}

if err = u.Done(batch); err != nil {
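A side effect worth noting in the hunks above: with logInterval now a time.Duration, logProgress no longer keeps its own timestamp — the blk/second figure divides the block delta by logInterval/time.Second (30), so it is an average over the ticker window rather than over a separately measured elapsed time.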
6 changes: 6 additions & 0 deletions eth/stagedsync/stage_hashstate.go
@@ -393,6 +393,12 @@ func (p *Promoter) Unwind(s *StageState, u *UnwindState, storage bool, codes boo
BufferType: etl.SortableOldestAppearedBuffer,
ExtractStartKey: startkey,
Quit: p.quitCh,
LogDetailsExtract: func(k, v []byte) (additionalLogArguments []interface{}) {
return []interface{}{"progress", etl.ProgressFromKey(k)}
},
LogDetailsLoad: func(k, v []byte) (additionalLogArguments []interface{}) {
return []interface{}{"progress", etl.ProgressFromKey(k) + 50} // loading is the second stage, from 50..100
},
},
)
}
28 changes: 26 additions & 2 deletions eth/stagedsync/stage_interhashes.go
@@ -234,7 +234,19 @@ func incrementIntermediateHashes(s *StageState, db ethdb.Database, to uint64, da
"gen IH", generationIHTook,
)

if err := collector.Load(db, dbutils.IntermediateTrieHashBucket, etl.IdentityLoadFunc, etl.TransformArgs{Quit: quit}); err != nil {
if err := collector.Load(db,
dbutils.IntermediateTrieHashBucket,
etl.IdentityLoadFunc,
etl.TransformArgs{
Quit: quit,
LogDetailsExtract: func(k, v []byte) (additionalLogArguments []interface{}) {
return []interface{}{"progress", etl.ProgressFromKey(k)}
},
LogDetailsLoad: func(k, v []byte) (additionalLogArguments []interface{}) {
return []interface{}{"progress", etl.ProgressFromKey(k) + 50} // loading is the second stage, from 50..100
},
},
); err != nil {
return err
}
return nil
@@ -301,7 +313,19 @@ func unwindIntermediateHashesStageImpl(u *UnwindState, s *StageState, db ethdb.D
"root hash", subTries.Hashes[0].Hex(),
"gen IH", generationIHTook,
)
if err := collector.Load(db, dbutils.IntermediateTrieHashBucket, etl.IdentityLoadFunc, etl.TransformArgs{Quit: quit}); err != nil {
if err := collector.Load(db,
dbutils.IntermediateTrieHashBucket,
etl.IdentityLoadFunc,
etl.TransformArgs{
Quit: quit,
LogDetailsExtract: func(k, v []byte) (additionalLogArguments []interface{}) {
return []interface{}{"progress", etl.ProgressFromKey(k)}
},
LogDetailsLoad: func(k, v []byte) (additionalLogArguments []interface{}) {
return []interface{}{"progress", etl.ProgressFromKey(k) + 50} // loading is the second stage, from 50..100
},
},
); err != nil {
return err
}
return nil
21 changes: 20 additions & 1 deletion eth/stagedsync/stage_senders.go
@@ -167,6 +167,8 @@ func SpawnRecoverSendersStage(cfg Stage3Config, s *StageState, db ethdb.Database
}()

collector := etl.NewCollector(datadir, etl.NewSortableBuffer(etl.BufferOptimalSize))
logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()
for j := range out {
if j.err != nil {
return j.err
@@ -175,6 +177,11 @@ func SpawnRecoverSendersStage(cfg Stage3Config, s *StageState, db ethdb.Database
return err
}
k := make([]byte, 4)
select {
default:
case <-logEvery.C:
log.Info("Senders recovery", "block", j.index)
}
binary.BigEndian.PutUint32(k, uint32(j.index))
if err := collector.Collect(k, j.senders); err != nil {
return err
@@ -184,7 +191,19 @@ func SpawnRecoverSendersStage(cfg Stage3Config, s *StageState, db ethdb.Database
index := int(binary.BigEndian.Uint32(k))
return next(k, dbutils.BlockBodyKey(s.BlockNumber+uint64(index)+1, canonical[index]), value)
}
if err := collector.Load(db, dbutils.Senders, loadFunc, etl.TransformArgs{Quit: quitCh}); err != nil {
if err := collector.Load(db,
dbutils.Senders,
loadFunc,
etl.TransformArgs{
Quit: quitCh,
LogDetailsExtract: func(k, v []byte) (additionalLogArguments []interface{}) {
return []interface{}{"block", binary.BigEndian.Uint64(k)}
},
LogDetailsLoad: func(k, v []byte) (additionalLogArguments []interface{}) {
return []interface{}{"block", binary.BigEndian.Uint64(k)}
},
},
); err != nil {
return err
}
return s.DoneAndUpdate(db, to)
3 changes: 3 additions & 0 deletions eth/stagedsync/stage_txlookup.go
@@ -59,6 +59,9 @@ func TxLookupTransform(db ethdb.Database, startKey, endKey []byte, quitCh <-chan
Quit: quitCh,
ExtractStartKey: startKey,
ExtractEndKey: endKey,
LogDetailsExtract: func(k, v []byte) (additionalLogArguments []interface{}) {
return []interface{}{"block", binary.BigEndian.Uint64(k)}
},
})
}
