From c7fe2392ccd3ee96575993630324b203ec7e87f9 Mon Sep 17 00:00:00 2001 From: simlecode <69969590+simlecode@users.noreply.github.com> Date: Wed, 13 Mar 2024 14:31:33 +0800 Subject: [PATCH 1/5] fix: generate direct deal index --- dagstore/market_api.go | 31 ++++++++++-- models/badger/direct_deal.go | 48 +++++++++++++++++++ models/badger/direct_deal_test.go | 63 ++++++++++++++++++++++++ models/badger/testing.go | 1 + models/mysql/direct_deal.go | 33 +++++++++++++ models/mysql/direct_deal_test.go | 80 +++++++++++++++++++++++++++++++ models/repo/repo.go | 2 + 7 files changed, 253 insertions(+), 5 deletions(-) diff --git a/dagstore/market_api.go b/dagstore/market_api.go index d0330210..5b33724e 100644 --- a/dagstore/market_api.go +++ b/dagstore/market_api.go @@ -14,7 +14,9 @@ import ( "github.com/filecoin-project/dagstore/mount" "github.com/filecoin-project/dagstore/throttle" + "github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-padreader" + "github.com/filecoin-project/go-state-types/abi" gatewayAPIV2 "github.com/filecoin-project/venus/venus-shared/api/gateway/v2" marketMetrics "github.com/ipfs-force-community/droplet/v2/metrics" @@ -31,8 +33,9 @@ type MarketAPI interface { } type marketAPI struct { - pieceStorageMgr *piecestorage.PieceStorageManager - pieceRepo repo.StorageDealRepo + pieceStorageMgr *piecestorage.PieceStorageManager + repo repo.Repo + useTransient bool metricsCtx metrics.MetricsCtx gatewayMarketClient gatewayAPIV2.IMarketClient @@ -51,7 +54,7 @@ func NewMarketAPI( concurrency int) MarketAPI { return &marketAPI{ - pieceRepo: repo.StorageDealRepo(), + repo: repo, pieceStorageMgr: pieceStorageMgr, useTransient: useTransient, metricsCtx: ctx, @@ -65,6 +68,24 @@ func (m *marketAPI) Start(_ context.Context) error { return nil } +func (m *marketAPI) getPieceInfo(ctx context.Context, pieceCID cid.Cid) (*piecestore.PieceInfo, error) { + pieceInfo, err := m.repo.StorageDealRepo().GetPieceInfo(ctx, pieceCID) + if err == nil { + return pieceInfo, nil + } + + return m.repo.DirectDealRepo().GetPieceInfo(ctx, pieceCID) +} + +func (m *marketAPI) getPieceSize(ctx context.Context, pieceCID cid.Cid) (uint64, abi.PaddedPieceSize, error) { + payloadSize, pieceSize, err := m.repo.StorageDealRepo().GetPieceSize(ctx, pieceCID) + if err == nil { + return payloadSize, pieceSize, nil + } + + return m.repo.DirectDealRepo().GetPieceSize(ctx, pieceCID) +} + func (m *marketAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) { _, err := m.pieceStorageMgr.FindStorageForRead(ctx, pieceCid.String()) if err != nil { @@ -79,7 +100,7 @@ func (m *marketAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, err } func (m *marketAPI) FetchFromPieceStorage(ctx context.Context, pieceCid cid.Cid) (mount.Reader, error) { - payloadSize, pieceSize, err := m.pieceRepo.GetPieceSize(ctx, pieceCid) + payloadSize, pieceSize, err := m.getPieceSize(ctx, pieceCid) if err != nil { return nil, err } @@ -121,7 +142,7 @@ func (m *marketAPI) FetchFromPieceStorage(ctx context.Context, pieceCid cid.Cid) } func (m *marketAPI) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) { - pieceInfo, err := m.pieceRepo.GetPieceInfo(ctx, pieceCid) + pieceInfo, err := m.getPieceInfo(ctx, pieceCid) if err != nil { return 0, fmt.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err) } diff --git a/models/badger/direct_deal.go b/models/badger/direct_deal.go index ca553ab7..f9dd89a2 100644 --- a/models/badger/direct_deal.go +++ b/models/badger/direct_deal.go @@ -6,9 +6,12 @@ import ( "fmt" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-fil-markets/piecestore" + "github.com/filecoin-project/go-state-types/abi" types "github.com/filecoin-project/venus/venus-shared/types/market" "github.com/google/uuid" "github.com/ipfs-force-community/droplet/v2/models/repo" + "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" ) @@ -88,6 +91,51 @@ func (r *directDealRepo) GetDealsByMinerAndState(ctx context.Context, miner addr return deals, nil } + +func (r *directDealRepo) GetPieceInfo(ctx context.Context, pieceCID cid.Cid) (*piecestore.PieceInfo, error) { + pieceInfo := piecestore.PieceInfo{ + PieceCID: pieceCID, + Deals: nil, + } + var err error + if err = travelJSONAbleDS(ctx, r.ds, func(deal *types.DirectDeal) (bool, error) { + if deal.PieceCID.Equals(pieceCID) { + pieceInfo.Deals = append(pieceInfo.Deals, piecestore.DealInfo{ + SectorID: deal.SectorID, + Offset: deal.Offset, + Length: deal.PieceSize, + }) + } + return false, nil + }); err != nil { + return nil, err + } + + if len(pieceInfo.Deals) == 0 { + err = repo.ErrNotFound + } + + return &pieceInfo, err +} + +func (r *directDealRepo) GetPieceSize(ctx context.Context, pieceCID cid.Cid) (uint64, abi.PaddedPieceSize, error) { + var deal *types.DirectDeal + err := travelJSONAbleDS(ctx, r.ds, func(inDeal *types.DirectDeal) (stop bool, err error) { + if inDeal.PieceCID == pieceCID && inDeal.State != types.DealExpired { + deal = inDeal + return true, nil + } + return false, nil + }) + if err != nil { + return 0, 0, nil + } + if deal == nil { + return 0, 0, repo.ErrNotFound + } + return deal.PayloadSize, deal.PieceSize, nil +} + func (r *directDealRepo) ListDeal(ctx context.Context, params types.DirectDealQueryParams) ([]*types.DirectDeal, error) { var deals []*types.DirectDeal end := params.Limit + params.Offset diff --git a/models/badger/direct_deal_test.go b/models/badger/direct_deal_test.go index 6cb18aac..a406f3f2 100644 --- a/models/badger/direct_deal_test.go +++ b/models/badger/direct_deal_test.go @@ -6,9 +6,11 @@ import ( "time" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/venus/venus-shared/testutil" types "github.com/filecoin-project/venus/venus-shared/types/market" "github.com/google/uuid" + "github.com/ipfs-force-community/droplet/v2/models/repo" "github.com/stretchr/testify/assert" ) @@ -109,3 +111,64 @@ func TestDirectDeal(t *testing.T) { assert.Len(t, res, 10) }) } + +func prepareDirectDealTest(t *testing.T) (context.Context, repo.DirectDealRepo, []types.DirectDeal) { + ctx := context.Background() + repo := setup(t) + r := repo.DirectDealRepo() + + dealCases := make([]types.DirectDeal, 10) + testutil.Provide(t, &dealCases) + dealCases[0].State = types.DealAllocation + return ctx, r, dealCases +} + +func TestGetDirectDealPieceInfo(t *testing.T) { + ctx, r, dealCases := prepareDirectDealTest(t) + + for _, deal := range dealCases { + err := r.SaveDeal(ctx, &deal) + assert.NoError(t, err) + } + + // refresh UpdatedAt + for i := range dealCases { + res, err := r.GetDeal(ctx, dealCases[i].ID) + assert.NoError(t, err) + dealCases[i].UpdatedAt = res.UpdatedAt + } + + res, err := r.GetPieceInfo(ctx, dealCases[0].PieceCID) + assert.NoError(t, err) + expect := piecestore.PieceInfo{ + PieceCID: dealCases[0].PieceCID, + Deals: nil, + } + expect.Deals = append(expect.Deals, piecestore.DealInfo{ + SectorID: dealCases[0].SectorID, + Offset: dealCases[0].Offset, + Length: dealCases[0].PieceSize, + }) + assert.Equal(t, expect, *res) +} + +func TestGetDirectDealPieceSize(t *testing.T) { + ctx, r, dealCases := prepareDirectDealTest(t) + + for _, deal := range dealCases { + err := r.SaveDeal(ctx, &deal) + assert.NoError(t, err) + } + + // refresh UpdatedAt + for i := range dealCases { + res, err := r.GetDeal(ctx, dealCases[i].ID) + assert.NoError(t, err) + dealCases[i].UpdatedAt = res.UpdatedAt + } + + PLSize, PSize, err := r.GetPieceSize(ctx, dealCases[0].PieceCID) + assert.NoError(t, err) + assert.Equal(t, dealCases[0].PieceSize, PSize) + assert.Equal(t, dealCases[0].PayloadSize, PLSize) +} diff --git a/models/badger/testing.go b/models/badger/testing.go index ab242c95..2be7da88 100644 --- a/models/badger/testing.go +++ b/models/badger/testing.go @@ -35,6 +35,7 @@ func WrapDbToRepo(db datastore.Batching) repo.Repo { RetrAskDs: NewRetrievalAskDS(NewRetrievalProviderDS(db)), CidInfoDs: NewCidInfoDs(NewPieceMetaDs(db)), RetrievalDealsDs: NewRetrievalDealsDS(NewRetrievalProviderDS(db)), + DirectDealsDs: NewDirectDealsDS(db), }) } diff --git a/models/mysql/direct_deal.go b/models/mysql/direct_deal.go index af22150f..73ca60ec 100644 --- a/models/mysql/direct_deal.go +++ b/models/mysql/direct_deal.go @@ -4,10 +4,12 @@ import ( "context" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-state-types/abi" types "github.com/filecoin-project/venus/venus-shared/types/market" "github.com/google/uuid" "github.com/ipfs-force-community/droplet/v2/models/repo" + "github.com/ipfs/go-cid" "gorm.io/gorm" ) @@ -151,6 +153,37 @@ func (ddr *directDealRepo) GetDealsByMinerAndState(ctx context.Context, miner ad return out, nil } +func (ddr *directDealRepo) GetPieceInfo(ctx context.Context, pieceCID cid.Cid) (*piecestore.PieceInfo, error) { + var deals []*directDeal + if err := ddr.DB.WithContext(ctx).Table(directDealTableName).Find(&deals, "piece_cid = ?", pieceCID.String()).Error; err != nil { + return nil, err + } + + pieceInfo := piecestore.PieceInfo{ + PieceCID: pieceCID, + Deals: nil, + } + + for _, d := range deals { + pieceInfo.Deals = append(pieceInfo.Deals, piecestore.DealInfo{ + SectorID: abi.SectorNumber(d.SectorID), + Offset: abi.PaddedPieceSize(d.Offset), + Length: abi.PaddedPieceSize(d.PieceSize), + }) + } + return &pieceInfo, nil +} + +func (ddr *directDealRepo) GetPieceSize(ctx context.Context, pieceCID cid.Cid) (uint64, abi.PaddedPieceSize, error) { + var deal directDeal + if err := ddr.WithContext(ctx).Table(directDealTableName).Take(&deal, "piece_cid = ? and state != ?", + pieceCID.String(), types.DealError).Error; err != nil { + return 0, 0, err + } + + return deal.PayloadSize, abi.PaddedPieceSize(deal.PieceSize), nil +} + func (ddr *directDealRepo) ListDeal(ctx context.Context, params types.DirectDealQueryParams) ([]*types.DirectDeal, error) { var deals []*directDeal diff --git a/models/mysql/direct_deal_test.go b/models/mysql/direct_deal_test.go index 0f2b1fad..034e3d5d 100644 --- a/models/mysql/direct_deal_test.go +++ b/models/mysql/direct_deal_test.go @@ -6,6 +6,8 @@ import ( "testing" "github.com/DATA-DOG/go-sqlmock" + "github.com/filecoin-project/go-fil-markets/piecestore" + "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/venus/venus-shared/testutil" types "github.com/filecoin-project/venus/venus-shared/types/market" "github.com/stretchr/testify/assert" @@ -141,3 +143,81 @@ func TestListDirectDeal(t *testing.T) { assert.NoError(t, closeDB(mock, sqlDB)) } + +func TestGetDirectDealPieceSize(t *testing.T) { + ctx := context.Background() + r, mock, sqlDB := setup(t) + + var deals []*types.DirectDeal + testutil.Provide(t, &deals) + dbDeals := make([]*directDeal, 0, len(deals)) + for _, deal := range deals { + dbDeals = append(dbDeals, fromDirectDeal(deal)) + } + + deal := deals[0] + dbDeal := dbDeals[0] + + db, err := getMysqlDryrunDB() + assert.NoError(t, err) + + rows, err := getFullRows(dbDeal) + assert.NoError(t, err) + + var nullDeal *directDeal + sql, vars, err := getSQL(db.Table(directDealTableName).Take(&nullDeal, "piece_cid = ? and state != ?", DBCid(deal.PieceCID).String(), types.DealError)) + assert.NoError(t, err) + + mock.ExpectQuery(regexp.QuoteMeta(sql)).WithArgs(vars...).WillReturnRows(rows) + + playLoadSize, paddedPieceSize, err := r.DirectDealRepo().GetPieceSize(ctx, deal.PieceCID) + assert.NoError(t, err) + assert.Equal(t, dbDeal.PayloadSize, playLoadSize) + assert.Equal(t, abi.PaddedPieceSize(dbDeal.PieceSize), paddedPieceSize) + + assert.NoError(t, closeDB(mock, sqlDB)) +} + +func TestGetDirectDealPieceInfo(t *testing.T) { + ctx := context.Background() + r, mock, sqlDB := setup(t) + + var deals []*types.DirectDeal + testutil.Provide(t, &deals) + dbDeals := make([]*directDeal, 0, len(deals)) + for _, deal := range deals { + dbDeals = append(dbDeals, fromDirectDeal(deal)) + } + + deal := deals[0] + dbDeal := dbDeals[0] + + db, err := getMysqlDryrunDB() + assert.NoError(t, err) + + rows, err := getFullRows(dbDeal) + assert.NoError(t, err) + + var nullDeal *directDeal + sql, vars, err := getSQL(db.Table(directDealTableName).Find(&nullDeal, "piece_cid = ?", DBCid(deal.PieceCID).String())) + assert.NoError(t, err) + + mock.ExpectQuery(regexp.QuoteMeta(sql)).WithArgs(vars...).WillReturnRows(rows) + + pInfo := &piecestore.PieceInfo{ + PieceCID: deal.PieceCID, + Deals: []piecestore.DealInfo{ + { + Offset: deal.Offset, + Length: deal.PieceSize, + SectorID: deal.SectorID, + }, + }, + } + + res, err := r.DirectDealRepo().GetPieceInfo(ctx, deal.PieceCID) + assert.NoError(t, err) + assert.Equal(t, pInfo, res) + + assert.NoError(t, closeDB(mock, sqlDB)) +} diff --git a/models/repo/repo.go b/models/repo/repo.go index b31c644f..ca33b415 100644 --- a/models/repo/repo.go +++ b/models/repo/repo.go @@ -139,6 +139,8 @@ type DirectDealRepo interface { GetDeal(ctx context.Context, id uuid.UUID) (*types.DirectDeal, error) GetDealByAllocationID(ctx context.Context, id uint64) (*types.DirectDeal, error) GetDealsByMinerAndState(ctx context.Context, miner address.Address, state types.DirectDealState) ([]*types.DirectDeal, error) + GetPieceInfo(ctx context.Context, pieceCID cid.Cid) (*piecestore.PieceInfo, error) + GetPieceSize(ctx context.Context, pieceCID cid.Cid) (uint64, abi.PaddedPieceSize, error) ListDeal(ctx context.Context, params types.DirectDealQueryParams) ([]*types.DirectDeal, error) } From ed81bd9c547f189eb24c9709e815899b9df291f0 Mon Sep 17 00:00:00 2001 From: simlecode <69969590+simlecode@users.noreply.github.com> Date: Wed, 13 Mar 2024 14:50:46 +0800 Subject: [PATCH 2/5] fix: return error message --- models/badger/direct_deal.go | 2 +- models/badger/storage_deal.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/models/badger/direct_deal.go b/models/badger/direct_deal.go index f9dd89a2..7f499b03 100644 --- a/models/badger/direct_deal.go +++ b/models/badger/direct_deal.go @@ -128,7 +128,7 @@ func (r *directDealRepo) GetPieceSize(ctx context.Context, pieceCID cid.Cid) (ui return false, nil }) if err != nil { - return 0, 0, nil + return 0, 0, err } if deal == nil { return 0, 0, repo.ErrNotFound diff --git a/models/badger/storage_deal.go b/models/badger/storage_deal.go index 2ea17b29..e7002062 100644 --- a/models/badger/storage_deal.go +++ b/models/badger/storage_deal.go @@ -428,7 +428,7 @@ func (sdr *storageDealRepo) GetPieceSize(ctx context.Context, pieceCID cid.Cid) return false, nil }) if err != nil { - return 0, 0, nil + return 0, 0, err } if deal == nil { return 0, 0, repo.ErrNotFound From 5be480f882ce1b4ba03aa4d7093746ca5d7522b2 Mon Sep 17 00:00:00 2001 From: simlecode <69969590+simlecode@users.noreply.github.com> Date: Wed, 13 Mar 2024 15:33:30 +0800 Subject: [PATCH 3/5] opt: output payload size --- cli/direct-deal.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cli/direct-deal.go b/cli/direct-deal.go index b411b9d1..b45fabc9 100644 --- a/cli/direct-deal.go +++ b/cli/direct-deal.go @@ -87,6 +87,7 @@ var getDirectDeal = &cli.Command{ {"SectorID", deal.SectorID}, {"Offset", deal.Offset}, {"Length", deal.Length}, + {"PayloadSize", deal.PayloadSize}, {"StartEpoch", deal.StartEpoch}, {"EndEpoch", deal.EndEpoch}, } From 49e19261c2b5ad3145bc5220c303da8189eefd26 Mon Sep 17 00:00:00 2001 From: simlecode <69969590+simlecode@users.noreply.github.com> Date: Wed, 13 Mar 2024 15:33:49 +0800 Subject: [PATCH 4/5] fix: create time is zero --- models/badger/direct_deal.go | 1 + 1 file changed, 1 insertion(+) diff --git a/models/badger/direct_deal.go b/models/badger/direct_deal.go index 7f499b03..1b5cdd05 100644 --- a/models/badger/direct_deal.go +++ b/models/badger/direct_deal.go @@ -25,6 +25,7 @@ type directDealRepo struct { func (r *directDealRepo) SaveDeal(ctx context.Context, d *types.DirectDeal) error { key := keyFromID(d.ID) + d.TimeStamp = makeRefreshedTimeStamp(&d.TimeStamp) data, err := json.Marshal(d) if err != nil { return err From 9fab0c6e2938503cc00538571e6e76db7f187639 Mon Sep 17 00:00:00 2001 From: simlecode <69969590+simlecode@users.noreply.github.com> Date: Wed, 13 Mar 2024 17:26:04 +0800 Subject: [PATCH 5/5] fix: reader is empty --- storageprovider/direct_deal_provider.go | 50 ++++++++++++++++--------- 1 file changed, 32 insertions(+), 18 deletions(-) diff --git a/storageprovider/direct_deal_provider.go b/storageprovider/direct_deal_provider.go index 697a92e1..bfa93b56 100644 --- a/storageprovider/direct_deal_provider.go +++ b/storageprovider/direct_deal_provider.go @@ -171,34 +171,39 @@ func (ddp *DirectDealProvider) importData(ctx context.Context, deal *types.Direc var r io.ReadCloser var carSize int64 - + var pieceStore piecestorage.IPieceStorage + var err error pieceCIDStr := deal.PieceCID.String() - pieceStore, err := ddp.pieceStorageMgr.FindStorageForRead(ctx, pieceCIDStr) - if err == nil { - directDealLog.Debugf("found %v already in piece storage", pieceCIDStr) - carSize, err = pieceStore.Len(ctx, pieceCIDStr) - if err != nil { - return fmt.Errorf("got piece size from piece store failed: %v", err) - } - readerCloser, err := pieceStore.GetReaderCloser(ctx, pieceCIDStr) - if err != nil { - return fmt.Errorf("got reader from piece store failed: %v", err) + getReader := func() (io.ReadCloser, error) { + pieceStore, err = ddp.pieceStorageMgr.FindStorageForRead(ctx, pieceCIDStr) + if err == nil { + directDealLog.Debugf("found %v already in piece storage", pieceCIDStr) + + carSize, err = pieceStore.Len(ctx, pieceCIDStr) + if err != nil { + return nil, fmt.Errorf("got piece size from piece store failed: %v", err) + } + readerCloser, err := pieceStore.GetReaderCloser(ctx, pieceCIDStr) + if err != nil { + return nil, fmt.Errorf("got reader from piece store failed: %v", err) + } + return readerCloser, nil } - r = readerCloser - } else { directDealLog.Debugf("not found %s in piece storage", pieceCIDStr) info, err := os.Stat(cParams.filePath) if err != nil { - return err + return nil, err } carSize = info.Size() - r, err = os.Open(cParams.filePath) - if err != nil { - return err - } + return os.Open(cParams.filePath) + } + + r, err = getReader() + if err != nil { + return err } deal.PayloadSize = uint64(carSize) @@ -222,6 +227,15 @@ func (ddp *DirectDealProvider) importData(ctx context.Context, deal *types.Direc if !pieceCid.Equals(deal.PieceCID) { return fmt.Errorf("given data does not match expected commP (got: %s, expected %s)", pieceCid, deal.PieceCID) } + + if err := r.Close(); err != nil { + log.Errorf("unable to close reader: %v, %v", pieceCIDStr, err) + } + + r, err = getReader() + if err != nil { + return err + } } // copy car file to piece storage