Skip to content

Commit

Permalink
impl the new piece store
Browse files Browse the repository at this point in the history
  • Loading branch information
ta0li committed Nov 11, 2021
1 parent 87ccc66 commit a284db5
Show file tree
Hide file tree
Showing 23 changed files with 477 additions and 707 deletions.
35 changes: 20 additions & 15 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,31 @@ import (
"context"
"time"

"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/paych"

mTypes "github.com/filecoin-project/venus-messager/types"

"github.com/ipfs-force-community/venus-gateway/marketevent"
types2 "github.com/ipfs-force-community/venus-gateway/types"

"github.com/filecoin-project/venus/app/submodule/apitypes"
vTypes "github.com/filecoin-project/venus/pkg/types"

"github.com/filecoin-project/venus-market/client"
"github.com/filecoin-project/venus-market/imports"
"github.com/filecoin-project/venus-market/piece"
"github.com/filecoin-project/venus-market/types"
"github.com/filecoin-project/venus-market/utils"
mTypes "github.com/filecoin-project/venus-messager/types"
"github.com/filecoin-project/venus/app/submodule/apitypes"
vTypes "github.com/filecoin-project/venus/pkg/types"
"github.com/ipfs-force-community/venus-gateway/marketevent"
types2 "github.com/ipfs-force-community/venus-gateway/types"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"
)

//mock for gen
Expand Down Expand Up @@ -134,13 +139,13 @@ type MarketFullNode interface {
// DagstoreGC runs garbage collection on the DAG store.
DagstoreGC(ctx context.Context) ([]types.DagstoreShardResult, error) //perm:admin

//todo validate miner identify
GetDeals(ctx context.Context, miner address.Address, pageIndex, pageSize int) ([]*piece.DealInfo, error) //perm:read
AssignUnPackedDeals(spec *piece.GetDealSpec) ([]*piece.DealInfoIncludePath, error) //perm:write
GetUnPackedDeals(ctx context.Context, miner address.Address, spec *piece.GetDealSpec) ([]*piece.DealInfoIncludePath, error) //perm:read
MarkDealsAsPacking(ctx context.Context, miner address.Address, deals []abi.DealID) error //perm:write
UpdateDealOnPacking(ctx context.Context, miner address.Address, pieceCID cid.Cid, dealId abi.DealID, sectorid abi.SectorNumber, offset abi.PaddedPieceSize) error //perm:write
UpdateDealStatus(ctx context.Context, miner address.Address, dealId abi.DealID, status string) error //perm:write
MarkDealsAsPacking(ctx context.Context, miner address.Address, deals []abi.DealID) error //perm:write
UpdateDealOnPacking(ctx context.Context, miner address.Address, dealID abi.DealID, sectorid abi.SectorNumber, offset abi.PaddedPieceSize) error //perm:write
UpdateDealStatus(ctx context.Context, miner address.Address, dealID abi.DealID, pieceStatus string) error //perm:write
GetDeals(ctx context.Context, miner address.Address, pageIndex, pageSize int) ([]*piece.DealInfo, error) //perm:read
GetUnPackedDeals(ctx context.Context, miner address.Address, spec *piece.GetDealSpec) ([]*piece.DealInfoIncludePath, error) //perm:read
AssignUnPackedDeals(ctx context.Context, miner address.Address, ssize abi.SectorSize, spec *piece.GetDealSpec) ([]*piece.DealInfoIncludePath, error) //perm:write

//market event
ResponseMarketEvent(ctx context.Context, resp *types2.ResponseEvent) error //perm:read
ListenMarketEvent(ctx context.Context, policy *marketevent.MarketRegisterPolicy) (<-chan *types2.RequestEvent, error) //perm:read
Expand Down
18 changes: 9 additions & 9 deletions api/impl/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type MarketNodeImpl struct {
RetrievalAskHandler retrievaladapter.IAskHandler
DataTransfer network.ProviderDataTransfer
DealPublisher *storageadapter2.DealPublisher
PieceStore piece.ExtendPieceStore
PieceStore piece.PieceStoreEx
Messager clients2.IMessager `optional:"true"`
DAGStore *dagstore.DAGStore
PieceStorage piece.IPieceStorage
Expand Down Expand Up @@ -630,23 +630,23 @@ func (m MarketNodeImpl) DagstoreGC(ctx context.Context) ([]types.DagstoreShardRe
}

func (m MarketNodeImpl) GetUnPackedDeals(ctx context.Context, miner address.Address, spec *piece.GetDealSpec) ([]*piece.DealInfoIncludePath, error) {
return m.PieceStore.GetUnPackedDeals(spec)
return m.PieceStore.GetUnPackedDeals(ctx, miner, spec)
}

func (m MarketNodeImpl) AssignUnPackedDeals(spec *piece.GetDealSpec) ([]*piece.DealInfoIncludePath, error) {
return m.PieceStore.AssignUnPackedDeals(spec)
func (m MarketNodeImpl) AssignUnPackedDeals(ctx context.Context, miner address.Address, ssize abi.SectorSize, spec *piece.GetDealSpec) ([]*piece.DealInfoIncludePath, error) {
return m.PieceStore.AssignUnPackedDeals(ctx, miner, ssize, spec)
}

func (m MarketNodeImpl) MarkDealsAsPacking(ctx context.Context, miner address.Address, deals []abi.DealID) error {
return m.PieceStore.MarkDealsAsPacking(deals)
return m.PieceStore.MarkDealsAsPacking(ctx, miner, deals)
}

func (m MarketNodeImpl) UpdateDealOnPacking(ctx context.Context, miner address.Address, pieceCID cid.Cid, dealId abi.DealID, sectorid abi.SectorNumber, offset abi.PaddedPieceSize) error {
return m.PieceStore.UpdateDealOnPacking(pieceCID, dealId, sectorid, offset)
func (m MarketNodeImpl) UpdateDealOnPacking(ctx context.Context, miner address.Address, dealId abi.DealID, sectorid abi.SectorNumber, offset abi.PaddedPieceSize) error {
return m.PieceStore.UpdateDealOnPacking(ctx, miner, dealId, sectorid, offset)
}

func (m MarketNodeImpl) UpdateDealStatus(ctx context.Context, miner address.Address, dealId abi.DealID, status string) error {
return m.PieceStore.UpdateDealStatus(dealId, status)
return m.PieceStore.UpdateDealStatus(ctx, miner, dealId, status)
}

func (m MarketNodeImpl) DealsImportData(ctx context.Context, dealPropCid cid.Cid, fname string) error {
Expand All @@ -660,7 +660,7 @@ func (m MarketNodeImpl) DealsImportData(ctx context.Context, dealPropCid cid.Cid
}

func (m MarketNodeImpl) GetDeals(ctx context.Context, miner address.Address, pageIndex, pageSize int) ([]*piece.DealInfo, error) {
return m.PieceStore.GetDeals(pageIndex, pageSize)
return m.PieceStore.GetDeals(ctx, miner, pageIndex, pageSize)
}

func (m MarketNodeImpl) PaychVoucherList(ctx context.Context, pch address.Address) ([]*paych.SignedVoucher, error) {
Expand Down
18 changes: 9 additions & 9 deletions api/proxy_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions cli/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,6 @@ var dealsListCmd = &cli.Command{
return nil
}

// TODO: 判断miner在矿池中是否存在

ctx := DaemonContext(cctx)

deals, err := api.MarketListIncompleteDeals(ctx, maddr)
Expand Down
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ const DefaultDAGStoreDir = "dagstore"
type API struct {
fx.In

Full apiface.FullNode
Full apiface.FullNode

SMDealClient storagemarket.StorageClient
RetDiscovery discovery.PeerResolver
Expand Down
12 changes: 6 additions & 6 deletions dagstore/market_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@ type MarketAPI interface {
}

type marketAPI struct {
pieceStorage piece.IPieceStorage
pieceRepo repo.StorageDealRepo
throttle throttle.Throttler
pieceStorage piece.IPieceStorage
pieceRepo repo.StorageDealRepo
throttle throttle.Throttler
}

var _ MarketAPI = (*marketAPI)(nil)

func NewMinerAPI(repo repo.Repo, pieceStorage piece.IPieceStorage, concurrency int) MarketAPI {
return &marketAPI{
pieceRepo: repo.StorageDealRepo(),
pieceStorage: pieceStorage,
throttle: throttle.Fixed(concurrency),
pieceRepo: repo.StorageDealRepo(),
pieceStorage: pieceStorage,
throttle: throttle.Fixed(concurrency),
}
}

Expand Down
22 changes: 12 additions & 10 deletions models/badger/cid_info.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,29 @@
package badger

import (
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"

"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-statestore"

"github.com/filecoin-project/venus-market/models/repo"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
)

var log = logging.Logger("badgerpieces")

func NewBadgerCidInfoRepo(cidInfoDs repo.CIDInfoDS) repo.ICidInfoRepo {
return &baderCidInfoRepo{cidInfos: statestore.New(cidInfoDs)}
return &badgerCidInfoRepo{cidInfos: statestore.New(cidInfoDs)}
}

type baderCidInfoRepo struct {
type badgerCidInfoRepo struct {
cidInfos *statestore.StateStore
}

var _ repo.ICidInfoRepo = (*baderCidInfoRepo)(nil)
var _ repo.ICidInfoRepo = (*badgerCidInfoRepo)(nil)

// Store the map of blockLocations in the PieceStore's CIDInfo store, with key `pieceCID`
func (ps *baderCidInfoRepo) AddPieceBlockLocations(pieceCID cid.Cid, blockLocations map[cid.Cid]piecestore.BlockLocation) error {
func (ps *badgerCidInfoRepo) AddPieceBlockLocations(pieceCID cid.Cid, blockLocations map[cid.Cid]piecestore.BlockLocation) error {
for c, blockLocation := range blockLocations {
err := ps.mutateCIDInfo(c, func(ci *piecestore.CIDInfo) error {
for _, pbl := range ci.PieceBlockLocations {
Expand All @@ -39,7 +41,7 @@ func (ps *baderCidInfoRepo) AddPieceBlockLocations(pieceCID cid.Cid, blockLocati
return nil
}

func (ps *baderCidInfoRepo) ListCidInfoKeys() ([]cid.Cid, error) {
func (ps *badgerCidInfoRepo) ListCidInfoKeys() ([]cid.Cid, error) {
var cis []piecestore.CIDInfo
if err := ps.cidInfos.List(&cis); err != nil {
return nil, err
Expand All @@ -54,15 +56,15 @@ func (ps *baderCidInfoRepo) ListCidInfoKeys() ([]cid.Cid, error) {
}

// Retrieve the CIDInfo associated with `pieceCID` from the CID info store.
func (ps *baderCidInfoRepo) GetCIDInfo(payloadCID cid.Cid) (piecestore.CIDInfo, error) {
func (ps *badgerCidInfoRepo) GetCIDInfo(payloadCID cid.Cid) (piecestore.CIDInfo, error) {
var out piecestore.CIDInfo
if err := ps.cidInfos.Get(payloadCID).Get(&out); err != nil {
return piecestore.CIDInfo{}, err
}
return out, nil
}

func (ps *baderCidInfoRepo) ensureCIDInfo(c cid.Cid) error {
func (ps *badgerCidInfoRepo) ensureCIDInfo(c cid.Cid) error {
has, err := ps.cidInfos.Has(c)

if err != nil {
Expand All @@ -77,7 +79,7 @@ func (ps *baderCidInfoRepo) ensureCIDInfo(c cid.Cid) error {
return ps.cidInfos.Begin(c, &cidInfo)
}

func (ps *baderCidInfoRepo) mutateCIDInfo(c cid.Cid, mutator interface{}) error {
func (ps *badgerCidInfoRepo) mutateCIDInfo(c cid.Cid, mutator interface{}) error {
err := ps.ensureCIDInfo(c)
if err != nil {
return err
Expand Down
69 changes: 69 additions & 0 deletions models/badger/storage_deal.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package badger

import (
"bytes"
"errors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/venus-market/types"
xerrors "github.com/pkg/errors"

cborrpc "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-statestore"
Expand Down Expand Up @@ -101,3 +104,69 @@ func (dsr *storageDealRepo) travelDeals(travelFn func(deal *types.MinerDeal) err
}
return nil
}

func (dsr *storageDealRepo) ListPieceInfoKeys() ([]cid.Cid, error) {
var cidsMap = make(map[cid.Cid]interface{})
err := dsr.travelDeals(
func(deal *types.MinerDeal) error {
cidsMap[deal.ClientDealProposal.Proposal.PieceCID] = nil
return nil
})
if err != nil {
return nil, err
}

cids := make([]cid.Cid, len(cidsMap))
idx := 0
for cid, _ := range cidsMap {
cids[idx] = cid
idx++
}
return cids, nil
}

var justWantStopTravelErr = errors.New("stop travel")

func (dsr *storageDealRepo) GetDealByDealID(mAddr address.Address, dealID abi.DealID) (*types.MinerDeal, error) {
var deal *types.MinerDeal
var err error
if err = dsr.travelDeals(
func(inDeal *types.MinerDeal) error {
if inDeal.ClientDealProposal.Proposal.Provider == mAddr && inDeal.DealID == dealID {
deal = inDeal
return xerrors.Errorf("find the deal, so stop:%w", justWantStopTravelErr)
}
return nil
}); err != nil {
if xerrors.Is(err, justWantStopTravelErr) {
return deal, nil
}
return nil, err
}

return nil, repo.ErrNotFound
}

func (dsr *storageDealRepo) GetDeals(mAddr address.Address, pageIndex, pageSize int) ([]*types.MinerDeal, error) {
return nil, nil
}

func (dsr *storageDealRepo) GetDealsByPieceStatus(mAddr address.Address, pieceStatus string) ([]*types.MinerDeal, error) {
var deals []*types.MinerDeal
var err error
if err = dsr.travelDeals(
func(inDeal *types.MinerDeal) error {
if inDeal.ClientDealProposal.Proposal.Provider == mAddr && inDeal.PieceStatus == pieceStatus {
deals = append(deals, inDeal)
return xerrors.Errorf("find the deal, so stop:%w", justWantStopTravelErr)
}
return nil
}); err != nil {
if xerrors.Is(err, justWantStopTravelErr) {
return deals, nil
}
return nil, err
}

return nil, repo.ErrNotFound
}
Loading

0 comments on commit a284db5

Please sign in to comment.