Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge pull request #338 from ipfs-force-community/feat/http-retrieve #339

Merged
merged 1 commit into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions cli/retrieval-deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,14 @@ import (
"text/tabwriter"
"time"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
mtypes "github.com/filecoin-project/venus-market/v2/types"
"github.com/filecoin-project/venus-market/v2/utils"
"github.com/filecoin-project/venus/venus-shared/types"
"github.com/filecoin-project/venus/venus-shared/types/market"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/urfave/cli/v2"
)
Expand All @@ -20,6 +26,7 @@ var RetrievalCmds = &cli.Command{
retrievalDealsCmds,
retirevalAsksCmds,
retrievalDealSelectionCmds,
queryProtocols,
},
}

Expand Down Expand Up @@ -192,3 +199,70 @@ func outputRetrievalDeal(deal *market.ProviderDealState) error {

return nil
}

var queryProtocols = &cli.Command{
Name: "protocols",
Usage: "query retrieval support protocols",
ArgsUsage: "<miner>",
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() == 0 {
return fmt.Errorf("must pass miner")
}

api, closer, err := NewFullNode(cctx)
if err != nil {
return err
}
defer closer()

ctx := ReqContext(cctx)

miner, err := address.NewFromString(cctx.Args().First())
if err != nil {
return err
}
minerInfo, err := api.StateMinerInfo(ctx, miner, types.EmptyTSK)
if err != nil {
return err
}
if minerInfo.PeerId == nil {
return fmt.Errorf("peer id is nil")
}

h, err := libp2p.New(
libp2p.Identity(nil),
libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"),
)
if err != nil {
return err
}

addrs, err := utils.ConvertMultiaddr(minerInfo.Multiaddrs)
if err != nil {
return err
}
if err := h.Connect(ctx, peer.AddrInfo{ID: *minerInfo.PeerId, Addrs: addrs}); err != nil {
return err
}
stream, err := h.NewStream(ctx, *minerInfo.PeerId, mtypes.TransportsProtocolID)
if err != nil {
return fmt.Errorf("failed to open stream to peer: %w", err)
}
_ = stream.SetReadDeadline(time.Now().Add(time.Minute))
//nolint: errcheck
defer stream.SetReadDeadline(time.Time{})

// Read the response from the stream
queryResponsei, err := mtypes.BindnodeRegistry.TypeFromReader(stream, (*mtypes.QueryResponse)(nil), dagcbor.Decode)
if err != nil {
return fmt.Errorf("reading query response: %w", err)
}
queryResponse := queryResponsei.(*mtypes.QueryResponse)

for _, p := range queryResponse.Protocols {
fmt.Println(p)
}

return nil
},
}
2 changes: 1 addition & 1 deletion cmd/market-client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,5 +313,5 @@ func marketClient(cctx *cli.Context) error {
apiHandles := []rpc.APIHandle{
{Path: "/rpc/v0", API: &marketCli},
}
return rpc.ServeRPC(ctx, cfg, &cfg.API, mux.NewRouter(), 1000, cli2.API_NAMESPACE_MARKET_CLIENT, nil, apiHandles, finishCh)
return rpc.ServeRPC(ctx, cfg, &cfg.API, mux.NewRouter(), 1000, cli2.API_NAMESPACE_MARKET_CLIENT, nil, apiHandles, finishCh, nil)
}
8 changes: 7 additions & 1 deletion cmd/venus-market/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/filecoin-project/venus-market/v2/paychmgr"
"github.com/filecoin-project/venus-market/v2/piecestorage"
"github.com/filecoin-project/venus-market/v2/retrievalprovider"
"github.com/filecoin-project/venus-market/v2/retrievalprovider/httpretrieval"
"github.com/filecoin-project/venus-market/v2/rpc"
"github.com/filecoin-project/venus-market/v2/storageprovider"
types2 "github.com/filecoin-project/venus-market/v2/types"
Expand Down Expand Up @@ -239,6 +240,10 @@ func runDaemon(cctx *cli.Context) error {
if err = router.Handle("/resource", rpc.NewPieceStorageServer(resAPI.PieceStorageMgr)).GetError(); err != nil {
return fmt.Errorf("handle 'resource' failed: %w", err)
}
httpRetrievalServer, err := httpretrieval.NewServer(&cfg.PieceStorage)
if err != nil {
return err
}

var iMarket marketapiV1.IMarketStruct
permission.PermissionProxy(marketapiV1.IMarket(resAPI), &iMarket)
Expand All @@ -248,5 +253,6 @@ func runDaemon(cctx *cli.Context) error {
{Path: "/rpc/v1", API: api},
{Path: "/rpc/v0", API: v0api.WrapperV1IMarket{IMarket: api}},
}
return rpc.ServeRPC(ctx, cfg, &cfg.API, router, 1000, cli2.API_NAMESPACE_VENUS_MARKET, authClient, apiHandles, finishCh)

return rpc.ServeRPC(ctx, cfg, &cfg.API, router, 1000, cli2.API_NAMESPACE_VENUS_MARKET, authClient, apiHandles, finishCh, httpRetrievalServer)
}
5 changes: 5 additions & 0 deletions config/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ type ProviderConfig struct {
RetrievalPaymentAddress Address

DealPublishAddress []Address

// The public multi-address for retrieving deals with venus-market.
// Note: Must be in multiaddr format, eg /ip4/127.0.0.1/tcp/41235/http
HTTPRetrievalMultiaddr string
}

func defaultProviderConfig() *ProviderConfig {
Expand Down Expand Up @@ -153,5 +157,6 @@ func defaultProviderConfig() *ProviderConfig {

MaxPublishDealsFee: types.FIL(types.NewInt(0)),
MaxMarketBalanceAddFee: types.FIL(types.NewInt(0)),
HTTPRetrievalMultiaddr: "",
}
}
2 changes: 1 addition & 1 deletion docs/zh/订单过滤器.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

## 详情

为了满足这些需求, 可以在 `Droplet` 的配置文件中给特定的 `miner` 配置一个订单过滤器, 该过滤器在配置文件中的表现形式是一个表示一个 `shell` 命令的字符串, 每当 `Droplet` 决定是否接受指向某个 `miner` 的订单时, 就会调用该命令, 并将 `订单的信息` ( json 字符串) 作为命令的参数 (标准输入) 传递给该命令. 如果命令退出码为 `0`, 则表示接受该订单, 否则拒绝该订单.
为了满足这些需求, 可以在 `venus-market` 的配置文件中给特定的 `miner` 配置一个订单过滤器, 该过滤器在配置文件中的表现形式是一个表示一个 `shell` 命令的字符串, 每当 `venus-market` 决定是否接受指向某个 `miner` 的订单时, 就会调用该命令, 并将 `订单的信息` ( json 字符串) 作为命令的参数 (标准输入) 传递给该命令. 如果命令退出码为 `0`, 则表示接受该订单, 否则拒绝该订单.

- exit with 0 : 接受订单
- exit with non-0 : 拒绝订单
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ require (
github.com/filecoin-project/specs-actors/v2 v2.3.6
github.com/filecoin-project/specs-actors/v7 v7.0.1
github.com/filecoin-project/venus v1.11.0
github.com/filecoin-project/venus-auth v1.11.0
github.com/filecoin-project/venus-auth v1.11.1-0.20230614101152-9baf14ab8db6
github.com/filecoin-project/venus-messager v1.11.0
github.com/golang/mock v1.6.0
github.com/google/uuid v1.3.0
Expand Down Expand Up @@ -95,6 +95,7 @@ require (
contrib.go.opencensus.io/exporter/jaeger v0.2.1 // indirect
contrib.go.opencensus.io/exporter/prometheus v0.4.0 // indirect
github.com/DataDog/zstd v1.4.5 // indirect
github.com/NYTimes/gziphandler v1.1.1
github.com/Stebalien/go-bitfield v0.0.1 // indirect
github.com/acobaugh/osrelease v0.0.0-20181218015638-a93a0a55a249 // indirect
github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a // indirect
Expand Down Expand Up @@ -290,7 +291,7 @@ require (
go.opentelemetry.io/otel/trace v1.10.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/dig v1.14.1 // indirect
go.uber.org/zap v1.23.0 // indirect
go.uber.org/zap v1.23.0
go4.org v0.0.0-20200411211856-f5505b9728dd // indirect
golang.org/x/arch v0.0.0-20210923205945-b76863e36670 // indirect
golang.org/x/crypto v0.5.0 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ github.com/HdrHistogram/hdrhistogram-go v1.1.0/go.mod h1:yDgFjdqOqDEKOvasDdhWNXY
github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y=
github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I=
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
Expand Down Expand Up @@ -471,8 +473,8 @@ github.com/filecoin-project/venus v1.2.4/go.mod h1:hJULXHGAnWuq5S5KRtPkwbT8DqgM9
github.com/filecoin-project/venus v1.11.0 h1:cH7ydd+O2dw7zg8tKfeiuwVd5SokZ8TBu+WoBU60pAA=
github.com/filecoin-project/venus v1.11.0/go.mod h1:H8A3djsrHKRWuKnJI/8Y6xZRudbV9V2x5NIP8/PVPfQ=
github.com/filecoin-project/venus-auth v1.3.2/go.mod h1:m5Jog2GYxztwP7w3m/iJdv/V1/bTcAVU9rm/CbhxRQU=
github.com/filecoin-project/venus-auth v1.11.0 h1:9PBswWxc113vqaHABMcRyMm+1BtlJCwOFTPQJg/OVtQ=
github.com/filecoin-project/venus-auth v1.11.0/go.mod h1:aBfIfNxQkdcY8Rk5wrQn9qRtJpH4RTDdc10Ac+ferzs=
github.com/filecoin-project/venus-auth v1.11.1-0.20230614101152-9baf14ab8db6 h1:i2ONIcb+KFp5jJA2NNwibDmcoP7WTpIT/dWaBeZ1DHc=
github.com/filecoin-project/venus-auth v1.11.1-0.20230614101152-9baf14ab8db6/go.mod h1:aBfIfNxQkdcY8Rk5wrQn9qRtJpH4RTDdc10Ac+ferzs=
github.com/filecoin-project/venus-messager v1.11.0 h1:OJNSnWqhQl9PLzwRNR3huz49k+SU6pQ4DzEG4TXDqic=
github.com/filecoin-project/venus-messager v1.11.0/go.mod h1:dZlz/xrF2SREfmR3/6xp3AMP9FthCW68N3zEvBOnqM4=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
Expand Down
18 changes: 18 additions & 0 deletions retrievalprovider/httpretrieval/READMD.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
## http 检索

支持通过 piece cid 检索,通过直接去 piecestore 查找和读取 piece,然后返回结果。

### 配置

需要调整 `venus-market` 配置文件 `config.toml` 中 `HTTPRetrievalMultiaddr` 字段的值,参考下面示例:

```toml
[CommonProvider]
HTTPRetrievalMultiaddr = "/ip4/<ip>/tcp/41235/http"
```

> 上面配置中的 `ip` 是你本机的 IP 地址,`41235` 要确保和 `venus-market` 使用的端口一致。

### TODO

[filplus 提出的 HTTP V2 检索要求](https://github.com/data-preservation-programs/RetrievalBot/blob/main/filplus.md#http-v2)
158 changes: 158 additions & 0 deletions retrievalprovider/httpretrieval/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package httpretrieval

import (
"fmt"
"io"
"net/http"
"strings"
"time"

"github.com/NYTimes/gziphandler"
"github.com/filecoin-project/venus-market/v2/config"
"github.com/filecoin-project/venus-market/v2/piecestorage"
"github.com/filecoin-project/venus/venus-shared/types"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"go.uber.org/zap"
)

var log = logging.Logger("httpserver")

type Server struct {
// path string
pieceMgr *piecestorage.PieceStorageManager
}

func NewServer(cfg *config.PieceStorage) (*Server, error) {
pieceMgr, err := piecestorage.NewPieceStorageManager(cfg)
if err != nil {
return nil, err
}

return &Server{pieceMgr: pieceMgr}, nil
}

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.RetrievalByPieceCID(w, r)
}

func (s *Server) RetrievalByPieceCID(w http.ResponseWriter, r *http.Request) {
pieceCID, err := convertPieceCID(r.URL.Path)
if err != nil {
log.Warn(err)
badResponse(w, http.StatusBadRequest, err)
return
}

ctx := r.Context()
pieceCIDStr := pieceCID.String()
log := log.With("piece cid", pieceCIDStr)
log.Info("start retrieval deal")
store, err := s.pieceMgr.FindStorageForRead(ctx, pieceCIDStr)
if err != nil {
log.Warn(err)
badResponse(w, http.StatusNotFound, err)
return
}
mountReader, err := store.GetMountReader(ctx, pieceCIDStr)
if err != nil {
log.Warn(err)
badResponse(w, http.StatusNotFound, err)
return
}

serveContent(w, r, mountReader, log)
log.Info("end retrieval deal")
}

func serveContent(w http.ResponseWriter, r *http.Request, content io.ReadSeeker, log *zap.SugaredLogger) {
// Set the Content-Type header explicitly so that http.ServeContent doesn't
// try to do it implicitly
w.Header().Set("Content-Type", "application/piece")

var writer http.ResponseWriter

// http.ServeContent ignores errors when writing to the stream, so we
// replace the writer with a class that watches for errors
var err error
writeErrWatcher := &writeErrorWatcher{ResponseWriter: w, onError: func(e error) {
err = e
}}

writer = writeErrWatcher //Need writeErrWatcher to be of type writeErrorWatcher for addCommas()

// Note that the last modified time is a constant value because the data
// in a piece identified by a cid will never change.
start := time.Now()
log.Infof("start %s\t %d\tGET %s", start, http.StatusOK, r.URL)
isGzipped := strings.Contains(r.Header.Get("Accept-Encoding"), "gzip")
if isGzipped {
// If Accept-Encoding header contains gzip then send a gzipped response
gzwriter := gziphandler.GzipResponseWriter{
ResponseWriter: writeErrWatcher,
}
// Close the writer to flush buffer
defer gzwriter.Close() // nolint
writer = &gzwriter
}

if r.Method == "HEAD" {
// For an HTTP HEAD request ServeContent doesn't send any data (just headers)
http.ServeContent(writer, r, "", time.Time{}, content)
log.Infof("%d\tHEAD %s", http.StatusOK, r.URL)
return
}

// Send the content
http.ServeContent(writer, r, "", time.Unix(1, 0), content)

// Write a line to the log
end := time.Now()
completeMsg := fmt.Sprintf("GET %s\t%s - %s: %s / %s transferred",
r.URL, end.Format(time.RFC3339), start.Format(time.RFC3339), time.Since(start),
fmt.Sprintf("%s (%d B)", types.SizeStr(types.NewInt(writeErrWatcher.count)), writeErrWatcher.count))
if isGzipped {
completeMsg += " (gzipped)"
}
if err == nil {
log.Infof("%s %s", completeMsg, "Done")
} else {
log.Warnf("%s %s\n%s", completeMsg, "FAIL", err)
}
}

func convertPieceCID(path string) (cid.Cid, error) {
l := len("/piece/")
if len(path) <= l {
return cid.Undef, fmt.Errorf("path %s too short", path)
}

cidStr := path[l:]
c, err := cid.Parse(cidStr)
if err != nil {
return cid.Undef, fmt.Errorf("parse piece cid failed: %s, %v", cidStr, err)
}

return c, nil
}

func badResponse(w http.ResponseWriter, code int, err error) {
w.WriteHeader(code)
w.Write([]byte("Error: " + err.Error())) // nolint
}

// writeErrorWatcher calls onError if there is an error writing to the writer
type writeErrorWatcher struct {
http.ResponseWriter
count uint64
onError func(err error)
}

func (w *writeErrorWatcher) Write(bz []byte) (int, error) {
count, err := w.ResponseWriter.Write(bz)
if err != nil {
w.onError(err)
}
w.count += uint64(count)
return count, err
}
Loading