Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better error message on complete #145

Merged
merged 2 commits into from
Feb 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ const ErrPause = errorType("pause channel")
// use to resume the channel
const ErrResume = errorType("resume channel")

// ErrIncomplete indicates a channel did not finish transferring data successfully
const ErrIncomplete = errorType("incomplete response")

// ErrRejected indicates a request was not accepted
const ErrRejected = errorType("response rejected")

Expand Down
8 changes: 5 additions & 3 deletions impl/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,8 @@ func (m *manager) OnRequestDisconnected(ctx context.Context, chid datatransfer.C
return nil
}

func (m *manager) OnChannelCompleted(chid datatransfer.ChannelID, success bool) error {
if success {
func (m *manager) OnChannelCompleted(chid datatransfer.ChannelID, completeErr error) error {
if completeErr == nil {
if chid.Initiator != m.peerID {
msg, err := m.completeMessage(chid)
if err != nil {
Expand Down Expand Up @@ -316,7 +316,9 @@ func (m *manager) OnChannelCompleted(chid datatransfer.ChannelID, success bool)
}
// send an error, but only if we haven't already errored for some reason
if chst.Status() != datatransfer.Failing && chst.Status() != datatransfer.Failed {
return m.channels.Error(chid, datatransfer.ErrIncomplete)
err := xerrors.Errorf("data transfer channel %s failed to transfer data: %w", chid, completeErr)
log.Warnf(err.Error())
return m.channels.Error(chid, err)
}
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions impl/responding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func TestDataTransferResponding(t *testing.T) {
verify: func(t *testing.T, h *receiverHarness) {
_, err := h.transport.EventHandler.OnRequestReceived(channelID(h.id, h.peers), h.pullRequest)
require.NoError(t, err)
err = h.transport.EventHandler.OnChannelCompleted(channelID(h.id, h.peers), true)
err = h.transport.EventHandler.OnChannelCompleted(channelID(h.id, h.peers), nil)
require.NoError(t, err)
require.Len(t, h.network.SentMessages, 1)
response, ok := h.network.SentMessages[0].Message.(datatransfer.Response)
Expand Down Expand Up @@ -507,7 +507,7 @@ func TestDataTransferResponding(t *testing.T) {
verify: func(t *testing.T, h *receiverHarness) {
_, err := h.transport.EventHandler.OnRequestReceived(channelID(h.id, h.peers), h.pullRequest)
require.NoError(t, err)
err = h.transport.EventHandler.OnChannelCompleted(channelID(h.id, h.peers), false)
err = h.transport.EventHandler.OnChannelCompleted(channelID(h.id, h.peers), xerrors.Errorf("err"))
require.NoError(t, err)
},
},
Expand Down
6 changes: 3 additions & 3 deletions transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ type EventsHandler interface {
// - err == ErrPause - pause this request (only for new requests)
// - err == ErrResume - resume this request (only for update requests)
OnRequestReceived(chid ChannelID, msg Request) (Response, error)
// OnResponseCompleted is called when we finish sending data for the given channel ID
// Error returns are logged but otherwise have not effect
OnChannelCompleted(chid ChannelID, success bool) error
// OnChannelCompleted is called when we finish transferring data for the given channel ID
// Error returns are logged but otherwise have no effect
OnChannelCompleted(chid ChannelID, err error) error

// OnRequestTimedOut is called when a request we opened (with the given channel Id) to receive data times out.
// Error returns are logged but otherwise have no effect
Expand Down
51 changes: 44 additions & 7 deletions transport/graphsync/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,12 @@ func (t *Transport) executeGsRequest(ctx context.Context, internalCtx context.Co
}

log.Debugf("finished executing graphsync request for channel %s", channelID)
err := t.events.OnChannelCompleted(channelID, lastError == nil)

var completeErr error
if lastError != nil {
completeErr = xerrors.Errorf("graphsync request failed to complete: %w", lastError)
}
err := t.events.OnChannelCompleted(channelID, completeErr)
if err != nil {
log.Error(err)
}
Expand Down Expand Up @@ -533,13 +538,45 @@ func (t *Transport) gsCompletedResponseListener(p peer.ID, request graphsync.Req
return
}

if status != graphsync.RequestCancelled {
success := status == graphsync.RequestCompletedFull
err := t.events.OnChannelCompleted(chid, success)
if err != nil {
log.Error(err)
}
if status == graphsync.RequestCancelled {
return
}

var completeErr error
if status != graphsync.RequestCompletedFull {
statusStr := gsResponseStatusCodeString(status)
completeErr = xerrors.Errorf("graphsync response to peer %s did not complete: response status code %s", p, statusStr)
}
err := t.events.OnChannelCompleted(chid, completeErr)
if err != nil {
log.Error(err)
}
}

// Remove this map once this PR lands: https://github.com/ipfs/go-graphsync/pull/148
var gsResponseStatusCodes = map[graphsync.ResponseStatusCode]string{
graphsync.RequestAcknowledged: "RequestAcknowledged",
graphsync.AdditionalPeers: "AdditionalPeers",
graphsync.NotEnoughGas: "NotEnoughGas",
graphsync.OtherProtocol: "OtherProtocol",
graphsync.PartialResponse: "PartialResponse",
graphsync.RequestPaused: "RequestPaused",
graphsync.RequestCompletedFull: "RequestCompletedFull",
graphsync.RequestCompletedPartial: "RequestCompletedPartial",
graphsync.RequestRejected: "RequestRejected",
graphsync.RequestFailedBusy: "RequestFailedBusy",
graphsync.RequestFailedUnknown: "RequestFailedUnknown",
graphsync.RequestFailedLegal: "RequestFailedLegal",
graphsync.RequestFailedContentNotFound: "RequestFailedContentNotFound",
graphsync.RequestCancelled: "RequestCancelled",
}

func gsResponseStatusCodeString(code graphsync.ResponseStatusCode) string {
str, ok := gsResponseStatusCodes[code]
if ok {
return str
}
return gsResponseStatusCodes[graphsync.RequestFailedUnknown]
}

func (t *Transport) cleanupChannel(chid datatransfer.ChannelID, gsKey graphsyncKey) {
Expand Down
4 changes: 2 additions & 2 deletions transport/graphsync/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1041,9 +1041,9 @@ func (fe *fakeEvents) OnResponseReceived(chid datatransfer.ChannelID, response d
return err
}

func (fe *fakeEvents) OnChannelCompleted(chid datatransfer.ChannelID, success bool) error {
func (fe *fakeEvents) OnChannelCompleted(chid datatransfer.ChannelID, completeErr error) error {
fe.OnChannelCompletedCalled = true
fe.ChannelCompletedSuccess = success
fe.ChannelCompletedSuccess = completeErr == nil
return fe.OnChannelCompletedErr
}

Expand Down