From 43d1572edafccc32556d7af1b71830c272124340 Mon Sep 17 00:00:00 2001
From: Mike <41407352+hunjixin@users.noreply.github.com>
Date: Thu, 13 Apr 2023 10:08:11 +0800
Subject: [PATCH] Merge pull request #317 from filecoin-project/opt/import-data

Opt/import data
---
 api/impl/venus_market.go            |   6 +-
 cli/storage-deals.go                |  24 ++++-
 go.mod                              |   4 +-
 go.sum                              |   8 +-
 storageprovider/deal_handler.go     |  26 ++++-
 storageprovider/storage_provider.go | 159 +++++++++++++++++-----------
 6 files changed, 152 insertions(+), 75 deletions(-)

diff --git a/api/impl/venus_market.go b/api/impl/venus_market.go
index bc440f79..8bd65d36 100644
--- a/api/impl/venus_market.go
+++ b/api/impl/venus_market.go
@@ -223,7 +223,7 @@ func (m *MarketNodeImpl) MarketImportDealData(ctx context.Context, propCid cid.C
     }
     defer fi.Close() //nolint:errcheck
 
-    return m.StorageProvider.ImportDataForDeal(ctx, propCid, fi)
+    return m.StorageProvider.ImportDataForDeal(ctx, propCid, fi, false)
 }
 
 func (m *MarketNodeImpl) MarketImportPublishedDeal(ctx context.Context, deal types.MinerDeal) error {
@@ -1039,7 +1039,7 @@ func (m *MarketNodeImpl) UpdateDealStatus(ctx context.Context, miner address.Add
     return m.DealAssigner.UpdateDealStatus(ctx, miner, dealId, pieceStatus, dealStatus)
 }
 
-func (m *MarketNodeImpl) DealsImportData(ctx context.Context, dealPropCid cid.Cid, fname string) error {
+func (m *MarketNodeImpl) DealsImportData(ctx context.Context, dealPropCid cid.Cid, fname string, skipCommP bool) error {
     deal, err := m.Repo.StorageDealRepo().GetDeal(ctx, dealPropCid)
     if err != nil {
         return err
     }
@@ -1053,7 +1053,7 @@ func (m *MarketNodeImpl) DealsImportData(ctx context.Ci
     }
     defer fi.Close() //nolint:errcheck
 
-    return m.StorageProvider.ImportDataForDeal(ctx, dealPropCid, fi)
+    return m.StorageProvider.ImportDataForDeal(ctx, dealPropCid, fi, skipCommP)
 }
 
 func (m *MarketNodeImpl) GetDeals(ctx context.Context, miner address.Address, pageIndex, pageSize int) ([]*types.DealInfo, error) {
diff --git a/cli/storage-deals.go b/cli/storage-deals.go
index 73368ea6..0b4b5f18 100644
--- a/cli/storage-deals.go
+++ b/cli/storage-deals.go
@@ -49,8 +49,18 @@ var storageDealsCmds = &cli.Command{
 }
 
 var dealsImportDataCmd = &cli.Command{
-    Name:      "import-data",
-    Usage:     "Manually import data for a deal",
+    Name:  "import-data",
+    Usage: "Manually import data for a deal",
+    Flags: []cli.Flag{
+        &cli.BoolFlag{
+            Name:  "skip-commp",
+            Usage: "skip calculating the piece CID; use with caution",
+        },
+        &cli.BoolFlag{
+            Name:  "really-do-it",
+            Usage: "Actually perform the action",
+        },
+    },
     ArgsUsage: " ",
     Action:    func(cctx *cli.Context) error {
         api, closer, err := NewMarketNode(cctx)
@@ -72,7 +82,15 @@ var dealsImportDataCmd = &cli.Command{
 
         fpath := cctx.Args().Get(1)
 
-        return api.DealsImportData(ctx, propCid, fpath)
+        var skipCommP bool
+        if cctx.IsSet("skip-commp") {
+            if !cctx.IsSet("really-do-it") {
+                return fmt.Errorf("pass --really-do-it to actually execute this action")
+            }
+            skipCommP = true
+        }
+
+        return api.DealsImportData(ctx, propCid, fpath, skipCommP)
     },
 }
diff --git a/go.mod b/go.mod
index a99ca9ee..ac08233d 100644
--- a/go.mod
+++ b/go.mod
@@ -29,8 +29,8 @@ require (
     github.com/filecoin-project/go-statestore v0.2.0
     github.com/filecoin-project/specs-actors/v2 v2.3.6
     github.com/filecoin-project/specs-actors/v7 v7.0.1
-    github.com/filecoin-project/venus v1.10.1-0.20230307021527-70bd910ad469
-    github.com/filecoin-project/venus-auth v1.10.0
+    github.com/filecoin-project/venus v1.10.3-0.20230412022158-82d8b3d61d7a
+    github.com/filecoin-project/venus-auth v1.10.1
     github.com/filecoin-project/venus-messager v1.10.0
     github.com/golang/mock v1.6.0
     github.com/google/uuid v1.3.0
diff --git a/go.sum b/go.sum
index ad4502be..5d0bd5e5 100644
--- a/go.sum
+++ b/go.sum
@@ -455,11 +455,11 @@ github.com/filecoin-project/storetheindex v0.4.30-0.20221114113647-683091f8e893
 github.com/filecoin-project/storetheindex v0.4.30-0.20221114113647-683091f8e893/go.mod h1:S7590oDimBvXMUtzWsBXoshu9HtYKwtXl47zAK9rcP8=
 github.com/filecoin-project/test-vectors/schema v0.0.5/go.mod h1:iQ9QXLpYWL3m7warwvK1JC/pTri8mnfEmKygNDqqY6E=
 github.com/filecoin-project/venus v1.2.4/go.mod h1:hJULXHGAnWuq5S5KRtPkwbT8DqgM9II7NwyNU7t59D0=
-github.com/filecoin-project/venus v1.10.1-0.20230307021527-70bd910ad469 h1:tMvzWy7TAc3Ex2B+6hIdQVzHUNRTzJAhKncwTTn2Ehg=
-github.com/filecoin-project/venus v1.10.1-0.20230307021527-70bd910ad469/go.mod h1:lO6L4FiHcW4ad+2R7x2J2slqYkhq/X0ObZfmxkJmZ7M=
+github.com/filecoin-project/venus v1.10.3-0.20230412022158-82d8b3d61d7a h1:+WkDX3hVeCQvad5+KDQ24cb+Lwtr+Hrv4dMzwp7+n4o=
+github.com/filecoin-project/venus v1.10.3-0.20230412022158-82d8b3d61d7a/go.mod h1:AkArMvTdspbcJu133WnAu8aMyRI82SZDvgv4Y3rTHiI=
 github.com/filecoin-project/venus-auth v1.3.2/go.mod h1:m5Jog2GYxztwP7w3m/iJdv/V1/bTcAVU9rm/CbhxRQU=
-github.com/filecoin-project/venus-auth v1.10.0 h1:0V5R36HIvLKRFJQjKkW0tjYazgHkDW/gmjwuEtKL1SA=
-github.com/filecoin-project/venus-auth v1.10.0/go.mod h1:MoirCIFdK8FNIFXpBbgF4xhRsjGi4KHHfW1EF6iAwZ8=
+github.com/filecoin-project/venus-auth v1.10.1 h1:A2AkLabDTU7Ivenx8yzusPh7R1tbTrr4kkDypi+ykTY=
+github.com/filecoin-project/venus-auth v1.10.1/go.mod h1:vFL6M8Ko1o05oX2EYr5wSzpKa6yL6RrLOVr5rO29Cxc=
 github.com/filecoin-project/venus-messager v1.10.0 h1:A2NVaZ5FgoUiEzdBc2f/6Taj8LKp72fqgxV2peZk8qM=
 github.com/filecoin-project/venus-messager v1.10.0/go.mod h1:JU21//2CNstoXZ8jE/kpc77MsmzcwAjkHaCcKFSZAuU=
 github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
diff --git a/storageprovider/deal_handler.go b/storageprovider/deal_handler.go
index a5dff9d2..22a23c4a 100644
--- a/storageprovider/deal_handler.go
+++ b/storageprovider/deal_handler.go
@@ -465,7 +465,7 @@ func (storageDealPorcess *StorageDealProcessImpl) HandleOff(ctx context.Context,
         deal.PayloadSize = uint64(file.Size())
         err = storageDealPorcess.deals.SaveDeal(ctx, deal)
         if err != nil {
-            return storageDealPorcess.HandleError(ctx, deal, fmt.Errorf("fail to save deal to database"))
+            return storageDealPorcess.HandleError(ctx, deal, fmt.Errorf("failed to save deal to database: %v", err))
         }
         err = storageDealPorcess.savePieceFile(ctx, deal, file, uint64(file.Size()))
         if err := file.Close(); err != nil {
@@ -476,7 +476,7 @@ func (storageDealPorcess *StorageDealProcessImpl) HandleOff(ctx context.Context,
             err = fmt.Errorf("packing piece at path %s: %w", deal.PiecePath, err)
             return storageDealPorcess.HandleError(ctx, deal, err)
         }
-    } else {
+    } else if len(deal.InboundCAR) != 0 {
         carFilePath = deal.InboundCAR
 
         v2r, err := storageDealPorcess.ReadCAR(deal.InboundCAR)
@@ -488,7 +488,7 @@ func (storageDealPorcess *StorageDealProcessImpl) HandleOff(ctx context.Context,
         deal.PayloadSize = v2r.Header.DataSize
         err = storageDealPorcess.deals.SaveDeal(ctx, deal)
         if err != nil {
-            return storageDealPorcess.HandleError(ctx, deal, fmt.Errorf("fail to save deal to database"))
+            return storageDealPorcess.HandleError(ctx, deal, fmt.Errorf("failed to save deal to database: %v", err))
         }
         dr, err := v2r.DataReader()
         if err != nil {
@@ -507,6 +507,26 @@ func (storageDealPorcess *StorageDealProcessImpl) HandleOff(ctx context.Context,
             err = fmt.Errorf("packing piece %s: %w", deal.Ref.PieceCid, packingErr)
             return storageDealPorcess.HandleError(ctx, deal, err)
         }
+    } else {
+        // An index can be created even if carFilePath is empty
+        carFilePath = ""
+        // the car file may already be in piece storage; verify it
+        pieceStore, err := storageDealPorcess.pieceStorageMgr.FindStorageForRead(ctx, deal.Proposal.PieceCID.String())
+        if err != nil {
+            return storageDealPorcess.HandleError(ctx, deal, err)
+        }
+        log.Debugf("found %s in piece storage", deal.Proposal.PieceCID)
+
+        l, err := pieceStore.Len(ctx, deal.Proposal.PieceCID.String())
+        if err != nil {
+            return storageDealPorcess.HandleError(ctx, deal, fmt.Errorf("failed to get payload size: %v", err))
+        }
+
+        deal.PayloadSize = uint64(l)
+        err = storageDealPorcess.deals.SaveDeal(ctx, deal)
+        if err != nil {
+            return storageDealPorcess.HandleError(ctx, deal, fmt.Errorf("failed to save deal to database: %v", err))
+        }
     }
 
     // Register the deal data as a "shard" with the DAG store. Later it can be
diff --git a/storageprovider/storage_provider.go b/storageprovider/storage_provider.go
index 3c74f316..ab09f2c4 100644
--- a/storageprovider/storage_provider.go
+++ b/storageprovider/storage_provider.go
@@ -109,7 +109,7 @@ type StorageProvider interface {
     GetStorageCollateral(ctx context.Context, mAddr address.Address) (storagemarket.Balance, error)
 
     // ImportDataForDeal manually imports data for an offline storage deal
-    ImportDataForDeal(ctx context.Context, propCid cid.Cid, data io.Reader) error
+    ImportDataForDeal(ctx context.Context, propCid cid.Cid, data io.Reader, skipCommP bool) error
 
     // ImportPublishedDeal manually import published deals to storage deals
     ImportPublishedDeal(ctx context.Context, deal types.MinerDeal) error
@@ -139,6 +139,7 @@ type StorageProviderImpl struct {
     transferProcess IDatatransferHandler
     storageReceiver smnet.StorageReceiver
     minerMgr        minermgr.IMinerMgr
+    pieceStorageMgr *piecestorage.PieceStorageManager
 }
 
 // NewStorageProvider returns a new storage provider
@@ -172,7 +173,8 @@ func NewStorageProvider(
 
         dealStore: repo.StorageDealRepo(),
 
-        minerMgr: minerMgr,
+        minerMgr:        minerMgr,
+        pieceStorageMgr: pieceStorageMgr,
     }
 
     dealProcess, err := NewStorageDealProcessImpl(mCtx, spV2.conns, newPeerTagger(spV2.net), spV2.spn, spV2.dealStore, spV2.storedAsk, tf, minerMgr, pieceStorageMgr, dataTransfer, dagStore, sdf, pb)
@@ -261,7 +263,9 @@ func (p *StorageProviderImpl) Stop() error {
 // ImportDataForDeal manually imports data for an offline storage deal
 // It will verify that the data in the passed io.Reader matches the expected piece
 // cid for the given deal or it will error
-func (p *StorageProviderImpl) ImportDataForDeal(ctx context.Context, propCid cid.Cid, data io.Reader) error {
+// If the car file can be found in the piece store, it is read directly instead of being copied to a local directory.
+// If skipCommP is true, the piece CID check is skipped.
+func (p *StorageProviderImpl) ImportDataForDeal(ctx context.Context, propCid cid.Cid, data io.Reader, skipCommP bool) error {
     // TODO: be able to check if we have enough disk space
     d, err := p.dealStore.GetDeal(ctx, propCid)
     if err != nil {
@@ -276,81 +280,116 @@ func (p *StorageProviderImpl) ImportDataForDeal(ctx context.Context, propCid cid
         return fmt.Errorf("deal %s does not support offline data", propCid)
     }
 
-    fs, err := p.tf(d.Proposal.Provider)
-    if err != nil {
-        return fmt.Errorf("failed to create temp filestore for provider %s: %w", d.Proposal.Provider.String(), err)
-    }
+    var r io.Reader
+    var carSize int64
+    var piecePath filestore.Path
+    var cleanup = func() {}
 
-    tempfi, err := fs.CreateTemp()
-    if err != nil {
-        return fmt.Errorf("failed to create temp file for data import: %w", err)
-    }
-    defer func() {
-        if err := tempfi.Close(); err != nil {
-            log.Errorf("unable to close stream %v", err)
+    pieceStore, err := p.pieceStorageMgr.FindStorageForRead(ctx, d.Proposal.PieceCID.String())
+    if err == nil {
+        log.Debugf("found %v already in piece storage", d.Proposal.PieceCID)
+
+        // In order to avoid errors in the deal, the files in the piece store were deleted.
+        piecePath = filestore.Path("")
+        if carSize, err = pieceStore.Len(ctx, d.Proposal.PieceCID.String()); err != nil {
+            return fmt.Errorf("failed to get piece size from piece store: %v", err)
         }
-    }()
-    cleanup := func() {
-        _ = tempfi.Close()
-        _ = fs.Delete(tempfi.Path())
-    }
+        readerCloser, err := pieceStore.GetReaderCloser(ctx, d.Proposal.PieceCID.String())
+        if err != nil {
+            return fmt.Errorf("failed to get reader from piece store: %v", err)
+        }
+        r = readerCloser
 
-    log.Debugw("will copy imported file to local file", "propCid", propCid)
-    n, err := io.Copy(tempfi, data)
-    if err != nil {
-        cleanup()
-        return fmt.Errorf("importing deal data failed: %w", err)
-    }
-    log.Debugw("finished copying imported file to local file", "propCid", propCid)
+        defer func() {
+            if err = readerCloser.Close(); err != nil {
+                log.Errorf("unable to close piece storage: %v, %v", d.Proposal.PieceCID, err)
+            }
+        }()
+    } else {
+        log.Debugf("did not find %s in piece storage", d.Proposal.PieceCID)
 
-    _ = n // TODO: verify n?
+        fs, err := p.tf(d.Proposal.Provider)
+        if err != nil {
+            return fmt.Errorf("failed to create temp filestore for provider %s: %w", d.Proposal.Provider.String(), err)
+        }
 
-    carSize := uint64(tempfi.Size())
+        tempfi, err := fs.CreateTemp()
+        if err != nil {
+            return fmt.Errorf("failed to create temp file for data import: %w", err)
+        }
+        defer func() {
+            if err := tempfi.Close(); err != nil {
+                log.Errorf("unable to close stream %v", err)
+            }
+        }()
+        cleanup = func() {
+            _ = tempfi.Close()
+            _ = fs.Delete(tempfi.Path())
+        }
 
-    _, err = tempfi.Seek(0, io.SeekStart)
-    if err != nil {
-        cleanup()
-        return fmt.Errorf("failed to seek through temp imported file: %w", err)
-    }
+        log.Debugw("will copy imported file to local file", "propCid", propCid)
+        n, err := io.Copy(tempfi, data)
+        if err != nil {
+            cleanup()
+            return fmt.Errorf("importing deal data failed: %w", err)
+        }
+        log.Debugw("finished copying imported file to local file", "propCid", propCid)
 
-    proofType, err := p.spn.GetProofType(ctx, d.Proposal.Provider, nil) // TODO: 判断是不是属于此矿池?
-    if err != nil {
-        p.eventPublisher.Publish(storagemarket.ProviderEventNodeErrored, d)
-        cleanup()
-        return fmt.Errorf("failed to determine proof type: %w", err)
-    }
-    log.Debugw("fetched proof type", "propCid", propCid)
+        _ = n // TODO: verify n?
 
-    pieceCid, err := utils.GeneratePieceCommitment(proofType, tempfi, carSize)
-    if err != nil {
-        cleanup()
-        return fmt.Errorf("failed to generate commP: %w", err)
-    }
-    if carSizePadded := padreader.PaddedSize(carSize).Padded(); carSizePadded < d.Proposal.PieceSize {
-        // need to pad up!
-        rawPaddedCommp, err := commp.PadCommP(
-            // we know how long a pieceCid "hash" is, just blindly extract the trailing 32 bytes
-            pieceCid.Hash()[len(pieceCid.Hash())-32:],
-            uint64(carSizePadded),
-            uint64(d.Proposal.PieceSize),
-        )
+        carSize = tempfi.Size()
+        piecePath = tempfi.Path()
+        _, err = tempfi.Seek(0, io.SeekStart)
         if err != nil {
             cleanup()
-            return err
+            return fmt.Errorf("failed to seek through temp imported file: %w", err)
         }
-        pieceCid, _ = commcid.DataCommitmentV1ToCID(rawPaddedCommp)
+
+        r = tempfi
     }
 
-    // Verify CommP matches
-    if !pieceCid.Equals(d.Proposal.PieceCID) {
-        cleanup()
-        return fmt.Errorf("given data does not match expected commP (got: %s, expected %s)", pieceCid, d.Proposal.PieceCID)
+    if !skipCommP {
+        log.Debugf("will calculate piece CID")
+
+        proofType, err := p.spn.GetProofType(ctx, d.Proposal.Provider, nil) // TODO: check whether it belongs to this miner pool?
+        if err != nil {
+            p.eventPublisher.Publish(storagemarket.ProviderEventNodeErrored, d)
+            cleanup()
+            return fmt.Errorf("failed to determine proof type: %w", err)
+        }
+        log.Debugw("fetched proof type", "propCid", propCid)
+
+        pieceCid, err := utils.GeneratePieceCommitment(proofType, r, uint64(carSize))
+        if err != nil {
+            cleanup()
+            return fmt.Errorf("failed to generate commP: %w", err)
+        }
+        if carSizePadded := padreader.PaddedSize(uint64(carSize)).Padded(); carSizePadded < d.Proposal.PieceSize {
+            // need to pad up!
+            rawPaddedCommp, err := commp.PadCommP(
+                // we know how long a pieceCid "hash" is, just blindly extract the trailing 32 bytes
+                pieceCid.Hash()[len(pieceCid.Hash())-32:],
+                uint64(carSizePadded),
+                uint64(d.Proposal.PieceSize),
+            )
+            if err != nil {
+                cleanup()
+                return err
+            }
+            pieceCid, _ = commcid.DataCommitmentV1ToCID(rawPaddedCommp)
+        }
+
+        // Verify CommP matches
+        if !pieceCid.Equals(d.Proposal.PieceCID) {
+            cleanup()
+            return fmt.Errorf("given data does not match expected commP (got: %s, expected %s)", pieceCid, d.Proposal.PieceCID)
+        }
     }
 
     log.Debugw("will fire ReserveProviderFunds for imported file", "propCid", propCid) // "will fire VerifiedData for imported file
-    d.PiecePath = tempfi.Path()
+    d.PiecePath = piecePath
     d.MetadataPath = filestore.Path("")
     log.Infof("deal %s piece path: %s", propCid, d.PiecePath)