From 357559fb1f88f678fbf1bdf5b5107d8dba6d42bd Mon Sep 17 00:00:00 2001 From: Tiance <1033935631@qq.com> Date: Mon, 9 Jan 2023 17:20:29 +0800 Subject: [PATCH] feat: unsealed from sp through venus-gateway --- dagstore/market_api.go | 85 +++++++++++++++++++++++++++++++++++-- dagstore/market_api_test.go | 2 +- dagstore/modules.go | 9 +++- piecestorage/storagemgr.go | 10 ++--- piecestorage/type.go | 2 +- 5 files changed, 96 insertions(+), 12 deletions(-) diff --git a/dagstore/market_api.go b/dagstore/market_api.go index 135c666f..ed91bcea 100644 --- a/dagstore/market_api.go +++ b/dagstore/market_api.go @@ -12,13 +12,18 @@ import ( "github.com/ipfs/go-cid" "github.com/filecoin-project/dagstore/mount" + "github.com/filecoin-project/dagstore/throttle" "github.com/filecoin-project/go-padreader" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/specs-storage/storage" gatewayAPIV2 "github.com/filecoin-project/venus/venus-shared/api/gateway/v2" + vSharedTypes "github.com/filecoin-project/venus/venus-shared/types" marketMetrics "github.com/filecoin-project/venus-market/v2/metrics" "github.com/filecoin-project/venus-market/v2/models/repo" "github.com/filecoin-project/venus-market/v2/piecestorage" + "github.com/filecoin-project/venus-market/v2/storageprovider" "github.com/filecoin-project/venus-market/v2/utils" ) @@ -35,17 +40,28 @@ type marketAPI struct { useTransient bool metricsCtx metrics.MetricsCtx gatewayMarketClient gatewayAPIV2.IMarketClient + + throttle throttle.Throttler } var _ MarketAPI = (*marketAPI)(nil) -func NewMarketAPI(ctx metrics.MetricsCtx, repo repo.Repo, pieceStorageMgr *piecestorage.PieceStorageManager, gatewayMarketClient gatewayAPIV2.IMarketClient, useTransient bool) MarketAPI { +func NewMarketAPI( + ctx metrics.MetricsCtx, + repo repo.Repo, + pieceStorageMgr *piecestorage.PieceStorageManager, + gatewayMarketClient gatewayAPIV2.IMarketClient, + useTransient bool, + concurrency int) MarketAPI { + return &marketAPI{ pieceRepo: repo.StorageDealRepo(), pieceStorageMgr: pieceStorageMgr, useTransient: useTransient, metricsCtx: ctx, gatewayMarketClient: gatewayMarketClient, + + throttle: throttle.Fixed(concurrency), } } @@ -56,11 +72,72 @@ func (m *marketAPI) Start(_ context.Context) error { func (m *marketAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) { _, err := m.pieceStorageMgr.FindStorageForRead(ctx, pieceCid.String()) if err != nil { - return false, fmt.Errorf("unable to find storage for piece %s %w", pieceCid, err) + log.Warnf("unable to find storage for piece %s: %s", pieceCid, err) + + // check it from the SP through venus-gateway + deals, err := m.pieceRepo.GetDealsByPieceCidAndStatus(ctx, pieceCid, storageprovider.ReadyRetrievalDealStatus...) + if err != nil { + return false, fmt.Errorf("get delas for piece %s: %w", pieceCid, err) + } + + if len(deals) == 0 { + return false, fmt.Errorf("no storage deals found for piece %s", pieceCid) + } + + // check if we have an unsealed deal for the given piece in any of the unsealed sectors. + for _, deal := range deals { + deal := deal + + var isUnsealed bool + // Throttle this path to avoid flooding the storage subsystem. + err := m.throttle.Do(ctx, func(ctx context.Context) (err error) { + isUnsealed, err = m.gatewayMarketClient.IsUnsealed(ctx, deal.Proposal.Provider, pieceCid, + storage.SectorRef{ID: abi.SectorID{Number: deal.SectorNumber}}, + vSharedTypes.PaddedByteIndex(deal.Offset.Unpadded()), + deal.Proposal.PieceSize) + if err != nil { + return fmt.Errorf("failed to check if sector %d for deal %d was unsealed: %w", deal.SectorNumber, deal.DealID, err) + } + + if isUnsealed { + // send SectorsUnsealPiece task + wps, err := m.pieceStorageMgr.FindStorageForWrite(int64(deal.Proposal.PieceSize)) + if err != nil { + return fmt.Errorf("failed to find storage to write %s: %w", pieceCid, err) + } + redirectUrl, err := wps.GetRedirectUrl(ctx, wps.GetName()) + if err != nil { + return fmt.Errorf("failed to get redirect url for piece storage %s: %w", wps.GetName(), err) + } + return m.gatewayMarketClient.SectorsUnsealPiece( + ctx, + deal.Proposal.Provider, + pieceCid, + storage.SectorRef{ID: abi.SectorID{Number: deal.SectorNumber}}, + vSharedTypes.PaddedByteIndex(deal.Offset.Unpadded()), + deal.Proposal.PieceSize, + redirectUrl, + ) + } + + return nil + }) + + if err != nil { + log.Warnf("failed to check/retrieve unsealed sector: %s", err) + continue // move on to the next match. + } + + if isUnsealed { + return true, nil + } + } + + // we don't have an unsealed sector containing the piece + return false, nil } + return true, nil - // todo check isunseal from miner - // m.gatewayMarketClient.IsUnsealed() } func (m *marketAPI) FetchFromPieceStorage(ctx context.Context, pieceCid cid.Cid) (mount.Reader, error) { diff --git a/dagstore/market_api_test.go b/dagstore/market_api_test.go index 7e196736..a76ba3c3 100644 --- a/dagstore/market_api_test.go +++ b/dagstore/market_api_test.go @@ -54,7 +54,7 @@ func TestMarket(t *testing.T) { assert.Nil(t, err) // todo: mock IMarketEvent - marketAPI := NewMarketAPI(ctx, r, pmgr, nil, false) + marketAPI := NewMarketAPI(ctx, r, pmgr, nil, false, 100) size, err := marketAPI.GetUnpaddedCARSize(ctx, testResourceId) assert.Nil(t, err) diff --git a/dagstore/modules.go b/dagstore/modules.go index ecec639a..de6c1a80 100644 --- a/dagstore/modules.go +++ b/dagstore/modules.go @@ -31,7 +31,14 @@ const ( // CreateAndStartMarketAPI creates a new MarketAPI adaptor for the dagstore mounts. func CreateAndStartMarketAPI(ctx metrics.MetricsCtx, lc fx.Lifecycle, r *config.DAGStoreConfig, repo repo.Repo, pieceStorage *piecestorage.PieceStorageManager, gatewayMarketClient gatewayAPIV2.IMarketClient) (MarketAPI, error) { - mountApi := NewMarketAPI(ctx, repo, pieceStorage, gatewayMarketClient, r.UseTransient) + mountApi := NewMarketAPI( + ctx, + repo, + pieceStorage, + gatewayMarketClient, + r.UseTransient, + r.MaxConcurrencyStorageCalls, + ) lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { return mountApi.Start(ctx) diff --git a/piecestorage/storagemgr.go b/piecestorage/storagemgr.go index 638ec5d5..9c55c507 100644 --- a/piecestorage/storagemgr.go +++ b/piecestorage/storagemgr.go @@ -21,7 +21,7 @@ func NewPieceStorageManager(cfg *config.PieceStorage) (*PieceStorageManager, err // todo: extract name check logic to a function and check blank in name for _, fsCfg := range cfg.Fs { - // check if storage already exist in manager and it's name is not empty + // check if storage already exist in manager, and it's name is not empty if fsCfg.Name == "" { return nil, fmt.Errorf("fs piece storage name is empty, must set storage name in piece storage config `name=yourname`") } @@ -38,7 +38,7 @@ func NewPieceStorageManager(cfg *config.PieceStorage) (*PieceStorageManager, err } for _, s3Cfg := range cfg.S3 { - // check if storage already exist in manager and it's name is not empty + // check if storage already exist in manager, and it's name is not empty if s3Cfg.Name == "" { return nil, fmt.Errorf("s3 pieceStorage name is empty, must set storage name in piece storage config `name=yourname`") } @@ -64,7 +64,7 @@ func (p *PieceStorageManager) FindStorageForRead(ctx context.Context, s string) _ = p.EachPieceStorage(func(st IPieceStorage) error { has, err := st.Has(ctx, s) if err != nil { - log.Warnf("got error while check avaibale in storageg") + log.Warnf("got error while check avaibale in storage: %s", err.Error()) return nil } if has { @@ -74,7 +74,7 @@ func (p *PieceStorageManager) FindStorageForRead(ctx context.Context, s string) }) if len(storages) == 0 { - return nil, fmt.Errorf("unable to find piece in storage %s", s) + return nil, fmt.Errorf("unable to find piece %s in storage", s) } return randStorageSelector(storages) @@ -125,7 +125,7 @@ func (p *PieceStorageManager) AddPieceStorage(s IPieceStorage) error { p.lk.Lock() defer p.lk.Unlock() - // check if storage already exist in manager and it's name is not empty + // check if storage already exist in manager, and it's name is not empty _, ok := p.storages[s.GetName()] if ok { return fmt.Errorf("duplicate storage name: %s", s.GetName()) diff --git a/piecestorage/type.go b/piecestorage/type.go index cd9a327d..97115d87 100644 --- a/piecestorage/type.go +++ b/piecestorage/type.go @@ -29,7 +29,7 @@ type IPieceStorage interface { Len(context.Context, string) (int64, error) // ListResourceIds get resource ids from piece store ListResourceIds(ctx context.Context) ([]string, error) - // GetMountReader use direct read if storage have low performance effecitive ReadAt + // GetReaderCloser use direct read if storage have low performance effecitive ReadAt GetReaderCloser(context.Context, string) (io.ReadCloser, error) // GetMountReader used to support dagstore GetMountReader(context.Context, string) (mount.Reader, error)