From cf383d909cd9bb4977c4e76470bc3cf0693610ee Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 15 Sep 2021 17:39:41 -0700 Subject: [PATCH] fix(requestmanager): remove main thread block on allocation remote allocation from main requestmanager thread -- this could block processing of other operations like request cancels --- impl/graphsync.go | 14 ++++++++++++-- requestmanager/asyncloader/asyncloader.go | 11 +---------- requestmanager/asyncloader/asyncloader_test.go | 14 +++++++------- requestmanager/requestmanager.go | 16 +++++----------- requestmanager/testloader/asyncloader.go | 2 +- 5 files changed, 26 insertions(+), 31 deletions(-) diff --git a/impl/graphsync.go b/impl/graphsync.go index 4e7f3994..aee464be 100644 --- a/impl/graphsync.go +++ b/impl/graphsync.go @@ -58,7 +58,8 @@ type GraphSync struct { persistenceOptions *persistenceoptions.PersistenceOptions ctx context.Context cancel context.CancelFunc - allocator *allocator.Allocator + responseAllocator *allocator.Allocator + requestAllocator *allocator.Allocator } type graphsyncConfigOptions struct { @@ -191,7 +192,8 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, persistenceOptions: persistenceOptions, ctx: ctx, cancel: cancel, - allocator: responseAllocator, + responseAllocator: responseAllocator, + requestAllocator: requestAllocator, } asyncLoader.Startup() @@ -334,6 +336,14 @@ func (gsr *graphSyncReceiver) ReceiveMessage( sender peer.ID, incoming gsmsg.GraphSyncMessage) { gsr.graphSync().responseManager.ProcessRequests(ctx, sender, incoming.Requests()) + totalMemoryAllocated := uint64(0) + for _, blk := range incoming.Blocks() { + totalMemoryAllocated += uint64(len(blk.RawData())) + } + select { + case <-gsr.graphSync().responseAllocator.AllocateBlockMemory(sender, totalMemoryAllocated): + case <-gsr.ctx.Done(): + } gsr.graphSync().requestManager.ProcessResponses(sender, incoming.Responses(), incoming.Blocks()) } diff --git a/requestmanager/asyncloader/asyncloader.go b/requestmanager/asyncloader/asyncloader.go index 4d0d8931..bcbccba9 100644 --- a/requestmanager/asyncloader/asyncloader.go +++ b/requestmanager/asyncloader/asyncloader.go @@ -31,7 +31,6 @@ type alternateQueue struct { // Allocator indicates a mechanism for tracking memory used by a given peer type Allocator interface { - AllocateBlockMemory(p peer.ID, amount uint64) <-chan error ReleaseBlockMemory(p peer.ID, amount uint64) error } @@ -113,16 +112,8 @@ func (al *AsyncLoader) StartRequest(requestID graphsync.RequestID, persistenceOp // ProcessResponse injests new responses and completes asynchronous loads as // neccesary -func (al *AsyncLoader) ProcessResponse(p peer.ID, responses map[graphsync.RequestID]metadata.Metadata, +func (al *AsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata, blks []blocks.Block) { - totalMemoryAllocated := uint64(0) - for _, blk := range blks { - totalMemoryAllocated += uint64(len(blk.RawData())) - } - select { - case <-al.allocator.AllocateBlockMemory(p, totalMemoryAllocated): - case <-al.ctx.Done(): - } select { case <-al.ctx.Done(): case al.incomingMessages <- &newResponsesAvailableMessage{responses, blks}: diff --git a/requestmanager/asyncloader/asyncloader_test.go b/requestmanager/asyncloader/asyncloader_test.go index b02eb23c..54a76903 100644 --- a/requestmanager/asyncloader/asyncloader_test.go +++ b/requestmanager/asyncloader/asyncloader_test.go @@ -49,7 +49,7 @@ func TestAsyncLoadInitialLoadSucceedsResponsePresent(t *testing.T) { }, } p := testutil.GeneratePeers(1)[0] - asyncLoader.ProcessResponse(p, responses, blocks) + asyncLoader.ProcessResponse(responses, blocks) resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{}) assertSuccessResponse(ctx, t, resultChan) @@ -73,7 +73,7 @@ func TestAsyncLoadInitialLoadFails(t *testing.T) { }, } p := testutil.GeneratePeers(1)[0] - asyncLoader.ProcessResponse(p, responses, nil) + asyncLoader.ProcessResponse(responses, nil) resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{}) assertFailResponse(ctx, t, resultChan) @@ -117,7 +117,7 @@ func TestAsyncLoadInitialLoadIndeterminateThenSucceeds(t *testing.T) { }, }, } - asyncLoader.ProcessResponse(p, responses, blocks) + asyncLoader.ProcessResponse(responses, blocks) assertSuccessResponse(ctx, t, resultChan) st.AssertLocalLoads(t, 1) st.AssertBlockStored(t, block) @@ -145,7 +145,7 @@ func TestAsyncLoadInitialLoadIndeterminateThenFails(t *testing.T) { }, }, } - asyncLoader.ProcessResponse(p, responses, nil) + asyncLoader.ProcessResponse(responses, nil) assertFailResponse(ctx, t, resultChan) st.AssertLocalLoads(t, 1) }) @@ -183,7 +183,7 @@ func TestAsyncLoadTwiceLoadsLocallySecondTime(t *testing.T) { }, } p := testutil.GeneratePeers(1)[0] - asyncLoader.ProcessResponse(p, responses, blocks) + asyncLoader.ProcessResponse(responses, blocks) resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{}) assertSuccessResponse(ctx, t, resultChan) @@ -283,7 +283,7 @@ func TestRequestSplittingSameBlockTwoStores(t *testing.T) { }, }, } - asyncLoader.ProcessResponse(p, responses, blocks) + asyncLoader.ProcessResponse(responses, blocks) assertSuccessResponse(ctx, t, resultChan1) assertSuccessResponse(ctx, t, resultChan2) @@ -318,7 +318,7 @@ func TestRequestSplittingSameBlockOnlyOneResponse(t *testing.T) { }, }, } - asyncLoader.ProcessResponse(p, responses, blocks) + asyncLoader.ProcessResponse(responses, blocks) asyncLoader.CompleteResponsesFor(requestID1) assertFailResponse(ctx, t, resultChan1) diff --git a/requestmanager/requestmanager.go b/requestmanager/requestmanager.go index 3cc6bc4c..8153e98e 100644 --- a/requestmanager/requestmanager.go +++ b/requestmanager/requestmanager.go @@ -59,7 +59,7 @@ type PeerHandler interface { // results as new responses are processed type AsyncLoader interface { StartRequest(graphsync.RequestID, string) error - ProcessResponse(p peer.ID, responses map[graphsync.RequestID]metadata.Metadata, + ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata, blks []blocks.Block) AsyncLoad(p peer.ID, requestID graphsync.RequestID, link ipld.Link, linkContext ipld.LinkContext) <-chan types.AsyncLoadResult CompleteResponsesFor(requestID graphsync.RequestID) @@ -278,17 +278,15 @@ type processResponseMessage struct { p peer.ID responses []gsmsg.GraphSyncResponse blks []blocks.Block - response chan error } // ProcessResponses ingests the given responses from the network and // and updates the in progress requests based on those responses. func (rm *RequestManager) ProcessResponses(p peer.ID, responses []gsmsg.GraphSyncResponse, blks []blocks.Block) { - response := make(chan error, 1) - err := rm.sendSyncMessage(&processResponseMessage{p, responses, blks, response}, response, nil) - if err != nil { - log.Warnf("ProcessResponses: %s", err) + select { + case rm.messages <- &processResponseMessage{p, responses, blks}: + case <-rm.ctx.Done(): } } @@ -485,12 +483,8 @@ func (prm *processResponseMessage) handle(rm *RequestManager) { filteredResponses = rm.filterResponsesForPeer(filteredResponses, prm.p) rm.updateLastResponses(filteredResponses) responseMetadata := metadataForResponses(filteredResponses) - rm.asyncLoader.ProcessResponse(prm.p, responseMetadata, prm.blks) + rm.asyncLoader.ProcessResponse(responseMetadata, prm.blks) rm.processTerminations(filteredResponses) - select { - case <-rm.ctx.Done(): - case prm.response <- nil: - } } func (rm *RequestManager) filterResponsesForPeer(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse { diff --git a/requestmanager/testloader/asyncloader.go b/requestmanager/testloader/asyncloader.go index 7f07fa3e..557e9d64 100644 --- a/requestmanager/testloader/asyncloader.go +++ b/requestmanager/testloader/asyncloader.go @@ -61,7 +61,7 @@ func (fal *FakeAsyncLoader) StartRequest(requestID graphsync.RequestID, name str } // ProcessResponse just records values passed to verify expectations later -func (fal *FakeAsyncLoader) ProcessResponse(p peer.ID, responses map[graphsync.RequestID]metadata.Metadata, +func (fal *FakeAsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata, blks []blocks.Block) { fal.responses <- responses fal.blks <- blks