diff --git a/cidset/cidset.go b/cidset/cidset.go
new file mode 100644
index 00000000..e6efb361
--- /dev/null
+++ b/cidset/cidset.go
@@ -0,0 +1,48 @@
+package cidset
+
+import (
+	"errors"
+
+	"github.com/ipfs/go-cid"
+	"github.com/ipfs/go-graphsync/ipldutil"
+	"github.com/ipld/go-ipld-prime/fluent"
+	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
+	basicnode "github.com/ipld/go-ipld-prime/node/basic"
+)
+
+// EncodeCidSet encodes a cid set into bytes for the do-not-send-cids extension
+func EncodeCidSet(cids *cid.Set) ([]byte, error) {
+	list := fluent.MustBuildList(basicnode.Style.List, cids.Len(), func(la fluent.ListAssembler) {
+		_ = cids.ForEach(func(c cid.Cid) error {
+			la.AssembleValue().AssignLink(cidlink.Link{Cid: c})
+			return nil
+		})
+	})
+	return ipldutil.EncodeNode(list)
+}
+
+// DecodeCidSet decodes a cid set from data for the do-not-send-cids extension
+func DecodeCidSet(data []byte) (*cid.Set, error) {
+	list, err := ipldutil.DecodeNode(data)
+	if err != nil {
+		return nil, err
+	}
+	set := cid.NewSet()
+	iter := list.ListIterator()
+	for !iter.Done() {
+		_, next, err := iter.Next()
+		if err != nil {
+			return nil, err
+		}
+		link, err := next.AsLink()
+		if err != nil {
+			return nil, err
+		}
+		asCidLink, ok := link.(cidlink.Link)
+		if !ok {
+			return nil, errors.New("contained non CID link")
+		}
+		set.Add(asCidLink.Cid)
+	}
+	return set, nil
+}
diff --git a/cidset/cidset_test.go b/cidset/cidset_test.go
new file mode 100644
index 00000000..17f6e5dd
--- /dev/null
+++ b/cidset/cidset_test.go
@@ -0,0 +1,27 @@
+package cidset
+
+import (
+	"testing"
+
+	"github.com/ipfs/go-cid"
+	"github.com/ipfs/go-graphsync/testutil"
+	"github.com/stretchr/testify/require"
+)
+
+func TestDecodeEncodeCidSet(t *testing.T) {
+	cids := testutil.GenerateCids(10)
+	set := cid.NewSet()
+	for _, c := range cids {
+		set.Add(c)
+	}
+	encoded, err := EncodeCidSet(set)
+	require.NoError(t, err, "encode errored")
+	decodedCidSet, err := DecodeCidSet(encoded)
+	require.NoError(t, err, "decode errored")
+	require.Equal(t, decodedCidSet.Len(), set.Len())
+	err = decodedCidSet.ForEach(func(c cid.Cid) error {
+		require.True(t, set.Has(c))
+		return nil
+	})
+	require.NoError(t, err)
+}
diff --git a/impl/graphsync_test.go b/impl/graphsync_test.go
index 74f68594..539c3dc7 100644
--- a/impl/graphsync_test.go
+++ b/impl/graphsync_test.go
@@ -20,6 +20,7 @@ import (
 
 	blocks "github.com/ipfs/go-block-format"
 	"github.com/ipfs/go-blockservice"
+	"github.com/ipfs/go-cid"
 	"github.com/ipfs/go-datastore"
 	dss "github.com/ipfs/go-datastore/sync"
 	bstore "github.com/ipfs/go-ipfs-blockstore"
@@ -34,6 +35,7 @@ import (
 
 	"github.com/ipfs/go-graphsync"
+	"github.com/ipfs/go-graphsync/cidset"
 	"github.com/ipfs/go-graphsync/ipldutil"
 	gsmsg "github.com/ipfs/go-graphsync/message"
 	gsnet "github.com/ipfs/go-graphsync/network"
@@ -231,6 +233,55 @@ func TestGraphsyncRoundTrip(t *testing.T) {
 	require.Equal(t, graphsync.RequestCompletedFull, finalResponseStatus)
 }
 
+func TestGraphsyncRoundTripIgnoreCids(t *testing.T) {
+	// create network
+	ctx := context.Background()
+	ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
+	defer cancel()
+	td := newGsTestData(ctx, t)
+
+	// initialize graphsync on first node to make requests
+	requestor := td.GraphSyncHost1()
+
+	// setup receiving peer to just record message coming in
+	blockChainLength := 100
+	blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength)
+
+	firstHalf := blockChain.Blocks(0, 50)
+	set := cid.NewSet()
+	for _, blk := range firstHalf {
+		td.blockStore1[cidlink.Link{Cid: blk.Cid()}] = blk.RawData()
+		set.Add(blk.Cid())
+	}
+	encodedCidSet, err := cidset.EncodeCidSet(set)
+	require.NoError(t, err)
+	extension := graphsync.ExtensionData{
+		Name: graphsync.ExtensionDoNotSendCIDs,
+		Data: encodedCidSet,
+	}
+
+	// initialize graphsync on second node to respond to requests
+	responder := td.GraphSyncHost2()
+
+	totalSent := 0
+	totalSentOnWire := 0
+	responder.RegisterOutgoingBlockHook(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
+		totalSent++
+		if blockData.BlockSizeOnWire() > 0 {
+			totalSentOnWire++
+		}
+	})
+
+	progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), extension)
+
+	blockChain.VerifyWholeChain(ctx, progressChan)
+	testutil.VerifyEmptyErrors(ctx, t, errChan)
+	require.Len(t, td.blockStore1, blockChainLength, "did not store all blocks")
+
+	require.Equal(t, blockChainLength, totalSent)
+	require.Equal(t, blockChainLength-set.Len(), totalSentOnWire)
+}
+
 func TestPauseResume(t *testing.T) {
 	// create network
 	ctx := context.Background()
diff --git a/responsemanager/peerresponsemanager/peerresponsesender.go b/responsemanager/peerresponsemanager/peerresponsesender.go
index 313e77d5..39a438ee 100644
--- a/responsemanager/peerresponsemanager/peerresponsesender.go
+++ b/responsemanager/peerresponsemanager/peerresponsesender.go
@@ -52,6 +52,7 @@ type peerResponseSender struct {
 // a given peer across multiple requests.
 type PeerResponseSender interface {
 	peermanager.PeerProcess
+	IgnoreBlocks(requestID graphsync.RequestID, links []ipld.Link)
 	SendResponse(
 		requestID graphsync.RequestID,
 		link ipld.Link,
@@ -97,6 +98,14 @@ func (prs *peerResponseSender) Startup() {
 	go prs.run()
 }
 
+func (prs *peerResponseSender) IgnoreBlocks(requestID graphsync.RequestID, links []ipld.Link) {
+	prs.linkTrackerLk.Lock()
+	for _, link := range links {
+		prs.linkTracker.RecordLinkTraversal(requestID, link, true)
+	}
+	prs.linkTrackerLk.Unlock()
+}
+
 type responseOperation interface {
 	build(responseBuilder *responsebuilder.ResponseBuilder)
 	size() uint64
diff --git a/responsemanager/peerresponsemanager/peerresponsesender_test.go b/responsemanager/peerresponsemanager/peerresponsesender_test.go
index c28762d5..179693d1 100644
--- a/responsemanager/peerresponsemanager/peerresponsesender_test.go
+++ b/responsemanager/peerresponsemanager/peerresponsesender_test.go
@@ -32,7 +32,7 @@ func (fph *fakePeerHandler) SendResponse(p peer.ID, responses []gsmsg.GraphSyncR
 	return fph.done
 }
 
-func TestPeerResponseManagerSendsResponses(t *testing.T) {
+func TestPeerResponseSenderSendsResponses(t *testing.T) {
 	ctx := context.Background()
 	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
 	defer cancel()
@@ -134,7 +134,7 @@ func TestPeerResponseManagerSendsResponses(t *testing.T) {
 	require.Equal(t, graphsync.PartialResponse, fph.lastResponses[0].Status())
 }
 
-func TestPeerResponseManagerSendsVeryLargeBlocksResponses(t *testing.T) {
+func TestPeerResponseSenderSendsVeryLargeBlocksResponses(t *testing.T) {
 	p := testutil.GeneratePeers(1)[0]
 	requestID1 := graphsync.RequestID(rand.Int31())
@@ -222,7 +222,7 @@ func TestPeerResponseManagerSendsVeryLargeBlocksResponses(t *testing.T) {
 
 }
 
-func TestPeerResponseManagerSendsExtensionData(t *testing.T) {
+func TestPeerResponseSenderSendsExtensionData(t *testing.T) {
 	ctx := context.Background()
 	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
 	defer cancel()
@@ -285,7 +285,7 @@ func TestPeerResponseManagerSendsExtensionData(t *testing.T) {
 	require.Equal(t, extensionData2, returnedData2, "did not encode first extension")
 }
 
-func TestPeerResponseManagerSendsResponsesInTransaction(t *testing.T) {
+func TestPeerResponseSenderSendsResponsesInTransaction(t *testing.T) {
 	ctx := context.Background()
 	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
 	defer cancel()
@@ -332,7 +332,87 @@ func TestPeerResponseManagerSendsResponsesInTransaction(t *testing.T) {
 	})
 	require.NoError(t, err)
 	testutil.AssertDoesReceive(ctx, t, sent, "should sent first message")
+}
+
+func TestPeerResponseSenderIgnoreBlocks(t *testing.T) {
+	ctx := context.Background()
+	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+	defer cancel()
+	p := testutil.GeneratePeers(1)[0]
+	requestID1 := graphsync.RequestID(rand.Int31())
+	requestID2 := graphsync.RequestID(rand.Int31())
+	blks := testutil.GenerateBlocksOfSize(5, 100)
+	links := make([]ipld.Link, 0, len(blks))
+	for _, block := range blks {
+		links = append(links, cidlink.Link{Cid: block.Cid()})
+	}
+	done := make(chan struct{}, 1)
+	sent := make(chan struct{}, 1)
+	fph := &fakePeerHandler{
+		done: done,
+		sent: sent,
+	}
+	peerResponseSender := NewResponseSender(ctx, p, fph)
+	peerResponseSender.Startup()
+
+	peerResponseSender.IgnoreBlocks(requestID1, links)
+
+	bd := peerResponseSender.SendResponse(requestID1, links[0], blks[0].RawData())
+	require.Equal(t, links[0], bd.Link())
+	require.Equal(t, uint64(len(blks[0].RawData())), bd.BlockSize())
+	require.Equal(t, uint64(0), bd.BlockSizeOnWire())
+	testutil.AssertDoesReceive(ctx, t, sent, "did not send first message")
+
+	require.Len(t, fph.lastBlocks, 0)
+	require.Len(t, fph.lastResponses, 1)
+	require.Equal(t, requestID1, fph.lastResponses[0].RequestID())
+	require.Equal(t, graphsync.PartialResponse, fph.lastResponses[0].Status())
+
+	bd = peerResponseSender.SendResponse(requestID2, links[0], blks[0].RawData())
+	require.Equal(t, links[0], bd.Link())
+	require.Equal(t, uint64(len(blks[0].RawData())), bd.BlockSize())
+	require.Equal(t, uint64(0), bd.BlockSizeOnWire())
+	bd = peerResponseSender.SendResponse(requestID1, links[1], blks[1].RawData())
+	require.Equal(t, links[1], bd.Link())
+	require.Equal(t, uint64(len(blks[1].RawData())), bd.BlockSize())
+	require.Equal(t, uint64(0), bd.BlockSizeOnWire())
+	bd = peerResponseSender.SendResponse(requestID1, links[2], blks[2].RawData())
+	require.Equal(t, links[2], bd.Link())
+	require.Equal(t, uint64(len(blks[2].RawData())), bd.BlockSize())
+	require.Equal(t, uint64(0), bd.BlockSizeOnWire())
+	peerResponseSender.FinishRequest(requestID1)
+
+	// let peer response manager know last message was sent so message sending can continue
+	done <- struct{}{}
+
+	testutil.AssertDoesReceive(ctx, t, sent, "did not send second message")
+
+	require.Len(t, fph.lastBlocks, 0)
+
+	require.Len(t, fph.lastResponses, 2, "did not send correct number of responses")
+	response1, err := findResponseForRequestID(fph.lastResponses, requestID1)
+	require.NoError(t, err)
+	require.Equal(t, graphsync.RequestCompletedFull, response1.Status(), "did not send correct response code in second message")
+	response2, err := findResponseForRequestID(fph.lastResponses, requestID2)
+	require.NoError(t, err)
+	require.Equal(t, graphsync.PartialResponse, response2.Status(), "did not send correct response code in second message")
+
+	peerResponseSender.SendResponse(requestID2, links[3], blks[3].RawData())
+	peerResponseSender.FinishRequest(requestID2)
+
+	// let peer response manager know last message was sent so message sending can continue
+	done <- struct{}{}
+
+	testutil.AssertDoesReceive(ctx, t, sent, "did not send third message")
+
+	require.Equal(t, 1, len(fph.lastBlocks))
+	testutil.AssertContainsBlock(t, fph.lastBlocks, blks[3])
+
+	require.Len(t, fph.lastResponses, 1, "did not send correct number of responses")
+	response2, err = findResponseForRequestID(fph.lastResponses, requestID2)
+	require.NoError(t, err)
+	require.Equal(t, graphsync.RequestCompletedFull, response2.Status(), "did not send correct response code in third message")
 }
 
 func findResponseForRequestID(responses []gsmsg.GraphSyncResponse, requestID graphsync.RequestID) (gsmsg.GraphSyncResponse, error) {
diff --git a/responsemanager/responsemanager.go b/responsemanager/responsemanager.go
index f9aea845..a7fed2a4 100644
--- a/responsemanager/responsemanager.go
+++ b/responsemanager/responsemanager.go
@@ -6,6 +6,8 @@ import (
 	"math"
 	"time"
 
+	"github.com/ipfs/go-cid"
+	"github.com/ipfs/go-graphsync/cidset"
 	"github.com/ipfs/go-graphsync/responsemanager/hooks"
 
 	"github.com/ipfs/go-graphsync"
@@ -296,6 +298,9 @@ func (rm *ResponseManager) prepareQuery(ctx context.Context,
 	if validationErr != nil {
 		return nil, nil, validationErr
 	}
+	if err := rm.processDoNoSendCids(request, peerResponseSender); err != nil {
+		return nil, nil, err
+	}
 	rootLink := cidlink.Link{Cid: request.Root()}
 	traverser := ipldutil.TraversalBuilder{
 		Root:     rootLink,
@@ -309,6 +314,28 @@ func (rm *ResponseManager) prepareQuery(ctx context.Context,
 	return loader, traverser, nil
 }
 
+func (rm *ResponseManager) processDoNoSendCids(request gsmsg.GraphSyncRequest, peerResponseSender peerresponsemanager.PeerResponseSender) error {
+	doNotSendCidsData, has := request.Extension(graphsync.ExtensionDoNotSendCIDs)
+	if !has {
+		return nil
+	}
+	cidSet, err := cidset.DecodeCidSet(doNotSendCidsData)
+	if err != nil {
+		peerResponseSender.FinishWithError(request.ID(), graphsync.RequestFailedUnknown)
+		return err
+	}
+	links := make([]ipld.Link, 0, cidSet.Len())
+	err = cidSet.ForEach(func(c cid.Cid) error {
+		links = append(links, cidlink.Link{Cid: c})
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+	peerResponseSender.IgnoreBlocks(request.ID(), links)
+	return nil
+}
+
 func (rm *ResponseManager) executeQuery(
 	p peer.ID,
 	request gsmsg.GraphSyncRequest,
diff --git a/responsemanager/responsemanager_test.go b/responsemanager/responsemanager_test.go
index 906f2adc..eb366238 100644
--- a/responsemanager/responsemanager_test.go
+++ b/responsemanager/responsemanager_test.go
@@ -8,7 +8,9 @@ import (
 	"testing"
 	"time"
 
+	"github.com/ipfs/go-cid"
 	"github.com/ipfs/go-graphsync"
+	"github.com/ipfs/go-graphsync/cidset"
 	gsmsg "github.com/ipfs/go-graphsync/message"
 	"github.com/ipfs/go-graphsync/responsemanager/hooks"
 	"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
@@ -105,6 +107,7 @@ type fakePeerResponseSender struct {
 	sentExtensions       chan sentExtension
 	lastCompletedRequest chan completedRequest
 	pausedRequests       chan pausedRequest
+	ignoredLinks         chan []ipld.Link
 }
 
 func (fprs *fakePeerResponseSender) Startup() {}
@@ -115,6 +118,10 @@ type fakeBlkData struct {
 	size uint64
 }
 
+func (fprs *fakePeerResponseSender) IgnoreBlocks(requestID graphsync.RequestID, links []ipld.Link) {
+	fprs.ignoredLinks <- links
+}
+
 func (fbd fakeBlkData) Link() ipld.Link {
 	return fbd.link
 }
@@ -453,6 +460,39 @@ func TestValidationAndExtensions(t *testing.T) {
 		require.Equal(t, 5, customChooserCallCount)
 	})
+	t.Run("do-not-send-cids extension", func(t *testing.T) {
+		td := newTestData(t)
+		defer td.cancel()
+		responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
+		responseManager.Startup()
+		td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
+			hookActions.ValidateRequest()
+		})
+		set := cid.NewSet()
+		blks := td.blockChain.Blocks(0, 5)
+		for _, blk := range blks {
+			set.Add(blk.Cid())
+		}
+		data, err := cidset.EncodeCidSet(set)
+		require.NoError(t, err)
+		requests := []gsmsg.GraphSyncRequest{
+			gsmsg.NewRequest(td.requestID, td.blockChain.TipLink.(cidlink.Link).Cid, td.blockChain.Selector(), graphsync.Priority(0),
+				graphsync.ExtensionData{
+					Name: graphsync.ExtensionDoNotSendCIDs,
+					Data: data,
+				}),
+		}
+		responseManager.ProcessRequests(td.ctx, td.p, requests)
+		var lastRequest completedRequest
+		testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
+		require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
+		var lastLinks []ipld.Link
+		testutil.AssertReceive(td.ctx, t, td.ignoredLinks, &lastLinks, "should send ignored links")
+		require.Len(t, lastLinks, set.Len())
+		for _, link := range lastLinks {
+			require.True(t, set.Has(link.(cidlink.Link).Cid))
+		}
+	})
 	t.Run("test block hook processing", func(t *testing.T) {
 		t.Run("can send extension data", func(t *testing.T) {
 			td := newTestData(t)
@@ -767,6 +807,7 @@ type testData struct {
 	sentResponses         chan sentResponse
 	sentExtensions        chan sentExtension
 	pausedRequests        chan pausedRequest
+	ignoredLinks          chan []ipld.Link
 	peerManager           *fakePeerManager
 	queryQueue            *fakeQueryQueue
 	extensionData         []byte
@@ -801,7 +842,14 @@ func newTestData(t *testing.T) testData {
 	td.sentResponses = make(chan sentResponse, td.blockChainLength*2)
 	td.sentExtensions = make(chan sentExtension, td.blockChainLength*2)
 	td.pausedRequests = make(chan pausedRequest, 1)
-	fprs := &fakePeerResponseSender{lastCompletedRequest: td.completedRequestChan, sentResponses: td.sentResponses, sentExtensions: td.sentExtensions, pausedRequests: td.pausedRequests}
+	td.ignoredLinks = make(chan []ipld.Link, 1)
+	fprs := &fakePeerResponseSender{
+		lastCompletedRequest: td.completedRequestChan,
+		sentResponses:        td.sentResponses,
+		sentExtensions:       td.sentExtensions,
+		pausedRequests:       td.pausedRequests,
+		ignoredLinks:         td.ignoredLinks,
+	}
 	td.peerManager = &fakePeerManager{peerResponseSender: fprs}
 	td.queryQueue = &fakeQueryQueue{}