From c59206915aaa59c1c5aa2f3ee9b08e82e09506e9 Mon Sep 17 00:00:00 2001 From: Thomas Jay Rush Date: Thu, 14 Sep 2023 04:25:06 +0200 Subject: [PATCH] Updating scraper --- .../chifra/internal/scrape/handle_scrape.go | 182 ++++++++---- .../chifra/internal/scrape/save_timestamps.go | 77 +++++ .../chifra/internal/scrape/scrape_batch.go | 106 ++----- .../chifra/internal/scrape/scrape_blaze.go | 89 +++--- .../internal/scrape/scrape_consolidate.go | 267 +++++++++--------- .../chifra/internal/scrape/scrape_manager.go | 111 ++++---- .../internal/scrape/scrape_manager_utils.go | 57 ++++ 7 files changed, 523 insertions(+), 366 deletions(-) create mode 100644 src/apps/chifra/internal/scrape/save_timestamps.go create mode 100644 src/apps/chifra/internal/scrape/scrape_manager_utils.go diff --git a/src/apps/chifra/internal/scrape/handle_scrape.go b/src/apps/chifra/internal/scrape/handle_scrape.go index e8ead86996..197b271ac4 100644 --- a/src/apps/chifra/internal/scrape/handle_scrape.go +++ b/src/apps/chifra/internal/scrape/handle_scrape.go @@ -7,111 +7,171 @@ package scrapePkg import ( "fmt" "os" - "path/filepath" + "strings" "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/base" "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/colors" - "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/config" + "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/file" + "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/index" "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/logger" "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/tslib" + "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/utils" ) -// HandleScrape enters a forever loop and continually scrapes either BlockCnt blocks -// or until the chain is caught up. It pauses for Sleep --sleep seconds between each scrape. +// HandleScrape enters a forever loop and continually scrapes --block_cnt blocks +// (or less if close to the head). The forever loop pauses each round for +// --sleep seconds (or, if not close to the head, for .25 seconds). func (opts *ScrapeOptions) HandleScrape() error { + var blocks = make([]base.Blknum, 0, opts.BlockCnt) var err error + var ok bool + chain := opts.Globals.Chain testMode := opts.Globals.TestMode - origBlockCnt := opts.BlockCnt - - blazeMan := BlazeManager{ - chain: chain, - timestamps: make([]tslib.TimestampRecord, 0, origBlockCnt), - processedMap: make(map[base.Blknum]bool, origBlockCnt), - nProcessed: 0, - opts: opts, - } - blazeMan.meta, err = opts.Conn.GetMetaData(testMode) - if err != nil { - return err - } // Clean the temporary files and makes sure block zero has been processed if ok, err := opts.Prepare(); !ok || err != nil { return err } - ripeBlock := base.Blknum(0) - unripePath := filepath.Join(config.GetPathToIndex(chain), "unripe") - - // The forever loop. Loop until the user hits Cntl+C or the server tells us to stop. + runCount := uint64(0) + // Loop until the user hits Cntl+C, until runCount runs out, or until + // the server tells us to stop. for { - if blazeMan.meta, err = opts.Conn.GetMetaData(testMode); err != nil { - logger.Error(fmt.Sprintf("Error fetching meta data: %s. Sleeping...", err)) - goto PAUSE + // We create a new manager for each loop...we will populate it in a minute... + bm := BlazeManager{ + chain: chain, } - if blazeMan.meta.NextIndexHeight() > blazeMan.meta.ChainHeight() { - // If the user is re-syncing the chain, the index may be ahead of the chain, - // so we go to sleep and try again later. 
- msg := fmt.Sprintf("The index (%d) is ahead of the chain (%d).", - blazeMan.meta.NextIndexHeight(), - blazeMan.meta.ChainHeight(), - ) - logger.Error(msg) + // Fetch the meta data which tells us how far along the index is. + if bm.meta, err = opts.Conn.GetMetaData(testMode); err != nil { + var ErrFetchingMeta = fmt.Errorf("error fetching meta data: %s", err) + logger.Error(colors.BrightRed+ErrFetchingMeta.Error(), colors.Off) goto PAUSE } - opts.StartBlock = blazeMan.meta.NextIndexHeight() - opts.BlockCnt = origBlockCnt - if (blazeMan.StartBlock() + blazeMan.BlockCount()) > blazeMan.meta.ChainHeight() { - opts.BlockCnt = (blazeMan.meta.Latest - blazeMan.StartBlock()) + // This only happens if the chain and the index scraper are both started at the + // same time (rarely). This protects against the case where the chain has no ripe blocks. + // Report no error and sleep for a while. + if bm.meta.ChainHeight() < opts.Settings.Unripe_dist { + goto PAUSE } - // The 'ripeBlock' is the head of the chain unless the chain is further along - // than 'UnripeDist.' If it is, the `ripeBlock` is 'UnripeDist' behind the - // head (i.e., 28 blocks usually - six minutes) - ripeBlock = blazeMan.meta.Latest - if ripeBlock > opts.Settings.Unripe_dist { - ripeBlock = blazeMan.meta.Latest - opts.Settings.Unripe_dist + // Another rare case, but here the user has reset his/her node but not removed + // the index. In this case, the index is ahead of the chain. We go to sleep and + // try again later in the hopes that the chain catches up. + if bm.meta.NextIndexHeight() > bm.meta.ChainHeight()+1 { + var ErrIndexAhead = fmt.Errorf( + "index (%d) is ahead of chain (%d)", + bm.meta.NextIndexHeight(), + bm.meta.ChainHeight(), + ) + logger.Error(colors.BrightRed+ErrIndexAhead.Error(), colors.Off) + goto PAUSE } - blazeMan = BlazeManager{ + // Let's start a new round... + bm = BlazeManager{ chain: chain, opts: opts, - nProcessed: 0, - ripeBlock: ripeBlock, - timestamps: make([]tslib.TimestampRecord, 0, origBlockCnt), - processedMap: make(map[base.Blknum]bool, origBlockCnt), - meta: blazeMan.meta, + nRipe: 0, + nUnripe: 0, + timestamps: make([]tslib.TimestampRecord, 0, opts.BlockCnt), + processedMap: make(map[base.Blknum]bool, opts.BlockCnt), + meta: bm.meta, + nChannels: int(opts.Settings.Channel_count), } - // Here we do the actual scrape for this round. If anything goes wrong, the - // function will have cleaned up (i.e. remove the unstaged ripe blocks). Note - // that we don't quit, instead we sleep and we retry continually. - if err := blazeMan.HandleScrapeBlaze(); err != nil { - logger.Error(colors.BrightRed, err, colors.Off) + // Order dependant, be careful! + // first block to scrape (one past end of previous round). + bm.startBlock = bm.meta.NextIndexHeight() + // user supplied, but not so many to pass the chain tip. + bm.blockCount = utils.Min(opts.BlockCnt, bm.meta.ChainHeight()-bm.StartBlock()+1) + // Unripe_dist behind the chain tip. + bm.ripeBlock = bm.meta.ChainHeight() - opts.Settings.Unripe_dist + + // These are the blocks we're going to process this round + blocks = make([]base.Blknum, 0, bm.BlockCount()) + for block := bm.StartBlock(); block < bm.EndBlock(); block++ { + blocks = append(blocks, block) + } + + if len(blocks) == 0 { + logger.Info("no blocks to scrape") goto PAUSE } - // Try to create chunks... 
-		if ok, err := blazeMan.Consolidate(); !ok || err != nil {
-			logger.Error(err)
+		if opts.Globals.Verbose {
+			logger.Info("chain head: ", bm.meta.ChainHeight())
+			logger.Info("opts.BlockCnt: ", opts.BlockCnt)
+			logger.Info("ripe block: ", bm.ripeBlock)
+			logger.Info("perChunk: ", bm.PerChunk())
+			logger.Info("start block: ", bm.StartBlock())
+			logger.Info("block count: ", bm.BlockCount())
+			logger.Info("len(blocks): ", len(blocks))
+			if len(blocks) > 0 {
+				logger.Info("blocks[0]: ", blocks[0])
+				logger.Info("blocks[len(blocks)-1]:", blocks[len(blocks)-1])
+			}
+		}
+
+		// Scrape this round. Only quit on catastrophic errors. Report and sleep otherwise.
+		if err, ok = bm.ScrapeBatch(blocks); !ok || err != nil {
+			if err != nil {
+				logger.Error(colors.BrightRed+err.Error(), colors.Off)
+			}
 			if !ok {
 				break
 			}
 			goto PAUSE
 		}
 
+		if bm.nRipe == 0 {
+			logger.Info(colors.Green+"no ripe files to consolidate", spaces, colors.Off)
+			goto PAUSE
+
+		} else {
+			// Consolidate a chunk (if possible). Only quit on catastrophic errors. Report and sleep otherwise.
+			if err, ok = bm.Consolidate(blocks); !ok || err != nil {
+				if err != nil {
+					logger.Error(colors.BrightRed+err.Error(), colors.Off)
+				}
+				if !ok {
+					break
+				}
+				goto PAUSE
+			}
+		}
+
 	PAUSE:
-		// The chain frequently re-orgs. Before sleeping, we remove any unripe files so they
-		// are re-queried in the next round. This is the reason for the unripePath.
-		if err = os.RemoveAll(unripePath); err != nil {
-			return err
+		runCount++
+		if opts.RunCount != 0 && runCount >= opts.RunCount {
+			// No reason to clean up here. Next round will do so and the user can use these files in the meantime.
+			logger.Info("run count reached")
+			break
 		}
-		blazeMan.Pause()
+		// sleep for a bit (there are no new blocks anyway if we're caught up).
+		opts.pause(bm.meta.ChainHeight() - bm.meta.StageHeight())
+
+		// defensive programming - just double-checking our own understanding...
+		count := file.NFilesInFolder(bm.RipeFolder())
+		if count != 0 {
+			_ = index.CleanEphemeralIndexFolders(chain)
+			err := fmt.Errorf("%d unexpected ripe files in %s", count, bm.RipeFolder())
+			logger.Error(colors.BrightRed+err.Error(), colors.Off)
+		}
+
+		// We want to clean up the unripe files. The chain may have (it frequently does)
+		// re-orged. We want to re-query these blocks next round. This is why we keep an unripe folder.
+		if err = os.RemoveAll(bm.UnripeFolder()); err != nil {
+			logger.Error(colors.BrightRed, err, colors.Off)
+			return err
+		}
 	}
 
+	// We've left the loop and we're done.
 	return nil
 }
+
+var spaces = strings.Repeat(" ", 50)
diff --git a/src/apps/chifra/internal/scrape/save_timestamps.go b/src/apps/chifra/internal/scrape/save_timestamps.go
new file mode 100644
index 0000000000..f517e2a3fe
--- /dev/null
+++ b/src/apps/chifra/internal/scrape/save_timestamps.go
@@ -0,0 +1,77 @@
+package scrapePkg
+
+// Copyright 2021 The TrueBlocks Authors. All rights reserved.
+// Use of this source code is governed by a license that can
+// be found in the LICENSE file.
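+
+// A note on the on-disk format (an inference from how WriteTimestamps below uses
+// encoding/binary and tslib, not an authoritative spec): each record appended to
+// ts.bin appears to be a fixed-size, little-endian pair of uint32 values,
+//
+//	Bn uint32 // block number
+//	Ts uint32 // block timestamp, in seconds
+//
+// so, as long as the file is gap-free from block zero, block N's record lives at
+// byte offset N*8.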
+ +import ( + "encoding/binary" + "fmt" + "os" + "sort" + + "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/base" + "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/config" + "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/logger" + "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/tslib" +) + +// TODO: Protect against overwriting files on disc + +func (bm *BlazeManager) WriteTimestamps(blocks []base.Blknum) error { + chain := bm.chain + + sort.Slice(bm.timestamps, func(i, j int) bool { + return bm.timestamps[i].Bn < bm.timestamps[j].Bn + }) + + // Assume that the existing timestamps file always contains valid timestamps in + // a valid order so we can only append as we go (which is very fast) + tsPath := config.GetPathToIndex(chain) + "ts.bin" + fp, err := os.OpenFile(tsPath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + if err != nil { + return err + } + + defer func() { + tslib.ClearCache(chain) + fp.Close() + }() + + cnt := 0 + for _, block := range blocks { + // Append to the timestamps file all the new timestamps but as we do that make sure we're + // not skipping anything at the front, in the middle, or at the end of the list + ts := tslib.TimestampRecord{} + if cnt >= len(bm.timestamps) { + ts = tslib.TimestampRecord{ + Bn: uint32(block), + Ts: uint32(bm.opts.Conn.GetBlockTimestamp(block)), + } + } else { + ts = bm.timestamps[cnt] + if bm.timestamps[cnt].Bn != uint32(block) { + ts = tslib.TimestampRecord{ + Bn: uint32(block), + Ts: uint32(bm.opts.Conn.GetBlockTimestamp(block)), + } + cnt-- // set it back + } + } + + msg := fmt.Sprintf("Updating timestamps %-04d of %-04d (%-04d remaining)"+spaces, + block, + blocks[len(blocks)-1], + blocks[len(blocks)-1]-block, + ) + logger.Progress((block%13) == 0, msg) + + if err = binary.Write(fp, binary.LittleEndian, &ts); err != nil { + return err + } + + cnt++ + } + + return nil +} diff --git a/src/apps/chifra/internal/scrape/scrape_batch.go b/src/apps/chifra/internal/scrape/scrape_batch.go index 56bbf8558e..d9377a5c8b 100644 --- a/src/apps/chifra/internal/scrape/scrape_batch.go +++ b/src/apps/chifra/internal/scrape/scrape_batch.go @@ -5,102 +5,44 @@ package scrapePkg // be found in the LICENSE file. import ( - "encoding/binary" - "errors" "fmt" - "os" - "sort" - "strings" - "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/config" + "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/base" "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/index" - "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/logger" - "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/rpc" - "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/tslib" ) -// HandleScrapeBlaze is called each time around the forever loop prior to calling into -// Blaze to actually scrape the blocks. -func (bm *BlazeManager) HandleScrapeBlaze() error { +// ScrapeBatch is called each time around the forever loop. It calls into +// HandleBlaze and writes the timestamps if there's no error. 
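+// The second (bool) return value means "keep going": when it is false, the
+// caller (HandleScrape) exits its forever loop entirely, while a non-nil error
+// returned alongside true is logged by the caller and the round is simply
+// retried after a pause.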
+func (bm *BlazeManager) ScrapeBatch(blocks []base.Blknum) (error, bool) { chain := bm.chain - // Do the actual scrape, wait until it finishes, clean up and return on failure - if _, err := bm.HandleBlaze(); err != nil { + if err, ok := bm.HandleBlaze(blocks); !ok || err != nil { _ = index.CleanEphemeralIndexFolders(chain) - return err + return err, ok } - start := bm.StartBlock() - end := bm.StartBlock() + bm.BlockCount() - - for bn := start; bn < end; bn++ { - if !bm.processedMap[bn] { - // At least one block was not processed. This would only happen in the event of an - // error, so clean up, report the error and return. The loop will repeat. + // Check to see if we missed any blocks... + for _, block := range blocks { + if !bm.processedMap[block] { + // We missed a block. We need to clean up and continue + // next time around the loop. This may happen if the + // node returns an error for example. _ = index.CleanEphemeralIndexFolders(chain) - msg := fmt.Sprintf("A block %d was not processed%s", bn, strings.Repeat(" ", 50)) - return errors.New(msg) + return fmt.Errorf("a block (%d) was not processed", block), true } } - _ = bm.WriteTimestamps(end) - - return nil -} - -// TODO: Protect against overwriting files on disc - -func (bm *BlazeManager) WriteTimestamps(endPoint uint64) error { - chain := bm.chain - conn := rpc.TempConnection(chain) - - sort.Slice(bm.timestamps, func(i, j int) bool { - return bm.timestamps[i].Bn < bm.timestamps[j].Bn - }) - - // Assume that the existing timestamps file always contains valid timestamps in a valid order so we can only append - tsPath := config.GetPathToIndex(chain) + "ts.bin" - fp, err := os.OpenFile(tsPath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) - if err != nil { - return err + // defensive programming... 
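+	// Every requested block should have been marked exactly once, so the length of
+	// the block list, the size of the processed map, and the ripe/unripe counters
+	// must all agree.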
+ if len(blocks) != len(bm.processedMap) || len(blocks) != bm.nProcessed() { + _ = index.CleanEphemeralIndexFolders(chain) + return fmt.Errorf(`check failed len(blocks): %d len(map): %d nRipe: %d nUnripe: %d nProcessed: %d`, + len(blocks), + len(bm.processedMap), + bm.nRipe, + bm.nUnripe, + bm.nProcessed(), + ), true } - defer func() { - tslib.ClearCache(chain) - fp.Close() - // sigintTrap.Disable(trapCh) - // writeMutex.Unlock() - }() - - nTs, _ := tslib.NTimestamps(chain) - - cnt := 0 - for bn := nTs; bn < endPoint; bn++ { - // Append to the timestamps file all the new timestamps but as we do that make sure we're - // not skipping anything at the front, in the middle, or at the end of the list - ts := tslib.TimestampRecord{} - if cnt >= len(bm.timestamps) { - ts = tslib.TimestampRecord{ - Bn: uint32(bn), - Ts: uint32(conn.GetBlockTimestamp(bn)), - } - } else { - ts = bm.timestamps[cnt] - if bm.timestamps[cnt].Bn != uint32(bn) { - ts = tslib.TimestampRecord{ - Bn: uint32(bn), - Ts: uint32(conn.GetBlockTimestamp(bn)), - } - cnt-- // set it back - } - } - - logger.Progress((bn%13) == 0, fmt.Sprintf("Checking or updating timestamps %-04d of %-04d (%d remaining)%s", bn, endPoint, endPoint-bn, spaces)) - if err = binary.Write(fp, binary.LittleEndian, &ts); err != nil { - return err - } - - cnt++ - } - return nil + return bm.WriteTimestamps(blocks), true } diff --git a/src/apps/chifra/internal/scrape/scrape_blaze.go b/src/apps/chifra/internal/scrape/scrape_blaze.go index d147470f30..045c06422c 100644 --- a/src/apps/chifra/internal/scrape/scrape_blaze.go +++ b/src/apps/chifra/internal/scrape/scrape_blaze.go @@ -14,26 +14,13 @@ import ( "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/logger" "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/rpc" "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/tslib" + "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/types" "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/utils" ) // HandleBlaze does the actual scraping, walking through block_cnt blocks and querying traces and logs // and then extracting addresses and timestamps from those data structures. -func (bm *BlazeManager) HandleBlaze() (ok bool, err error) { - blocks := []base.Blknum{} - - start := bm.StartBlock() - end := bm.StartBlock() + bm.BlockCount() - - for block := start; block < end; block++ { - blocks = append(blocks, block) - } - - return bm.HandleBlaze1(blocks) -} - -func (bm *BlazeManager) HandleBlaze1(blocks []base.Blknum) (ok bool, err error) { - nChannels := int(bm.opts.Settings.Channel_count) +func (bm *BlazeManager) HandleBlaze(blocks []base.Blknum) (err error, ok bool) { // We need three pipelines...we shove into blocks, blocks shoves into appearances and timestamps blockChannel := make(chan base.Blknum) @@ -43,26 +30,26 @@ func (bm *BlazeManager) HandleBlaze1(blocks []base.Blknum) (ok bool, err error) // TODO: The go routines below may fail. Question -- how does one respond to an error inside a go routine? 
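+	// One possible answer (an illustrative sketch only, not something this change
+	// adopts): run the workers under golang.org/x/sync/errgroup so the first
+	// failure is captured and surfaced after Wait, e.g.
+	//
+	//	g := new(errgroup.Group)
+	//	for i := 0; i < bm.nChannels; i++ {
+	//		g.Go(func() error {
+	//			return bm.ProcessBlocks(blockChannel, &blockWg, appearanceChannel, tsChannel)
+	//		})
+	//	}
+	//	if err := g.Wait(); err != nil { /* report, clean up, retry next round */ }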
blockWg := sync.WaitGroup{} - blockWg.Add(nChannels) - for i := 0; i < nChannels; i++ { + blockWg.Add(bm.nChannels) + for i := 0; i < bm.nChannels; i++ { go func() { - _ = bm.BlazeProcessBlocks(blockChannel, &blockWg, appearanceChannel, tsChannel) + _ = bm.ProcessBlocks(blockChannel, &blockWg, appearanceChannel, tsChannel) }() } appWg := sync.WaitGroup{} - appWg.Add(nChannels) - for i := 0; i < nChannels; i++ { + appWg.Add(bm.nChannels) + for i := 0; i < bm.nChannels; i++ { go func() { - _ = bm.BlazeProcessAppearances(appearanceChannel, &appWg) + _ = bm.ProcessAppearances(appearanceChannel, &appWg) }() } tsWg := sync.WaitGroup{} - tsWg.Add(nChannels) - for i := 0; i < nChannels; i++ { + tsWg.Add(bm.nChannels) + for i := 0; i < bm.nChannels; i++ { go func() { - _ = bm.BlazeProcessTimestamps(tsChannel, &tsWg) + _ = bm.ProcessTimestamps(tsChannel, &tsWg) }() } @@ -81,12 +68,12 @@ func (bm *BlazeManager) HandleBlaze1(blocks []base.Blknum) (ok bool, err error) close(tsChannel) tsWg.Wait() - return true, nil + return nil, true } -// BlazeProcessBlocks processes the block channel and for each block query the node for both +// ProcessBlocks processes the block channel and for each block query the node for both // traces and logs. Send results down appearanceChannel. -func (bm *BlazeManager) BlazeProcessBlocks(blockChannel chan base.Blknum, blockWg *sync.WaitGroup, appearanceChannel chan scrapedData, tsChannel chan tslib.TimestampRecord) (err error) { +func (bm *BlazeManager) ProcessBlocks(blockChannel chan base.Blknum, blockWg *sync.WaitGroup, appearanceChannel chan scrapedData, tsChannel chan tslib.TimestampRecord) (err error) { defer blockWg.Done() for bn := range blockChannel { @@ -122,8 +109,8 @@ func (bm *BlazeManager) BlazeProcessBlocks(blockChannel chan base.Blknum, blockW var blazeMutex sync.Mutex -// BlazeProcessAppearances processes scrapedData objects shoved down the appearanceChannel -func (bm *BlazeManager) BlazeProcessAppearances(appearanceChannel chan scrapedData, appWg *sync.WaitGroup) (err error) { +// ProcessAppearances processes scrapedData objects shoved down the appearanceChannel +func (bm *BlazeManager) ProcessAppearances(appearanceChannel chan scrapedData, appWg *sync.WaitGroup) (err error) { defer appWg.Done() for sData := range appearanceChannel { @@ -139,7 +126,7 @@ func (bm *BlazeManager) BlazeProcessAppearances(appearanceChannel chan scrapedDa return err } - err = bm.WriteAppearancesBlaze(sData.bn, addrMap) + err = bm.WriteAppearances(sData.bn, addrMap) if err != nil { return err } @@ -148,8 +135,8 @@ func (bm *BlazeManager) BlazeProcessAppearances(appearanceChannel chan scrapedDa return } -// BlazeProcessTimestamps processes timestamp data (currently by printing to a temporary file) -func (bm *BlazeManager) BlazeProcessTimestamps(tsChannel chan tslib.TimestampRecord, tsWg *sync.WaitGroup) (err error) { +// ProcessTimestamps processes timestamp data (currently by printing to a temporary file) +func (bm *BlazeManager) ProcessTimestamps(tsChannel chan tslib.TimestampRecord, tsWg *sync.WaitGroup) (err error) { defer tsWg.Done() for ts := range tsChannel { @@ -163,7 +150,19 @@ func (bm *BlazeManager) BlazeProcessTimestamps(tsChannel chan tslib.TimestampRec var writeMutex sync.Mutex -func (bm *BlazeManager) WriteAppearancesBlaze(bn base.Blknum, addrMap index.AddressBooleanMap) (err error) { +// TODO: The original intent of creating files was so that we could start over where we left off +// if we failed. But this isn't how it works. 
We cleanup any temp files if we fail, which means +// we write these files and if we fail, we remove them. If we don't fail, we've written them out, +// but only to re-read them and delete them in this round. Would could have easily just kept them +// in an in-memory cache. This would also allow us to not have to stringify the data and just store +// pointers to structs in memory. We wouldn't have to keep a seperate timestamps database nor a +// processedMap (the pointer would serve that purpose). + +// WriteAppearances writes the appearance for a chunk to a file +func (bm *BlazeManager) WriteAppearances(bn base.Blknum, addrMap index.AddressBooleanMap) (err error) { + ripePath := config.GetPathToIndex(bm.chain) + "ripe/" + unripePath := config.GetPathToIndex(bm.chain) + "unripe/" + if len(addrMap) > 0 { appearanceArray := make([]string, 0, len(addrMap)) for record := range addrMap { @@ -172,15 +171,15 @@ func (bm *BlazeManager) WriteAppearancesBlaze(bn base.Blknum, addrMap index.Addr sort.Strings(appearanceArray) blockNumStr := utils.PadNum(int(bn), 9) - fileName := config.GetPathToIndex(bm.chain) + "ripe/" + blockNumStr + ".txt" + fileName := ripePath + blockNumStr + ".txt" if bn > bm.ripeBlock { - fileName = config.GetPathToIndex(bm.chain) + "unripe/" + blockNumStr + ".txt" + fileName = unripePath + blockNumStr + ".txt" } toWrite := []byte(strings.Join(appearanceArray[:], "\n") + "\n") err = os.WriteFile(fileName, toWrite, 0744) // Uses os.O_WRONLY|os.O_CREATE|os.O_TRUNC if err != nil { - fmt.Println("call1 to WriteFile returned error", err) + fmt.Println("WriteFile returned error", err) return err } } @@ -189,7 +188,11 @@ func (bm *BlazeManager) WriteAppearancesBlaze(bn base.Blknum, addrMap index.Addr writeMutex.Lock() bm.processedMap[bn] = true writeMutex.Unlock() - bm.nProcessed++ + if bn > bm.ripeBlock { + bm.nUnripe++ + } else { + bm.nRipe++ + } return } @@ -207,13 +210,13 @@ func (bm *BlazeManager) syncedReporting(bn base.Blknum, force bool) { defer atomic.StoreUint32(&locker, 0) // Only report once in a while (17 blocks) - if bm.nProcessed%17 == 0 || force { + if bm.nProcessed()%17 == 0 || force { dist := uint64(0) if bm.ripeBlock > bn { dist = (bm.ripeBlock - bn) } msg := fmt.Sprintf("Scraping %-04d of %-04d at block %d of %d (%d blocks from head)", - bm.nProcessed, + bm.nProcessed(), bm.BlockCount(), bn, bm.ripeBlock, @@ -221,3 +224,11 @@ func (bm *BlazeManager) syncedReporting(bn base.Blknum, force bool) { logger.Progress(true, msg) } } + +// scrapedData combines the extracted block data, trace data, and log data into a +// structure that is passed through to the AddressChannel for further processing. 
+type scrapedData struct { + bn base.Blknum + traces []types.SimpleTrace + receipts []types.SimpleReceipt +} diff --git a/src/apps/chifra/internal/scrape/scrape_consolidate.go b/src/apps/chifra/internal/scrape/scrape_consolidate.go index 5adba36f45..4df5db0f92 100644 --- a/src/apps/chifra/internal/scrape/scrape_consolidate.go +++ b/src/apps/chifra/internal/scrape/scrape_consolidate.go @@ -1,194 +1,189 @@ package scrapePkg import ( + "context" "errors" "fmt" "os" "path/filepath" - "strconv" + "sort" "strings" + "time" "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/base" "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/config" "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/file" "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/index" "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/logger" - "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/rpc" + "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/sigintTrap" "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/utils" ) const asciiAppearanceSize = 59 // Consolidate calls into the block scraper to (a) call Blaze and (b) consolidate if applicable -func (bm *BlazeManager) Consolidate() (bool, error) { +func (bm *BlazeManager) Consolidate(blocks []base.Blknum) (error, bool) { + var err error chain := bm.chain - bm.syncedReporting(bm.StartBlock()+bm.BlockCount(), true /* force */) + indexPath := config.GetPathToIndex(chain) - // Get a sorted list of files in the ripe folder - ripeFolder := filepath.Join(config.GetPathToIndex(chain), "ripe") - ripeFileList, err := os.ReadDir(ripeFolder) - if err != nil { - return true, err - } - - stageFolder := filepath.Join(config.GetPathToIndex(chain), "staging") - if len(ripeFileList) == 0 { - // On active chains, this most likely never happens, but on some less used or private chains, this is a frequent occurrence. - // return a message, but don't do anything about it. - msg := fmt.Sprintf("No new blocks at block %d (%d away from head)%s", bm.meta.Latest, (bm.meta.Latest - bm.meta.Ripe), spaces) - logger.Info(msg) - - // we need to move the file to the end of the scraped range so we show progress - stageFn, _ := file.LatestFileInFolder(stageFolder) // it may not exist... - stageRange := base.RangeFromFilename(stageFn) - end := bm.StartBlock() + bm.BlockCount() - newRangeLast := utils.Min(bm.ripeBlock, end-1) - if stageRange.Last < newRangeLast { - newRange := base.FileRange{First: stageRange.First, Last: newRangeLast} - newFilename := filepath.Join(stageFolder, newRange.String()+".txt") - _ = os.Rename(stageFn, newFilename) - _ = os.Remove(stageFn) // seems redundant, but may not be on some operating systems - } - return true, nil - } - - // Check to see if we got as many ripe files as we were expecting. In the case when AllowMissing is true, we - // can't really know, but if AllowMissing is false, then the number of files should be the same as the range width - ripeCnt := len(ripeFileList) - unripeDist := bm.opts.Settings.Unripe_dist - if uint64(ripeCnt) < (bm.BlockCount() - unripeDist) { - // Then, if they are not at least sequential, clean up and try again... - allowMissing := false // scrapeCfg.AllowMissing(chain) - if err := isListSequential(chain, ripeFileList, allowMissing); err != nil { - _ = index.CleanEphemeralIndexFolders(chain) - return true, err + backupFn := "" + stageFn, _ := file.LatestFileInFolder(bm.StageFolder()) // it may not exist... 
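+	// The backup created below acts as a poor man's transaction: the deferred
+	// rename restores the original stage file if anything fails before we finish.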
+ if file.FileExists(stageFn) { + backupFn, err = file.MakeBackup(filepath.Join(config.GetPathToCache(chain)+"tmp"), stageFn) + if err != nil { + return errors.New("Could not create backup file: " + err.Error()), true } + defer func() { + // If the backup file exists, the function did not complete. In that case, we replace the original file. + if backupFn != "" && file.FileExists(backupFn) { + _ = os.Rename(backupFn, stageFn) + _ = os.Remove(backupFn) // seems redundant, but may not be on some operating systems + } + }() } - stageFn, _ := file.LatestFileInFolder(stageFolder) // it may not exist... - nAppsThen := int(file.FileSize(stageFn) / asciiAppearanceSize) + // After this point if we fail the backup file will replace the original file, so + // we can safely remove these the stage file (and ripe files) and it will get replaced - // ripeRange := rangeFromFileList(ripeFileList) - stageRange := base.RangeFromFilename(stageFn) - - curRange := base.FileRange{First: bm.StartBlock(), Last: bm.StartBlock() + bm.BlockCount() - 1} - if file.FileExists(stageFn) { - curRange = stageRange + // Make sure the user can't interrupt us... + ctx, cancel := context.WithCancel(context.Background()) + cleanOnQuit := func() { + logger.Warn(sigintTrap.TrapMessage) } - - // Note, this file may be empty or non-existant - tmpPath := filepath.Join(config.GetPathToCache(chain) + "tmp") - backupFn, err := file.MakeBackup(tmpPath, stageFn) - if err != nil { - return true, errors.New("Could not create backup file: " + err.Error()) + trapChannel := sigintTrap.Enable(ctx, cancel, cleanOnQuit) + defer sigintTrap.Disable(trapChannel) + + // Some counters... + nAppsFound := 0 + nAddrsFound := 0 + + // Load the stage into the map... + exists := file.FileExists(stageFn) // order matters + appMap, chunkRange, nAppearances := bm.AsciiFileToAppearanceMap(stageFn) + if !exists { + // Brand new stage. + chunkRange = base.FileRange{First: bm.meta.Finalized + 1, Last: blocks[0]} } - // logger.Info("Created backup file for stage") - defer func() { - if backupFn != "" && file.FileExists(backupFn) { - // If the backup file exists, something failed, so we replace the original file. - // logger.Info("Replacing backed up staging file") - _ = os.Rename(backupFn, stageFn) - _ = os.Remove(backupFn) // seems redundant, but may not be on some operating systems + // For each block... + nChunks := 0 + for _, block := range blocks { + fn := fmt.Sprintf("%09d.txt", block) + if file.FileExists(filepath.Join(bm.UnripeFolder(), fn)) { + continue // skip unripe files } - }() - - appearances := file.AsciiFileToLines(stageFn) - os.Remove(stageFn) // we have a backup copy, so it's not so bad to delete it here - for _, ripeFile := range ripeFileList { - ripePath := filepath.Join(ripeFolder, ripeFile.Name()) - appearances = append(appearances, file.AsciiFileToLines(ripePath)...) 
- os.Remove(ripePath) // if we fail halfway through, this will get noticed next time around and cleaned up - curCount := uint64(len(appearances)) - ripeRange := base.RangeFromFilename(ripePath) - curRange.Last = ripeRange.Last + ripeFn := filepath.Join(bm.RipeFolder(), fn) + if !file.FileExists(ripeFn) { + msg := fmt.Sprintf("ripe file not found for block %d", block) + spaces + if !bm.AllowMissing() { + _ = index.CleanEphemeralIndexFolders(chain) + return errors.New(msg), true + } else { + logger.Warn(msg) + } + } - isSnap := (curRange.Last >= bm.opts.Settings.First_snap && (curRange.Last%bm.opts.Settings.Snap_to_grid) == 0) - isOvertop := (curCount >= uint64(bm.opts.Settings.Apps_per_chunk)) + // Read in the ripe file, add it to the appMap and... + thisMap, _, thisCount := bm.AsciiFileToAppearanceMap(ripeFn) + nAppearances += thisCount + nAppsFound += thisCount + nAddrsFound += len(thisMap) + for addr, apps := range thisMap { + appMap[addr] = append(appMap[addr], apps...) + } + chunkRange.Last = block + // ...decide if we need to consolidate... + isSnap := bm.IsSnap(chunkRange.Last) // Have we hit a snap point? + isOvertop := nAppearances >= int(bm.PerChunk()) // Does this block overtop a chunk? if isSnap || isOvertop { - // we're consolidating... - appMap := make(index.AddressAppearanceMap, len(appearances)) - for _, line := range appearances { - parts := strings.Split(line, "\t") - if len(parts) == 3 { - addr := strings.ToLower(parts[0]) - bn, _ := strconv.ParseUint(parts[1], 10, 32) - txid, _ := strconv.ParseUint(parts[2], 10, 32) - appMap[addr] = append(appMap[addr], index.AppearanceRecord{ - BlockNumber: uint32(bn), - TransactionId: uint32(txid), - }) - } - } - - indexPath := config.GetPathToIndex(chain) + "finalized/" + curRange.String() + ".bin" - if report, err := index.WriteChunk(chain, bm.opts.PublisherAddr, indexPath, appMap, len(appearances), bm.opts.Pin, bm.opts.Remote); err != nil { - return false, err + // Make a chunk - i.e., consolidate + chunkPath := indexPath + "finalized/" + chunkRange.String() + ".bin" + publisher := base.ZeroAddr + if report, err := index.WriteChunk(chain, publisher, chunkPath, appMap, nAppearances, bm.opts.Pin, bm.opts.Remote); err != nil { + return err, false } else if report == nil { logger.Fatal("Should not happen, write chunk returned empty report") } else { report.Snapped = isSnap report.Report() } + if bm.opts.Remote { + time.Sleep(250 * time.Millisecond) + } - curRange.First = curRange.Last + 1 - appearances = []string{} + // reset for next chunk + bm.meta, _ = bm.opts.Conn.GetMetaData(bm.IsTestMode()) + appMap = make(index.AddressAppearanceMap, 0) + chunkRange.First = chunkRange.Last + 1 + chunkRange.Last = chunkRange.Last + 1 + nAppearances = 0 + nChunks++ } } - if len(appearances) > 0 { - lineLast := appearances[len(appearances)-1] - parts := strings.Split(lineLast, "\t") - Last := uint64(0) - if len(parts) > 1 { - Last, _ = strconv.ParseUint(parts[1], 10, 32) - Last = utils.Max(utils.Min(bm.ripeBlock, bm.StartBlock()+bm.BlockCount()-1), Last) - } else { - return true, errors.New("Cannot find last block number at lineLast in consolidate: " + lineLast) + if len(appMap) > 0 { // are there any appearances in this block range? 
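+		// Anything that didn't make it into a finalized chunk goes back onto the
+		// stage: flatten the map into tab-separated appearance lines, sort them, and
+		// write them to a stage file named after the block range they cover.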
+ newRange := base.FileRange{First: bm.meta.Finalized + 1, Last: 0} + + // We need an array because we're going to write it back to disc + appearances := make([]string, 0, nAppearances) + for addr, apps := range appMap { + for _, app := range apps { + record := fmt.Sprintf("%s\t%09d\t%05d", addr, app.BlockNumber, app.TransactionId) + appearances = append(appearances, record) + newRange.Last = utils.Max(newRange.Last, uint64(app.BlockNumber)) + } } - conn := rpc.TempConnection(chain) - m, _ := conn.GetMetaData(bm.opts.Globals.TestMode) - rng := base.FileRange{First: m.Finalized + 1, Last: Last} - f := fmt.Sprintf("%s.txt", rng) - fileName := filepath.Join(config.GetPathToIndex(chain), "staging", f) - err = file.LinesToAsciiFile(fileName, appearances) - if err != nil { - os.Remove(fileName) // cleans up by replacing the previous stage - return true, err + // The stage needs to be sorted because the end user queries it and we want the search to be fast + sort.Strings(appearances) + + stageFn = filepath.Join(bm.StageFolder(), fmt.Sprintf("%s.txt", newRange)) + if err := file.LinesToAsciiFile(stageFn, appearances); err != nil { + os.Remove(stageFn) + return err, true } - // logger.Info(colors.Red, "fileName:", fileName, colors.Off) - // logger.Info(colors.Red, "curRange:", curRange, colors.Off) } - stageFn, _ = file.LatestFileInFolder(stageFolder) // it may not exist... + // Let the user know what happened... nAppsNow := int(file.FileSize(stageFn) / asciiAppearanceSize) - bm.Report(nAppsThen, nAppsNow) + bm.report(len(blocks), int(bm.PerChunk()), nChunks, nAppsNow, nAppsFound, nAddrsFound) - // logger.Info("Removing backup file as it's not needed.") - os.Remove(backupFn) // commits the change + // Commit the change by deleting the backup file. + os.Remove(backupFn) - return true, err + return nil, true } -func isListSequential(chain string, ripeFileList []os.DirEntry, allowMissing bool) error { - prev := base.NotARange - for _, file := range ripeFileList { - fileRange := base.RangeFromFilename(file.Name()) - if prev != base.NotARange && prev != fileRange { - if !prev.Preceeds(fileRange, !allowMissing) { - msg := fmt.Sprintf("Ripe files are not sequential (%s ==> %s)", prev, fileRange) - return errors.New(msg) - } +// AsciiFileToAppearanceMap reads the appearances from the stage file and returns them as a map +func (bm *BlazeManager) AsciiFileToAppearanceMap(fn string) (index.AddressAppearanceMap, base.FileRange, int) { + appearances := file.AsciiFileToLines(fn) + os.Remove(fn) // It's okay to remove this. If it fails, we'll just start over. + + appMap := make(index.AddressAppearanceMap, len(appearances)) + fileRange := base.FileRange{First: utils.NOPOS, Last: 0} + + if len(appearances) == 0 { + return appMap, base.FileRange{First: 0, Last: 0}, 0 + } + + for _, line := range appearances { + line := line + parts := strings.Split(line, "\t") + if len(parts) == 3 { // shouldn't be needed, but just in case... 
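+			// Each line is expected to be address<tab>blockNum<tab>txIndex with the
+			// numbers zero-padded, hence the TrimLeft calls below.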
+ addr := strings.ToLower(parts[0]) + bn := utils.MustParseUint(strings.TrimLeft(parts[1], "0")) + txid := utils.MustParseUint(strings.TrimLeft(parts[2], "0")) + fileRange.First = utils.Min(fileRange.First, bn) + fileRange.Last = utils.Max(fileRange.Last, bn) + appMap[addr] = append(appMap[addr], index.AppearanceRecord{ + BlockNumber: uint32(bn), + TransactionId: uint32(txid), + }) } - prev = fileRange } - return nil + return appMap, fileRange, len(appearances) } - -var spaces = strings.Repeat(" ", 40) - -// TODO: chifra scrape misreports appearances per block when consolidating #2291 (closed, but copied here as a TODO) diff --git a/src/apps/chifra/internal/scrape/scrape_manager.go b/src/apps/chifra/internal/scrape/scrape_manager.go index f7dc1f4064..46c06d9b21 100644 --- a/src/apps/chifra/internal/scrape/scrape_manager.go +++ b/src/apps/chifra/internal/scrape/scrape_manager.go @@ -1,17 +1,12 @@ package scrapePkg import ( - "fmt" - "strings" - "time" + "path/filepath" "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/base" - "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/colors" - "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/logger" + "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/config" "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/rpc" "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/tslib" - "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/types" - "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/utils" ) // BlazeManager manages the scraper by keeping track of the progress of the scrape and @@ -19,65 +14,85 @@ import ( // if every block was visited or not. type BlazeManager struct { chain string - ripeBlock base.Blknum timestamps []tslib.TimestampRecord processedMap map[base.Blknum]bool - nProcessed uint64 opts *ScrapeOptions meta *rpc.MetaData + startBlock base.Blknum + blockCount base.Blknum + ripeBlock base.Blknum + nRipe int + nUnripe int + nChannels int } // StartBlock returns the start block for the current pass of the scraper. func (bm *BlazeManager) StartBlock() base.Blknum { - return bm.opts.StartBlock + return bm.startBlock } // BlockCount returns the number of blocks to process for this pass of the scraper. func (bm *BlazeManager) BlockCount() base.Blknum { - return bm.opts.BlockCnt + return bm.blockCount } -// Report prints out a report of the progress of the scraper. -func (bm *BlazeManager) Report(nAppsThen, nAppsNow int) { - settings := bm.opts.Settings +// EndBlock returns the last block to process for this pass of the scraper. +func (bm *BlazeManager) EndBlock() base.Blknum { + return bm.startBlock + bm.blockCount +} - msg := "Block={%d} have {%d} appearances of {%d} ({%0.1f%%}). Need {%d} more. Added {%d} records ({%0.2f} apps/blk)." - need := settings.Apps_per_chunk - utils.Min(settings.Apps_per_chunk, uint64(nAppsNow)) - seen := nAppsNow - if nAppsThen < nAppsNow { - seen = nAppsNow - nAppsThen - } - pct := float64(nAppsNow) / float64(settings.Apps_per_chunk) - pBlk := float64(seen) / float64(bm.BlockCount()) - height := bm.StartBlock() + bm.BlockCount() - 1 - msg = strings.Replace(msg, "{", colors.Green, -1) - msg = strings.Replace(msg, "}", colors.Off, -1) - logger.Info(fmt.Sprintf(msg, height, nAppsNow, settings.Apps_per_chunk, pct*100, need, seen, pBlk)) +// nProcessed returns the number of blocks processed so far (i.e., ripe + unripe). 
+func (bm *BlazeManager) nProcessed() int {
+	return bm.nRipe + bm.nUnripe
 }
 
-// Pause goes to sleep for a period of time based on the settings.
-func (bm *BlazeManager) Pause() {
-	// we always pause at least a quarter of a second to allow the node to 'rest'
-	time.Sleep(250 * time.Millisecond)
-	isDefaultSleep := bm.opts.Sleep >= 13 && bm.opts.Sleep <= 14
-	distanceFromHead := bm.meta.Latest - bm.meta.Staging
-	shouldSleep := !isDefaultSleep || distanceFromHead <= (2*bm.opts.Settings.Unripe_dist)
-	if shouldSleep {
-		sleep := bm.opts.Sleep // this value may change elsewhere allow us to break out of sleeping????
-		if sleep > 1 {
-			logger.Info("Sleeping for", sleep, "seconds -", distanceFromHead, "away from head.")
-		}
-		halfSecs := (sleep * 2) - 1 // we already slept one quarter of a second
-		for i := 0; i < int(halfSecs); i++ {
-			time.Sleep(time.Duration(500) * time.Millisecond)
-		}
+// IsTestMode returns true if the scraper is running in test mode.
+func (bm *BlazeManager) IsTestMode() bool {
+	return bm.opts.Globals.TestMode
+}
+
+// AllowMissing returns true for all chains but mainnet and the value
+// of the config item on mainnet (false by default). The scraper will
+// halt if AllowMissing is false and a block with zero appearances is
+// encountered.
+func (bm *BlazeManager) AllowMissing() bool {
+	if bm.chain != "mainnet" {
+		return true
 	}
+	return bm.opts.Settings.Allow_missing
+}
+
+// PerChunk returns the number of appearances to gather before consolidating a chunk.
+func (bm *BlazeManager) PerChunk() base.Blknum {
+	return bm.opts.Settings.Apps_per_chunk
+}
+
+// FirstSnap returns the first block at which chunks begin snapping to the grid.
+func (bm *BlazeManager) FirstSnap() base.Blknum {
+	return bm.opts.Settings.First_snap
+}
+
+// SnapTo returns the block grid to which chunks are snapped.
+func (bm *BlazeManager) SnapTo() base.Blknum {
+	return bm.opts.Settings.Snap_to_grid
+}
+
+// IsSnap returns true if the block is a snap point.
+func (bm *BlazeManager) IsSnap(block base.Blknum) bool {
+	return block >= bm.FirstSnap() && (block%bm.SnapTo()) == 0
+}
+
+// StageFolder returns the folder where the stage file is stored.
+func (bm *BlazeManager) StageFolder() string {
+	return filepath.Join(config.GetPathToIndex(bm.chain), "staging")
+}
+
+// RipeFolder returns the folder where the ripe files are stored.
+func (bm *BlazeManager) RipeFolder() string {
+	return filepath.Join(config.GetPathToIndex(bm.chain), "ripe")
+}
 
-// scrapedData combines the extracted block data, trace data, and log data into a
-// structure that is passed through to the AddressChannel for further processing.
-type scrapedData struct {
-	bn       base.Blknum
-	traces   []types.SimpleTrace
-	receipts []types.SimpleReceipt
+// UnripeFolder returns the folder where the unripe files are stored.
+func (bm *BlazeManager) UnripeFolder() string { + return filepath.Join(config.GetPathToIndex(bm.chain), "unripe") } diff --git a/src/apps/chifra/internal/scrape/scrape_manager_utils.go b/src/apps/chifra/internal/scrape/scrape_manager_utils.go new file mode 100644 index 0000000000..0440c487d6 --- /dev/null +++ b/src/apps/chifra/internal/scrape/scrape_manager_utils.go @@ -0,0 +1,57 @@ +package scrapePkg + +import ( + "fmt" + "strings" + "time" + + "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/colors" + "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/logger" + "github.com/TrueBlocks/trueblocks-core/src/apps/chifra/pkg/utils" +) + +func colored(s string) string { + s = strings.Replace(s, "{", colors.Green, -1) + s = strings.Replace(s, "}", colors.Off, -1) + return s +} + +// Report prints out a report of the progress of the scraper. +func (bm *BlazeManager) report(nBlocks, perChunk, nChunks, nAppsNow, nAppsFound, nAddrsFound int) { + nNeeded := perChunk - utils.Min(perChunk, nAppsNow) + appsPerBlock := float64(nAppsFound) / float64(nBlocks) + appsPerAddr := float64(nAppsFound) / float64(nAddrsFound) + pctFull := float64(nAppsNow) / float64(perChunk) + + msg := fmt.Sprintf(`%s #{%d}, found {%6d} apps, {%5d} addrs ({%0.1f/addr}), in {%4d} blks ({%0.1f}/blk). Created {%d} chunks, staged {%5d} of {%d} ({%0.1f%%}). Need {%5d} more.`, + bm.chain, + bm.EndBlock(), + nAppsFound, + nAddrsFound, + appsPerAddr, + nBlocks, + appsPerBlock, + nChunks, + nAppsNow, + perChunk, + pctFull*100, + nNeeded, + ) + logger.Info(colored(msg)) +} + +// Pause goes to sleep for a period of time based on the settings. +func (opts *ScrapeOptions) pause(dist uint64) { + // we always pause at least a quarter of a second to allow the node to 'rest' + time.Sleep(250 * time.Millisecond) + isDefaultSleep := opts.Sleep >= 13 && opts.Sleep <= 14 + shouldSleep := !isDefaultSleep || dist <= (2*opts.Settings.Unripe_dist) + if shouldSleep { + sleep := opts.Sleep // this value may change elsewhere allow us to break out of sleeping???? + logger.Progress(sleep > 1, "Sleeping for", sleep, "seconds -", dist, "away from head.") + halfSecs := (sleep * 2) - 1 // we already slept one quarter of a second + for i := 0; i < int(halfSecs); i++ { + time.Sleep(time.Duration(500) * time.Millisecond) + } + } +}
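
To make the rollover rule in Consolidate concrete, here is a minimal, self-contained sketch of the decision it applies block by block: a chunk is written either when the block lands on the snap grid (IsSnap) or when the accumulated appearances overtop the per-chunk limit (PerChunk). The constants below are illustrative stand-ins for the Apps_per_chunk, First_snap, and Snap_to_grid settings, not the project's defaults, and the appearance counts are made up.

package main

import "fmt"

// Illustrative stand-ins for the scraper settings; the values are made up.
const (
	appsPerChunk = 1000   // cf. Apps_per_chunk
	firstSnap    = 500000 // cf. First_snap
	snapToGrid   = 100000 // cf. Snap_to_grid
)

// isSnap mirrors BlazeManager.IsSnap: true when a block lies on the grid.
func isSnap(block uint64) bool {
	return block >= firstSnap && block%snapToGrid == 0
}

func main() {
	chunkStart := uint64(599_950)
	nAppearances := 0
	for block := chunkStart; block < 600_200; block++ {
		nAppearances += 7 // pretend every block contributes 7 appearances
		if isSnap(block) || nAppearances >= appsPerChunk {
			// consolidate: in the real code this is where index.WriteChunk is called
			fmt.Printf("consolidate %d-%d (%d appearances)\n", chunkStart, block, nAppearances)
			chunkStart = block + 1
			nAppearances = 0
		}
	}
	// whatever remains would be written back to the stage
	fmt.Printf("left on the stage: %d-... (%d appearances)\n", chunkStart, nAppearances)
}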