Skip to content

Commit

Permalink
fix(requestmanager): remove main thread block on allocation (#216)
Browse files Browse the repository at this point in the history
remote allocation from main requestmanager thread -- this could block processing of other operations
like request cancels
  • Loading branch information
hannahhoward committed Sep 17, 2021
1 parent 2687228 commit b0bd16e
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 31 deletions.
14 changes: 12 additions & 2 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ type GraphSync struct {
persistenceOptions *persistenceoptions.PersistenceOptions
ctx context.Context
cancel context.CancelFunc
allocator *allocator.Allocator
responseAllocator *allocator.Allocator
requestAllocator *allocator.Allocator
}

type graphsyncConfigOptions struct {
Expand Down Expand Up @@ -191,7 +192,8 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
persistenceOptions: persistenceOptions,
ctx: ctx,
cancel: cancel,
allocator: responseAllocator,
responseAllocator: responseAllocator,
requestAllocator: requestAllocator,
}

asyncLoader.Startup()
Expand Down Expand Up @@ -334,6 +336,14 @@ func (gsr *graphSyncReceiver) ReceiveMessage(
sender peer.ID,
incoming gsmsg.GraphSyncMessage) {
gsr.graphSync().responseManager.ProcessRequests(ctx, sender, incoming.Requests())
totalMemoryAllocated := uint64(0)
for _, blk := range incoming.Blocks() {
totalMemoryAllocated += uint64(len(blk.RawData()))
}
select {
case <-gsr.graphSync().responseAllocator.AllocateBlockMemory(sender, totalMemoryAllocated):
case <-gsr.ctx.Done():
}
gsr.graphSync().requestManager.ProcessResponses(sender, incoming.Responses(), incoming.Blocks())
}

Expand Down
11 changes: 1 addition & 10 deletions requestmanager/asyncloader/asyncloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ type alternateQueue struct {

// Allocator indicates a mechanism for tracking memory used by a given peer
type Allocator interface {
AllocateBlockMemory(p peer.ID, amount uint64) <-chan error
ReleaseBlockMemory(p peer.ID, amount uint64) error
}

Expand Down Expand Up @@ -113,16 +112,8 @@ func (al *AsyncLoader) StartRequest(requestID graphsync.RequestID, persistenceOp

// ProcessResponse injests new responses and completes asynchronous loads as
// neccesary
func (al *AsyncLoader) ProcessResponse(p peer.ID, responses map[graphsync.RequestID]metadata.Metadata,
func (al *AsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
blks []blocks.Block) {
totalMemoryAllocated := uint64(0)
for _, blk := range blks {
totalMemoryAllocated += uint64(len(blk.RawData()))
}
select {
case <-al.allocator.AllocateBlockMemory(p, totalMemoryAllocated):
case <-al.ctx.Done():
}
select {
case <-al.ctx.Done():
case al.incomingMessages <- &newResponsesAvailableMessage{responses, blks}:
Expand Down
14 changes: 7 additions & 7 deletions requestmanager/asyncloader/asyncloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestAsyncLoadInitialLoadSucceedsResponsePresent(t *testing.T) {
},
}
p := testutil.GeneratePeers(1)[0]
asyncLoader.ProcessResponse(p, responses, blocks)
asyncLoader.ProcessResponse(responses, blocks)
resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{})

assertSuccessResponse(ctx, t, resultChan)
Expand All @@ -73,7 +73,7 @@ func TestAsyncLoadInitialLoadFails(t *testing.T) {
},
}
p := testutil.GeneratePeers(1)[0]
asyncLoader.ProcessResponse(p, responses, nil)
asyncLoader.ProcessResponse(responses, nil)

resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{})
assertFailResponse(ctx, t, resultChan)
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestAsyncLoadInitialLoadIndeterminateThenSucceeds(t *testing.T) {
},
},
}
asyncLoader.ProcessResponse(p, responses, blocks)
asyncLoader.ProcessResponse(responses, blocks)
assertSuccessResponse(ctx, t, resultChan)
st.AssertLocalLoads(t, 1)
st.AssertBlockStored(t, block)
Expand Down Expand Up @@ -145,7 +145,7 @@ func TestAsyncLoadInitialLoadIndeterminateThenFails(t *testing.T) {
},
},
}
asyncLoader.ProcessResponse(p, responses, nil)
asyncLoader.ProcessResponse(responses, nil)
assertFailResponse(ctx, t, resultChan)
st.AssertLocalLoads(t, 1)
})
Expand Down Expand Up @@ -183,7 +183,7 @@ func TestAsyncLoadTwiceLoadsLocallySecondTime(t *testing.T) {
},
}
p := testutil.GeneratePeers(1)[0]
asyncLoader.ProcessResponse(p, responses, blocks)
asyncLoader.ProcessResponse(responses, blocks)
resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{})

assertSuccessResponse(ctx, t, resultChan)
Expand Down Expand Up @@ -283,7 +283,7 @@ func TestRequestSplittingSameBlockTwoStores(t *testing.T) {
},
},
}
asyncLoader.ProcessResponse(p, responses, blocks)
asyncLoader.ProcessResponse(responses, blocks)

assertSuccessResponse(ctx, t, resultChan1)
assertSuccessResponse(ctx, t, resultChan2)
Expand Down Expand Up @@ -318,7 +318,7 @@ func TestRequestSplittingSameBlockOnlyOneResponse(t *testing.T) {
},
},
}
asyncLoader.ProcessResponse(p, responses, blocks)
asyncLoader.ProcessResponse(responses, blocks)
asyncLoader.CompleteResponsesFor(requestID1)

assertFailResponse(ctx, t, resultChan1)
Expand Down
16 changes: 5 additions & 11 deletions requestmanager/requestmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type PeerHandler interface {
// results as new responses are processed
type AsyncLoader interface {
StartRequest(graphsync.RequestID, string) error
ProcessResponse(p peer.ID, responses map[graphsync.RequestID]metadata.Metadata,
ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
blks []blocks.Block)
AsyncLoad(p peer.ID, requestID graphsync.RequestID, link ipld.Link, linkContext ipld.LinkContext) <-chan types.AsyncLoadResult
CompleteResponsesFor(requestID graphsync.RequestID)
Expand Down Expand Up @@ -278,17 +278,15 @@ type processResponseMessage struct {
p peer.ID
responses []gsmsg.GraphSyncResponse
blks []blocks.Block
response chan error
}

// ProcessResponses ingests the given responses from the network and
// and updates the in progress requests based on those responses.
func (rm *RequestManager) ProcessResponses(p peer.ID, responses []gsmsg.GraphSyncResponse,
blks []blocks.Block) {
response := make(chan error, 1)
err := rm.sendSyncMessage(&processResponseMessage{p, responses, blks, response}, response, nil)
if err != nil {
log.Warnf("ProcessResponses: %s", err)
select {
case rm.messages <- &processResponseMessage{p, responses, blks}:
case <-rm.ctx.Done():
}
}

Expand Down Expand Up @@ -485,12 +483,8 @@ func (prm *processResponseMessage) handle(rm *RequestManager) {
filteredResponses = rm.filterResponsesForPeer(filteredResponses, prm.p)
rm.updateLastResponses(filteredResponses)
responseMetadata := metadataForResponses(filteredResponses)
rm.asyncLoader.ProcessResponse(prm.p, responseMetadata, prm.blks)
rm.asyncLoader.ProcessResponse(responseMetadata, prm.blks)
rm.processTerminations(filteredResponses)
select {
case <-rm.ctx.Done():
case prm.response <- nil:
}
}

func (rm *RequestManager) filterResponsesForPeer(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse {
Expand Down
2 changes: 1 addition & 1 deletion requestmanager/testloader/asyncloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (fal *FakeAsyncLoader) StartRequest(requestID graphsync.RequestID, name str
}

// ProcessResponse just records values passed to verify expectations later
func (fal *FakeAsyncLoader) ProcessResponse(p peer.ID, responses map[graphsync.RequestID]metadata.Metadata,
func (fal *FakeAsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
blks []blocks.Block) {
fal.responses <- responses
fal.blks <- blks
Expand Down

0 comments on commit b0bd16e

Please sign in to comment.