Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

polish(stmgr): define ExecMonitor for message application callback #6389

Merged
merged 1 commit into from
Jun 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 7 additions & 13 deletions chain/stmgr/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,24 +239,18 @@ func (sm *StateManager) CallWithGas(ctx context.Context, msg *types.Message, pri
var errHaltExecution = fmt.Errorf("halt")

func (sm *StateManager) Replay(ctx context.Context, ts *types.TipSet, mcid cid.Cid) (*types.Message, *vm.ApplyRet, error) {
var outm *types.Message
var outr *vm.ApplyRet

_, _, err := sm.computeTipSetState(ctx, ts, func(c cid.Cid, m *types.Message, ret *vm.ApplyRet) error {
if c == mcid {
outm = m
outr = ret
return errHaltExecution
}
return nil
})
var finder messageFinder
// message to find
finder.mcid = mcid

_, _, err := sm.computeTipSetState(ctx, ts, &finder)
if err != nil && !xerrors.Is(err, errHaltExecution) {
return nil, nil, xerrors.Errorf("unexpected error during execution: %w", err)
}

if outr == nil {
if finder.outr == nil {
return nil, nil, xerrors.Errorf("given message not found in tipset")
}

return outm, outr, nil
return finder.outm, finder.outr, nil
}
50 changes: 25 additions & 25 deletions chain/stmgr/forks.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type MigrationCache interface {
type MigrationFunc func(
ctx context.Context,
sm *StateManager, cache MigrationCache,
cb ExecCallback, oldState cid.Cid,
cb ExecMonitor, oldState cid.Cid,
height abi.ChainEpoch, ts *types.TipSet,
) (newState cid.Cid, err error)

Expand Down Expand Up @@ -292,7 +292,7 @@ func (us UpgradeSchedule) Validate() error {
return nil
}

func (sm *StateManager) handleStateForks(ctx context.Context, root cid.Cid, height abi.ChainEpoch, cb ExecCallback, ts *types.TipSet) (cid.Cid, error) {
func (sm *StateManager) handleStateForks(ctx context.Context, root cid.Cid, height abi.ChainEpoch, cb ExecMonitor, ts *types.TipSet) (cid.Cid, error) {
retCid := root
var err error
u := sm.stateMigrations[height]
Expand Down Expand Up @@ -472,7 +472,7 @@ func doTransfer(tree types.StateTree, from, to address.Address, amt abi.TokenAmo
return nil
}

func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ MigrationCache, em ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Some initial parameters
FundsForMiners := types.FromFil(1_000_000)
LookbackEpoch := abi.ChainEpoch(32000)
Expand Down Expand Up @@ -722,12 +722,12 @@ func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ Migratio
return cid.Undef, xerrors.Errorf("resultant state tree account balance was not correct: %s", total)
}

if cb != nil {
if em != nil {
// record the transfer in execution traces

fakeMsg := makeFakeMsg(builtin.SystemActorAddr, builtin.SystemActorAddr, big.Zero(), uint64(epoch))

if err := cb(fakeMsg.Cid(), fakeMsg, &vm.ApplyRet{
if err := em.MessageApplied(ctx, ts, fakeMsg.Cid(), fakeMsg, &vm.ApplyRet{
MessageReceipt: *makeFakeRct(),
ActorErr: nil,
ExecutionTrace: types.ExecutionTrace{
Expand All @@ -740,15 +740,15 @@ func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ Migratio
},
Duration: 0,
GasCosts: nil,
}); err != nil {
}, false); err != nil {
return cid.Undef, xerrors.Errorf("recording transfers: %w", err)
}
}

return tree.Flush(ctx)
}

func UpgradeIgnition(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
func UpgradeIgnition(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
store := sm.cs.ActorStore(ctx)

if build.UpgradeLiftoffHeight <= epoch {
Expand Down Expand Up @@ -785,12 +785,12 @@ func UpgradeIgnition(ctx context.Context, sm *StateManager, _ MigrationCache, cb
return cid.Undef, xerrors.Errorf("resetting genesis msig start epochs: %w", err)
}

err = splitGenesisMultisig0(ctx, cb, split1, store, tree, 50, epoch)
err = splitGenesisMultisig0(ctx, cb, split1, store, tree, 50, epoch, ts)
if err != nil {
return cid.Undef, xerrors.Errorf("splitting first msig: %w", err)
}

err = splitGenesisMultisig0(ctx, cb, split2, store, tree, 50, epoch)
err = splitGenesisMultisig0(ctx, cb, split2, store, tree, 50, epoch, ts)
if err != nil {
return cid.Undef, xerrors.Errorf("splitting second msig: %w", err)
}
Expand All @@ -803,7 +803,7 @@ func UpgradeIgnition(ctx context.Context, sm *StateManager, _ MigrationCache, cb
return tree.Flush(ctx)
}

func UpgradeRefuel(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
func UpgradeRefuel(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {

store := sm.cs.ActorStore(ctx)
tree, err := sm.StateTree(root)
Expand All @@ -829,7 +829,7 @@ func UpgradeRefuel(ctx context.Context, sm *StateManager, _ MigrationCache, cb E
return tree.Flush(ctx)
}

func UpgradeActorsV2(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
func UpgradeActorsV2(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
buf := blockstore.NewTieredBstore(sm.cs.StateBlockstore(), blockstore.NewMemorySync())
store := store.ActorStore(ctx, buf)

Expand Down Expand Up @@ -875,7 +875,7 @@ func UpgradeActorsV2(ctx context.Context, sm *StateManager, _ MigrationCache, cb
return newRoot, nil
}

func UpgradeLiftoff(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
func UpgradeLiftoff(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
tree, err := sm.StateTree(root)
if err != nil {
return cid.Undef, xerrors.Errorf("getting state tree: %w", err)
Expand All @@ -889,7 +889,7 @@ func UpgradeLiftoff(ctx context.Context, sm *StateManager, _ MigrationCache, cb
return tree.Flush(ctx)
}

func UpgradeCalico(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
func UpgradeCalico(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
if build.BuildType != build.BuildMainnet {
return root, nil
}
Expand Down Expand Up @@ -935,7 +935,7 @@ func UpgradeCalico(ctx context.Context, sm *StateManager, _ MigrationCache, cb E
return newRoot, nil
}

func terminateActor(ctx context.Context, tree *state.StateTree, addr address.Address, cb ExecCallback, epoch abi.ChainEpoch) error {
func terminateActor(ctx context.Context, tree *state.StateTree, addr address.Address, em ExecMonitor, epoch abi.ChainEpoch, ts *types.TipSet) error {
a, err := tree.GetActor(addr)
if xerrors.Is(err, types.ErrActorNotFound) {
return types.ErrActorNotFound
Expand All @@ -950,18 +950,18 @@ func terminateActor(ctx context.Context, tree *state.StateTree, addr address.Add
return xerrors.Errorf("transferring terminated actor's balance: %w", err)
}

if cb != nil {
if em != nil {
// record the transfer in execution traces

fakeMsg := makeFakeMsg(builtin.SystemActorAddr, addr, big.Zero(), uint64(epoch))

if err := cb(fakeMsg.Cid(), fakeMsg, &vm.ApplyRet{
if err := em.MessageApplied(ctx, ts, fakeMsg.Cid(), fakeMsg, &vm.ApplyRet{
MessageReceipt: *makeFakeRct(),
ActorErr: nil,
ExecutionTrace: trace,
Duration: 0,
GasCosts: nil,
}); err != nil {
}, false); err != nil {
return xerrors.Errorf("recording transfers: %w", err)
}
}
Expand Down Expand Up @@ -995,7 +995,7 @@ func terminateActor(ctx context.Context, tree *state.StateTree, addr address.Add
return tree.SetActor(init_.Address, ia)
}

func UpgradeActorsV3(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
func UpgradeActorsV3(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
workerCount := runtime.NumCPU() - 3
if workerCount <= 0 {
Expand All @@ -1019,7 +1019,7 @@ func UpgradeActorsV3(ctx context.Context, sm *StateManager, cache MigrationCache
}

if build.BuildType == build.BuildMainnet {
err := terminateActor(ctx, tree, build.ZeroAddress, cb, epoch)
err := terminateActor(ctx, tree, build.ZeroAddress, cb, epoch, ts)
if err != nil && !xerrors.Is(err, types.ErrActorNotFound) {
return cid.Undef, xerrors.Errorf("deleting zero bls actor: %w", err)
}
Expand Down Expand Up @@ -1097,7 +1097,7 @@ func upgradeActorsV3Common(
return newRoot, nil
}

func UpgradeActorsV4(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
func UpgradeActorsV4(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
workerCount := runtime.NumCPU() - 3
if workerCount <= 0 {
Expand Down Expand Up @@ -1183,7 +1183,7 @@ func upgradeActorsV4Common(
return newRoot, nil
}

func UpgradeActorsV5(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
func UpgradeActorsV5(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecMonitor, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
workerCount := runtime.NumCPU() - 3
if workerCount <= 0 {
Expand Down Expand Up @@ -1296,7 +1296,7 @@ func setNetworkName(ctx context.Context, store adt.Store, tree *state.StateTree,
return nil
}

func splitGenesisMultisig0(ctx context.Context, cb ExecCallback, addr address.Address, store adt0.Store, tree *state.StateTree, portions uint64, epoch abi.ChainEpoch) error {
func splitGenesisMultisig0(ctx context.Context, em ExecMonitor, addr address.Address, store adt0.Store, tree *state.StateTree, portions uint64, epoch abi.ChainEpoch, ts *types.TipSet) error {
if portions < 1 {
return xerrors.Errorf("cannot split into 0 portions")
}
Expand Down Expand Up @@ -1393,12 +1393,12 @@ func splitGenesisMultisig0(ctx context.Context, cb ExecCallback, addr address.Ad
i++
}

if cb != nil {
if em != nil {
// record the transfer in execution traces

fakeMsg := makeFakeMsg(builtin.SystemActorAddr, addr, big.Zero(), uint64(epoch))

if err := cb(fakeMsg.Cid(), fakeMsg, &vm.ApplyRet{
if err := em.MessageApplied(ctx, ts, fakeMsg.Cid(), fakeMsg, &vm.ApplyRet{
MessageReceipt: *makeFakeRct(),
ActorErr: nil,
ExecutionTrace: types.ExecutionTrace{
Expand All @@ -1411,7 +1411,7 @@ func splitGenesisMultisig0(ctx context.Context, cb ExecCallback, addr address.Ad
},
Duration: 0,
GasCosts: nil,
}); err != nil {
}, false); err != nil {
return xerrors.Errorf("recording transfers: %w", err)
}
}
Expand Down
6 changes: 3 additions & 3 deletions chain/stmgr/forks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestForkHeightTriggers(t *testing.T) {
cg.ChainStore(), UpgradeSchedule{{
Network: 1,
Height: testForkHeight,
Migration: func(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecCallback,
Migration: func(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecMonitor,
root cid.Cid, height abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
cst := ipldcbor.NewCborStore(sm.ChainStore().StateBlockstore())

Expand Down Expand Up @@ -253,7 +253,7 @@ func TestForkRefuseCall(t *testing.T) {
Network: 1,
Expensive: true,
Height: testForkHeight,
Migration: func(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecCallback,
Migration: func(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecMonitor,
root cid.Cid, height abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
return root, nil
}}})
Expand Down Expand Up @@ -363,7 +363,7 @@ func TestForkPreMigration(t *testing.T) {
cg.ChainStore(), UpgradeSchedule{{
Network: 1,
Height: testForkHeight,
Migration: func(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecCallback,
Migration: func(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecMonitor,
root cid.Cid, height abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {

// Make sure the test that should be canceled, is canceled.
Expand Down
Loading