Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: unseal piece before tansfer / 在数据传输之前先 unseal piece 数据 #331

Merged
merged 6 commits into from
May 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 6 additions & 51 deletions dagstore/market_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dagstore

import (
"context"
"errors"
"fmt"
"io"

Expand All @@ -15,12 +16,10 @@ import (
"github.com/filecoin-project/dagstore/throttle"
"github.com/filecoin-project/go-padreader"
gatewayAPIV2 "github.com/filecoin-project/venus/venus-shared/api/gateway/v2"
vSharedTypes "github.com/filecoin-project/venus/venus-shared/types"

marketMetrics "github.com/filecoin-project/venus-market/v2/metrics"
"github.com/filecoin-project/venus-market/v2/models/repo"
"github.com/filecoin-project/venus-market/v2/piecestorage"
"github.com/filecoin-project/venus-market/v2/storageprovider"
"github.com/filecoin-project/venus-market/v2/utils"
)

Expand Down Expand Up @@ -70,55 +69,10 @@ func (m *marketAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, err
_, err := m.pieceStorageMgr.FindStorageForRead(ctx, pieceCid.String())
if err != nil {
log.Warnf("unable to find storage for piece %s: %s", pieceCid, err)

// check it from the SP through venus-gateway
deals, err := m.pieceRepo.GetDealsByPieceCidAndStatus(ctx, pieceCid, storageprovider.ReadyRetrievalDealStatus...)
if err != nil {
return false, fmt.Errorf("get delas for piece %s: %w", pieceCid, err)
if errors.Is(err, piecestorage.ErrorNotFoundForRead) {
return false, nil
}

if len(deals) == 0 {
return false, fmt.Errorf("no storage deals found for piece %s", pieceCid)
}

// check if we have an unsealed deal for the given piece in any of the unsealed sectors.
for _, deal := range deals {
deal := deal

// Throttle this path to avoid flooding the storage subsystem.
err := m.throttle.Do(ctx, func(ctx context.Context) (err error) {

// send SectorsUnsealPiece task
wps, err := m.pieceStorageMgr.FindStorageForWrite(int64(deal.Proposal.PieceSize))
if err != nil {
return fmt.Errorf("failed to find storage to write %s: %w", pieceCid, err)
}

pieceTransfer, err := wps.GetPieceTransfer(ctx, pieceCid.String())
if err != nil {
return fmt.Errorf("get piece transfer for %s: %w", pieceCid, err)
}

return m.gatewayMarketClient.SectorsUnsealPiece(
ctx,
deal.Proposal.Provider,
pieceCid,
deal.SectorNumber,
vSharedTypes.PaddedByteIndex(deal.Offset.Unpadded()),
deal.Proposal.PieceSize,
pieceTransfer,
)
})

if err != nil {
log.Warnf("failed to check/retrieve unsealed sector: %s", err)
continue // move on to the next match.
}
return true, nil
}

// we don't have an unsealed sector containing the piece
return false, nil
return false, err
}

return true, nil
Expand All @@ -132,8 +86,9 @@ func (m *marketAPI) FetchFromPieceStorage(ctx context.Context, pieceCid cid.Cid)

pieceStorage, err := m.pieceStorageMgr.FindStorageForRead(ctx, pieceCid.String())
if err != nil {
return nil, err
return nil, fmt.Errorf("find piece for read: %w", err)
}

storageName := pieceStorage.GetName()
size, err := pieceStorage.Len(ctx, pieceCid.String())
if err != nil {
Expand Down
198 changes: 198 additions & 0 deletions dagstore/mocks/mock_dagstore_interface.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

71 changes: 49 additions & 22 deletions dagstore/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,36 +211,63 @@ func (w *Wrapper) gcLoop() {
}

func (w *Wrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (stores.ClosableBlockstore, error) {
log.Debugf("acquiring shard for piece CID %s", pieceCid)
log := log.With("piece-cid", pieceCid)
log.Debug("acquiring shard")

key := shard.KeyFromCID(pieceCid)
resch := make(chan dagstore.ShardResult, 1)
err := w.dagst.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{})
log.Debugf("sent message to acquire shard for piece CID %s", pieceCid)

if err != nil {
if !errors.Is(err, dagstore.ErrShardUnknown) {
return nil, fmt.Errorf("failed to schedule acquire shard for piece CID %s: %w", pieceCid, err)
// get shard info
var sInfo dagstore.ShardInfo
var err error
retryCount := 5
for i := retryCount; i >= 0; i-- {
simlecode marked this conversation as resolved.
Show resolved Hide resolved
if i == 0 {
return nil, fmt.Errorf("failed to get shard info for piece CID %s, after %d retry : %w", pieceCid, i, err)
}

// if the DAGStore does not know about the Shard -> register it and then try to acquire it again.
log.Warnw("failed to load shard as shard is not registered, will re-register", "pieceCID", pieceCid)
// The path of a transient file that we can ask the DAG Store to use
// to perform the Indexing rather than fetching it via the Mount if
// we already have a transient file. However, we don't have it here
// and therefore we pass an empty file path.
carPath := ""
if err := stores.RegisterShardSync(ctx, w, pieceCid, carPath, false); err != nil {
return nil, fmt.Errorf("failed to re-register shard during loading piece CID %s: %w", pieceCid, err)
sInfo, err = w.dagst.GetShardInfo(key)
if err != nil {
if errors.Is(err, dagstore.ErrShardUnknown) {
log.Warn("shard not found, try to re-register")
if err := stores.RegisterShardSync(ctx, w, pieceCid, "", false); err != nil {
return nil, fmt.Errorf("failed to re-register shard during loading pieceCID %s: %w", pieceCid, err)
}
continue
} else {
return nil, fmt.Errorf("failed to get shard info for piece CID %s: %w", pieceCid, err)
}
}
log.Warnw("successfully re-registered shard", "pieceCID", pieceCid)

resch = make(chan dagstore.ShardResult, 1)
if err := w.dagst.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{}); err != nil {
return nil, fmt.Errorf("failed to acquire Shard for piece CID %s after re-registering: %w", pieceCid, err)
break
}

// check state
log.Infof("shard state: %s", sInfo.ShardState.String())
switch sInfo.ShardState {
case dagstore.ShardStateErrored:
// try to recover
log.Warn("shard is in errored state, try to recover")
recoverRes := make(chan dagstore.ShardResult, 1)
if err := w.dagst.RecoverShard(ctx, key, recoverRes, dagstore.RecoverOpts{}); err != nil {
return nil, fmt.Errorf("failed to recover shard for piece CID %s: %w", pieceCid, err)
}
select {
case res := <-recoverRes:
if res.Error != nil {
return nil, fmt.Errorf("failed to recover shard for piece CID %s: %w", pieceCid, res.Error)
}
case <-ctx.Done():
return nil, ctx.Err()
}
}

resCh := make(chan dagstore.ShardResult, 1)
err = w.dagst.AcquireShard(ctx, key, resCh, dagstore.AcquireOpts{})
log.Debugf("sent message to acquire shard for piece CID %s", pieceCid)

if err != nil {
return nil, fmt.Errorf("failed to acquire shard for piece CID %s: %w", pieceCid, err)
}

// TODO: The context is not yet being actively monitored by the DAG store,
// so we need to select against ctx.Done() until the following issue is
// implemented:
Expand All @@ -249,7 +276,7 @@ func (w *Wrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (stores.Closa
select {
case <-ctx.Done():
return nil, ctx.Err()
case res = <-resch:
case res = <-resCh:
if res.Error != nil {
return nil, fmt.Errorf("failed to acquire shard for piece CID %s: %w", pieceCid, res.Error)
}
Expand Down
Loading