diff --git a/chain/messagepool/provider.go b/chain/messagepool/provider.go index 347e90044d5..5a6c751bce5 100644 --- a/chain/messagepool/provider.go +++ b/chain/messagepool/provider.go @@ -2,6 +2,7 @@ package messagepool import ( "context" + "time" "github.com/ipfs/go-cid" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -9,9 +10,16 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/lotus/chain/stmgr" + "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" ) +var ( + HeadChangeCoalesceMinDelay = 2 * time.Second + HeadChangeCoalesceMaxDelay = 6 * time.Second + HeadChangeCoalesceMergeInterval = time.Second +) + type Provider interface { SubscribeHeadChanges(func(rev, app []*types.TipSet) error) *types.TipSet PutMessage(m types.ChainMsg) (cid.Cid, error) @@ -34,7 +42,13 @@ func NewProvider(sm *stmgr.StateManager, ps *pubsub.PubSub) Provider { } func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet { - mpp.sm.ChainStore().SubscribeHeadChanges(cb) + mpp.sm.ChainStore().SubscribeHeadChanges( + store.WrapHeadChangeCoalescer( + cb, + HeadChangeCoalesceMinDelay, + HeadChangeCoalesceMaxDelay, + HeadChangeCoalesceMergeInterval, + )) return mpp.sm.ChainStore().GetHeaviestTipSet() } diff --git a/chain/store/coalescer.go b/chain/store/coalescer.go new file mode 100644 index 00000000000..443359c8a06 --- /dev/null +++ b/chain/store/coalescer.go @@ -0,0 +1,214 @@ +package store + +import ( + "context" + "time" + + "github.com/filecoin-project/lotus/chain/types" +) + +// WrapHeadChangeCoalescer wraps a ReorgNotifee with a head change coalescer. +// minDelay is the minimum coalesce delay; when a head change is first received, the coalescer will +// wait for that long to coalesce more head changes. +// maxDelay is the maximum coalesce delay; the coalescer will not delay delivery of a head change +// more than that. +// mergeInterval is the interval that triggers additional coalesce delay; if the last head change was +// within the merge interval when the coalesce timer fires, then the coalesce time is extended +// by min delay and up to max delay total. +func WrapHeadChangeCoalescer(fn ReorgNotifee, minDelay, maxDelay, mergeInterval time.Duration) ReorgNotifee { + c := NewHeadChangeCoalescer(fn, minDelay, maxDelay, mergeInterval) + return c.HeadChange +} + +// HeadChangeCoalescer is a stateful reorg notifee which coalesces incoming head changes +// with pending head changes to reduce state computations from head change notifications. +type HeadChangeCoalescer struct { + notify ReorgNotifee + + ctx context.Context + cancel func() + + eventq chan headChange + + revert []*types.TipSet + apply []*types.TipSet +} + +type headChange struct { + revert, apply []*types.TipSet +} + +// NewHeadChangeCoalescer creates a HeadChangeCoalescer. +func NewHeadChangeCoalescer(fn ReorgNotifee, minDelay, maxDelay, mergeInterval time.Duration) *HeadChangeCoalescer { + ctx, cancel := context.WithCancel(context.Background()) + c := &HeadChangeCoalescer{ + notify: fn, + ctx: ctx, + cancel: cancel, + eventq: make(chan headChange), + } + + go c.background(minDelay, maxDelay, mergeInterval) + + return c +} + +// HeadChange is the ReorgNotifee callback for the stateful coalescer; it receives an incoming +// head change and schedules dispatch of a coalesced head change in the background. +func (c *HeadChangeCoalescer) HeadChange(revert, apply []*types.TipSet) error { + select { + case c.eventq <- headChange{revert: revert, apply: apply}: + return nil + case <-c.ctx.Done(): + return c.ctx.Err() + } +} + +// Close closes the coalescer and cancels the background dispatch goroutine. +// Any further notification will result in an error. +func (c *HeadChangeCoalescer) Close() error { + select { + case <-c.ctx.Done(): + default: + c.cancel() + } + + return nil +} + +// Implementation details + +func (c *HeadChangeCoalescer) background(minDelay, maxDelay, mergeInterval time.Duration) { + var timerC <-chan time.Time + var first, last time.Time + + for { + select { + case evt := <-c.eventq: + c.coalesce(evt.revert, evt.apply) + + now := time.Now() + last = now + if first.IsZero() { + first = now + } + + if timerC == nil { + timerC = time.After(minDelay) + } + + case now := <-timerC: + sinceFirst := now.Sub(first) + sinceLast := now.Sub(last) + + if sinceLast < mergeInterval && sinceFirst < maxDelay { + // coalesce some more + maxWait := maxDelay - sinceFirst + wait := minDelay + if maxWait < wait { + wait = maxWait + } + + timerC = time.After(wait) + } else { + // dispatch + c.dispatch() + + first = time.Time{} + last = time.Time{} + timerC = nil + } + + case <-c.ctx.Done(): + if c.revert != nil || c.apply != nil { + c.dispatch() + } + return + } + } +} + +func (c *HeadChangeCoalescer) coalesce(revert, apply []*types.TipSet) { + // newly reverted tipsets cancel out with pending applys. + // similarly, newly applied tipsets cancel out with pending reverts. + + // pending tipsets + pendRevert := make(map[types.TipSetKey]struct{}, len(c.revert)) + for _, ts := range c.revert { + pendRevert[ts.Key()] = struct{}{} + } + + pendApply := make(map[types.TipSetKey]struct{}, len(c.apply)) + for _, ts := range c.apply { + pendApply[ts.Key()] = struct{}{} + } + + // incoming tipsets + reverting := make(map[types.TipSetKey]struct{}, len(revert)) + for _, ts := range revert { + reverting[ts.Key()] = struct{}{} + } + + applying := make(map[types.TipSetKey]struct{}, len(apply)) + for _, ts := range apply { + applying[ts.Key()] = struct{}{} + } + + // coalesced revert set + // - pending reverts are cancelled by incoming applys + // - incoming reverts are cancelled by pending applys + newRevert := make([]*types.TipSet, 0, len(c.revert)+len(revert)) + for _, ts := range c.revert { + _, cancel := applying[ts.Key()] + if cancel { + continue + } + + newRevert = append(newRevert, ts) + } + + for _, ts := range revert { + _, cancel := pendApply[ts.Key()] + if cancel { + continue + } + + newRevert = append(newRevert, ts) + } + + // coalesced apply set + // - pending applys are cancelled by incoming reverts + // - incoming applys are cancelled by pending reverts + newApply := make([]*types.TipSet, 0, len(c.apply)+len(apply)) + for _, ts := range c.apply { + _, cancel := reverting[ts.Key()] + if cancel { + continue + } + + newApply = append(newApply, ts) + } + + for _, ts := range apply { + _, cancel := pendRevert[ts.Key()] + if cancel { + continue + } + + newApply = append(newApply, ts) + } + + // commit the coalesced sets + c.revert = newRevert + c.apply = newApply +} + +func (c *HeadChangeCoalescer) dispatch() { + err := c.notify(c.revert, c.apply) + if err != nil { + log.Errorf("error dispatching coalesced head change notification: %s", err) + } + + c.revert = nil + c.apply = nil +} diff --git a/chain/store/coalescer_test.go b/chain/store/coalescer_test.go new file mode 100644 index 00000000000..d462851086e --- /dev/null +++ b/chain/store/coalescer_test.go @@ -0,0 +1,72 @@ +package store + +import ( + "testing" + "time" + + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/chain/types/mock" +) + +func TestHeadChangeCoalescer(t *testing.T) { + notif := make(chan headChange, 1) + c := NewHeadChangeCoalescer(func(revert, apply []*types.TipSet) error { + notif <- headChange{apply: apply, revert: revert} + return nil + }, + 100*time.Millisecond, + 200*time.Millisecond, + 10*time.Millisecond, + ) + defer c.Close() //nolint + + b0 := mock.MkBlock(nil, 0, 0) + root := mock.TipSet(b0) + bA := mock.MkBlock(root, 1, 1) + tA := mock.TipSet(bA) + bB := mock.MkBlock(root, 1, 2) + tB := mock.TipSet(bB) + tAB := mock.TipSet(bA, bB) + bC := mock.MkBlock(root, 1, 3) + tABC := mock.TipSet(bA, bB, bC) + bD := mock.MkBlock(root, 1, 4) + tABCD := mock.TipSet(bA, bB, bC, bD) + bE := mock.MkBlock(root, 1, 5) + tABCDE := mock.TipSet(bA, bB, bC, bD, bE) + + c.HeadChange(nil, []*types.TipSet{tA}) //nolint + c.HeadChange(nil, []*types.TipSet{tB}) //nolint + c.HeadChange([]*types.TipSet{tA, tB}, []*types.TipSet{tAB}) //nolint + c.HeadChange([]*types.TipSet{tAB}, []*types.TipSet{tABC}) //nolint + + change := <-notif + + if len(change.revert) != 0 { + t.Fatalf("expected empty revert set but got %d elements", len(change.revert)) + } + if len(change.apply) != 1 { + t.Fatalf("expected single element apply set but got %d elements", len(change.apply)) + } + if change.apply[0] != tABC { + t.Fatalf("expected to apply tABC") + } + + c.HeadChange([]*types.TipSet{tABC}, []*types.TipSet{tABCD}) //nolint + c.HeadChange([]*types.TipSet{tABCD}, []*types.TipSet{tABCDE}) //nolint + + change = <-notif + + if len(change.revert) != 1 { + t.Fatalf("expected single element revert set but got %d elements", len(change.revert)) + } + if change.revert[0] != tABC { + t.Fatalf("expected to revert tABC") + } + if len(change.apply) != 1 { + t.Fatalf("expected single element apply set but got %d elements", len(change.apply)) + } + if change.apply[0] != tABCDE { + t.Fatalf("expected to revert tABC") + } + +} diff --git a/node/test/builder.go b/node/test/builder.go index 46efb90742c..cf38792daaa 100644 --- a/node/test/builder.go +++ b/node/test/builder.go @@ -29,6 +29,7 @@ import ( "github.com/filecoin-project/lotus/chain/actors/policy" "github.com/filecoin-project/lotus/chain/gen" genesis2 "github.com/filecoin-project/lotus/chain/gen/genesis" + "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/wallet" "github.com/filecoin-project/lotus/cmd/lotus-seed/seed" @@ -53,6 +54,9 @@ import ( func init() { chain.BootstrapPeerThreshold = 1 + messagepool.HeadChangeCoalesceMinDelay = time.Microsecond + messagepool.HeadChangeCoalesceMaxDelay = 2 * time.Microsecond + messagepool.HeadChangeCoalesceMergeInterval = 100 * time.Nanosecond } func CreateTestStorageNode(ctx context.Context, t *testing.T, waddr address.Address, act address.Address, pk crypto.PrivKey, tnd test.TestNode, mn mocknet.Mocknet, opts node.Option) test.TestStorageNode {