Skip to content

Commit

Permalink
feat(graphsync): implement do-no-send-cids extension (#69)
Browse files Browse the repository at this point in the history
Provides full implementation of do-no-send-cids on responder side
  • Loading branch information
hannahhoward committed May 27, 2020
1 parent 430b4dc commit ea95356
Show file tree
Hide file tree
Showing 7 changed files with 295 additions and 5 deletions.
48 changes: 48 additions & 0 deletions cidset/cidset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package cidset

import (
"errors"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync/ipldutil"
"github.com/ipld/go-ipld-prime/fluent"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
)

// EncodeCidSet encodes a cid set into bytes for the do-no-send-cids extension
func EncodeCidSet(cids *cid.Set) ([]byte, error) {
list := fluent.MustBuildList(basicnode.Style.List, cids.Len(), func(la fluent.ListAssembler) {
_ = cids.ForEach(func(c cid.Cid) error {
la.AssembleValue().AssignLink(cidlink.Link{Cid: c})
return nil
})
})
return ipldutil.EncodeNode(list)
}

// DecodeCidSet decode a cid set from data for the do-no-send-cids extension
func DecodeCidSet(data []byte) (*cid.Set, error) {
list, err := ipldutil.DecodeNode(data)
if err != nil {
return nil, err
}
set := cid.NewSet()
iter := list.ListIterator()
for !iter.Done() {
_, next, err := iter.Next()
if err != nil {
return nil, err
}
link, err := next.AsLink()
if err != nil {
return nil, err
}
asCidLink, ok := link.(cidlink.Link)
if !ok {
return nil, errors.New("contained non CID link")
}
set.Add(asCidLink.Cid)
}
return set, nil
}
27 changes: 27 additions & 0 deletions cidset/cidset_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package cidset

import (
"testing"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync/testutil"
"github.com/stretchr/testify/require"
)

func TestDecodeEncodeCidSet(t *testing.T) {
cids := testutil.GenerateCids(10)
set := cid.NewSet()
for _, c := range cids {
set.Add(c)
}
encoded, err := EncodeCidSet(set)
require.NoError(t, err, "encode errored")
decodedCidSet, err := DecodeCidSet(encoded)
require.NoError(t, err, "decode errored")
require.Equal(t, decodedCidSet.Len(), set.Len())
err = decodedCidSet.ForEach(func(c cid.Cid) error {
require.True(t, set.Has(c))
return nil
})
require.NoError(t, err)
}
51 changes: 51 additions & 0 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
dss "github.com/ipfs/go-datastore/sync"
bstore "github.com/ipfs/go-ipfs-blockstore"
Expand All @@ -34,6 +35,7 @@ import (

"github.com/ipfs/go-graphsync"

"github.com/ipfs/go-graphsync/cidset"
"github.com/ipfs/go-graphsync/ipldutil"
gsmsg "github.com/ipfs/go-graphsync/message"
gsnet "github.com/ipfs/go-graphsync/network"
Expand Down Expand Up @@ -231,6 +233,55 @@ func TestGraphsyncRoundTrip(t *testing.T) {
require.Equal(t, graphsync.RequestCompletedFull, finalResponseStatus)
}

func TestGraphsyncRoundTripIgnoreCids(t *testing.T) {
// create network
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
td := newGsTestData(ctx, t)

// initialize graphsync on first node to make requests
requestor := td.GraphSyncHost1()

// setup receiving peer to just record message coming in
blockChainLength := 100
blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength)

firstHalf := blockChain.Blocks(0, 50)
set := cid.NewSet()
for _, blk := range firstHalf {
td.blockStore1[cidlink.Link{Cid: blk.Cid()}] = blk.RawData()
set.Add(blk.Cid())
}
encodedCidSet, err := cidset.EncodeCidSet(set)
require.NoError(t, err)
extension := graphsync.ExtensionData{
Name: graphsync.ExtensionDoNotSendCIDs,
Data: encodedCidSet,
}

// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2()

totalSent := 0
totalSentOnWire := 0
responder.RegisterOutgoingBlockHook(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
totalSent++
if blockData.BlockSizeOnWire() > 0 {
totalSentOnWire++
}
})

progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), extension)

blockChain.VerifyWholeChain(ctx, progressChan)
testutil.VerifyEmptyErrors(ctx, t, errChan)
require.Len(t, td.blockStore1, blockChainLength, "did not store all blocks")

require.Equal(t, blockChainLength, totalSent)
require.Equal(t, blockChainLength-set.Len(), totalSentOnWire)
}

func TestPauseResume(t *testing.T) {
// create network
ctx := context.Background()
Expand Down
9 changes: 9 additions & 0 deletions responsemanager/peerresponsemanager/peerresponsesender.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type peerResponseSender struct {
// a given peer across multiple requests.
type PeerResponseSender interface {
peermanager.PeerProcess
IgnoreBlocks(requestID graphsync.RequestID, links []ipld.Link)
SendResponse(
requestID graphsync.RequestID,
link ipld.Link,
Expand Down Expand Up @@ -97,6 +98,14 @@ func (prs *peerResponseSender) Startup() {
go prs.run()
}

func (prs *peerResponseSender) IgnoreBlocks(requestID graphsync.RequestID, links []ipld.Link) {
prs.linkTrackerLk.Lock()
for _, link := range links {
prs.linkTracker.RecordLinkTraversal(requestID, link, true)
}
prs.linkTrackerLk.Unlock()
}

type responseOperation interface {
build(responseBuilder *responsebuilder.ResponseBuilder)
size() uint64
Expand Down
88 changes: 84 additions & 4 deletions responsemanager/peerresponsemanager/peerresponsesender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (fph *fakePeerHandler) SendResponse(p peer.ID, responses []gsmsg.GraphSyncR
return fph.done
}

func TestPeerResponseManagerSendsResponses(t *testing.T) {
func TestPeerResponseSenderSendsResponses(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
Expand Down Expand Up @@ -134,7 +134,7 @@ func TestPeerResponseManagerSendsResponses(t *testing.T) {
require.Equal(t, graphsync.PartialResponse, fph.lastResponses[0].Status())
}

func TestPeerResponseManagerSendsVeryLargeBlocksResponses(t *testing.T) {
func TestPeerResponseSenderSendsVeryLargeBlocksResponses(t *testing.T) {

p := testutil.GeneratePeers(1)[0]
requestID1 := graphsync.RequestID(rand.Int31())
Expand Down Expand Up @@ -222,7 +222,7 @@ func TestPeerResponseManagerSendsVeryLargeBlocksResponses(t *testing.T) {

}

func TestPeerResponseManagerSendsExtensionData(t *testing.T) {
func TestPeerResponseSenderSendsExtensionData(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
Expand Down Expand Up @@ -285,7 +285,7 @@ func TestPeerResponseManagerSendsExtensionData(t *testing.T) {
require.Equal(t, extensionData2, returnedData2, "did not encode first extension")
}

func TestPeerResponseManagerSendsResponsesInTransaction(t *testing.T) {
func TestPeerResponseSenderSendsResponsesInTransaction(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
Expand Down Expand Up @@ -332,7 +332,87 @@ func TestPeerResponseManagerSendsResponsesInTransaction(t *testing.T) {
})
require.NoError(t, err)
testutil.AssertDoesReceive(ctx, t, sent, "should sent first message")
}

func TestPeerResponseSenderIgnoreBlocks(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
p := testutil.GeneratePeers(1)[0]
requestID1 := graphsync.RequestID(rand.Int31())
requestID2 := graphsync.RequestID(rand.Int31())
blks := testutil.GenerateBlocksOfSize(5, 100)
links := make([]ipld.Link, 0, len(blks))
for _, block := range blks {
links = append(links, cidlink.Link{Cid: block.Cid()})
}
done := make(chan struct{}, 1)
sent := make(chan struct{}, 1)
fph := &fakePeerHandler{
done: done,
sent: sent,
}
peerResponseSender := NewResponseSender(ctx, p, fph)
peerResponseSender.Startup()

peerResponseSender.IgnoreBlocks(requestID1, links)

bd := peerResponseSender.SendResponse(requestID1, links[0], blks[0].RawData())
require.Equal(t, links[0], bd.Link())
require.Equal(t, uint64(len(blks[0].RawData())), bd.BlockSize())
require.Equal(t, uint64(0), bd.BlockSizeOnWire())
testutil.AssertDoesReceive(ctx, t, sent, "did not send first message")

require.Len(t, fph.lastBlocks, 0)

require.Len(t, fph.lastResponses, 1)
require.Equal(t, requestID1, fph.lastResponses[0].RequestID())
require.Equal(t, graphsync.PartialResponse, fph.lastResponses[0].Status())

bd = peerResponseSender.SendResponse(requestID2, links[0], blks[0].RawData())
require.Equal(t, links[0], bd.Link())
require.Equal(t, uint64(len(blks[0].RawData())), bd.BlockSize())
require.Equal(t, uint64(0), bd.BlockSizeOnWire())
bd = peerResponseSender.SendResponse(requestID1, links[1], blks[1].RawData())
require.Equal(t, links[1], bd.Link())
require.Equal(t, uint64(len(blks[1].RawData())), bd.BlockSize())
require.Equal(t, uint64(0), bd.BlockSizeOnWire())
bd = peerResponseSender.SendResponse(requestID1, links[2], blks[2].RawData())
require.Equal(t, links[2], bd.Link())
require.Equal(t, uint64(len(blks[2].RawData())), bd.BlockSize())
require.Equal(t, uint64(0), bd.BlockSizeOnWire())
peerResponseSender.FinishRequest(requestID1)

// let peer reponse manager know last message was sent so message sending can continue
done <- struct{}{}

testutil.AssertDoesReceive(ctx, t, sent, "did not send second message")

require.Len(t, fph.lastBlocks, 0)

require.Len(t, fph.lastResponses, 2, "did not send correct number of responses")
response1, err := findResponseForRequestID(fph.lastResponses, requestID1)
require.NoError(t, err)
require.Equal(t, graphsync.RequestCompletedFull, response1.Status(), "did not send correct response code in second message")
response2, err := findResponseForRequestID(fph.lastResponses, requestID2)
require.NoError(t, err)
require.Equal(t, graphsync.PartialResponse, response2.Status(), "did not send corrent response code in second message")

peerResponseSender.SendResponse(requestID2, links[3], blks[3].RawData())
peerResponseSender.FinishRequest(requestID2)

// let peer reponse manager know last message was sent so message sending can continue
done <- struct{}{}

testutil.AssertDoesReceive(ctx, t, sent, "did not send third message")

require.Equal(t, 1, len(fph.lastBlocks))
testutil.AssertContainsBlock(t, fph.lastBlocks, blks[3])

require.Len(t, fph.lastResponses, 1, "did not send correct number of responses")
response2, err = findResponseForRequestID(fph.lastResponses, requestID2)
require.NoError(t, err)
require.Equal(t, graphsync.RequestCompletedFull, response2.Status(), "did not send correct response code in third message")
}

func findResponseForRequestID(responses []gsmsg.GraphSyncResponse, requestID graphsync.RequestID) (gsmsg.GraphSyncResponse, error) {
Expand Down
27 changes: 27 additions & 0 deletions responsemanager/responsemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"math"
"time"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync/cidset"
"github.com/ipfs/go-graphsync/responsemanager/hooks"

"github.com/ipfs/go-graphsync"
Expand Down Expand Up @@ -296,6 +298,9 @@ func (rm *ResponseManager) prepareQuery(ctx context.Context,
if validationErr != nil {
return nil, nil, validationErr
}
if err := rm.processDoNoSendCids(request, peerResponseSender); err != nil {
return nil, nil, err
}
rootLink := cidlink.Link{Cid: request.Root()}
traverser := ipldutil.TraversalBuilder{
Root: rootLink,
Expand All @@ -309,6 +314,28 @@ func (rm *ResponseManager) prepareQuery(ctx context.Context,
return loader, traverser, nil
}

func (rm *ResponseManager) processDoNoSendCids(request gsmsg.GraphSyncRequest, peerResponseSender peerresponsemanager.PeerResponseSender) error {
doNotSendCidsData, has := request.Extension(graphsync.ExtensionDoNotSendCIDs)
if !has {
return nil
}
cidSet, err := cidset.DecodeCidSet(doNotSendCidsData)
if err != nil {
peerResponseSender.FinishWithError(request.ID(), graphsync.RequestFailedUnknown)
return err
}
links := make([]ipld.Link, 0, cidSet.Len())
err = cidSet.ForEach(func(c cid.Cid) error {
links = append(links, cidlink.Link{Cid: c})
return nil
})
if err != nil {
return err
}
peerResponseSender.IgnoreBlocks(request.ID(), links)
return nil
}

func (rm *ResponseManager) executeQuery(
p peer.ID,
request gsmsg.GraphSyncRequest,
Expand Down
Loading

0 comments on commit ea95356

Please sign in to comment.