Skip to content

Commit

Permalink
polish(stmgr): define ExecMonitor for message applicaiton callback
Browse files Browse the repository at this point in the history
  • Loading branch information
frrist committed Jun 4, 2021
1 parent bf46876 commit 053e1d4
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 89 deletions.
20 changes: 7 additions & 13 deletions chain/stmgr/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,24 +239,18 @@ func (sm *StateManager) CallWithGas(ctx context.Context, msg *types.Message, pri
var errHaltExecution = fmt.Errorf("halt")

func (sm *StateManager) Replay(ctx context.Context, ts *types.TipSet, mcid cid.Cid) (*types.Message, *vm.ApplyRet, error) {
var outm *types.Message
var outr *vm.ApplyRet

_, _, err := sm.computeTipSetState(ctx, ts, func(c cid.Cid, m *types.Message, ret *vm.ApplyRet) error {
if c == mcid {
outm = m
outr = ret
return errHaltExecution
}
return nil
})
var finder messageFinder
// message to find
finder.mcid = mcid

_, _, err := sm.computeTipSetState(ctx, ts, &finder)
if err != nil && !xerrors.Is(err, errHaltExecution) {
return nil, nil, xerrors.Errorf("unexpected error during execution: %w", err)
}

if outr == nil {
if finder.outr == nil {
return nil, nil, xerrors.Errorf("given message not found in tipset")
}

return outm, outr, nil
return finder.outm, finder.outr, nil
}
50 changes: 25 additions & 25 deletions chain/stmgr/forks.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type MigrationCache interface {
type MigrationFunc func(
ctx context.Context,
sm *StateManager, cache MigrationCache,
cb ExecCallback, oldState cid.Cid,
cb ExecMonitor, oldState cid.Cid,
height abi.ChainEpoch, ts *types.TipSet,
) (newState cid.Cid, err error)

Expand Down Expand Up @@ -292,7 +292,7 @@ func (us UpgradeSchedule) Validate() error {
return nil
}

func (sm *StateManager) handleStateForks(ctx context.Context, root cid.Cid, height abi.ChainEpoch, cb ExecCallback, ts *types.TipSet) (cid.Cid, error) {
func (sm *StateManager) handleStateForks(ctx context.Context, root cid.Cid, height abi.ChainEpoch, cb ExecMonitor, ts *types.TipSet) (cid.Cid, error) {
retCid := root
var err error
u := sm.stateMigrations[height]
Expand Down Expand Up @@ -472,7 +472,7 @@ func doTransfer(tree types.StateTree, from, to address.Address, amt abi.TokenAmo
return nil
}

func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ MigrationCache, em ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Some initial parameters
FundsForMiners := types.FromFil(1_000_000)
LookbackEpoch := abi.ChainEpoch(32000)
Expand Down Expand Up @@ -722,12 +722,12 @@ func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ Migratio
return cid.Undef, xerrors.Errorf("resultant state tree account balance was not correct: %s", total)
}

if cb != nil {
if em != nil {
// record the transfer in execution traces

fakeMsg := makeFakeMsg(builtin.SystemActorAddr, builtin.SystemActorAddr, big.Zero(), uint64(epoch))

if err := cb(fakeMsg.Cid(), fakeMsg, &vm.ApplyRet{
if err := em.MessageApplied(ctx, ts, fakeMsg.Cid(), fakeMsg, &vm.ApplyRet{
MessageReceipt: *makeFakeRct(),
ActorErr: nil,
ExecutionTrace: types.ExecutionTrace{
Expand All @@ -740,15 +740,15 @@ func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ Migratio
},
Duration: 0,
GasCosts: nil,
}); err != nil {
}, false); err != nil {
return cid.Undef, xerrors.Errorf("recording transfers: %w", err)
}
}

return tree.Flush(ctx)
}

func UpgradeIgnition(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
func UpgradeIgnition(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
store := sm.cs.ActorStore(ctx)

if build.UpgradeLiftoffHeight <= epoch {
Expand Down Expand Up @@ -785,12 +785,12 @@ func UpgradeIgnition(ctx context.Context, sm *StateManager, _ MigrationCache, cb
return cid.Undef, xerrors.Errorf("resetting genesis msig start epochs: %w", err)
}

err = splitGenesisMultisig0(ctx, cb, split1, store, tree, 50, epoch)
err = splitGenesisMultisig0(ctx, cb, split1, store, tree, 50, epoch, ts)
if err != nil {
return cid.Undef, xerrors.Errorf("splitting first msig: %w", err)
}

err = splitGenesisMultisig0(ctx, cb, split2, store, tree, 50, epoch)
err = splitGenesisMultisig0(ctx, cb, split2, store, tree, 50, epoch, ts)
if err != nil {
return cid.Undef, xerrors.Errorf("splitting second msig: %w", err)
}
Expand All @@ -803,7 +803,7 @@ func UpgradeIgnition(ctx context.Context, sm *StateManager, _ MigrationCache, cb
return tree.Flush(ctx)
}

func UpgradeRefuel(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
func UpgradeRefuel(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {

store := sm.cs.ActorStore(ctx)
tree, err := sm.StateTree(root)
Expand All @@ -829,7 +829,7 @@ func UpgradeRefuel(ctx context.Context, sm *StateManager, _ MigrationCache, cb E
return tree.Flush(ctx)
}

func UpgradeActorsV2(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
func UpgradeActorsV2(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
buf := blockstore.NewTieredBstore(sm.cs.StateBlockstore(), blockstore.NewMemorySync())
store := store.ActorStore(ctx, buf)

Expand Down Expand Up @@ -875,7 +875,7 @@ func UpgradeActorsV2(ctx context.Context, sm *StateManager, _ MigrationCache, cb
return newRoot, nil
}

func UpgradeLiftoff(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
func UpgradeLiftoff(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
tree, err := sm.StateTree(root)
if err != nil {
return cid.Undef, xerrors.Errorf("getting state tree: %w", err)
Expand All @@ -889,7 +889,7 @@ func UpgradeLiftoff(ctx context.Context, sm *StateManager, _ MigrationCache, cb
return tree.Flush(ctx)
}

func UpgradeCalico(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
func UpgradeCalico(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
if build.BuildType != build.BuildMainnet {
return root, nil
}
Expand Down Expand Up @@ -935,7 +935,7 @@ func UpgradeCalico(ctx context.Context, sm *StateManager, _ MigrationCache, cb E
return newRoot, nil
}

func terminateActor(ctx context.Context, tree *state.StateTree, addr address.Address, cb ExecCallback, epoch abi.ChainEpoch) error {
func terminateActor(ctx context.Context, tree *state.StateTree, addr address.Address, em ExecMonitor, epoch abi.ChainEpoch, ts *types.TipSet) error {
a, err := tree.GetActor(addr)
if xerrors.Is(err, types.ErrActorNotFound) {
return types.ErrActorNotFound
Expand All @@ -950,18 +950,18 @@ func terminateActor(ctx context.Context, tree *state.StateTree, addr address.Add
return xerrors.Errorf("transferring terminated actor's balance: %w", err)
}

if cb != nil {
if em != nil {
// record the transfer in execution traces

fakeMsg := makeFakeMsg(builtin.SystemActorAddr, addr, big.Zero(), uint64(epoch))

if err := cb(fakeMsg.Cid(), fakeMsg, &vm.ApplyRet{
if err := em.MessageApplied(ctx, ts, fakeMsg.Cid(), fakeMsg, &vm.ApplyRet{
MessageReceipt: *makeFakeRct(),
ActorErr: nil,
ExecutionTrace: trace,
Duration: 0,
GasCosts: nil,
}); err != nil {
}, false); err != nil {
return xerrors.Errorf("recording transfers: %w", err)
}
}
Expand Down Expand Up @@ -995,7 +995,7 @@ func terminateActor(ctx context.Context, tree *state.StateTree, addr address.Add
return tree.SetActor(init_.Address, ia)
}

func UpgradeActorsV3(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
func UpgradeActorsV3(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
workerCount := runtime.NumCPU() - 3
if workerCount <= 0 {
Expand All @@ -1019,7 +1019,7 @@ func UpgradeActorsV3(ctx context.Context, sm *StateManager, cache MigrationCache
}

if build.BuildType == build.BuildMainnet {
err := terminateActor(ctx, tree, build.ZeroAddress, cb, epoch)
err := terminateActor(ctx, tree, build.ZeroAddress, cb, epoch, ts)
if err != nil && !xerrors.Is(err, types.ErrActorNotFound) {
return cid.Undef, xerrors.Errorf("deleting zero bls actor: %w", err)
}
Expand Down Expand Up @@ -1097,7 +1097,7 @@ func upgradeActorsV3Common(
return newRoot, nil
}

func UpgradeActorsV4(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
func UpgradeActorsV4(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
workerCount := runtime.NumCPU() - 3
if workerCount <= 0 {
Expand Down Expand Up @@ -1183,7 +1183,7 @@ func upgradeActorsV4Common(
return newRoot, nil
}

func UpgradeActorsV5(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
func UpgradeActorsV5(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
workerCount := runtime.NumCPU() - 3
if workerCount <= 0 {
Expand Down Expand Up @@ -1296,7 +1296,7 @@ func setNetworkName(ctx context.Context, store adt.Store, tree *state.StateTree,
return nil
}

func splitGenesisMultisig0(ctx context.Context, cb ExecCallback, addr address.Address, store adt0.Store, tree *state.StateTree, portions uint64, epoch abi.ChainEpoch) error {
func splitGenesisMultisig0(ctx context.Context, em ExecMonitor, addr address.Address, store adt0.Store, tree *state.StateTree, portions uint64, epoch abi.ChainEpoch, ts *types.TipSet) error {
if portions < 1 {
return xerrors.Errorf("cannot split into 0 portions")
}
Expand Down Expand Up @@ -1393,12 +1393,12 @@ func splitGenesisMultisig0(ctx context.Context, cb ExecCallback, addr address.Ad
i++
}

if cb != nil {
if em != nil {
// record the transfer in execution traces

fakeMsg := makeFakeMsg(builtin.SystemActorAddr, addr, big.Zero(), uint64(epoch))

if err := cb(fakeMsg.Cid(), fakeMsg, &vm.ApplyRet{
if err := em.MessageApplied(ctx, ts, fakeMsg.Cid(), fakeMsg, &vm.ApplyRet{
MessageReceipt: *makeFakeRct(),
ActorErr: nil,
ExecutionTrace: types.ExecutionTrace{
Expand All @@ -1411,7 +1411,7 @@ func splitGenesisMultisig0(ctx context.Context, cb ExecCallback, addr address.Ad
},
Duration: 0,
GasCosts: nil,
}); err != nil {
}, false); err != nil {
return xerrors.Errorf("recording transfers: %w", err)
}
}
Expand Down
6 changes: 3 additions & 3 deletions chain/stmgr/forks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestForkHeightTriggers(t *testing.T) {
cg.ChainStore(), UpgradeSchedule{{
Network: 1,
Height: testForkHeight,
Migration: func(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecCallback,
Migration: func(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecMonitor,
root cid.Cid, height abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
cst := ipldcbor.NewCborStore(sm.ChainStore().StateBlockstore())

Expand Down Expand Up @@ -253,7 +253,7 @@ func TestForkRefuseCall(t *testing.T) {
Network: 1,
Expensive: true,
Height: testForkHeight,
Migration: func(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecCallback,
Migration: func(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecMonitor,
root cid.Cid, height abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
return root, nil
}}})
Expand Down Expand Up @@ -363,7 +363,7 @@ func TestForkPreMigration(t *testing.T) {
cg.ChainStore(), UpgradeSchedule{{
Network: 1,
Height: testForkHeight,
Migration: func(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecCallback,
Migration: func(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecMonitor,
root cid.Cid, height abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {

// Make sure the test that should be canceled, is canceled.
Expand Down
Loading

0 comments on commit 053e1d4

Please sign in to comment.