From 788e5fecf07e9928973a093f4cf59ff7f744760c Mon Sep 17 00:00:00 2001 From: tom <69969590+simlecode@users.noreply.github.com> Date: Thu, 15 Jun 2023 09:39:35 +0800 Subject: [PATCH] Merge pull request #338 from ipfs-force-community/feat/http-retrieve feat: support http retrieval --- cli/retrieval-deals.go | 74 ++++++++ cmd/market-client/main.go | 2 +- cmd/venus-market/run.go | 8 +- config/common.go | 5 + ...25\350\277\207\346\273\244\345\231\250.md" | 2 +- go.mod | 5 +- go.sum | 6 +- retrievalprovider/httpretrieval/READMD.md | 18 ++ retrievalprovider/httpretrieval/server.go | 158 ++++++++++++++++++ .../httpretrieval/server_test.go | 119 +++++++++++++ retrievalprovider/modules.go | 1 + retrievalprovider/provider.go | 6 + retrievalprovider/transports.go | 79 +++++++++ rpc/rpc.go | 7 + types/transports.go | 57 +++++++ types/transports.ipldsch | 15 ++ utils/converters.go | 13 +- 17 files changed, 566 insertions(+), 9 deletions(-) create mode 100644 retrievalprovider/httpretrieval/READMD.md create mode 100644 retrievalprovider/httpretrieval/server.go create mode 100644 retrievalprovider/httpretrieval/server_test.go create mode 100644 retrievalprovider/transports.go create mode 100644 types/transports.go create mode 100644 types/transports.ipldsch diff --git a/cli/retrieval-deals.go b/cli/retrieval-deals.go index 5f0821cd..5cf3583e 100644 --- a/cli/retrieval-deals.go +++ b/cli/retrieval-deals.go @@ -7,8 +7,14 @@ import ( "text/tabwriter" "time" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-fil-markets/retrievalmarket" + mtypes "github.com/filecoin-project/venus-market/v2/types" + "github.com/filecoin-project/venus-market/v2/utils" + "github.com/filecoin-project/venus/venus-shared/types" "github.com/filecoin-project/venus/venus-shared/types/market" + "github.com/ipld/go-ipld-prime/codec/dagcbor" + "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/peer" "github.com/urfave/cli/v2" ) @@ -20,6 +26,7 @@ var RetrievalCmds = &cli.Command{ retrievalDealsCmds, retirevalAsksCmds, retrievalDealSelectionCmds, + queryProtocols, }, } @@ -192,3 +199,70 @@ func outputRetrievalDeal(deal *market.ProviderDealState) error { return nil } + +var queryProtocols = &cli.Command{ + Name: "protocols", + Usage: "query retrieval support protocols", + ArgsUsage: "", + Action: func(cctx *cli.Context) error { + if cctx.Args().Len() == 0 { + return fmt.Errorf("must pass miner") + } + + api, closer, err := NewFullNode(cctx) + if err != nil { + return err + } + defer closer() + + ctx := ReqContext(cctx) + + miner, err := address.NewFromString(cctx.Args().First()) + if err != nil { + return err + } + minerInfo, err := api.StateMinerInfo(ctx, miner, types.EmptyTSK) + if err != nil { + return err + } + if minerInfo.PeerId == nil { + return fmt.Errorf("peer id is nil") + } + + h, err := libp2p.New( + libp2p.Identity(nil), + libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"), + ) + if err != nil { + return err + } + + addrs, err := utils.ConvertMultiaddr(minerInfo.Multiaddrs) + if err != nil { + return err + } + if err := h.Connect(ctx, peer.AddrInfo{ID: *minerInfo.PeerId, Addrs: addrs}); err != nil { + return err + } + stream, err := h.NewStream(ctx, *minerInfo.PeerId, mtypes.TransportsProtocolID) + if err != nil { + return fmt.Errorf("failed to open stream to peer: %w", err) + } + _ = stream.SetReadDeadline(time.Now().Add(time.Minute)) + //nolint: errcheck + defer stream.SetReadDeadline(time.Time{}) + + // Read the response from the stream + queryResponsei, err := mtypes.BindnodeRegistry.TypeFromReader(stream, (*mtypes.QueryResponse)(nil), dagcbor.Decode) + if err != nil { + return fmt.Errorf("reading query response: %w", err) + } + queryResponse := queryResponsei.(*mtypes.QueryResponse) + + for _, p := range queryResponse.Protocols { + fmt.Println(p) + } + + return nil + }, +} diff --git a/cmd/market-client/main.go b/cmd/market-client/main.go index 5c0fa744..f7d0e1e7 100644 --- a/cmd/market-client/main.go +++ b/cmd/market-client/main.go @@ -313,5 +313,5 @@ func marketClient(cctx *cli.Context) error { apiHandles := []rpc.APIHandle{ {Path: "/rpc/v0", API: &marketCli}, } - return rpc.ServeRPC(ctx, cfg, &cfg.API, mux.NewRouter(), 1000, cli2.API_NAMESPACE_MARKET_CLIENT, nil, apiHandles, finishCh) + return rpc.ServeRPC(ctx, cfg, &cfg.API, mux.NewRouter(), 1000, cli2.API_NAMESPACE_MARKET_CLIENT, nil, apiHandles, finishCh, nil) } diff --git a/cmd/venus-market/run.go b/cmd/venus-market/run.go index 9fbf75b9..382c6244 100644 --- a/cmd/venus-market/run.go +++ b/cmd/venus-market/run.go @@ -28,6 +28,7 @@ import ( "github.com/filecoin-project/venus-market/v2/paychmgr" "github.com/filecoin-project/venus-market/v2/piecestorage" "github.com/filecoin-project/venus-market/v2/retrievalprovider" + "github.com/filecoin-project/venus-market/v2/retrievalprovider/httpretrieval" "github.com/filecoin-project/venus-market/v2/rpc" "github.com/filecoin-project/venus-market/v2/storageprovider" types2 "github.com/filecoin-project/venus-market/v2/types" @@ -239,6 +240,10 @@ func runDaemon(cctx *cli.Context) error { if err = router.Handle("/resource", rpc.NewPieceStorageServer(resAPI.PieceStorageMgr)).GetError(); err != nil { return fmt.Errorf("handle 'resource' failed: %w", err) } + httpRetrievalServer, err := httpretrieval.NewServer(&cfg.PieceStorage) + if err != nil { + return err + } var iMarket marketapiV1.IMarketStruct permission.PermissionProxy(marketapiV1.IMarket(resAPI), &iMarket) @@ -248,5 +253,6 @@ func runDaemon(cctx *cli.Context) error { {Path: "/rpc/v1", API: api}, {Path: "/rpc/v0", API: v0api.WrapperV1IMarket{IMarket: api}}, } - return rpc.ServeRPC(ctx, cfg, &cfg.API, router, 1000, cli2.API_NAMESPACE_VENUS_MARKET, authClient, apiHandles, finishCh) + + return rpc.ServeRPC(ctx, cfg, &cfg.API, router, 1000, cli2.API_NAMESPACE_VENUS_MARKET, authClient, apiHandles, finishCh, httpRetrievalServer) } diff --git a/config/common.go b/config/common.go index d7866dee..71f149b3 100644 --- a/config/common.go +++ b/config/common.go @@ -118,6 +118,10 @@ type ProviderConfig struct { RetrievalPaymentAddress Address DealPublishAddress []Address + + // The public multi-address for retrieving deals with venus-market. + // Note: Must be in multiaddr format, eg /ip4/127.0.0.1/tcp/41235/http + HTTPRetrievalMultiaddr string } func defaultProviderConfig() *ProviderConfig { @@ -153,5 +157,6 @@ func defaultProviderConfig() *ProviderConfig { MaxPublishDealsFee: types.FIL(types.NewInt(0)), MaxMarketBalanceAddFee: types.FIL(types.NewInt(0)), + HTTPRetrievalMultiaddr: "", } } diff --git "a/docs/zh/\350\256\242\345\215\225\350\277\207\346\273\244\345\231\250.md" "b/docs/zh/\350\256\242\345\215\225\350\277\207\346\273\244\345\231\250.md" index 5331c7a0..6d62ca57 100644 --- "a/docs/zh/\350\256\242\345\215\225\350\277\207\346\273\244\345\231\250.md" +++ "b/docs/zh/\350\256\242\345\215\225\350\277\207\346\273\244\345\231\250.md" @@ -6,7 +6,7 @@ ## 详情 -为了满足这些需求, 可以在 `Droplet` 的配置文件中给特定的 `miner` 配置一个订单过滤器, 该过滤器在配置文件中的表现形式是一个表示一个 `shell` 命令的字符串, 每当 `Droplet` 决定是否接受指向某个 `miner` 的订单时, 就会调用该命令, 并将 `订单的信息` ( json 字符串) 作为命令的参数 (标准输入) 传递给该命令. 如果命令退出码为 `0`, 则表示接受该订单, 否则拒绝该订单. +为了满足这些需求, 可以在 `venus-market` 的配置文件中给特定的 `miner` 配置一个订单过滤器, 该过滤器在配置文件中的表现形式是一个表示一个 `shell` 命令的字符串, 每当 `venus-market` 决定是否接受指向某个 `miner` 的订单时, 就会调用该命令, 并将 `订单的信息` ( json 字符串) 作为命令的参数 (标准输入) 传递给该命令. 如果命令退出码为 `0`, 则表示接受该订单, 否则拒绝该订单. - exit with 0 : 接受订单 - exit with non-0 : 拒绝订单 diff --git a/go.mod b/go.mod index f508e6e1..321245cc 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,7 @@ require ( github.com/filecoin-project/specs-actors/v2 v2.3.6 github.com/filecoin-project/specs-actors/v7 v7.0.1 github.com/filecoin-project/venus v1.11.0 - github.com/filecoin-project/venus-auth v1.11.0 + github.com/filecoin-project/venus-auth v1.11.1-0.20230614101152-9baf14ab8db6 github.com/filecoin-project/venus-messager v1.11.0 github.com/golang/mock v1.6.0 github.com/google/uuid v1.3.0 @@ -95,6 +95,7 @@ require ( contrib.go.opencensus.io/exporter/jaeger v0.2.1 // indirect contrib.go.opencensus.io/exporter/prometheus v0.4.0 // indirect github.com/DataDog/zstd v1.4.5 // indirect + github.com/NYTimes/gziphandler v1.1.1 github.com/Stebalien/go-bitfield v0.0.1 // indirect github.com/acobaugh/osrelease v0.0.0-20181218015638-a93a0a55a249 // indirect github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a // indirect @@ -290,7 +291,7 @@ require ( go.opentelemetry.io/otel/trace v1.10.0 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/dig v1.14.1 // indirect - go.uber.org/zap v1.23.0 // indirect + go.uber.org/zap v1.23.0 go4.org v0.0.0-20200411211856-f5505b9728dd // indirect golang.org/x/arch v0.0.0-20210923205945-b76863e36670 // indirect golang.org/x/crypto v0.5.0 // indirect diff --git a/go.sum b/go.sum index 0d3f8721..8c0a0dd5 100644 --- a/go.sum +++ b/go.sum @@ -88,6 +88,8 @@ github.com/HdrHistogram/hdrhistogram-go v1.1.0/go.mod h1:yDgFjdqOqDEKOvasDdhWNXY github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y= +github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I= +github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= @@ -471,8 +473,8 @@ github.com/filecoin-project/venus v1.2.4/go.mod h1:hJULXHGAnWuq5S5KRtPkwbT8DqgM9 github.com/filecoin-project/venus v1.11.0 h1:cH7ydd+O2dw7zg8tKfeiuwVd5SokZ8TBu+WoBU60pAA= github.com/filecoin-project/venus v1.11.0/go.mod h1:H8A3djsrHKRWuKnJI/8Y6xZRudbV9V2x5NIP8/PVPfQ= github.com/filecoin-project/venus-auth v1.3.2/go.mod h1:m5Jog2GYxztwP7w3m/iJdv/V1/bTcAVU9rm/CbhxRQU= -github.com/filecoin-project/venus-auth v1.11.0 h1:9PBswWxc113vqaHABMcRyMm+1BtlJCwOFTPQJg/OVtQ= -github.com/filecoin-project/venus-auth v1.11.0/go.mod h1:aBfIfNxQkdcY8Rk5wrQn9qRtJpH4RTDdc10Ac+ferzs= +github.com/filecoin-project/venus-auth v1.11.1-0.20230614101152-9baf14ab8db6 h1:i2ONIcb+KFp5jJA2NNwibDmcoP7WTpIT/dWaBeZ1DHc= +github.com/filecoin-project/venus-auth v1.11.1-0.20230614101152-9baf14ab8db6/go.mod h1:aBfIfNxQkdcY8Rk5wrQn9qRtJpH4RTDdc10Ac+ferzs= github.com/filecoin-project/venus-messager v1.11.0 h1:OJNSnWqhQl9PLzwRNR3huz49k+SU6pQ4DzEG4TXDqic= github.com/filecoin-project/venus-messager v1.11.0/go.mod h1:dZlz/xrF2SREfmR3/6xp3AMP9FthCW68N3zEvBOnqM4= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= diff --git a/retrievalprovider/httpretrieval/READMD.md b/retrievalprovider/httpretrieval/READMD.md new file mode 100644 index 00000000..8c219c61 --- /dev/null +++ b/retrievalprovider/httpretrieval/READMD.md @@ -0,0 +1,18 @@ +## http 检索 + +支持通过 piece cid 检索,通过直接去 piecestore 查找和读取 piece,然后返回结果。 + +### 配置 + +需要调整 `venus-market` 配置文件 `config.toml` 中 `HTTPRetrievalMultiaddr` 字段的值,参考下面示例: + +```toml +[CommonProvider] + HTTPRetrievalMultiaddr = "/ip4//tcp/41235/http" +``` + +> 上面配置中的 `ip` 是你本机的 IP 地址,`41235` 要确保和 `venus-market` 使用的端口一致。 + +### TODO + +[filplus 提出的 HTTP V2 检索要求](https://github.com/data-preservation-programs/RetrievalBot/blob/main/filplus.md#http-v2) diff --git a/retrievalprovider/httpretrieval/server.go b/retrievalprovider/httpretrieval/server.go new file mode 100644 index 00000000..c8d29ec1 --- /dev/null +++ b/retrievalprovider/httpretrieval/server.go @@ -0,0 +1,158 @@ +package httpretrieval + +import ( + "fmt" + "io" + "net/http" + "strings" + "time" + + "github.com/NYTimes/gziphandler" + "github.com/filecoin-project/venus-market/v2/config" + "github.com/filecoin-project/venus-market/v2/piecestorage" + "github.com/filecoin-project/venus/venus-shared/types" + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" + "go.uber.org/zap" +) + +var log = logging.Logger("httpserver") + +type Server struct { + // path string + pieceMgr *piecestorage.PieceStorageManager +} + +func NewServer(cfg *config.PieceStorage) (*Server, error) { + pieceMgr, err := piecestorage.NewPieceStorageManager(cfg) + if err != nil { + return nil, err + } + + return &Server{pieceMgr: pieceMgr}, nil +} + +func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.RetrievalByPieceCID(w, r) +} + +func (s *Server) RetrievalByPieceCID(w http.ResponseWriter, r *http.Request) { + pieceCID, err := convertPieceCID(r.URL.Path) + if err != nil { + log.Warn(err) + badResponse(w, http.StatusBadRequest, err) + return + } + + ctx := r.Context() + pieceCIDStr := pieceCID.String() + log := log.With("piece cid", pieceCIDStr) + log.Info("start retrieval deal") + store, err := s.pieceMgr.FindStorageForRead(ctx, pieceCIDStr) + if err != nil { + log.Warn(err) + badResponse(w, http.StatusNotFound, err) + return + } + mountReader, err := store.GetMountReader(ctx, pieceCIDStr) + if err != nil { + log.Warn(err) + badResponse(w, http.StatusNotFound, err) + return + } + + serveContent(w, r, mountReader, log) + log.Info("end retrieval deal") +} + +func serveContent(w http.ResponseWriter, r *http.Request, content io.ReadSeeker, log *zap.SugaredLogger) { + // Set the Content-Type header explicitly so that http.ServeContent doesn't + // try to do it implicitly + w.Header().Set("Content-Type", "application/piece") + + var writer http.ResponseWriter + + // http.ServeContent ignores errors when writing to the stream, so we + // replace the writer with a class that watches for errors + var err error + writeErrWatcher := &writeErrorWatcher{ResponseWriter: w, onError: func(e error) { + err = e + }} + + writer = writeErrWatcher //Need writeErrWatcher to be of type writeErrorWatcher for addCommas() + + // Note that the last modified time is a constant value because the data + // in a piece identified by a cid will never change. + start := time.Now() + log.Infof("start %s\t %d\tGET %s", start, http.StatusOK, r.URL) + isGzipped := strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") + if isGzipped { + // If Accept-Encoding header contains gzip then send a gzipped response + gzwriter := gziphandler.GzipResponseWriter{ + ResponseWriter: writeErrWatcher, + } + // Close the writer to flush buffer + defer gzwriter.Close() // nolint + writer = &gzwriter + } + + if r.Method == "HEAD" { + // For an HTTP HEAD request ServeContent doesn't send any data (just headers) + http.ServeContent(writer, r, "", time.Time{}, content) + log.Infof("%d\tHEAD %s", http.StatusOK, r.URL) + return + } + + // Send the content + http.ServeContent(writer, r, "", time.Unix(1, 0), content) + + // Write a line to the log + end := time.Now() + completeMsg := fmt.Sprintf("GET %s\t%s - %s: %s / %s transferred", + r.URL, end.Format(time.RFC3339), start.Format(time.RFC3339), time.Since(start), + fmt.Sprintf("%s (%d B)", types.SizeStr(types.NewInt(writeErrWatcher.count)), writeErrWatcher.count)) + if isGzipped { + completeMsg += " (gzipped)" + } + if err == nil { + log.Infof("%s %s", completeMsg, "Done") + } else { + log.Warnf("%s %s\n%s", completeMsg, "FAIL", err) + } +} + +func convertPieceCID(path string) (cid.Cid, error) { + l := len("/piece/") + if len(path) <= l { + return cid.Undef, fmt.Errorf("path %s too short", path) + } + + cidStr := path[l:] + c, err := cid.Parse(cidStr) + if err != nil { + return cid.Undef, fmt.Errorf("parse piece cid failed: %s, %v", cidStr, err) + } + + return c, nil +} + +func badResponse(w http.ResponseWriter, code int, err error) { + w.WriteHeader(code) + w.Write([]byte("Error: " + err.Error())) // nolint +} + +// writeErrorWatcher calls onError if there is an error writing to the writer +type writeErrorWatcher struct { + http.ResponseWriter + count uint64 + onError func(err error) +} + +func (w *writeErrorWatcher) Write(bz []byte) (int, error) { + count, err := w.ResponseWriter.Write(bz) + if err != nil { + w.onError(err) + } + w.count += uint64(count) + return count, err +} diff --git a/retrievalprovider/httpretrieval/server_test.go b/retrievalprovider/httpretrieval/server_test.go new file mode 100644 index 00000000..84c92d35 --- /dev/null +++ b/retrievalprovider/httpretrieval/server_test.go @@ -0,0 +1,119 @@ +package httpretrieval + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "regexp" + "testing" + "time" + + "github.com/filecoin-project/venus-market/v2/config" + "github.com/gorilla/mux" + "github.com/stretchr/testify/assert" +) + +func TestPathRegexp(t *testing.T) { + reg, err := regexp.Compile(`/piece/[a-z0-9]+`) + assert.NoError(t, err) + + cases := []struct { + str string + expect bool + }{ + { + str: "xxx", + expect: false, + }, + { + str: "/piece/", + expect: false, + }, + { + str: "/piece/ssss", + expect: true, + }, + { + str: "/piece/ss1ss1", + expect: true, + }, + } + + for _, c := range cases { + assert.Equal(t, c.expect, reg.MatchString(c.str)) + } +} + +func TestRetrievalByPiece(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tmpDri := t.TempDir() + cfg := config.DefaultMarketConfig + cfg.Home.HomeDir = tmpDri + cfg.PieceStorage.Fs = []*config.FsPieceStorage{ + { + Name: "test", + ReadOnly: false, + Path: tmpDri, + }, + } + assert.NoError(t, config.SaveConfig(cfg)) + + pieceStr := "baga6ea4seaqpzcr744w2rvqhkedfqbuqrbo7xtkde2ol6e26khu3wni64nbpaeq" + buf := &bytes.Buffer{} + f, err := os.Create(filepath.Join(tmpDri, pieceStr)) + assert.NoError(t, err) + for i := 0; i < 100; i++ { + buf.WriteString("TEST TEST\n") + } + _, err = f.Write(buf.Bytes()) + assert.NoError(t, err) + assert.NoError(t, f.Close()) + + s, err := NewServer(&cfg.PieceStorage) + assert.NoError(t, err) + port := "34897" + startHTTPServer(ctx, t, port, s) + + url := fmt.Sprintf("http://127.0.0.1:%s/piece/%s", port, pieceStr) + req, err := http.NewRequest(http.MethodGet, url, nil) + assert.NoError(t, err) + resp, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer resp.Body.Close() // nolint + + data, err := io.ReadAll(resp.Body) + assert.NoError(t, err) + assert.Equal(t, buf.Bytes(), data) +} + +func startHTTPServer(ctx context.Context, t *testing.T, port string, s *Server) { + mux := mux.NewRouter() + err := mux.HandleFunc("/piece/{cid}", s.RetrievalByPieceCID).GetError() + assert.NoError(t, err) + + ser := &http.Server{ + Addr: "127.0.0.1:" + port, + Handler: mux, + } + + go func() { + if err := ser.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + assert.NoError(t, err) + } + }() + + go func() { + // wait server exit + <-ctx.Done() + assert.NoError(t, ser.Shutdown(context.TODO())) + }() + // wait serve up + time.Sleep(time.Second * 2) +} diff --git a/retrievalprovider/modules.go b/retrievalprovider/modules.go index 202f0c3d..eb41e8f3 100644 --- a/retrievalprovider/modules.go +++ b/retrievalprovider/modules.go @@ -82,5 +82,6 @@ var RetrievalProviderOpts = func(cfg *config.MarketConfig) builder.Option { builder.Override(new(gatewayAPIV2.IMarketEvent), NewMarketEventStream), builder.Override(new(gatewayAPIV2.IMarketClient), builder.From(new(gatewayAPIV2.IMarketEvent))), builder.Override(new(gatewayAPIV2.IMarketServiceProvider), builder.From(new(gatewayAPIV2.IMarketEvent))), + builder.Override(new(*TransportsListener), NewTransportsListener), ) } diff --git a/retrievalprovider/provider.go b/retrievalprovider/provider.go index a3ece11a..471976fc 100644 --- a/retrievalprovider/provider.go +++ b/retrievalprovider/provider.go @@ -47,6 +47,8 @@ type RetrievalProvider struct { storageDealRepo repo.StorageDealRepo retrievalStreamHandler *RetrievalStreamHandler + + transportListener *TransportsListener } // NewProvider returns a new retrieval Provider @@ -59,6 +61,7 @@ func NewProvider( repo repo.Repo, cfg *config.MarketConfig, rdf config.RetrievalDealFilter, + transportLister *TransportsListener, ) (*RetrievalProvider, error) { storageDealsRepo := repo.StorageDealRepo() retrievalDealRepo := repo.RetrievalDealRepo() @@ -73,6 +76,7 @@ func NewProvider( storageDealRepo: storageDealsRepo, stores: stores.NewReadOnlyBlockstores(), retrievalStreamHandler: NewRetrievalStreamHandler(cfg, retrievalAskRepo, retrievalDealRepo, storageDealsRepo, pieceInfo), + transportListener: transportLister, } retrievalHandler := NewRetrievalDealHandler(&providerDealEnvironment{p}, retrievalDealRepo, storageDealsRepo) @@ -134,12 +138,14 @@ func NewProvider( // Stop stops handling incoming requests. func (p *RetrievalProvider) Stop() error { + p.transportListener.Stop() return p.network.StopHandlingRequests() } // Start begins listening for deals on the given host. // Start must be called in order to accept incoming deals. func (p *RetrievalProvider) Start(ctx context.Context) error { + p.transportListener.Start() return p.network.SetDelegate(p.retrievalStreamHandler) } diff --git a/retrievalprovider/transports.go b/retrievalprovider/transports.go new file mode 100644 index 00000000..feab7ca3 --- /dev/null +++ b/retrievalprovider/transports.go @@ -0,0 +1,79 @@ +package retrievalprovider + +import ( + "fmt" + "time" + + "github.com/filecoin-project/venus-market/v2/config" + "github.com/filecoin-project/venus-market/v2/types" + "github.com/ipld/go-ipld-prime/codec/dagcbor" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + "github.com/multiformats/go-multiaddr" +) + +// TransportsListener listens for incoming queries over libp2p +type TransportsListener struct { + host host.Host + protocols []types.Protocol +} + +func NewTransportsListener(h host.Host, cfg *config.MarketConfig) (*TransportsListener, error) { + var protos []types.Protocol + + // Get the libp2p addresses from the Host + if len(h.Addrs()) > 0 { + protos = append(protos, types.Protocol{ + Name: "libp2p", + Addresses: h.Addrs(), + }) + } + + // If there's an http retrieval address specified, add HTTP to the list + // of supported protocols + // todo: handle cfg.Miners[].HTTPRetrievalMultiaddr? + if len(cfg.CommonProvider.HTTPRetrievalMultiaddr) != 0 { + maddr, err := multiaddr.NewMultiaddr(cfg.CommonProvider.HTTPRetrievalMultiaddr) + if err != nil { + return nil, fmt.Errorf("could not parse '%s' as multiaddr: %w", cfg.CommonProvider.HTTPRetrievalMultiaddr, err) + } + + protos = append(protos, types.Protocol{ + Name: "http", + Addresses: []multiaddr.Multiaddr{maddr}, + }) + } + + return &TransportsListener{ + host: h, + protocols: protos, + }, nil +} + +func (l *TransportsListener) Start() { + l.host.SetStreamHandler(types.TransportsProtocolID, l.handleNewQueryStream) +} + +func (l *TransportsListener) Stop() { + l.host.RemoveStreamHandler(types.TransportsProtocolID) +} + +// Called when the client opens a libp2p stream +func (l *TransportsListener) handleNewQueryStream(s network.Stream) { + defer s.Close() // nolint + + log.Debugw("query", "peer", s.Conn().RemotePeer()) + + response := types.QueryResponse{Protocols: l.protocols} + + // Set a deadline on writing to the stream so it doesn't hang + _ = s.SetWriteDeadline(time.Now().Add(time.Second * 30)) + defer s.SetWriteDeadline(time.Time{}) // nolint + + // Write the response to the client + err := types.BindnodeRegistry.TypeToWriter(&response, s, dagcbor.Encode) + if err != nil { + log.Infow("error writing query response", "peer", s.Conn().RemotePeer(), "err", err) + return + } +} diff --git a/rpc/rpc.go b/rpc/rpc.go index e21768f6..db94e4f0 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -7,6 +7,7 @@ import ( "io/ioutil" "net/http" "path" + "regexp" "github.com/etherlabsio/healthcheck/v2" "github.com/filecoin-project/go-jsonrpc" @@ -18,6 +19,7 @@ import ( "github.com/filecoin-project/venus-auth/jwtclient" "github.com/filecoin-project/venus-market/v2/config" + "github.com/filecoin-project/venus-market/v2/retrievalprovider/httpretrieval" ) var log = logging.Logger("modules") @@ -37,6 +39,7 @@ func ServeRPC( authClient *jwtclient.AuthClient, apiHandles []APIHandle, shutdownCh <-chan struct{}, + httpRetrievalServer *httpretrieval.Server, ) error { serverOptions := make([]jsonrpc.ServerOption, 0) if maxRequestSize != 0 { // config set @@ -66,6 +69,10 @@ func ServeRPC( authMux = jwtclient.NewAuthMux(localJwtClient, nil, mux) } authMux.TrustHandle("/healthcheck", healthcheck.Handler()) + if httpRetrievalServer != nil { + authMux.TrustHandle("/piece/", httpRetrievalServer, jwtclient.RegexpOption(regexp.MustCompile(`/piece/[a-z0-9]+`))) + } + srv := &http.Server{Handler: authMux} go func() { diff --git a/types/transports.go b/types/transports.go new file mode 100644 index 00000000..38f656ea --- /dev/null +++ b/types/transports.go @@ -0,0 +1,57 @@ +package types + +import ( + _ "embed" + "fmt" + + "github.com/ipld/go-ipld-prime/node/bindnode" + "github.com/ipld/go-ipld-prime/node/bindnode/registry" + "github.com/multiformats/go-multiaddr" +) + +// boost retrieval protocol +const TransportsProtocolID = "/fil/retrieval/transports/1.0.0" + +// copy from https://github.com/filecoin-project/boost/blob/main/retrievalmarket/types/transports.go#L12 +type Protocol struct { + // The name of the transport protocol eg "libp2p" or "http" + Name string + // The address of the endpoint in multiaddr format + Addresses []multiaddr.Multiaddr +} + +type QueryResponse struct { + Protocols []Protocol +} + +//go:embed transports.ipldsch +var embedSchema []byte + +func multiAddrFromBytes(b []byte) (interface{}, error) { + ma, err := multiaddr.NewMultiaddrBytes(b) + if err != nil { + return nil, err + } + return &ma, err +} + +func multiAddrToBytes(iface interface{}) ([]byte, error) { + ma, ok := iface.(*multiaddr.Multiaddr) + if !ok { + return nil, fmt.Errorf("expected *Multiaddr value") + } + + return (*ma).Bytes(), nil +} + +var BindnodeRegistry = registry.NewRegistry() + +func init() { + var dummyMa multiaddr.Multiaddr + var bindnodeOptions = []bindnode.Option{ + bindnode.TypedBytesConverter(&dummyMa, multiAddrFromBytes, multiAddrToBytes), + } + if err := BindnodeRegistry.RegisterType((*QueryResponse)(nil), string(embedSchema), "QueryResponse", bindnodeOptions...); err != nil { + panic(err.Error()) + } +} diff --git a/types/transports.ipldsch b/types/transports.ipldsch new file mode 100644 index 00000000..f6ab6d79 --- /dev/null +++ b/types/transports.ipldsch @@ -0,0 +1,15 @@ +# Defines the response to a query asking which transport protocols a +# Storage Provider supports +type Multiaddr bytes + +type Protocol struct { + # The name of the transport protocol + # Known protocols: "libp2p", "http", "https" + Name String + # The addresses of the endpoint in multiaddr format + Addresses [Multiaddr] +} + +type QueryResponse struct { + Protocols [Protocol] +} diff --git a/utils/converters.go b/utils/converters.go index 0d395202..22880706 100644 --- a/utils/converters.go +++ b/utils/converters.go @@ -11,16 +11,25 @@ import ( "github.com/filecoin-project/go-fil-markets/storagemarket" ) -func NewStorageProviderInfo(miner address.Address, worker address.Address, sectorSize abi.SectorSize, peer peer.ID, addrs []abi.Multiaddrs) storagemarket.StorageProviderInfo { +func ConvertMultiaddr(addrs [][]byte) ([]multiaddr.Multiaddr, error) { multiaddrs := make([]multiaddr.Multiaddr, 0, len(addrs)) for _, a := range addrs { maddr, err := multiaddr.NewMultiaddrBytes(a) if err != nil { - return storagemarket.StorageProviderInfo{} + return nil, err } multiaddrs = append(multiaddrs, maddr) } + return multiaddrs, nil +} + +func NewStorageProviderInfo(miner address.Address, worker address.Address, sectorSize abi.SectorSize, peer peer.ID, addrs []abi.Multiaddrs) storagemarket.StorageProviderInfo { + multiaddrs, err := ConvertMultiaddr(addrs) + if err != nil { + return storagemarket.StorageProviderInfo{} + } + return storagemarket.StorageProviderInfo{ Address: miner, Worker: worker,