Skip to content

Commit

Permalink
feat(graphsync): consume response channel
Browse files Browse the repository at this point in the history
consume response channel so graphsync does not buffer responses in memory
  • Loading branch information
hannahhoward committed Oct 28, 2020
1 parent a6066f8 commit e528b42
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 16 deletions.
3 changes: 2 additions & 1 deletion testutil/fakegraphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type ReceivedGraphSyncRequest struct {
Root ipld.Link
Selector ipld.Node
Extensions []graphsync.ExtensionData
ResponseChan chan graphsync.ResponseProgress
ResponseErrChan chan error
}

Expand Down Expand Up @@ -247,8 +248,8 @@ func (fgs *FakeGraphSync) AssertDoesNotHavePersistenceOption(t *testing.T, name
// Request initiates a new GraphSync request to the given peer using the given selector spec.
func (fgs *FakeGraphSync) Request(ctx context.Context, p peer.ID, root ipld.Link, selector ipld.Node, extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) {
errors := make(chan error)
fgs.requests <- ReceivedGraphSyncRequest{ctx, p, root, selector, extensions, errors}
responses := make(chan graphsync.ResponseProgress)
fgs.requests <- ReceivedGraphSyncRequest{ctx, p, root, selector, extensions, responses, errors}
if !fgs.leaveRequestsOpen {
close(responses)
close(errors)
Expand Down
25 changes: 10 additions & 15 deletions transport/graphsync/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,29 +126,24 @@ func (t *Transport) OpenChannel(ctx context.Context,
Data: bz}
exts = append(exts, doNotSendExt)
}
_, errChan := t.gs.Request(internalCtx, dataSender, root, stor, exts...)
responseChan, errChan := t.gs.Request(internalCtx, dataSender, root, stor, exts...)

go t.executeGsRequest(ctx, channelID, errChan)
go t.executeGsRequest(ctx, channelID, responseChan, errChan)
return nil
}

func (t *Transport) consumeResponses(ctx context.Context, errChan <-chan error) error {
func (t *Transport) consumeResponses(responseChan <-chan graphsync.ResponseProgress, errChan <-chan error) error {
var lastError error
for {
select {
case <-ctx.Done():
return errContextCancelled
case err, ok := <-errChan:
if !ok {
return lastError
}
lastError = err
}
for range responseChan {
}
for err := range errChan {
lastError = err
}
return lastError
}

func (t *Transport) executeGsRequest(ctx context.Context, channelID datatransfer.ChannelID, errChan <-chan error) {
lastError := t.consumeResponses(ctx, errChan)
func (t *Transport) executeGsRequest(ctx context.Context, channelID datatransfer.ChannelID, responseChan <-chan graphsync.ResponseProgress, errChan <-chan error) {
lastError := t.consumeResponses(responseChan, errChan)

if _, ok := lastError.(graphsync.RequestContextCancelledErr); ok {
log.Warnf("graphsync request context cancelled, channel Id: %v", channelID)
Expand Down
3 changes: 3 additions & 0 deletions transport/graphsync/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,7 @@ func TestManager(t *testing.T) {
},
check: func(t *testing.T, events *fakeEvents, gsData *harness) {
requestReceived := gsData.fgs.AssertRequestReceived(gsData.ctx, t)
close(requestReceived.ResponseChan)
close(requestReceived.ResponseErrChan)

require.Eventually(t, func() bool {
Expand All @@ -734,6 +735,7 @@ func TestManager(t *testing.T) {
},
check: func(t *testing.T, events *fakeEvents, gsData *harness) {
requestReceived := gsData.fgs.AssertRequestReceived(gsData.ctx, t)
close(requestReceived.ResponseChan)
requestReceived.ResponseErrChan <- graphsync.RequestFailedUnknownErr{}
close(requestReceived.ResponseErrChan)

Expand Down Expand Up @@ -789,6 +791,7 @@ func TestManager(t *testing.T) {
},
check: func(t *testing.T, events *fakeEvents, gsData *harness) {
requestReceived := gsData.fgs.AssertRequestReceived(gsData.ctx, t)
close(requestReceived.ResponseChan)
requestReceived.ResponseErrChan <- graphsync.RequestContextCancelledErr{}
close(requestReceived.ResponseErrChan)

Expand Down

0 comments on commit e528b42

Please sign in to comment.