Skip to content

Commit

Permalink
Merge pull request #6612 from filecoin-project/feat/miner-par-transfers-cfg
Browse files Browse the repository at this point in the history

Miner SimultaneousTransfers config
  • Loading branch information
magik6k authored Jun 28, 2021
2 parents cefd140 + e9dd3e8 commit 49a709a
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 13 deletions.
78 changes: 78 additions & 0 deletions itests/deals_concurrent_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
package itests

import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
)

func TestDealCyclesConcurrent(t *testing.T) {
Expand Down Expand Up @@ -47,3 +57,71 @@ func TestDealCyclesConcurrent(t *testing.T) {
t.Run(ns+"-stdretrieval-NoCAR", func(t *testing.T) { runTest(t, n, false, false) })
}
}

// TestSimultenousTransferLimit verifies that the miner's staging graphsync
// honors the configured cap on parallel data transfers: with the limit set
// to 2, the watcher must never observe more than 2 transfers in the Ongoing
// state at once.
func TestSimultenousTransferLimit(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping test in short mode")
	}

	kit.QuietMiningLogs()

	blockTime := 10 * time.Millisecond

	// For these tests where the block time is artificially short, just use
	// a deal start epoch that is guaranteed to be far enough in the future
	// so that the deal starts sealing in time
	startEpoch := abi.ChainEpoch(2 << 12)

	runTest := func(t *testing.T) {
		// Override the miner's staging graphsync with one capped at 2
		// simultaneous transfers.
		client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ConstructorOpts(
			node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(2))),
		))
		ens.InterconnectAll().BeginMining(blockTime)
		dh := kit.NewDealHarness(t, client, miner)

		ctx, cancel := context.WithCancel(context.Background())

		updates, err := miner.MarketDataTransferUpdates(ctx)
		require.NoError(t, err)

		// Watch transfer updates in the background, recording the peak
		// number of transfers that were Ongoing at the same time. done is
		// closed when the watcher goroutine exits.
		var peakOngoing int
		done := make(chan struct{})
		go func() {
			defer close(done)

			inProgress := map[datatransfer.TransferID]struct{}{}

			for {
				select {
				case u := <-updates:
					t.Logf("%d - %s", u.TransferID, datatransfer.Statuses[u.Status])
					if u.Status != datatransfer.Ongoing {
						delete(inProgress, u.TransferID)
					} else {
						inProgress[u.TransferID] = struct{}{}
					}

					if n := len(inProgress); n > peakOngoing {
						peakOngoing = n
					}
				case <-ctx.Done():
					return
				}
			}
		}()

		dh.RunConcurrentDeals(kit.RunConcurrentDealsOpts{
			N:             1, // TODO: set to 20 after https://github.com/ipfs/go-graphsync/issues/175 is fixed
			FastRetrieval: true,
			StartEpoch:    startEpoch,
		})

		// Stop the watcher and wait for it to finish before reading the peak.
		cancel()
		<-done

		require.LessOrEqual(t, peakOngoing, 2)
	}

	runTest(t)
}
14 changes: 8 additions & 6 deletions node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ var LibP2P = Options(
Override(ConnGaterKey, lp2p.ConnGaterOption),
)

func isType(t repo.RepoType) func(s *Settings) bool {
func IsType(t repo.RepoType) func(s *Settings) bool {
return func(s *Settings) bool { return s.nodeType == t }
}

Expand Down Expand Up @@ -299,7 +299,7 @@ var ChainNode = Options(
Override(new(*dtypes.MpoolLocker), new(dtypes.MpoolLocker)),

// Shared graphsync (markets, serving chain)
Override(new(dtypes.Graphsync), modules.Graphsync(config.DefaultFullNode().Client.SimultaneousTransfers)),
Override(new(dtypes.Graphsync), modules.Graphsync(config.DefaultSimultaneousTransfers)),

// Service: Wallet
Override(new(*messagesigner.MessageSigner), messagesigner.NewMessageSigner),
Expand Down Expand Up @@ -403,7 +403,7 @@ var MinerNode = Options(
Override(new(dtypes.StagingMultiDstore), modules.StagingMultiDatastore),
Override(new(dtypes.StagingBlockstore), modules.StagingBlockstore),
Override(new(dtypes.StagingDAG), modules.StagingDAG),
Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync),
Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(config.DefaultSimultaneousTransfers)),
Override(new(dtypes.ProviderPieceStore), modules.NewProviderPieceStore),
Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks),

Expand Down Expand Up @@ -468,7 +468,7 @@ func Online() Option {
LibP2P,

ApplyIf(isFullOrLiteNode, ChainNode),
ApplyIf(isType(repo.StorageMiner), MinerNode),
ApplyIf(IsType(repo.StorageMiner), MinerNode),
)
}

Expand Down Expand Up @@ -606,6 +606,8 @@ func ConfigStorageMiner(c interface{}) Option {
})),
Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(&cfg.Fees, &cfg.Dealmaking)),

Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(cfg.Dealmaking.SimultaneousTransfers)),

Override(new(sectorstorage.SealerConfig), cfg.Storage),
Override(new(*storage.AddressSelector), modules.AddressSelector(&cfg.Addresses)),
Override(new(*storage.Miner), modules.StorageMiner(cfg.Fees)),
Expand Down Expand Up @@ -678,8 +680,8 @@ func Repo(r repo.Repo) Option {

Override(new(*dtypes.APIAlg), modules.APISecret),

ApplyIf(isType(repo.FullNode), ConfigFullNode(c)),
ApplyIf(isType(repo.StorageMiner), ConfigStorageMiner(c)),
ApplyIf(IsType(repo.FullNode), ConfigFullNode(c)),
ApplyIf(IsType(repo.StorageMiner), ConfigStorageMiner(c)),
)(settings)
}
}
Expand Down
5 changes: 5 additions & 0 deletions node/config/def.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ type DealmakingConfig struct {
// as a multiplier of the minimum collateral bound
MaxProviderCollateralMultiplier uint64

// The maximum number of parallel online data transfers (storage+retrieval)
SimultaneousTransfers uint64

Filter string
RetrievalFilter string

Expand Down Expand Up @@ -362,6 +365,8 @@ func DefaultStorageMiner() *StorageMiner {
MaxDealsPerPublishMsg: 8,
MaxProviderCollateralMultiplier: 2,

SimultaneousTransfers: DefaultSimultaneousTransfers,

RetrievalPricing: &RetrievalPricing{
Strategy: RetrievalPricingDefaultMode,
Default: &RetrievalPricingDefault{
Expand Down
16 changes: 9 additions & 7 deletions node/modules/storageminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,13 +431,15 @@ func StagingDAG(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBloc

// StagingGraphsync creates a graphsync instance which reads and writes blocks
// to the StagingBlockstore
func StagingGraphsync(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBlockstore, h host.Host) dtypes.StagingGraphsync {
graphsyncNetwork := gsnet.NewFromLibp2pHost(h)
loader := storeutil.LoaderForBlockstore(ibs)
storer := storeutil.StorerForBlockstore(ibs)
gs := graphsync.New(helpers.LifecycleCtx(mctx, lc), graphsyncNetwork, loader, storer, graphsync.RejectAllRequestsByDefault())

return gs
func StagingGraphsync(parallelTransfers uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBlockstore, h host.Host) dtypes.StagingGraphsync {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.StagingBlockstore, h host.Host) dtypes.StagingGraphsync {
graphsyncNetwork := gsnet.NewFromLibp2pHost(h)
loader := storeutil.LoaderForBlockstore(ibs)
storer := storeutil.StorerForBlockstore(ibs)
gs := graphsync.New(helpers.LifecycleCtx(mctx, lc), graphsyncNetwork, loader, storer, graphsync.RejectAllRequestsByDefault(), graphsync.MaxInProgressRequests(parallelTransfers))

return gs
}
}

func SetupBlockProducer(lc fx.Lifecycle, ds dtypes.MetadataDS, api v1api.FullNode, epp gen.WinningPoStProver, sf *slashfilter.SlashFilter, j journal.Journal) (*lotusminer.Miner, error) {
Expand Down

0 comments on commit 49a709a

Please sign in to comment.