Skip to content

Commit

Permalink
feat: unsealed from sp through venus-gateway
Browse files Browse the repository at this point in the history
  • Loading branch information
diwufeiwen committed Jan 9, 2023
1 parent a540f61 commit 357559f
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 12 deletions.
85 changes: 81 additions & 4 deletions dagstore/market_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,18 @@ import (
"github.com/ipfs/go-cid"

"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/throttle"
"github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"

gatewayAPIV2 "github.com/filecoin-project/venus/venus-shared/api/gateway/v2"
vSharedTypes "github.com/filecoin-project/venus/venus-shared/types"

marketMetrics "github.com/filecoin-project/venus-market/v2/metrics"
"github.com/filecoin-project/venus-market/v2/models/repo"
"github.com/filecoin-project/venus-market/v2/piecestorage"
"github.com/filecoin-project/venus-market/v2/storageprovider"
"github.com/filecoin-project/venus-market/v2/utils"
)

Expand All @@ -35,17 +40,28 @@ type marketAPI struct {
useTransient bool
metricsCtx metrics.MetricsCtx
gatewayMarketClient gatewayAPIV2.IMarketClient

throttle throttle.Throttler
}

var _ MarketAPI = (*marketAPI)(nil)

func NewMarketAPI(ctx metrics.MetricsCtx, repo repo.Repo, pieceStorageMgr *piecestorage.PieceStorageManager, gatewayMarketClient gatewayAPIV2.IMarketClient, useTransient bool) MarketAPI {
func NewMarketAPI(
ctx metrics.MetricsCtx,
repo repo.Repo,
pieceStorageMgr *piecestorage.PieceStorageManager,
gatewayMarketClient gatewayAPIV2.IMarketClient,
useTransient bool,
concurrency int) MarketAPI {

return &marketAPI{
pieceRepo: repo.StorageDealRepo(),
pieceStorageMgr: pieceStorageMgr,
useTransient: useTransient,
metricsCtx: ctx,
gatewayMarketClient: gatewayMarketClient,

throttle: throttle.Fixed(concurrency),
}
}

Expand All @@ -56,11 +72,72 @@ func (m *marketAPI) Start(_ context.Context) error {
func (m *marketAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) {
_, err := m.pieceStorageMgr.FindStorageForRead(ctx, pieceCid.String())
if err != nil {
return false, fmt.Errorf("unable to find storage for piece %s %w", pieceCid, err)
log.Warnf("unable to find storage for piece %s: %s", pieceCid, err)

// check it from the SP through venus-gateway
deals, err := m.pieceRepo.GetDealsByPieceCidAndStatus(ctx, pieceCid, storageprovider.ReadyRetrievalDealStatus...)
if err != nil {
return false, fmt.Errorf("get delas for piece %s: %w", pieceCid, err)
}

if len(deals) == 0 {
return false, fmt.Errorf("no storage deals found for piece %s", pieceCid)
}

// check if we have an unsealed deal for the given piece in any of the unsealed sectors.
for _, deal := range deals {
deal := deal

var isUnsealed bool
// Throttle this path to avoid flooding the storage subsystem.
err := m.throttle.Do(ctx, func(ctx context.Context) (err error) {
isUnsealed, err = m.gatewayMarketClient.IsUnsealed(ctx, deal.Proposal.Provider, pieceCid,
storage.SectorRef{ID: abi.SectorID{Number: deal.SectorNumber}},
vSharedTypes.PaddedByteIndex(deal.Offset.Unpadded()),
deal.Proposal.PieceSize)
if err != nil {
return fmt.Errorf("failed to check if sector %d for deal %d was unsealed: %w", deal.SectorNumber, deal.DealID, err)
}

if isUnsealed {
// send SectorsUnsealPiece task
wps, err := m.pieceStorageMgr.FindStorageForWrite(int64(deal.Proposal.PieceSize))
if err != nil {
return fmt.Errorf("failed to find storage to write %s: %w", pieceCid, err)
}
redirectUrl, err := wps.GetRedirectUrl(ctx, wps.GetName())
if err != nil {
return fmt.Errorf("failed to get redirect url for piece storage %s: %w", wps.GetName(), err)
}
return m.gatewayMarketClient.SectorsUnsealPiece(
ctx,
deal.Proposal.Provider,
pieceCid,
storage.SectorRef{ID: abi.SectorID{Number: deal.SectorNumber}},
vSharedTypes.PaddedByteIndex(deal.Offset.Unpadded()),
deal.Proposal.PieceSize,
redirectUrl,
)
}

return nil
})

if err != nil {
log.Warnf("failed to check/retrieve unsealed sector: %s", err)
continue // move on to the next match.
}

if isUnsealed {
return true, nil
}
}

// we don't have an unsealed sector containing the piece
return false, nil
}

return true, nil
// todo check isunseal from miner
// m.gatewayMarketClient.IsUnsealed()
}

func (m *marketAPI) FetchFromPieceStorage(ctx context.Context, pieceCid cid.Cid) (mount.Reader, error) {
Expand Down
2 changes: 1 addition & 1 deletion dagstore/market_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestMarket(t *testing.T) {
assert.Nil(t, err)

// todo: mock IMarketEvent
marketAPI := NewMarketAPI(ctx, r, pmgr, nil, false)
marketAPI := NewMarketAPI(ctx, r, pmgr, nil, false, 100)

size, err := marketAPI.GetUnpaddedCARSize(ctx, testResourceId)
assert.Nil(t, err)
Expand Down
9 changes: 8 additions & 1 deletion dagstore/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,14 @@ const (

// CreateAndStartMarketAPI creates a new MarketAPI adaptor for the dagstore mounts.
func CreateAndStartMarketAPI(ctx metrics.MetricsCtx, lc fx.Lifecycle, r *config.DAGStoreConfig, repo repo.Repo, pieceStorage *piecestorage.PieceStorageManager, gatewayMarketClient gatewayAPIV2.IMarketClient) (MarketAPI, error) {
mountApi := NewMarketAPI(ctx, repo, pieceStorage, gatewayMarketClient, r.UseTransient)
mountApi := NewMarketAPI(
ctx,
repo,
pieceStorage,
gatewayMarketClient,
r.UseTransient,
r.MaxConcurrencyStorageCalls,
)
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return mountApi.Start(ctx)
Expand Down
10 changes: 5 additions & 5 deletions piecestorage/storagemgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func NewPieceStorageManager(cfg *config.PieceStorage) (*PieceStorageManager, err
// todo: extract name check logic to a function and check blank in name

for _, fsCfg := range cfg.Fs {
// check if storage already exist in manager and it's name is not empty
// check if storage already exist in manager, and it's name is not empty
if fsCfg.Name == "" {
return nil, fmt.Errorf("fs piece storage name is empty, must set storage name in piece storage config `name=yourname`")
}
Expand All @@ -38,7 +38,7 @@ func NewPieceStorageManager(cfg *config.PieceStorage) (*PieceStorageManager, err
}

for _, s3Cfg := range cfg.S3 {
// check if storage already exist in manager and it's name is not empty
// check if storage already exist in manager, and it's name is not empty
if s3Cfg.Name == "" {
return nil, fmt.Errorf("s3 pieceStorage name is empty, must set storage name in piece storage config `name=yourname`")
}
Expand All @@ -64,7 +64,7 @@ func (p *PieceStorageManager) FindStorageForRead(ctx context.Context, s string)
_ = p.EachPieceStorage(func(st IPieceStorage) error {
has, err := st.Has(ctx, s)
if err != nil {
log.Warnf("got error while check avaibale in storageg")
log.Warnf("got error while check avaibale in storage: %s", err.Error())
return nil
}
if has {
Expand All @@ -74,7 +74,7 @@ func (p *PieceStorageManager) FindStorageForRead(ctx context.Context, s string)
})

if len(storages) == 0 {
return nil, fmt.Errorf("unable to find piece in storage %s", s)
return nil, fmt.Errorf("unable to find piece %s in storage", s)
}

return randStorageSelector(storages)
Expand Down Expand Up @@ -125,7 +125,7 @@ func (p *PieceStorageManager) AddPieceStorage(s IPieceStorage) error {
p.lk.Lock()
defer p.lk.Unlock()

// check if storage already exist in manager and it's name is not empty
// check if storage already exist in manager, and it's name is not empty
_, ok := p.storages[s.GetName()]
if ok {
return fmt.Errorf("duplicate storage name: %s", s.GetName())
Expand Down
2 changes: 1 addition & 1 deletion piecestorage/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type IPieceStorage interface {
Len(context.Context, string) (int64, error)
// ListResourceIds get resource ids from piece store
ListResourceIds(ctx context.Context) ([]string, error)
// GetMountReader use direct read if storage have low performance effecitive ReadAt
// GetReaderCloser use direct read if storage have low performance effecitive ReadAt
GetReaderCloser(context.Context, string) (io.ReadCloser, error)
// GetMountReader used to support dagstore
GetMountReader(context.Context, string) (mount.Reader, error)
Expand Down

0 comments on commit 357559f

Please sign in to comment.