From 1ba943f02f716bcbb42450dc8402337b5f2519ab Mon Sep 17 00:00:00 2001 From: Will Date: Wed, 10 Aug 2022 10:50:27 +0000 Subject: [PATCH] Add Provide RPC (#37) * Add Provide RPC per https://github.com/ipfs/specs/pull/285 This commit was moved from ipfs/go-delegated-routing@a6fd1a59adee7198f4206450577935e36f3dc943 --- routing/http/client/contentrouting.go | 13 +- routing/http/client/contentrouting_test.go | 9 + routing/http/client/findproviders.go | 43 +- routing/http/client/provide.go | 404 +++++++++ routing/http/gen/proto/proto_edelweiss.go | 906 +++++++++++++++++---- routing/http/gen/routing.go | 31 + routing/http/server/findproviders.go | 43 + routing/http/test/clientserver_test.go | 29 +- routing/http/test/fallbacks_test.go | 9 +- routing/http/test/provide_test.go | 54 ++ routing/http/test/servererror_test.go | 9 +- 11 files changed, 1396 insertions(+), 154 deletions(-) create mode 100644 routing/http/client/provide.go create mode 100644 routing/http/test/provide_test.go diff --git a/routing/http/client/contentrouting.go b/routing/http/client/contentrouting.go index 0756d380d..92ba4bb7f 100644 --- a/routing/http/client/contentrouting.go +++ b/routing/http/client/contentrouting.go @@ -2,6 +2,7 @@ package client import ( "context" + "time" "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p-core/peer" @@ -18,8 +19,16 @@ func NewContentRoutingClient(c DelegatedRoutingClient) *ContentRoutingClient { return &ContentRoutingClient{client: c} } -func (c *ContentRoutingClient) Provide(context.Context, cid.Cid, bool) error { - return routing.ErrNotSupported +func (c *ContentRoutingClient) Provide(ctx context.Context, key cid.Cid, announce bool) error { + // If 'true' is + // passed, it also announces it, otherwise it is just kept in the local + // accounting of which objects are being provided. + if !announce { + return nil + } + + _, err := c.client.Provide(ctx, key, 24*time.Hour) + return err } func (c *ContentRoutingClient) FindProvidersAsync(ctx context.Context, key cid.Cid, numResults int) <-chan peer.AddrInfo { diff --git a/routing/http/client/contentrouting_test.go b/routing/http/client/contentrouting_test.go index e1438bc09..436c412df 100644 --- a/routing/http/client/contentrouting_test.go +++ b/routing/http/client/contentrouting_test.go @@ -3,6 +3,7 @@ package client import ( "context" "testing" + "time" "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p-core/peer" @@ -45,6 +46,14 @@ func (t TestDelegatedRoutingClient) PutIPNSAsync(ctx context.Context, id []byte, panic("not supported") } +func (t TestDelegatedRoutingClient) ProvideAsync(ctx context.Context, key cid.Cid, ttl time.Duration) (<-chan time.Duration, error) { + panic("not supported") +} + +func (t TestDelegatedRoutingClient) Provide(ctx context.Context, key cid.Cid, tl time.Duration) (time.Duration, error) { + panic("not supported") +} + // TestContentRoutingFindProvidersUnlimitedResults is testing that ContentRoutingClient.FindProvidersAsync // correctly wraps DelegatedRoutingClient.FindProvidersAsync in the regime when the former allows for unlimited results. // This is a test of async semantics only. This is why values are not checked for validity. diff --git a/routing/http/client/findproviders.go b/routing/http/client/findproviders.go index 46a487aa1..b8e34187c 100644 --- a/routing/http/client/findproviders.go +++ b/routing/http/client/findproviders.go @@ -2,11 +2,15 @@ package client import ( "context" + "errors" + "time" "github.com/ipfs/go-cid" proto "github.com/ipfs/go-delegated-routing/gen/proto" ipns "github.com/ipfs/go-ipns" logging "github.com/ipfs/go-log/v2" + "github.com/ipld/edelweiss/values" + "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/peer" record "github.com/libp2p/go-libp2p-record" "github.com/multiformats/go-multiaddr" @@ -21,15 +25,33 @@ type DelegatedRoutingClient interface { GetIPNSAsync(ctx context.Context, id []byte) (<-chan GetIPNSAsyncResult, error) PutIPNS(ctx context.Context, id []byte, record []byte) error PutIPNSAsync(ctx context.Context, id []byte, record []byte) (<-chan PutIPNSAsyncResult, error) + Provide(ctx context.Context, key cid.Cid, ttl time.Duration) (time.Duration, error) + ProvideAsync(ctx context.Context, key cid.Cid, ttl time.Duration) (<-chan time.Duration, error) } type Client struct { client proto.DelegatedRouting_Client validator record.Validator + + provider *Provider + identity crypto.PrivKey } -func NewClient(c proto.DelegatedRouting_Client) *Client { - return &Client{client: c, validator: ipns.Validator{}} +var _ DelegatedRoutingClient = (*Client)(nil) + +// NewClient creates a client. +// The Provider and identity parameters are option. If they are nil, the `Provide` method will not function. +func NewClient(c proto.DelegatedRouting_Client, p *Provider, identity crypto.PrivKey) (*Client, error) { + if p != nil && !p.Peer.ID.MatchesPublicKey(identity.GetPublic()) { + return nil, errors.New("identity does not match provider") + } + + return &Client{ + client: c, + validator: ipns.Validator{}, + provider: p, + identity: identity, + }, nil } func (fp *Client) FindProviders(ctx context.Context, key cid.Cid) ([]peer.AddrInfo, error) { @@ -142,5 +164,22 @@ func ParseNodeAddresses(n *proto.Peer) []peer.AddrInfo { } infos = append(infos, peer.AddrInfo{ID: peerID, Addrs: []multiaddr.Multiaddr{ma}}) } + if len(n.Multiaddresses) == 0 { + infos = append(infos, peer.AddrInfo{ID: peerID}) + } return infos } + +// ToProtoPeer creates a protocol Peer structure from address info. +func ToProtoPeer(ai peer.AddrInfo) *proto.Peer { + p := proto.Peer{ + ID: values.Bytes(ai.ID), + Multiaddresses: make(proto.AnonList20, 0), + } + + for _, addr := range ai.Addrs { + p.Multiaddresses = append(p.Multiaddresses, addr.Bytes()) + } + + return &p +} diff --git a/routing/http/client/provide.go b/routing/http/client/provide.go new file mode 100644 index 000000000..0f62100d7 --- /dev/null +++ b/routing/http/client/provide.go @@ -0,0 +1,404 @@ +package client + +import ( + "bytes" + "context" + "crypto/sha256" + "errors" + "fmt" + "time" + + "github.com/ipfs/go-cid" + "github.com/ipfs/go-delegated-routing/gen/proto" + "github.com/ipld/edelweiss/values" + "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/codec/dagjson" + "github.com/ipld/go-ipld-prime/node/bindnode" + "github.com/libp2p/go-libp2p-core/crypto" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/multiformats/go-multiaddr" + "github.com/multiformats/go-multicodec" + "github.com/polydawn/refmt/cbor" +) + +// Provider represents the source publishing one or more CIDs +type Provider struct { + Peer peer.AddrInfo + ProviderProto []TransferProtocol +} + +// ToProto convers a provider into the wire proto form +func (p *Provider) ToProto() *proto.Provider { + pp := proto.Provider{ + ProviderNode: proto.Node{ + Peer: ToProtoPeer(p.Peer), + }, + ProviderProto: proto.TransferProtocolList{}, + } + for _, tp := range p.ProviderProto { + pp.ProviderProto = append(pp.ProviderProto, tp.ToProto()) + } + return &pp +} + +// TransferProtocol represents a data transfer protocol +type TransferProtocol struct { + Codec multicodec.Code + Payload []byte +} + +// GraphSyncFILv1 is the current filecoin storage provider protocol. +type GraphSyncFILv1 struct { + PieceCID cid.Cid + VerifiedDeal bool + FastRetrieval bool +} + +// ToProto converts a TransferProtocol to the wire representation +func (tp *TransferProtocol) ToProto() proto.TransferProtocol { + if tp.Codec == multicodec.TransportBitswap { + return proto.TransferProtocol{ + Bitswap: &proto.BitswapProtocol{}, + } + } else if tp.Codec == multicodec.TransportGraphsyncFilecoinv1 { + into := GraphSyncFILv1{} + if err := cbor.Unmarshal(cbor.DecodeOptions{}, tp.Payload, &into); err != nil { + return proto.TransferProtocol{} + } + return proto.TransferProtocol{ + GraphSyncFILv1: &proto.GraphSyncFILv1Protocol{ + PieceCID: proto.LinkToAny(into.PieceCID), + VerifiedDeal: values.Bool(into.VerifiedDeal), + FastRetrieval: values.Bool(into.FastRetrieval), + }, + } + } else { + return proto.TransferProtocol{} + } +} + +func parseProtocol(tp *proto.TransferProtocol) (TransferProtocol, error) { + if tp.Bitswap != nil { + return TransferProtocol{Codec: multicodec.TransportBitswap}, nil + } else if tp.GraphSyncFILv1 != nil { + pl := GraphSyncFILv1{ + PieceCID: cid.Cid(tp.GraphSyncFILv1.PieceCID), + VerifiedDeal: bool(tp.GraphSyncFILv1.VerifiedDeal), + FastRetrieval: bool(tp.GraphSyncFILv1.FastRetrieval), + } + plBytes, err := cbor.Marshal(&pl) + if err != nil { + return TransferProtocol{}, err + } + return TransferProtocol{ + Codec: multicodec.TransportGraphsyncFilecoinv1, + Payload: plBytes, + }, nil + } + return TransferProtocol{}, nil +} + +// ProvideRequest is a message indicating a provider can provide a Key for a given TTL +type ProvideRequest struct { + Key cid.Cid + *Provider + Timestamp int64 + AdvisoryTTL time.Duration + Signature []byte +} + +var provideSchema, provideSchemaErr = ipld.LoadSchemaBytes([]byte(` + type ProvideRequest struct { + Key &Any + Provider Provider + Timestamp Int + AdvisoryTTL Int + Signature Bytes + } + type Provider struct { + Peer Peer + ProviderProto [TransferProtocol] + } + type Peer struct { + ID String + Multiaddresses [Bytes] + } + type TransferProtocol struct { + Codec Int + Payload Bytes + } + `)) + +func init() { + if provideSchemaErr != nil { + panic(provideSchemaErr) + } +} + +func bytesToMA(b []byte) (interface{}, error) { + return multiaddr.NewMultiaddrBytes(b) +} +func maToBytes(iface interface{}) ([]byte, error) { + if ma, ok := iface.(multiaddr.Multiaddr); ok { + return ma.Bytes(), nil + } + return nil, fmt.Errorf("did not get expected MA type") +} + +// Sign a provide request +func (pr *ProvideRequest) Sign(key crypto.PrivKey) error { + if pr.IsSigned() { + return errors.New("already Signed") + } + pr.Timestamp = time.Now().Unix() + pr.Signature = []byte{} + + if key == nil { + return errors.New("no key provided") + } + + sid, err := peer.IDFromPrivateKey(key) + if err != nil { + return err + } + if sid != pr.Provider.Peer.ID { + return errors.New("not the correct signing key") + } + + ma, _ := multiaddr.NewMultiaddr("/") + opts := []bindnode.Option{ + bindnode.TypedBytesConverter(&ma, bytesToMA, maToBytes), + } + + node := bindnode.Wrap(pr, provideSchema.TypeByName("ProvideRequest"), opts...) + nodeRepr := node.Representation() + outBuf := bytes.NewBuffer(nil) + if err = dagjson.Encode(nodeRepr, outBuf); err != nil { + return err + } + hash := sha256.New().Sum(outBuf.Bytes()) + sig, err := key.Sign(hash) + if err != nil { + return err + } + pr.Signature = sig + return nil +} + +func (pr *ProvideRequest) Verify() error { + if !pr.IsSigned() { + return errors.New("not signed") + } + sig := pr.Signature + pr.Signature = []byte{} + defer func() { + pr.Signature = sig + }() + + ma, _ := multiaddr.NewMultiaddr("/") + opts := []bindnode.Option{ + bindnode.TypedBytesConverter(&ma, bytesToMA, maToBytes), + } + + node := bindnode.Wrap(pr, provideSchema.TypeByName("ProvideRequest"), opts...) + nodeRepr := node.Representation() + outBuf := bytes.NewBuffer(nil) + if err := dagjson.Encode(nodeRepr, outBuf); err != nil { + return err + } + hash := sha256.New().Sum(outBuf.Bytes()) + + pk, err := pr.Peer.ID.ExtractPublicKey() + if err != nil { + return err + } + + ok, err := pk.Verify(hash, sig) + if err != nil { + return err + } + if !ok { + return errors.New("signature failed to verify") + } + + return nil +} + +// IsSigned indicates if the ProvideRequest has been signed +func (pr *ProvideRequest) IsSigned() bool { + return pr.Signature != nil +} + +func ParseProvideRequest(req *proto.ProvideRequest) (*ProvideRequest, error) { + prov, err := parseProvider(&req.Provider) + if err != nil { + return nil, err + } + pr := ProvideRequest{ + Key: cid.Cid(req.Key), + Provider: prov, + AdvisoryTTL: time.Duration(req.AdvisoryTTL), + Timestamp: int64(req.Timestamp), + Signature: req.Signature, + } + + if err := pr.Verify(); err != nil { + return nil, err + } + return &pr, nil +} + +func parseProvider(p *proto.Provider) (*Provider, error) { + prov := Provider{ + Peer: parseProtoNodeToAddrInfo(p.ProviderNode)[0], + ProviderProto: make([]TransferProtocol, 0), + } + for _, tp := range p.ProviderProto { + proto, err := parseProtocol(&tp) + if err != nil { + return nil, err + } + prov.ProviderProto = append(prov.ProviderProto, proto) + } + return &prov, nil +} + +type ProvideAsyncResult struct { + AdvisoryTTL time.Duration + Err error +} + +func (fp *Client) Provide(ctx context.Context, key cid.Cid, ttl time.Duration) (time.Duration, error) { + req := ProvideRequest{ + Key: key, + Provider: fp.provider, + AdvisoryTTL: ttl, + Timestamp: time.Now().Unix(), + } + + if fp.identity != nil { + if err := req.Sign(fp.identity); err != nil { + return 0, err + } + } + + record, err := fp.ProvideSignedRecord(ctx, &req) + if err != nil { + return 0, err + } + + var d time.Duration + var set bool + for resp := range record { + if resp.Err == nil { + set = true + if resp.AdvisoryTTL > d { + d = resp.AdvisoryTTL + } + } else if resp.Err != nil { + err = resp.Err + } + } + + if set { + return d, nil + } else if err == nil { + return 0, fmt.Errorf("no response") + } + return 0, err +} + +func (fp *Client) ProvideAsync(ctx context.Context, key cid.Cid, ttl time.Duration) (<-chan time.Duration, error) { + req := ProvideRequest{ + Key: key, + Provider: fp.provider, + AdvisoryTTL: ttl, + Timestamp: time.Now().Unix(), + } + ch := make(chan time.Duration, 1) + + if fp.identity != nil { + if err := req.Sign(fp.identity); err != nil { + close(ch) + return ch, err + } + } + + record, err := fp.ProvideSignedRecord(ctx, &req) + if err != nil { + close(ch) + return ch, err + } + go func() { + defer close(ch) + for resp := range record { + if resp.Err != nil { + logger.Infof("dropping partial provide failure (%v)", err) + } else { + ch <- resp.AdvisoryTTL + } + } + }() + return ch, nil +} + +// ProvideAsync makes a provide request to a delegated router +func (fp *Client) ProvideSignedRecord(ctx context.Context, req *ProvideRequest) (<-chan ProvideAsyncResult, error) { + if !req.IsSigned() { + return nil, errors.New("request is not signed") + } + + var providerProto proto.Provider + if req.Provider != nil { + providerProto = *req.Provider.ToProto() + } + ch0, err := fp.client.Provide_Async(ctx, &proto.ProvideRequest{ + Key: proto.LinkToAny(req.Key), + Provider: providerProto, + Timestamp: values.Int(req.Timestamp), + AdvisoryTTL: values.Int(req.AdvisoryTTL), + Signature: req.Signature, + }) + if err != nil { + return nil, err + } + ch1 := make(chan ProvideAsyncResult, 1) + go func() { + defer close(ch1) + for { + select { + case <-ctx.Done(): + return + case r0, ok := <-ch0: + if !ok { + return + } + + var r1 ProvideAsyncResult + + if r0.Err != nil { + r1.Err = r0.Err + select { + case <-ctx.Done(): + return + case ch1 <- r1: + } + continue + } + + if r0.Resp == nil { + continue + } + + r1.AdvisoryTTL = time.Duration(r0.Resp.AdvisoryTTL) + + select { + case <-ctx.Done(): + return + case ch1 <- r1: + } + } + } + }() + return ch1, nil +} diff --git a/routing/http/gen/proto/proto_edelweiss.go b/routing/http/gen/proto/proto_edelweiss.go index c7d76f84c..06ec943a3 100644 --- a/routing/http/gen/proto/proto_edelweiss.go +++ b/routing/http/gen/proto/proto_edelweiss.go @@ -3,24 +3,23 @@ package proto import ( - pd11 "bytes" - pd6 "context" - pd10 "errors" + pd12 "bytes" + pd8 "context" + pd6 "errors" pd3 "fmt" + pd17 "github.com/ipfs/go-cid" + pd4 "github.com/ipfs/go-log/v2" + pd13 "github.com/ipld/edelweiss/services" + pd2 "github.com/ipld/edelweiss/values" + pd5 "github.com/ipld/go-ipld-prime" + pd9 "github.com/ipld/go-ipld-prime/codec/dagjson" + pd1 "github.com/ipld/go-ipld-prime/datamodel" + pd16 "github.com/ipld/go-ipld-prime/linking/cid" pd7 "io" pd15 "io/ioutil" - pd5 "net/http" - pd4 "net/url" + pd10 "net/http" + pd11 "net/url" pd14 "sync" - - pd16 "github.com/ipfs/go-cid" - pd13 "github.com/ipfs/go-log/v2" - pd12 "github.com/ipld/edelweiss/services" - pd2 "github.com/ipld/edelweiss/values" - pd9 "github.com/ipld/go-ipld-prime" - pd8 "github.com/ipld/go-ipld-prime/codec/dagjson" - pd1 "github.com/ipld/go-ipld-prime/datamodel" - pd17 "github.com/ipld/go-ipld-prime/linking/cid" ) // -- protocol type DelegatedRouting_IdentifyArg -- @@ -627,6 +626,7 @@ type AnonInductive4 struct { FindProviders *FindProvidersRequest GetIPNS *GetIPNSRequest PutIPNS *PutIPNSRequest + Provide *ProvideRequest } func (x *AnonInductive4) Parse(n pd1.Node) error { @@ -672,6 +672,13 @@ func (x *AnonInductive4) Parse(n pd1.Node) error { } x.PutIPNS = &y return nil + case "ProvideRequest": + var y ProvideRequest + if err := y.Parse(vn); err != nil { + return err + } + x.Provide = &y + return nil } @@ -698,6 +705,8 @@ func (x *AnonInductive4_MapIterator) Next() (key pd1.Node, value pd1.Node, err e return pd2.String("GetIPNSRequest"), x.s.GetIPNS.Node(), nil case x.s.PutIPNS != nil: return pd2.String("PutIPNSRequest"), x.s.PutIPNS.Node(), nil + case x.s.Provide != nil: + return pd2.String("ProvideRequest"), x.s.Provide.Node(), nil default: return nil, nil, pd3.Errorf("no inductive cases are set") @@ -727,6 +736,8 @@ func (x AnonInductive4) LookupByString(key string) (pd1.Node, error) { return x.GetIPNS.Node(), nil case x.PutIPNS != nil && key == "PutIPNSRequest": return x.PutIPNS.Node(), nil + case x.Provide != nil && key == "ProvideRequest": + return x.Provide.Node(), nil } return nil, pd2.ErrNA @@ -757,6 +768,8 @@ func (x AnonInductive4) LookupBySegment(seg pd1.PathSegment) (pd1.Node, error) { return x.GetIPNS.Node(), nil case "PutIPNSRequest": return x.PutIPNS.Node(), nil + case "ProvideRequest": + return x.Provide.Node(), nil } return nil, pd2.ErrNA @@ -817,6 +830,7 @@ type AnonInductive5 struct { FindProviders *FindProvidersResponse GetIPNS *GetIPNSResponse PutIPNS *PutIPNSResponse + Provide *ProvideResponse Error *DelegatedRouting_Error } @@ -863,6 +877,13 @@ func (x *AnonInductive5) Parse(n pd1.Node) error { } x.PutIPNS = &y return nil + case "ProvideResponse": + var y ProvideResponse + if err := y.Parse(vn); err != nil { + return err + } + x.Provide = &y + return nil case "Error": var y DelegatedRouting_Error if err := y.Parse(vn); err != nil { @@ -896,6 +917,8 @@ func (x *AnonInductive5_MapIterator) Next() (key pd1.Node, value pd1.Node, err e return pd2.String("GetIPNSResponse"), x.s.GetIPNS.Node(), nil case x.s.PutIPNS != nil: return pd2.String("PutIPNSResponse"), x.s.PutIPNS.Node(), nil + case x.s.Provide != nil: + return pd2.String("ProvideResponse"), x.s.Provide.Node(), nil case x.s.Error != nil: return pd2.String("Error"), x.s.Error.Node(), nil @@ -927,6 +950,8 @@ func (x AnonInductive5) LookupByString(key string) (pd1.Node, error) { return x.GetIPNS.Node(), nil case x.PutIPNS != nil && key == "PutIPNSResponse": return x.PutIPNS.Node(), nil + case x.Provide != nil && key == "ProvideResponse": + return x.Provide.Node(), nil case x.Error != nil && key == "Error": return x.Error.Node(), nil @@ -959,6 +984,8 @@ func (x AnonInductive5) LookupBySegment(seg pd1.PathSegment) (pd1.Node, error) { return x.GetIPNS.Node(), nil case "PutIPNSResponse": return x.PutIPNS.Node(), nil + case "ProvideResponse": + return x.Provide.Node(), nil case "Error": return x.Error.Node(), nil @@ -1014,24 +1041,28 @@ func (x AnonInductive5) Prototype() pd1.NodePrototype { return nil } -var logger_client_DelegatedRouting = pd13.Logger("service/client/delegatedrouting") +var logger_client_DelegatedRouting = pd4.Logger("service/client/delegatedrouting") type DelegatedRouting_Client interface { - Identify(ctx pd6.Context, req *DelegatedRouting_IdentifyArg) ([]*DelegatedRouting_IdentifyResult, error) + Identify(ctx pd8.Context, req *DelegatedRouting_IdentifyArg) ([]*DelegatedRouting_IdentifyResult, error) + + FindProviders(ctx pd8.Context, req *FindProvidersRequest) ([]*FindProvidersResponse, error) - FindProviders(ctx pd6.Context, req *FindProvidersRequest) ([]*FindProvidersResponse, error) + GetIPNS(ctx pd8.Context, req *GetIPNSRequest) ([]*GetIPNSResponse, error) - GetIPNS(ctx pd6.Context, req *GetIPNSRequest) ([]*GetIPNSResponse, error) + PutIPNS(ctx pd8.Context, req *PutIPNSRequest) ([]*PutIPNSResponse, error) - PutIPNS(ctx pd6.Context, req *PutIPNSRequest) ([]*PutIPNSResponse, error) + Provide(ctx pd8.Context, req *ProvideRequest) ([]*ProvideResponse, error) - Identify_Async(ctx pd6.Context, req *DelegatedRouting_IdentifyArg) (<-chan DelegatedRouting_Identify_AsyncResult, error) + Identify_Async(ctx pd8.Context, req *DelegatedRouting_IdentifyArg) (<-chan DelegatedRouting_Identify_AsyncResult, error) - FindProviders_Async(ctx pd6.Context, req *FindProvidersRequest) (<-chan DelegatedRouting_FindProviders_AsyncResult, error) + FindProviders_Async(ctx pd8.Context, req *FindProvidersRequest) (<-chan DelegatedRouting_FindProviders_AsyncResult, error) - GetIPNS_Async(ctx pd6.Context, req *GetIPNSRequest) (<-chan DelegatedRouting_GetIPNS_AsyncResult, error) + GetIPNS_Async(ctx pd8.Context, req *GetIPNSRequest) (<-chan DelegatedRouting_GetIPNS_AsyncResult, error) - PutIPNS_Async(ctx pd6.Context, req *PutIPNSRequest) (<-chan DelegatedRouting_PutIPNS_AsyncResult, error) + PutIPNS_Async(ctx pd8.Context, req *PutIPNSRequest) (<-chan DelegatedRouting_PutIPNS_AsyncResult, error) + + Provide_Async(ctx pd8.Context, req *ProvideRequest) (<-chan DelegatedRouting_Provide_AsyncResult, error) } type DelegatedRouting_Identify_AsyncResult struct { @@ -1054,16 +1085,21 @@ type DelegatedRouting_PutIPNS_AsyncResult struct { Err error } +type DelegatedRouting_Provide_AsyncResult struct { + Resp *ProvideResponse + Err error +} + type DelegatedRouting_ClientOption func(*client_DelegatedRouting) error type client_DelegatedRouting struct { - httpClient *pd5.Client - endpoint *pd4.URL + httpClient *pd10.Client + endpoint *pd11.URL ulk pd14.Mutex unsupported map[string]bool // cache of methods not supported by server } -func DelegatedRouting_Client_WithHTTPClient(hc *pd5.Client) DelegatedRouting_ClientOption { +func DelegatedRouting_Client_WithHTTPClient(hc *pd10.Client) DelegatedRouting_ClientOption { return func(c *client_DelegatedRouting) error { c.httpClient = hc return nil @@ -1071,11 +1107,11 @@ func DelegatedRouting_Client_WithHTTPClient(hc *pd5.Client) DelegatedRouting_Cli } func New_DelegatedRouting_Client(endpoint string, opts ...DelegatedRouting_ClientOption) (*client_DelegatedRouting, error) { - u, err := pd4.Parse(endpoint) + u, err := pd11.Parse(endpoint) if err != nil { return nil, err } - c := &client_DelegatedRouting{endpoint: u, httpClient: pd5.DefaultClient, unsupported: make(map[string]bool)} + c := &client_DelegatedRouting{endpoint: u, httpClient: pd10.DefaultClient, unsupported: make(map[string]bool)} for _, o := range opts { if err := o(c); err != nil { return nil, err @@ -1084,8 +1120,8 @@ func New_DelegatedRouting_Client(endpoint string, opts ...DelegatedRouting_Clien return c, nil } -func (c *client_DelegatedRouting) Identify(ctx pd6.Context, req *DelegatedRouting_IdentifyArg) ([]*DelegatedRouting_IdentifyResult, error) { - ctx, cancel := pd6.WithCancel(ctx) +func (c *client_DelegatedRouting) Identify(ctx pd8.Context, req *DelegatedRouting_IdentifyArg) ([]*DelegatedRouting_IdentifyResult, error) { + ctx, cancel := pd8.WithCancel(ctx) defer cancel() ch, err := c.Identify_Async(ctx, req) if err != nil { @@ -1113,27 +1149,27 @@ func (c *client_DelegatedRouting) Identify(ctx pd6.Context, req *DelegatedRoutin } } -func (c *client_DelegatedRouting) Identify_Async(ctx pd6.Context, req *DelegatedRouting_IdentifyArg) (<-chan DelegatedRouting_Identify_AsyncResult, error) { +func (c *client_DelegatedRouting) Identify_Async(ctx pd8.Context, req *DelegatedRouting_IdentifyArg) (<-chan DelegatedRouting_Identify_AsyncResult, error) { // check if we have memoized that this method is not supported by the server c.ulk.Lock() notSupported := c.unsupported["Identify"] c.ulk.Unlock() if notSupported { - return nil, pd12.ErrSchema + return nil, pd13.ErrSchema } envelope := &AnonInductive4{ Identify: req, } - buf, err := pd9.Encode(envelope, pd8.Encode) + buf, err := pd5.Encode(envelope, pd9.Encode) if err != nil { return nil, pd3.Errorf("unexpected serialization error (%v)", err) } // encode request in URL u := *c.endpoint - httpReq, err := pd5.NewRequestWithContext(ctx, "POST", u.String(), pd11.NewReader(buf)) + httpReq, err := pd10.NewRequestWithContext(ctx, "POST", u.String(), pd12.NewReader(buf)) if err != nil { return nil, err } @@ -1155,7 +1191,7 @@ func (c *client_DelegatedRouting) Identify_Async(ctx pd6.Context, req *Delegated c.ulk.Lock() c.unsupported["Identify"] = true c.ulk.Unlock() - return nil, pd12.ErrSchema + return nil, pd13.ErrSchema } // HTTP codes other than 200 correspond to service implementation rejecting the call when it is received // for reasons unrelated to protocol schema @@ -1163,7 +1199,7 @@ func (c *client_DelegatedRouting) Identify_Async(ctx pd6.Context, req *Delegated resp.Body.Close() if resp.Header != nil { if errValues, ok := resp.Header["Error"]; ok && len(errValues) == 1 { - err = pd12.ErrService{Cause: pd3.Errorf("%s", errValues[0])} + err = pd13.ErrService{Cause: pd3.Errorf("%s", errValues[0])} } else { err = pd3.Errorf("service rejected the call, no cause provided") } @@ -1178,10 +1214,10 @@ func (c *client_DelegatedRouting) Identify_Async(ctx pd6.Context, req *Delegated return ch, nil } -func process_DelegatedRouting_Identify_AsyncResult(ctx pd6.Context, ch chan<- DelegatedRouting_Identify_AsyncResult, r pd7.ReadCloser) { +func process_DelegatedRouting_Identify_AsyncResult(ctx pd8.Context, ch chan<- DelegatedRouting_Identify_AsyncResult, r pd7.ReadCloser) { defer close(ch) defer r.Close() - opt := pd8.DecodeOptions{ + opt := pd9.DecodeOptions{ ParseLinks: true, ParseBytes: true, DontParseBeyondEnd: true, @@ -1189,24 +1225,24 @@ func process_DelegatedRouting_Identify_AsyncResult(ctx pd6.Context, ch chan<- De for { var out DelegatedRouting_Identify_AsyncResult - n, err := pd9.DecodeStreaming(r, opt.Decode) + n, err := pd5.DecodeStreaming(r, opt.Decode) - if pd10.Is(err, pd7.EOF) || pd10.Is(err, pd7.ErrUnexpectedEOF) || pd10.Is(err, pd6.DeadlineExceeded) || pd10.Is(err, pd6.Canceled) { + if pd6.Is(err, pd7.EOF) || pd6.Is(err, pd7.ErrUnexpectedEOF) || pd6.Is(err, pd8.DeadlineExceeded) || pd6.Is(err, pd8.Canceled) { return } if err != nil { - out = DelegatedRouting_Identify_AsyncResult{Err: pd12.ErrProto{Cause: err}} // IPLD decode error + out = DelegatedRouting_Identify_AsyncResult{Err: pd13.ErrProto{Cause: err}} // IPLD decode error } else { var x [1]byte if k, err := r.Read(x[:]); k != 1 || x[0] != '\n' { - out = DelegatedRouting_Identify_AsyncResult{Err: pd12.ErrProto{Cause: pd3.Errorf("missing new line after result: err (%v), read (%d), char (%q)", err, k, string(x[:]))}} // Edelweiss decode error + out = DelegatedRouting_Identify_AsyncResult{Err: pd13.ErrProto{Cause: pd3.Errorf("missing new line after result: err (%v), read (%d), char (%q)", err, k, string(x[:]))}} // Edelweiss decode error } else { env := &AnonInductive5{} if err = env.Parse(n); err != nil { - out = DelegatedRouting_Identify_AsyncResult{Err: pd12.ErrProto{Cause: err}} // schema decode error + out = DelegatedRouting_Identify_AsyncResult{Err: pd13.ErrProto{Cause: err}} // schema decode error } else if env.Error != nil { - out = DelegatedRouting_Identify_AsyncResult{Err: pd12.ErrService{Cause: pd10.New(string(env.Error.Code))}} // service-level error + out = DelegatedRouting_Identify_AsyncResult{Err: pd13.ErrService{Cause: pd6.New(string(env.Error.Code))}} // service-level error } else if env.Identify != nil { out = DelegatedRouting_Identify_AsyncResult{Resp: env.Identify} } else { @@ -1223,8 +1259,8 @@ func process_DelegatedRouting_Identify_AsyncResult(ctx pd6.Context, ch chan<- De } } -func (c *client_DelegatedRouting) FindProviders(ctx pd6.Context, req *FindProvidersRequest) ([]*FindProvidersResponse, error) { - ctx, cancel := pd6.WithCancel(ctx) +func (c *client_DelegatedRouting) FindProviders(ctx pd8.Context, req *FindProvidersRequest) ([]*FindProvidersResponse, error) { + ctx, cancel := pd8.WithCancel(ctx) defer cancel() ch, err := c.FindProviders_Async(ctx, req) if err != nil { @@ -1252,27 +1288,27 @@ func (c *client_DelegatedRouting) FindProviders(ctx pd6.Context, req *FindProvid } } -func (c *client_DelegatedRouting) FindProviders_Async(ctx pd6.Context, req *FindProvidersRequest) (<-chan DelegatedRouting_FindProviders_AsyncResult, error) { +func (c *client_DelegatedRouting) FindProviders_Async(ctx pd8.Context, req *FindProvidersRequest) (<-chan DelegatedRouting_FindProviders_AsyncResult, error) { // check if we have memoized that this method is not supported by the server c.ulk.Lock() notSupported := c.unsupported["FindProviders"] c.ulk.Unlock() if notSupported { - return nil, pd12.ErrSchema + return nil, pd13.ErrSchema } envelope := &AnonInductive4{ FindProviders: req, } - buf, err := pd9.Encode(envelope, pd8.Encode) + buf, err := pd5.Encode(envelope, pd9.Encode) if err != nil { return nil, pd3.Errorf("unexpected serialization error (%v)", err) } // encode request in URL u := *c.endpoint - httpReq, err := pd5.NewRequestWithContext(ctx, "POST", u.String(), pd11.NewReader(buf)) + httpReq, err := pd10.NewRequestWithContext(ctx, "POST", u.String(), pd12.NewReader(buf)) if err != nil { return nil, err } @@ -1294,7 +1330,7 @@ func (c *client_DelegatedRouting) FindProviders_Async(ctx pd6.Context, req *Find c.ulk.Lock() c.unsupported["FindProviders"] = true c.ulk.Unlock() - return nil, pd12.ErrSchema + return nil, pd13.ErrSchema } // HTTP codes other than 200 correspond to service implementation rejecting the call when it is received // for reasons unrelated to protocol schema @@ -1302,7 +1338,7 @@ func (c *client_DelegatedRouting) FindProviders_Async(ctx pd6.Context, req *Find resp.Body.Close() if resp.Header != nil { if errValues, ok := resp.Header["Error"]; ok && len(errValues) == 1 { - err = pd12.ErrService{Cause: pd3.Errorf("%s", errValues[0])} + err = pd13.ErrService{Cause: pd3.Errorf("%s", errValues[0])} } else { err = pd3.Errorf("service rejected the call, no cause provided") } @@ -1317,10 +1353,10 @@ func (c *client_DelegatedRouting) FindProviders_Async(ctx pd6.Context, req *Find return ch, nil } -func process_DelegatedRouting_FindProviders_AsyncResult(ctx pd6.Context, ch chan<- DelegatedRouting_FindProviders_AsyncResult, r pd7.ReadCloser) { +func process_DelegatedRouting_FindProviders_AsyncResult(ctx pd8.Context, ch chan<- DelegatedRouting_FindProviders_AsyncResult, r pd7.ReadCloser) { defer close(ch) defer r.Close() - opt := pd8.DecodeOptions{ + opt := pd9.DecodeOptions{ ParseLinks: true, ParseBytes: true, DontParseBeyondEnd: true, @@ -1328,24 +1364,24 @@ func process_DelegatedRouting_FindProviders_AsyncResult(ctx pd6.Context, ch chan for { var out DelegatedRouting_FindProviders_AsyncResult - n, err := pd9.DecodeStreaming(r, opt.Decode) + n, err := pd5.DecodeStreaming(r, opt.Decode) - if pd10.Is(err, pd7.EOF) || pd10.Is(err, pd7.ErrUnexpectedEOF) || pd10.Is(err, pd6.DeadlineExceeded) || pd10.Is(err, pd6.Canceled) { + if pd6.Is(err, pd7.EOF) || pd6.Is(err, pd7.ErrUnexpectedEOF) || pd6.Is(err, pd8.DeadlineExceeded) || pd6.Is(err, pd8.Canceled) { return } if err != nil { - out = DelegatedRouting_FindProviders_AsyncResult{Err: pd12.ErrProto{Cause: err}} // IPLD decode error + out = DelegatedRouting_FindProviders_AsyncResult{Err: pd13.ErrProto{Cause: err}} // IPLD decode error } else { var x [1]byte if k, err := r.Read(x[:]); k != 1 || x[0] != '\n' { - out = DelegatedRouting_FindProviders_AsyncResult{Err: pd12.ErrProto{Cause: pd3.Errorf("missing new line after result: err (%v), read (%d), char (%q)", err, k, string(x[:]))}} // Edelweiss decode error + out = DelegatedRouting_FindProviders_AsyncResult{Err: pd13.ErrProto{Cause: pd3.Errorf("missing new line after result: err (%v), read (%d), char (%q)", err, k, string(x[:]))}} // Edelweiss decode error } else { env := &AnonInductive5{} if err = env.Parse(n); err != nil { - out = DelegatedRouting_FindProviders_AsyncResult{Err: pd12.ErrProto{Cause: err}} // schema decode error + out = DelegatedRouting_FindProviders_AsyncResult{Err: pd13.ErrProto{Cause: err}} // schema decode error } else if env.Error != nil { - out = DelegatedRouting_FindProviders_AsyncResult{Err: pd12.ErrService{Cause: pd10.New(string(env.Error.Code))}} // service-level error + out = DelegatedRouting_FindProviders_AsyncResult{Err: pd13.ErrService{Cause: pd6.New(string(env.Error.Code))}} // service-level error } else if env.FindProviders != nil { out = DelegatedRouting_FindProviders_AsyncResult{Resp: env.FindProviders} } else { @@ -1362,8 +1398,8 @@ func process_DelegatedRouting_FindProviders_AsyncResult(ctx pd6.Context, ch chan } } -func (c *client_DelegatedRouting) GetIPNS(ctx pd6.Context, req *GetIPNSRequest) ([]*GetIPNSResponse, error) { - ctx, cancel := pd6.WithCancel(ctx) +func (c *client_DelegatedRouting) GetIPNS(ctx pd8.Context, req *GetIPNSRequest) ([]*GetIPNSResponse, error) { + ctx, cancel := pd8.WithCancel(ctx) defer cancel() ch, err := c.GetIPNS_Async(ctx, req) if err != nil { @@ -1391,27 +1427,27 @@ func (c *client_DelegatedRouting) GetIPNS(ctx pd6.Context, req *GetIPNSRequest) } } -func (c *client_DelegatedRouting) GetIPNS_Async(ctx pd6.Context, req *GetIPNSRequest) (<-chan DelegatedRouting_GetIPNS_AsyncResult, error) { +func (c *client_DelegatedRouting) GetIPNS_Async(ctx pd8.Context, req *GetIPNSRequest) (<-chan DelegatedRouting_GetIPNS_AsyncResult, error) { // check if we have memoized that this method is not supported by the server c.ulk.Lock() notSupported := c.unsupported["GetIPNS"] c.ulk.Unlock() if notSupported { - return nil, pd12.ErrSchema + return nil, pd13.ErrSchema } envelope := &AnonInductive4{ GetIPNS: req, } - buf, err := pd9.Encode(envelope, pd8.Encode) + buf, err := pd5.Encode(envelope, pd9.Encode) if err != nil { return nil, pd3.Errorf("unexpected serialization error (%v)", err) } // encode request in URL u := *c.endpoint - httpReq, err := pd5.NewRequestWithContext(ctx, "POST", u.String(), pd11.NewReader(buf)) + httpReq, err := pd10.NewRequestWithContext(ctx, "POST", u.String(), pd12.NewReader(buf)) if err != nil { return nil, err } @@ -1433,7 +1469,7 @@ func (c *client_DelegatedRouting) GetIPNS_Async(ctx pd6.Context, req *GetIPNSReq c.ulk.Lock() c.unsupported["GetIPNS"] = true c.ulk.Unlock() - return nil, pd12.ErrSchema + return nil, pd13.ErrSchema } // HTTP codes other than 200 correspond to service implementation rejecting the call when it is received // for reasons unrelated to protocol schema @@ -1441,7 +1477,7 @@ func (c *client_DelegatedRouting) GetIPNS_Async(ctx pd6.Context, req *GetIPNSReq resp.Body.Close() if resp.Header != nil { if errValues, ok := resp.Header["Error"]; ok && len(errValues) == 1 { - err = pd12.ErrService{Cause: pd3.Errorf("%s", errValues[0])} + err = pd13.ErrService{Cause: pd3.Errorf("%s", errValues[0])} } else { err = pd3.Errorf("service rejected the call, no cause provided") } @@ -1456,10 +1492,10 @@ func (c *client_DelegatedRouting) GetIPNS_Async(ctx pd6.Context, req *GetIPNSReq return ch, nil } -func process_DelegatedRouting_GetIPNS_AsyncResult(ctx pd6.Context, ch chan<- DelegatedRouting_GetIPNS_AsyncResult, r pd7.ReadCloser) { +func process_DelegatedRouting_GetIPNS_AsyncResult(ctx pd8.Context, ch chan<- DelegatedRouting_GetIPNS_AsyncResult, r pd7.ReadCloser) { defer close(ch) defer r.Close() - opt := pd8.DecodeOptions{ + opt := pd9.DecodeOptions{ ParseLinks: true, ParseBytes: true, DontParseBeyondEnd: true, @@ -1467,24 +1503,24 @@ func process_DelegatedRouting_GetIPNS_AsyncResult(ctx pd6.Context, ch chan<- Del for { var out DelegatedRouting_GetIPNS_AsyncResult - n, err := pd9.DecodeStreaming(r, opt.Decode) + n, err := pd5.DecodeStreaming(r, opt.Decode) - if pd10.Is(err, pd7.EOF) || pd10.Is(err, pd7.ErrUnexpectedEOF) || pd10.Is(err, pd6.DeadlineExceeded) || pd10.Is(err, pd6.Canceled) { + if pd6.Is(err, pd7.EOF) || pd6.Is(err, pd7.ErrUnexpectedEOF) || pd6.Is(err, pd8.DeadlineExceeded) || pd6.Is(err, pd8.Canceled) { return } if err != nil { - out = DelegatedRouting_GetIPNS_AsyncResult{Err: pd12.ErrProto{Cause: err}} // IPLD decode error + out = DelegatedRouting_GetIPNS_AsyncResult{Err: pd13.ErrProto{Cause: err}} // IPLD decode error } else { var x [1]byte if k, err := r.Read(x[:]); k != 1 || x[0] != '\n' { - out = DelegatedRouting_GetIPNS_AsyncResult{Err: pd12.ErrProto{Cause: pd3.Errorf("missing new line after result: err (%v), read (%d), char (%q)", err, k, string(x[:]))}} // Edelweiss decode error + out = DelegatedRouting_GetIPNS_AsyncResult{Err: pd13.ErrProto{Cause: pd3.Errorf("missing new line after result: err (%v), read (%d), char (%q)", err, k, string(x[:]))}} // Edelweiss decode error } else { env := &AnonInductive5{} if err = env.Parse(n); err != nil { - out = DelegatedRouting_GetIPNS_AsyncResult{Err: pd12.ErrProto{Cause: err}} // schema decode error + out = DelegatedRouting_GetIPNS_AsyncResult{Err: pd13.ErrProto{Cause: err}} // schema decode error } else if env.Error != nil { - out = DelegatedRouting_GetIPNS_AsyncResult{Err: pd12.ErrService{Cause: pd10.New(string(env.Error.Code))}} // service-level error + out = DelegatedRouting_GetIPNS_AsyncResult{Err: pd13.ErrService{Cause: pd6.New(string(env.Error.Code))}} // service-level error } else if env.GetIPNS != nil { out = DelegatedRouting_GetIPNS_AsyncResult{Resp: env.GetIPNS} } else { @@ -1501,8 +1537,8 @@ func process_DelegatedRouting_GetIPNS_AsyncResult(ctx pd6.Context, ch chan<- Del } } -func (c *client_DelegatedRouting) PutIPNS(ctx pd6.Context, req *PutIPNSRequest) ([]*PutIPNSResponse, error) { - ctx, cancel := pd6.WithCancel(ctx) +func (c *client_DelegatedRouting) PutIPNS(ctx pd8.Context, req *PutIPNSRequest) ([]*PutIPNSResponse, error) { + ctx, cancel := pd8.WithCancel(ctx) defer cancel() ch, err := c.PutIPNS_Async(ctx, req) if err != nil { @@ -1530,27 +1566,27 @@ func (c *client_DelegatedRouting) PutIPNS(ctx pd6.Context, req *PutIPNSRequest) } } -func (c *client_DelegatedRouting) PutIPNS_Async(ctx pd6.Context, req *PutIPNSRequest) (<-chan DelegatedRouting_PutIPNS_AsyncResult, error) { +func (c *client_DelegatedRouting) PutIPNS_Async(ctx pd8.Context, req *PutIPNSRequest) (<-chan DelegatedRouting_PutIPNS_AsyncResult, error) { // check if we have memoized that this method is not supported by the server c.ulk.Lock() notSupported := c.unsupported["PutIPNS"] c.ulk.Unlock() if notSupported { - return nil, pd12.ErrSchema + return nil, pd13.ErrSchema } envelope := &AnonInductive4{ PutIPNS: req, } - buf, err := pd9.Encode(envelope, pd8.Encode) + buf, err := pd5.Encode(envelope, pd9.Encode) if err != nil { return nil, pd3.Errorf("unexpected serialization error (%v)", err) } // encode request in URL u := *c.endpoint - httpReq, err := pd5.NewRequestWithContext(ctx, "POST", u.String(), pd11.NewReader(buf)) + httpReq, err := pd10.NewRequestWithContext(ctx, "POST", u.String(), pd12.NewReader(buf)) if err != nil { return nil, err } @@ -1572,7 +1608,7 @@ func (c *client_DelegatedRouting) PutIPNS_Async(ctx pd6.Context, req *PutIPNSReq c.ulk.Lock() c.unsupported["PutIPNS"] = true c.ulk.Unlock() - return nil, pd12.ErrSchema + return nil, pd13.ErrSchema } // HTTP codes other than 200 correspond to service implementation rejecting the call when it is received // for reasons unrelated to protocol schema @@ -1580,7 +1616,7 @@ func (c *client_DelegatedRouting) PutIPNS_Async(ctx pd6.Context, req *PutIPNSReq resp.Body.Close() if resp.Header != nil { if errValues, ok := resp.Header["Error"]; ok && len(errValues) == 1 { - err = pd12.ErrService{Cause: pd3.Errorf("%s", errValues[0])} + err = pd13.ErrService{Cause: pd3.Errorf("%s", errValues[0])} } else { err = pd3.Errorf("service rejected the call, no cause provided") } @@ -1595,10 +1631,10 @@ func (c *client_DelegatedRouting) PutIPNS_Async(ctx pd6.Context, req *PutIPNSReq return ch, nil } -func process_DelegatedRouting_PutIPNS_AsyncResult(ctx pd6.Context, ch chan<- DelegatedRouting_PutIPNS_AsyncResult, r pd7.ReadCloser) { +func process_DelegatedRouting_PutIPNS_AsyncResult(ctx pd8.Context, ch chan<- DelegatedRouting_PutIPNS_AsyncResult, r pd7.ReadCloser) { defer close(ch) defer r.Close() - opt := pd8.DecodeOptions{ + opt := pd9.DecodeOptions{ ParseLinks: true, ParseBytes: true, DontParseBeyondEnd: true, @@ -1606,24 +1642,24 @@ func process_DelegatedRouting_PutIPNS_AsyncResult(ctx pd6.Context, ch chan<- Del for { var out DelegatedRouting_PutIPNS_AsyncResult - n, err := pd9.DecodeStreaming(r, opt.Decode) + n, err := pd5.DecodeStreaming(r, opt.Decode) - if pd10.Is(err, pd7.EOF) || pd10.Is(err, pd7.ErrUnexpectedEOF) || pd10.Is(err, pd6.DeadlineExceeded) || pd10.Is(err, pd6.Canceled) { + if pd6.Is(err, pd7.EOF) || pd6.Is(err, pd7.ErrUnexpectedEOF) || pd6.Is(err, pd8.DeadlineExceeded) || pd6.Is(err, pd8.Canceled) { return } if err != nil { - out = DelegatedRouting_PutIPNS_AsyncResult{Err: pd12.ErrProto{Cause: err}} // IPLD decode error + out = DelegatedRouting_PutIPNS_AsyncResult{Err: pd13.ErrProto{Cause: err}} // IPLD decode error } else { var x [1]byte if k, err := r.Read(x[:]); k != 1 || x[0] != '\n' { - out = DelegatedRouting_PutIPNS_AsyncResult{Err: pd12.ErrProto{Cause: pd3.Errorf("missing new line after result: err (%v), read (%d), char (%q)", err, k, string(x[:]))}} // Edelweiss decode error + out = DelegatedRouting_PutIPNS_AsyncResult{Err: pd13.ErrProto{Cause: pd3.Errorf("missing new line after result: err (%v), read (%d), char (%q)", err, k, string(x[:]))}} // Edelweiss decode error } else { env := &AnonInductive5{} if err = env.Parse(n); err != nil { - out = DelegatedRouting_PutIPNS_AsyncResult{Err: pd12.ErrProto{Cause: err}} // schema decode error + out = DelegatedRouting_PutIPNS_AsyncResult{Err: pd13.ErrProto{Cause: err}} // schema decode error } else if env.Error != nil { - out = DelegatedRouting_PutIPNS_AsyncResult{Err: pd12.ErrService{Cause: pd10.New(string(env.Error.Code))}} // service-level error + out = DelegatedRouting_PutIPNS_AsyncResult{Err: pd13.ErrService{Cause: pd6.New(string(env.Error.Code))}} // service-level error } else if env.PutIPNS != nil { out = DelegatedRouting_PutIPNS_AsyncResult{Resp: env.PutIPNS} } else { @@ -1640,16 +1676,156 @@ func process_DelegatedRouting_PutIPNS_AsyncResult(ctx pd6.Context, ch chan<- Del } } -var logger_server_DelegatedRouting = pd13.Logger("service/server/delegatedrouting") +func (c *client_DelegatedRouting) Provide(ctx pd8.Context, req *ProvideRequest) ([]*ProvideResponse, error) { + ctx, cancel := pd8.WithCancel(ctx) + defer cancel() + ch, err := c.Provide_Async(ctx, req) + if err != nil { + return nil, err + } + var resps []*ProvideResponse + for { + select { + case r, ok := <-ch: + if !ok { + cancel() + return resps, nil + } else { + if r.Err == nil { + resps = append(resps, r.Resp) + } else { + logger_client_DelegatedRouting.Errorf("client received error response (%v)", r.Err) + cancel() + return resps, r.Err + } + } + case <-ctx.Done(): + return resps, ctx.Err() + } + } +} + +func (c *client_DelegatedRouting) Provide_Async(ctx pd8.Context, req *ProvideRequest) (<-chan DelegatedRouting_Provide_AsyncResult, error) { + // check if we have memoized that this method is not supported by the server + c.ulk.Lock() + notSupported := c.unsupported["Provide"] + c.ulk.Unlock() + if notSupported { + return nil, pd13.ErrSchema + } + + envelope := &AnonInductive4{ + Provide: req, + } + + buf, err := pd5.Encode(envelope, pd9.Encode) + if err != nil { + return nil, pd3.Errorf("unexpected serialization error (%v)", err) + } + + // encode request in URL + u := *c.endpoint + httpReq, err := pd10.NewRequestWithContext(ctx, "POST", u.String(), pd12.NewReader(buf)) + if err != nil { + return nil, err + } + httpReq.Header = map[string][]string{ + "Accept": { + "application/vnd.ipfs.rpc+dag-json; version=1", + }, + } + + resp, err := c.httpClient.Do(httpReq) + if err != nil { + return nil, pd3.Errorf("sending HTTP request: %w", err) + } + + // HTTP codes 400 and 404 correspond to unrecognized method or request schema + if resp.StatusCode == 400 || resp.StatusCode == 404 { + resp.Body.Close() + // memoize that this method is not supported by the server + c.ulk.Lock() + c.unsupported["Provide"] = true + c.ulk.Unlock() + return nil, pd13.ErrSchema + } + // HTTP codes other than 200 correspond to service implementation rejecting the call when it is received + // for reasons unrelated to protocol schema + if resp.StatusCode != 200 { + resp.Body.Close() + if resp.Header != nil { + if errValues, ok := resp.Header["Error"]; ok && len(errValues) == 1 { + err = pd13.ErrService{Cause: pd3.Errorf("%s", errValues[0])} + } else { + err = pd3.Errorf("service rejected the call, no cause provided") + } + } else { + err = pd3.Errorf("service rejected the call") + } + return nil, err + } + + ch := make(chan DelegatedRouting_Provide_AsyncResult, 1) + go process_DelegatedRouting_Provide_AsyncResult(ctx, ch, resp.Body) + return ch, nil +} + +func process_DelegatedRouting_Provide_AsyncResult(ctx pd8.Context, ch chan<- DelegatedRouting_Provide_AsyncResult, r pd7.ReadCloser) { + defer close(ch) + defer r.Close() + opt := pd9.DecodeOptions{ + ParseLinks: true, + ParseBytes: true, + DontParseBeyondEnd: true, + } + for { + var out DelegatedRouting_Provide_AsyncResult + + n, err := pd5.DecodeStreaming(r, opt.Decode) + + if pd6.Is(err, pd7.EOF) || pd6.Is(err, pd7.ErrUnexpectedEOF) || pd6.Is(err, pd8.DeadlineExceeded) || pd6.Is(err, pd8.Canceled) { + return + } + + if err != nil { + out = DelegatedRouting_Provide_AsyncResult{Err: pd13.ErrProto{Cause: err}} // IPLD decode error + } else { + var x [1]byte + if k, err := r.Read(x[:]); k != 1 || x[0] != '\n' { + out = DelegatedRouting_Provide_AsyncResult{Err: pd13.ErrProto{Cause: pd3.Errorf("missing new line after result: err (%v), read (%d), char (%q)", err, k, string(x[:]))}} // Edelweiss decode error + } else { + env := &AnonInductive5{} + if err = env.Parse(n); err != nil { + out = DelegatedRouting_Provide_AsyncResult{Err: pd13.ErrProto{Cause: err}} // schema decode error + } else if env.Error != nil { + out = DelegatedRouting_Provide_AsyncResult{Err: pd13.ErrService{Cause: pd6.New(string(env.Error.Code))}} // service-level error + } else if env.Provide != nil { + out = DelegatedRouting_Provide_AsyncResult{Resp: env.Provide} + } else { + continue + } + } + } + + select { + case <-ctx.Done(): + return + case ch <- out: + } + } +} + +var logger_server_DelegatedRouting = pd4.Logger("service/server/delegatedrouting") type DelegatedRouting_Server interface { - FindProviders(ctx pd6.Context, req *FindProvidersRequest) (<-chan *DelegatedRouting_FindProviders_AsyncResult, error) - GetIPNS(ctx pd6.Context, req *GetIPNSRequest) (<-chan *DelegatedRouting_GetIPNS_AsyncResult, error) - PutIPNS(ctx pd6.Context, req *PutIPNSRequest) (<-chan *DelegatedRouting_PutIPNS_AsyncResult, error) + FindProviders(ctx pd8.Context, req *FindProvidersRequest) (<-chan *DelegatedRouting_FindProviders_AsyncResult, error) + GetIPNS(ctx pd8.Context, req *GetIPNSRequest) (<-chan *DelegatedRouting_GetIPNS_AsyncResult, error) + PutIPNS(ctx pd8.Context, req *PutIPNSRequest) (<-chan *DelegatedRouting_PutIPNS_AsyncResult, error) + Provide(ctx pd8.Context, req *ProvideRequest) (<-chan *DelegatedRouting_Provide_AsyncResult, error) } -func DelegatedRouting_AsyncHandler(s DelegatedRouting_Server) pd5.HandlerFunc { - return func(writer pd5.ResponseWriter, request *pd5.Request) { +func DelegatedRouting_AsyncHandler(s DelegatedRouting_Server) pd10.HandlerFunc { + return func(writer pd10.ResponseWriter, request *pd10.Request) { // parse request msg, err := pd15.ReadAll(request.Body) if err != nil { @@ -1657,7 +1833,7 @@ func DelegatedRouting_AsyncHandler(s DelegatedRouting_Server) pd5.HandlerFunc { writer.WriteHeader(400) return } - n, err := pd9.Decode(msg, pd8.Decode) + n, err := pd5.Decode(msg, pd9.Decode) if err != nil { logger_server_DelegatedRouting.Errorf("received request not decodeable (%v)", err) writer.WriteHeader(400) @@ -1687,7 +1863,7 @@ func DelegatedRouting_AsyncHandler(s DelegatedRouting_Server) pd5.HandlerFunc { } writer.WriteHeader(200) - if f, ok := writer.(pd5.Flusher); ok { + if f, ok := writer.(pd10.Flusher); ok { f.Flush() } @@ -1705,14 +1881,14 @@ func DelegatedRouting_AsyncHandler(s DelegatedRouting_Server) pd5.HandlerFunc { } else { env = &AnonInductive5{FindProviders: resp.Resp} } - var buf pd11.Buffer - if err = pd9.EncodeStreaming(&buf, env, pd8.Encode); err != nil { + var buf pd12.Buffer + if err = pd5.EncodeStreaming(&buf, env, pd9.Encode); err != nil { logger_server_DelegatedRouting.Errorf("cannot encode response (%v)", err) continue } buf.WriteByte("\n"[0]) writer.Write(buf.Bytes()) - if f, ok := writer.(pd5.Flusher); ok { + if f, ok := writer.(pd10.Flusher); ok { f.Flush() } } @@ -1728,7 +1904,7 @@ func DelegatedRouting_AsyncHandler(s DelegatedRouting_Server) pd5.HandlerFunc { } writer.WriteHeader(200) - if f, ok := writer.(pd5.Flusher); ok { + if f, ok := writer.(pd10.Flusher); ok { f.Flush() } @@ -1746,14 +1922,14 @@ func DelegatedRouting_AsyncHandler(s DelegatedRouting_Server) pd5.HandlerFunc { } else { env = &AnonInductive5{GetIPNS: resp.Resp} } - var buf pd11.Buffer - if err = pd9.EncodeStreaming(&buf, env, pd8.Encode); err != nil { + var buf pd12.Buffer + if err = pd5.EncodeStreaming(&buf, env, pd9.Encode); err != nil { logger_server_DelegatedRouting.Errorf("cannot encode response (%v)", err) continue } buf.WriteByte("\n"[0]) writer.Write(buf.Bytes()) - if f, ok := writer.(pd5.Flusher); ok { + if f, ok := writer.(pd10.Flusher); ok { f.Flush() } } @@ -1769,7 +1945,7 @@ func DelegatedRouting_AsyncHandler(s DelegatedRouting_Server) pd5.HandlerFunc { } writer.WriteHeader(200) - if f, ok := writer.(pd5.Flusher); ok { + if f, ok := writer.(pd10.Flusher); ok { f.Flush() } @@ -1787,14 +1963,55 @@ func DelegatedRouting_AsyncHandler(s DelegatedRouting_Server) pd5.HandlerFunc { } else { env = &AnonInductive5{PutIPNS: resp.Resp} } - var buf pd11.Buffer - if err = pd9.EncodeStreaming(&buf, env, pd8.Encode); err != nil { + var buf pd12.Buffer + if err = pd5.EncodeStreaming(&buf, env, pd9.Encode); err != nil { + logger_server_DelegatedRouting.Errorf("cannot encode response (%v)", err) + continue + } + buf.WriteByte("\n"[0]) + writer.Write(buf.Bytes()) + if f, ok := writer.(pd10.Flusher); ok { + f.Flush() + } + } + } + + case env.Provide != nil: + ch, err := s.Provide(request.Context(), env.Provide) + if err != nil { + logger_server_DelegatedRouting.Errorf("service rejected request (%v)", err) + writer.Header()["Error"] = []string{err.Error()} + writer.WriteHeader(500) + return + } + + writer.WriteHeader(200) + if f, ok := writer.(pd10.Flusher); ok { + f.Flush() + } + + for { + select { + case <-request.Context().Done(): + return + case resp, ok := <-ch: + if !ok { + return + } + var env *AnonInductive5 + if resp.Err != nil { + env = &AnonInductive5{Error: &DelegatedRouting_Error{Code: pd2.String(resp.Err.Error())}} + } else { + env = &AnonInductive5{Provide: resp.Resp} + } + var buf pd12.Buffer + if err = pd5.EncodeStreaming(&buf, env, pd9.Encode); err != nil { logger_server_DelegatedRouting.Errorf("cannot encode response (%v)", err) continue } buf.WriteByte("\n"[0]) writer.Write(buf.Bytes()) - if f, ok := writer.(pd5.Flusher); ok { + if f, ok := writer.(pd10.Flusher); ok { f.Flush() } } @@ -1808,11 +2025,12 @@ func DelegatedRouting_AsyncHandler(s DelegatedRouting_Server) pd5.HandlerFunc { "FindProviders", "GetIPNS", "PutIPNS", + "Provide", }, }, } - var buf pd11.Buffer - if err = pd9.EncodeStreaming(&buf, env, pd8.Encode); err != nil { + var buf pd12.Buffer + if err = pd5.EncodeStreaming(&buf, env, pd9.Encode); err != nil { logger_server_DelegatedRouting.Errorf("cannot encode identify response (%v)", err) writer.WriteHeader(500) return @@ -2934,9 +3152,409 @@ func (x PutIPNSResponse) Prototype() pd1.NodePrototype { return nil } +// -- protocol type ProvideRequest -- + +type ProvideRequest struct { + Key LinkToAny + Provider Provider + Timestamp pd2.Int + AdvisoryTTL pd2.Int + Signature pd2.Bytes +} + +func (x ProvideRequest) Node() pd1.Node { + return x +} + +func (x *ProvideRequest) Parse(n pd1.Node) error { + if n.Kind() != pd1.Kind_Map { + return pd2.ErrNA + } + iter := n.MapIterator() + fieldMap := map[string]pd2.ParseFunc{ + "Key": x.Key.Parse, + "Provider": x.Provider.Parse, + "Timestamp": x.Timestamp.Parse, + "AdvisoryTTL": x.AdvisoryTTL.Parse, + "Signature": x.Signature.Parse, + } + for !iter.Done() { + if kn, vn, err := iter.Next(); err != nil { + return err + } else { + if k, err := kn.AsString(); err != nil { + return pd3.Errorf("structure map key is not a string") + } else { + _ = vn + switch k { + case "Key": + if _, notParsed := fieldMap["Key"]; !notParsed { + return pd3.Errorf("field %s already parsed", "Key") + } + if err := x.Key.Parse(vn); err != nil { + return err + } + delete(fieldMap, "Key") + case "Provider": + if _, notParsed := fieldMap["Provider"]; !notParsed { + return pd3.Errorf("field %s already parsed", "Provider") + } + if err := x.Provider.Parse(vn); err != nil { + return err + } + delete(fieldMap, "Provider") + case "Timestamp": + if _, notParsed := fieldMap["Timestamp"]; !notParsed { + return pd3.Errorf("field %s already parsed", "Timestamp") + } + if err := x.Timestamp.Parse(vn); err != nil { + return err + } + delete(fieldMap, "Timestamp") + case "AdvisoryTTL": + if _, notParsed := fieldMap["AdvisoryTTL"]; !notParsed { + return pd3.Errorf("field %s already parsed", "AdvisoryTTL") + } + if err := x.AdvisoryTTL.Parse(vn); err != nil { + return err + } + delete(fieldMap, "AdvisoryTTL") + case "Signature": + if _, notParsed := fieldMap["Signature"]; !notParsed { + return pd3.Errorf("field %s already parsed", "Signature") + } + if err := x.Signature.Parse(vn); err != nil { + return err + } + delete(fieldMap, "Signature") + + } + } + } + } + for _, fieldParse := range fieldMap { + if err := fieldParse(pd1.Null); err != nil { + return err + } + } + return nil +} + +type ProvideRequest_MapIterator struct { + i int64 + s *ProvideRequest +} + +func (x *ProvideRequest_MapIterator) Next() (key pd1.Node, value pd1.Node, err error) { + x.i++ + switch x.i { + case 0: + return pd2.String("Key"), x.s.Key.Node(), nil + case 1: + return pd2.String("Provider"), x.s.Provider.Node(), nil + case 2: + return pd2.String("Timestamp"), x.s.Timestamp.Node(), nil + case 3: + return pd2.String("AdvisoryTTL"), x.s.AdvisoryTTL.Node(), nil + case 4: + return pd2.String("Signature"), x.s.Signature.Node(), nil + + } + return nil, nil, pd2.ErrNA +} + +func (x *ProvideRequest_MapIterator) Done() bool { + return x.i+1 >= 5 +} + +func (x ProvideRequest) Kind() pd1.Kind { + return pd1.Kind_Map +} + +func (x ProvideRequest) LookupByString(key string) (pd1.Node, error) { + switch key { + case "Key": + return x.Key.Node(), nil + case "Provider": + return x.Provider.Node(), nil + case "Timestamp": + return x.Timestamp.Node(), nil + case "AdvisoryTTL": + return x.AdvisoryTTL.Node(), nil + case "Signature": + return x.Signature.Node(), nil + + } + return nil, pd2.ErrNA +} + +func (x ProvideRequest) LookupByNode(key pd1.Node) (pd1.Node, error) { + switch key.Kind() { + case pd1.Kind_String: + if s, err := key.AsString(); err != nil { + return nil, err + } else { + return x.LookupByString(s) + } + case pd1.Kind_Int: + if i, err := key.AsInt(); err != nil { + return nil, err + } else { + return x.LookupByIndex(i) + } + } + return nil, pd2.ErrNA +} + +func (x ProvideRequest) LookupByIndex(idx int64) (pd1.Node, error) { + switch idx { + case 0: + return x.Key.Node(), nil + case 1: + return x.Provider.Node(), nil + case 2: + return x.Timestamp.Node(), nil + case 3: + return x.AdvisoryTTL.Node(), nil + case 4: + return x.Signature.Node(), nil + + } + return nil, pd2.ErrNA +} + +func (x ProvideRequest) LookupBySegment(seg pd1.PathSegment) (pd1.Node, error) { + switch seg.String() { + case "0", "Key": + return x.Key.Node(), nil + case "1", "Provider": + return x.Provider.Node(), nil + case "2", "Timestamp": + return x.Timestamp.Node(), nil + case "3", "AdvisoryTTL": + return x.AdvisoryTTL.Node(), nil + case "4", "Signature": + return x.Signature.Node(), nil + + } + return nil, pd2.ErrNA +} + +func (x ProvideRequest) MapIterator() pd1.MapIterator { + return &ProvideRequest_MapIterator{-1, &x} +} + +func (x ProvideRequest) ListIterator() pd1.ListIterator { + return nil +} + +func (x ProvideRequest) Length() int64 { + return 5 +} + +func (x ProvideRequest) IsAbsent() bool { + return false +} + +func (x ProvideRequest) IsNull() bool { + return false +} + +func (x ProvideRequest) AsBool() (bool, error) { + return false, pd2.ErrNA +} + +func (x ProvideRequest) AsInt() (int64, error) { + return 0, pd2.ErrNA +} + +func (x ProvideRequest) AsFloat() (float64, error) { + return 0, pd2.ErrNA +} + +func (x ProvideRequest) AsString() (string, error) { + return "", pd2.ErrNA +} + +func (x ProvideRequest) AsBytes() ([]byte, error) { + return nil, pd2.ErrNA +} + +func (x ProvideRequest) AsLink() (pd1.Link, error) { + return nil, pd2.ErrNA +} + +func (x ProvideRequest) Prototype() pd1.NodePrototype { + return nil +} + +// -- protocol type ProvideResponse -- + +type ProvideResponse struct { + AdvisoryTTL pd2.Int +} + +func (x ProvideResponse) Node() pd1.Node { + return x +} + +func (x *ProvideResponse) Parse(n pd1.Node) error { + if n.Kind() != pd1.Kind_Map { + return pd2.ErrNA + } + iter := n.MapIterator() + fieldMap := map[string]pd2.ParseFunc{ + "AdvisoryTTL": x.AdvisoryTTL.Parse, + } + for !iter.Done() { + if kn, vn, err := iter.Next(); err != nil { + return err + } else { + if k, err := kn.AsString(); err != nil { + return pd3.Errorf("structure map key is not a string") + } else { + _ = vn + switch k { + case "AdvisoryTTL": + if _, notParsed := fieldMap["AdvisoryTTL"]; !notParsed { + return pd3.Errorf("field %s already parsed", "AdvisoryTTL") + } + if err := x.AdvisoryTTL.Parse(vn); err != nil { + return err + } + delete(fieldMap, "AdvisoryTTL") + + } + } + } + } + for _, fieldParse := range fieldMap { + if err := fieldParse(pd1.Null); err != nil { + return err + } + } + return nil +} + +type ProvideResponse_MapIterator struct { + i int64 + s *ProvideResponse +} + +func (x *ProvideResponse_MapIterator) Next() (key pd1.Node, value pd1.Node, err error) { + x.i++ + switch x.i { + case 0: + return pd2.String("AdvisoryTTL"), x.s.AdvisoryTTL.Node(), nil + + } + return nil, nil, pd2.ErrNA +} + +func (x *ProvideResponse_MapIterator) Done() bool { + return x.i+1 >= 1 +} + +func (x ProvideResponse) Kind() pd1.Kind { + return pd1.Kind_Map +} + +func (x ProvideResponse) LookupByString(key string) (pd1.Node, error) { + switch key { + case "AdvisoryTTL": + return x.AdvisoryTTL.Node(), nil + + } + return nil, pd2.ErrNA +} + +func (x ProvideResponse) LookupByNode(key pd1.Node) (pd1.Node, error) { + switch key.Kind() { + case pd1.Kind_String: + if s, err := key.AsString(); err != nil { + return nil, err + } else { + return x.LookupByString(s) + } + case pd1.Kind_Int: + if i, err := key.AsInt(); err != nil { + return nil, err + } else { + return x.LookupByIndex(i) + } + } + return nil, pd2.ErrNA +} + +func (x ProvideResponse) LookupByIndex(idx int64) (pd1.Node, error) { + switch idx { + case 0: + return x.AdvisoryTTL.Node(), nil + + } + return nil, pd2.ErrNA +} + +func (x ProvideResponse) LookupBySegment(seg pd1.PathSegment) (pd1.Node, error) { + switch seg.String() { + case "0", "AdvisoryTTL": + return x.AdvisoryTTL.Node(), nil + + } + return nil, pd2.ErrNA +} + +func (x ProvideResponse) MapIterator() pd1.MapIterator { + return &ProvideResponse_MapIterator{-1, &x} +} + +func (x ProvideResponse) ListIterator() pd1.ListIterator { + return nil +} + +func (x ProvideResponse) Length() int64 { + return 1 +} + +func (x ProvideResponse) IsAbsent() bool { + return false +} + +func (x ProvideResponse) IsNull() bool { + return false +} + +func (x ProvideResponse) AsBool() (bool, error) { + return false, pd2.ErrNA +} + +func (x ProvideResponse) AsInt() (int64, error) { + return 0, pd2.ErrNA +} + +func (x ProvideResponse) AsFloat() (float64, error) { + return 0, pd2.ErrNA +} + +func (x ProvideResponse) AsString() (string, error) { + return "", pd2.ErrNA +} + +func (x ProvideResponse) AsBytes() ([]byte, error) { + return nil, pd2.ErrNA +} + +func (x ProvideResponse) AsLink() (pd1.Link, error) { + return nil, pd2.ErrNA +} + +func (x ProvideResponse) Prototype() pd1.NodePrototype { + return nil +} + // -- protocol type LinkToAny -- -type LinkToAny pd16.Cid +type LinkToAny pd17.Cid func (v *LinkToAny) Parse(n pd1.Node) error { if n.Kind() != pd1.Kind_Link { @@ -2944,7 +3562,7 @@ func (v *LinkToAny) Parse(n pd1.Node) error { } else { ipldLink, _ := n.AsLink() // TODO: Is there a more general way to convert ipld.Link interface into a concrete user object? - cidLink, ok := ipldLink.(pd17.Link) + cidLink, ok := ipldLink.(pd16.Link) if !ok { return pd3.Errorf("only cid links are supported") } else { @@ -3019,7 +3637,7 @@ func (LinkToAny) AsBytes() ([]byte, error) { } func (v LinkToAny) AsLink() (pd1.Link, error) { - return pd17.Link{Cid: pd16.Cid(v)}, nil + return pd16.Link{Cid: pd17.Cid(v)}, nil } func (LinkToAny) Prototype() pd1.NodePrototype { @@ -3499,15 +4117,15 @@ func (x Node) Prototype() pd1.NodePrototype { return nil } -// -- protocol type AnonList18 -- +// -- protocol type AnonList20 -- -type AnonList18 []pd2.Bytes +type AnonList20 []pd2.Bytes -func (v AnonList18) Node() pd1.Node { +func (v AnonList20) Node() pd1.Node { return v } -func (v *AnonList18) Parse(n pd1.Node) error { +func (v *AnonList20) Parse(n pd1.Node) error { if n.Kind() == pd1.Kind_Null { *v = nil return nil @@ -3515,7 +4133,7 @@ func (v *AnonList18) Parse(n pd1.Node) error { if n.Kind() != pd1.Kind_List { return pd2.ErrNA } else { - *v = make(AnonList18, n.Length()) + *v = make(AnonList20, n.Length()) iter := n.ListIterator() for !iter.Done() { if i, n, err := iter.Next(); err != nil { @@ -3528,19 +4146,19 @@ func (v *AnonList18) Parse(n pd1.Node) error { } } -func (AnonList18) Kind() pd1.Kind { +func (AnonList20) Kind() pd1.Kind { return pd1.Kind_List } -func (AnonList18) LookupByString(string) (pd1.Node, error) { +func (AnonList20) LookupByString(string) (pd1.Node, error) { return nil, pd2.ErrNA } -func (AnonList18) LookupByNode(key pd1.Node) (pd1.Node, error) { +func (AnonList20) LookupByNode(key pd1.Node) (pd1.Node, error) { return nil, pd2.ErrNA } -func (v AnonList18) LookupByIndex(i int64) (pd1.Node, error) { +func (v AnonList20) LookupByIndex(i int64) (pd1.Node, error) { if i < 0 || i >= v.Length() { return nil, pd2.ErrBounds } else { @@ -3548,7 +4166,7 @@ func (v AnonList18) LookupByIndex(i int64) (pd1.Node, error) { } } -func (v AnonList18) LookupBySegment(seg pd1.PathSegment) (pd1.Node, error) { +func (v AnonList20) LookupBySegment(seg pd1.PathSegment) (pd1.Node, error) { if i, err := seg.Index(); err != nil { return nil, pd2.ErrNA } else { @@ -3556,60 +4174,60 @@ func (v AnonList18) LookupBySegment(seg pd1.PathSegment) (pd1.Node, error) { } } -func (AnonList18) MapIterator() pd1.MapIterator { +func (AnonList20) MapIterator() pd1.MapIterator { return nil } -func (v AnonList18) ListIterator() pd1.ListIterator { - return &AnonList18_ListIterator{v, 0} +func (v AnonList20) ListIterator() pd1.ListIterator { + return &AnonList20_ListIterator{v, 0} } -func (v AnonList18) Length() int64 { +func (v AnonList20) Length() int64 { return int64(len(v)) } -func (AnonList18) IsAbsent() bool { +func (AnonList20) IsAbsent() bool { return false } -func (AnonList18) IsNull() bool { +func (AnonList20) IsNull() bool { return false } -func (v AnonList18) AsBool() (bool, error) { +func (v AnonList20) AsBool() (bool, error) { return false, pd2.ErrNA } -func (AnonList18) AsInt() (int64, error) { +func (AnonList20) AsInt() (int64, error) { return 0, pd2.ErrNA } -func (AnonList18) AsFloat() (float64, error) { +func (AnonList20) AsFloat() (float64, error) { return 0, pd2.ErrNA } -func (AnonList18) AsString() (string, error) { +func (AnonList20) AsString() (string, error) { return "", pd2.ErrNA } -func (AnonList18) AsBytes() ([]byte, error) { +func (AnonList20) AsBytes() ([]byte, error) { return nil, pd2.ErrNA } -func (AnonList18) AsLink() (pd1.Link, error) { +func (AnonList20) AsLink() (pd1.Link, error) { return nil, pd2.ErrNA } -func (AnonList18) Prototype() pd1.NodePrototype { +func (AnonList20) Prototype() pd1.NodePrototype { return nil // not needed } -type AnonList18_ListIterator struct { - list AnonList18 +type AnonList20_ListIterator struct { + list AnonList20 at int64 } -func (iter *AnonList18_ListIterator) Next() (int64, pd1.Node, error) { +func (iter *AnonList20_ListIterator) Next() (int64, pd1.Node, error) { if iter.Done() { return -1, nil, pd2.ErrBounds } @@ -3619,7 +4237,7 @@ func (iter *AnonList18_ListIterator) Next() (int64, pd1.Node, error) { return i, v.Node(), nil } -func (iter *AnonList18_ListIterator) Done() bool { +func (iter *AnonList20_ListIterator) Done() bool { return iter.at >= iter.list.Length() } @@ -3627,7 +4245,7 @@ func (iter *AnonList18_ListIterator) Done() bool { type Peer struct { ID pd2.Bytes - Multiaddresses AnonList18 + Multiaddresses AnonList20 } func (x Peer) Node() pd1.Node { diff --git a/routing/http/gen/routing.go b/routing/http/gen/routing.go index 8ea585e2f..41f24bda5 100644 --- a/routing/http/gen/routing.go +++ b/routing/http/gen/routing.go @@ -38,6 +38,13 @@ var proto = defs.Defs{ Return: defs.Ref{Name: "PutIPNSResponse"}, }, }, + defs.Method{ + Name: "Provide", + Type: defs.Fn{ + Arg: defs.Ref{Name: "ProvideRequest"}, + Return: defs.Ref{Name: "ProvideResponse"}, + }, + }, }, }, }, @@ -110,6 +117,30 @@ var proto = defs.Defs{ Type: defs.Structure{}, }, + // ProvideRequest type + defs.Named{ + Name: "ProvideRequest", + Type: defs.Structure{ + Fields: defs.Fields{ + defs.Field{Name: "Key", GoName: "Key", Type: defs.Ref{Name: "LinkToAny"}}, + defs.Field{Name: "Provider", GoName: "Provider", Type: defs.Ref{Name: "Provider"}}, + defs.Field{Name: "Timestamp", GoName: "Timestamp", Type: defs.Int{}}, + defs.Field{Name: "AdvisoryTTL", GoName: "AdvisoryTTL", Type: defs.Int{}}, + defs.Field{Name: "Signature", GoName: "Signature", Type: defs.Bytes{}}, + }, + }, + }, + + // ProvideResponse type + defs.Named{ + Name: "ProvideResponse", + Type: defs.Structure{ + Fields: defs.Fields{ + defs.Field{Name: "AdvisoryTTL", GoName: "AdvisoryTTL", Type: defs.Int{}}, + }, + }, + }, + // general routing types defs.Named{ Name: "LinkToAny", diff --git a/routing/http/server/findproviders.go b/routing/http/server/findproviders.go index 1418c2452..8437fb9e4 100644 --- a/routing/http/server/findproviders.go +++ b/routing/http/server/findproviders.go @@ -18,6 +18,7 @@ type DelegatedRoutingService interface { FindProviders(ctx context.Context, key cid.Cid) (<-chan client.FindProvidersAsyncResult, error) GetIPNS(ctx context.Context, id []byte) (<-chan client.GetIPNSAsyncResult, error) PutIPNS(ctx context.Context, id []byte, record []byte) (<-chan client.PutIPNSAsyncResult, error) + Provide(ctx context.Context, req *client.ProvideRequest) (<-chan client.ProvideAsyncResult, error) } func DelegatedRoutingAsyncHandler(svc DelegatedRoutingService) http.HandlerFunc { @@ -145,6 +146,48 @@ func (drs *delegatedRoutingServer) FindProviders(ctx context.Context, req *proto return rch, nil } +func (drs *delegatedRoutingServer) Provide(ctx context.Context, req *proto.ProvideRequest) (<-chan *proto.DelegatedRouting_Provide_AsyncResult, error) { + rch := make(chan *proto.DelegatedRouting_Provide_AsyncResult) + go func() { + defer close(rch) + pr, err := client.ParseProvideRequest(req) + if err != nil { + logger.Errorf("Provide function rejected request (%w)", err) + return + } + ch, err := drs.service.Provide(ctx, pr) + if err != nil { + logger.Errorf("Provide function rejected request (%w)", err) + return + } + + for { + select { + case <-ctx.Done(): + return + case resp, ok := <-ch: + if !ok { + return + } + var protoResp *proto.DelegatedRouting_Provide_AsyncResult + if resp.Err != nil { + logger.Infof("find providers function returned error (%w)", resp.Err) + protoResp = &proto.DelegatedRouting_Provide_AsyncResult{Err: resp.Err} + } else { + protoResp = &proto.DelegatedRouting_Provide_AsyncResult{Resp: &proto.ProvideResponse{AdvisoryTTL: values.Int(resp.AdvisoryTTL)}} + } + + select { + case <-ctx.Done(): + return + case rch <- protoResp: + } + } + } + }() + return rch, nil +} + func parseCidsFromFindProvidersRequest(req *proto.FindProvidersRequest) []cid.Cid { return []cid.Cid{cid.Cid(req.Key)} } diff --git a/routing/http/test/clientserver_test.go b/routing/http/test/clientserver_test.go index 51d762793..ae0e099cd 100644 --- a/routing/http/test/clientserver_test.go +++ b/routing/http/test/clientserver_test.go @@ -22,7 +22,7 @@ import ( "github.com/multiformats/go-multihash" ) -func createClientAndServer(t *testing.T, service server.DelegatedRoutingService) (*client.Client, *httptest.Server) { +func createClientAndServer(t *testing.T, service server.DelegatedRoutingService, p *client.Provider, identity crypto.PrivKey) (*client.Client, *httptest.Server) { // start a server s := httptest.NewServer(server.DelegatedRoutingAsyncHandler(service)) @@ -31,7 +31,10 @@ func createClientAndServer(t *testing.T, service server.DelegatedRoutingService) if err != nil { t.Fatal(err) } - c := client.NewClient(q) + c, err := client.NewClient(q, p, identity) + if err != nil { + t.Fatal(err) + } return c, s } @@ -39,7 +42,7 @@ func createClientAndServer(t *testing.T, service server.DelegatedRoutingService) func testClientServer(t *testing.T, numIter int) (avgLatency time.Duration, deltaGo int, deltaMem uint64) { t.Helper() - c, s := createClientAndServer(t, testDelegatedRoutingService{}) + c, s := createClientAndServer(t, testDelegatedRoutingService{}, nil, nil) defer s.Close() // verify result @@ -184,7 +187,7 @@ func (s testStatistic) DeviatesBy(numStddev float64) bool { func TestCancelContext(t *testing.T) { drService := &hangingDelegatedRoutingService{} - c, s := createClientAndServer(t, drService) + c, s := createClientAndServer(t, drService, nil, nil) defer s.Close() ctx, cancel := context.WithCancel(context.Background()) @@ -354,6 +357,15 @@ func (testDelegatedRoutingService) FindProviders(ctx context.Context, key cid.Ci return ch, nil } +func (testDelegatedRoutingService) Provide(ctx context.Context, pr *client.ProvideRequest) (<-chan client.ProvideAsyncResult, error) { + ch := make(chan client.ProvideAsyncResult) + go func() { + ch <- client.ProvideAsyncResult{AdvisoryTTL: time.Hour} + close(ch) + }() + return ch, nil +} + // hangingDelegatedRoutingService hangs on every request until the context is canceled, returning nothing. type hangingDelegatedRoutingService struct { } @@ -384,3 +396,12 @@ func (s *hangingDelegatedRoutingService) FindProviders(ctx context.Context, key }() return ch, nil } + +func (s *hangingDelegatedRoutingService) Provide(ctx context.Context, pr *client.ProvideRequest) (<-chan client.ProvideAsyncResult, error) { + ch := make(chan client.ProvideAsyncResult) + go func() { + <-ctx.Done() + close(ch) + }() + return ch, nil +} diff --git a/routing/http/test/fallbacks_test.go b/routing/http/test/fallbacks_test.go index 43f625e56..a6c267ca9 100644 --- a/routing/http/test/fallbacks_test.go +++ b/routing/http/test/fallbacks_test.go @@ -24,7 +24,10 @@ func TestClientWithServerReturningUnknownValues(t *testing.T) { if err != nil { t.Fatal(err) } - c := client.NewClient(q) + c, err := client.NewClient(q, nil, nil) + if err != nil { + t.Fatal(err) + } // verify no result arrive h, err := multihash.Sum([]byte("TEST"), multihash.SHA3, 4) @@ -76,3 +79,7 @@ func (testServiceWithUnknown) GetIPNS(ctx context.Context, req *proto.GetIPNSReq func (testServiceWithUnknown) PutIPNS(ctx context.Context, req *proto.PutIPNSRequest) (<-chan *proto.DelegatedRouting_PutIPNS_AsyncResult, error) { return nil, fmt.Errorf("PutIPNS not supported by test service") } + +func (testServiceWithUnknown) Provide(ctx context.Context, req *proto.ProvideRequest) (<-chan *proto.DelegatedRouting_Provide_AsyncResult, error) { + return nil, fmt.Errorf("Provide not supported by test service") +} diff --git a/routing/http/test/provide_test.go b/routing/http/test/provide_test.go new file mode 100644 index 000000000..8bcf5f198 --- /dev/null +++ b/routing/http/test/provide_test.go @@ -0,0 +1,54 @@ +package test + +import ( + "context" + "crypto/rand" + "testing" + "time" + + "github.com/ipfs/go-cid" + "github.com/ipfs/go-delegated-routing/client" + "github.com/libp2p/go-libp2p-core/crypto" + "github.com/libp2p/go-libp2p-core/peer" + multiaddr "github.com/multiformats/go-multiaddr" + "github.com/multiformats/go-multihash" +) + +func TestProvideRoundtrip(t *testing.T) { + priv, _, err := crypto.GenerateEd25519Key(rand.Reader) + if err != nil { + t.Fatal(err) + } + pID, err := peer.IDFromPrivateKey(priv) + if err != nil { + t.Fatal(err) + } + + c1, s1 := createClientAndServer(t, testDelegatedRoutingService{}, nil, nil) + defer s1.Close() + + testMH, _ := multihash.Encode([]byte("test"), multihash.IDENTITY) + testCid := cid.NewCidV1(cid.Raw, testMH) + + if _, err = c1.Provide(context.Background(), testCid, time.Hour); err == nil { + t.Fatal("should get sync error on unsigned provide request.") + } + + c, s := createClientAndServer(t, testDelegatedRoutingService{}, &client.Provider{ + Peer: peer.AddrInfo{ + ID: pID, + Addrs: []multiaddr.Multiaddr{}, + }, + ProviderProto: []client.TransferProtocol{}, + }, priv) + defer s.Close() + + rc, err := c.Provide(context.Background(), testCid, 2*time.Hour) + if err != nil { + t.Fatal(err) + } + + if rc != time.Hour { + t.Fatal("should have gotten back the the fixed server ttl") + } +} diff --git a/routing/http/test/servererror_test.go b/routing/http/test/servererror_test.go index c8dc015e0..5062f1012 100644 --- a/routing/http/test/servererror_test.go +++ b/routing/http/test/servererror_test.go @@ -23,7 +23,10 @@ func TestClientWithServerReturningErrors(t *testing.T) { if err != nil { t.Fatal(err) } - c := client.NewClient(q) + c, err := client.NewClient(q, nil, nil) + if err != nil { + t.Fatal(err) + } // verify no result arrive h, err := multihash.Sum([]byte("TEST"), multihash.SHA3, 4) @@ -76,3 +79,7 @@ func (testServiceWithErrors) GetIPNS(ctx context.Context, req *proto.GetIPNSRequ func (testServiceWithErrors) PutIPNS(ctx context.Context, req *proto.PutIPNSRequest) (<-chan *proto.DelegatedRouting_PutIPNS_AsyncResult, error) { return nil, fmt.Errorf(testSyncError) } + +func (testServiceWithErrors) Provide(ctx context.Context, req *proto.ProvideRequest) (<-chan *proto.DelegatedRouting_Provide_AsyncResult, error) { + return nil, fmt.Errorf(testSyncError) +}