diff --git a/dagsync/httpsync/publisher.go b/dagsync/httpsync/publisher.go index 2347774..4ae91bc 100644 --- a/dagsync/httpsync/publisher.go +++ b/dagsync/httpsync/publisher.go @@ -131,6 +131,27 @@ func NewPublisherWithoutServer(address string, handlerPath string, lsys ipld.Lin }, nil } +// NewPublisherHandler returns a Publisher for use as an http.Handler. It does +// not listen on any address and has no knowledge of a URL path prefix. +func NewPublisherHandler(lsys ipld.LinkSystem, privKey ic.PrivKey) (*Publisher, error) { + if privKey == nil { + return nil, errors.New("private key required to sign head requests") + } + peerID, err := peer.IDFromPrivateKey(privKey) + if err != nil { + return nil, fmt.Errorf("could not get peer id from private key: %w", err) + } + + return &Publisher{ + addr: nil, + closer: io.NopCloser(nil), + lsys: lsys, + handlerPath: "", + peerID: peerID, + privKey: privKey, + }, nil +} + // Addrs returns the addresses, as []multiaddress, that the Publisher is // listening on. func (p *Publisher) Addrs() []multiaddr.Multiaddr { diff --git a/dagsync/httpsync/publisher_test.go b/dagsync/httpsync/publisher_test.go index e6fb11f..78336e3 100644 --- a/dagsync/httpsync/publisher_test.go +++ b/dagsync/httpsync/publisher_test.go @@ -15,16 +15,23 @@ import ( "github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/codec/dagjson" "github.com/ipld/go-ipld-prime/datamodel" + "github.com/ipld/go-ipld-prime/fluent" "github.com/ipld/go-ipld-prime/linking" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/ipld/go-ipld-prime/node/basicnode" "github.com/ipld/go-ipld-prime/storage/memstore" "github.com/ipld/go-ipld-prime/traversal" + selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" "github.com/ipni/go-libipni/announce" "github.com/ipni/go-libipni/announce/message" "github.com/ipni/go-libipni/dagsync/httpsync" + "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/peer" +
"github.com/libp2p/go-libp2p/core/protocol" + libp2phttp "github.com/libp2p/go-libp2p/p2p/http" "github.com/multiformats/go-multiaddr" + "github.com/multiformats/go-multicodec" "github.com/stretchr/testify/require" ) @@ -96,6 +103,149 @@ func TestNewPublisherForListener(t *testing.T) { } } +func TestPublisherWithLibp2pHTTP(t *testing.T) { + ctx := context.Background() + req := require.New(t) + + publisherStore := &correctedMemStore{&memstore.Store{ + Bag: make(map[string][]byte), + }} + publisherLsys := cidlink.DefaultLinkSystem() + publisherLsys.TrustedStorage = true + publisherLsys.SetReadStorage(publisherStore) + publisherLsys.SetWriteStorage(publisherStore) + + privKey, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 256, rand.Reader) + req.NoError(err) + + publisher, err := httpsync.NewPublisherHandler(publisherLsys, privKey) + req.NoError(err) + + // Use same identity as publisher. This is necessary so that the ID that + // the publisher uses to sign head/ query responses is the same as the ID + // used to identify the publisherStreamHost. Otherwise, it would be + // necessary for the sync client to know both IDs: one for the stream host + // to connect to, and one for the publisher to validate the signature with. + publisherStreamHost, err := libp2p.New(libp2p.Identity(privKey), libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0")) + req.NoError(err) + + // This is the "HTTP Host". It's like the libp2p "stream host" (aka core + // host.Host), but it uses HTTP semantics instead of stream semantics. + // + // You can pass in options on creation like a stream host to do HTTP over + // libp2p streams, and multiaddrs to create listeners on.
+ publisherHost, err := libp2phttp.New( + libp2phttp.StreamHost(publisherStreamHost), + libp2phttp.ListenAddrs([]multiaddr.Multiaddr{multiaddr.StringCast("/ip4/127.0.0.1/tcp/0/http")}), + ) + req.NoError(err) + + go publisherHost.Serve() + defer publisherHost.Close() + + protoID := protocol.ID("/ipni-sync/1") + + serverStreamMa := publisherHost.Addrs()[0] + serverHTTPMa := publisherHost.Addrs()[1] + req.Contains(serverHTTPMa.String(), "/http") + + t.Log("libp2p stream server address:", serverStreamMa.String()) + t.Log("libp2p http server address:", serverHTTPMa.String()) + + // Here is where we attach our request handler. Note that we are mounting + // the "/ipni-sync/1" protocol at /ipni/. libp2phttp manages this mapping + // and clients can learn about the mapping at .well-known/libp2p. + // + // In this case we also want our HTTP handler to not even know about the + // prefix, so we use the stdlib http.StripPrefix. + publisherHost.SetHttpHandlerAtPath(protoID, "/ipni/", http.StripPrefix("/ipni/", publisher)) + + link, err := publisherLsys.Store( + ipld.LinkContext{Ctx: ctx}, + cidlink.LinkPrototype{ + Prefix: cid.Prefix{ + Version: 1, + Codec: uint64(multicodec.DagJson), + MhType: uint64(multicodec.Sha2_256), + MhLength: -1, + }, + }, + fluent.MustBuildMap(basicnode.Prototype.Map, 4, func(na fluent.MapAssembler) { + na.AssembleEntry("fish").AssignString("lobster") + na.AssembleEntry("fish1").AssignString("lobster1") + na.AssembleEntry("fish2").AssignString("lobster2") + na.AssembleEntry("fish0").AssignString("lobster0") + })) + req.NoError(err) + publisher.SetRoot(link.(cidlink.Link).Cid) + + testCases := []struct { + name string + publisher peer.AddrInfo + newClientHost func(t *testing.T) *libp2phttp.HTTPHost + }{ + { + "HTTP transport", + peer.AddrInfo{Addrs: []multiaddr.Multiaddr{serverHTTPMa}}, + func(t *testing.T) *libp2phttp.HTTPHost { + clientHost, err := libp2phttp.New() + require.NoError(t, err) + return clientHost + }, + }, + { + "libp2p stream transport",
+ peer.AddrInfo{ID: publisherStreamHost.ID(), Addrs: []multiaddr.Multiaddr{serverStreamMa}}, + func(t *testing.T) *libp2phttp.HTTPHost { + clientStreamHost, err := libp2p.New(libp2p.NoListenAddrs) + require.NoError(t, err) + clientHost, err := libp2phttp.New(libp2phttp.StreamHost(clientStreamHost)) + require.NoError(t, err) + return clientHost + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Plumbing to set up the test. + clientStore := &correctedMemStore{&memstore.Store{ + Bag: make(map[string][]byte), + }} + clientLsys := cidlink.DefaultLinkSystem() + clientLsys.TrustedStorage = true + clientLsys.SetReadStorage(clientStore) + clientLsys.SetWriteStorage(clientStore) + clientSync := httpsync.NewLibp2pSync(clientLsys, tc.newClientHost(t), protoID, nil) + + clientSyncer, err := clientSync.NewSyncer(tc.publisher.ID, tc.publisher.Addrs) + require.NoError(t, err) + wk := clientSyncer.PeerProtoMap() + if wk != nil { + require.Contains(t, wk, protoID) + } + + headCid, err := clientSyncer.GetHead(ctx) + require.NoError(t, err) + + require.Equal(t, link.(cidlink.Link).Cid, headCid) + + err = clientSyncer.Sync(ctx, headCid, selectorparse.CommonSelector_MatchPoint) + require.NoError(t, err) + + // Assert that data is loadable from the link system. + wantLink := cidlink.Link{Cid: headCid} + node, err := clientLsys.Load(ipld.LinkContext{Ctx: ctx}, wantLink, basicnode.Prototype.Any) + require.NoError(t, err) + + // Assert synced node link matches the computed link, i.e. is spec-compliant.
+ gotLink, err := clientLsys.ComputeLink(wantLink.Prototype(), node) + require.NoError(t, err) + require.Equal(t, gotLink, wantLink, "computed %s but got %s", gotLink.String(), wantLink.String()) + }) + } +} + func mapKeys(t *testing.T, n ipld.Node) []string { var keys []string require.Equal(t, n.Kind(), datamodel.Kind_Map) diff --git a/dagsync/httpsync/sync.go b/dagsync/httpsync/sync.go index afae832..31a07fe 100644 --- a/dagsync/httpsync/sync.go +++ b/dagsync/httpsync/sync.go @@ -21,6 +21,8 @@ import ( "github.com/ipni/go-libipni/maurl" ic "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" + libp2phttp "github.com/libp2p/go-libp2p/p2p/http" "github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multihash" ) @@ -34,6 +36,10 @@ type Sync struct { blockHook func(peer.ID, cid.Cid) client *http.Client lsys ipld.LinkSystem + + // libp2phttp + clientHost *libp2phttp.HTTPHost + protoID protocol.ID } // NewSync creates a new Sync. @@ -50,19 +56,88 @@ func NewSync(lsys ipld.LinkSystem, client *http.Client, blockHook func(peer.ID, } } +var errHeadFromUnexpectedPeer = errors.New("found head signed from an unexpected peer") + +// Syncer provides sync functionality for a single sync with a peer. +type Syncer struct { + client *http.Client + peerID peer.ID + protos libp2phttp.WellKnownProtoMap + rootURL url.URL + urls []*url.URL + sync *Sync +} + +// NewLibp2pSync creates a new Sync whose Syncers obtain their HTTP client from +// clientHost, namespaced to protoID. The Sync's own http.Client is left unset. +func NewLibp2pSync(lsys ipld.LinkSystem, clientHost *libp2phttp.HTTPHost, protoID protocol.ID, blockHook func(peer.ID, cid.Cid)) *Sync { + return &Sync{ + blockHook: blockHook, + lsys: lsys, + + clientHost: clientHost, + protoID: protoID, + } +} + // NewSyncer creates a new Syncer to use for a single sync operation against a peer.
-func (s *Sync) NewSyncer(peerID peer.ID, peerAddrs []multiaddr.Multiaddr) (*Syncer, error) { - urls := make([]*url.URL, len(peerAddrs)) - for i := range peerAddrs { +// +// TODO: Replace arguments with peer.AddrInfo +func (s *Sync) NewSyncer(peerID peer.ID, addrs []multiaddr.Multiaddr) (*Syncer, error) { + peerInfo := peer.AddrInfo{ + ID: peerID, + Addrs: addrs, + } + if s.clientHost != nil { + return s.newLibp2pSyncer(peerInfo) + } + return s.newSyncer(peerInfo) +} + +// newLibp2pSyncer creates a Syncer that uses an HTTP client obtained from the +// libp2phttp client host, namespaced to the Sync's protocol ID. +func (s *Sync) newLibp2pSyncer(peerInfo peer.AddrInfo) (*Syncer, error) { + httpClient, err := s.clientHost.NamespacedClient(s.protoID, peerInfo) + if err != nil { + return nil, err + } + + sr := &Syncer{ + client: &httpClient, + peerID: peerInfo.ID, + rootURL: url.URL{Path: "/"}, + urls: nil, + sync: s, + } + + if peerInfo.ID != "" { + sr.protos, err = s.clientHost.GetAndStorePeerProtoMap(httpClient.Transport, peerInfo.ID) + if err != nil { + return nil, err + } + } + + return sr, nil +} + +// newSyncer creates a Syncer that uses the Sync's http.Client, converting the +// peer's multiaddrs to URLs. It returns an error if the peer has no addresses. +func (s *Sync) newSyncer(peerInfo peer.AddrInfo) (*Syncer, error) { + if len(peerInfo.Addrs) == 0 { + return nil, errors.New("no addresses for peer") + } + urls := make([]*url.URL, len(peerInfo.Addrs)) + for i, addr := range peerInfo.Addrs { var err error - urls[i], err = maurl.ToURL(peerAddrs[i]) + urls[i], err = maurl.ToURL(addr) if err != nil { return nil, err } } return &Syncer{ - peerID: peerID, + client: s.client, + peerID: peerInfo.ID, rootURL: *urls[0], urls: urls[1:], sync: s, @@ -73,14 +148,13 @@ func (s *Sync) Close() { - s.client.CloseIdleConnections() + // client is nil when the Sync was created by NewLibp2pSync. + if s.client != nil { + s.client.CloseIdleConnections() + } } -var errHeadFromUnexpectedPeer = errors.New("found head signed from an unexpected peer") - -// Syncer provides sync functionality for a single sync with a peer. -type Syncer struct { - peerID peer.ID - rootURL url.URL - urls []*url.URL - sync *Sync +// PeerProtoMap returns the well-known protocol map fetched from the peer when +// the Syncer was created, or nil if none was fetched. +func (s *Syncer) PeerProtoMap() libp2phttp.WellKnownProtoMap { + return s.protos } // GetHead fetches the head of the peer's advertisement chain.
@@ -102,7 +176,9 @@ func (s *Syncer) GetHead(ctx context.Context) (cid.Cid, error) { return cid.Undef, err } - if peerIDFromSig != s.peerID { + if s.peerID == "" { + log.Warn("cannot verify publisher signature without peer ID") + } else if peerIDFromSig != s.peerID { return cid.Undef, errHeadFromUnexpectedPeer } @@ -136,7 +212,7 @@ func (s *Syncer) Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error } } - s.sync.client.CloseIdleConnections() + s.client.CloseIdleConnections() return nil } @@ -205,7 +281,7 @@ nextURL: return err } - resp, err := s.sync.client.Do(req) + resp, err := s.client.Do(req) if err != nil { if len(s.urls) != 0 { log.Errorw("Fetch request failed, will retry with next address", "err", err) diff --git a/go.mod b/go.mod index 6e3d973..13340d1 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/ipfs/go-ipld-format v0.3.0 github.com/ipfs/go-log/v2 v2.5.1 github.com/ipld/go-ipld-prime v0.20.0 - github.com/libp2p/go-libp2p v0.29.1 + github.com/libp2p/go-libp2p v0.29.1-0.20230804182920-49d7db486c5e github.com/libp2p/go-libp2p-gostream v0.6.0 github.com/libp2p/go-libp2p-pubsub v0.9.3 github.com/libp2p/go-msgio v0.3.0 @@ -113,7 +113,7 @@ require ( github.com/quic-go/qpack v0.4.0 // indirect github.com/quic-go/qtls-go1-19 v0.3.3 // indirect github.com/quic-go/qtls-go1-20 v0.2.3 // indirect - github.com/quic-go/quic-go v0.36.3 // indirect + github.com/quic-go/quic-go v0.36.4 // indirect github.com/quic-go/webtransport-go v0.5.3 // indirect github.com/raulk/go-watchdog v1.3.0 // indirect github.com/russross/blackfriday/v2 v2.0.1 // indirect diff --git a/go.sum b/go.sum index bb74f0b..8157d91 100644 --- a/go.sum +++ b/go.sum @@ -299,8 +299,8 @@ github.com/libp2p/go-cidranger v1.1.0 h1:ewPN8EZ0dd1LSnrtuwd4709PXVcITVeuwbag38y github.com/libp2p/go-cidranger v1.1.0/go.mod h1:KWZTfSr+r9qEo9OkI9/SIEeAtw+NNoU0dXIXt15Okic= github.com/libp2p/go-flow-metrics v0.1.0 h1:0iPhMI8PskQwzh57jB9WxIuIOQ0r+15PChFGkx3Q3WM=
github.com/libp2p/go-flow-metrics v0.1.0/go.mod h1:4Xi8MX8wj5aWNDAZttg6UPmc0ZrnFNsMtpsYUClFtro= -github.com/libp2p/go-libp2p v0.29.1 h1:yNeg6XgP8gbdc4YSrwiIt5T1TGOrVjH8dzl8h0GIOfQ= -github.com/libp2p/go-libp2p v0.29.1/go.mod h1:20El+LLy3/YhdUYIvGbLnvVJN32nMdqY6KXBENRAfLY= +github.com/libp2p/go-libp2p v0.29.1-0.20230804182920-49d7db486c5e h1:OGUuDNhPAEt58YoUW5fSSB1XeQB3k5OFPokuLc1KVeA= +github.com/libp2p/go-libp2p v0.29.1-0.20230804182920-49d7db486c5e/go.mod h1:iNKL7mEnZ9wAss+03IjAwM9ZAQXfVUAPUUmOACQfQ/g= github.com/libp2p/go-libp2p-asn-util v0.3.0 h1:gMDcMyYiZKkocGXDQ5nsUQyquC9+H+iLEQHwOCZ7s8s= github.com/libp2p/go-libp2p-asn-util v0.3.0/go.mod h1:B1mcOrKUE35Xq/ASTmQ4tN3LNzVVaMNmq2NACuqyB9w= github.com/libp2p/go-libp2p-gostream v0.6.0 h1:QfAiWeQRce6pqnYfmIVWJFXNdDyfiR/qkCnjyaZUPYU= @@ -426,8 +426,8 @@ github.com/quic-go/qtls-go1-19 v0.3.3 h1:wznEHvJwd+2X3PqftRha0SUKmGsnb6dfArMhy9P github.com/quic-go/qtls-go1-19 v0.3.3/go.mod h1:ySOI96ew8lnoKPtSqx2BlI5wCpUVPT05RMAlajtnyOI= github.com/quic-go/qtls-go1-20 v0.2.3 h1:m575dovXn1y2ATOb1XrRFcrv0F+EQmlowTkoraNkDPI= github.com/quic-go/qtls-go1-20 v0.2.3/go.mod h1:JKtK6mjbAVcUTN/9jZpvLbGxvdWIKS8uT7EiStoU1SM= -github.com/quic-go/quic-go v0.36.3 h1:f+yOqeGhMoRX7/M3wmEw/djhzKWr15FtQysox85/834= -github.com/quic-go/quic-go v0.36.3/go.mod h1:qxQumdeKw5GmWs1OsTZZnOxzSI+RJWuhf1O8FN35L2o= +github.com/quic-go/quic-go v0.36.4 h1:CXn/ZLN5Vntlk53fjR+kUMC8Jt7flfQe+I5Ty5A+k0o= +github.com/quic-go/quic-go v0.36.4/go.mod h1:qxQumdeKw5GmWs1OsTZZnOxzSI+RJWuhf1O8FN35L2o= github.com/quic-go/webtransport-go v0.5.3 h1:5XMlzemqB4qmOlgIus5zB45AcZ2kCgCy2EptUrfOPWU= github.com/quic-go/webtransport-go v0.5.3/go.mod h1:OhmmgJIzTTqXK5xvtuX0oBpLV2GkLWNDA+UeTGJXErU= github.com/raulk/go-watchdog v1.3.0 h1:oUmdlHxdkXRJlwfG0O9omj8ukerm8MEQavSiDTEtBsk=