Skip to content

Commit

Permalink
polish(stmgr): define ExecMonitor for message applicaiton callback
Browse files Browse the repository at this point in the history
  • Loading branch information
frrist committed Jun 4, 2021
1 parent bf46876 commit 6a52f6d
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 77 deletions.
20 changes: 7 additions & 13 deletions chain/stmgr/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,24 +239,18 @@ func (sm *StateManager) CallWithGas(ctx context.Context, msg *types.Message, pri
var errHaltExecution = fmt.Errorf("halt")

func (sm *StateManager) Replay(ctx context.Context, ts *types.TipSet, mcid cid.Cid) (*types.Message, *vm.ApplyRet, error) {
var outm *types.Message
var outr *vm.ApplyRet

_, _, err := sm.computeTipSetState(ctx, ts, func(c cid.Cid, m *types.Message, ret *vm.ApplyRet) error {
if c == mcid {
outm = m
outr = ret
return errHaltExecution
}
return nil
})
var finder messageFinder
// message to find
finder.mcid = mcid

_, _, err := sm.computeTipSetState(ctx, ts, &finder)
if err != nil && !xerrors.Is(err, errHaltExecution) {
return nil, nil, xerrors.Errorf("unexpected error during execution: %w", err)
}

if outr == nil {
if finder.outr == nil {
return nil, nil, xerrors.Errorf("given message not found in tipset")
}

return outm, outr, nil
return finder.outm, finder.outr, nil
}
48 changes: 24 additions & 24 deletions chain/stmgr/forks.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type MigrationCache interface {
type MigrationFunc func(
ctx context.Context,
sm *StateManager, cache MigrationCache,
cb ExecCallback, oldState cid.Cid,
cb ExecMonitor, oldState cid.Cid,
height abi.ChainEpoch, ts *types.TipSet,
) (newState cid.Cid, err error)

Expand Down Expand Up @@ -292,7 +292,7 @@ func (us UpgradeSchedule) Validate() error {
return nil
}

func (sm *StateManager) handleStateForks(ctx context.Context, root cid.Cid, height abi.ChainEpoch, cb ExecCallback, ts *types.TipSet) (cid.Cid, error) {
func (sm *StateManager) handleStateForks(ctx context.Context, root cid.Cid, height abi.ChainEpoch, cb ExecMonitor, ts *types.TipSet) (cid.Cid, error) {
retCid := root
var err error
u := sm.stateMigrations[height]
Expand Down Expand Up @@ -472,7 +472,7 @@ func doTransfer(tree types.StateTree, from, to address.Address, amt abi.TokenAmo
return nil
}

func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ MigrationCache, em ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Some initial parameters
FundsForMiners := types.FromFil(1_000_000)
LookbackEpoch := abi.ChainEpoch(32000)
Expand Down Expand Up @@ -722,12 +722,12 @@ func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ Migratio
return cid.Undef, xerrors.Errorf("resultant state tree account balance was not correct: %s", total)
}

if cb != nil {
if em != nil {
// record the transfer in execution traces

fakeMsg := makeFakeMsg(builtin.SystemActorAddr, builtin.SystemActorAddr, big.Zero(), uint64(epoch))

if err := cb(fakeMsg.Cid(), fakeMsg, &vm.ApplyRet{
if err := em.MessageApplied(ctx, ts, fakeMsg.Cid(), fakeMsg, &vm.ApplyRet{
MessageReceipt: *makeFakeRct(),
ActorErr: nil,
ExecutionTrace: types.ExecutionTrace{
Expand All @@ -740,15 +740,15 @@ func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ Migratio
},
Duration: 0,
GasCosts: nil,
}); err != nil {
}, false); err != nil {
return cid.Undef, xerrors.Errorf("recording transfers: %w", err)
}
}

return tree.Flush(ctx)
}

func UpgradeIgnition(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
func UpgradeIgnition(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
store := sm.cs.ActorStore(ctx)

if build.UpgradeLiftoffHeight <= epoch {
Expand Down Expand Up @@ -785,12 +785,12 @@ func UpgradeIgnition(ctx context.Context, sm *StateManager, _ MigrationCache, cb
return cid.Undef, xerrors.Errorf("resetting genesis msig start epochs: %w", err)
}

err = splitGenesisMultisig0(ctx, cb, split1, store, tree, 50, epoch)
err = splitGenesisMultisig0(ctx, cb, split1, store, tree, 50, epoch, ts)
if err != nil {
return cid.Undef, xerrors.Errorf("splitting first msig: %w", err)
}

err = splitGenesisMultisig0(ctx, cb, split2, store, tree, 50, epoch)
err = splitGenesisMultisig0(ctx, cb, split2, store, tree, 50, epoch, ts)
if err != nil {
return cid.Undef, xerrors.Errorf("splitting second msig: %w", err)
}
Expand All @@ -803,7 +803,7 @@ func UpgradeIgnition(ctx context.Context, sm *StateManager, _ MigrationCache, cb
return tree.Flush(ctx)
}

func UpgradeRefuel(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
func UpgradeRefuel(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {

store := sm.cs.ActorStore(ctx)
tree, err := sm.StateTree(root)
Expand All @@ -829,7 +829,7 @@ func UpgradeRefuel(ctx context.Context, sm *StateManager, _ MigrationCache, cb E
return tree.Flush(ctx)
}

func UpgradeActorsV2(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
func UpgradeActorsV2(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
buf := blockstore.NewTieredBstore(sm.cs.StateBlockstore(), blockstore.NewMemorySync())
store := store.ActorStore(ctx, buf)

Expand Down Expand Up @@ -875,7 +875,7 @@ func UpgradeActorsV2(ctx context.Context, sm *StateManager, _ MigrationCache, cb
return newRoot, nil
}

func UpgradeLiftoff(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
func UpgradeLiftoff(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
tree, err := sm.StateTree(root)
if err != nil {
return cid.Undef, xerrors.Errorf("getting state tree: %w", err)
Expand All @@ -889,7 +889,7 @@ func UpgradeLiftoff(ctx context.Context, sm *StateManager, _ MigrationCache, cb
return tree.Flush(ctx)
}

func UpgradeCalico(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
func UpgradeCalico(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
if build.BuildType != build.BuildMainnet {
return root, nil
}
Expand Down Expand Up @@ -935,7 +935,7 @@ func UpgradeCalico(ctx context.Context, sm *StateManager, _ MigrationCache, cb E
return newRoot, nil
}

func terminateActor(ctx context.Context, tree *state.StateTree, addr address.Address, cb ExecCallback, epoch abi.ChainEpoch) error {
func terminateActor(ctx context.Context, tree *state.StateTree, addr address.Address, em ExecMonitor, epoch abi.ChainEpoch, ts *types.TipSet) error {
a, err := tree.GetActor(addr)
if xerrors.Is(err, types.ErrActorNotFound) {
return types.ErrActorNotFound
Expand All @@ -950,18 +950,18 @@ func terminateActor(ctx context.Context, tree *state.StateTree, addr address.Add
return xerrors.Errorf("transferring terminated actor's balance: %w", err)
}

if cb != nil {
if em != nil {
// record the transfer in execution traces

fakeMsg := makeFakeMsg(builtin.SystemActorAddr, addr, big.Zero(), uint64(epoch))

if err := cb(fakeMsg.Cid(), fakeMsg, &vm.ApplyRet{
if err := em.MessageApplied(ctx, ts, fakeMsg.Cid(), fakeMsg, &vm.ApplyRet{
MessageReceipt: *makeFakeRct(),
ActorErr: nil,
ExecutionTrace: trace,
Duration: 0,
GasCosts: nil,
}); err != nil {
}, false); err != nil {
return xerrors.Errorf("recording transfers: %w", err)
}
}
Expand Down Expand Up @@ -995,7 +995,7 @@ func terminateActor(ctx context.Context, tree *state.StateTree, addr address.Add
return tree.SetActor(init_.Address, ia)
}

func UpgradeActorsV3(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
func UpgradeActorsV3(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
workerCount := runtime.NumCPU() - 3
if workerCount <= 0 {
Expand All @@ -1019,7 +1019,7 @@ func UpgradeActorsV3(ctx context.Context, sm *StateManager, cache MigrationCache
}

if build.BuildType == build.BuildMainnet {
err := terminateActor(ctx, tree, build.ZeroAddress, cb, epoch)
err := terminateActor(ctx, tree, build.ZeroAddress, cb, epoch, ts)
if err != nil && !xerrors.Is(err, types.ErrActorNotFound) {
return cid.Undef, xerrors.Errorf("deleting zero bls actor: %w", err)
}
Expand Down Expand Up @@ -1097,7 +1097,7 @@ func upgradeActorsV3Common(
return newRoot, nil
}

func UpgradeActorsV4(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
func UpgradeActorsV4(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
workerCount := runtime.NumCPU() - 3
if workerCount <= 0 {
Expand Down Expand Up @@ -1296,7 +1296,7 @@ func setNetworkName(ctx context.Context, store adt.Store, tree *state.StateTree,
return nil
}

func splitGenesisMultisig0(ctx context.Context, cb ExecCallback, addr address.Address, store adt0.Store, tree *state.StateTree, portions uint64, epoch abi.ChainEpoch) error {
func splitGenesisMultisig0(ctx context.Context, em ExecMonitor, addr address.Address, store adt0.Store, tree *state.StateTree, portions uint64, epoch abi.ChainEpoch, ts *types.TipSet) error {
if portions < 1 {
return xerrors.Errorf("cannot split into 0 portions")
}
Expand Down Expand Up @@ -1393,12 +1393,12 @@ func splitGenesisMultisig0(ctx context.Context, cb ExecCallback, addr address.Ad
i++
}

if cb != nil {
if em != nil {
// record the transfer in execution traces

fakeMsg := makeFakeMsg(builtin.SystemActorAddr, addr, big.Zero(), uint64(epoch))

if err := cb(fakeMsg.Cid(), fakeMsg, &vm.ApplyRet{
if err := em.MessageApplied(ctx, ts, fakeMsg.Cid(), fakeMsg, &vm.ApplyRet{
MessageReceipt: *makeFakeRct(),
ActorErr: nil,
ExecutionTrace: types.ExecutionTrace{
Expand All @@ -1411,7 +1411,7 @@ func splitGenesisMultisig0(ctx context.Context, cb ExecCallback, addr address.Ad
},
Duration: 0,
GasCosts: nil,
}); err != nil {
}, false); err != nil {
return xerrors.Errorf("recording transfers: %w", err)
}
}
Expand Down
6 changes: 3 additions & 3 deletions chain/stmgr/forks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestForkHeightTriggers(t *testing.T) {
cg.ChainStore(), UpgradeSchedule{{
Network: 1,
Height: testForkHeight,
Migration: func(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecCallback,
Migration: func(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecMonitor,
root cid.Cid, height abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
cst := ipldcbor.NewCborStore(sm.ChainStore().StateBlockstore())

Expand Down Expand Up @@ -253,7 +253,7 @@ func TestForkRefuseCall(t *testing.T) {
Network: 1,
Expensive: true,
Height: testForkHeight,
Migration: func(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecCallback,
Migration: func(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecMonitor,
root cid.Cid, height abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
return root, nil
}}})
Expand Down Expand Up @@ -363,7 +363,7 @@ func TestForkPreMigration(t *testing.T) {
cg.ChainStore(), UpgradeSchedule{{
Network: 1,
Height: testForkHeight,
Migration: func(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecCallback,
Migration: func(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecMonitor,
root cid.Cid, height abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {

// Make sure the test that should be canceled, is canceled.
Expand Down
61 changes: 25 additions & 36 deletions chain/stmgr/stmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ type StateManager struct {

genesisPledge abi.TokenAmount
genesisMarketFunds abi.TokenAmount

tsExecMonitor ExecMonitor
}

// Caches a single state tree
Expand Down Expand Up @@ -171,6 +173,15 @@ func NewStateManagerWithUpgradeSchedule(cs *store.ChainStore, us UpgradeSchedule
}, nil
}

func NewStateManagerWithUpgradeScheduleAndMonitor(cs *store.ChainStore, us UpgradeSchedule, em ExecMonitor) (*StateManager, error) {
sm, err := NewStateManagerWithUpgradeSchedule(cs, us)
if err != nil {
return nil, err
}
sm.tsExecMonitor = em
return sm, nil
}

func cidsToKey(cids []cid.Cid) string {
var out string
for _, c := range cids {
Expand Down Expand Up @@ -255,47 +266,25 @@ func (sm *StateManager) TipSetState(ctx context.Context, ts *types.TipSet) (st c
return ts.Blocks()[0].ParentStateRoot, ts.Blocks()[0].ParentMessageReceipts, nil
}

st, rec, err = sm.computeTipSetState(ctx, ts, nil)
st, rec, err = sm.computeTipSetState(ctx, ts, sm.tsExecMonitor)
if err != nil {
return cid.Undef, cid.Undef, err
}

return st, rec, nil
}

func traceFunc(trace *[]*api.InvocResult) func(mcid cid.Cid, msg *types.Message, ret *vm.ApplyRet) error {
return func(mcid cid.Cid, msg *types.Message, ret *vm.ApplyRet) error {
ir := &api.InvocResult{
MsgCid: mcid,
Msg: msg,
MsgRct: &ret.MessageReceipt,
ExecutionTrace: ret.ExecutionTrace,
Duration: ret.Duration,
}
if ret.ActorErr != nil {
ir.Error = ret.ActorErr.Error()
}
if ret.GasCosts != nil {
ir.GasCost = MakeMsgGasCost(msg, ret)
}
*trace = append(*trace, ir)
return nil
}
}

func (sm *StateManager) ExecutionTrace(ctx context.Context, ts *types.TipSet) (cid.Cid, []*api.InvocResult, error) {
var trace []*api.InvocResult
st, _, err := sm.computeTipSetState(ctx, ts, traceFunc(&trace))
var invocTrace []*api.InvocResult
st, _, err := sm.computeTipSetState(ctx, ts, &InvocationTracer{trace: &invocTrace})
if err != nil {
return cid.Undef, nil, err
}

return st, trace, nil
return st, invocTrace, nil
}

type ExecCallback func(cid.Cid, *types.Message, *vm.ApplyRet) error

func (sm *StateManager) ApplyBlocks(ctx context.Context, parentEpoch abi.ChainEpoch, pstate cid.Cid, bms []store.BlockMessages, epoch abi.ChainEpoch, r vm.Rand, cb ExecCallback, baseFee abi.TokenAmount, ts *types.TipSet) (cid.Cid, cid.Cid, error) {
func (sm *StateManager) ApplyBlocks(ctx context.Context, parentEpoch abi.ChainEpoch, pstate cid.Cid, bms []store.BlockMessages, epoch abi.ChainEpoch, r vm.Rand, em ExecMonitor, baseFee abi.TokenAmount, ts *types.TipSet) (cid.Cid, cid.Cid, error) {
done := metrics.Timer(ctx, metrics.VMApplyBlocksTotal)
defer done()

Expand Down Expand Up @@ -341,8 +330,8 @@ func (sm *StateManager) ApplyBlocks(ctx context.Context, parentEpoch abi.ChainEp
if err != nil {
return err
}
if cb != nil {
if err := cb(cronMsg.Cid(), cronMsg, ret); err != nil {
if em != nil {
if err := em.MessageApplied(ctx, ts, cronMsg.Cid(), cronMsg, ret, true); err != nil {
return xerrors.Errorf("callback failed on cron message: %w", err)
}
}
Expand All @@ -368,7 +357,7 @@ func (sm *StateManager) ApplyBlocks(ctx context.Context, parentEpoch abi.ChainEp

// handle state forks
// XXX: The state tree
newState, err := sm.handleStateForks(ctx, pstate, i, cb, ts)
newState, err := sm.handleStateForks(ctx, pstate, i, em, ts)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("error handling state forks: %w", err)
}
Expand Down Expand Up @@ -407,8 +396,8 @@ func (sm *StateManager) ApplyBlocks(ctx context.Context, parentEpoch abi.ChainEp
gasReward = big.Add(gasReward, r.GasCosts.MinerTip)
penalty = big.Add(penalty, r.GasCosts.MinerPenalty)

if cb != nil {
if err := cb(cm.Cid(), m, r); err != nil {
if em != nil {
if err := em.MessageApplied(ctx, ts, cm.Cid(), m, r, false); err != nil {
return cid.Undef, cid.Undef, err
}
}
Expand Down Expand Up @@ -440,8 +429,8 @@ func (sm *StateManager) ApplyBlocks(ctx context.Context, parentEpoch abi.ChainEp
if actErr != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("failed to apply reward message for miner %s: %w", b.Miner, actErr)
}
if cb != nil {
if err := cb(rwMsg.Cid(), rwMsg, ret); err != nil {
if em != nil {
if err := em.MessageApplied(ctx, ts, rwMsg.Cid(), rwMsg, ret, true); err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("callback failed on reward message: %w", err)
}
}
Expand Down Expand Up @@ -483,7 +472,7 @@ func (sm *StateManager) ApplyBlocks(ctx context.Context, parentEpoch abi.ChainEp
return st, rectroot, nil
}

func (sm *StateManager) computeTipSetState(ctx context.Context, ts *types.TipSet, cb ExecCallback) (cid.Cid, cid.Cid, error) {
func (sm *StateManager) computeTipSetState(ctx context.Context, ts *types.TipSet, em ExecMonitor) (cid.Cid, cid.Cid, error) {
ctx, span := trace.StartSpan(ctx, "computeTipSetState")
defer span.End()

Expand Down Expand Up @@ -519,7 +508,7 @@ func (sm *StateManager) computeTipSetState(ctx context.Context, ts *types.TipSet

baseFee := blks[0].ParentBaseFee

return sm.ApplyBlocks(ctx, parentEpoch, pstate, blkmsgs, blks[0].Height, r, cb, baseFee, ts)
return sm.ApplyBlocks(ctx, parentEpoch, pstate, blkmsgs, blks[0].Height, r, em, baseFee, ts)
}

func (sm *StateManager) parentState(ts *types.TipSet) cid.Cid {
Expand Down
Loading

0 comments on commit 6a52f6d

Please sign in to comment.