Skip to content

Commit

Permalink
Refactor request execution and use IPLD SkipMe functionality for prop…
Browse files Browse the repository at this point in the history
…er partial results on a request (#70)

* refactor(requestmanager): refactor execution of requests

* feat(graphsync): properly use SkipMe from ipld to complete partial selections

* fix(requestmanager): remove unused function
  • Loading branch information
hannahhoward committed Jul 1, 2020
1 parent ea95356 commit a878543
Show file tree
Hide file tree
Showing 9 changed files with 189 additions and 91 deletions.
51 changes: 51 additions & 0 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"math"
Expand Down Expand Up @@ -41,6 +42,7 @@ import (
gsnet "github.com/ipfs/go-graphsync/network"
"github.com/ipfs/go-graphsync/testutil"
ipld "github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/traversal/selector"
ipldselector "github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/libp2p/go-libp2p-core/host"
Expand Down Expand Up @@ -233,6 +235,55 @@ func TestGraphsyncRoundTrip(t *testing.T) {
require.Equal(t, graphsync.RequestCompletedFull, finalResponseStatus)
}

// TestGraphsyncRoundTripPartial requests a whole test tree from a responder
// whose block store lacks leaf beta, then checks that the blocks that were
// stored arrive, that the missing block is reported as an error, and that the
// responder's completion listener sees RequestCompletedPartial.
func TestGraphsyncRoundTripPartial(t *testing.T) {
	// Bound the whole round trip so a stalled request fails the test quickly.
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()
	td := newGsTestData(ctx, t)

	// The first host issues the request.
	requestor := td.GraphSyncHost1()

	// Seed the second node's block store with every block of the test tree
	// except leaf beta, so only part of the selection can be served.
	tree := testutil.NewTestIPLDTree()
	td.blockStore2[tree.RootNodeLnk] = tree.RootBlock.RawData()
	td.blockStore2[tree.MiddleListNodeLnk] = tree.MiddleListBlock.RawData()
	td.blockStore2[tree.MiddleMapNodeLnk] = tree.MiddleMapBlock.RawData()
	td.blockStore2[tree.LeafAlphaLnk] = tree.LeafAlphaBlock.RawData()

	// The second host responds to the request.
	responder := td.GraphSyncHost2()

	statusCh := make(chan graphsync.ResponseStatusCode, 1)
	recordStatus := func(_ peer.ID, _ graphsync.RequestData, status graphsync.ResponseStatusCode) {
		// Non-blocking send: a status is dropped if the buffer is already full.
		select {
		case statusCh <- status:
		default:
		}
	}
	responder.RegisterCompletedResponseListener(recordStatus)

	// Build a selector that recursively explores every link of the tree.
	ssb := builder.NewSelectorSpecBuilder(basicnode.Style.Any)
	exploreEverything := ssb.ExploreAll(ssb.ExploreRecursiveEdge())
	allSelector := ssb.ExploreRecursive(selector.RecursionLimitDepth(10), exploreEverything).Node()

	_, errChan := requestor.Request(ctx, td.host2.ID(), tree.RootNodeLnk, allSelector)

	// Every error on the channel must be the missing-block error for leaf beta.
	wantErr := fmt.Sprintf("Remote Peer Is Missing Block: %s", tree.LeafBetaLnk.String())
	for err := range errChan {
		require.EqualError(t, err, wantErr)
	}

	// All blocks the responder had must now be in the requestor's store.
	require.Equal(t, tree.RootBlock.RawData(), td.blockStore1[tree.RootNodeLnk])
	require.Equal(t, tree.MiddleListBlock.RawData(), td.blockStore1[tree.MiddleListNodeLnk])
	require.Equal(t, tree.MiddleMapBlock.RawData(), td.blockStore1[tree.MiddleMapNodeLnk])
	require.Equal(t, tree.LeafAlphaBlock.RawData(), td.blockStore1[tree.LeafAlphaLnk])

	// The responder's listener must report a partial completion.
	var gotStatus graphsync.ResponseStatusCode
	testutil.AssertReceive(ctx, t, statusCh, &gotStatus, "should receive status")
	require.Equal(t, graphsync.RequestCompletedPartial, gotStatus)
}

func TestGraphsyncRoundTripIgnoreCids(t *testing.T) {
// create network
ctx := context.Background()
Expand Down
8 changes: 0 additions & 8 deletions ipldutil/ipldutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ipldutil
import (
"bytes"
"context"
"errors"

ipld "github.com/ipld/go-ipld-prime"
dagpb "github.com/ipld/go-ipld-prime-proto"
Expand All @@ -15,13 +14,6 @@ import (
ipldselector "github.com/ipld/go-ipld-prime/traversal/selector"
)

// errDoNotFollow is the single package-level error value returned by
// ErrDoNotFollow.
var errDoNotFollow = errors.New("Dont Follow Me")

// ErrDoNotFollow is just a wrapper for whatever IPLD's ErrDoNotFollow ends up looking like.
// It returns the same errDoNotFollow value on every call.
func ErrDoNotFollow() error {
	return errDoNotFollow
}

var (
defaultChooser traversal.LinkTargetNodeStyleChooser = dagpb.AddDagPBSupportToChooser(func(ipld.Link, ipld.LinkContext) (ipld.NodeStyle, error) {
return basicnode.Style.Any, nil
Expand Down
84 changes: 84 additions & 0 deletions requestmanager/executor/executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package executor

import (
"context"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/ipldutil"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/requestmanager/loader"
ipld "github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/traversal"
)

// RequestExecution runs a single graphsync request with data loaded from the
// asynchronous loader. It is a plain bundle of the request and the callbacks
// the execution needs; call Start to begin the traversal.
type RequestExecution struct {
	// Request is the request to execute; its ID, root and selector drive the
	// traversal.
	Request gsmsg.GraphSyncRequest
	// SendRequest is called once by Start, with Request, before the traversal
	// goroutine is launched.
	SendRequest func(gsmsg.GraphSyncRequest)
	// Loader is the asynchronous load function that is wrapped with
	// loader.WrapAsyncLoader to feed the traversal.
	Loader loader.AsyncLoadFn
	// RunBlockHooks is handed to loader.WrapAsyncLoader as the block callback.
	RunBlockHooks func(blk graphsync.BlockData) error
	// TerminateRequest is called once the traversal has finished, before the
	// result channels are closed.
	TerminateRequest func()
	// NodeStyleChooser is passed through to ipldutil.Traverse.
	NodeStyleChooser traversal.LinkTargetNodeStyleChooser
}

// Start sends the request and then launches the traversal in its own
// goroutine. It returns the channel on which visited nodes are delivered and
// the channel on which errors are delivered; both are closed by the goroutine
// when the traversal is done.
func (re RequestExecution) Start(ctx context.Context) (chan graphsync.ResponseProgress, chan error) {
	progressCh := make(chan graphsync.ResponseProgress)
	errCh := make(chan error)

	worker := &requestExecutor{
		ctx:              ctx,
		request:          re.Request,
		inProgressChan:   progressCh,
		inProgressErr:    errCh,
		loader:           re.Loader,
		nodeStyleChooser: re.NodeStyleChooser,
		runBlockHooks:    re.RunBlockHooks,
		sendRequest:      re.SendRequest,
		terminateRequest: re.TerminateRequest,
	}

	// The request is sent synchronously, before the traversal starts.
	re.SendRequest(re.Request)
	go worker.run()

	return progressCh, errCh
}

// requestExecutor holds the state of one running request: the output channels
// returned by RequestExecution.Start plus the request and callbacks copied
// from the RequestExecution that created it.
type requestExecutor struct {
	// inProgressChan receives one ResponseProgress per visited node.
	inProgressChan chan graphsync.ResponseProgress
	// inProgressErr receives errors raised while loading or traversing.
	inProgressErr chan error
	// ctx bounds the execution; blocking channel sends give up when it is done.
	ctx context.Context
	// request is the graphsync request being executed.
	request gsmsg.GraphSyncRequest
	// sendRequest is the caller-supplied function used to send the request.
	sendRequest func(gsmsg.GraphSyncRequest)
	// loader is the asynchronous load function wrapped for the traversal.
	loader loader.AsyncLoadFn
	// runBlockHooks is passed to the wrapped loader as its block callback.
	runBlockHooks func(blk graphsync.BlockData) error
	// terminateRequest is called when the traversal ends.
	terminateRequest func()
	// nodeStyleChooser is passed to ipldutil.Traverse.
	nodeStyleChooser traversal.LinkTargetNodeStyleChooser
}

// visitor is the traversal callback. It publishes the visited node, its path
// and the last block on the progress channel, or gives up silently when the
// request context is done first. It never returns an error.
func (re *requestExecutor) visitor(tp traversal.Progress, node ipld.Node, _ traversal.VisitReason) error {
	update := graphsync.ResponseProgress{
		Node:      node,
		Path:      tp.Path,
		LastBlock: tp.LastBlock,
	}
	select {
	case re.inProgressChan <- update:
	case <-re.ctx.Done():
	}
	return nil
}

// run parses the request's selector, traverses from the request root using
// the wrapped asynchronous loader, and reports any resulting error on
// inProgressErr. When it returns, by any path, it calls terminateRequest and
// then closes both output channels.
//
// A selector that fails to parse is reported on inProgressErr instead of being
// ignored, so Traverse is never called without a usable selector.
func (re *requestExecutor) run() {
	// Registered first so every exit path notifies the owner and closes the
	// channels, in the same order as before: terminate, progress, errors.
	defer func() {
		re.terminateRequest()
		close(re.inProgressChan)
		close(re.inProgressErr)
	}()

	selector, err := ipldutil.ParseSelector(re.request.Selector())
	if err != nil {
		re.reportError(err)
		return
	}

	loaderFn := loader.WrapAsyncLoader(re.ctx, re.loader, re.request.ID(), re.inProgressErr, re.runBlockHooks)
	err = ipldutil.Traverse(re.ctx, loaderFn, re.nodeStyleChooser, cidlink.Link{Cid: re.request.Root()}, selector, re.visitor)
	if err == nil {
		return
	}
	// A cancelled request context is not reported to the consumer as an error.
	if _, isContextErr := err.(loader.ContextCancelError); isContextErr {
		return
	}
	re.reportError(err)
}

// reportError delivers err on inProgressErr, giving up if the request context
// is done before a receiver takes it.
func (re *requestExecutor) reportError(err error) {
	select {
	case <-re.ctx.Done():
	case re.inProgressErr <- err:
	}
}
21 changes: 12 additions & 9 deletions requestmanager/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,22 @@ package loader
import (
"bytes"
"context"
"fmt"
"io"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/ipldutil"
"github.com/ipfs/go-graphsync/requestmanager/types"
ipld "github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/traversal"
)

// ContextCancelError is a sentinel that indicates the passed in request context
// was cancelled. It carries no data; callers detect it by its type.
type ContextCancelError struct{}

// Error implements the error interface with a fixed message.
func (ContextCancelError) Error() string {
	return "request context cancelled"
}

// AsyncLoadFn is a function which, given a request ID and an ipld.Link, returns
// a receive-only channel that will eventually deliver a types.AsyncLoadResult
// holding either the data for the link or an error.
type AsyncLoadFn func(graphsync.RequestID, ipld.Link) <-chan types.AsyncLoadResult
Expand All @@ -33,22 +40,18 @@ func WrapAsyncLoader(
resultChan := asyncLoadFn(requestID, link)
select {
case <-ctx.Done():
return nil, fmt.Errorf("request finished")
return nil, ContextCancelError{}
case result := <-resultChan:
if result.Err != nil {
select {
case <-ctx.Done():
return nil, fmt.Errorf("request finished")
return nil, ContextCancelError{}
case errorChan <- result.Err:
return nil, ipldutil.ErrDoNotFollow()
return nil, traversal.SkipMe{}
}
}
err := onNewBlockFn(&blockData{link, result.Local, uint64(len(result.Data))})
if err != nil {
select {
case <-ctx.Done():
case errorChan <- err:
}
return nil, err
}
return bytes.NewReader(result.Data), nil
Expand Down
10 changes: 4 additions & 6 deletions requestmanager/loader/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import (
"time"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/ipldutil"
"github.com/ipfs/go-graphsync/requestmanager/types"
"github.com/stretchr/testify/require"

"github.com/ipfs/go-graphsync/testutil"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/traversal"
)

type callParams struct {
Expand Down Expand Up @@ -70,7 +70,8 @@ func TestWrappedAsyncLoaderSideChannelsErrors(t *testing.T) {
responseChan <- types.AsyncLoadResult{Data: nil, Err: err}
stream, loadErr := loader(link, ipld.LinkContext{})
require.Nil(t, stream, "should return nil reader")
require.EqualError(t, loadErr, ipldutil.ErrDoNotFollow().Error())
_, isSkipErr := loadErr.(traversal.SkipMe)
require.True(t, isSkipErr)
var returnedErr error
testutil.AssertReceive(ctx, t, errChan, &returnedErr, "should return an error on side channel")
require.EqualError(t, returnedErr, err.Error())
Expand Down Expand Up @@ -111,7 +112,7 @@ func TestWrappedAsyncLoaderContextCancels(t *testing.T) {
require.Error(t, result.error, "should error from sub context cancelling")
}

func TestWrappedAsyncLoaderSideChannelsBlockHookErrors(t *testing.T) {
func TestWrappedAsyncLoaderBlockHookErrors(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
Expand All @@ -131,7 +132,4 @@ func TestWrappedAsyncLoaderSideChannelsBlockHookErrors(t *testing.T) {
stream, err := loader(link, ipld.LinkContext{})
require.Nil(t, stream, "should return nil reader")
require.EqualError(t, err, blockHookErr.Error())
var returnedErr error
testutil.AssertReceive(ctx, t, errChan, &returnedErr, "should return an error on side channel")
require.EqualError(t, returnedErr, blockHookErr.Error())
}
82 changes: 34 additions & 48 deletions requestmanager/requestmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,18 @@ import (
"fmt"
"sync/atomic"

"github.com/ipfs/go-graphsync/requestmanager/executor"
"github.com/ipfs/go-graphsync/requestmanager/hooks"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-graphsync"
ipldutil "github.com/ipfs/go-graphsync/ipldutil"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/metadata"
"github.com/ipfs/go-graphsync/requestmanager/loader"
"github.com/ipfs/go-graphsync/requestmanager/types"
logging "github.com/ipfs/go-log"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/libp2p/go-libp2p-core/peer"
)

Expand Down Expand Up @@ -263,11 +262,35 @@ func (nrm *newRequestMessage) handle(rm *RequestManager) {
var ipr inProgressRequest
ipr.requestID = rm.nextRequestID
rm.nextRequestID++
request, hooksResult, selector, err := rm.validateRequest(ipr.requestID, nrm.p, nrm.root, nrm.selector, nrm.extensions)
request, hooksResult, err := rm.validateRequest(ipr.requestID, nrm.p, nrm.root, nrm.selector, nrm.extensions)
if err != nil {
ipr.incoming, ipr.incomingError = rm.singleErrorResponse(err)
} else {
ipr.incoming, ipr.incomingError = rm.setupRequest(nrm.p, request, hooksResult, selector)
ctx, cancel := context.WithCancel(rm.ctx)
p := nrm.p
requestStatus := &inProgressRequestStatus{
ctx: ctx, cancelFn: cancel, p: p,
}
lastResponse := &requestStatus.lastResponse
lastResponse.Store(gsmsg.NewResponse(request.ID(), graphsync.RequestAcknowledged))
rm.inProgressRequestStatuses[request.ID()] = requestStatus
ipr.incoming, ipr.incomingError = executor.RequestExecution{
Request: request,
SendRequest: func(gsRequest gsmsg.GraphSyncRequest) { rm.peerHandler.SendRequest(p, gsRequest) },
Loader: rm.asyncLoader.AsyncLoad,
RunBlockHooks: func(bd graphsync.BlockData) error {
response := lastResponse.Load().(gsmsg.GraphSyncResponse)
return rm.processBlockHooks(p, response, bd)
},
TerminateRequest: func() {
select {
case <-ctx.Done():
case rm.messages <- &terminateRequestMessage{request.ID()}:
}
},
NodeStyleChooser: hooksResult.CustomChooser,
}.Start(ctx)
requestStatus.networkError = ipr.incomingError
}

select {
Expand Down Expand Up @@ -398,61 +421,24 @@ func (rm *RequestManager) processBlockHooks(p peer.ID, response graphsync.Respon
return result.Err
}

// setupRequest registers an in-progress status for the request, sends the
// request to peer p, and starts a goroutine that traverses the selector from
// the request root using the manager's asynchronous loader. It returns the
// channel on which visited nodes are delivered and the channel on which errors
// are delivered; the goroutine closes both when the traversal is over.
func (rm *RequestManager) setupRequest(p peer.ID, request gsmsg.GraphSyncRequest, hooksResult hooks.RequestResult, selector selector.Selector) (chan graphsync.ResponseProgress, chan error) {
	// Buffered with capacity 1 and stored on the request status as networkError.
	networkErrorChan := make(chan error, 1)
	ctx, cancel := context.WithCancel(rm.ctx)
	requestStatus := &inProgressRequestStatus{
		ctx: ctx, cancelFn: cancel, p: p, networkError: networkErrorChan,
	}
	lastResponse := &requestStatus.lastResponse
	rm.inProgressRequestStatuses[request.ID()] = requestStatus
	// Seed lastResponse so the block hook below always has a response to load.
	lastResponse.Store(gsmsg.NewResponse(request.ID(), graphsync.RequestAcknowledged))
	rm.peerHandler.SendRequest(p, request)
	inProgressChan := make(chan graphsync.ResponseProgress)
	inProgressErr := make(chan error)
	go func() {
		// Block hooks are run against whatever response was stored most recently.
		loaderFn := loader.WrapAsyncLoader(ctx, rm.asyncLoader.AsyncLoad, request.ID(), inProgressErr, func(bd graphsync.BlockData) error {
			response := lastResponse.Load().(gsmsg.GraphSyncResponse)
			return rm.processBlockHooks(p, response, bd)
		})
		visitor := visitToChannel(ctx, inProgressChan)
		// The traversal's own error result is deliberately discarded here.
		_ = ipldutil.Traverse(ctx, loaderFn, hooksResult.CustomChooser, cidlink.Link{Cid: request.Root()}, selector, visitor)
		// Non-blocking check: if a network error is already queued, forward it
		// to the caller unless the manager context is done.
		select {
		case networkError := <-networkErrorChan:
			select {
			case <-rm.ctx.Done():
			case inProgressErr <- networkError:
			}
		default:
		}
		// Ask the manager loop to terminate the request, unless the request
		// context is already done.
		select {
		case <-ctx.Done():
		case rm.messages <- &terminateRequestMessage{request.ID()}:
		}
		close(inProgressChan)
		close(inProgressErr)
	}()
	return inProgressChan, inProgressErr
}

func (rm *RequestManager) validateRequest(requestID graphsync.RequestID, p peer.ID, root ipld.Link, selectorSpec ipld.Node, extensions []graphsync.ExtensionData) (gsmsg.GraphSyncRequest, hooks.RequestResult, selector.Selector, error) {
func (rm *RequestManager) validateRequest(requestID graphsync.RequestID, p peer.ID, root ipld.Link, selectorSpec ipld.Node, extensions []graphsync.ExtensionData) (gsmsg.GraphSyncRequest, hooks.RequestResult, error) {
_, err := ipldutil.EncodeNode(selectorSpec)
if err != nil {
return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, nil, err
return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err
}
selector, err := ipldutil.ParseSelector(selectorSpec)
_, err = ipldutil.ParseSelector(selectorSpec)
if err != nil {
return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, nil, err
return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err
}
asCidLink, ok := root.(cidlink.Link)
if !ok {
return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, nil, fmt.Errorf("request failed: link has no cid")
return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, fmt.Errorf("request failed: link has no cid")
}
request := gsmsg.NewRequest(requestID, asCidLink.Cid, selectorSpec, defaultPriority, extensions...)
hooksResult := rm.requestHooks.ProcessRequestHooks(p, request)
err = rm.asyncLoader.StartRequest(requestID, hooksResult.PersistenceOption)
if err != nil {
return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, nil, err
return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err
}
return request, hooksResult, selector, nil
return request, hooksResult, nil
}
Loading

0 comments on commit a878543

Please sign in to comment.