diff --git a/api/client/utils_test.go b/api/client/utils_test.go
index d3a502e9d..0f542ffd8 100644
--- a/api/client/utils_test.go
+++ b/api/client/utils_test.go
@@ -36,31 +36,32 @@ func defaultServerConfig(t *testing.T) server.Config {
     grpcMaddr := util.MustParseAddr(grpcHostAddress)
     conf := server.Config{
-        WalletInitialFunds: *big.NewInt(int64(4000000000)),
-        IpfsAPIAddr: ipfsAddr,
-        LotusAddress: devnetAddr,
-        LotusAuthToken: "",
-        LotusMasterAddr: "",
-        LotusConnectionRetries: 5,
-        Devnet: true,
-        GrpcHostNetwork: grpcHostNetwork,
-        GrpcHostAddress: grpcMaddr,
-        GrpcWebProxyAddress: grpcWebProxyAddress,
-        RepoPath: repoPath,
-        GatewayHostAddr: gatewayHostAddr,
-        IndexRawJSONHostAddr: indexRawJSONHostAddr,
-        MaxMindDBFolder: "../../iplocation/maxmind",
-        MinerSelector: "reputation",
-        FFSDealFinalityTimeout: time.Minute * 30,
-        FFSMaxParallelDealPreparing: 1,
-        FFSGCAutomaticGCInterval: 0,
-        DealWatchPollDuration: time.Second * 15,
-        SchedMaxParallel: 10,
-        AskIndexQueryAskTimeout: time.Second * 3,
-        AskIndexRefreshInterval: time.Second * 3,
-        AskIndexRefreshOnStart: true,
-        AskindexMaxParallel: 2,
-        IndexMinersRefreshOnStart: false,
+        WalletInitialFunds: *big.NewInt(int64(4000000000)),
+        IpfsAPIAddr: ipfsAddr,
+        LotusAddress: devnetAddr,
+        LotusAuthToken: "",
+        LotusMasterAddr: "",
+        LotusConnectionRetries: 5,
+        Devnet: true,
+        GrpcHostNetwork: grpcHostNetwork,
+        GrpcHostAddress: grpcMaddr,
+        GrpcWebProxyAddress: grpcWebProxyAddress,
+        RepoPath: repoPath,
+        GatewayHostAddr: gatewayHostAddr,
+        IndexRawJSONHostAddr: indexRawJSONHostAddr,
+        MaxMindDBFolder: "../../iplocation/maxmind",
+        MinerSelector: "reputation",
+        FFSDealFinalityTimeout: time.Minute * 30,
+        FFSMaxParallelDealPreparing: 1,
+        FFSGCAutomaticGCInterval: 0,
+        FFSRetrievalNextEventTimeout: time.Hour,
+        DealWatchPollDuration: time.Second * 15,
+        SchedMaxParallel: 10,
+        AskIndexQueryAskTimeout: time.Second * 3,
+        AskIndexRefreshInterval: time.Second * 3,
+        AskIndexRefreshOnStart: true,
+        AskindexMaxParallel: 2,
+        IndexMinersRefreshOnStart: false,
     }
     return conf
 }
diff --git a/api/server/server.go b/api/server/server.go
index a9e0cb078..54bc63f6b 100644
--- a/api/server/server.go
+++ b/api/server/server.go
@@ -126,19 +126,20 @@ type Config struct {
     MongoURI string
     MongoDB string
 
-    FFSAdminToken string
-    FFSUseMasterAddr bool
-    FFSDealFinalityTimeout time.Duration
-    FFSMinimumPieceSize uint64
-    FFSMaxParallelDealPreparing int
-    FFSGCAutomaticGCInterval time.Duration
-    FFSGCStageGracePeriod time.Duration
-    SchedMaxParallel int
-    MinerSelector string
-    MinerSelectorParams string
-    DealWatchPollDuration time.Duration
-    AutocreateMasterAddr bool
-    WalletInitialFunds big.Int
+    FFSAdminToken string
+    FFSUseMasterAddr bool
+    FFSDealFinalityTimeout time.Duration
+    FFSMinimumPieceSize uint64
+    FFSRetrievalNextEventTimeout time.Duration
+    FFSMaxParallelDealPreparing int
+    FFSGCAutomaticGCInterval time.Duration
+    FFSGCStageGracePeriod time.Duration
+    SchedMaxParallel int
+    MinerSelector string
+    MinerSelectorParams string
+    DealWatchPollDuration time.Duration
+    AutocreateMasterAddr bool
+    WalletInitialFunds big.Int
 
     AskIndexQueryAskTimeout time.Duration
     AskindexMaxParallel int
@@ -261,7 +262,7 @@ func NewServer(conf Config) (*Server, error) {
     if conf.Devnet {
         conf.FFSMinimumPieceSize = 0
     }
-    cs := filcold.New(ms, dm, wm, ipfs, chain, l, lsm, conf.FFSMinimumPieceSize, conf.FFSMaxParallelDealPreparing)
+    cs := filcold.New(ms, dm, wm, ipfs, chain, l, lsm, conf.FFSMinimumPieceSize, conf.FFSMaxParallelDealPreparing, conf.FFSRetrievalNextEventTimeout)
     hs, err := coreipfs.New(txndstr.Wrap(ds, "ffs/coreipfs"), ipfs, l)
     if err != nil {
         return nil, fmt.Errorf("creating coreipfs: %s", err)
diff --git a/cmd/powd/main.go b/cmd/powd/main.go
index cb7fdb854..e475a1637 100644
--- a/cmd/powd/main.go
+++ b/cmd/powd/main.go
@@ -130,6 +130,7 @@ func configFromFlags() (server.Config, error) {
     ffsSchedMaxParallel := config.GetInt("ffsschedmaxparallel")
     ffsDealWatchFinalityTimeout := time.Minute * time.Duration(config.GetInt("ffsdealfinalitytimeout"))
     ffsMinimumPieceSize := config.GetUint64("ffsminimumpiecesize")
+    ffsRetrievalNextEventTimeout := config.GetDuration("ffsretrievalnexteventtimeout")
     ffsMaxParallelDealPreparing := config.GetInt("ffsmaxparalleldealpreparing")
     ffsGCInterval := time.Minute * time.Duration(config.GetInt("ffsgcinterval"))
     ffsGCStagedGracePeriod := time.Minute * time.Duration(config.GetInt("ffsgcstagedgraceperiod"))
@@ -166,18 +167,19 @@ func configFromFlags() (server.Config, error) {
         MongoURI: mongoURI,
         MongoDB: mongoDB,
 
-        FFSAdminToken: ffsAdminToken,
-        FFSUseMasterAddr: ffsUseMasterAddr,
-        FFSDealFinalityTimeout: ffsDealWatchFinalityTimeout,
-        FFSMinimumPieceSize: ffsMinimumPieceSize,
-        FFSMaxParallelDealPreparing: ffsMaxParallelDealPreparing,
-        FFSGCAutomaticGCInterval: ffsGCInterval,
-        FFSGCStageGracePeriod: ffsGCStagedGracePeriod,
-        AutocreateMasterAddr: autocreateMasterAddr,
-        MinerSelector: minerSelector,
-        MinerSelectorParams: minerSelectorParams,
-        SchedMaxParallel: ffsSchedMaxParallel,
-        DealWatchPollDuration: dealWatchPollDuration,
+        FFSAdminToken: ffsAdminToken,
+        FFSUseMasterAddr: ffsUseMasterAddr,
+        FFSDealFinalityTimeout: ffsDealWatchFinalityTimeout,
+        FFSMinimumPieceSize: ffsMinimumPieceSize,
+        FFSRetrievalNextEventTimeout: ffsRetrievalNextEventTimeout,
+        FFSMaxParallelDealPreparing: ffsMaxParallelDealPreparing,
+        FFSGCAutomaticGCInterval: ffsGCInterval,
+        FFSGCStageGracePeriod: ffsGCStagedGracePeriod,
+        AutocreateMasterAddr: autocreateMasterAddr,
+        MinerSelector: minerSelector,
+        MinerSelectorParams: minerSelectorParams,
+        SchedMaxParallel: ffsSchedMaxParallel,
+        DealWatchPollDuration: dealWatchPollDuration,
 
         AskIndexQueryAskTimeout: askIndexQueryAskTimeout,
         AskIndexRefreshInterval: askIndexRefreshInterval,
@@ -387,6 +389,7 @@ func setupFlags() error {
     pflag.String("ffsminerselector", "reputation", "Miner selector to be used by FFS: 'sr2', 'reputation'.")
     pflag.String("ffsminerselectorparams", "", "Miner selector configuration parameter, depends on --ffsminerselector.")
     pflag.String("ffsminimumpiecesize", "67108864", "Minimum piece size in bytes allowed to be stored in Filecoin.")
+    pflag.Duration("ffsretrievalnexteventtimeout", time.Hour, "Maximum amount of time to wait for the next retrieval event before erroring it.")
     pflag.String("ffsschedmaxparallel", "1000", "Maximum amount of Jobs executed in parallel.")
     pflag.String("ffsdealfinalitytimeout", "4320", "Deadline in minutes in which a deal must prove liveness changing status before considered abandoned.")
     pflag.String("ffsmaxparalleldealpreparing", "2", "Max parallel deal preparing tasks.")
diff --git a/deals/module/records.go b/deals/module/records.go
index 9ac558e72..06891eea8 100644
--- a/deals/module/records.go
+++ b/deals/module/records.go
@@ -411,7 +411,7 @@ func (m *Module) recordRetrieval(addr string, offer api.QueryOffer, bytesReceive
         RootCid: offer.Root,
         Size: offer.Size,
         MinPrice: offer.MinPrice.Uint64(),
-        Miner: offer.Miner.String(),
+        Miner: offer.MinerPeer.Address.String(),
         MinerPeerID: offer.MinerPeer.ID.String(),
         PaymentInterval: offer.PaymentInterval,
         PaymentIntervalIncrease: offer.PaymentIntervalIncrease,
diff --git a/deals/module/retrieve.go b/deals/module/retrieve.go
index 47e368c95..fd5842ae9 100644
--- a/deals/module/retrieve.go
+++ b/deals/module/retrieve.go
@@ -123,7 +123,7 @@ func (m *Module) retrieve(ctx context.Context, lapi *apistruct.FullNodeStruct, l
                 break Loop
             }
             if e.Err != "" {
-                log.Infof("in progress retrieval errored: %s", err)
+                log.Infof("in progress retrieval errored: %s", e.Err)
                 errMsg = e.Err
             }
             if dtStart.IsZero() && e.Event == retrievalmarket.ClientEventBlocksReceived {
@@ -146,7 +146,7 @@ func (m *Module) retrieve(ctx context.Context, lapi *apistruct.FullNodeStruct, l
         // payment channel creation. This isn't ideal, but
         // it's better than missing the data.
         // We WARN just to signal this might be happening.
-        if dtStart.IsZero() {
+        if dtStart.IsZero() && errMsg == "" {
             dtStart = retrievalStartTime
             log.Warnf("retrieval data-transfer start fallback to retrieval start")
         }
@@ -154,7 +154,7 @@ func (m *Module) retrieve(ctx context.Context, lapi *apistruct.FullNodeStruct, l
         // event in the retrieval. We just fallback to Now(),
        // which should always be pretty close to the real
         // event. We WARN just to signal this is happening.
-        if dtEnd.IsZero() {
+        if dtEnd.IsZero() && errMsg == "" {
             dtEnd = time.Now()
             log.Warnf("retrieval data-transfer end fallback to retrieval end")
         }
@@ -162,7 +162,7 @@ func (m *Module) retrieve(ctx context.Context, lapi *apistruct.FullNodeStruct, l
         }
     }()
 
-    return o.Miner.String(), out, nil
+    return o.MinerPeer.Address.String(), out, nil
 }
 
 func getRetrievalOffers(ctx context.Context, lapi *apistruct.FullNodeStruct, payloadCid cid.Cid, pieceCid *cid.Cid, miners []string) []api.QueryOffer {
diff --git a/ffs/coreipfs/coreipfs.go b/ffs/coreipfs/coreipfs.go
index 12e9f7b61..1bb02dc49 100644
--- a/ffs/coreipfs/coreipfs.go
+++ b/ffs/coreipfs/coreipfs.go
@@ -57,13 +57,12 @@ func New(ds datastore.TxnDatastore, ipfs iface.CoreAPI, l ffs.JobLogger) (*CoreI
 
 // Stage adds the data of io.Reader in the storage, and creates a stage-pin on the resulting cid.
 func (ci *CoreIpfs) Stage(ctx context.Context, iid ffs.APIID, r io.Reader) (cid.Cid, error) {
-    ci.lock.Lock()
-    defer ci.lock.Unlock()
-
     p, err := ci.ipfs.Unixfs().Add(ctx, ipfsfiles.NewReaderFile(r), options.Unixfs.Pin(true))
     if err != nil {
         return cid.Undef, fmt.Errorf("adding data to ipfs: %s", err)
     }
+    ci.lock.Lock()
+    defer ci.lock.Unlock()
 
     if err := ci.ps.AddStaged(iid, p.Cid()); err != nil {
         return cid.Undef, fmt.Errorf("saving new pin in pinstore: %s", err)
@@ -72,8 +71,11 @@ func (ci *CoreIpfs) Stage(ctx context.Context, iid ffs.APIID, r io.Reader) (cid.
     return p.Cid(), nil
 }
 
-// StageCid stage-pin a Cid.
+// StageCid pull the Cid data and stage-pin it.
 func (ci *CoreIpfs) StageCid(ctx context.Context, iid ffs.APIID, c cid.Cid) error {
+    if err := ci.ipfs.Pin().Add(ctx, path.IpfsPath(c), options.Pin.Recursive(true)); err != nil {
+        return fmt.Errorf("adding data to ipfs: %s", err)
+    }
     ci.lock.Lock()
     defer ci.lock.Unlock()
 
@@ -263,7 +265,7 @@ Loop:
 
         // Skip Cids that are excluded.
         if _, ok := excludeMap[stagedPin.Cid]; ok {
-            log.Infof("skipping staged cid %s since it's in exclusion list", stagedPin)
+            log.Infof("skipping staged cid %s since it's in exclusion list", stagedPin.Cid)
             continue Loop
         }
         // A Cid is only safe to GC if all existing stage-pin are older than
diff --git a/ffs/filcold/filcold.go b/ffs/filcold/filcold.go
index 6960e45ef..dd58b30ff 100644
--- a/ffs/filcold/filcold.go
+++ b/ffs/filcold/filcold.go
@@ -12,6 +12,7 @@ import (
     "github.com/filecoin-project/go-fil-markets/storagemarket"
     "github.com/filecoin-project/go-state-types/abi"
     "github.com/filecoin-project/lotus/api"
+    marketevents "github.com/filecoin-project/lotus/markets/loggers"
     "github.com/ipfs/go-cid"
     logger "github.com/ipfs/go-log/v2"
     iface "github.com/ipfs/interface-go-ipfs-core"
@@ -35,15 +36,16 @@ var (
 // FilCold is a ColdStorage implementation which saves data in the Filecoin network.
 // It assumes the underlying Filecoin client has access to an IPFS node where data is stored.
 type FilCold struct {
-    ms ffs.MinerSelector
-    dm *dealsModule.Module
-    wm wallet.Module
-    ipfs iface.CoreAPI
-    chain FilChain
-    l ffs.JobLogger
-    lsm *lotus.SyncMonitor
-    minPieceSize uint64
-    semaphDealPrep chan struct{}
+    ms ffs.MinerSelector
+    dm *dealsModule.Module
+    wm wallet.Module
+    ipfs iface.CoreAPI
+    chain FilChain
+    l ffs.JobLogger
+    lsm *lotus.SyncMonitor
+    minPieceSize uint64
+    retrNextEventTimeout time.Duration
+    semaphDealPrep chan struct{}
 }
 
 var _ ffs.ColdStorage = (*FilCold)(nil)
@@ -54,17 +56,18 @@ type FilChain interface {
 }
 
 // New returns a new FilCold instance.
-func New(ms ffs.MinerSelector, dm *dealsModule.Module, wm wallet.Module, ipfs iface.CoreAPI, chain FilChain, l ffs.JobLogger, lsm *lotus.SyncMonitor, minPieceSize uint64, maxParallelDealPreparing int) *FilCold {
+func New(ms ffs.MinerSelector, dm *dealsModule.Module, wm wallet.Module, ipfs iface.CoreAPI, chain FilChain, l ffs.JobLogger, lsm *lotus.SyncMonitor, minPieceSize uint64, maxParallelDealPreparing int, retrievalNextEventTimeout time.Duration) *FilCold {
     return &FilCold{
-        ms: ms,
-        dm: dm,
-        wm: wm,
-        ipfs: ipfs,
-        chain: chain,
-        l: l,
-        lsm: lsm,
-        minPieceSize: minPieceSize,
-        semaphDealPrep: make(chan struct{}, maxParallelDealPreparing),
+        ms: ms,
+        dm: dm,
+        wm: wm,
+        ipfs: ipfs,
+        chain: chain,
+        l: l,
+        lsm: lsm,
+        minPieceSize: minPieceSize,
+        retrNextEventTimeout: retrievalNextEventTimeout,
+        semaphDealPrep: make(chan struct{}, maxParallelDealPreparing),
     }
 }
 
@@ -76,21 +79,39 @@ func (fc *FilCold) Fetch(ctx context.Context, pyCid cid.Cid, piCid *cid.Cid, wad
         return ffs.FetchInfo{}, fmt.Errorf("fetching from deal module: %s", err)
     }
     fc.l.Log(ctx, "Fetching from %s...", miner)
-    var fundsSpent uint64
-    var lastMsg string
-    for e := range events {
-        if e.Err != "" {
-            return ffs.FetchInfo{}, fmt.Errorf("event error in retrieval progress: %s", e.Err)
-        }
-        strEvent := retrievalmarket.ClientEvents[e.Event]
-        strDealStatus := retrievalmarket.DealStatuses[e.Status]
-        fundsSpent = e.FundsSpent.Uint64()
-        newMsg := fmt.Sprintf("Received %s, total spent: %sFIL (%s/%s)", humanize.IBytes(e.BytesReceived), util.AttoFilToFil(fundsSpent), strEvent, strDealStatus)
-        if newMsg != lastMsg {
-            fc.l.Log(ctx, newMsg)
-            lastMsg = newMsg
+
+    var (
+        fundsSpent uint64
+        lastMsg string
+        lastEvent marketevents.RetrievalEvent
+    )
+Loop:
+    for {
+        select {
+        case <-time.After(fc.retrNextEventTimeout):
+            return ffs.FetchInfo{}, fmt.Errorf("didn't receive events for %d minutes", int64(fc.retrNextEventTimeout.Minutes()))
+        case e, ok := <-events:
+            if !ok {
+                break Loop
+            }
+            if e.Err != "" {
+                return ffs.FetchInfo{}, fmt.Errorf("event error in retrieval progress: %s", e.Err)
+            }
+            strEvent := retrievalmarket.ClientEvents[e.Event]
+            strDealStatus := retrievalmarket.DealStatuses[e.Status]
+            fundsSpent = e.FundsSpent.Uint64()
+            newMsg := fmt.Sprintf("Received %s, total spent: %sFIL (%s/%s)", humanize.IBytes(e.BytesReceived), util.AttoFilToFil(fundsSpent), strEvent, strDealStatus)
+            if newMsg != lastMsg {
+                fc.l.Log(ctx, newMsg)
+                lastMsg = newMsg
+            }
+            lastEvent = e
         }
     }
+    if lastEvent.Status != retrievalmarket.DealStatusCompleted {
+        return ffs.FetchInfo{}, fmt.Errorf("retrieval failed with status %s and message %s", retrievalmarket.DealStatuses[lastEvent.Status], lastMsg)
+    }
+
     return ffs.FetchInfo{RetrievedMiner: miner, FundsSpent: fundsSpent}, nil
 }
 
diff --git a/ffs/integrationtest/manager/manager.go b/ffs/integrationtest/manager/manager.go
index 62be1f0e0..ce9c70b88 100644
--- a/ffs/integrationtest/manager/manager.go
+++ b/ffs/integrationtest/manager/manager.go
@@ -86,7 +86,7 @@ func NewCustomFFSManager(t require.TestingT, ds datastore.TxnDatastore, cb lotus
     l := joblogger.New(txndstr.Wrap(ds, "ffs/joblogger"))
     lsm, err := lotus.NewSyncMonitor(cb)
     require.NoError(t, err)
-    cl := filcold.New(ms, dm, nil, ipfsClient, fchain, l, lsm, minimumPieceSize, 1)
+    cl := filcold.New(ms, dm, nil, ipfsClient, fchain, l, lsm, minimumPieceSize, 1, time.Hour)
     hl, err := coreipfs.New(ds, ipfsClient, l)
     require.NoError(t, err)
     sched, err := scheduler.New(txndstr.Wrap(ds, "ffs/scheduler"), l, hl, cl, 10, time.Minute*10, nil, scheduler.GCConfig{AutoGCInterval: 0})
diff --git a/ffs/interfaces.go b/ffs/interfaces.go
index d022e48ca..8dbeb5928 100644
--- a/ffs/interfaces.go
+++ b/ffs/interfaces.go
@@ -42,7 +42,7 @@ type HotStorage interface {
     // Stage adds io.Reader and stage-pins it.
     Stage(context.Context, APIID, io.Reader) (cid.Cid, error)
 
-    // StageCid stage-pins a cid.
+    // StageCid pulls Cid data and stage-pin it.
     StageCid(context.Context, APIID, cid.Cid) error
 
     // Unpin unpins a Cid.
diff --git a/ffs/manager/manager.go b/ffs/manager/manager.go
index 77951b6fc..d3eecd76a 100644
--- a/ffs/manager/manager.go
+++ b/ffs/manager/manager.go
@@ -33,7 +33,7 @@ var (
         Hot: ffs.HotConfig{
             Enabled: false,
             Ipfs: ffs.IpfsConfig{
-                AddTimeout: 480, // 8 min
+                AddTimeout: 15 * 60, // 15min
             },
         },
         Cold: ffs.ColdConfig{
diff --git a/ffs/scheduler/internal/sjstore/sjstore.go b/ffs/scheduler/internal/sjstore/sjstore.go
index 702d1235d..d80407601 100644
--- a/ffs/scheduler/internal/sjstore/sjstore.go
+++ b/ffs/scheduler/internal/sjstore/sjstore.go
@@ -210,6 +210,8 @@ func (s *Store) Enqueue(j ffs.StorageJob) error {
 // GetExecutingJob returns a JobID that is currently executing for
 // data with cid c in iid. If there's not such job, it returns nil.
 func (s *Store) GetExecutingJob(iid ffs.APIID, c cid.Cid) *ffs.JobID {
+    s.lock.Lock()
+    defer s.lock.Unlock()
     j, ok := s.executingJobs[iid][c]
     if !ok {
         return nil
diff --git a/ffs/scheduler/scheduler_storage.go b/ffs/scheduler/scheduler_storage.go
index 1b3429a85..c00fdf317 100644
--- a/ffs/scheduler/scheduler_storage.go
+++ b/ffs/scheduler/scheduler_storage.go
@@ -228,6 +228,18 @@ func (s *Scheduler) executeStorage(ctx context.Context, a astore.StorageAction,
         s.l.Log(ctx, "Hot-Storage configuration ran successfully.")
     }
 
+    // We want to avoid relying on Lotus working in online-mode.
+    // We need to take care ourselves of pulling the data from
+    // the IPFS network.
+    if !a.Cfg.Hot.Enabled && a.Cfg.Cold.Enabled {
+        s.l.Log(ctx, "Automatically staging Cid from the IPFS network...")
+        stageCtx, cancel := context.WithTimeout(ctx, time.Duration(a.Cfg.Hot.Ipfs.AddTimeout)*time.Second)
+        defer cancel()
+        if err := s.hs.StageCid(stageCtx, a.APIID, a.Cid); err != nil {
+            return ffs.StorageInfo{}, nil, fmt.Errorf("automatically staging cid: %s", err)
+        }
+    }
+
     s.l.Log(ctx, "Executing Cold-Storage configuration...")
     cold, errors, err := s.executeColdStorage(ctx, ci, a.Cfg.Cold, dealUpdates)
     if err != nil {
@@ -287,6 +299,7 @@ func (s *Scheduler) executeEnabledHotStorage(ctx context.Context, iid ffs.APIID,
     var size int
     var err error
     if !replaceCid.Defined() {
+        s.l.Log(ctx, "Fetching from the IPFS network...")
         size, err = s.hs.Pin(sctx, iid, curr.Cid)
     } else {
         s.l.Log(ctx, "Replace of previous pin %s", replaceCid)
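Note on the retrieval-watching change in ffs/filcold/filcold.go above: Fetch no longer drains the events channel with an unbounded `for e := range events`; it now uses a select that also listens on a per-event timeout, and it only reports success if the last observed event reached DealStatusCompleted. The sketch below shows that pattern in isolation; it is a minimal, self-contained example, and RetrievalEvent, watchRetrieval, and the Completed field are illustrative stand-ins rather than Powergate or Lotus identifiers.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// RetrievalEvent is a stand-in for the event type streamed by the deals module.
type RetrievalEvent struct {
	Err           string
	BytesReceived uint64
	Completed     bool
}

// watchRetrieval drains events until the channel closes, failing if no event
// arrives within nextEventTimeout or if the last event is not terminal.
func watchRetrieval(events <-chan RetrievalEvent, nextEventTimeout time.Duration) error {
	var last RetrievalEvent
Loop:
	for {
		select {
		case <-time.After(nextEventTimeout):
			// No progress for too long: give up instead of blocking forever.
			return fmt.Errorf("didn't receive events for %v", nextEventTimeout)
		case e, ok := <-events:
			if !ok {
				break Loop // producer closed the channel; inspect the last event.
			}
			if e.Err != "" {
				return errors.New(e.Err)
			}
			last = e
		}
	}
	if !last.Completed {
		return errors.New("retrieval finished without reaching a completed state")
	}
	return nil
}

func main() {
	events := make(chan RetrievalEvent, 2)
	events <- RetrievalEvent{BytesReceived: 1 << 20}
	events <- RetrievalEvent{BytesReceived: 2 << 20, Completed: true}
	close(events)

	if err := watchRetrieval(events, time.Second); err != nil {
		fmt.Println("retrieval failed:", err)
		return
	}
	fmt.Println("retrieval completed")
}
```

One design note: time.After allocates a fresh timer on every loop iteration, which is acceptable for low-frequency retrieval events; a hotter loop would typically hold a single time.Timer and Reset it between events.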