diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 294729c1aa2b..33618d99c4a7 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -127,39 +127,6 @@ jobs: working-directory: go-ipfs-api - run: cmd/ipfs/ipfs shutdown if: always() - go-ipfs-http-client: - needs: [interop-prep] - runs-on: ubuntu-latest - timeout-minutes: 5 - env: - TEST_DOCKER: 0 - TEST_FUSE: 0 - TEST_VERBOSE: 1 - TRAVIS: 1 - GIT_PAGER: cat - IPFS_CHECK_RCMGR_DEFAULTS: 1 - defaults: - run: - shell: bash - steps: - - uses: actions/setup-go@v3 - with: - go-version: ${{ env.GO_VERSION }} - - uses: actions/download-artifact@v3 - with: - name: kubo - path: cmd/ipfs - - run: chmod +x cmd/ipfs/ipfs - - uses: actions/checkout@v3 - with: - repository: ipfs/go-ipfs-http-client - path: go-ipfs-http-client - - uses: protocol/cache-go-action@v1 - with: - name: ${{ github.job }} - - run: echo '${{ github.workspace }}/cmd/ipfs' >> $GITHUB_PATH - - run: go test -count=1 -v ./... - working-directory: go-ipfs-http-client ipfs-webui: needs: [interop-prep] runs-on: ${{ fromJSON(github.repository == 'ipfs/kubo' && '["self-hosted", "linux", "x64", "2xlarge"]' || '"ubuntu-latest"') }} diff --git a/.golangci.yml b/.golangci.yml index 2c46046aa2a1..11772b05353c 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -4,5 +4,8 @@ linters: linters-settings: stylecheck: + checks: + - all + - '-ST1003' dot-import-whitelist: - github.com/ipfs/kubo/test/cli/testutils diff --git a/client/rpc/README.md b/client/rpc/README.md new file mode 100644 index 000000000000..e6b534b90e92 --- /dev/null +++ b/client/rpc/README.md @@ -0,0 +1,44 @@ +# `httpapi` + +> IPFS CoreAPI implementation using HTTP API + +This packages implements [`coreiface.CoreAPI`](https://pkg.go.dev/github.com/ipfs/boxo/coreiface#CoreAPI) over the HTTP API. + +## Documentation + +https://pkg.go.dev/github.com/ipfs/kubo/client/rpc + +### Example + +Pin file on your local IPFS node based on its CID: + +```go +package main + +import ( + "context" + "fmt" + + ipfsClient "github.com/ipfs/kubo/client/rpc" + path "github.com/ipfs/boxo/coreiface/path" +) + +func main() { + // "Connect" to local node + node, err := ipfsClient.NewLocalApi() + if err != nil { + fmt.Printf(err) + return + } + // Pin a given file by its CID + ctx := context.Background() + cid := "bafkreidtuosuw37f5xmn65b3ksdiikajy7pwjjslzj2lxxz2vc4wdy3zku" + p := path.New(cid) + err = node.Pin().Add(ctx, p) + if err != nil { + fmt.Printf(err) + return + } + return +} +``` diff --git a/client/rpc/api.go b/client/rpc/api.go new file mode 100644 index 000000000000..5584de85d998 --- /dev/null +++ b/client/rpc/api.go @@ -0,0 +1,215 @@ +package httpapi + +import ( + "errors" + "fmt" + "net/http" + "os" + "path/filepath" + "strings" + + iface "github.com/ipfs/boxo/coreiface" + caopts "github.com/ipfs/boxo/coreiface/options" + "github.com/mitchellh/go-homedir" + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" +) + +const ( + DefaultPathName = ".ipfs" + DefaultPathRoot = "~/" + DefaultPathName + DefaultApiFile = "api" + EnvDir = "IPFS_PATH" +) + +// ErrApiNotFound if we fail to find a running daemon. +var ErrApiNotFound = errors.New("ipfs api address could not be found") + +// HttpApi implements github.com/ipfs/interface-go-ipfs-core/CoreAPI using +// IPFS HTTP API. +// +// For interface docs see +// https://godoc.org/github.com/ipfs/interface-go-ipfs-core#CoreAPI +type HttpApi struct { + url string + httpcli http.Client + Headers http.Header + applyGlobal func(*requestBuilder) +} + +// NewLocalApi tries to construct new HttpApi instance communicating with local +// IPFS daemon +// +// Daemon api address is pulled from the $IPFS_PATH/api file. +// If $IPFS_PATH env var is not present, it defaults to ~/.ipfs +func NewLocalApi() (*HttpApi, error) { + baseDir := os.Getenv(EnvDir) + if baseDir == "" { + baseDir = DefaultPathRoot + } + + return NewPathApi(baseDir) +} + +// NewPathApi constructs new HttpApi by pulling api address from specified +// ipfspath. Api file should be located at $ipfspath/api +func NewPathApi(ipfspath string) (*HttpApi, error) { + a, err := ApiAddr(ipfspath) + if err != nil { + if os.IsNotExist(err) { + err = ErrApiNotFound + } + return nil, err + } + return NewApi(a) +} + +// ApiAddr reads api file in specified ipfs path +func ApiAddr(ipfspath string) (ma.Multiaddr, error) { + baseDir, err := homedir.Expand(ipfspath) + if err != nil { + return nil, err + } + + apiFile := filepath.Join(baseDir, DefaultApiFile) + + api, err := os.ReadFile(apiFile) + if err != nil { + return nil, err + } + + return ma.NewMultiaddr(strings.TrimSpace(string(api))) +} + +// NewApi constructs HttpApi with specified endpoint +func NewApi(a ma.Multiaddr) (*HttpApi, error) { + c := &http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DisableKeepAlives: true, + }, + } + + return NewApiWithClient(a, c) +} + +// NewApiWithClient constructs HttpApi with specified endpoint and custom http client +func NewApiWithClient(a ma.Multiaddr, c *http.Client) (*HttpApi, error) { + _, url, err := manet.DialArgs(a) + if err != nil { + return nil, err + } + + if a, err := ma.NewMultiaddr(url); err == nil { + _, host, err := manet.DialArgs(a) + if err == nil { + url = host + } + } + + proto := "http://" + + // By default, DialArgs is going to provide details suitable for connecting + // a socket to, but not really suitable for making an informed choice of http + // protocol. For multiaddresses specifying tls and/or https we want to make + // a https request instead of a http request. + protocols := a.Protocols() + for _, p := range protocols { + if p.Code == ma.P_HTTPS || p.Code == ma.P_TLS { + proto = "https://" + break + } + } + + return NewURLApiWithClient(proto+url, c) +} + +func NewURLApiWithClient(url string, c *http.Client) (*HttpApi, error) { + api := &HttpApi{ + url: url, + httpcli: *c, + Headers: make(map[string][]string), + applyGlobal: func(*requestBuilder) {}, + } + + // We don't support redirects. + api.httpcli.CheckRedirect = func(_ *http.Request, _ []*http.Request) error { + return fmt.Errorf("unexpected redirect") + } + return api, nil +} + +func (api *HttpApi) WithOptions(opts ...caopts.ApiOption) (iface.CoreAPI, error) { + options, err := caopts.ApiOptions(opts...) + if err != nil { + return nil, err + } + + subApi := *api + subApi.applyGlobal = func(req *requestBuilder) { + if options.Offline { + req.Option("offline", options.Offline) + } + } + + return &subApi, nil +} + +func (api *HttpApi) Request(command string, args ...string) RequestBuilder { + headers := make(map[string]string) + if api.Headers != nil { + for k := range api.Headers { + headers[k] = api.Headers.Get(k) + } + } + return &requestBuilder{ + command: command, + args: args, + shell: api, + headers: headers, + } +} + +func (api *HttpApi) Unixfs() iface.UnixfsAPI { + return (*UnixfsAPI)(api) +} + +func (api *HttpApi) Block() iface.BlockAPI { + return (*BlockAPI)(api) +} + +func (api *HttpApi) Dag() iface.APIDagService { + return (*HttpDagServ)(api) +} + +func (api *HttpApi) Name() iface.NameAPI { + return (*NameAPI)(api) +} + +func (api *HttpApi) Key() iface.KeyAPI { + return (*KeyAPI)(api) +} + +func (api *HttpApi) Pin() iface.PinAPI { + return (*PinAPI)(api) +} + +func (api *HttpApi) Object() iface.ObjectAPI { + return (*ObjectAPI)(api) +} + +func (api *HttpApi) Dht() iface.DhtAPI { + return (*DhtAPI)(api) +} + +func (api *HttpApi) Swarm() iface.SwarmAPI { + return (*SwarmAPI)(api) +} + +func (api *HttpApi) PubSub() iface.PubSubAPI { + return (*PubsubAPI)(api) +} + +func (api *HttpApi) Routing() iface.RoutingAPI { + return (*RoutingAPI)(api) +} diff --git a/client/rpc/api_test.go b/client/rpc/api_test.go new file mode 100644 index 000000000000..ef6ea62fd7bf --- /dev/null +++ b/client/rpc/api_test.go @@ -0,0 +1,151 @@ +package httpapi + +import ( + "context" + "net/http" + "net/http/httptest" + "runtime" + "strconv" + "strings" + "sync" + "testing" + "time" + + iface "github.com/ipfs/boxo/coreiface" + "github.com/ipfs/boxo/coreiface/path" + "github.com/ipfs/boxo/coreiface/tests" + "github.com/ipfs/kubo/test/cli/harness" + ma "github.com/multiformats/go-multiaddr" + "go.uber.org/multierr" +) + +type NodeProvider struct{} + +func (np NodeProvider) MakeAPISwarm(t *testing.T, ctx context.Context, fullIdentity, online bool, n int) ([]iface.CoreAPI, error) { + h := harness.NewT(t) + + apis := make([]iface.CoreAPI, n) + + var wg, zero sync.WaitGroup + var zeroNode *harness.Node + wg.Add(len(apis)) + zero.Add(1) + + var errs []error + var errsLk sync.Mutex + + for i := range apis { + go func(i int, h *harness.Harness) { + if err := func() error { + defer wg.Done() + var err error + + n := h.NewNode().Init() + zeroNode = n + + c := n.ReadConfig() + c.Experimental.FilestoreEnabled = true + n.WriteConfig(c) + + n.StartDaemon("--enable-pubsub-experiment", "--offline="+strconv.FormatBool(!online)) + + if online { + if i > 0 { + zero.Wait() + n.Connect(zeroNode) + } else { + zero.Done() + } + } + + api, err := NewApi(n.APIListenAddr) + if err != nil { + return err + } + apis[i] = api + + // empty node is pinned even with --empty-repo, we don't want that + emptyNode := path.New("/ipfs/QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn") + if err := api.Pin().Rm(ctx, emptyNode); err != nil { + return err + } + return nil + }(); err != nil { + errsLk.Lock() + errs = append(errs, err) + errsLk.Unlock() + } + }(i, h) + } + + wg.Wait() + + return apis, multierr.Combine(errs...) +} + +func TestHttpApi(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("skipping due to #9905") + } + + tests.TestApi(NodeProvider{})(t) +} + +func Test_NewURLApiWithClient_With_Headers(t *testing.T) { + var ( + headerToTest = "Test-Header" + expectedHeaderValue = "thisisaheadertest" + ) + ts := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + val := r.Header.Get(headerToTest) + if val != expectedHeaderValue { + w.WriteHeader(400) + return + } + http.ServeContent(w, r, "", time.Now(), strings.NewReader("test")) + }), + ) + defer ts.Close() + api, err := NewURLApiWithClient(ts.URL, &http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DisableKeepAlives: true, + }, + }) + if err != nil { + t.Fatal(err) + } + api.Headers.Set(headerToTest, expectedHeaderValue) + if err := api.Pin().Rm(context.Background(), path.New("/ipfs/QmS4ustL54uo8FzR9455qaxZwuMiUhyvMcX9Ba8nUH4uVv")); err != nil { + t.Fatal(err) + } +} + +func Test_NewURLApiWithClient_HTTP_Variant(t *testing.T) { + testcases := []struct { + address string + expected string + }{ + {address: "/ip4/127.0.0.1/tcp/80", expected: "http://127.0.0.1:80"}, + {address: "/ip4/127.0.0.1/tcp/443/tls", expected: "https://127.0.0.1:443"}, + {address: "/ip4/127.0.0.1/tcp/443/https", expected: "https://127.0.0.1:443"}, + {address: "/ip4/127.0.0.1/tcp/443/tls/http", expected: "https://127.0.0.1:443"}, + } + + for _, tc := range testcases { + address, err := ma.NewMultiaddr(tc.address) + if err != nil { + t.Fatal(err) + } + + api, err := NewApiWithClient(address, &http.Client{}) + if err != nil { + t.Fatal(err) + } + + if api.url != tc.expected { + t.Errorf("Expected = %s; got %s", tc.expected, api.url) + } + } +} diff --git a/client/rpc/apifile.go b/client/rpc/apifile.go new file mode 100644 index 000000000000..25fd7c3b3718 --- /dev/null +++ b/client/rpc/apifile.go @@ -0,0 +1,270 @@ +package httpapi + +import ( + "context" + "encoding/json" + "fmt" + "io" + + "github.com/ipfs/boxo/coreiface/path" + "github.com/ipfs/boxo/files" + unixfs "github.com/ipfs/boxo/ipld/unixfs" + "github.com/ipfs/go-cid" +) + +const forwardSeekLimit = 1 << 14 //16k + +func (api *UnixfsAPI) Get(ctx context.Context, p path.Path) (files.Node, error) { + if p.Mutable() { // use resolved path in case we are dealing with IPNS / MFS + var err error + p, err = api.core().ResolvePath(ctx, p) + if err != nil { + return nil, err + } + } + + var stat struct { + Hash string + Type string + Size int64 // unixfs size + } + err := api.core().Request("files/stat", p.String()).Exec(ctx, &stat) + if err != nil { + return nil, err + } + + switch stat.Type { + case "file": + return api.getFile(ctx, p, stat.Size) + case "directory": + return api.getDir(ctx, p, stat.Size) + default: + return nil, fmt.Errorf("unsupported file type '%s'", stat.Type) + } +} + +type apiFile struct { + ctx context.Context + core *HttpApi + size int64 + path path.Path + + r *Response + at int64 +} + +func (f *apiFile) reset() error { + if f.r != nil { + _ = f.r.Cancel() + f.r = nil + } + req := f.core.Request("cat", f.path.String()) + if f.at != 0 { + req.Option("offset", f.at) + } + resp, err := req.Send(f.ctx) + if err != nil { + return err + } + if resp.Error != nil { + return resp.Error + } + f.r = resp + return nil +} + +func (f *apiFile) Read(p []byte) (int, error) { + n, err := f.r.Output.Read(p) + if n > 0 { + f.at += int64(n) + } + return n, err +} + +func (f *apiFile) ReadAt(p []byte, off int64) (int, error) { + // Always make a new request. This method should be parallel-safe. + resp, err := f.core.Request("cat", f.path.String()). + Option("offset", off).Option("length", len(p)).Send(f.ctx) + if err != nil { + return 0, err + } + if resp.Error != nil { + return 0, resp.Error + } + defer resp.Output.Close() + + n, err := io.ReadFull(resp.Output, p) + if err == io.ErrUnexpectedEOF { + err = io.EOF + } + return n, err +} + +func (f *apiFile) Seek(offset int64, whence int) (int64, error) { + switch whence { + case io.SeekEnd: + offset = f.size + offset + case io.SeekCurrent: + offset = f.at + offset + } + if f.at == offset { //noop + return offset, nil + } + + if f.at < offset && offset-f.at < forwardSeekLimit { //forward skip + r, err := io.CopyN(io.Discard, f.r.Output, offset-f.at) + + f.at += r + return f.at, err + } + f.at = offset + return f.at, f.reset() +} + +func (f *apiFile) Close() error { + if f.r != nil { + return f.r.Cancel() + } + return nil +} + +func (f *apiFile) Size() (int64, error) { + return f.size, nil +} + +func (api *UnixfsAPI) getFile(ctx context.Context, p path.Path, size int64) (files.Node, error) { + f := &apiFile{ + ctx: ctx, + core: api.core(), + size: size, + path: p, + } + + return f, f.reset() +} + +type apiIter struct { + ctx context.Context + core *UnixfsAPI + + err error + + dec *json.Decoder + curFile files.Node + cur lsLink +} + +func (it *apiIter) Err() error { + return it.err +} + +func (it *apiIter) Name() string { + return it.cur.Name +} + +func (it *apiIter) Next() bool { + if it.ctx.Err() != nil { + it.err = it.ctx.Err() + return false + } + + var out lsOutput + if err := it.dec.Decode(&out); err != nil { + if err != io.EOF { + it.err = err + } + return false + } + + if len(out.Objects) != 1 { + it.err = fmt.Errorf("ls returned more objects than expected (%d)", len(out.Objects)) + return false + } + + if len(out.Objects[0].Links) != 1 { + it.err = fmt.Errorf("ls returned more links than expected (%d)", len(out.Objects[0].Links)) + return false + } + + it.cur = out.Objects[0].Links[0] + c, err := cid.Parse(it.cur.Hash) + if err != nil { + it.err = err + return false + } + + switch it.cur.Type { + case unixfs.THAMTShard, unixfs.TMetadata, unixfs.TDirectory: + it.curFile, err = it.core.getDir(it.ctx, path.IpfsPath(c), int64(it.cur.Size)) + if err != nil { + it.err = err + return false + } + case unixfs.TFile: + it.curFile, err = it.core.getFile(it.ctx, path.IpfsPath(c), int64(it.cur.Size)) + if err != nil { + it.err = err + return false + } + default: + it.err = fmt.Errorf("file type %d not supported", it.cur.Type) + return false + } + return true +} + +func (it *apiIter) Node() files.Node { + return it.curFile +} + +type apiDir struct { + ctx context.Context + core *UnixfsAPI + size int64 + path path.Path + + dec *json.Decoder +} + +func (d *apiDir) Close() error { + return nil +} + +func (d *apiDir) Size() (int64, error) { + return d.size, nil +} + +func (d *apiDir) Entries() files.DirIterator { + return &apiIter{ + ctx: d.ctx, + core: d.core, + dec: d.dec, + } +} + +func (api *UnixfsAPI) getDir(ctx context.Context, p path.Path, size int64) (files.Node, error) { + resp, err := api.core().Request("ls", p.String()). + Option("resolve-size", true). + Option("stream", true).Send(ctx) + + if err != nil { + return nil, err + } + if resp.Error != nil { + return nil, resp.Error + } + + d := &apiDir{ + ctx: ctx, + core: api, + size: size, + path: p, + + dec: json.NewDecoder(resp.Output), + } + + return d, nil +} + +var _ files.File = &apiFile{} +var _ files.Directory = &apiDir{} diff --git a/client/rpc/block.go b/client/rpc/block.go new file mode 100644 index 000000000000..2a794c26f529 --- /dev/null +++ b/client/rpc/block.go @@ -0,0 +1,134 @@ +package httpapi + +import ( + "bytes" + "context" + "fmt" + "io" + + iface "github.com/ipfs/boxo/coreiface" + caopts "github.com/ipfs/boxo/coreiface/options" + "github.com/ipfs/boxo/coreiface/path" + "github.com/ipfs/go-cid" + mc "github.com/multiformats/go-multicodec" + mh "github.com/multiformats/go-multihash" +) + +type BlockAPI HttpApi + +type blockStat struct { + Key string + BSize int `json:"Size"` + + cid cid.Cid +} + +func (s *blockStat) Size() int { + return s.BSize +} + +func (s *blockStat) Path() path.Resolved { + return path.IpldPath(s.cid) +} + +func (api *BlockAPI) Put(ctx context.Context, r io.Reader, opts ...caopts.BlockPutOption) (iface.BlockStat, error) { + options, err := caopts.BlockPutOptions(opts...) + px := options.CidPrefix + if err != nil { + return nil, err + } + + mht, ok := mh.Codes[px.MhType] + if !ok { + return nil, fmt.Errorf("unknowm mhType %d", px.MhType) + } + + var cidOptKey, cidOptVal string + switch { + case px.Version == 0 && px.Codec == cid.DagProtobuf: + // ensure legacy --format=v0 passes as BlockPutOption still works + cidOptKey = "format" + cidOptVal = "v0" + default: + // pass codec as string + cidOptKey = "cid-codec" + cidOptVal = mc.Code(px.Codec).String() + } + + req := api.core().Request("block/put"). + Option("mhtype", mht). + Option("mhlen", px.MhLength). + Option(cidOptKey, cidOptVal). + Option("pin", options.Pin). + FileBody(r) + + var out blockStat + if err := req.Exec(ctx, &out); err != nil { + return nil, err + } + out.cid, err = cid.Parse(out.Key) + if err != nil { + return nil, err + } + + return &out, nil +} + +func (api *BlockAPI) Get(ctx context.Context, p path.Path) (io.Reader, error) { + resp, err := api.core().Request("block/get", p.String()).Send(ctx) + if err != nil { + return nil, err + } + if resp.Error != nil { + return nil, parseErrNotFoundWithFallbackToError(resp.Error) + } + + //TODO: make get return ReadCloser to avoid copying + defer resp.Close() + b := new(bytes.Buffer) + if _, err := io.Copy(b, resp.Output); err != nil { + return nil, err + } + + return b, nil +} + +func (api *BlockAPI) Rm(ctx context.Context, p path.Path, opts ...caopts.BlockRmOption) error { + options, err := caopts.BlockRmOptions(opts...) + if err != nil { + return err + } + + removedBlock := struct { + Hash string `json:",omitempty"` + Error string `json:",omitempty"` + }{} + + req := api.core().Request("block/rm"). + Option("force", options.Force). + Arguments(p.String()) + + if err := req.Exec(ctx, &removedBlock); err != nil { + return err + } + + return parseErrNotFoundWithFallbackToMSG(removedBlock.Error) +} + +func (api *BlockAPI) Stat(ctx context.Context, p path.Path) (iface.BlockStat, error) { + var out blockStat + err := api.core().Request("block/stat", p.String()).Exec(ctx, &out) + if err != nil { + return nil, parseErrNotFoundWithFallbackToError(err) + } + out.cid, err = cid.Parse(out.Key) + if err != nil { + return nil, err + } + + return &out, nil +} + +func (api *BlockAPI) core() *HttpApi { + return (*HttpApi)(api) +} diff --git a/client/rpc/dag.go b/client/rpc/dag.go new file mode 100644 index 000000000000..795e1d78d9e3 --- /dev/null +++ b/client/rpc/dag.go @@ -0,0 +1,136 @@ +package httpapi + +import ( + "bytes" + "context" + "fmt" + "io" + + "github.com/ipfs/boxo/coreiface/options" + "github.com/ipfs/boxo/coreiface/path" + "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + format "github.com/ipfs/go-ipld-format" + multicodec "github.com/multiformats/go-multicodec" +) + +type httpNodeAdder HttpApi +type HttpDagServ httpNodeAdder +type pinningHttpNodeAdder httpNodeAdder + +func (api *HttpDagServ) Get(ctx context.Context, c cid.Cid) (format.Node, error) { + r, err := api.core().Block().Get(ctx, path.IpldPath(c)) + if err != nil { + return nil, err + } + + data, err := io.ReadAll(r) + if err != nil { + return nil, err + } + + blk, err := blocks.NewBlockWithCid(data, c) + if err != nil { + return nil, err + } + + return format.DefaultBlockDecoder.Decode(blk) +} + +func (api *HttpDagServ) GetMany(ctx context.Context, cids []cid.Cid) <-chan *format.NodeOption { + out := make(chan *format.NodeOption) + + for _, c := range cids { + // TODO: Consider limiting concurrency of this somehow + go func(c cid.Cid) { + n, err := api.Get(ctx, c) + + select { + case out <- &format.NodeOption{Node: n, Err: err}: + case <-ctx.Done(): + } + }(c) + } + return out +} + +func (api *httpNodeAdder) add(ctx context.Context, nd format.Node, pin bool) error { + c := nd.Cid() + prefix := c.Prefix() + + // preserve 'cid-codec' when sent over HTTP + cidCodec := multicodec.Code(prefix.Codec).String() + + // 'format' got replaced by 'cid-codec' in https://github.com/ipfs/interface-go-ipfs-core/pull/80 + // but we still support it here for backward-compatibility with use of CIDv0 + format := "" + if prefix.Version == 0 { + cidCodec = "" + format = "v0" + } + + stat, err := api.core().Block().Put(ctx, bytes.NewReader(nd.RawData()), + options.Block.Hash(prefix.MhType, prefix.MhLength), + options.Block.CidCodec(cidCodec), + options.Block.Format(format), + options.Block.Pin(pin)) + if err != nil { + return err + } + if !stat.Path().Cid().Equals(c) { + return fmt.Errorf("cids didn't match - local %s, remote %s", c.String(), stat.Path().Cid().String()) + } + return nil +} + +func (api *httpNodeAdder) addMany(ctx context.Context, nds []format.Node, pin bool) error { + for _, nd := range nds { + // TODO: optimize + if err := api.add(ctx, nd, pin); err != nil { + return err + } + } + return nil +} + +func (api *HttpDagServ) AddMany(ctx context.Context, nds []format.Node) error { + return (*httpNodeAdder)(api).addMany(ctx, nds, false) +} + +func (api *HttpDagServ) Add(ctx context.Context, nd format.Node) error { + return (*httpNodeAdder)(api).add(ctx, nd, false) +} + +func (api *pinningHttpNodeAdder) Add(ctx context.Context, nd format.Node) error { + return (*httpNodeAdder)(api).add(ctx, nd, true) +} + +func (api *pinningHttpNodeAdder) AddMany(ctx context.Context, nds []format.Node) error { + return (*httpNodeAdder)(api).addMany(ctx, nds, true) +} + +func (api *HttpDagServ) Pinning() format.NodeAdder { + return (*pinningHttpNodeAdder)(api) +} + +func (api *HttpDagServ) Remove(ctx context.Context, c cid.Cid) error { + return api.core().Block().Rm(ctx, path.IpldPath(c)) //TODO: should we force rm? +} + +func (api *HttpDagServ) RemoveMany(ctx context.Context, cids []cid.Cid) error { + for _, c := range cids { + // TODO: optimize + if err := api.Remove(ctx, c); err != nil { + return err + } + } + return nil +} + +func (api *httpNodeAdder) core() *HttpApi { + return (*HttpApi)(api) +} + +func (api *HttpDagServ) core() *HttpApi { + return (*HttpApi)(api) +} diff --git a/client/rpc/dht.go b/client/rpc/dht.go new file mode 100644 index 000000000000..a2910fef6d37 --- /dev/null +++ b/client/rpc/dht.go @@ -0,0 +1,113 @@ +package httpapi + +import ( + "context" + "encoding/json" + + caopts "github.com/ipfs/boxo/coreiface/options" + "github.com/ipfs/boxo/coreiface/path" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/routing" +) + +type DhtAPI HttpApi + +func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) { + var out struct { + Type routing.QueryEventType + Responses []peer.AddrInfo + } + resp, err := api.core().Request("dht/findpeer", p.Pretty()).Send(ctx) + if err != nil { + return peer.AddrInfo{}, err + } + if resp.Error != nil { + return peer.AddrInfo{}, resp.Error + } + defer resp.Close() + dec := json.NewDecoder(resp.Output) + for { + if err := dec.Decode(&out); err != nil { + return peer.AddrInfo{}, err + } + if out.Type == routing.FinalPeer { + return out.Responses[0], nil + } + } +} + +func (api *DhtAPI) FindProviders(ctx context.Context, p path.Path, opts ...caopts.DhtFindProvidersOption) (<-chan peer.AddrInfo, error) { + options, err := caopts.DhtFindProvidersOptions(opts...) + if err != nil { + return nil, err + } + + rp, err := api.core().ResolvePath(ctx, p) + if err != nil { + return nil, err + } + + resp, err := api.core().Request("dht/findprovs", rp.Cid().String()). + Option("num-providers", options.NumProviders). + Send(ctx) + if err != nil { + return nil, err + } + if resp.Error != nil { + return nil, resp.Error + } + res := make(chan peer.AddrInfo) + + go func() { + defer resp.Close() + defer close(res) + dec := json.NewDecoder(resp.Output) + + for { + var out struct { + Extra string + Type routing.QueryEventType + Responses []peer.AddrInfo + } + + if err := dec.Decode(&out); err != nil { + return // todo: handle this somehow + } + if out.Type == routing.QueryError { + return // usually a 'not found' error + // todo: handle other errors + } + if out.Type == routing.Provider { + for _, pi := range out.Responses { + select { + case res <- pi: + case <-ctx.Done(): + return + } + } + } + } + }() + + return res, nil +} + +func (api *DhtAPI) Provide(ctx context.Context, p path.Path, opts ...caopts.DhtProvideOption) error { + options, err := caopts.DhtProvideOptions(opts...) + if err != nil { + return err + } + + rp, err := api.core().ResolvePath(ctx, p) + if err != nil { + return err + } + + return api.core().Request("dht/provide", rp.Cid().String()). + Option("recursive", options.Recursive). + Exec(ctx, nil) +} + +func (api *DhtAPI) core() *HttpApi { + return (*HttpApi)(api) +} diff --git a/client/rpc/errors.go b/client/rpc/errors.go new file mode 100644 index 000000000000..59e4ad705432 --- /dev/null +++ b/client/rpc/errors.go @@ -0,0 +1,166 @@ +package httpapi + +import ( + "errors" + "strings" + "unicode/utf8" + + "github.com/ipfs/go-cid" + ipld "github.com/ipfs/go-ipld-format" + mbase "github.com/multiformats/go-multibase" +) + +// This file handle parsing and returning the correct ABI based errors from error messages + +type prePostWrappedNotFoundError struct { + pre string + post string + + wrapped ipld.ErrNotFound +} + +func (e prePostWrappedNotFoundError) String() string { + return e.Error() +} + +func (e prePostWrappedNotFoundError) Error() string { + return e.pre + e.wrapped.Error() + e.post +} + +func (e prePostWrappedNotFoundError) Unwrap() error { + return e.wrapped +} + +func parseErrNotFoundWithFallbackToMSG(msg string) error { + err, handled := parseErrNotFound(msg) + if handled { + return err + } + + return errors.New(msg) +} + +func parseErrNotFoundWithFallbackToError(msg error) error { + err, handled := parseErrNotFound(msg.Error()) + if handled { + return err + } + + return msg +} + +func parseErrNotFound(msg string) (error, bool) { + if msg == "" { + return nil, true // Fast path + } + + if err, handled := parseIPLDErrNotFound(msg); handled { + return err, true + } + + if err, handled := parseBlockstoreNotFound(msg); handled { + return err, true + } + + return nil, false +} + +// Assume CIDs break on: +// - Whitespaces: " \t\n\r\v\f" +// - Semicolon: ";" this is to parse ipld.ErrNotFound wrapped in multierr +// - Double Quotes: "\"" this is for parsing %q and %#v formating +const cidBreakSet = " \t\n\r\v\f;\"" + +func parseIPLDErrNotFound(msg string) (error, bool) { + // The patern we search for is: + const ipldErrNotFoundKey = "ipld: could not find " /*CID*/ + // We try to parse the CID, if it's invalid we give up and return a simple text error. + // We also accept "node" in place of the CID because that means it's an Undefined CID. + + keyIndex := strings.Index(msg, ipldErrNotFoundKey) + + if keyIndex < 0 { // Unknown error + return nil, false + } + + cidStart := keyIndex + len(ipldErrNotFoundKey) + + msgPostKey := msg[cidStart:] + var c cid.Cid + var postIndex int + if strings.HasPrefix(msgPostKey, "node") { + // Fallback case + c = cid.Undef + postIndex = len("node") + } else { + postIndex = strings.IndexFunc(msgPostKey, func(r rune) bool { + return strings.ContainsAny(string(r), cidBreakSet) + }) + if postIndex < 0 { + // no breakage meaning the string look like this something + "ipld: could not find bafy" + postIndex = len(msgPostKey) + } + + cidStr := msgPostKey[:postIndex] + + var err error + c, err = cid.Decode(cidStr) + if err != nil { + // failed to decode CID give up + return nil, false + } + + // check that the CID is either a CIDv0 or a base32 multibase + // because that what ipld.ErrNotFound.Error() -> cid.Cid.String() do currently + if c.Version() != 0 { + baseRune, _ := utf8.DecodeRuneInString(cidStr) + if baseRune == utf8.RuneError || baseRune != mbase.Base32 { + // not a multibase we expect, give up + return nil, false + } + } + } + + err := ipld.ErrNotFound{Cid: c} + pre := msg[:keyIndex] + post := msgPostKey[postIndex:] + + if len(pre) > 0 || len(post) > 0 { + return prePostWrappedNotFoundError{ + pre: pre, + post: post, + wrapped: err, + }, true + } + + return err, true +} + +// This is a simple error type that just return msg as Error(). +// But that also match ipld.ErrNotFound when called with Is(err). +// That is needed to keep compatiblity with code that use string.Contains(err.Error(), "blockstore: block not found") +// and code using ipld.ErrNotFound +type blockstoreNotFoundMatchingIPLDErrNotFound struct { + msg string +} + +func (e blockstoreNotFoundMatchingIPLDErrNotFound) String() string { + return e.Error() +} + +func (e blockstoreNotFoundMatchingIPLDErrNotFound) Error() string { + return e.msg +} + +func (e blockstoreNotFoundMatchingIPLDErrNotFound) Is(err error) bool { + _, ok := err.(ipld.ErrNotFound) + return ok +} + +func parseBlockstoreNotFound(msg string) (error, bool) { + if !strings.Contains(msg, "blockstore: block not found") { + return nil, false + } + + return blockstoreNotFoundMatchingIPLDErrNotFound{msg: msg}, true +} diff --git a/client/rpc/errors_test.go b/client/rpc/errors_test.go new file mode 100644 index 000000000000..c8b98d08eb24 --- /dev/null +++ b/client/rpc/errors_test.go @@ -0,0 +1,95 @@ +package httpapi + +import ( + "errors" + "fmt" + "testing" + + "github.com/ipfs/go-cid" + ipld "github.com/ipfs/go-ipld-format" + mbase "github.com/multiformats/go-multibase" + mh "github.com/multiformats/go-multihash" +) + +var randomSha256MH = mh.Multihash{0x12, 0x20, 0x88, 0x82, 0x73, 0x37, 0x7c, 0xc1, 0xc9, 0x96, 0xad, 0xee, 0xd, 0x26, 0x84, 0x2, 0xc9, 0xc9, 0x5c, 0xf9, 0x5c, 0x4d, 0x9b, 0xc3, 0x3f, 0xfb, 0x4a, 0xd8, 0xaf, 0x28, 0x6b, 0xca, 0x1a, 0xf2} + +func doParseIpldNotFoundTest(t *testing.T, original error) { + originalMsg := original.Error() + + rebuilt := parseErrNotFoundWithFallbackToMSG(originalMsg) + + rebuiltMsg := rebuilt.Error() + + if originalMsg != rebuiltMsg { + t.Errorf("expected message to be %q; got %q", originalMsg, rebuiltMsg) + } + + originalNotFound := ipld.IsNotFound(original) + rebuiltNotFound := ipld.IsNotFound(rebuilt) + if originalNotFound != rebuiltNotFound { + t.Errorf("for %q expected Ipld.IsNotFound to be %t; got %t", originalMsg, originalNotFound, rebuiltNotFound) + } +} + +func TestParseIPLDNotFound(t *testing.T) { + if err := parseErrNotFoundWithFallbackToMSG(""); err != nil { + t.Errorf("expected empty string to give no error; got %T %q", err, err.Error()) + } + + cidBreaks := make([]string, len(cidBreakSet)) + for i, v := range cidBreakSet { + cidBreaks[i] = "%w" + string(v) + } + + base58BTCEncoder, err := mbase.NewEncoder(mbase.Base58BTC) + if err != nil { + t.Fatalf("expected to find Base58BTC encoder; got error %q", err.Error()) + } + + for _, wrap := range append(cidBreaks, + "", + "merkledag: %w", + "testing: %w the test", + "%w is wrong", + ) { + for _, err := range [...]error{ + errors.New("ipld: could not find "), + errors.New("ipld: could not find Bad_CID"), + errors.New("ipld: could not find " + cid.NewCidV1(cid.Raw, randomSha256MH).Encode(base58BTCEncoder)), // Test that we only accept CIDv0 and base32 CIDs + errors.New("network connection timeout"), + ipld.ErrNotFound{Cid: cid.Undef}, + ipld.ErrNotFound{Cid: cid.NewCidV0(randomSha256MH)}, + ipld.ErrNotFound{Cid: cid.NewCidV1(cid.Raw, randomSha256MH)}, + } { + if wrap != "" { + err = fmt.Errorf(wrap, err) + } + + doParseIpldNotFoundTest(t, err) + } + } +} + +func TestBlockstoreNotFoundMatchingIPLDErrNotFound(t *testing.T) { + if !ipld.IsNotFound(blockstoreNotFoundMatchingIPLDErrNotFound{}) { + t.Fatalf("expected blockstoreNotFoundMatchingIPLDErrNotFound to match ipld.IsNotFound; got false") + } + + for _, wrap := range [...]string{ + "", + "merkledag: %w", + "testing: %w the test", + "%w is wrong", + } { + for _, err := range [...]error{ + errors.New("network connection timeout"), + blockstoreNotFoundMatchingIPLDErrNotFound{"blockstore: block not found"}, + } { + if wrap != "" { + err = fmt.Errorf(wrap, err) + } + + doParseIpldNotFoundTest(t, err) + } + } +} diff --git a/client/rpc/key.go b/client/rpc/key.go new file mode 100644 index 000000000000..434e98fe5214 --- /dev/null +++ b/client/rpc/key.go @@ -0,0 +1,123 @@ +package httpapi + +import ( + "context" + "errors" + + iface "github.com/ipfs/boxo/coreiface" + caopts "github.com/ipfs/boxo/coreiface/options" + "github.com/ipfs/boxo/coreiface/path" + "github.com/libp2p/go-libp2p/core/peer" +) + +type KeyAPI HttpApi + +type keyOutput struct { + JName string `json:"Name"` + Id string + + pid peer.ID +} + +func (k *keyOutput) Name() string { + return k.JName +} + +func (k *keyOutput) Path() path.Path { + return path.New("/ipns/" + k.Id) +} + +func (k *keyOutput) ID() peer.ID { + return k.pid +} + +func (api *KeyAPI) Generate(ctx context.Context, name string, opts ...caopts.KeyGenerateOption) (iface.Key, error) { + options, err := caopts.KeyGenerateOptions(opts...) + if err != nil { + return nil, err + } + + var out keyOutput + err = api.core().Request("key/gen", name). + Option("type", options.Algorithm). + Option("size", options.Size). + Exec(ctx, &out) + if err != nil { + return nil, err + } + out.pid, err = peer.Decode(out.Id) + return &out, err +} + +func (api *KeyAPI) Rename(ctx context.Context, oldName string, newName string, opts ...caopts.KeyRenameOption) (iface.Key, bool, error) { + options, err := caopts.KeyRenameOptions(opts...) + if err != nil { + return nil, false, err + } + + var out struct { + Was string + Now string + Id string + Overwrite bool + } + err = api.core().Request("key/rename", oldName, newName). + Option("force", options.Force). + Exec(ctx, &out) + if err != nil { + return nil, false, err + } + + id := &keyOutput{JName: out.Now, Id: out.Id} + id.pid, err = peer.Decode(id.Id) + return id, out.Overwrite, err +} + +func (api *KeyAPI) List(ctx context.Context) ([]iface.Key, error) { + var out struct{ Keys []*keyOutput } + if err := api.core().Request("key/list").Exec(ctx, &out); err != nil { + return nil, err + } + + res := make([]iface.Key, len(out.Keys)) + for i, k := range out.Keys { + var err error + k.pid, err = peer.Decode(k.Id) + if err != nil { + return nil, err + } + res[i] = k + } + + return res, nil +} + +func (api *KeyAPI) Self(ctx context.Context) (iface.Key, error) { + var id struct{ ID string } + if err := api.core().Request("id").Exec(ctx, &id); err != nil { + return nil, err + } + + var err error + out := keyOutput{JName: "self", Id: id.ID} + out.pid, err = peer.Decode(out.Id) + return &out, err +} + +func (api *KeyAPI) Remove(ctx context.Context, name string) (iface.Key, error) { + var out struct{ Keys []keyOutput } + if err := api.core().Request("key/rm", name).Exec(ctx, &out); err != nil { + return nil, err + } + if len(out.Keys) != 1 { + return nil, errors.New("got unexpected number of keys back") + } + + var err error + out.Keys[0].pid, err = peer.Decode(out.Keys[0].Id) + return &out.Keys[0], err +} + +func (api *KeyAPI) core() *HttpApi { + return (*HttpApi)(api) +} diff --git a/client/rpc/name.go b/client/rpc/name.go new file mode 100644 index 000000000000..f82f69f3ac42 --- /dev/null +++ b/client/rpc/name.go @@ -0,0 +1,140 @@ +package httpapi + +import ( + "context" + "encoding/json" + "fmt" + "io" + + iface "github.com/ipfs/boxo/coreiface" + caopts "github.com/ipfs/boxo/coreiface/options" + nsopts "github.com/ipfs/boxo/coreiface/options/namesys" + "github.com/ipfs/boxo/coreiface/path" +) + +type NameAPI HttpApi + +type ipnsEntry struct { + JName string `json:"Name"` + JValue string `json:"Value"` + + path path.Path +} + +func (e *ipnsEntry) Name() string { + return e.JName +} + +func (e *ipnsEntry) Value() path.Path { + return e.path +} + +func (api *NameAPI) Publish(ctx context.Context, p path.Path, opts ...caopts.NamePublishOption) (iface.IpnsEntry, error) { + options, err := caopts.NamePublishOptions(opts...) + if err != nil { + return nil, err + } + + req := api.core().Request("name/publish", p.String()). + Option("key", options.Key). + Option("allow-offline", options.AllowOffline). + Option("lifetime", options.ValidTime). + Option("resolve", false) + + if options.TTL != nil { + req.Option("ttl", options.TTL) + } + + var out ipnsEntry + if err := req.Exec(ctx, &out); err != nil { + return nil, err + } + out.path = path.New(out.JValue) + return &out, out.path.IsValid() +} + +func (api *NameAPI) Search(ctx context.Context, name string, opts ...caopts.NameResolveOption) (<-chan iface.IpnsResult, error) { + options, err := caopts.NameResolveOptions(opts...) + if err != nil { + return nil, err + } + + ropts := nsopts.ProcessOpts(options.ResolveOpts) + if ropts.Depth != nsopts.DefaultDepthLimit && ropts.Depth != 1 { + return nil, fmt.Errorf("Name.Resolve: depth other than 1 or %d not supported", nsopts.DefaultDepthLimit) + } + + req := api.core().Request("name/resolve", name). + Option("nocache", !options.Cache). + Option("recursive", ropts.Depth != 1). + Option("dht-record-count", ropts.DhtRecordCount). + Option("dht-timeout", ropts.DhtTimeout). + Option("stream", true) + resp, err := req.Send(ctx) + if err != nil { + return nil, err + } + if resp.Error != nil { + return nil, resp.Error + } + + res := make(chan iface.IpnsResult) + + go func() { + defer close(res) + defer resp.Close() + + dec := json.NewDecoder(resp.Output) + + for { + var out struct{ Path string } + err := dec.Decode(&out) + if err == io.EOF { + return + } + var ires iface.IpnsResult + if err == nil { + ires.Path = path.New(out.Path) + } + + select { + case res <- ires: + case <-ctx.Done(): + } + if err != nil { + return + } + } + }() + + return res, nil +} + +func (api *NameAPI) Resolve(ctx context.Context, name string, opts ...caopts.NameResolveOption) (path.Path, error) { + options, err := caopts.NameResolveOptions(opts...) + if err != nil { + return nil, err + } + + ropts := nsopts.ProcessOpts(options.ResolveOpts) + if ropts.Depth != nsopts.DefaultDepthLimit && ropts.Depth != 1 { + return nil, fmt.Errorf("Name.Resolve: depth other than 1 or %d not supported", nsopts.DefaultDepthLimit) + } + + req := api.core().Request("name/resolve", name). + Option("nocache", !options.Cache). + Option("recursive", ropts.Depth != 1). + Option("dht-record-count", ropts.DhtRecordCount). + Option("dht-timeout", ropts.DhtTimeout) + + var out struct{ Path string } + if err := req.Exec(ctx, &out); err != nil { + return nil, err + } + + return path.New(out.Path), nil +} + +func (api *NameAPI) core() *HttpApi { + return (*HttpApi)(api) +} diff --git a/client/rpc/object.go b/client/rpc/object.go new file mode 100644 index 000000000000..4e3b9ef6be18 --- /dev/null +++ b/client/rpc/object.go @@ -0,0 +1,260 @@ +package httpapi + +import ( + "bytes" + "context" + "fmt" + "io" + + iface "github.com/ipfs/boxo/coreiface" + caopts "github.com/ipfs/boxo/coreiface/options" + "github.com/ipfs/boxo/coreiface/path" + "github.com/ipfs/boxo/ipld/merkledag" + ft "github.com/ipfs/boxo/ipld/unixfs" + "github.com/ipfs/go-cid" + ipld "github.com/ipfs/go-ipld-format" +) + +type ObjectAPI HttpApi + +type objectOut struct { + Hash string +} + +func (api *ObjectAPI) New(ctx context.Context, opts ...caopts.ObjectNewOption) (ipld.Node, error) { + options, err := caopts.ObjectNewOptions(opts...) + if err != nil { + return nil, err + } + + var n ipld.Node + switch options.Type { + case "empty": + n = new(merkledag.ProtoNode) + case "unixfs-dir": + n = ft.EmptyDirNode() + default: + return nil, fmt.Errorf("unknown object type: %s", options.Type) + } + + return n, nil +} + +func (api *ObjectAPI) Put(ctx context.Context, r io.Reader, opts ...caopts.ObjectPutOption) (path.Resolved, error) { + options, err := caopts.ObjectPutOptions(opts...) + if err != nil { + return nil, err + } + + var out objectOut + err = api.core().Request("object/put"). + Option("inputenc", options.InputEnc). + Option("datafieldenc", options.DataType). + Option("pin", options.Pin). + FileBody(r). + Exec(ctx, &out) + if err != nil { + return nil, err + } + + c, err := cid.Parse(out.Hash) + if err != nil { + return nil, err + } + + return path.IpfsPath(c), nil +} + +func (api *ObjectAPI) Get(ctx context.Context, p path.Path) (ipld.Node, error) { + r, err := api.core().Block().Get(ctx, p) + if err != nil { + return nil, err + } + b, err := io.ReadAll(r) + if err != nil { + return nil, err + } + + return merkledag.DecodeProtobuf(b) +} + +func (api *ObjectAPI) Data(ctx context.Context, p path.Path) (io.Reader, error) { + resp, err := api.core().Request("object/data", p.String()).Send(ctx) + if err != nil { + return nil, err + } + if resp.Error != nil { + return nil, resp.Error + } + + //TODO: make Data return ReadCloser to avoid copying + defer resp.Close() + b := new(bytes.Buffer) + if _, err := io.Copy(b, resp.Output); err != nil { + return nil, err + } + + return b, nil +} + +func (api *ObjectAPI) Links(ctx context.Context, p path.Path) ([]*ipld.Link, error) { + var out struct { + Links []struct { + Name string + Hash string + Size uint64 + } + } + if err := api.core().Request("object/links", p.String()).Exec(ctx, &out); err != nil { + return nil, err + } + res := make([]*ipld.Link, len(out.Links)) + for i, l := range out.Links { + c, err := cid.Parse(l.Hash) + if err != nil { + return nil, err + } + + res[i] = &ipld.Link{ + Cid: c, + Name: l.Name, + Size: l.Size, + } + } + + return res, nil +} + +func (api *ObjectAPI) Stat(ctx context.Context, p path.Path) (*iface.ObjectStat, error) { + var out struct { + Hash string + NumLinks int + BlockSize int + LinksSize int + DataSize int + CumulativeSize int + } + if err := api.core().Request("object/stat", p.String()).Exec(ctx, &out); err != nil { + return nil, err + } + + c, err := cid.Parse(out.Hash) + if err != nil { + return nil, err + } + + return &iface.ObjectStat{ + Cid: c, + NumLinks: out.NumLinks, + BlockSize: out.BlockSize, + LinksSize: out.LinksSize, + DataSize: out.DataSize, + CumulativeSize: out.CumulativeSize, + }, nil +} + +func (api *ObjectAPI) AddLink(ctx context.Context, base path.Path, name string, child path.Path, opts ...caopts.ObjectAddLinkOption) (path.Resolved, error) { + options, err := caopts.ObjectAddLinkOptions(opts...) + if err != nil { + return nil, err + } + + var out objectOut + err = api.core().Request("object/patch/add-link", base.String(), name, child.String()). + Option("create", options.Create). + Exec(ctx, &out) + if err != nil { + return nil, err + } + + c, err := cid.Parse(out.Hash) + if err != nil { + return nil, err + } + + return path.IpfsPath(c), nil +} + +func (api *ObjectAPI) RmLink(ctx context.Context, base path.Path, link string) (path.Resolved, error) { + var out objectOut + err := api.core().Request("object/patch/rm-link", base.String(), link). + Exec(ctx, &out) + if err != nil { + return nil, err + } + + c, err := cid.Parse(out.Hash) + if err != nil { + return nil, err + } + + return path.IpfsPath(c), nil +} + +func (api *ObjectAPI) AppendData(ctx context.Context, p path.Path, r io.Reader) (path.Resolved, error) { + var out objectOut + err := api.core().Request("object/patch/append-data", p.String()). + FileBody(r). + Exec(ctx, &out) + if err != nil { + return nil, err + } + + c, err := cid.Parse(out.Hash) + if err != nil { + return nil, err + } + + return path.IpfsPath(c), nil +} + +func (api *ObjectAPI) SetData(ctx context.Context, p path.Path, r io.Reader) (path.Resolved, error) { + var out objectOut + err := api.core().Request("object/patch/set-data", p.String()). + FileBody(r). + Exec(ctx, &out) + if err != nil { + return nil, err + } + + c, err := cid.Parse(out.Hash) + if err != nil { + return nil, err + } + + return path.IpfsPath(c), nil +} + +type change struct { + Type iface.ChangeType + Path string + Before cid.Cid + After cid.Cid +} + +func (api *ObjectAPI) Diff(ctx context.Context, a path.Path, b path.Path) ([]iface.ObjectChange, error) { + var out struct { + Changes []change + } + if err := api.core().Request("object/diff", a.String(), b.String()).Exec(ctx, &out); err != nil { + return nil, err + } + res := make([]iface.ObjectChange, len(out.Changes)) + for i, ch := range out.Changes { + res[i] = iface.ObjectChange{ + Type: ch.Type, + Path: ch.Path, + } + if ch.Before != cid.Undef { + res[i].Before = path.IpfsPath(ch.Before) + } + if ch.After != cid.Undef { + res[i].After = path.IpfsPath(ch.After) + } + } + return res, nil +} + +func (api *ObjectAPI) core() *HttpApi { + return (*HttpApi)(api) +} diff --git a/client/rpc/path.go b/client/rpc/path.go new file mode 100644 index 000000000000..d69d425ab664 --- /dev/null +++ b/client/rpc/path.go @@ -0,0 +1,52 @@ +package httpapi + +import ( + "context" + + "github.com/ipfs/boxo/coreiface/path" + ipfspath "github.com/ipfs/boxo/path" + cid "github.com/ipfs/go-cid" + ipld "github.com/ipfs/go-ipld-format" +) + +func (api *HttpApi) ResolvePath(ctx context.Context, p path.Path) (path.Resolved, error) { + var out struct { + Cid cid.Cid + RemPath string + } + + //TODO: this is hacky, fixing https://github.com/ipfs/go-ipfs/issues/5703 would help + + var err error + if p.Namespace() == "ipns" { + if p, err = api.Name().Resolve(ctx, p.String()); err != nil { + return nil, err + } + } + + if err := api.Request("dag/resolve", p.String()).Exec(ctx, &out); err != nil { + return nil, err + } + + // TODO: + ipath, err := ipfspath.FromSegments("/"+p.Namespace()+"/", out.Cid.String(), out.RemPath) + if err != nil { + return nil, err + } + + root, err := cid.Parse(ipfspath.Path(p.String()).Segments()[1]) + if err != nil { + return nil, err + } + + return path.NewResolvedPath(ipath, out.Cid, root, out.RemPath), nil +} + +func (api *HttpApi) ResolveNode(ctx context.Context, p path.Path) (ipld.Node, error) { + rp, err := api.ResolvePath(ctx, p) + if err != nil { + return nil, err + } + + return api.Dag().Get(ctx, rp.Cid()) +} diff --git a/client/rpc/pin.go b/client/rpc/pin.go new file mode 100644 index 000000000000..30a3d7b7a902 --- /dev/null +++ b/client/rpc/pin.go @@ -0,0 +1,220 @@ +package httpapi + +import ( + "context" + "encoding/json" + "strings" + + iface "github.com/ipfs/boxo/coreiface" + caopts "github.com/ipfs/boxo/coreiface/options" + "github.com/ipfs/boxo/coreiface/path" + "github.com/ipfs/go-cid" + "github.com/pkg/errors" +) + +type PinAPI HttpApi + +type pinRefKeyObject struct { + Type string +} + +type pinRefKeyList struct { + Keys map[string]pinRefKeyObject +} + +type pin struct { + path path.Resolved + typ string + err error +} + +func (p *pin) Err() error { + return p.err +} + +func (p *pin) Path() path.Resolved { + return p.path +} + +func (p *pin) Type() string { + return p.typ +} + +func (api *PinAPI) Add(ctx context.Context, p path.Path, opts ...caopts.PinAddOption) error { + options, err := caopts.PinAddOptions(opts...) + if err != nil { + return err + } + + return api.core().Request("pin/add", p.String()). + Option("recursive", options.Recursive).Exec(ctx, nil) +} + +func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) (<-chan iface.Pin, error) { + options, err := caopts.PinLsOptions(opts...) + if err != nil { + return nil, err + } + + var out pinRefKeyList + err = api.core().Request("pin/ls"). + Option("type", options.Type).Exec(ctx, &out) + if err != nil { + return nil, err + } + + pins := make(chan iface.Pin) + go func(ch chan<- iface.Pin) { + defer close(ch) + for hash, p := range out.Keys { + c, e := cid.Parse(hash) + if e != nil { + ch <- &pin{typ: p.Type, err: e} + return + } + ch <- &pin{typ: p.Type, path: path.IpldPath(c), err: e} + } + }(pins) + return pins, nil +} + +// IsPinned returns whether or not the given cid is pinned +// and an explanation of why its pinned +func (api *PinAPI) IsPinned(ctx context.Context, p path.Path, opts ...caopts.PinIsPinnedOption) (string, bool, error) { + options, err := caopts.PinIsPinnedOptions(opts...) + if err != nil { + return "", false, err + } + var out pinRefKeyList + err = api.core().Request("pin/ls"). + Option("type", options.WithType). + Option("arg", p.String()). + Exec(ctx, &out) + if err != nil { + // TODO: This error-type discrimination based on sub-string matching is brittle. + // It is addressed by this open issue: https://github.com/ipfs/go-ipfs/issues/7563 + if strings.Contains(err.Error(), "is not pinned") { + return "", false, nil + } + return "", false, err + } + + for _, obj := range out.Keys { + return obj.Type, true, nil + } + return "", false, errors.New("http api returned no error and no results") +} + +func (api *PinAPI) Rm(ctx context.Context, p path.Path, opts ...caopts.PinRmOption) error { + options, err := caopts.PinRmOptions(opts...) + if err != nil { + return err + } + + return api.core().Request("pin/rm", p.String()). + Option("recursive", options.Recursive). + Exec(ctx, nil) +} + +func (api *PinAPI) Update(ctx context.Context, from path.Path, to path.Path, opts ...caopts.PinUpdateOption) error { + options, err := caopts.PinUpdateOptions(opts...) + if err != nil { + return err + } + + return api.core().Request("pin/update", from.String(), to.String()). + Option("unpin", options.Unpin).Exec(ctx, nil) +} + +type pinVerifyRes struct { + ok bool + badNodes []iface.BadPinNode +} + +func (r *pinVerifyRes) Ok() bool { + return r.ok +} + +func (r *pinVerifyRes) BadNodes() []iface.BadPinNode { + return r.badNodes +} + +type badNode struct { + err error + cid cid.Cid +} + +func (n *badNode) Path() path.Resolved { + return path.IpldPath(n.cid) +} + +func (n *badNode) Err() error { + return n.err +} + +func (api *PinAPI) Verify(ctx context.Context) (<-chan iface.PinStatus, error) { + resp, err := api.core().Request("pin/verify").Option("verbose", true).Send(ctx) + if err != nil { + return nil, err + } + if resp.Error != nil { + return nil, resp.Error + } + res := make(chan iface.PinStatus) + + go func() { + defer resp.Close() + defer close(res) + dec := json.NewDecoder(resp.Output) + for { + var out struct { + Cid string + Ok bool + + BadNodes []struct { + Cid string + Err string + } + } + if err := dec.Decode(&out); err != nil { + return // todo: handle non io.EOF somehow + } + + badNodes := make([]iface.BadPinNode, len(out.BadNodes)) + for i, n := range out.BadNodes { + c, err := cid.Decode(n.Cid) + if err != nil { + badNodes[i] = &badNode{ + cid: c, + err: err, + } + continue + } + + if n.Err != "" { + err = errors.New(n.Err) + } + badNodes[i] = &badNode{ + cid: c, + err: err, + } + } + + select { + case res <- &pinVerifyRes{ + ok: out.Ok, + + badNodes: badNodes, + }: + case <-ctx.Done(): + return + } + } + }() + + return res, nil +} + +func (api *PinAPI) core() *HttpApi { + return (*HttpApi)(api) +} diff --git a/client/rpc/pubsub.go b/client/rpc/pubsub.go new file mode 100644 index 000000000000..28f1ef8e609c --- /dev/null +++ b/client/rpc/pubsub.go @@ -0,0 +1,214 @@ +package httpapi + +import ( + "bytes" + "context" + "encoding/json" + "io" + + iface "github.com/ipfs/boxo/coreiface" + caopts "github.com/ipfs/boxo/coreiface/options" + "github.com/libp2p/go-libp2p/core/peer" + mbase "github.com/multiformats/go-multibase" +) + +type PubsubAPI HttpApi + +func (api *PubsubAPI) Ls(ctx context.Context) ([]string, error) { + var out struct { + Strings []string + } + + if err := api.core().Request("pubsub/ls").Exec(ctx, &out); err != nil { + return nil, err + } + topics := make([]string, len(out.Strings)) + for n, mb := range out.Strings { + _, topic, err := mbase.Decode(mb) + if err != nil { + return nil, err + } + topics[n] = string(topic) + } + return topics, nil +} + +func (api *PubsubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOption) ([]peer.ID, error) { + options, err := caopts.PubSubPeersOptions(opts...) + if err != nil { + return nil, err + } + + var out struct { + Strings []string + } + + var optionalTopic string + if len(options.Topic) > 0 { + optionalTopic = toMultibase([]byte(options.Topic)) + } + if err := api.core().Request("pubsub/peers", optionalTopic).Exec(ctx, &out); err != nil { + return nil, err + } + + res := make([]peer.ID, len(out.Strings)) + for i, sid := range out.Strings { + id, err := peer.Decode(sid) + if err != nil { + return nil, err + } + res[i] = id + } + return res, nil +} + +func (api *PubsubAPI) Publish(ctx context.Context, topic string, message []byte) error { + return api.core().Request("pubsub/pub", toMultibase([]byte(topic))). + FileBody(bytes.NewReader(message)). + Exec(ctx, nil) +} + +type pubsubSub struct { + messages chan pubsubMessage + + done chan struct{} + rcloser func() error +} + +type pubsubMessage struct { + JFrom string `json:"from,omitempty"` + JData string `json:"data,omitempty"` + JSeqno string `json:"seqno,omitempty"` + JTopicIDs []string `json:"topicIDs,omitempty"` + + // real values after unpacking from text/multibase envelopes + from peer.ID + data []byte + seqno []byte + topics []string + + err error +} + +func (msg *pubsubMessage) From() peer.ID { + return msg.from +} + +func (msg *pubsubMessage) Data() []byte { + return msg.data +} + +func (msg *pubsubMessage) Seq() []byte { + return msg.seqno +} + +// TODO: do we want to keep this interface as []string, +// or change to more correct [][]byte? +func (msg *pubsubMessage) Topics() []string { + return msg.topics +} + +func (s *pubsubSub) Next(ctx context.Context) (iface.PubSubMessage, error) { + select { + case msg, ok := <-s.messages: + if !ok { + return nil, io.EOF + } + if msg.err != nil { + return nil, msg.err + } + // unpack values from text/multibase envelopes + var err error + msg.from, err = peer.Decode(msg.JFrom) + if err != nil { + return nil, err + } + _, msg.data, err = mbase.Decode(msg.JData) + if err != nil { + return nil, err + } + _, msg.seqno, err = mbase.Decode(msg.JSeqno) + if err != nil { + return nil, err + } + for _, mbt := range msg.JTopicIDs { + _, topic, err := mbase.Decode(mbt) + if err != nil { + return nil, err + } + msg.topics = append(msg.topics, string(topic)) + } + return &msg, nil + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +func (api *PubsubAPI) Subscribe(ctx context.Context, topic string, opts ...caopts.PubSubSubscribeOption) (iface.PubSubSubscription, error) { + /* right now we have no options (discover got deprecated) + options, err := caopts.PubSubSubscribeOptions(opts...) + if err != nil { + return nil, err + } + */ + resp, err := api.core().Request("pubsub/sub", toMultibase([]byte(topic))).Send(ctx) + + if err != nil { + return nil, err + } + if resp.Error != nil { + return nil, resp.Error + } + + sub := &pubsubSub{ + messages: make(chan pubsubMessage), + done: make(chan struct{}), + rcloser: func() error { + return resp.Cancel() + }, + } + + dec := json.NewDecoder(resp.Output) + + go func() { + defer close(sub.messages) + + for { + var msg pubsubMessage + if err := dec.Decode(&msg); err != nil { + if err == io.EOF { + return + } + msg.err = err + } + + select { + case sub.messages <- msg: + case <-sub.done: + return + case <-ctx.Done(): + return + } + } + }() + + return sub, nil +} + +func (s *pubsubSub) Close() error { + if s.done != nil { + close(s.done) + s.done = nil + } + return s.rcloser() +} + +func (api *PubsubAPI) core() *HttpApi { + return (*HttpApi)(api) +} + +// Encodes bytes into URL-safe multibase that can be sent over HTTP RPC (URL or body) +func toMultibase(data []byte) string { + mb, _ := mbase.Encode(mbase.Base64url, data) + return mb +} diff --git a/client/rpc/request.go b/client/rpc/request.go new file mode 100644 index 000000000000..dd5293b7a6ed --- /dev/null +++ b/client/rpc/request.go @@ -0,0 +1,36 @@ +package httpapi + +import ( + "context" + "io" + "strings" +) + +type Request struct { + Ctx context.Context + ApiBase string + Command string + Args []string + Opts map[string]string + Body io.Reader + Headers map[string]string +} + +func NewRequest(ctx context.Context, url, command string, args ...string) *Request { + if !strings.HasPrefix(url, "http") { + url = "http://" + url + } + + opts := map[string]string{ + "encoding": "json", + "stream-channels": "true", + } + return &Request{ + Ctx: ctx, + ApiBase: url + "/api/v0", + Command: command, + Args: args, + Opts: opts, + Headers: make(map[string]string), + } +} diff --git a/client/rpc/requestbuilder.go b/client/rpc/requestbuilder.go new file mode 100644 index 000000000000..476aed7866d1 --- /dev/null +++ b/client/rpc/requestbuilder.go @@ -0,0 +1,127 @@ +package httpapi + +import ( + "bytes" + "context" + "fmt" + "io" + "strconv" + "strings" + + "github.com/ipfs/boxo/files" +) + +type RequestBuilder interface { + Arguments(args ...string) RequestBuilder + BodyString(body string) RequestBuilder + BodyBytes(body []byte) RequestBuilder + Body(body io.Reader) RequestBuilder + FileBody(body io.Reader) RequestBuilder + Option(key string, value interface{}) RequestBuilder + Header(name, value string) RequestBuilder + Send(ctx context.Context) (*Response, error) + Exec(ctx context.Context, res interface{}) error +} + +// requestBuilder is an IPFS commands request builder. +type requestBuilder struct { + command string + args []string + opts map[string]string + headers map[string]string + body io.Reader + + shell *HttpApi +} + +// Arguments adds the arguments to the args. +func (r *requestBuilder) Arguments(args ...string) RequestBuilder { + r.args = append(r.args, args...) + return r +} + +// BodyString sets the request body to the given string. +func (r *requestBuilder) BodyString(body string) RequestBuilder { + return r.Body(strings.NewReader(body)) +} + +// BodyBytes sets the request body to the given buffer. +func (r *requestBuilder) BodyBytes(body []byte) RequestBuilder { + return r.Body(bytes.NewReader(body)) +} + +// Body sets the request body to the given reader. +func (r *requestBuilder) Body(body io.Reader) RequestBuilder { + r.body = body + return r +} + +// FileBody sets the request body to the given reader wrapped into multipartreader. +func (r *requestBuilder) FileBody(body io.Reader) RequestBuilder { + pr, _ := files.NewReaderPathFile("/dev/stdin", io.NopCloser(body), nil) + d := files.NewMapDirectory(map[string]files.Node{"": pr}) + r.body = files.NewMultiFileReader(d, false) + + return r +} + +// Option sets the given option. +func (r *requestBuilder) Option(key string, value interface{}) RequestBuilder { + var s string + switch v := value.(type) { + case bool: + s = strconv.FormatBool(v) + case string: + s = v + case []byte: + s = string(v) + default: + // slow case. + s = fmt.Sprint(value) + } + if r.opts == nil { + r.opts = make(map[string]string, 1) + } + r.opts[key] = s + return r +} + +// Header sets the given header. +func (r *requestBuilder) Header(name, value string) RequestBuilder { + if r.headers == nil { + r.headers = make(map[string]string, 1) + } + r.headers[name] = value + return r +} + +// Send sends the request and return the response. +func (r *requestBuilder) Send(ctx context.Context) (*Response, error) { + r.shell.applyGlobal(r) + + req := NewRequest(ctx, r.shell.url, r.command, r.args...) + req.Opts = r.opts + req.Headers = r.headers + req.Body = r.body + return req.Send(&r.shell.httpcli) +} + +// Exec sends the request a request and decodes the response. +func (r *requestBuilder) Exec(ctx context.Context, res interface{}) error { + httpRes, err := r.Send(ctx) + if err != nil { + return err + } + + if res == nil { + lateErr := httpRes.Close() + if httpRes.Error != nil { + return httpRes.Error + } + return lateErr + } + + return httpRes.decode(res) +} + +var _ RequestBuilder = &requestBuilder{} diff --git a/client/rpc/response.go b/client/rpc/response.go new file mode 100644 index 000000000000..189b43671614 --- /dev/null +++ b/client/rpc/response.go @@ -0,0 +1,170 @@ +package httpapi + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "mime" + "net/http" + "net/url" + "os" + + "github.com/ipfs/boxo/files" + cmds "github.com/ipfs/go-ipfs-cmds" + cmdhttp "github.com/ipfs/go-ipfs-cmds/http" +) + +type Error = cmds.Error + +type trailerReader struct { + resp *http.Response +} + +func (r *trailerReader) Read(b []byte) (int, error) { + n, err := r.resp.Body.Read(b) + if err != nil { + if e := r.resp.Trailer.Get(cmdhttp.StreamErrHeader); e != "" { + err = errors.New(e) + } + } + return n, err +} + +func (r *trailerReader) Close() error { + return r.resp.Body.Close() +} + +type Response struct { + Output io.ReadCloser + Error *Error +} + +func (r *Response) Close() error { + if r.Output != nil { + + // drain output (response body) + _, err1 := io.Copy(io.Discard, r.Output) + err2 := r.Output.Close() + if err1 != nil { + return err1 + } + return err2 + } + return nil +} + +// Cancel aborts running request (without draining request body) +func (r *Response) Cancel() error { + if r.Output != nil { + return r.Output.Close() + } + + return nil +} + +// Decode reads request body and decodes it as json +func (r *Response) decode(dec interface{}) error { + if r.Error != nil { + return r.Error + } + + err := json.NewDecoder(r.Output).Decode(dec) + err2 := r.Close() + if err != nil { + return err + } + + return err2 +} + +func (r *Request) Send(c *http.Client) (*Response, error) { + url := r.getURL() + req, err := http.NewRequest("POST", url, r.Body) + if err != nil { + return nil, err + } + + req = req.WithContext(r.Ctx) + + // Add any headers that were supplied via the requestBuilder. + for k, v := range r.Headers { + req.Header.Add(k, v) + } + + if fr, ok := r.Body.(*files.MultiFileReader); ok { + req.Header.Set("Content-Type", "multipart/form-data; boundary="+fr.Boundary()) + req.Header.Set("Content-Disposition", "form-data; name=\"files\"") + } + + resp, err := c.Do(req) + if err != nil { + return nil, err + } + + contentType, _, err := mime.ParseMediaType(resp.Header.Get("Content-Type")) + if err != nil { + return nil, err + } + + nresp := new(Response) + + nresp.Output = &trailerReader{resp} + if resp.StatusCode >= http.StatusBadRequest { + e := new(Error) + switch { + case resp.StatusCode == http.StatusNotFound: + e.Message = "command not found" + case contentType == "text/plain": + out, err := io.ReadAll(resp.Body) + if err != nil { + fmt.Fprintf(os.Stderr, "ipfs-shell: warning! response (%d) read error: %s\n", resp.StatusCode, err) + } + e.Message = string(out) + + // set special status codes. + switch resp.StatusCode { + case http.StatusNotFound, http.StatusBadRequest: + e.Code = cmds.ErrClient + case http.StatusTooManyRequests: + e.Code = cmds.ErrRateLimited + case http.StatusForbidden: + e.Code = cmds.ErrForbidden + } + case contentType == "application/json": + if err = json.NewDecoder(resp.Body).Decode(e); err != nil { + fmt.Fprintf(os.Stderr, "ipfs-shell: warning! response (%d) unmarshall error: %s\n", resp.StatusCode, err) + } + default: + // This is a server-side bug (probably). + e.Code = cmds.ErrImplementation + fmt.Fprintf(os.Stderr, "ipfs-shell: warning! unhandled response (%d) encoding: %s", resp.StatusCode, contentType) + out, err := io.ReadAll(resp.Body) + if err != nil { + fmt.Fprintf(os.Stderr, "ipfs-shell: response (%d) read error: %s\n", resp.StatusCode, err) + } + e.Message = fmt.Sprintf("unknown ipfs-shell error encoding: %q - %q", contentType, out) + } + nresp.Error = e + nresp.Output = nil + + // drain body and close + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + } + + return nresp, nil +} + +func (r *Request) getURL() string { + + values := make(url.Values) + for _, arg := range r.Args { + values.Add("arg", arg) + } + for k, v := range r.Opts { + values.Add(k, v) + } + + return fmt.Sprintf("%s/%s?%s", r.ApiBase, r.Command, values.Encode()) +} diff --git a/client/rpc/routing.go b/client/rpc/routing.go new file mode 100644 index 000000000000..abff25efe911 --- /dev/null +++ b/client/rpc/routing.go @@ -0,0 +1,63 @@ +package httpapi + +import ( + "bytes" + "context" + "encoding/base64" + "encoding/json" + + "github.com/ipfs/boxo/coreiface/options" + "github.com/libp2p/go-libp2p/core/routing" +) + +type RoutingAPI HttpApi + +func (api *RoutingAPI) Get(ctx context.Context, key string) ([]byte, error) { + resp, err := api.core().Request("routing/get", key).Send(ctx) + if err != nil { + return nil, err + } + if resp.Error != nil { + return nil, resp.Error + } + defer resp.Close() + + var out routing.QueryEvent + + dec := json.NewDecoder(resp.Output) + if err := dec.Decode(&out); err != nil { + return nil, err + } + + res, err := base64.StdEncoding.DecodeString(out.Extra) + if err != nil { + return nil, err + } + + return res, nil +} + +func (api *RoutingAPI) Put(ctx context.Context, key string, value []byte, opts ...options.RoutingPutOption) error { + var cfg options.RoutingPutSettings + for _, o := range opts { + if err := o(&cfg); err != nil { + return err + } + } + + resp, err := api.core().Request("routing/put", key). + FileBody(bytes.NewReader(value)). + Send(ctx) + + if err != nil { + return err + } + if resp.Error != nil { + return resp.Error + } + return nil +} + +func (api *RoutingAPI) core() *HttpApi { + return (*HttpApi)(api) +} diff --git a/client/rpc/swarm.go b/client/rpc/swarm.go new file mode 100644 index 000000000000..9b073078da0a --- /dev/null +++ b/client/rpc/swarm.go @@ -0,0 +1,187 @@ +package httpapi + +import ( + "context" + "time" + + iface "github.com/ipfs/boxo/coreiface" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" + "github.com/multiformats/go-multiaddr" +) + +type SwarmAPI HttpApi + +func (api *SwarmAPI) Connect(ctx context.Context, pi peer.AddrInfo) error { + pidma, err := multiaddr.NewComponent("p2p", pi.ID.Pretty()) + if err != nil { + return err + } + + saddrs := make([]string, len(pi.Addrs)) + for i, addr := range pi.Addrs { + saddrs[i] = addr.Encapsulate(pidma).String() + } + + return api.core().Request("swarm/connect", saddrs...).Exec(ctx, nil) +} + +func (api *SwarmAPI) Disconnect(ctx context.Context, addr multiaddr.Multiaddr) error { + return api.core().Request("swarm/disconnect", addr.String()).Exec(ctx, nil) +} + +type connInfo struct { + addr multiaddr.Multiaddr + peer peer.ID + latency time.Duration + muxer string + direction network.Direction + streams []protocol.ID +} + +func (c *connInfo) ID() peer.ID { + return c.peer +} + +func (c *connInfo) Address() multiaddr.Multiaddr { + return c.addr +} + +func (c *connInfo) Direction() network.Direction { + return c.direction +} + +func (c *connInfo) Latency() (time.Duration, error) { + return c.latency, nil +} + +func (c *connInfo) Streams() ([]protocol.ID, error) { + return c.streams, nil +} + +func (api *SwarmAPI) Peers(ctx context.Context) ([]iface.ConnectionInfo, error) { + var resp struct { + Peers []struct { + Addr string + Peer string + Latency string + Muxer string + Direction network.Direction + Streams []struct { + Protocol string + } + } + } + + err := api.core().Request("swarm/peers"). + Option("streams", true). + Option("latency", true). + Exec(ctx, &resp) + if err != nil { + return nil, err + } + + res := make([]iface.ConnectionInfo, len(resp.Peers)) + for i, conn := range resp.Peers { + latency, _ := time.ParseDuration(conn.Latency) + out := &connInfo{ + latency: latency, + muxer: conn.Muxer, + direction: conn.Direction, + } + + out.peer, err = peer.Decode(conn.Peer) + if err != nil { + return nil, err + } + + out.addr, err = multiaddr.NewMultiaddr(conn.Addr) + if err != nil { + return nil, err + } + + out.streams = make([]protocol.ID, len(conn.Streams)) + for i, p := range conn.Streams { + out.streams[i] = protocol.ID(p.Protocol) + } + + res[i] = out + } + + return res, nil +} + +func (api *SwarmAPI) KnownAddrs(ctx context.Context) (map[peer.ID][]multiaddr.Multiaddr, error) { + var out struct { + Addrs map[string][]string + } + if err := api.core().Request("swarm/addrs").Exec(ctx, &out); err != nil { + return nil, err + } + res := map[peer.ID][]multiaddr.Multiaddr{} + for spid, saddrs := range out.Addrs { + addrs := make([]multiaddr.Multiaddr, len(saddrs)) + + for i, addr := range saddrs { + a, err := multiaddr.NewMultiaddr(addr) + if err != nil { + return nil, err + } + addrs[i] = a + } + + pid, err := peer.Decode(spid) + if err != nil { + return nil, err + } + + res[pid] = addrs + } + + return res, nil +} + +func (api *SwarmAPI) LocalAddrs(ctx context.Context) ([]multiaddr.Multiaddr, error) { + var out struct { + Strings []string + } + + if err := api.core().Request("swarm/addrs/local").Exec(ctx, &out); err != nil { + return nil, err + } + + res := make([]multiaddr.Multiaddr, len(out.Strings)) + for i, addr := range out.Strings { + ma, err := multiaddr.NewMultiaddr(addr) + if err != nil { + return nil, err + } + res[i] = ma + } + return res, nil +} + +func (api *SwarmAPI) ListenAddrs(ctx context.Context) ([]multiaddr.Multiaddr, error) { + var out struct { + Strings []string + } + + if err := api.core().Request("swarm/addrs/listen").Exec(ctx, &out); err != nil { + return nil, err + } + + res := make([]multiaddr.Multiaddr, len(out.Strings)) + for i, addr := range out.Strings { + ma, err := multiaddr.NewMultiaddr(addr) + if err != nil { + return nil, err + } + res[i] = ma + } + return res, nil +} + +func (api *SwarmAPI) core() *HttpApi { + return (*HttpApi)(api) +} diff --git a/client/rpc/unixfs.go b/client/rpc/unixfs.go new file mode 100644 index 000000000000..b9c34c59ffcf --- /dev/null +++ b/client/rpc/unixfs.go @@ -0,0 +1,230 @@ +package httpapi + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + + iface "github.com/ipfs/boxo/coreiface" + caopts "github.com/ipfs/boxo/coreiface/options" + "github.com/ipfs/boxo/coreiface/path" + "github.com/ipfs/boxo/files" + unixfs "github.com/ipfs/boxo/ipld/unixfs" + unixfs_pb "github.com/ipfs/boxo/ipld/unixfs/pb" + "github.com/ipfs/go-cid" + mh "github.com/multiformats/go-multihash" +) + +type addEvent struct { + Name string + Hash string `json:",omitempty"` + Bytes int64 `json:",omitempty"` + Size string `json:",omitempty"` +} + +type UnixfsAPI HttpApi + +func (api *UnixfsAPI) Add(ctx context.Context, f files.Node, opts ...caopts.UnixfsAddOption) (path.Resolved, error) { + options, _, err := caopts.UnixfsAddOptions(opts...) + if err != nil { + return nil, err + } + + mht, ok := mh.Codes[options.MhType] + if !ok { + return nil, fmt.Errorf("unknowm mhType %d", options.MhType) + } + + req := api.core().Request("add"). + Option("hash", mht). + Option("chunker", options.Chunker). + Option("cid-version", options.CidVersion). + Option("fscache", options.FsCache). + Option("inline", options.Inline). + Option("inline-limit", options.InlineLimit). + Option("nocopy", options.NoCopy). + Option("only-hash", options.OnlyHash). + Option("pin", options.Pin). + Option("silent", options.Silent). + Option("progress", options.Progress) + + if options.RawLeavesSet { + req.Option("raw-leaves", options.RawLeaves) + } + + switch options.Layout { + case caopts.BalancedLayout: + // noop, default + case caopts.TrickleLayout: + req.Option("trickle", true) + } + + d := files.NewMapDirectory(map[string]files.Node{"": f}) // unwrapped on the other side + req.Body(files.NewMultiFileReader(d, false)) + + var out addEvent + resp, err := req.Send(ctx) + if err != nil { + return nil, err + } + if resp.Error != nil { + return nil, resp.Error + } + defer resp.Output.Close() + dec := json.NewDecoder(resp.Output) +loop: + for { + var evt addEvent + switch err := dec.Decode(&evt); err { + case nil: + case io.EOF: + break loop + default: + return nil, err + } + out = evt + + if options.Events != nil { + ifevt := &iface.AddEvent{ + Name: out.Name, + Size: out.Size, + Bytes: out.Bytes, + } + + if out.Hash != "" { + c, err := cid.Parse(out.Hash) + if err != nil { + return nil, err + } + + ifevt.Path = path.IpfsPath(c) + } + + select { + case options.Events <- ifevt: + case <-ctx.Done(): + return nil, ctx.Err() + } + } + } + + c, err := cid.Parse(out.Hash) + if err != nil { + return nil, err + } + + return path.IpfsPath(c), nil +} + +type lsLink struct { + Name, Hash string + Size uint64 + Type unixfs_pb.Data_DataType + Target string +} + +type lsObject struct { + Hash string + Links []lsLink +} + +type lsOutput struct { + Objects []lsObject +} + +func (api *UnixfsAPI) Ls(ctx context.Context, p path.Path, opts ...caopts.UnixfsLsOption) (<-chan iface.DirEntry, error) { + options, err := caopts.UnixfsLsOptions(opts...) + if err != nil { + return nil, err + } + + resp, err := api.core().Request("ls", p.String()). + Option("resolve-type", options.ResolveChildren). + Option("size", options.ResolveChildren). + Option("stream", true). + Send(ctx) + if err != nil { + return nil, err + } + if resp.Error != nil { + return nil, resp.Error + } + + dec := json.NewDecoder(resp.Output) + out := make(chan iface.DirEntry) + + go func() { + defer resp.Close() + defer close(out) + + for { + var link lsOutput + if err := dec.Decode(&link); err != nil { + if err == io.EOF { + return + } + select { + case out <- iface.DirEntry{Err: err}: + case <-ctx.Done(): + } + return + } + + if len(link.Objects) != 1 { + select { + case out <- iface.DirEntry{Err: errors.New("unexpected Objects len")}: + case <-ctx.Done(): + } + return + } + + if len(link.Objects[0].Links) != 1 { + select { + case out <- iface.DirEntry{Err: errors.New("unexpected Links len")}: + case <-ctx.Done(): + } + return + } + + l0 := link.Objects[0].Links[0] + + c, err := cid.Decode(l0.Hash) + if err != nil { + select { + case out <- iface.DirEntry{Err: err}: + case <-ctx.Done(): + } + return + } + + var ftype iface.FileType + switch l0.Type { + case unixfs.TRaw, unixfs.TFile: + ftype = iface.TFile + case unixfs.THAMTShard, unixfs.TDirectory, unixfs.TMetadata: + ftype = iface.TDirectory + case unixfs.TSymlink: + ftype = iface.TSymlink + } + + select { + case out <- iface.DirEntry{ + Name: l0.Name, + Cid: c, + Size: l0.Size, + Type: ftype, + Target: l0.Target, + }: + case <-ctx.Done(): + } + } + }() + + return out, nil +} + +func (api *UnixfsAPI) core() *HttpApi { + return (*HttpApi)(api) +} diff --git a/core/coreapi/test/api_test.go b/core/coreapi/test/api_test.go index 64d8738963c1..fa09a96d8fbb 100644 --- a/core/coreapi/test/api_test.go +++ b/core/coreapi/test/api_test.go @@ -31,7 +31,7 @@ const testPeerID = "QmTFauExutTsy4XP6JbMFcw2Wa9645HJt2bTqL6qYDCKfe" type NodeProvider struct{} -func (NodeProvider) MakeAPISwarm(ctx context.Context, fullIdentity bool, online bool, n int) ([]coreiface.CoreAPI, error) { +func (NodeProvider) MakeAPISwarm(t *testing.T, ctx context.Context, fullIdentity bool, online bool, n int) ([]coreiface.CoreAPI, error) { mn := mocknet.New() nodes := make([]*core.IpfsNode, n) @@ -120,5 +120,5 @@ func (NodeProvider) MakeAPISwarm(ctx context.Context, fullIdentity bool, online } func TestIface(t *testing.T) { - tests.TestApi(&NodeProvider{})(t) + tests.TestApi(NodeProvider{})(t) } diff --git a/core/coreapi/test/path_test.go b/core/coreapi/test/path_test.go index c89ce48c9602..0dce72627990 100644 --- a/core/coreapi/test/path_test.go +++ b/core/coreapi/test/path_test.go @@ -19,7 +19,7 @@ func TestPathUnixFSHAMTPartial(t *testing.T) { defer cancel() // Create a node - apis, err := NodeProvider{}.MakeAPISwarm(ctx, true, true, 1) + apis, err := NodeProvider{}.MakeAPISwarm(t, ctx, true, true, 1) if err != nil { t.Fatal(err) } diff --git a/docs/examples/kubo-as-a-library/go.mod b/docs/examples/kubo-as-a-library/go.mod index e21648bd45be..656126fe4c90 100644 --- a/docs/examples/kubo-as-a-library/go.mod +++ b/docs/examples/kubo-as-a-library/go.mod @@ -7,7 +7,7 @@ go 1.18 replace github.com/ipfs/kubo => ./../../.. require ( - github.com/ipfs/boxo v0.8.2-0.20230529214945-86cdb2485dad + github.com/ipfs/boxo v0.8.2-0.20230530170953-5f18224deb24 github.com/ipfs/kubo v0.0.0-00010101000000-000000000000 github.com/libp2p/go-libp2p v0.27.3 github.com/multiformats/go-multiaddr v0.9.0 diff --git a/docs/examples/kubo-as-a-library/go.sum b/docs/examples/kubo-as-a-library/go.sum index d6dd8467980c..572f361775f3 100644 --- a/docs/examples/kubo-as-a-library/go.sum +++ b/docs/examples/kubo-as-a-library/go.sum @@ -321,8 +321,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1: github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= -github.com/ipfs/boxo v0.8.2-0.20230529214945-86cdb2485dad h1:2vkMvvVa5f9fWzts7OcJL6ZS0QaKCcEeOV6I+doPMo0= -github.com/ipfs/boxo v0.8.2-0.20230529214945-86cdb2485dad/go.mod h1:Ej2r08Z4VIaFKqY08UXMNhwcLf6VekHhK8c+KqA1B9Y= +github.com/ipfs/boxo v0.8.2-0.20230530170953-5f18224deb24 h1:En26p9blUTfBHuETTKW+jibsa5GrrsN0PlPS7IWy/70= +github.com/ipfs/boxo v0.8.2-0.20230530170953-5f18224deb24/go.mod h1:Ej2r08Z4VIaFKqY08UXMNhwcLf6VekHhK8c+KqA1B9Y= github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA= github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU= github.com/ipfs/go-block-format v0.0.2/go.mod h1:AWR46JfpcObNfg3ok2JHDUfdiHRgWhJgCQF+KIgOPJY= diff --git a/docs/http-rpc-clients.md b/docs/http-rpc-clients.md index b77c90a56470..0a6ea369ddda 100644 --- a/docs/http-rpc-clients.md +++ b/docs/http-rpc-clients.md @@ -2,13 +2,13 @@ Kubo provides official HTTP RPC (`/api/v0`) clients for selected languages: -- [js-kubo-rpc-client](https://github.com/ipfs/js-kubo-rpc-client) - Official JS client for talking to Kubo RPC over HTTP -- [go-ipfs-api](https://github.com/ipfs/go-ipfs-api) - The go interface to ipfs's HTTP RPC - Follow https://github.com/ipfs/kubo/issues/9124 for coming changes. -- [go-ipfs-http-client](https://github.com/ipfs/go-ipfs-http-client) - IPFS CoreAPI implementation using HTTP RPC - Follow https://github.com/ipfs/kubo/issues/9124 for coming changes. +- [`js-kubo-rpc-client`](https://github.com/ipfs/js-kubo-rpc-client) - Official JS client for talking to Kubo RPC over HTTP +- [`go-ipfs-api`](https://github.com/ipfs/go-ipfs-api) - The go interface to ipfs's HTTP RPC - Follow https://github.com/ipfs/kubo/issues/9124 for coming changes. +- [`httpapi`](./client/rpc) (previously `go-ipfs-http-client`)) - IPFS CoreAPI implementation using HTTP RPC ## Recommended clients -| Language | Package Name | Github Repository | -|:--------:|:-------------------:|---------------------------------------------| -| JS | kubo-rpc-client | https://github.com/ipfs/js-kubo-rpc-client | -| Go | go-ipfs-http-client | https://github.com/ipfs/go-ipfs-http-client | +| Language | Package Name | Github Repository | +|:--------:|:-------------------:|--------------------------------------------| +| JS | kubo-rpc-client | https://github.com/ipfs/js-kubo-rpc-client | +| Go | `httpapi` | [`./client/rpc`](./client/rpc) | diff --git a/go.mod b/go.mod index 006c3ab58e51..0bfaac95ff14 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/gogo/protobuf v1.3.2 github.com/google/uuid v1.3.0 github.com/hashicorp/go-multierror v1.1.1 - github.com/ipfs/boxo v0.8.2-0.20230529214945-86cdb2485dad + github.com/ipfs/boxo v0.8.2-0.20230530170953-5f18224deb24 github.com/ipfs/go-block-format v0.1.2 github.com/ipfs/go-cid v0.4.1 github.com/ipfs/go-cidutil v0.1.0 @@ -79,6 +79,7 @@ require ( go.opentelemetry.io/otel/trace v1.14.0 go.uber.org/dig v1.16.1 go.uber.org/fx v1.19.2 + go.uber.org/multierr v1.11.0 go.uber.org/zap v1.24.0 golang.org/x/crypto v0.9.0 golang.org/x/mod v0.10.0 @@ -213,7 +214,6 @@ require ( go.opentelemetry.io/otel/metric v0.37.0 // indirect go.opentelemetry.io/proto/otlp v0.19.0 // indirect go.uber.org/atomic v1.10.0 // indirect - go.uber.org/multierr v1.11.0 // indirect go4.org v0.0.0-20230225012048-214862532bf5 // indirect golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect golang.org/x/net v0.10.0 // indirect diff --git a/go.sum b/go.sum index 2717767c09c9..179cf3ff458d 100644 --- a/go.sum +++ b/go.sum @@ -356,8 +356,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1: github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= -github.com/ipfs/boxo v0.8.2-0.20230529214945-86cdb2485dad h1:2vkMvvVa5f9fWzts7OcJL6ZS0QaKCcEeOV6I+doPMo0= -github.com/ipfs/boxo v0.8.2-0.20230529214945-86cdb2485dad/go.mod h1:Ej2r08Z4VIaFKqY08UXMNhwcLf6VekHhK8c+KqA1B9Y= +github.com/ipfs/boxo v0.8.2-0.20230530170953-5f18224deb24 h1:En26p9blUTfBHuETTKW+jibsa5GrrsN0PlPS7IWy/70= +github.com/ipfs/boxo v0.8.2-0.20230530170953-5f18224deb24/go.mod h1:Ej2r08Z4VIaFKqY08UXMNhwcLf6VekHhK8c+KqA1B9Y= github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA= github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU= github.com/ipfs/go-block-format v0.0.2/go.mod h1:AWR46JfpcObNfg3ok2JHDUfdiHRgWhJgCQF+KIgOPJY=