Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor request execution and use IPLD SkipMe functionality for proper partial results on a request #70

Merged
merged 3 commits into from
Jul 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"math"
Expand Down Expand Up @@ -41,6 +42,7 @@ import (
gsnet "github.com/ipfs/go-graphsync/network"
"github.com/ipfs/go-graphsync/testutil"
ipld "github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/traversal/selector"
ipldselector "github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/libp2p/go-libp2p-core/host"
Expand Down Expand Up @@ -233,6 +235,55 @@ func TestGraphsyncRoundTrip(t *testing.T) {
require.Equal(t, graphsync.RequestCompletedFull, finalResponseStatus)
}

func TestGraphsyncRoundTripPartial(t *testing.T) {
// create network
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
td := newGsTestData(ctx, t)

// initialize graphsync on first node to make requests
requestor := td.GraphSyncHost1()

// setup an IPLD tree and put all but 1 node into the second nodes block store
tree := testutil.NewTestIPLDTree()
td.blockStore2[tree.LeafAlphaLnk] = tree.LeafAlphaBlock.RawData()
td.blockStore2[tree.MiddleMapNodeLnk] = tree.MiddleMapBlock.RawData()
td.blockStore2[tree.MiddleListNodeLnk] = tree.MiddleListBlock.RawData()
td.blockStore2[tree.RootNodeLnk] = tree.RootBlock.RawData()

// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2()

finalResponseStatusChan := make(chan graphsync.ResponseStatusCode, 1)
responder.RegisterCompletedResponseListener(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) {
select {
case finalResponseStatusChan <- status:
default:
}
})
// create a selector to traverse the whole tree
ssb := builder.NewSelectorSpecBuilder(basicnode.Style.Any)
allSelector := ssb.ExploreRecursive(selector.RecursionLimitDepth(10),
ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()

_, errChan := requestor.Request(ctx, td.host2.ID(), tree.RootNodeLnk, allSelector)

for err := range errChan {
// verify the error is received for leaf beta node being missing
require.EqualError(t, err, fmt.Sprintf("Remote Peer Is Missing Block: %s", tree.LeafBetaLnk.String()))
}
require.Equal(t, tree.LeafAlphaBlock.RawData(), td.blockStore1[tree.LeafAlphaLnk])
require.Equal(t, tree.MiddleListBlock.RawData(), td.blockStore1[tree.MiddleListNodeLnk])
require.Equal(t, tree.MiddleMapBlock.RawData(), td.blockStore1[tree.MiddleMapNodeLnk])
require.Equal(t, tree.RootBlock.RawData(), td.blockStore1[tree.RootNodeLnk])

// verify listener
var finalResponseStatus graphsync.ResponseStatusCode
testutil.AssertReceive(ctx, t, finalResponseStatusChan, &finalResponseStatus, "should receive status")
require.Equal(t, graphsync.RequestCompletedPartial, finalResponseStatus)
}

func TestGraphsyncRoundTripIgnoreCids(t *testing.T) {
// create network
ctx := context.Background()
Expand Down
8 changes: 0 additions & 8 deletions ipldutil/ipldutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ipldutil
import (
"bytes"
"context"
"errors"

ipld "github.com/ipld/go-ipld-prime"
dagpb "github.com/ipld/go-ipld-prime-proto"
Expand All @@ -15,13 +14,6 @@ import (
ipldselector "github.com/ipld/go-ipld-prime/traversal/selector"
)

var errDoNotFollow = errors.New("Dont Follow Me")

// ErrDoNotFollow is just a wrapper for whatever IPLD's ErrDoNotFollow ends up looking like
func ErrDoNotFollow() error {
return errDoNotFollow
}

var (
defaultChooser traversal.LinkTargetNodeStyleChooser = dagpb.AddDagPBSupportToChooser(func(ipld.Link, ipld.LinkContext) (ipld.NodeStyle, error) {
return basicnode.Style.Any, nil
Expand Down
84 changes: 84 additions & 0 deletions requestmanager/executor/executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package executor

import (
"context"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/ipldutil"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/requestmanager/loader"
ipld "github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/traversal"
)

// RequestExecution runs a single graphsync request with data loaded from the
// asynchronous loader
type RequestExecution struct {
Request gsmsg.GraphSyncRequest
SendRequest func(gsmsg.GraphSyncRequest)
Loader loader.AsyncLoadFn
RunBlockHooks func(blk graphsync.BlockData) error
TerminateRequest func()
NodeStyleChooser traversal.LinkTargetNodeStyleChooser
}

// Start begins execution of a request in a go routine
func (re RequestExecution) Start(ctx context.Context) (chan graphsync.ResponseProgress, chan error) {
executor := &requestExecutor{
inProgressChan: make(chan graphsync.ResponseProgress),
inProgressErr: make(chan error),
ctx: ctx,
request: re.Request,
sendRequest: re.SendRequest,
loader: re.Loader,
runBlockHooks: re.RunBlockHooks,
terminateRequest: re.TerminateRequest,
nodeStyleChooser: re.NodeStyleChooser,
}
executor.sendRequest(executor.request)
go executor.run()
return executor.inProgressChan, executor.inProgressErr
}

type requestExecutor struct {
inProgressChan chan graphsync.ResponseProgress
inProgressErr chan error
ctx context.Context
request gsmsg.GraphSyncRequest
sendRequest func(gsmsg.GraphSyncRequest)
loader loader.AsyncLoadFn
runBlockHooks func(blk graphsync.BlockData) error
terminateRequest func()
nodeStyleChooser traversal.LinkTargetNodeStyleChooser
}

func (re *requestExecutor) visitor(tp traversal.Progress, node ipld.Node, tr traversal.VisitReason) error {
select {
case <-re.ctx.Done():
case re.inProgressChan <- graphsync.ResponseProgress{
Node: node,
Path: tp.Path,
LastBlock: tp.LastBlock,
}:
}
return nil
}

func (re *requestExecutor) run() {
selector, _ := ipldutil.ParseSelector(re.request.Selector())
loaderFn := loader.WrapAsyncLoader(re.ctx, re.loader, re.request.ID(), re.inProgressErr, re.runBlockHooks)
err := ipldutil.Traverse(re.ctx, loaderFn, re.nodeStyleChooser, cidlink.Link{Cid: re.request.Root()}, selector, re.visitor)
if err != nil {
_, isContextErr := err.(loader.ContextCancelError)
if !isContextErr {
select {
case <-re.ctx.Done():
case re.inProgressErr <- err:
}
}
}
re.terminateRequest()
close(re.inProgressChan)
close(re.inProgressErr)
}
21 changes: 12 additions & 9 deletions requestmanager/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,22 @@ package loader
import (
"bytes"
"context"
"fmt"
"io"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/ipldutil"
"github.com/ipfs/go-graphsync/requestmanager/types"
ipld "github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/traversal"
)

// ContextCancelError is a sentinel that indicates the passed in request context
// was cancelled
type ContextCancelError struct{}

func (ContextCancelError) Error() string {
return "request context cancelled"
}

// AsyncLoadFn is a function which given a request id and an ipld.Link, returns
// a channel which will eventually return data for the link or an err
type AsyncLoadFn func(graphsync.RequestID, ipld.Link) <-chan types.AsyncLoadResult
Expand All @@ -33,22 +40,18 @@ func WrapAsyncLoader(
resultChan := asyncLoadFn(requestID, link)
select {
case <-ctx.Done():
return nil, fmt.Errorf("request finished")
return nil, ContextCancelError{}
case result := <-resultChan:
if result.Err != nil {
select {
case <-ctx.Done():
return nil, fmt.Errorf("request finished")
return nil, ContextCancelError{}
case errorChan <- result.Err:
return nil, ipldutil.ErrDoNotFollow()
return nil, traversal.SkipMe{}
}
}
err := onNewBlockFn(&blockData{link, result.Local, uint64(len(result.Data))})
if err != nil {
select {
case <-ctx.Done():
case errorChan <- err:
}
return nil, err
}
return bytes.NewReader(result.Data), nil
Expand Down
10 changes: 4 additions & 6 deletions requestmanager/loader/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import (
"time"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/ipldutil"
"github.com/ipfs/go-graphsync/requestmanager/types"
"github.com/stretchr/testify/require"

"github.com/ipfs/go-graphsync/testutil"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/traversal"
)

type callParams struct {
Expand Down Expand Up @@ -70,7 +70,8 @@ func TestWrappedAsyncLoaderSideChannelsErrors(t *testing.T) {
responseChan <- types.AsyncLoadResult{Data: nil, Err: err}
stream, loadErr := loader(link, ipld.LinkContext{})
require.Nil(t, stream, "should return nil reader")
require.EqualError(t, loadErr, ipldutil.ErrDoNotFollow().Error())
_, isSkipErr := loadErr.(traversal.SkipMe)
require.True(t, isSkipErr)
var returnedErr error
testutil.AssertReceive(ctx, t, errChan, &returnedErr, "should return an error on side channel")
require.EqualError(t, returnedErr, err.Error())
Expand Down Expand Up @@ -111,7 +112,7 @@ func TestWrappedAsyncLoaderContextCancels(t *testing.T) {
require.Error(t, result.error, "should error from sub context cancelling")
}

func TestWrappedAsyncLoaderSideChannelsBlockHookErrors(t *testing.T) {
func TestWrappedAsyncLoaderBlockHookErrors(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
Expand All @@ -131,7 +132,4 @@ func TestWrappedAsyncLoaderSideChannelsBlockHookErrors(t *testing.T) {
stream, err := loader(link, ipld.LinkContext{})
require.Nil(t, stream, "should return nil reader")
require.EqualError(t, err, blockHookErr.Error())
var returnedErr error
testutil.AssertReceive(ctx, t, errChan, &returnedErr, "should return an error on side channel")
require.EqualError(t, returnedErr, blockHookErr.Error())
}
82 changes: 34 additions & 48 deletions requestmanager/requestmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,18 @@ import (
"fmt"
"sync/atomic"

"github.com/ipfs/go-graphsync/requestmanager/executor"
"github.com/ipfs/go-graphsync/requestmanager/hooks"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-graphsync"
ipldutil "github.com/ipfs/go-graphsync/ipldutil"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/metadata"
"github.com/ipfs/go-graphsync/requestmanager/loader"
"github.com/ipfs/go-graphsync/requestmanager/types"
logging "github.com/ipfs/go-log"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/libp2p/go-libp2p-core/peer"
)

Expand Down Expand Up @@ -263,11 +262,35 @@ func (nrm *newRequestMessage) handle(rm *RequestManager) {
var ipr inProgressRequest
ipr.requestID = rm.nextRequestID
rm.nextRequestID++
request, hooksResult, selector, err := rm.validateRequest(ipr.requestID, nrm.p, nrm.root, nrm.selector, nrm.extensions)
request, hooksResult, err := rm.validateRequest(ipr.requestID, nrm.p, nrm.root, nrm.selector, nrm.extensions)
if err != nil {
ipr.incoming, ipr.incomingError = rm.singleErrorResponse(err)
} else {
ipr.incoming, ipr.incomingError = rm.setupRequest(nrm.p, request, hooksResult, selector)
ctx, cancel := context.WithCancel(rm.ctx)
p := nrm.p
requestStatus := &inProgressRequestStatus{
ctx: ctx, cancelFn: cancel, p: p,
}
lastResponse := &requestStatus.lastResponse
lastResponse.Store(gsmsg.NewResponse(request.ID(), graphsync.RequestAcknowledged))
rm.inProgressRequestStatuses[request.ID()] = requestStatus
ipr.incoming, ipr.incomingError = executor.RequestExecution{
Request: request,
SendRequest: func(gsRequest gsmsg.GraphSyncRequest) { rm.peerHandler.SendRequest(p, gsRequest) },
Loader: rm.asyncLoader.AsyncLoad,
RunBlockHooks: func(bd graphsync.BlockData) error {
response := lastResponse.Load().(gsmsg.GraphSyncResponse)
return rm.processBlockHooks(p, response, bd)
},
TerminateRequest: func() {
select {
case <-ctx.Done():
case rm.messages <- &terminateRequestMessage{request.ID()}:
}
},
NodeStyleChooser: hooksResult.CustomChooser,
}.Start(ctx)
requestStatus.networkError = ipr.incomingError
}

select {
Expand Down Expand Up @@ -398,61 +421,24 @@ func (rm *RequestManager) processBlockHooks(p peer.ID, response graphsync.Respon
return result.Err
}

func (rm *RequestManager) setupRequest(p peer.ID, request gsmsg.GraphSyncRequest, hooksResult hooks.RequestResult, selector selector.Selector) (chan graphsync.ResponseProgress, chan error) {
networkErrorChan := make(chan error, 1)
ctx, cancel := context.WithCancel(rm.ctx)
requestStatus := &inProgressRequestStatus{
ctx: ctx, cancelFn: cancel, p: p, networkError: networkErrorChan,
}
lastResponse := &requestStatus.lastResponse
rm.inProgressRequestStatuses[request.ID()] = requestStatus
lastResponse.Store(gsmsg.NewResponse(request.ID(), graphsync.RequestAcknowledged))
rm.peerHandler.SendRequest(p, request)
inProgressChan := make(chan graphsync.ResponseProgress)
inProgressErr := make(chan error)
go func() {
loaderFn := loader.WrapAsyncLoader(ctx, rm.asyncLoader.AsyncLoad, request.ID(), inProgressErr, func(bd graphsync.BlockData) error {
response := lastResponse.Load().(gsmsg.GraphSyncResponse)
return rm.processBlockHooks(p, response, bd)
})
visitor := visitToChannel(ctx, inProgressChan)
_ = ipldutil.Traverse(ctx, loaderFn, hooksResult.CustomChooser, cidlink.Link{Cid: request.Root()}, selector, visitor)
select {
case networkError := <-networkErrorChan:
select {
case <-rm.ctx.Done():
case inProgressErr <- networkError:
}
default:
}
select {
case <-ctx.Done():
case rm.messages <- &terminateRequestMessage{request.ID()}:
}
close(inProgressChan)
close(inProgressErr)
}()
return inProgressChan, inProgressErr
}

func (rm *RequestManager) validateRequest(requestID graphsync.RequestID, p peer.ID, root ipld.Link, selectorSpec ipld.Node, extensions []graphsync.ExtensionData) (gsmsg.GraphSyncRequest, hooks.RequestResult, selector.Selector, error) {
func (rm *RequestManager) validateRequest(requestID graphsync.RequestID, p peer.ID, root ipld.Link, selectorSpec ipld.Node, extensions []graphsync.ExtensionData) (gsmsg.GraphSyncRequest, hooks.RequestResult, error) {
_, err := ipldutil.EncodeNode(selectorSpec)
if err != nil {
return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, nil, err
return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err
}
selector, err := ipldutil.ParseSelector(selectorSpec)
_, err = ipldutil.ParseSelector(selectorSpec)
if err != nil {
return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, nil, err
return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err
Comment on lines -443 to +431

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great! my only nit would be that I'd rewire this error check into an inlined version. Otherwise it seems like .ParseSelector has side-effects ( also do you even need to check .EncodeNode above? )

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh shoot. just realized what you're saying. will fix in later PR

}
asCidLink, ok := root.(cidlink.Link)
if !ok {
return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, nil, fmt.Errorf("request failed: link has no cid")
return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, fmt.Errorf("request failed: link has no cid")
}
request := gsmsg.NewRequest(requestID, asCidLink.Cid, selectorSpec, defaultPriority, extensions...)
hooksResult := rm.requestHooks.ProcessRequestHooks(p, request)
err = rm.asyncLoader.StartRequest(requestID, hooksResult.PersistenceOption)
if err != nil {
return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, nil, err
return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err
}
return request, hooksResult, selector, nil
return request, hooksResult, nil
}
Loading