Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sync wait nicer #3991

Merged
merged 1 commit into from
Sep 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/api_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,8 @@ type ActiveSync struct {

type SyncState struct {
ActiveSyncs []ActiveSync

VMApplied uint64
}

type SyncStateStage int
Expand Down
11 changes: 11 additions & 0 deletions chain/vm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"reflect"
"sync/atomic"
"time"

block "github.com/ipfs/go-block-format"
Expand Down Expand Up @@ -40,6 +41,12 @@ var log = logging.Logger("vm")
var actorLog = logging.Logger("actors")
var gasOnActorExec = newGasCharge("OnActorExec", 0, 0)

// stat counters
var (
StatSends uint64
StatApplied uint64
)

// ResolveToKeyAddr returns the public key type of address (`BLS`/`SECP256K1`) of an account actor identified by `addr`.
func ResolveToKeyAddr(state types.StateTree, cst cbor.IpldStore, addr address.Address) (address.Address, error) {
if addr.Protocol() == address.BLS || addr.Protocol() == address.SECP256K1 {
Expand Down Expand Up @@ -204,6 +211,8 @@ type ApplyRet struct {
func (vm *VM) send(ctx context.Context, msg *types.Message, parent *Runtime,
gasCharge *GasCharge, start time.Time) ([]byte, aerrors.ActorError, *Runtime) {

defer atomic.AddUint64(&StatSends, 1)

st := vm.cstate

origin := msg.From
Expand Down Expand Up @@ -312,6 +321,7 @@ func checkMessage(msg *types.Message) error {

func (vm *VM) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) {
start := build.Clock.Now()
defer atomic.AddUint64(&StatApplied, 1)
ret, actorErr, rt := vm.send(ctx, msg, nil, nil, start)
rt.finilizeGasTracing()
return &ApplyRet{
Expand All @@ -331,6 +341,7 @@ func (vm *VM) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet,
start := build.Clock.Now()
ctx, span := trace.StartSpan(ctx, "vm.ApplyMessage")
defer span.End()
defer atomic.AddUint64(&StatApplied, 1)
msg := cmsg.VMMessage()
if span.IsRecordingEvents() {
span.AddAttributes(
Expand Down
33 changes: 31 additions & 2 deletions cli/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,16 @@ var syncCheckpointCmd = &cli.Command{
}

func SyncWait(ctx context.Context, napi api.FullNode) error {
tick := time.Second / 4

lastLines := 0
ticker := time.NewTicker(tick)
defer ticker.Stop()

samples := 8
i := 0
var app, lastApp uint64

for {
state, err := napi.SyncState(ctx)
if err != nil {
Expand Down Expand Up @@ -266,7 +276,24 @@ func SyncWait(ctx context.Context, napi api.FullNode) error {
heightDiff = 0
}

fmt.Printf("\r\x1b[2KWorker %d: Base Height: %d\tTarget Height: %d\t Height diff: %d\tTarget: %s\tState: %s\tHeight: %d", working, baseHeight, theight, heightDiff, target, ss.Stage, ss.Height)
for i := 0; i < lastLines; i++ {
fmt.Print("\r\x1b[2K\x1b[A")
}

fmt.Printf("Worker: %d; Base: %d; Target: %d (diff: %d)\n", working, baseHeight, theight, heightDiff)
fmt.Printf("State: %s; Current Epoch: %d; Todo: %d\n", ss.Stage, ss.Height, theight-ss.Height)
lastLines = 2

if i%samples == 0 {
lastApp = app
app = state.VMApplied
}
if i > 0 {
fmt.Printf("Validated %d messages (%d per second)\n", state.VMApplied, (app-lastApp)*uint64(time.Second/tick)/uint64(samples))
lastLines++
}

_ = target // todo: maybe print? (creates a bunch of line wrapping issues with most tipsets)

if time.Now().Unix()-int64(head.MinTimestamp()) < int64(build.BlockDelaySecs) {
fmt.Println("\nDone!")
Expand All @@ -277,7 +304,9 @@ func SyncWait(ctx context.Context, napi api.FullNode) error {
case <-ctx.Done():
fmt.Println("\nExit by user")
return nil
case <-build.Clock.After(1 * time.Second):
case <-ticker.C:
}

i++
}
}
3 changes: 2 additions & 1 deletion documentation/en/api-methods.md
Original file line number Diff line number Diff line change
Expand Up @@ -4303,7 +4303,8 @@ Inputs: `null`
Response:
```json
{
"ActiveSyncs": null
"ActiveSyncs": null,
"VMApplied": 42
}
```

Expand Down
6 changes: 5 additions & 1 deletion node/impl/full/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package full

import (
"context"
"sync/atomic"

cid "github.com/ipfs/go-cid"
pubsub "github.com/libp2p/go-libp2p-pubsub"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/gen/slashfilter"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/node/modules/dtypes"
)

Expand All @@ -28,7 +30,9 @@ type SyncAPI struct {
func (a *SyncAPI) SyncState(ctx context.Context) (*api.SyncState, error) {
states := a.Syncer.State()

out := &api.SyncState{}
out := &api.SyncState{
VMApplied: atomic.LoadUint64(&vm.StatApplied),
}

for i := range states {
ss := &states[i]
Expand Down