From 4058f5642f92ee0bc8db2a4276be50bab504c3bc Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 2 Nov 2020 13:12:32 +0200 Subject: [PATCH 1/9] head change coalescer --- chain/store/coalescer.go | 129 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 129 insertions(+) create mode 100644 chain/store/coalescer.go diff --git a/chain/store/coalescer.go b/chain/store/coalescer.go new file mode 100644 index 0000000000..38d458ace6 --- /dev/null +++ b/chain/store/coalescer.go @@ -0,0 +1,129 @@ +package store + +import ( + "context" + "time" + + "github.com/filecoin-project/lotus/chain/types" +) + +func WrapHeadChangeCoalescer(fn ReorgNotifee, delay time.Duration) ReorgNotifee { + c := NewHeadChangeCoalescer(fn, delay) + return c.HeadChange +} + +type HeadChangeCoalescer struct { + notify ReorgNotifee + + ctx context.Context + cancel func() + + eventq chan headChange + + revert []*types.TipSet + apply []*types.TipSet +} + +type headChange struct { + revert, apply []*types.TipSet +} + +func NewHeadChangeCoalescer(fn ReorgNotifee, delay time.Duration) *HeadChangeCoalescer { + ctx, cancel := context.WithCancel(context.Background()) + c := &HeadChangeCoalescer{ + notify: fn, + ctx: ctx, + cancel: cancel, + eventq: make(chan headChange), + } + + go c.background(delay) + + return c +} + +func (c *HeadChangeCoalescer) HeadChange(revert, apply []*types.TipSet) error { + select { + case c.eventq <- headChange{revert: revert, apply: apply}: + return nil + case <-c.ctx.Done(): + return c.ctx.Err() + } +} + +func (c *HeadChangeCoalescer) Close() { + select { + case <-c.ctx.Done(): + default: + c.cancel() + } +} + +func (c *HeadChangeCoalescer) background(delay time.Duration) { + var timerC <-chan time.Time + for { + select { + case evt := <-c.eventq: + c.coalesce(evt.revert, evt.apply) + if timerC == nil { + timerC = time.After(delay) + } + + case <-timerC: + c.dispatch() + timerC = nil + + case <-c.ctx.Done(): + return + } + } +} + +func (c *HeadChangeCoalescer) coalesce(revert, apply []*types.TipSet) { + // newly reverted tipsets cancel out pending applied tipsets + // we iterate through the revert set and if a tipset is pending for apply we cancel it. + + // pending tipsets for apply + applied := make(map[types.TipSetKey]struct{}) + for _, ts := range c.apply { + applied[ts.Key()] = struct{}{} + } + + // freshly reverted tipsets from the pending applied set + reverted := make(map[types.TipSetKey]struct{}) + + for _, ts := range revert { + key := ts.Key() + + _, ok := applied[key] + if ok { + reverted[key] = struct{}{} + continue + } + + c.revert = append(c.revert, ts) + } + + newApply := make([]*types.TipSet, 0, len(c.apply)-len(reverted)+len(apply)) + for _, ts := range c.apply { + _, ok := reverted[ts.Key()] + if ok { + continue + } + + newApply = append(newApply, ts) + } + + newApply = append(newApply, apply...) + c.apply = newApply +} + +func (c *HeadChangeCoalescer) dispatch() { + err := c.notify(c.revert, c.apply) + if err != nil { + log.Errorf("error dispatching coalesced head change notification: %s", err) + } + + c.revert = nil + c.apply = nil +} From ec13c5f80d94baae50c10b65fb1cbf6b0582b25e Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 4 Nov 2020 13:51:18 +0200 Subject: [PATCH 2/9] coalesce both reverts and applys in the same manner --- chain/store/coalescer.go | 70 ++++++++++++++++++++++++++++++---------- 1 file changed, 53 insertions(+), 17 deletions(-) diff --git a/chain/store/coalescer.go b/chain/store/coalescer.go index 38d458ace6..e19ebd661e 100644 --- a/chain/store/coalescer.go +++ b/chain/store/coalescer.go @@ -80,41 +80,77 @@ func (c *HeadChangeCoalescer) background(delay time.Duration) { } func (c *HeadChangeCoalescer) coalesce(revert, apply []*types.TipSet) { - // newly reverted tipsets cancel out pending applied tipsets - // we iterate through the revert set and if a tipset is pending for apply we cancel it. + // newly reverted tipsets cancel out with pending applys. + // similarly, newly applied tipsets cancel out with pending reverts. - // pending tipsets for apply - applied := make(map[types.TipSetKey]struct{}) - for _, ts := range c.apply { - applied[ts.Key()] = struct{}{} + // pending tipsets + pendRevert := make(map[types.TipSetKey]struct{}, len(c.revert)) + for _, ts := range c.revert { + pendRevert[ts.Key()] = struct{}{} } - // freshly reverted tipsets from the pending applied set - reverted := make(map[types.TipSetKey]struct{}) + pendApply := make(map[types.TipSetKey]struct{}, len(c.apply)) + for _, ts := range c.apply { + pendApply[ts.Key()] = struct{}{} + } + // incoming tipsets + reverting := make(map[types.TipSetKey]struct{}, len(revert)) for _, ts := range revert { - key := ts.Key() + reverting[ts.Key()] = struct{}{} + } + + applying := make(map[types.TipSetKey]struct{}, len(apply)) + for _, ts := range apply { + applying[ts.Key()] = struct{}{} + } + + // coalesced revert set + // - pending reverts are cancelled by incoming applys + // - incoming reverts are cancelled by pending applys + newRevert := make([]*types.TipSet, 0, len(c.revert)+len(revert)) + for _, ts := range c.revert { + _, cancel := applying[ts.Key()] + if cancel { + continue + } - _, ok := applied[key] - if ok { - reverted[key] = struct{}{} + newRevert = append(newRevert, ts) + } + + for _, ts := range revert { + _, cancel := pendApply[ts.Key()] + if cancel { continue } - c.revert = append(c.revert, ts) + newRevert = append(newRevert, ts) } - newApply := make([]*types.TipSet, 0, len(c.apply)-len(reverted)+len(apply)) + // coalesced apply set + // - pending applys are cancelled by incoming reverts + // - incoming applys are cancelled by pending reverts + newApply := make([]*types.TipSet, 0, len(c.apply)+len(apply)) for _, ts := range c.apply { - _, ok := reverted[ts.Key()] - if ok { + _, cancel := reverting[ts.Key()] + if cancel { + continue + } + + newApply = append(newApply, ts) + } + + for _, ts := range apply { + _, cancel := pendRevert[ts.Key()] + if cancel { continue } newApply = append(newApply, ts) } - newApply = append(newApply, apply...) + // commit the coalesced sets + c.revert = newRevert c.apply = newApply } From 9149ae21646c7c8613a1cb0c384d490d61c9f0fe Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 4 Nov 2020 13:58:03 +0200 Subject: [PATCH 3/9] godoc and final dispatch when closing the coalescer --- chain/store/coalescer.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/chain/store/coalescer.go b/chain/store/coalescer.go index e19ebd661e..5e03e00d5a 100644 --- a/chain/store/coalescer.go +++ b/chain/store/coalescer.go @@ -7,11 +7,14 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) +// WrapHeadChangeCoalescer wraps a ReorgNotifee with a head change coalescer. func WrapHeadChangeCoalescer(fn ReorgNotifee, delay time.Duration) ReorgNotifee { c := NewHeadChangeCoalescer(fn, delay) return c.HeadChange } +// HeadChangeCoalescer is a stateful reorg notifee which coalesces incoming head changes +// with pending head changes to reduce state computations from head change notifications. type HeadChangeCoalescer struct { notify ReorgNotifee @@ -28,6 +31,7 @@ type headChange struct { revert, apply []*types.TipSet } +// NewHeadChangeCoalescer creates a HeadChangeCoalescer. func NewHeadChangeCoalescer(fn ReorgNotifee, delay time.Duration) *HeadChangeCoalescer { ctx, cancel := context.WithCancel(context.Background()) c := &HeadChangeCoalescer{ @@ -42,6 +46,8 @@ func NewHeadChangeCoalescer(fn ReorgNotifee, delay time.Duration) *HeadChangeCoa return c } +// HeadChange is the ReorgNotifee callback for the stateful coalescer; it receives an incoming +// head change and schedules dispatch of a coalesced head change in the background. func (c *HeadChangeCoalescer) HeadChange(revert, apply []*types.TipSet) error { select { case c.eventq <- headChange{revert: revert, apply: apply}: @@ -51,6 +57,8 @@ func (c *HeadChangeCoalescer) HeadChange(revert, apply []*types.TipSet) error { } } +// Close closes the coalescer and cancels the background dispatch goroutine. +// Any further notification will result in an error. func (c *HeadChangeCoalescer) Close() { select { case <-c.ctx.Done(): @@ -59,6 +67,8 @@ func (c *HeadChangeCoalescer) Close() { } } +// Implementation details + func (c *HeadChangeCoalescer) background(delay time.Duration) { var timerC <-chan time.Time for { @@ -74,6 +84,9 @@ func (c *HeadChangeCoalescer) background(delay time.Duration) { timerC = nil case <-c.ctx.Done(): + if c.revert != nil || c.apply != nil { + c.dispatch() + } return } } From 08aab1c48ac97d368a0f0bb1a58eeb3f62ec6bdc Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 4 Nov 2020 13:58:44 +0200 Subject: [PATCH 4/9] make Close satisfy io.Closer --- chain/store/coalescer.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/chain/store/coalescer.go b/chain/store/coalescer.go index 5e03e00d5a..8e30c468ce 100644 --- a/chain/store/coalescer.go +++ b/chain/store/coalescer.go @@ -59,12 +59,14 @@ func (c *HeadChangeCoalescer) HeadChange(revert, apply []*types.TipSet) error { // Close closes the coalescer and cancels the background dispatch goroutine. // Any further notification will result in an error. -func (c *HeadChangeCoalescer) Close() { +func (c *HeadChangeCoalescer) Close() error { select { case <-c.ctx.Done(): default: c.cancel() } + + return nil } // Implementation details From 84c9d1dbb73f8bba49bd5ad04355f692fb0e6749 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 4 Nov 2020 17:53:00 +0200 Subject: [PATCH 5/9] hook head change coalescer to mpool --- chain/messagepool/provider.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/chain/messagepool/provider.go b/chain/messagepool/provider.go index 347e90044d..97a03e264d 100644 --- a/chain/messagepool/provider.go +++ b/chain/messagepool/provider.go @@ -2,6 +2,7 @@ package messagepool import ( "context" + "time" "github.com/ipfs/go-cid" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -9,9 +10,12 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/lotus/chain/stmgr" + "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" ) +var HeadChangeCoalesceDelay = time.Second + type Provider interface { SubscribeHeadChanges(func(rev, app []*types.TipSet) error) *types.TipSet PutMessage(m types.ChainMsg) (cid.Cid, error) @@ -34,7 +38,7 @@ func NewProvider(sm *stmgr.StateManager, ps *pubsub.PubSub) Provider { } func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet { - mpp.sm.ChainStore().SubscribeHeadChanges(cb) + mpp.sm.ChainStore().SubscribeHeadChanges(store.WrapHeadChangeCoalescer(cb, HeadChangeCoalesceDelay)) return mpp.sm.ChainStore().GetHeaviestTipSet() } From 6660f8136326f1c8b39069a7668225aaa4c55f00 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 4 Nov 2020 17:53:16 +0200 Subject: [PATCH 6/9] minimize coalesce delay for tests --- node/test/builder.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/node/test/builder.go b/node/test/builder.go index 46efb90742..727f46098f 100644 --- a/node/test/builder.go +++ b/node/test/builder.go @@ -29,6 +29,7 @@ import ( "github.com/filecoin-project/lotus/chain/actors/policy" "github.com/filecoin-project/lotus/chain/gen" genesis2 "github.com/filecoin-project/lotus/chain/gen/genesis" + "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/wallet" "github.com/filecoin-project/lotus/cmd/lotus-seed/seed" @@ -53,6 +54,7 @@ import ( func init() { chain.BootstrapPeerThreshold = 1 + messagepool.HeadChangeCoalesceDelay = time.Microsecond } func CreateTestStorageNode(ctx context.Context, t *testing.T, waddr address.Address, act address.Address, pk crypto.PrivKey, tnd test.TestNode, mn mocknet.Mocknet, opts node.Option) test.TestStorageNode { From 9a76c648f1225c6349b345ee096dd4278ad47e4e Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 4 Nov 2020 18:20:25 +0200 Subject: [PATCH 7/9] unit test for head change coalescer --- chain/store/coalescer_test.go | 68 +++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 chain/store/coalescer_test.go diff --git a/chain/store/coalescer_test.go b/chain/store/coalescer_test.go new file mode 100644 index 0000000000..cb99690c54 --- /dev/null +++ b/chain/store/coalescer_test.go @@ -0,0 +1,68 @@ +package store + +import ( + "testing" + "time" + + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/chain/types/mock" +) + +func TestHeadChangeCoalescer(t *testing.T) { + notif := make(chan headChange, 1) + c := NewHeadChangeCoalescer(func(revert, apply []*types.TipSet) error { + notif <- headChange{apply: apply, revert: revert} + return nil + }, 100*time.Millisecond) + defer c.Close() + + b0 := mock.MkBlock(nil, 0, 0) + root := mock.TipSet(b0) + bA := mock.MkBlock(root, 1, 1) + tA := mock.TipSet(bA) + bB := mock.MkBlock(root, 1, 2) + tB := mock.TipSet(bB) + tAB := mock.TipSet(bA, bB) + bC := mock.MkBlock(root, 1, 3) + tABC := mock.TipSet(bA, bB, bC) + bD := mock.MkBlock(root, 1, 4) + tABCD := mock.TipSet(bA, bB, bC, bD) + bE := mock.MkBlock(root, 1, 5) + tABCDE := mock.TipSet(bA, bB, bC, bD, bE) + + c.HeadChange(nil, []*types.TipSet{tA}) + c.HeadChange(nil, []*types.TipSet{tB}) + c.HeadChange([]*types.TipSet{tA, tB}, []*types.TipSet{tAB}) + c.HeadChange([]*types.TipSet{tAB}, []*types.TipSet{tABC}) + + change := <-notif + + if len(change.revert) != 0 { + t.Fatalf("expected empty revert set but got %d elements", len(change.revert)) + } + if len(change.apply) != 1 { + t.Fatalf("expected single element apply set but got %d elements", len(change.apply)) + } + if change.apply[0] != tABC { + t.Fatalf("expected to apply tABC") + } + + c.HeadChange([]*types.TipSet{tABC}, []*types.TipSet{tABCD}) + c.HeadChange([]*types.TipSet{tABCD}, []*types.TipSet{tABCDE}) + + change = <-notif + + if len(change.revert) != 1 { + t.Fatalf("expected single element revert set but got %d elements", len(change.revert)) + } + if change.revert[0] != tABC { + t.Fatalf("expected to revert tABC") + } + if len(change.apply) != 1 { + t.Fatalf("expected single element apply set but got %d elements", len(change.apply)) + } + if change.apply[0] != tABCDE { + t.Fatalf("expected to revert tABC") + } + +} From 378d7a1ad93977a89dd55b2a5c2601e64e80d2d6 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 4 Nov 2020 18:25:54 +0200 Subject: [PATCH 8/9] stupid linter --- chain/store/coalescer_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/chain/store/coalescer_test.go b/chain/store/coalescer_test.go index cb99690c54..a936019547 100644 --- a/chain/store/coalescer_test.go +++ b/chain/store/coalescer_test.go @@ -14,7 +14,7 @@ func TestHeadChangeCoalescer(t *testing.T) { notif <- headChange{apply: apply, revert: revert} return nil }, 100*time.Millisecond) - defer c.Close() + defer c.Close() //nolint b0 := mock.MkBlock(nil, 0, 0) root := mock.TipSet(b0) @@ -30,10 +30,10 @@ func TestHeadChangeCoalescer(t *testing.T) { bE := mock.MkBlock(root, 1, 5) tABCDE := mock.TipSet(bA, bB, bC, bD, bE) - c.HeadChange(nil, []*types.TipSet{tA}) - c.HeadChange(nil, []*types.TipSet{tB}) - c.HeadChange([]*types.TipSet{tA, tB}, []*types.TipSet{tAB}) - c.HeadChange([]*types.TipSet{tAB}, []*types.TipSet{tABC}) + c.HeadChange(nil, []*types.TipSet{tA}) //nolint + c.HeadChange(nil, []*types.TipSet{tB}) //nolint + c.HeadChange([]*types.TipSet{tA, tB}, []*types.TipSet{tAB}) //nolint + c.HeadChange([]*types.TipSet{tAB}, []*types.TipSet{tABC}) //nolint change := <-notif @@ -47,8 +47,8 @@ func TestHeadChangeCoalescer(t *testing.T) { t.Fatalf("expected to apply tABC") } - c.HeadChange([]*types.TipSet{tABC}, []*types.TipSet{tABCD}) - c.HeadChange([]*types.TipSet{tABCD}, []*types.TipSet{tABCDE}) + c.HeadChange([]*types.TipSet{tABC}, []*types.TipSet{tABCD}) //nolint + c.HeadChange([]*types.TipSet{tABCD}, []*types.TipSet{tABCDE}) //nolint change = <-notif From 185653f85afcb9d531d3c318281d9fcb95de3ec1 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 9 Nov 2020 13:35:42 +0200 Subject: [PATCH 9/9] improve coalescing --- chain/messagepool/provider.go | 14 ++++++++-- chain/store/coalescer.go | 52 +++++++++++++++++++++++++++++------ chain/store/coalescer_test.go | 6 +++- node/test/builder.go | 4 ++- 4 files changed, 63 insertions(+), 13 deletions(-) diff --git a/chain/messagepool/provider.go b/chain/messagepool/provider.go index 97a03e264d..5a6c751bce 100644 --- a/chain/messagepool/provider.go +++ b/chain/messagepool/provider.go @@ -14,7 +14,11 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) -var HeadChangeCoalesceDelay = time.Second +var ( + HeadChangeCoalesceMinDelay = 2 * time.Second + HeadChangeCoalesceMaxDelay = 6 * time.Second + HeadChangeCoalesceMergeInterval = time.Second +) type Provider interface { SubscribeHeadChanges(func(rev, app []*types.TipSet) error) *types.TipSet @@ -38,7 +42,13 @@ func NewProvider(sm *stmgr.StateManager, ps *pubsub.PubSub) Provider { } func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet { - mpp.sm.ChainStore().SubscribeHeadChanges(store.WrapHeadChangeCoalescer(cb, HeadChangeCoalesceDelay)) + mpp.sm.ChainStore().SubscribeHeadChanges( + store.WrapHeadChangeCoalescer( + cb, + HeadChangeCoalesceMinDelay, + HeadChangeCoalesceMaxDelay, + HeadChangeCoalesceMergeInterval, + )) return mpp.sm.ChainStore().GetHeaviestTipSet() } diff --git a/chain/store/coalescer.go b/chain/store/coalescer.go index 8e30c468ce..443359c8a0 100644 --- a/chain/store/coalescer.go +++ b/chain/store/coalescer.go @@ -8,8 +8,15 @@ import ( ) // WrapHeadChangeCoalescer wraps a ReorgNotifee with a head change coalescer. -func WrapHeadChangeCoalescer(fn ReorgNotifee, delay time.Duration) ReorgNotifee { - c := NewHeadChangeCoalescer(fn, delay) +// minDelay is the minimum coalesce delay; when a head change is first received, the coalescer will +// wait for that long to coalesce more head changes. +// maxDelay is the maximum coalesce delay; the coalescer will not delay delivery of a head change +// more than that. +// mergeInterval is the interval that triggers additional coalesce delay; if the last head change was +// within the merge interval when the coalesce timer fires, then the coalesce time is extended +// by min delay and up to max delay total. +func WrapHeadChangeCoalescer(fn ReorgNotifee, minDelay, maxDelay, mergeInterval time.Duration) ReorgNotifee { + c := NewHeadChangeCoalescer(fn, minDelay, maxDelay, mergeInterval) return c.HeadChange } @@ -32,7 +39,7 @@ type headChange struct { } // NewHeadChangeCoalescer creates a HeadChangeCoalescer. -func NewHeadChangeCoalescer(fn ReorgNotifee, delay time.Duration) *HeadChangeCoalescer { +func NewHeadChangeCoalescer(fn ReorgNotifee, minDelay, maxDelay, mergeInterval time.Duration) *HeadChangeCoalescer { ctx, cancel := context.WithCancel(context.Background()) c := &HeadChangeCoalescer{ notify: fn, @@ -41,7 +48,7 @@ func NewHeadChangeCoalescer(fn ReorgNotifee, delay time.Duration) *HeadChangeCoa eventq: make(chan headChange), } - go c.background(delay) + go c.background(minDelay, maxDelay, mergeInterval) return c } @@ -71,19 +78,46 @@ func (c *HeadChangeCoalescer) Close() error { // Implementation details -func (c *HeadChangeCoalescer) background(delay time.Duration) { +func (c *HeadChangeCoalescer) background(minDelay, maxDelay, mergeInterval time.Duration) { var timerC <-chan time.Time + var first, last time.Time + for { select { case evt := <-c.eventq: c.coalesce(evt.revert, evt.apply) + + now := time.Now() + last = now + if first.IsZero() { + first = now + } + if timerC == nil { - timerC = time.After(delay) + timerC = time.After(minDelay) } - case <-timerC: - c.dispatch() - timerC = nil + case now := <-timerC: + sinceFirst := now.Sub(first) + sinceLast := now.Sub(last) + + if sinceLast < mergeInterval && sinceFirst < maxDelay { + // coalesce some more + maxWait := maxDelay - sinceFirst + wait := minDelay + if maxWait < wait { + wait = maxWait + } + + timerC = time.After(wait) + } else { + // dispatch + c.dispatch() + + first = time.Time{} + last = time.Time{} + timerC = nil + } case <-c.ctx.Done(): if c.revert != nil || c.apply != nil { diff --git a/chain/store/coalescer_test.go b/chain/store/coalescer_test.go index a936019547..d462851086 100644 --- a/chain/store/coalescer_test.go +++ b/chain/store/coalescer_test.go @@ -13,7 +13,11 @@ func TestHeadChangeCoalescer(t *testing.T) { c := NewHeadChangeCoalescer(func(revert, apply []*types.TipSet) error { notif <- headChange{apply: apply, revert: revert} return nil - }, 100*time.Millisecond) + }, + 100*time.Millisecond, + 200*time.Millisecond, + 10*time.Millisecond, + ) defer c.Close() //nolint b0 := mock.MkBlock(nil, 0, 0) diff --git a/node/test/builder.go b/node/test/builder.go index 727f46098f..cf38792daa 100644 --- a/node/test/builder.go +++ b/node/test/builder.go @@ -54,7 +54,9 @@ import ( func init() { chain.BootstrapPeerThreshold = 1 - messagepool.HeadChangeCoalesceDelay = time.Microsecond + messagepool.HeadChangeCoalesceMinDelay = time.Microsecond + messagepool.HeadChangeCoalesceMaxDelay = 2 * time.Microsecond + messagepool.HeadChangeCoalesceMergeInterval = 100 * time.Nanosecond } func CreateTestStorageNode(ctx context.Context, t *testing.T, waddr address.Address, act address.Address, pk crypto.PrivKey, tnd test.TestNode, mn mocknet.Mocknet, opts node.Option) test.TestStorageNode {