diff --git a/api/server/server.go b/api/server/server.go index 4fde67a61..35c44150b 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -571,14 +571,14 @@ func createDatastore(conf Config, longTimeout bool) (datastore.TxnDatastore, err func getMinerSelector(conf Config, rm *reputation.Module, ai *ask.Runner, cb lotus.ClientBuilder) (ffs.MinerSelector, error) { if conf.Devnet { - return reptop.New(rm, ai), nil + return reptop.New(cb, rm, ai), nil } var ms ffs.MinerSelector var err error switch conf.MinerSelector { case "reputation": - ms = reptop.New(rm, ai) + ms = reptop.New(cb, rm, ai) case "sr2": ms, err = sr2.New(conf.MinerSelectorParams, cb) if err != nil { diff --git a/cmd/powd/main.go b/cmd/powd/main.go index ac52354f4..fa7effa59 100644 --- a/cmd/powd/main.go +++ b/cmd/powd/main.go @@ -258,6 +258,7 @@ func setupLogging(repoPath string) error { // Miner Selectors "sr2-miner-selector", + "reptop", // FFS "ffs-scheduler", diff --git a/ffs/filcold/filcold.go b/ffs/filcold/filcold.go index e6fbfec79..652af4c09 100644 --- a/ffs/filcold/filcold.go +++ b/ffs/filcold/filcold.go @@ -17,6 +17,7 @@ import ( dealsModule "github.com/textileio/powergate/deals/module" "github.com/textileio/powergate/ffs" "github.com/textileio/powergate/lotus" + "github.com/textileio/powergate/util" ) const ( @@ -290,7 +291,7 @@ func (fc *FilCold) makeDeals(ctx context.Context, c cid.Cid, pieceSize abi.Padde } for _, cfg := range cfgs { - fc.l.Log(ctx, "Proposing deal to miner %s with %d attoFIL per epoch...", cfg.Miner, cfg.EpochPrice) + fc.l.Log(ctx, "Proposing deal to miner %s with %s FIL per epoch...", cfg.Miner, util.AttoFilToFil(cfg.EpochPrice)) } sres, err := fc.dm.Store(ctx, fcfg.Addr, c, pieceSize, pieceCid, cfgs, uint64(fcfg.DealMinDuration)) diff --git a/ffs/minerselector/reptop/reptop.go b/ffs/minerselector/reptop/reptop.go index 72d096e57..d49c27388 100644 --- a/ffs/minerselector/reptop/reptop.go +++ b/ffs/minerselector/reptop/reptop.go @@ -1,28 +1,41 @@ package reptop import ( + "context" "fmt" + "time" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/lotus/chain/types" + logger "github.com/ipfs/go-log/v2" "github.com/textileio/powergate/ffs" askRunner "github.com/textileio/powergate/index/ask/runner" + "github.com/textileio/powergate/lotus" "github.com/textileio/powergate/reputation" ) +var ( + log = logger.Logger("reptop") +) + // RepTop is a ffs.MinerSelector implementation that returns the top N // miners from a Reputations Module and an Ask Index. type RepTop struct { rm *reputation.Module ai *askRunner.Runner + cb lotus.ClientBuilder } var _ ffs.MinerSelector = (*RepTop)(nil) // New returns a new RetTop instance that uses the specified Reputation Module // to select miners and the AskIndex for their epoch prices. -func New(rm *reputation.Module, ai *askRunner.Runner) *RepTop { +func New(cb lotus.ClientBuilder, rm *reputation.Module, ai *askRunner.Runner) *RepTop { return &RepTop{ rm: rm, ai: ai, + cb: cb, } } @@ -32,30 +45,59 @@ func (rt *RepTop) GetMiners(n int, f ffs.MinerSelectorFilter) ([]ffs.MinerPropos if n < 1 { return nil, fmt.Errorf("the number of miners should be greater than zero") } - ms, err := rt.rm.QueryMiners(f.ExcludedMiners, f.CountryCodes, f.TrustedMiners) + + // We start directly targeting trusted miners without relying on reputation index. + // This is done to circumvent index building restrictions such as excluding miners + // with zero power. Trusted miners are trusted by definition, so if they have zero + // power, that's a risk that the client is accepting. + trustedMiners := rt.genTrustedMiners(f, n) + + // The remaining needed miners are gathered from the reputation index. + reputationMiners, err := rt.genFromReputation(f, n-len(trustedMiners)) + if err != nil { + return nil, fmt.Errorf("getting miners from reputation module: %s", err) + } + + // Return both lists. + return append(trustedMiners, reputationMiners...), nil +} + +func (rt *RepTop) genTrustedMiners(f ffs.MinerSelectorFilter, n int) []ffs.MinerProposal { + ret := make([]ffs.MinerProposal, 0, len(f.TrustedMiners)) + for _, m := range f.TrustedMiners { + mp, err := rt.getMinerProposal(f, m) + if err != nil { + log.Warnf("trusted miner query asking: %s", err) + continue + } + ret = append(ret, mp) + if len(ret) == n { + break + } + } + + return ret +} + +func (rt *RepTop) genFromReputation(f ffs.MinerSelectorFilter, n int) ([]ffs.MinerProposal, error) { + if n == 0 { + return nil, nil + } + ms, err := rt.rm.QueryMiners(f.TrustedMiners, f.CountryCodes, nil) if err != nil { return nil, fmt.Errorf("getting miners from reputation module: %s", err) } if len(ms) < n { - return nil, fmt.Errorf("not enough miners that satisfy the constraints") + return nil, fmt.Errorf("not enough miners from reputation module to satisfy the constraints") } - aidx := rt.ai.Get() + res := make([]ffs.MinerProposal, 0, n) for _, m := range ms { - sa, ok := aidx.Storage[m.Addr] - if !ok { - continue - } - if f.MaxPrice > 0 && sa.Price > f.MaxPrice { - continue + mp, err := rt.getMinerProposal(f, m.Addr) + if err != nil { + continue // Cached miner isn't replying to query-ask now, skip. } - if f.PieceSize < sa.MinPieceSize || f.PieceSize > sa.MaxPieceSize { - continue - } - res = append(res, ffs.MinerProposal{ - Addr: sa.Miner, - EpochPrice: sa.Price, - }) + res = append(res, mp) if len(res) == n { break } @@ -65,3 +107,55 @@ func (rt *RepTop) GetMiners(n int, f ffs.MinerSelectorFilter) ([]ffs.MinerPropos } return res, nil } + +func (rt *RepTop) getMinerProposal(f ffs.MinerSelectorFilter, addrStr string) (ffs.MinerProposal, error) { + c, cls, err := rt.cb(context.Background()) + if err != nil { + return ffs.MinerProposal{}, fmt.Errorf("creating lotus client: %s", err) + } + defer cls() + + addr, err := address.NewFromString(addrStr) + if err != nil { + return ffs.MinerProposal{}, fmt.Errorf("miner address is invalid: %s", err) + } + ctx, cls := context.WithTimeout(context.Background(), time.Second*10) + defer cls() + + mi, err := c.StateMinerInfo(ctx, addr, types.EmptyTSK) + if err != nil { + return ffs.MinerProposal{}, fmt.Errorf("getting miner %s info: %s", addr, err) + } + + type chAskRes struct { + Error string + Ask *storagemarket.StorageAsk + } + chAsk := make(chan chAskRes) + go func() { + sask, err := c.ClientQueryAsk(ctx, *mi.PeerId, addr) + if err != nil { + chAsk <- chAskRes{Error: err.Error()} + return + } + + chAsk <- chAskRes{Ask: sask} + }() + + select { + case <-time.After(time.Second * 10): + return ffs.MinerProposal{}, fmt.Errorf("query asking timed out") + case r := <-chAsk: + if r.Error != "" { + return ffs.MinerProposal{}, fmt.Errorf("query ask had controlled error: %s", r.Error) + } + if f.MaxPrice > 0 && r.Ask.Price.Uint64() > f.MaxPrice { + return ffs.MinerProposal{}, fmt.Errorf("miner doesn't satisfy price constraints %d>%d", r.Ask.Price, f.MaxPrice) + } + if f.PieceSize < uint64(r.Ask.MinPieceSize) || f.PieceSize > uint64(r.Ask.MaxPieceSize) { + return ffs.MinerProposal{}, fmt.Errorf("miner doesn't satisfy piece size constraints: %d<%d<%d", r.Ask.MinPieceSize, f.PieceSize, r.Ask.MaxPieceSize) + } + + return ffs.MinerProposal{Addr: addrStr, EpochPrice: r.Ask.Price.Uint64()}, nil + } +} diff --git a/util/util.go b/util/util.go index db40e0165..b0760e6ee 100644 --- a/util/util.go +++ b/util/util.go @@ -3,8 +3,11 @@ package util import ( "context" "fmt" + "math/big" + "strings" "time" + "github.com/filecoin-project/lotus/build" "github.com/ipfs/go-cid" ma "github.com/multiformats/go-multiaddr" dns "github.com/multiformats/go-multiaddr-dns" @@ -98,3 +101,13 @@ func CidFromString(c string) (cid.Cid, error) { } return cid.Decode(c) } + +// AttoFilToFil transforms an attoFIL integer value, to a +// pretty FIL string. +func AttoFilToFil(attoFil uint64) string { + r := new(big.Rat).SetFrac(big.NewInt(int64(attoFil)), big.NewInt(int64(build.FilecoinPrecision))) + if r.Sign() == 0 { + return "0" + } + return strings.TrimRight(strings.TrimRight(r.FloatString(18), "0"), ".") +}