Skip to content

Commit

Permalink
Merge pull request #318 from filecoin-project/chore/pick-317
Browse files Browse the repository at this point in the history
Merge pull request #317 from filecoin-project/opt/import-data
  • Loading branch information
hunjixin authored Apr 13, 2023
2 parents e48c6ba + 43d1572 commit ea2038a
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 75 deletions.
6 changes: 3 additions & 3 deletions api/impl/venus_market.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (m *MarketNodeImpl) MarketImportDealData(ctx context.Context, propCid cid.C
}
defer fi.Close() //nolint:errcheck

return m.StorageProvider.ImportDataForDeal(ctx, propCid, fi)
return m.StorageProvider.ImportDataForDeal(ctx, propCid, fi, false)
}

func (m *MarketNodeImpl) MarketImportPublishedDeal(ctx context.Context, deal types.MinerDeal) error {
Expand Down Expand Up @@ -1039,7 +1039,7 @@ func (m *MarketNodeImpl) UpdateDealStatus(ctx context.Context, miner address.Add
return m.DealAssigner.UpdateDealStatus(ctx, miner, dealId, pieceStatus, dealStatus)
}

func (m *MarketNodeImpl) DealsImportData(ctx context.Context, dealPropCid cid.Cid, fname string) error {
func (m *MarketNodeImpl) DealsImportData(ctx context.Context, dealPropCid cid.Cid, fname string, skipCommP bool) error {
deal, err := m.Repo.StorageDealRepo().GetDeal(ctx, dealPropCid)
if err != nil {
return err
Expand All @@ -1053,7 +1053,7 @@ func (m *MarketNodeImpl) DealsImportData(ctx context.Context, dealPropCid cid.Ci
}
defer fi.Close() //nolint:errcheck

return m.StorageProvider.ImportDataForDeal(ctx, dealPropCid, fi)
return m.StorageProvider.ImportDataForDeal(ctx, dealPropCid, fi, skipCommP)
}

func (m *MarketNodeImpl) GetDeals(ctx context.Context, miner address.Address, pageIndex, pageSize int) ([]*types.DealInfo, error) {
Expand Down
24 changes: 21 additions & 3 deletions cli/storage-deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,18 @@ var storageDealsCmds = &cli.Command{
}

var dealsImportDataCmd = &cli.Command{
Name: "import-data",
Usage: "Manually import data for a deal",
Name: "import-data",
Usage: "Manually import data for a deal",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "skip-commp",
Usage: "skip calculate the piece-cid, please use with caution",
},
&cli.BoolFlag{
Name: "really-do-it",
Usage: "Actually send transaction performing the action",
},
},
ArgsUsage: "<proposal CID> <file>",
Action: func(cctx *cli.Context) error {
api, closer, err := NewMarketNode(cctx)
Expand All @@ -72,7 +82,15 @@ var dealsImportDataCmd = &cli.Command{

fpath := cctx.Args().Get(1)

return api.DealsImportData(ctx, propCid, fpath)
var skipCommP bool
if cctx.IsSet("skip-commp") {
if !cctx.IsSet("really-do-it") {
return fmt.Errorf("pass --really-do-it to actually execute this action")
}
skipCommP = true
}

return api.DealsImportData(ctx, propCid, fpath, skipCommP)
},
}

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ require (
github.com/filecoin-project/go-statestore v0.2.0
github.com/filecoin-project/specs-actors/v2 v2.3.6
github.com/filecoin-project/specs-actors/v7 v7.0.1
github.com/filecoin-project/venus v1.10.1-0.20230307021527-70bd910ad469
github.com/filecoin-project/venus-auth v1.10.0
github.com/filecoin-project/venus v1.10.3-0.20230412022158-82d8b3d61d7a
github.com/filecoin-project/venus-auth v1.10.1
github.com/filecoin-project/venus-messager v1.10.0
github.com/golang/mock v1.6.0
github.com/google/uuid v1.3.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -455,11 +455,11 @@ github.com/filecoin-project/storetheindex v0.4.30-0.20221114113647-683091f8e893
github.com/filecoin-project/storetheindex v0.4.30-0.20221114113647-683091f8e893/go.mod h1:S7590oDimBvXMUtzWsBXoshu9HtYKwtXl47zAK9rcP8=
github.com/filecoin-project/test-vectors/schema v0.0.5/go.mod h1:iQ9QXLpYWL3m7warwvK1JC/pTri8mnfEmKygNDqqY6E=
github.com/filecoin-project/venus v1.2.4/go.mod h1:hJULXHGAnWuq5S5KRtPkwbT8DqgM9II7NwyNU7t59D0=
github.com/filecoin-project/venus v1.10.1-0.20230307021527-70bd910ad469 h1:tMvzWy7TAc3Ex2B+6hIdQVzHUNRTzJAhKncwTTn2Ehg=
github.com/filecoin-project/venus v1.10.1-0.20230307021527-70bd910ad469/go.mod h1:lO6L4FiHcW4ad+2R7x2J2slqYkhq/X0ObZfmxkJmZ7M=
github.com/filecoin-project/venus v1.10.3-0.20230412022158-82d8b3d61d7a h1:+WkDX3hVeCQvad5+KDQ24cb+Lwtr+Hrv4dMzwp7+n4o=
github.com/filecoin-project/venus v1.10.3-0.20230412022158-82d8b3d61d7a/go.mod h1:AkArMvTdspbcJu133WnAu8aMyRI82SZDvgv4Y3rTHiI=
github.com/filecoin-project/venus-auth v1.3.2/go.mod h1:m5Jog2GYxztwP7w3m/iJdv/V1/bTcAVU9rm/CbhxRQU=
github.com/filecoin-project/venus-auth v1.10.0 h1:0V5R36HIvLKRFJQjKkW0tjYazgHkDW/gmjwuEtKL1SA=
github.com/filecoin-project/venus-auth v1.10.0/go.mod h1:MoirCIFdK8FNIFXpBbgF4xhRsjGi4KHHfW1EF6iAwZ8=
github.com/filecoin-project/venus-auth v1.10.1 h1:A2AkLabDTU7Ivenx8yzusPh7R1tbTrr4kkDypi+ykTY=
github.com/filecoin-project/venus-auth v1.10.1/go.mod h1:vFL6M8Ko1o05oX2EYr5wSzpKa6yL6RrLOVr5rO29Cxc=
github.com/filecoin-project/venus-messager v1.10.0 h1:A2NVaZ5FgoUiEzdBc2f/6Taj8LKp72fqgxV2peZk8qM=
github.com/filecoin-project/venus-messager v1.10.0/go.mod h1:JU21//2CNstoXZ8jE/kpc77MsmzcwAjkHaCcKFSZAuU=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
Expand Down
26 changes: 23 additions & 3 deletions storageprovider/deal_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ func (storageDealPorcess *StorageDealProcessImpl) HandleOff(ctx context.Context,
deal.PayloadSize = uint64(file.Size())
err = storageDealPorcess.deals.SaveDeal(ctx, deal)
if err != nil {
return storageDealPorcess.HandleError(ctx, deal, fmt.Errorf("fail to save deal to database"))
return storageDealPorcess.HandleError(ctx, deal, fmt.Errorf("fail to save deal to database: %v", err))
}
err = storageDealPorcess.savePieceFile(ctx, deal, file, uint64(file.Size()))
if err := file.Close(); err != nil {
Expand All @@ -476,7 +476,7 @@ func (storageDealPorcess *StorageDealProcessImpl) HandleOff(ctx context.Context,
err = fmt.Errorf("packing piece at path %s: %w", deal.PiecePath, err)
return storageDealPorcess.HandleError(ctx, deal, err)
}
} else {
} else if len(deal.InboundCAR) != 0 {
carFilePath = deal.InboundCAR

v2r, err := storageDealPorcess.ReadCAR(deal.InboundCAR)
Expand All @@ -488,7 +488,7 @@ func (storageDealPorcess *StorageDealProcessImpl) HandleOff(ctx context.Context,
deal.PayloadSize = v2r.Header.DataSize
err = storageDealPorcess.deals.SaveDeal(ctx, deal)
if err != nil {
return storageDealPorcess.HandleError(ctx, deal, fmt.Errorf("fail to save deal to database"))
return storageDealPorcess.HandleError(ctx, deal, fmt.Errorf("fail to save deal to database: %v", err))
}
dr, err := v2r.DataReader()
if err != nil {
Expand All @@ -507,6 +507,26 @@ func (storageDealPorcess *StorageDealProcessImpl) HandleOff(ctx context.Context,
err = fmt.Errorf("packing piece %s: %w", deal.Ref.PieceCid, packingErr)
return storageDealPorcess.HandleError(ctx, deal, err)
}
} else {
// An index can be created even if carFilePath is empty
carFilePath = ""
// carfile may already be in piece storage, verify it
pieceStore, err := storageDealPorcess.pieceStorageMgr.FindStorageForRead(ctx, deal.Proposal.PieceCID.String())
if err != nil {
return storageDealPorcess.HandleError(ctx, deal, err)
}
log.Debugf("found %s in piece storage", deal.Proposal.PieceCID)

l, err := pieceStore.Len(ctx, deal.Proposal.PieceCID.String())
if err != nil {
return storageDealPorcess.HandleError(ctx, deal, fmt.Errorf("fail to got payload size: %v", err))
}

deal.PayloadSize = uint64(l)
err = storageDealPorcess.deals.SaveDeal(ctx, deal)
if err != nil {
return storageDealPorcess.HandleError(ctx, deal, fmt.Errorf("fail to save deal to database: %v", err))
}
}

// Register the deal data as a "shard" with the DAG store. Later it can be
Expand Down
159 changes: 99 additions & 60 deletions storageprovider/storage_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ type StorageProvider interface {
GetStorageCollateral(ctx context.Context, mAddr address.Address) (storagemarket.Balance, error)

// ImportDataForDeal manually imports data for an offline storage deal
ImportDataForDeal(ctx context.Context, propCid cid.Cid, data io.Reader) error
ImportDataForDeal(ctx context.Context, propCid cid.Cid, data io.Reader, skipCommP bool) error

// ImportPublishedDeal manually import published deals to storage deals
ImportPublishedDeal(ctx context.Context, deal types.MinerDeal) error
Expand Down Expand Up @@ -139,6 +139,7 @@ type StorageProviderImpl struct {
transferProcess IDatatransferHandler
storageReceiver smnet.StorageReceiver
minerMgr minermgr.IMinerMgr
pieceStorageMgr *piecestorage.PieceStorageManager
}

// NewStorageProvider returns a new storage provider
Expand Down Expand Up @@ -172,7 +173,8 @@ func NewStorageProvider(

dealStore: repo.StorageDealRepo(),

minerMgr: minerMgr,
minerMgr: minerMgr,
pieceStorageMgr: pieceStorageMgr,
}

dealProcess, err := NewStorageDealProcessImpl(mCtx, spV2.conns, newPeerTagger(spV2.net), spV2.spn, spV2.dealStore, spV2.storedAsk, tf, minerMgr, pieceStorageMgr, dataTransfer, dagStore, sdf, pb)
Expand Down Expand Up @@ -261,7 +263,9 @@ func (p *StorageProviderImpl) Stop() error {
// ImportDataForDeal manually imports data for an offline storage deal
// It will verify that the data in the passed io.Reader matches the expected piece
// cid for the given deal or it will error
func (p *StorageProviderImpl) ImportDataForDeal(ctx context.Context, propCid cid.Cid, data io.Reader) error {
// If the car file can be found in the piece store, read it directly without copying it to the local directory.
// If skipCommP is true, do not compare piece cid.
func (p *StorageProviderImpl) ImportDataForDeal(ctx context.Context, propCid cid.Cid, data io.Reader, skipCommP bool) error {
// TODO: be able to check if we have enough disk space
d, err := p.dealStore.GetDeal(ctx, propCid)
if err != nil {
Expand All @@ -276,81 +280,116 @@ func (p *StorageProviderImpl) ImportDataForDeal(ctx context.Context, propCid cid
return fmt.Errorf("deal %s does not support offline data", propCid)
}

fs, err := p.tf(d.Proposal.Provider)
if err != nil {
return fmt.Errorf("failed to create temp filestore for provider %s: %w", d.Proposal.Provider.String(), err)
}
var r io.Reader
var carSize int64
var piecePath filestore.Path
var cleanup = func() {}

tempfi, err := fs.CreateTemp()
if err != nil {
return fmt.Errorf("failed to create temp file for data import: %w", err)
}
defer func() {
if err := tempfi.Close(); err != nil {
log.Errorf("unable to close stream %v", err)
pieceStore, err := p.pieceStorageMgr.FindStorageForRead(ctx, d.Proposal.PieceCID.String())
if err == nil {
log.Debugf("found %v already in piece storage", d.Proposal.PieceCID)

// In order to avoid errors in the deal, the files in the piece store were deleted.
piecePath = filestore.Path("")
if carSize, err = pieceStore.Len(ctx, d.Proposal.PieceCID.String()); err != nil {
return fmt.Errorf("got piece size from piece store failed: %v", err)
}
}()
cleanup := func() {
_ = tempfi.Close()
_ = fs.Delete(tempfi.Path())
}
readerCloser, err := pieceStore.GetReaderCloser(ctx, d.Proposal.PieceCID.String())
if err != nil {
return fmt.Errorf("got reader from piece store failed: %v", err)
}
r = readerCloser

log.Debugw("will copy imported file to local file", "propCid", propCid)
n, err := io.Copy(tempfi, data)
if err != nil {
cleanup()
return fmt.Errorf("importing deal data failed: %w", err)
}
log.Debugw("finished copying imported file to local file", "propCid", propCid)
defer func() {
if err = readerCloser.Close(); err != nil {
log.Errorf("unable to close piece storage: %v, %v", d.Proposal.PieceCID, err)
}
}()
} else {
log.Debugf("not found %s in piece storage", d.Proposal.PieceCID)

_ = n // TODO: verify n?
fs, err := p.tf(d.Proposal.Provider)
if err != nil {
return fmt.Errorf("failed to create temp filestore for provider %s: %w", d.Proposal.Provider.String(), err)
}

carSize := uint64(tempfi.Size())
tempfi, err := fs.CreateTemp()
if err != nil {
return fmt.Errorf("failed to create temp file for data import: %w", err)
}
defer func() {
if err := tempfi.Close(); err != nil {
log.Errorf("unable to close stream %v", err)
}
}()
cleanup = func() {
_ = tempfi.Close()
_ = fs.Delete(tempfi.Path())
}

_, err = tempfi.Seek(0, io.SeekStart)
if err != nil {
cleanup()
return fmt.Errorf("failed to seek through temp imported file: %w", err)
}
log.Debugw("will copy imported file to local file", "propCid", propCid)
n, err := io.Copy(tempfi, data)
if err != nil {
cleanup()
return fmt.Errorf("importing deal data failed: %w", err)
}
log.Debugw("finished copying imported file to local file", "propCid", propCid)

proofType, err := p.spn.GetProofType(ctx, d.Proposal.Provider, nil) // TODO: 判断是不是属于此矿池?
if err != nil {
p.eventPublisher.Publish(storagemarket.ProviderEventNodeErrored, d)
cleanup()
return fmt.Errorf("failed to determine proof type: %w", err)
}
log.Debugw("fetched proof type", "propCid", propCid)
_ = n // TODO: verify n?

pieceCid, err := utils.GeneratePieceCommitment(proofType, tempfi, carSize)
if err != nil {
cleanup()
return fmt.Errorf("failed to generate commP: %w", err)
}
if carSizePadded := padreader.PaddedSize(carSize).Padded(); carSizePadded < d.Proposal.PieceSize {
// need to pad up!
rawPaddedCommp, err := commp.PadCommP(
// we know how long a pieceCid "hash" is, just blindly extract the trailing 32 bytes
pieceCid.Hash()[len(pieceCid.Hash())-32:],
uint64(carSizePadded),
uint64(d.Proposal.PieceSize),
)
carSize = tempfi.Size()
piecePath = tempfi.Path()
_, err = tempfi.Seek(0, io.SeekStart)
if err != nil {
cleanup()
return err
return fmt.Errorf("failed to seek through temp imported file: %w", err)
}
pieceCid, _ = commcid.DataCommitmentV1ToCID(rawPaddedCommp)

r = tempfi
}

// Verify CommP matches
if !pieceCid.Equals(d.Proposal.PieceCID) {
cleanup()
return fmt.Errorf("given data does not match expected commP (got: %s, expected %s)", pieceCid, d.Proposal.PieceCID)
if !skipCommP {
log.Debugf("will calculate piece cid")

proofType, err := p.spn.GetProofType(ctx, d.Proposal.Provider, nil) // TODO: 判断是不是属于此矿池?
if err != nil {
p.eventPublisher.Publish(storagemarket.ProviderEventNodeErrored, d)
cleanup()
return fmt.Errorf("failed to determine proof type: %w", err)
}
log.Debugw("fetched proof type", "propCid", propCid)

pieceCid, err := utils.GeneratePieceCommitment(proofType, r, uint64(carSize))
if err != nil {
cleanup()
return fmt.Errorf("failed to generate commP: %w", err)
}
if carSizePadded := padreader.PaddedSize(uint64(carSize)).Padded(); carSizePadded < d.Proposal.PieceSize {
// need to pad up!
rawPaddedCommp, err := commp.PadCommP(
// we know how long a pieceCid "hash" is, just blindly extract the trailing 32 bytes
pieceCid.Hash()[len(pieceCid.Hash())-32:],
uint64(carSizePadded),
uint64(d.Proposal.PieceSize),
)
if err != nil {
cleanup()
return err
}
pieceCid, _ = commcid.DataCommitmentV1ToCID(rawPaddedCommp)
}

// Verify CommP matches
if !pieceCid.Equals(d.Proposal.PieceCID) {
cleanup()
return fmt.Errorf("given data does not match expected commP (got: %s, expected %s)", pieceCid, d.Proposal.PieceCID)
}
}

log.Debugw("will fire ReserveProviderFunds for imported file", "propCid", propCid)

// "will fire VerifiedData for imported file
d.PiecePath = tempfi.Path()
d.PiecePath = piecePath
d.MetadataPath = filestore.Path("")
log.Infof("deal %s piece path: %s", propCid, d.PiecePath)

Expand Down

0 comments on commit ea2038a

Please sign in to comment.