Remove main thread block on allocation #216

Merged: 1 commit, Sep 17, 2021
14 changes: 12 additions & 2 deletions impl/graphsync.go
@@ -58,7 +58,8 @@ type GraphSync struct {
 	persistenceOptions *persistenceoptions.PersistenceOptions
 	ctx                context.Context
 	cancel             context.CancelFunc
-	allocator          *allocator.Allocator
+	responseAllocator  *allocator.Allocator
+	requestAllocator   *allocator.Allocator
 }

 type graphsyncConfigOptions struct {
@@ -191,7 +192,8 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
 		persistenceOptions: persistenceOptions,
 		ctx:                ctx,
 		cancel:             cancel,
-		allocator:          responseAllocator,
+		responseAllocator:  responseAllocator,
+		requestAllocator:   requestAllocator,
 	}

 	asyncLoader.Startup()
@@ -334,6 +336,14 @@ func (gsr *graphSyncReceiver) ReceiveMessage(
 	sender peer.ID,
 	incoming gsmsg.GraphSyncMessage) {
 	gsr.graphSync().responseManager.ProcessRequests(ctx, sender, incoming.Requests())
+	totalMemoryAllocated := uint64(0)
+	for _, blk := range incoming.Blocks() {
+		totalMemoryAllocated += uint64(len(blk.RawData()))
+	}
+	select {
+	case <-gsr.graphSync().responseAllocator.AllocateBlockMemory(sender, totalMemoryAllocated):
Comment (Contributor):

If all the message receivers use the same allocator, would this still cause all other receivers to be blocked on memory allocation?

Reply (Collaborator, Author):

The allocator takes a peer as a parameter because it tracks memory allocations per peer as well as in total. There's a total limit and a per-peer limit, so if peer X tries to allocate above a certain amount, it will get held up separately, long before the total limit across peers gets hit.

+	case <-gsr.ctx.Done():
+	}
 	gsr.graphSync().requestManager.ProcessResponses(sender, incoming.Responses(), incoming.Blocks())
 }

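The review thread above says the allocator enforces both a total limit and a per-peer limit. Below is a minimal sketch of that idea, with hypothetical names and simplified bookkeeping; it is not the repo's allocator package. It only mirrors the <-chan error shape of AllocateBlockMemory seen in the diff, and a real implementation would queue waiters and wake them when memory is released.

package main

import (
	"fmt"
	"sync"
)

// sketchAllocator tracks bytes in flight per peer and overall. Both limits
// are checked on every AllocateBlockMemory call.
type sketchAllocator struct {
	mu           sync.Mutex
	total        uint64
	totalLimit   uint64
	perPeer      map[string]uint64
	perPeerLimit uint64
}

func newSketchAllocator(totalLimit, perPeerLimit uint64) *sketchAllocator {
	return &sketchAllocator{
		totalLimit:   totalLimit,
		perPeerLimit: perPeerLimit,
		perPeer:      make(map[string]uint64),
	}
}

// AllocateBlockMemory returns a channel that yields nil once the allocation
// is granted. If either limit would be exceeded, the channel stays open, so
// a caller selecting on it blocks; that is the backpressure this PR moves
// into ReceiveMessage. (Queuing the waiter and signaling it on release is
// omitted to keep the sketch short.)
func (a *sketchAllocator) AllocateBlockMemory(peer string, amount uint64) <-chan error {
	done := make(chan error, 1)
	a.mu.Lock()
	defer a.mu.Unlock()
	if a.total+amount <= a.totalLimit && a.perPeer[peer]+amount <= a.perPeerLimit {
		a.total += amount
		a.perPeer[peer] += amount
		done <- nil
	}
	return done
}

func main() {
	alloc := newSketchAllocator(1<<20, 256<<10) // 1 MiB total, 256 KiB per peer: illustrative limits
	select {
	case err := <-alloc.AllocateBlockMemory("peerA", 1024):
		fmt.Println("granted:", err)
	default:
		fmt.Println("a limit was hit; a real caller would keep waiting")
	}
}

Because the per-peer check fires first, one peer flooding the node blocks only its own receive path, which is the answer to the reviewer's question above.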
11 changes: 1 addition & 10 deletions requestmanager/asyncloader/asyncloader.go
@@ -31,7 +31,6 @@ type alternateQueue struct {

 // Allocator indicates a mechanism for tracking memory used by a given peer
 type Allocator interface {
-	AllocateBlockMemory(p peer.ID, amount uint64) <-chan error
 	ReleaseBlockMemory(p peer.ID, amount uint64) error
Comment (Contributor):

So now an Allocator interface only has a ReleaseBlockMemory method. Is Allocator still an appropriate name?

Reply (Collaborator, Author):

This is just the Go pattern of defining the interface at the site of usage. I guess we could rename it? Not sure.
}

@@ -113,16 +112,8 @@ func (al *AsyncLoader) StartRequest(requestID graphsync.RequestID, persistenceOp

 // ProcessResponse injests new responses and completes asynchronous loads as
 // neccesary
-func (al *AsyncLoader) ProcessResponse(p peer.ID, responses map[graphsync.RequestID]metadata.Metadata,
+func (al *AsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
 	blks []blocks.Block) {
-	totalMemoryAllocated := uint64(0)
-	for _, blk := range blks {
-		totalMemoryAllocated += uint64(len(blk.RawData()))
-	}
-	select {
-	case <-al.allocator.AllocateBlockMemory(p, totalMemoryAllocated):
-	case <-al.ctx.Done():
-	}
 	select {
 	case <-al.ctx.Done():
 	case al.incomingMessages <- &newResponsesAvailableMessage{responses, blks}:
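For context on the author's reply above: the Go convention is for the consuming package to declare a small interface listing only the methods it actually calls, and any concrete type satisfies it implicitly. A sketch with hypothetical names, unrelated to the real graphsync types:

package consumer

// Releaser is defined here, at the site of usage, and lists only what this
// package needs; the concrete allocator elsewhere can have many more methods.
type Releaser interface {
	ReleaseBlockMemory(peer string, amount uint64) error
}

// drain releases tracked memory for each peer in the batch. Any type with a
// matching ReleaseBlockMemory method satisfies Releaser implicitly.
func drain(r Releaser, sizes map[string]uint64) error {
	for p, n := range sizes {
		if err := r.ReleaseBlockMemory(p, n); err != nil {
			return err
		}
	}
	return nil
}

Renaming the now one-method interface to something like MemoryReleaser would follow Go's usual -er naming, which may be what the reviewer is hinting at.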
14 changes: 7 additions & 7 deletions requestmanager/asyncloader/asyncloader_test.go
@@ -49,7 +49,7 @@ func TestAsyncLoadInitialLoadSucceedsResponsePresent(t *testing.T) {
 		},
 	}
 	p := testutil.GeneratePeers(1)[0]
-	asyncLoader.ProcessResponse(p, responses, blocks)
+	asyncLoader.ProcessResponse(responses, blocks)
 	resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{})

 	assertSuccessResponse(ctx, t, resultChan)
@@ -73,7 +73,7 @@ func TestAsyncLoadInitialLoadFails(t *testing.T) {
 		},
 	}
 	p := testutil.GeneratePeers(1)[0]
-	asyncLoader.ProcessResponse(p, responses, nil)
+	asyncLoader.ProcessResponse(responses, nil)

 	resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{})
 	assertFailResponse(ctx, t, resultChan)
@@ -117,7 +117,7 @@ func TestAsyncLoadInitialLoadIndeterminateThenSucceeds(t *testing.T) {
 			},
 		},
 	}
-	asyncLoader.ProcessResponse(p, responses, blocks)
+	asyncLoader.ProcessResponse(responses, blocks)
 	assertSuccessResponse(ctx, t, resultChan)
 	st.AssertLocalLoads(t, 1)
 	st.AssertBlockStored(t, block)
@@ -145,7 +145,7 @@ func TestAsyncLoadInitialLoadIndeterminateThenFails(t *testing.T) {
 			},
 		},
 	}
-	asyncLoader.ProcessResponse(p, responses, nil)
+	asyncLoader.ProcessResponse(responses, nil)
 	assertFailResponse(ctx, t, resultChan)
 	st.AssertLocalLoads(t, 1)
 })
@@ -183,7 +183,7 @@ func TestAsyncLoadTwiceLoadsLocallySecondTime(t *testing.T) {
 		},
 	}
 	p := testutil.GeneratePeers(1)[0]
-	asyncLoader.ProcessResponse(p, responses, blocks)
+	asyncLoader.ProcessResponse(responses, blocks)
 	resultChan := asyncLoader.AsyncLoad(p, requestID, link, ipld.LinkContext{})

 	assertSuccessResponse(ctx, t, resultChan)
@@ -283,7 +283,7 @@ func TestRequestSplittingSameBlockTwoStores(t *testing.T) {
 			},
 		},
 	}
-	asyncLoader.ProcessResponse(p, responses, blocks)
+	asyncLoader.ProcessResponse(responses, blocks)

 	assertSuccessResponse(ctx, t, resultChan1)
 	assertSuccessResponse(ctx, t, resultChan2)
@@ -318,7 +318,7 @@ func TestRequestSplittingSameBlockOnlyOneResponse(t *testing.T) {
 			},
 		},
 	}
-	asyncLoader.ProcessResponse(p, responses, blocks)
+	asyncLoader.ProcessResponse(responses, blocks)
 	asyncLoader.CompleteResponsesFor(requestID1)

 	assertFailResponse(ctx, t, resultChan1)
16 changes: 5 additions & 11 deletions requestmanager/requestmanager.go
@@ -59,7 +59,7 @@ type PeerHandler interface {
 // results as new responses are processed
 type AsyncLoader interface {
 	StartRequest(graphsync.RequestID, string) error
-	ProcessResponse(p peer.ID, responses map[graphsync.RequestID]metadata.Metadata,
+	ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
 		blks []blocks.Block)
 	AsyncLoad(p peer.ID, requestID graphsync.RequestID, link ipld.Link, linkContext ipld.LinkContext) <-chan types.AsyncLoadResult
 	CompleteResponsesFor(requestID graphsync.RequestID)
@@ -278,17 +278,15 @@ type processResponseMessage struct {
 	p         peer.ID
 	responses []gsmsg.GraphSyncResponse
 	blks      []blocks.Block
-	response  chan error
 }

 // ProcessResponses ingests the given responses from the network and
 // and updates the in progress requests based on those responses.
 func (rm *RequestManager) ProcessResponses(p peer.ID, responses []gsmsg.GraphSyncResponse,
 	blks []blocks.Block) {
-	response := make(chan error, 1)
-	err := rm.sendSyncMessage(&processResponseMessage{p, responses, blks, response}, response, nil)
-	if err != nil {
-		log.Warnf("ProcessResponses: %s", err)
+	select {
+	case rm.messages <- &processResponseMessage{p, responses, blks}:
+	case <-rm.ctx.Done():
 	}
 }
@@ -485,12 +483,8 @@ func (prm *processResponseMessage) handle(rm *RequestManager) {
 	filteredResponses = rm.filterResponsesForPeer(filteredResponses, prm.p)
 	rm.updateLastResponses(filteredResponses)
 	responseMetadata := metadataForResponses(filteredResponses)
-	rm.asyncLoader.ProcessResponse(prm.p, responseMetadata, prm.blks)
+	rm.asyncLoader.ProcessResponse(responseMetadata, prm.blks)
 	rm.processTerminations(filteredResponses)
-	select {
-	case <-rm.ctx.Done():
-	case prm.response <- nil:
Comment (Contributor @gammazero, Sep 16, 2021):

Why isn't sending nil still needed? Or was it not needed previously?

Reply (Collaborator, Author):

prm.response as a field was removed in this PR, I think. Waiting on the response channel was essentially what was used to block on allocation; that's now done by allocating directly in the receive handler.
-	}
 }

 func (rm *RequestManager) filterResponsesForPeer(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse {
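To restate the author's reply above in code: a hedged sketch of the new fire-and-forget shape of ProcessResponses, with stand-in types rather than the real graphsync ones. The old version also created a reply channel, passed it along in the message, and blocked receiving on it; the new version returns as soon as the message is queued.

package sketch

import "context"

type msg struct{ payload string }

type manager struct {
	ctx      context.Context
	messages chan *msg
}

// processResponses enqueues the message and returns; the only ways out are a
// successful send or context cancellation. Blocking on memory now happens
// earlier, in the network receive handler, via the allocator.
func (m *manager) processResponses(payload string) {
	select {
	case m.messages <- &msg{payload}:
	case <-m.ctx.Done():
	}
}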
2 changes: 1 addition & 1 deletion requestmanager/testloader/asyncloader.go
@@ -61,7 +61,7 @@ func (fal *FakeAsyncLoader) StartRequest(requestID graphsync.RequestID, name str
 }

 // ProcessResponse just records values passed to verify expectations later
-func (fal *FakeAsyncLoader) ProcessResponse(p peer.ID, responses map[graphsync.RequestID]metadata.Metadata,
+func (fal *FakeAsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
 	blks []blocks.Block) {
 	fal.responses <- responses
 	fal.blks <- blks