Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

head change coalescer #4688

Merged
merged 9 commits into from
Nov 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion chain/messagepool/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,24 @@ package messagepool

import (
"context"
"time"

"github.com/ipfs/go-cid"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
)

var (
HeadChangeCoalesceMinDelay = 2 * time.Second
HeadChangeCoalesceMaxDelay = 6 * time.Second
HeadChangeCoalesceMergeInterval = time.Second
)

type Provider interface {
SubscribeHeadChanges(func(rev, app []*types.TipSet) error) *types.TipSet
PutMessage(m types.ChainMsg) (cid.Cid, error)
Expand All @@ -34,7 +42,13 @@ func NewProvider(sm *stmgr.StateManager, ps *pubsub.PubSub) Provider {
}

func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet {
mpp.sm.ChainStore().SubscribeHeadChanges(cb)
mpp.sm.ChainStore().SubscribeHeadChanges(
store.WrapHeadChangeCoalescer(
cb,
HeadChangeCoalesceMinDelay,
HeadChangeCoalesceMaxDelay,
HeadChangeCoalesceMergeInterval,
))
return mpp.sm.ChainStore().GetHeaviestTipSet()
}

Expand Down
214 changes: 214 additions & 0 deletions chain/store/coalescer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package store

import (
"context"
"time"

"github.com/filecoin-project/lotus/chain/types"
)

// WrapHeadChangeCoalescer wraps a ReorgNotifee with a head change coalescer.
// minDelay is the minimum coalesce delay; when a head change is first received, the coalescer will
// wait for that long to coalesce more head changes.
// maxDelay is the maximum coalesce delay; the coalescer will not delay delivery of a head change
// more than that.
// mergeInterval is the interval that triggers additional coalesce delay; if the last head change was
// within the merge interval when the coalesce timer fires, then the coalesce time is extended
// by min delay and up to max delay total.
func WrapHeadChangeCoalescer(fn ReorgNotifee, minDelay, maxDelay, mergeInterval time.Duration) ReorgNotifee {
c := NewHeadChangeCoalescer(fn, minDelay, maxDelay, mergeInterval)
return c.HeadChange
}

// HeadChangeCoalescer is a stateful reorg notifee which coalesces incoming head changes
// with pending head changes to reduce state computations from head change notifications.
type HeadChangeCoalescer struct {
notify ReorgNotifee

ctx context.Context
cancel func()

eventq chan headChange

revert []*types.TipSet
apply []*types.TipSet
}

type headChange struct {
revert, apply []*types.TipSet
}

// NewHeadChangeCoalescer creates a HeadChangeCoalescer.
func NewHeadChangeCoalescer(fn ReorgNotifee, minDelay, maxDelay, mergeInterval time.Duration) *HeadChangeCoalescer {
ctx, cancel := context.WithCancel(context.Background())
c := &HeadChangeCoalescer{
notify: fn,
ctx: ctx,
cancel: cancel,
eventq: make(chan headChange),
}

go c.background(minDelay, maxDelay, mergeInterval)

return c
}

// HeadChange is the ReorgNotifee callback for the stateful coalescer; it receives an incoming
// head change and schedules dispatch of a coalesced head change in the background.
func (c *HeadChangeCoalescer) HeadChange(revert, apply []*types.TipSet) error {
select {
case c.eventq <- headChange{revert: revert, apply: apply}:
return nil
case <-c.ctx.Done():
return c.ctx.Err()
}
}

// Close closes the coalescer and cancels the background dispatch goroutine.
// Any further notification will result in an error.
func (c *HeadChangeCoalescer) Close() error {
select {
case <-c.ctx.Done():
default:
c.cancel()
}

return nil
}

// Implementation details

func (c *HeadChangeCoalescer) background(minDelay, maxDelay, mergeInterval time.Duration) {
var timerC <-chan time.Time
var first, last time.Time

for {
select {
case evt := <-c.eventq:
c.coalesce(evt.revert, evt.apply)

now := time.Now()
last = now
if first.IsZero() {
first = now
}

if timerC == nil {
timerC = time.After(minDelay)
}

case now := <-timerC:
sinceFirst := now.Sub(first)
sinceLast := now.Sub(last)

if sinceLast < mergeInterval && sinceFirst < maxDelay {
// coalesce some more
maxWait := maxDelay - sinceFirst
wait := minDelay
if maxWait < wait {
wait = maxWait
}

timerC = time.After(wait)
} else {
// dispatch
c.dispatch()

first = time.Time{}
last = time.Time{}
timerC = nil
}

case <-c.ctx.Done():
if c.revert != nil || c.apply != nil {
c.dispatch()
}
return
}
}
}

func (c *HeadChangeCoalescer) coalesce(revert, apply []*types.TipSet) {
// newly reverted tipsets cancel out with pending applys.
// similarly, newly applied tipsets cancel out with pending reverts.

// pending tipsets
pendRevert := make(map[types.TipSetKey]struct{}, len(c.revert))
for _, ts := range c.revert {
pendRevert[ts.Key()] = struct{}{}
}

pendApply := make(map[types.TipSetKey]struct{}, len(c.apply))
for _, ts := range c.apply {
pendApply[ts.Key()] = struct{}{}
}

// incoming tipsets
reverting := make(map[types.TipSetKey]struct{}, len(revert))
for _, ts := range revert {
reverting[ts.Key()] = struct{}{}
}

applying := make(map[types.TipSetKey]struct{}, len(apply))
for _, ts := range apply {
applying[ts.Key()] = struct{}{}
}

// coalesced revert set
// - pending reverts are cancelled by incoming applys
// - incoming reverts are cancelled by pending applys
newRevert := make([]*types.TipSet, 0, len(c.revert)+len(revert))
for _, ts := range c.revert {
_, cancel := applying[ts.Key()]
if cancel {
continue
}

newRevert = append(newRevert, ts)
Comment on lines +163 to +167
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Can these be changed to

if !cancel {
    newRevert = append(newRevert, ts)
}

}

for _, ts := range revert {
_, cancel := pendApply[ts.Key()]
if cancel {
continue
}

newRevert = append(newRevert, ts)
}

// coalesced apply set
// - pending applys are cancelled by incoming reverts
// - incoming applys are cancelled by pending reverts
newApply := make([]*types.TipSet, 0, len(c.apply)+len(apply))
for _, ts := range c.apply {
_, cancel := reverting[ts.Key()]
if cancel {
continue
}

newApply = append(newApply, ts)
}

for _, ts := range apply {
_, cancel := pendRevert[ts.Key()]
if cancel {
continue
}

newApply = append(newApply, ts)
}

// commit the coalesced sets
c.revert = newRevert
c.apply = newApply
}

func (c *HeadChangeCoalescer) dispatch() {
err := c.notify(c.revert, c.apply)
if err != nil {
log.Errorf("error dispatching coalesced head change notification: %s", err)
}

c.revert = nil
c.apply = nil
}
72 changes: 72 additions & 0 deletions chain/store/coalescer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package store

import (
"testing"
"time"

"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/mock"
)

func TestHeadChangeCoalescer(t *testing.T) {
notif := make(chan headChange, 1)
c := NewHeadChangeCoalescer(func(revert, apply []*types.TipSet) error {
notif <- headChange{apply: apply, revert: revert}
return nil
},
100*time.Millisecond,
200*time.Millisecond,
10*time.Millisecond,
)
defer c.Close() //nolint

b0 := mock.MkBlock(nil, 0, 0)
root := mock.TipSet(b0)
bA := mock.MkBlock(root, 1, 1)
tA := mock.TipSet(bA)
bB := mock.MkBlock(root, 1, 2)
tB := mock.TipSet(bB)
tAB := mock.TipSet(bA, bB)
bC := mock.MkBlock(root, 1, 3)
tABC := mock.TipSet(bA, bB, bC)
bD := mock.MkBlock(root, 1, 4)
tABCD := mock.TipSet(bA, bB, bC, bD)
bE := mock.MkBlock(root, 1, 5)
tABCDE := mock.TipSet(bA, bB, bC, bD, bE)

c.HeadChange(nil, []*types.TipSet{tA}) //nolint
c.HeadChange(nil, []*types.TipSet{tB}) //nolint
c.HeadChange([]*types.TipSet{tA, tB}, []*types.TipSet{tAB}) //nolint
c.HeadChange([]*types.TipSet{tAB}, []*types.TipSet{tABC}) //nolint

change := <-notif

if len(change.revert) != 0 {
t.Fatalf("expected empty revert set but got %d elements", len(change.revert))
}
if len(change.apply) != 1 {
t.Fatalf("expected single element apply set but got %d elements", len(change.apply))
}
if change.apply[0] != tABC {
t.Fatalf("expected to apply tABC")
}

c.HeadChange([]*types.TipSet{tABC}, []*types.TipSet{tABCD}) //nolint
c.HeadChange([]*types.TipSet{tABCD}, []*types.TipSet{tABCDE}) //nolint

change = <-notif

if len(change.revert) != 1 {
t.Fatalf("expected single element revert set but got %d elements", len(change.revert))
}
if change.revert[0] != tABC {
t.Fatalf("expected to revert tABC")
}
if len(change.apply) != 1 {
t.Fatalf("expected single element apply set but got %d elements", len(change.apply))
}
if change.apply[0] != tABCDE {
t.Fatalf("expected to revert tABC")
}

}
4 changes: 4 additions & 0 deletions node/test/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/gen"
genesis2 "github.com/filecoin-project/lotus/chain/gen/genesis"
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/wallet"
"github.com/filecoin-project/lotus/cmd/lotus-seed/seed"
Expand All @@ -53,6 +54,9 @@ import (

func init() {
chain.BootstrapPeerThreshold = 1
messagepool.HeadChangeCoalesceMinDelay = time.Microsecond
messagepool.HeadChangeCoalesceMaxDelay = 2 * time.Microsecond
messagepool.HeadChangeCoalesceMergeInterval = 100 * time.Nanosecond
}

func CreateTestStorageNode(ctx context.Context, t *testing.T, waddr address.Address, act address.Address, pk crypto.PrivKey, tnd test.TestNode, mn mocknet.Mocknet, opts node.Option) test.TestStorageNode {
Expand Down