Skip to content

Commit

Permalink
Merge pull request #338 from ipfs-force-community/feat/http-retrieve
Browse files Browse the repository at this point in the history
feat: support http retrieval
  • Loading branch information
simlecode committed Jun 15, 2023
1 parent a1a43dd commit 788e5fe
Show file tree
Hide file tree
Showing 17 changed files with 566 additions and 9 deletions.
74 changes: 74 additions & 0 deletions cli/retrieval-deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,14 @@ import (
"text/tabwriter"
"time"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
mtypes "github.com/filecoin-project/venus-market/v2/types"
"github.com/filecoin-project/venus-market/v2/utils"
"github.com/filecoin-project/venus/venus-shared/types"
"github.com/filecoin-project/venus/venus-shared/types/market"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/urfave/cli/v2"
)
Expand All @@ -20,6 +26,7 @@ var RetrievalCmds = &cli.Command{
retrievalDealsCmds,
retirevalAsksCmds,
retrievalDealSelectionCmds,
queryProtocols,
},
}

Expand Down Expand Up @@ -192,3 +199,70 @@ func outputRetrievalDeal(deal *market.ProviderDealState) error {

return nil
}

var queryProtocols = &cli.Command{
Name: "protocols",
Usage: "query retrieval support protocols",
ArgsUsage: "<miner>",
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() == 0 {
return fmt.Errorf("must pass miner")
}

api, closer, err := NewFullNode(cctx)
if err != nil {
return err
}
defer closer()

ctx := ReqContext(cctx)

miner, err := address.NewFromString(cctx.Args().First())
if err != nil {
return err
}
minerInfo, err := api.StateMinerInfo(ctx, miner, types.EmptyTSK)
if err != nil {
return err
}
if minerInfo.PeerId == nil {
return fmt.Errorf("peer id is nil")
}

h, err := libp2p.New(
libp2p.Identity(nil),
libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"),
)
if err != nil {
return err
}

addrs, err := utils.ConvertMultiaddr(minerInfo.Multiaddrs)
if err != nil {
return err
}
if err := h.Connect(ctx, peer.AddrInfo{ID: *minerInfo.PeerId, Addrs: addrs}); err != nil {
return err
}
stream, err := h.NewStream(ctx, *minerInfo.PeerId, mtypes.TransportsProtocolID)
if err != nil {
return fmt.Errorf("failed to open stream to peer: %w", err)
}
_ = stream.SetReadDeadline(time.Now().Add(time.Minute))
//nolint: errcheck
defer stream.SetReadDeadline(time.Time{})

// Read the response from the stream
queryResponsei, err := mtypes.BindnodeRegistry.TypeFromReader(stream, (*mtypes.QueryResponse)(nil), dagcbor.Decode)
if err != nil {
return fmt.Errorf("reading query response: %w", err)
}
queryResponse := queryResponsei.(*mtypes.QueryResponse)

for _, p := range queryResponse.Protocols {
fmt.Println(p)
}

return nil
},
}
2 changes: 1 addition & 1 deletion cmd/market-client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,5 +313,5 @@ func marketClient(cctx *cli.Context) error {
apiHandles := []rpc.APIHandle{
{Path: "/rpc/v0", API: &marketCli},
}
return rpc.ServeRPC(ctx, cfg, &cfg.API, mux.NewRouter(), 1000, cli2.API_NAMESPACE_MARKET_CLIENT, nil, apiHandles, finishCh)
return rpc.ServeRPC(ctx, cfg, &cfg.API, mux.NewRouter(), 1000, cli2.API_NAMESPACE_MARKET_CLIENT, nil, apiHandles, finishCh, nil)
}
8 changes: 7 additions & 1 deletion cmd/venus-market/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/filecoin-project/venus-market/v2/paychmgr"
"github.com/filecoin-project/venus-market/v2/piecestorage"
"github.com/filecoin-project/venus-market/v2/retrievalprovider"
"github.com/filecoin-project/venus-market/v2/retrievalprovider/httpretrieval"
"github.com/filecoin-project/venus-market/v2/rpc"
"github.com/filecoin-project/venus-market/v2/storageprovider"
types2 "github.com/filecoin-project/venus-market/v2/types"
Expand Down Expand Up @@ -239,6 +240,10 @@ func runDaemon(cctx *cli.Context) error {
if err = router.Handle("/resource", rpc.NewPieceStorageServer(resAPI.PieceStorageMgr)).GetError(); err != nil {
return fmt.Errorf("handle 'resource' failed: %w", err)
}
httpRetrievalServer, err := httpretrieval.NewServer(&cfg.PieceStorage)
if err != nil {
return err
}

var iMarket marketapiV1.IMarketStruct
permission.PermissionProxy(marketapiV1.IMarket(resAPI), &iMarket)
Expand All @@ -248,5 +253,6 @@ func runDaemon(cctx *cli.Context) error {
{Path: "/rpc/v1", API: api},
{Path: "/rpc/v0", API: v0api.WrapperV1IMarket{IMarket: api}},
}
return rpc.ServeRPC(ctx, cfg, &cfg.API, router, 1000, cli2.API_NAMESPACE_VENUS_MARKET, authClient, apiHandles, finishCh)

return rpc.ServeRPC(ctx, cfg, &cfg.API, router, 1000, cli2.API_NAMESPACE_VENUS_MARKET, authClient, apiHandles, finishCh, httpRetrievalServer)
}
5 changes: 5 additions & 0 deletions config/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ type ProviderConfig struct {
RetrievalPaymentAddress Address

DealPublishAddress []Address

// The public multi-address for retrieving deals with venus-market.
// Note: Must be in multiaddr format, eg /ip4/127.0.0.1/tcp/41235/http
HTTPRetrievalMultiaddr string
}

func defaultProviderConfig() *ProviderConfig {
Expand Down Expand Up @@ -153,5 +157,6 @@ func defaultProviderConfig() *ProviderConfig {

MaxPublishDealsFee: types.FIL(types.NewInt(0)),
MaxMarketBalanceAddFee: types.FIL(types.NewInt(0)),
HTTPRetrievalMultiaddr: "",
}
}
2 changes: 1 addition & 1 deletion docs/zh/订单过滤器.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

## 详情

为了满足这些需求, 可以在 `Droplet` 的配置文件中给特定的 `miner` 配置一个订单过滤器, 该过滤器在配置文件中的表现形式是一个表示一个 `shell` 命令的字符串, 每当 `Droplet` 决定是否接受指向某个 `miner` 的订单时, 就会调用该命令, 并将 `订单的信息` ( json 字符串) 作为命令的参数 (标准输入) 传递给该命令. 如果命令退出码为 `0`, 则表示接受该订单, 否则拒绝该订单.
为了满足这些需求, 可以在 `venus-market` 的配置文件中给特定的 `miner` 配置一个订单过滤器, 该过滤器在配置文件中的表现形式是一个表示一个 `shell` 命令的字符串, 每当 `venus-market` 决定是否接受指向某个 `miner` 的订单时, 就会调用该命令, 并将 `订单的信息` ( json 字符串) 作为命令的参数 (标准输入) 传递给该命令. 如果命令退出码为 `0`, 则表示接受该订单, 否则拒绝该订单.

- exit with 0 : 接受订单
- exit with non-0 : 拒绝订单
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ require (
github.com/filecoin-project/specs-actors/v2 v2.3.6
github.com/filecoin-project/specs-actors/v7 v7.0.1
github.com/filecoin-project/venus v1.11.0
github.com/filecoin-project/venus-auth v1.11.0
github.com/filecoin-project/venus-auth v1.11.1-0.20230614101152-9baf14ab8db6
github.com/filecoin-project/venus-messager v1.11.0
github.com/golang/mock v1.6.0
github.com/google/uuid v1.3.0
Expand Down Expand Up @@ -95,6 +95,7 @@ require (
contrib.go.opencensus.io/exporter/jaeger v0.2.1 // indirect
contrib.go.opencensus.io/exporter/prometheus v0.4.0 // indirect
github.com/DataDog/zstd v1.4.5 // indirect
github.com/NYTimes/gziphandler v1.1.1
github.com/Stebalien/go-bitfield v0.0.1 // indirect
github.com/acobaugh/osrelease v0.0.0-20181218015638-a93a0a55a249 // indirect
github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a // indirect
Expand Down Expand Up @@ -290,7 +291,7 @@ require (
go.opentelemetry.io/otel/trace v1.10.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/dig v1.14.1 // indirect
go.uber.org/zap v1.23.0 // indirect
go.uber.org/zap v1.23.0
go4.org v0.0.0-20200411211856-f5505b9728dd // indirect
golang.org/x/arch v0.0.0-20210923205945-b76863e36670 // indirect
golang.org/x/crypto v0.5.0 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ github.com/HdrHistogram/hdrhistogram-go v1.1.0/go.mod h1:yDgFjdqOqDEKOvasDdhWNXY
github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y=
github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I=
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
Expand Down Expand Up @@ -471,8 +473,8 @@ github.com/filecoin-project/venus v1.2.4/go.mod h1:hJULXHGAnWuq5S5KRtPkwbT8DqgM9
github.com/filecoin-project/venus v1.11.0 h1:cH7ydd+O2dw7zg8tKfeiuwVd5SokZ8TBu+WoBU60pAA=
github.com/filecoin-project/venus v1.11.0/go.mod h1:H8A3djsrHKRWuKnJI/8Y6xZRudbV9V2x5NIP8/PVPfQ=
github.com/filecoin-project/venus-auth v1.3.2/go.mod h1:m5Jog2GYxztwP7w3m/iJdv/V1/bTcAVU9rm/CbhxRQU=
github.com/filecoin-project/venus-auth v1.11.0 h1:9PBswWxc113vqaHABMcRyMm+1BtlJCwOFTPQJg/OVtQ=
github.com/filecoin-project/venus-auth v1.11.0/go.mod h1:aBfIfNxQkdcY8Rk5wrQn9qRtJpH4RTDdc10Ac+ferzs=
github.com/filecoin-project/venus-auth v1.11.1-0.20230614101152-9baf14ab8db6 h1:i2ONIcb+KFp5jJA2NNwibDmcoP7WTpIT/dWaBeZ1DHc=
github.com/filecoin-project/venus-auth v1.11.1-0.20230614101152-9baf14ab8db6/go.mod h1:aBfIfNxQkdcY8Rk5wrQn9qRtJpH4RTDdc10Ac+ferzs=
github.com/filecoin-project/venus-messager v1.11.0 h1:OJNSnWqhQl9PLzwRNR3huz49k+SU6pQ4DzEG4TXDqic=
github.com/filecoin-project/venus-messager v1.11.0/go.mod h1:dZlz/xrF2SREfmR3/6xp3AMP9FthCW68N3zEvBOnqM4=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
Expand Down
18 changes: 18 additions & 0 deletions retrievalprovider/httpretrieval/READMD.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
## http 检索

支持通过 piece cid 检索,通过直接去 piecestore 查找和读取 piece,然后返回结果。

### 配置

需要调整 `venus-market` 配置文件 `config.toml``HTTPRetrievalMultiaddr` 字段的值,参考下面示例:

```toml
[CommonProvider]
HTTPRetrievalMultiaddr = "/ip4/<ip>/tcp/41235/http"
```

> 上面配置中的 `ip` 是你本机的 IP 地址,`41235` 要确保和 `venus-market` 使用的端口一致。
### TODO

[filplus 提出的 HTTP V2 检索要求](https://github.com/data-preservation-programs/RetrievalBot/blob/main/filplus.md#http-v2)
158 changes: 158 additions & 0 deletions retrievalprovider/httpretrieval/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package httpretrieval

import (
"fmt"
"io"
"net/http"
"strings"
"time"

"github.com/NYTimes/gziphandler"
"github.com/filecoin-project/venus-market/v2/config"
"github.com/filecoin-project/venus-market/v2/piecestorage"
"github.com/filecoin-project/venus/venus-shared/types"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"go.uber.org/zap"
)

var log = logging.Logger("httpserver")

type Server struct {
// path string
pieceMgr *piecestorage.PieceStorageManager
}

func NewServer(cfg *config.PieceStorage) (*Server, error) {
pieceMgr, err := piecestorage.NewPieceStorageManager(cfg)
if err != nil {
return nil, err
}

return &Server{pieceMgr: pieceMgr}, nil
}

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.RetrievalByPieceCID(w, r)
}

func (s *Server) RetrievalByPieceCID(w http.ResponseWriter, r *http.Request) {
pieceCID, err := convertPieceCID(r.URL.Path)
if err != nil {
log.Warn(err)
badResponse(w, http.StatusBadRequest, err)
return
}

ctx := r.Context()
pieceCIDStr := pieceCID.String()
log := log.With("piece cid", pieceCIDStr)
log.Info("start retrieval deal")
store, err := s.pieceMgr.FindStorageForRead(ctx, pieceCIDStr)
if err != nil {
log.Warn(err)
badResponse(w, http.StatusNotFound, err)
return
}
mountReader, err := store.GetMountReader(ctx, pieceCIDStr)
if err != nil {
log.Warn(err)
badResponse(w, http.StatusNotFound, err)
return
}

serveContent(w, r, mountReader, log)
log.Info("end retrieval deal")
}

func serveContent(w http.ResponseWriter, r *http.Request, content io.ReadSeeker, log *zap.SugaredLogger) {
// Set the Content-Type header explicitly so that http.ServeContent doesn't
// try to do it implicitly
w.Header().Set("Content-Type", "application/piece")

var writer http.ResponseWriter

// http.ServeContent ignores errors when writing to the stream, so we
// replace the writer with a class that watches for errors
var err error
writeErrWatcher := &writeErrorWatcher{ResponseWriter: w, onError: func(e error) {
err = e
}}

writer = writeErrWatcher //Need writeErrWatcher to be of type writeErrorWatcher for addCommas()

// Note that the last modified time is a constant value because the data
// in a piece identified by a cid will never change.
start := time.Now()
log.Infof("start %s\t %d\tGET %s", start, http.StatusOK, r.URL)
isGzipped := strings.Contains(r.Header.Get("Accept-Encoding"), "gzip")
if isGzipped {
// If Accept-Encoding header contains gzip then send a gzipped response
gzwriter := gziphandler.GzipResponseWriter{
ResponseWriter: writeErrWatcher,
}
// Close the writer to flush buffer
defer gzwriter.Close() // nolint
writer = &gzwriter
}

if r.Method == "HEAD" {
// For an HTTP HEAD request ServeContent doesn't send any data (just headers)
http.ServeContent(writer, r, "", time.Time{}, content)
log.Infof("%d\tHEAD %s", http.StatusOK, r.URL)
return
}

// Send the content
http.ServeContent(writer, r, "", time.Unix(1, 0), content)

// Write a line to the log
end := time.Now()
completeMsg := fmt.Sprintf("GET %s\t%s - %s: %s / %s transferred",
r.URL, end.Format(time.RFC3339), start.Format(time.RFC3339), time.Since(start),
fmt.Sprintf("%s (%d B)", types.SizeStr(types.NewInt(writeErrWatcher.count)), writeErrWatcher.count))
if isGzipped {
completeMsg += " (gzipped)"
}
if err == nil {
log.Infof("%s %s", completeMsg, "Done")
} else {
log.Warnf("%s %s\n%s", completeMsg, "FAIL", err)
}
}

func convertPieceCID(path string) (cid.Cid, error) {
l := len("/piece/")
if len(path) <= l {
return cid.Undef, fmt.Errorf("path %s too short", path)
}

cidStr := path[l:]
c, err := cid.Parse(cidStr)
if err != nil {
return cid.Undef, fmt.Errorf("parse piece cid failed: %s, %v", cidStr, err)
}

return c, nil
}

func badResponse(w http.ResponseWriter, code int, err error) {
w.WriteHeader(code)
w.Write([]byte("Error: " + err.Error())) // nolint
}

// writeErrorWatcher calls onError if there is an error writing to the writer
type writeErrorWatcher struct {
http.ResponseWriter
count uint64
onError func(err error)
}

func (w *writeErrorWatcher) Write(bz []byte) (int, error) {
count, err := w.ResponseWriter.Write(bz)
if err != nil {
w.onError(err)
}
w.count += uint64(count)
return count, err
}
Loading

0 comments on commit 788e5fe

Please sign in to comment.