Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resumable newblock in mining #1891

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion bench/Chainweb/Pact/Backend/ForkingBench.hs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,9 @@ createBlock validate nonce pact = do

-- assemble block without nonce and timestamp

T2 parent payload <- newBlock noMiner pact
bip <- newBlock noMiner True pact
let parent = _blockInProgressParentHeader bip
let payload = blockInProgressToPayloadWithOutputs bip

let creationTime = add second $ _blockCreationTime $ _parentHeader parent
let bh = newBlockHeader
Expand Down
70 changes: 52 additions & 18 deletions src/Chainweb/Chainweb/MinerResources.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}

-- |
Expand Down Expand Up @@ -64,10 +65,10 @@ import Chainweb.Pact.Utils
import Chainweb.Payload
import Chainweb.Payload.PayloadStore
import Chainweb.Sync.WebBlockHeaderStore
import Chainweb.Time (Micros, Time, minute, getCurrentTimeIntegral, scaleTimeSpan)
import Chainweb.Utils (fromJuste, runForever, thd, T2(..), T3(..))
import Chainweb.Time
import Chainweb.Utils
import Chainweb.Version
import Chainweb.WebPactExecutionService (_webPactExecutionService)
import Chainweb.WebPactExecutionService

import Data.LogMessage (JsonLog(..), LogFunction)

Expand Down Expand Up @@ -96,8 +97,9 @@ withMiningCoordination logger conf cdb inner
in fmap ((mid,) . HM.fromList) $
forM cids $ \cid -> do
let bh = fromMaybe (genesisBlockHeader v cid) (HM.lookup cid (_cutMap cut))
fmap ((cid,) . over _2 Just) $
getPayload (ParentHeader bh) cid miner
newBlock <- getPayload cid miner (ParentHeader bh)
return (cid, Just newBlock)

m <- newTVarIO initialPw
c503 <- newIORef 0
c403 <- newIORef 0
Expand Down Expand Up @@ -141,30 +143,62 @@ withMiningCoordination logger conf cdb inner
pw <- readTVarIO tpw
let
-- we assume that this path always exists in PrimedWork and never delete it.
ourMiner :: Traversal' PrimedWork (T2 ParentHeader (Maybe PayloadWithOutputs))
ourMiner = _Wrapped' . at (view minerId miner) . _Just . at cid . _Just
let !(T2 ph _) = fromJuste $ pw ^? ourMiner
-- wait for a block different from what we've got primed work for
new <- awaitNewBlock cdb cid (_parentHeader ph)
ourMiner :: Traversal' PrimedWork (Maybe NewBlock)
ourMiner = _Wrapped' . ix (view minerId miner) . ix cid
let !outdatedPayload = pw ^?! ourMiner . _Just
let ParentHeader outdatedParent = newBlockParentHeader outdatedPayload
let
periodicallyRefreshPayload = do
let delay =
timeSpanToMicros $ _coordinationPayloadRefreshDelay coordConf
threadDelay (fromIntegral @Micros @Int delay)
when (not $ v ^. versionCheats . disablePact) $ do
-- "stale" in the sense of not having all of the transactions
-- that it could. it still has the latest possible parent
staleBlockInProgress <- atomically $ do
primed <- readTVar tpw <&> (^?! (ourMiner . _Just))
case primed of
NewBlockInProgress bip -> return bip
NewBlockPayload {} ->
error "periodicallyRefreshPayload: encountered NewBlockPayload in PrimedWork, which cannot be refreshed"
maybeNewBlock <- _pactContinueBlock pact cid staleBlockInProgress
-- if continuing returns Nothing then the parent header
-- isn't available in the checkpointer right now.
-- in that case we just mark the payload as not stale
atomically $ modifyTVar' tpw $
ourMiner .~ Just (NewBlockInProgress (fromMaybe staleBlockInProgress maybeNewBlock))
periodicallyRefreshPayload

newParent <- either ParentHeader id <$> race
-- wait for a block different from what we've got primed work for
(awaitNewBlock cdb cid outdatedParent)
-- in the meantime, periodically refresh the payload to make sure
-- it has all of the transactions it can have
periodicallyRefreshPayload

-- Temporarily block this chain from being considered for queries
atomically $ modifyTVar' tpw (ourMiner . _2 .~ Nothing)
-- Generate new payload for this miner
newParentAndPayload <- getPayload (ParentHeader new) cid miner
atomically $ modifyTVar' tpw (ourMiner .~ over _2 Just newParentAndPayload)
atomically $ modifyTVar' tpw (ourMiner .~ Nothing)

-- Get a payload for the new block
newBlock <- getPayload cid miner newParent

atomically $ modifyTVar' tpw (ourMiner .~ Just newBlock)


getPayload :: ParentHeader -> ChainId -> Miner -> IO (T2 ParentHeader PayloadWithOutputs)
getPayload new cid m =
getPayload :: ChainId -> Miner -> ParentHeader -> IO NewBlock
getPayload cid m ph =
if v ^. versionCheats . disablePact
-- if pact is disabled, we must keep track of the latest header
-- ourselves. otherwise we use the header we get from newBlock as the
-- real parent. newBlock may return a header in the past due to a race
-- with rocksdb though that shouldn't cause a problem, just wasted work,
-- see docs for
-- Chainweb.Pact.PactService.Checkpointer.findLatestValidBlockHeader'
then return $ T2 new emptyPayload
then return $
NewBlockPayload ph emptyPayload
else trace (logFunction logger)
"Chainweb.Chainweb.MinerResources.withMiningCoordination.newBlock"
() 1 (_pactNewBlock pact cid m)
() 1 (_pactNewBlock pact cid m True)

pact :: PactExecutionService
pact = _webPactExecutionService $ view cutDbPactService cdb
Expand Down
14 changes: 13 additions & 1 deletion src/Chainweb/Miner/Config.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}

-- |
Expand Down Expand Up @@ -55,7 +56,7 @@ import Pact.Types.Term (mkKeySet, PublicKeyText(..))
-- internal modules

import Chainweb.Miner.Pact (Miner(..), MinerKeys(..), MinerId(..), minerId)
import Chainweb.Time (Seconds)
import Chainweb.Time

---

Expand Down Expand Up @@ -138,6 +139,7 @@ data CoordinationConfig = CoordinationConfig
-- ^ the maximum number of concurrent update streams that is supported
, _coordinationUpdateStreamTimeout :: !Seconds
-- ^ the duration that an update stream is kept open in seconds
, _coordinationPayloadRefreshDelay :: !(TimeSpan Micros)
} deriving stock (Eq, Show, Generic)

coordinationEnabled :: Lens' CoordinationConfig Bool
Expand All @@ -157,13 +159,18 @@ coordinationUpdateStreamTimeout :: Lens' CoordinationConfig Seconds
coordinationUpdateStreamTimeout =
lens _coordinationUpdateStreamTimeout (\m c -> m { _coordinationUpdateStreamTimeout = c })

coordinationPayloadRefreshDelay :: Lens' CoordinationConfig (TimeSpan Micros)
coordinationPayloadRefreshDelay =
lens _coordinationPayloadRefreshDelay (\m c -> m { _coordinationPayloadRefreshDelay = c })

instance ToJSON CoordinationConfig where
toJSON o = object
[ "enabled" .= _coordinationEnabled o
, "limit" .= _coordinationReqLimit o
, "miners" .= (J.toJsonViaEncode <$> S.toList (_coordinationMiners o))
, "updateStreamLimit" .= _coordinationUpdateStreamLimit o
, "updateStreamTimeout" .= _coordinationUpdateStreamTimeout o
, "payloadRefreshDelay" .= _coordinationPayloadRefreshDelay o
]

instance FromJSON (CoordinationConfig -> CoordinationConfig) where
Expand All @@ -173,6 +180,7 @@ instance FromJSON (CoordinationConfig -> CoordinationConfig) where
<*< coordinationMiners .fromLeftMonoidalUpdate %.: "miners" % o
<*< coordinationUpdateStreamLimit ..: "updateStreamLimit" % o
<*< coordinationUpdateStreamTimeout ..: "updateStreamTimeout" % o
<*< coordinationPayloadRefreshDelay ..: "payloadRefreshDelay" % o

defaultCoordination :: CoordinationConfig
defaultCoordination = CoordinationConfig
Expand All @@ -181,6 +189,7 @@ defaultCoordination = CoordinationConfig
, _coordinationReqLimit = 1200
, _coordinationUpdateStreamLimit = 2000
, _coordinationUpdateStreamTimeout = 240
, _coordinationPayloadRefreshDelay = TimeSpan (Micros 15_000_000)
}

pCoordinationConfig :: MParser CoordinationConfig
Expand All @@ -198,6 +207,9 @@ pCoordinationConfig = id
<*< coordinationUpdateStreamTimeout .:: jsonOption
% long "mining-update-stream-timeout"
<> help "duration that an update stream is kept open in seconds"
<*< coordinationPayloadRefreshDelay .:: jsonOption
% long "mining-payload-refresh-delay"
<> help "frequency that the mining payload is refreshed"

pMiner :: String -> Parser Miner
pMiner prefix = pkToMiner <$> pPk
Expand Down
20 changes: 14 additions & 6 deletions src/Chainweb/Miner/Coordinator.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE ViewPatterns #-}

-- |
-- Module: Chainweb.Miner.Coordinator
Expand Down Expand Up @@ -130,14 +131,14 @@ data MiningCoordination logger tbl = MiningCoordination
-- made as often as desired, without clogging the Pact queue.
--
newtype PrimedWork =
PrimedWork (HM.HashMap MinerId (HM.HashMap ChainId (T2 ParentHeader (Maybe PayloadWithOutputs))))
PrimedWork (HM.HashMap MinerId (HM.HashMap ChainId (Maybe NewBlock)))
deriving newtype (Semigroup, Monoid)
deriving stock Generic
deriving anyclass (Wrapped)

resetPrimed :: MinerId -> ChainId -> PrimedWork -> PrimedWork
resetPrimed mid cid (PrimedWork pw) = PrimedWork
$! HM.adjust (HM.adjust (_2 .~ Nothing) cid) mid pw
$! HM.adjust (HM.adjust (\_ -> Nothing) cid) mid pw

-- | Data shared between the mining threads represented by `newWork` and
-- `publish`.
Expand Down Expand Up @@ -200,21 +201,28 @@ newWork logFun choice eminer@(Miner mid _) hdb pact tpw c = do
mpw <- atomically $ do
PrimedWork pw <- readTVar tpw
mpw <- maybe retry return (HM.lookup mid pw)
guard (any (isJust . ssnd) mpw)
guard (any isJust mpw)
return mpw
let mr = T2
<$> HM.lookup cid mpw
<*> getCutExtension c cid

case mr of
Just (T2 (T2 _ Nothing) _) -> do
Just (T2 Nothing _) -> do
logFun @T.Text Debug $ "newWork: chain " <> sshow cid <> " has stale work"
newWork logFun Anything eminer hdb pact tpw c
Nothing -> do
logFun @T.Text Debug $ "newWork: chain " <> sshow cid <> " not mineable"
newWork logFun Anything eminer hdb pact tpw c
Just (T2 (T2 (ParentHeader primedParent) (Just payload)) extension)
Just
(T2
(Just
newBlock@(newBlockParentHeader -> ParentHeader primedParent)
)
extension
)
| _blockHash primedParent == _blockHash (_parentHeader (_cutExtensionParent extension)) -> do
let payload = newBlockToPayloadWithOutputs newBlock
let !phash = _payloadWithOutputsPayloadHash payload
!wh <- newWorkHeader hdb extension phash
pure $ Just $ T2 wh payload
Expand Down Expand Up @@ -333,7 +341,7 @@ work mr mcid m = do
"no chains have primed work"
| otherwise ->
"all chains with primed work may be stalled. chains with primed payloads: "
<> sshow (sort [cid | (cid, T2 _ (Just _)) <- HM.toList mpw])
<> sshow (sort [cid | (cid, Just _) <- HM.toList mpw])
)

logDelays n'
Expand Down
30 changes: 20 additions & 10 deletions src/Chainweb/Pact/Backend/ChainwebPactDb.hs
Original file line number Diff line number Diff line change
Expand Up @@ -638,14 +638,25 @@ createVersionedTable tablename db = do
"CREATE INDEX IF NOT EXISTS " <> tbl ixName <> " ON " <> tbl tablename <> "(txid DESC);"

-- | Delete any state from the database newer than the input parent header.
-- Returns the ending txid of the input parent header.
rewindDbTo
:: SQLiteEnv
-> Maybe ParentHeader
-> IO ()
rewindDbTo db Nothing = rewindDbToGenesis db
-> IO TxId
rewindDbTo db Nothing = do
rewindDbToGenesis db
return 0
rewindDbTo db mh@(Just (ParentHeader ph)) = do
!endingtxid <- getEndTxId "rewindDbToBlock" db mh
rewindDbToBlock db (_blockHeight ph) endingtxid
!maybeEndingTxId <- getEndTxId "rewindDbToBlock" db mh
endingTxId <- maybe
(throwM
$ BlockHeaderLookupFailure
$ "rewindDbTo.getEndTxId: not in db: "
<> sshow ph)
return
maybeEndingTxId
rewindDbToBlock db (_blockHeight ph) endingTxId
return endingTxId

-- rewind before genesis, delete all user tables and all rows in all tables
rewindDbToGenesis
Expand Down Expand Up @@ -862,12 +873,12 @@ initSchema logger sql =
"CREATE INDEX IF NOT EXISTS \
\ transactionIndexByBH ON TransactionIndex(blockheight)";

getEndTxId :: Text -> SQLiteEnv -> Maybe ParentHeader -> IO TxId
getEndTxId :: Text -> SQLiteEnv -> Maybe ParentHeader -> IO (Maybe TxId)
getEndTxId msg sql pc = case pc of
Nothing -> return 0
Nothing -> return (Just 0)
Just (ParentHeader ph) -> getEndTxId' msg sql (_blockHeight ph) (_blockHash ph)

getEndTxId' :: Text -> SQLiteEnv -> BlockHeight -> BlockHash -> IO TxId
getEndTxId' :: Text -> SQLiteEnv -> BlockHeight -> BlockHash -> IO (Maybe TxId)
getEndTxId' msg sql bh bhsh = do
r <- qry sql
"SELECT endingtxid FROM BlockHistory WHERE blockheight = ? and hash = ?;"
Expand All @@ -876,9 +887,8 @@ getEndTxId' msg sql bh bhsh = do
]
[RInt]
case r of
[[SInt tid]] -> return (TxId (fromIntegral tid))
[] -> throwM $ BlockHeaderLookupFailure $ msg <> ".getEndTxId: not in db: " <>
sshow (bh, bhsh)
[[SInt tid]] -> return $ Just (TxId (fromIntegral tid))
[] -> return Nothing
_ -> internalError $ msg <> ".getEndTxId: expected single-row int result, got " <> sshow r

-- | Careful doing this! It's expensive and for our use case, probably pointless.
Expand Down
Loading
Loading