Skip to content

Commit

Permalink
Merge pull request #454 from ipfs-force-community/feat/filter-retriev…
Browse files Browse the repository at this point in the history
…al-deal

feat: filter retrieval by deal piece
  • Loading branch information
LinZexiao authored Oct 10, 2023
2 parents cec5837 + f39a4d3 commit 88e6432
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 4 deletions.
5 changes: 4 additions & 1 deletion cmd/droplet/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,10 @@ func runDaemon(cctx *cli.Context) error {
if err = router.Handle("/resource", rpc.NewPieceStorageServer(resAPI.PieceStorageMgr)).GetError(); err != nil {
return fmt.Errorf("handle 'resource' failed: %w", err)
}
httpRetrievalServer := httpretrieval.NewServer(resAPI.PieceStorageMgr)
httpRetrievalServer, err := httpretrieval.NewServer(resAPI.PieceStorageMgr, resAPI)
if err != nil {
return err
}

var iMarket marketapiV1.IMarketStruct
permission.PermissionProxy(marketapiV1.IMarket(resAPI), &iMarket)
Expand Down
3 changes: 3 additions & 0 deletions models/badger/storage_deal.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,9 @@ func (sdr *storageDealRepo) ListDeal(ctx context.Context, params *types.StorageD
deal.State == storagemarket.StorageDealExpired || deal.State == storagemarket.StorageDealError) {
return false, nil
}
if len(params.PieceCID) != 0 && deal.Proposal.PieceCID.String() != params.PieceCID {
return false, nil
}
if count >= params.Offset && count < end {
storageDeals = append(storageDeals, deal)
}
Expand Down
15 changes: 15 additions & 0 deletions models/badger/storage_deal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ func TestListDeal(t *testing.T) {
ctx, r, dealCases := prepareStorageDealTest(t)

peers := []peer.ID{peer.ID("1"), peer.ID("2")}
byPiece := make(map[string]int)
miner := []address.Address{dealCases[0].Proposal.Provider, testutil.AddressProvider()(t)}
states := []storagemarket.StorageDealStatus{
storagemarket.StorageDealAcceptWait,
Expand All @@ -315,6 +316,7 @@ func TestListDeal(t *testing.T) {
deal.State = states[i%4]
err := r.SaveDeal(ctx, &deal)
assert.NoError(t, err)
byPiece[deal.Proposal.PieceCID.String()]++
}

// refresh UpdatedAt and CreationTime
Expand Down Expand Up @@ -388,6 +390,19 @@ func TestListDeal(t *testing.T) {
})
assert.NoError(t, err)
assert.Len(t, deals, 2)

// test piece
for piece, count := range byPiece {
deals, err = r.ListDeal(ctx, &markettypes.StorageDealQueryParams{
Page: markettypes.Page{
Limit: 100,
},
PieceCID: piece,
})
assert.NoError(t, err)
assert.Len(t, deals, count)
}

}

func TestGetStoragePieceInfo(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions models/mysql/storage_deal.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,9 @@ func (sdr *storageDealRepo) ListDeal(ctx context.Context, params *types.StorageD
if params.State != nil {
query.Where("state = ?", params.State)
}
if len(params.PieceCID) != 0 {
query.Where("cdp_piece_cid = ?", params.PieceCID)
}
if discardFailedDeal {
states := []storagemarket.StorageDealStatus{storagemarket.StorageDealFailing,
storagemarket.StorageDealExpired, storagemarket.StorageDealError, storagemarket.StorageDealSlashed}
Expand Down
11 changes: 11 additions & 0 deletions models/mysql/storage_deal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,17 @@ func TestListDeal(t *testing.T) {
res, err = r.StorageDealRepo().ListDeal(ctx, &types.StorageDealQueryParams{Page: defPage, DiscardFailedDeal: true})
assert.NoError(t, err)
assert.Len(t, res, 2)

// test piece
piece := dbStorageDealCases[0].PieceCID.String()
rows, err = getFullRows(dbStorageDealCases[0])
assert.NoError(t, err)
sql, vars, err = getSQL(newQuery().Where("cdp_piece_cid = ?", piece).Limit(caseCount).Find(&storageDeals))
assert.NoError(t, err)
mock.ExpectQuery(regexp.QuoteMeta(sql)).WithArgs(vars...).WillReturnRows(rows)
res, err = r.StorageDealRepo().ListDeal(ctx, &types.StorageDealQueryParams{Page: defPage, PieceCID: piece})
assert.NoError(t, err)
assert.Len(t, res, 1)
}

func TestListDealByAddr(t *testing.T) {
Expand Down
38 changes: 36 additions & 2 deletions retrievalprovider/httpretrieval/server.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package httpretrieval

import (
"context"
"fmt"
"io"
"net/http"
"strings"
"time"

"github.com/NYTimes/gziphandler"
"github.com/filecoin-project/go-fil-markets/storagemarket"
marketAPI "github.com/filecoin-project/venus/venus-shared/api/market/v1"
"github.com/filecoin-project/venus/venus-shared/types"
marketTypes "github.com/filecoin-project/venus/venus-shared/types/market"
"github.com/ipfs-force-community/droplet/v2/piecestorage"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
Expand All @@ -19,10 +23,11 @@ var log = logging.Logger("httpserver")

type Server struct {
pieceMgr *piecestorage.PieceStorageManager
api marketAPI.IMarket
}

func NewServer(pieceMgr *piecestorage.PieceStorageManager) *Server {
return &Server{pieceMgr: pieceMgr}
func NewServer(pieceMgr *piecestorage.PieceStorageManager, api marketAPI.IMarket) (*Server, error) {
return &Server{pieceMgr: pieceMgr, api: api}, nil
}

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Expand All @@ -41,9 +46,20 @@ func (s *Server) RetrievalByPieceCID(w http.ResponseWriter, r *http.Request) {
pieceCIDStr := pieceCID.String()
log := log.With("piece cid", pieceCIDStr)
log.Infof("start retrieval deal, Range: %s", r.Header.Get("Range"))

_, err = s.listDealsByPiece(ctx, pieceCIDStr)
if err != nil {
log.Warn(err)
badResponse(w, http.StatusNotFound, err)
return
}

store, err := s.pieceMgr.FindStorageForRead(ctx, pieceCIDStr)
if err != nil {
log.Warn(err)
// if errors.Is(err, piecestorage.ErrorNotFoundForRead) {
// todo: unseal data
// }
badResponse(w, http.StatusNotFound, err)
return
}
Expand All @@ -67,6 +83,24 @@ func (s *Server) RetrievalByPieceCID(w http.ResponseWriter, r *http.Request) {
log.Info("end retrieval deal")
}

func (s *Server) listDealsByPiece(ctx context.Context, piece string) ([]marketTypes.MinerDeal, error) {
activeState := storagemarket.StorageDealActive
p := &marketTypes.StorageDealQueryParams{
PieceCID: piece,
Page: marketTypes.Page{Limit: 100},
State: &activeState,
}
deals, err := s.api.MarketListIncompleteDeals(ctx, p)
if err != nil {
return nil, err
}
if len(deals) == 0 {
return nil, fmt.Errorf("not found deal")
}

return deals, nil
}

func serveContent(w http.ResponseWriter, r *http.Request, content io.ReadSeeker, log *zap.SugaredLogger) {
// Set the Content-Type header explicitly so that http.ServeContent doesn't
// try to do it implicitly
Expand Down
29 changes: 28 additions & 1 deletion retrievalprovider/httpretrieval/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@ import (
"testing"
"time"

"github.com/filecoin-project/venus/venus-shared/api/market/v1/mock"
"github.com/filecoin-project/venus/venus-shared/types"
"github.com/filecoin-project/venus/venus-shared/types/market"
"github.com/golang/mock/gomock"
"github.com/gorilla/mux"
"github.com/ipfs-force-community/droplet/v2/config"
"github.com/ipfs-force-community/droplet/v2/piecestorage"
"github.com/ipfs/go-cid"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -67,6 +72,8 @@ func TestRetrievalByPiece(t *testing.T) {
assert.NoError(t, config.SaveConfig(cfg))

pieceStr := "baga6ea4seaqpzcr744w2rvqhkedfqbuqrbo7xtkde2ol6e26khu3wni64nbpaeq"
piece, err := cid.Decode(pieceStr)
assert.NoError(t, err)
buf := &bytes.Buffer{}
f, err := os.Create(filepath.Join(tmpDri, pieceStr))
assert.NoError(t, err)
Expand All @@ -79,7 +86,18 @@ func TestRetrievalByPiece(t *testing.T) {

pieceStorage, err := piecestorage.NewPieceStorageManager(&cfg.PieceStorage)
assert.NoError(t, err)
s := NewServer(pieceStorage)
ctrl := gomock.NewController(t)
m := mock.NewMockIMarket(ctrl)
m.EXPECT().MarketListIncompleteDeals(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, p *market.StorageDealQueryParams) ([]market.MinerDeal, error) {
if p.PieceCID != pieceStr {
return nil, fmt.Errorf("not found deal")
}
return append([]market.MinerDeal{}, market.MinerDeal{ClientDealProposal: types.ClientDealProposal{Proposal: types.DealProposal{PieceCID: piece}}}), nil
}).AnyTimes()

s, err := NewServer(pieceStorage, m)
assert.NoError(t, err)
port := "34897"
startHTTPServer(ctx, t, port, s)

Expand All @@ -93,6 +111,15 @@ func TestRetrievalByPiece(t *testing.T) {
data, err := io.ReadAll(resp.Body)
assert.NoError(t, err)
assert.Equal(t, buf.Bytes(), data)

// deal not exist
url = fmt.Sprintf("http://127.0.0.1:%s/piece/%s", port, "bafybeiakou6e7hnx4ms2yangplzl6viapsoyo6phlee6bwrg4j2xt37m3q")
req, err = http.NewRequest(http.MethodGet, url, nil)
assert.NoError(t, err)
resp, err = http.DefaultClient.Do(req)
assert.NoError(t, err)
defer resp.Body.Close() // nolint
assert.Equal(t, http.StatusNotFound, resp.StatusCode)
}

func startHTTPServer(ctx context.Context, t *testing.T, port string, s *Server) {
Expand Down

0 comments on commit 88e6432

Please sign in to comment.