Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: MaxLinks for requests #420

Merged
merged 3 commits into from
Apr 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ type IncomingRequestHookActions interface {
TerminateWithError(error)
ValidateRequest()
PauseResponse()
MaxLinks(uint64)
}

// OutgoingBlockHookActions are actions that an outgoing block hook can take to
Expand All @@ -299,6 +300,7 @@ type OutgoingBlockHookActions interface {
type OutgoingRequestHookActions interface {
UsePersistenceOption(name string)
UseLinkTargetNodePrototypeChooser(traversal.LinkTargetNodePrototypeChooser)
MaxLinks(uint64)
}

// IncomingResponseHookActions are actions that incoming response hook can take
Expand Down
204 changes: 119 additions & 85 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,103 +109,137 @@ func TestRejectRequestsByDefault(t *testing.T) {
tracing.SingleExceptionEvent(t, "response(0)", "github.com/ipfs/go-graphsync/responsemanager.errorString", "request not valid", true)
}

func TestGraphsyncRoundTripRequestBudgetRequestor(t *testing.T) {

// create network
ctx := context.Background()
ctx, collectTracing := testutil.SetupTracing(ctx)
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
td := newGsTestData(ctx, t)

var linksToTraverse uint64 = 5
// initialize graphsync on first node to make requests
requestor := td.GraphSyncHost1(MaxLinksPerOutgoingRequests(linksToTraverse))

// setup receiving peer to just record message coming in
blockChainLength := 100
blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength)

// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2()
assertCancelOrComplete := assertCancelOrCompleteFunction(responder, 1)
progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)

// response budgets don't include the root block, so total links traverse with be one more than expected
blockChain.VerifyResponseRange(ctx, progressChan, 0, int(linksToTraverse))
testutil.VerifySingleTerminalError(ctx, t, errChan)
require.Len(t, td.blockStore1, int(linksToTraverse), "did not store all blocks")

drain(requestor)
drain(responder)
wasCancelled := assertCancelOrComplete(ctx, t)

tracing := collectTracing(t)

traceStrings := tracing.TracesToStrings()
require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->loadBlock(0)")
require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->sendBlock(0)->processBlockHooks(0)")
if wasCancelled {
require.Contains(t, traceStrings, "response(0)->abortRequest(0)")
func TestGraphsyncRoundTripRequestBudgets(t *testing.T) {
testCases := []struct {
name string
globalOutgoingBudget uint64
globalIncomingBudget uint64
perOutgoingBudget uint64
perIncomingBudget uint64
expectedLinks uint64
}{
// outgoing
{
name: "global outgoing budget",
globalOutgoingBudget: 5,
expectedLinks: 5,
},
{
name: "per outgoing budget",
perOutgoingBudget: 5,
expectedLinks: 5,
},
{
name: "global outgoing budget less than per outgoing budget",
globalOutgoingBudget: 3,
perOutgoingBudget: 5,
expectedLinks: 3,
},
{
name: "global outgoing budget greater than per outgoing budget",
globalOutgoingBudget: 5,
perOutgoingBudget: 3,
expectedLinks: 3,
},
// incoming
{
name: "global incoming budget",
globalIncomingBudget: 5,
expectedLinks: 5,
},
{
name: "per incoming budget",
perIncomingBudget: 5,
expectedLinks: 5,
},
{
name: "global incoming budget less than per incoming budget",
globalIncomingBudget: 3,
perIncomingBudget: 5,
expectedLinks: 3,
},
{
name: "global incoming budget greater than per incoming budget",
globalIncomingBudget: 5,
perIncomingBudget: 3,
expectedLinks: 3,
},
}
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "processResponses(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block

// has ErrBudgetExceeded exception recorded in the right place
tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ErrBudgetExceeded", "traversal budget exceeded", true)
if wasCancelled {
tracing.SingleExceptionEvent(t, "response(0)->executeTask(0)", "ContextCancelError", ipldutil.ContextCancelError{}.Error(), true)
}
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
// create network
ctx := context.Background()
ctx, collectTracing := testutil.SetupTracing(ctx)
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
td := newGsTestData(ctx, t)

func TestGraphsyncRoundTripRequestBudgetResponder(t *testing.T) {
// create network
ctx := context.Background()
ctx, collectTracing := testutil.SetupTracing(ctx)
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
td := newGsTestData(ctx, t)
// initialize graphsync on first node to make requests
opts := []Option{}
if tc.globalOutgoingBudget > 0 {
opts = append(opts, MaxLinksPerOutgoingRequests(tc.globalOutgoingBudget))
}
requestor := td.GraphSyncHost1(opts...)

var linksToTraverse uint64 = 5
// initialize graphsync on first node to make requests
requestor := td.GraphSyncHost1()
if tc.perOutgoingBudget > 0 {
requestor.RegisterOutgoingRequestHook(func(_ peer.ID, _ graphsync.RequestData, ha graphsync.OutgoingRequestHookActions) {
ha.MaxLinks(tc.perOutgoingBudget)
})
}

// setup receiving peer to just record message coming in
blockChainLength := 100
blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength)
// setup receiving peer to just record message coming in
blockChainLength := 100
blockChain := testutil.SetupBlockChain(ctx, t, td.persistence2, 100, blockChainLength)

// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2(MaxLinksPerIncomingRequests(linksToTraverse))
assertComplete := assertCompletionFunction(responder, 1)
// initialize graphsync on second node to response to requests
opts = []Option{}
if tc.globalIncomingBudget > 0 {
opts = append(opts, MaxLinksPerIncomingRequests(tc.globalIncomingBudget))
}
responder := td.GraphSyncHost2(opts...)
if tc.perIncomingBudget > 0 {
responder.RegisterIncomingRequestHook(func(_ peer.ID, _ graphsync.RequestData, ha graphsync.IncomingRequestHookActions) {
ha.MaxLinks(tc.perIncomingBudget)
})
}

progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)
assertCancelOrComplete := assertCancelOrCompleteFunction(responder, 1)
progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)

// response budgets don't include the root block, so total links traverse with be one more than expected
blockChain.VerifyResponseRange(ctx, progressChan, 0, int(linksToTraverse))
testutil.VerifySingleTerminalError(ctx, t, errChan)
require.Len(t, td.blockStore1, int(linksToTraverse), "did not store all blocks")
// response budgets don't include the root block, so total links traverse with be one more than expected
blockChain.VerifyResponseRange(ctx, progressChan, 0, int(tc.expectedLinks))
testutil.VerifySingleTerminalError(ctx, t, errChan)
require.Len(t, td.blockStore1, int(tc.expectedLinks), "did not store all blocks")

drain(requestor)
drain(responder)
assertComplete(ctx, t)
drain(requestor)
drain(responder)
wasCancelled := assertCancelOrComplete(ctx, t)

tracing := collectTracing(t)
tracing := collectTracing(t)

traceStrings := tracing.TracesToStrings()
require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->loadBlock(0)")
require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->sendBlock(0)->processBlockHooks(0)")
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "processResponses(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block
traceStrings := tracing.TracesToStrings()
require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->loadBlock(0)")
require.Contains(t, traceStrings, "response(0)->executeTask(0)->processBlock(0)->sendBlock(0)->processBlockHooks(0)")
if wasCancelled {
require.Contains(t, traceStrings, "response(0)->abortRequest(0)")
}
require.Contains(t, traceStrings, "request(0)->newRequest(0)")
require.Contains(t, traceStrings, "request(0)->executeTask(0)")
require.Contains(t, traceStrings, "request(0)->terminateRequest(0)")
require.Contains(t, traceStrings, "processResponses(0)") // should have one of these per response
require.Contains(t, traceStrings, "request(0)->verifyBlock(0)") // should have one of these per block

// has ContextCancelError exception recorded in the right place
// the requester gets a cancel, the responder gets a ErrBudgetExceeded
tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ContextCancelError", ipldutil.ContextCancelError{}.Error(), false)
if tc.globalOutgoingBudget > 0 || tc.perOutgoingBudget > 0 {
// has ErrBudgetExceeded exception recorded in the right place because we caused it locally
tracing.SingleExceptionEvent(t, "request(0)->executeTask(0)", "ErrBudgetExceeded", "traversal budget exceeded", true)
}
if wasCancelled {
tracing.SingleExceptionEvent(t, "response(0)->executeTask(0)", "ContextCancelError", ipldutil.ContextCancelError{}.Error(), true)
}
})
}
}

func TestGraphsyncRoundTrip(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions requestmanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type inProgressRequestStatus struct {
onTerminated []chan<- error
request gsmsg.GraphSyncRequest
doNotSendFirstBlocks int64
maxLinks uint64
nodeStyleChooser traversal.LinkTargetNodePrototypeChooser
inProgressChan chan graphsync.ResponseProgress
inProgressErr chan error
Expand Down
7 changes: 7 additions & 0 deletions requestmanager/hooks/requesthooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (orh *OutgoingRequestHooks) Register(hook graphsync.OnOutgoingRequestHook)
type RequestResult struct {
PersistenceOption string
CustomChooser traversal.LinkTargetNodePrototypeChooser
MaxLinks uint64
}

// ProcessRequestHooks runs request hooks against an outgoing request
Expand All @@ -54,12 +55,14 @@ func (orh *OutgoingRequestHooks) ProcessRequestHooks(p peer.ID, request graphsyn
type requestHookActions struct {
persistenceOption string
nodeBuilderChooser traversal.LinkTargetNodePrototypeChooser
maxLinks uint64
}

func (rha *requestHookActions) result() RequestResult {
return RequestResult{
PersistenceOption: rha.persistenceOption,
CustomChooser: rha.nodeBuilderChooser,
MaxLinks: rha.maxLinks,
}
}

Expand All @@ -70,3 +73,7 @@ func (rha *requestHookActions) UsePersistenceOption(name string) {
func (rha *requestHookActions) UseLinkTargetNodePrototypeChooser(nodeBuilderChooser traversal.LinkTargetNodePrototypeChooser) {
rha.nodeBuilderChooser = nodeBuilderChooser
}

func (rha *requestHookActions) MaxLinks(maxLinks uint64) {
rha.maxLinks = maxLinks
}
10 changes: 8 additions & 2 deletions requestmanager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func (rm *RequestManager) newRequest(requestID graphsync.RequestID, parentSpan t
request: request,
state: graphsync.Queued,
nodeStyleChooser: hooksResult.CustomChooser,
maxLinks: hooksResult.MaxLinks,
inProgressChan: make(chan graphsync.ResponseProgress),
inProgressErr: make(chan error),
lsys: lsys,
Expand All @@ -121,10 +122,15 @@ func (rm *RequestManager) requestTask(requestID graphsync.RequestID) executor.Re

if ipr.traverser == nil {
var budget *traversal.Budget
if rm.maxLinksPerRequest > 0 {
maxLinks := rm.maxLinksPerRequest
if maxLinks == 0 || (ipr.maxLinks != 0 && ipr.maxLinks < maxLinks) {
// take the lowest nonzero budget (global or per-request)
maxLinks = ipr.maxLinks
}
if maxLinks > 0 {
budget = &traversal.Budget{
NodeBudget: math.MaxInt64,
LinkBudget: int64(rm.maxLinksPerRequest),
LinkBudget: int64(maxLinks),
}
}
// the traverser has its own context because we want to fail on block boundaries, in the executor,
Expand Down
1 change: 1 addition & 0 deletions responsemanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type inProgressResponseStatus struct {
request gsmsg.GraphSyncRequest
linkSystem ipld.LinkSystem
customChooser traversal.LinkTargetNodePrototypeChooser
maxLinks uint64
traverser ipldutil.Traverser
signals queryexecutor.ResponseSignals
updates []gsmsg.GraphSyncRequest
Expand Down
7 changes: 7 additions & 0 deletions responsemanager/hooks/requesthook.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type RequestResult struct {
Err error
Extensions []graphsync.ExtensionData
Ctx context.Context
MaxLinks uint64
}

// ProcessRequestHooks runs request hooks against an incoming request
Expand All @@ -79,6 +80,7 @@ type requestHookActions struct {
chooser traversal.LinkTargetNodePrototypeChooser
extensions []graphsync.ExtensionData
ctx context.Context
maxLinks uint64
}

func (ha *requestHookActions) result() RequestResult {
Expand All @@ -90,6 +92,7 @@ func (ha *requestHookActions) result() RequestResult {
Err: ha.err,
Extensions: ha.extensions,
Ctx: ha.ctx,
MaxLinks: ha.maxLinks,
}
}

Expand All @@ -114,6 +117,10 @@ func (ha *requestHookActions) UsePersistenceOption(name string) {
ha.linkSystem = linkSystem
}

func (ha *requestHookActions) MaxLinks(maxLinks uint64) {
ha.maxLinks = maxLinks
}

func (ha *requestHookActions) UseLinkTargetNodePrototypeChooser(chooser traversal.LinkTargetNodePrototypeChooser) {
ha.chooser = chooser
}
Expand Down
10 changes: 8 additions & 2 deletions responsemanager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ func (rm *ResponseManager) newRequest(ctx context.Context, p peer.ID, request gs
request: request,
linkSystem: linkSystem,
customChooser: result.CustomChooser,
maxLinks: result.MaxLinks,
signals: signals,
startTime: time.Now(),
responseStream: responseStream,
Expand Down Expand Up @@ -298,10 +299,15 @@ func (rm *ResponseManager) taskDataForKey(requestID graphsync.RequestID) queryex
rootLink := cidlink.Link{Cid: response.request.Root()}

var budget *traversal.Budget
if rm.maxLinksPerRequest > 0 {
maxLinks := rm.maxLinksPerRequest
if maxLinks == 0 || (response.maxLinks != 0 && response.maxLinks < maxLinks) {
// take the lowest nonzero budget (global or per-request)
maxLinks = response.maxLinks
}
if maxLinks > 0 {
budget = &traversal.Budget{
NodeBudget: math.MaxInt64,
LinkBudget: int64(rm.maxLinksPerRequest),
LinkBudget: int64(maxLinks),
}
}
traverser := ipldutil.TraversalBuilder{
Expand Down