diff --git a/errors.go b/errors.go index 38c77f46..cd2e1cf1 100644 --- a/errors.go +++ b/errors.go @@ -24,3 +24,12 @@ const ErrPause = errorType("pause channel") // ErrResume is a special error that the RequestReceived / ResponseReceived hooks can // use to resume the channel const ErrResume = errorType("resume channel") + +// ErrIncomplete indicates a channel did not finish transferring data successfully +const ErrIncomplete = errorType("incomplete response") + +// ErrRejected indicates a request was not accepted +const ErrRejected = errorType("response rejected") + +// ErrUnsupported indicates an operation is not supported by the transport protocol +const ErrUnsupported = errorType("unsupported") diff --git a/impl/events.go b/impl/events.go index f69fe1c5..3642a0b4 100644 --- a/impl/events.go +++ b/impl/events.go @@ -123,7 +123,7 @@ func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response datat } } if !response.Accepted() { - return m.channels.Error(chid, errors.New("Response Rejected")) + return m.channels.Error(chid, datatransfer.ErrRejected) } if response.IsNew() { err := m.channels.Accept(chid) @@ -170,7 +170,15 @@ func (m *manager) OnChannelCompleted(chid datatransfer.ChannelID, success bool) } return m.channels.FinishTransfer(chid) } - return m.channels.Error(chid, errors.New("incomplete response")) + chst, err := m.channels.GetByID(context.TODO(), chid) + if err != nil { + return err + } + // send an error, but only if we haven't already errored for some reason + if chst.Status() != datatransfer.Failing && chst.Status() != datatransfer.Failed { + return m.channels.Error(chid, datatransfer.ErrIncomplete) + } + return nil } func (m *manager) receiveNewRequest( diff --git a/impl/impl.go b/impl/impl.go index 7d684f30..a4e27abc 100644 --- a/impl/impl.go +++ b/impl/impl.go @@ -231,7 +231,7 @@ func (m *manager) PauseDataTransferChannel(ctx context.Context, chid datatransfe pausable, ok := m.transport.(datatransfer.PauseableTransport) if !ok { - return errors.New("unsupported") + return datatransfer.ErrUnsupported } err := pausable.PauseChannel(ctx, chid) @@ -252,7 +252,7 @@ func (m *manager) PauseDataTransferChannel(ctx context.Context, chid datatransfe func (m *manager) ResumeDataTransferChannel(ctx context.Context, chid datatransfer.ChannelID) error { pausable, ok := m.transport.(datatransfer.PauseableTransport) if !ok { - return errors.New("unsupported") + return datatransfer.ErrUnsupported } err := pausable.ResumeChannel(ctx, m.resumeMessage(chid), chid) diff --git a/impl/integration_test.go b/impl/integration_test.go index 352c9168..fdb0e636 100644 --- a/impl/integration_test.go +++ b/impl/integration_test.go @@ -890,6 +890,83 @@ func TestPauseAndResume(t *testing.T) { } } +func TestUnrecognizedVoucherRoundTrip(t *testing.T) { + ctx := context.Background() + testCases := map[string]bool{ + "push requests": false, + "pull requests": true, + } + for testCase, isPull := range testCases { + t.Run(testCase, func(t *testing.T) { + // ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + // defer cancel() + + gsData := testutil.NewGraphsyncTestingData(ctx, t) + host1 := gsData.Host1 // initiator, data sender + host2 := gsData.Host2 // data recipient + + tp1 := gsData.SetupGSTransportHost1() + tp2 := gsData.SetupGSTransportHost2() + + dt1, err := NewDataTransfer(gsData.DtDs1, gsData.DtNet1, tp1, gsData.StoredCounter1) + require.NoError(t, err) + err = dt1.Start(ctx) + require.NoError(t, err) + dt2, err := NewDataTransfer(gsData.DtDs2, gsData.DtNet2, tp2, gsData.StoredCounter2) + require.NoError(t, err) + err = dt2.Start(ctx) + require.NoError(t, err) + + finished := make(chan struct{}, 2) + errChan := make(chan string, 2) + opened := make(chan struct{}, 2) + var subscriber datatransfer.Subscriber = func(event datatransfer.Event, channelState datatransfer.ChannelState) { + if channelState.Status() == datatransfer.Failed { + finished <- struct{}{} + } + if event.Code == datatransfer.Error { + errChan <- channelState.Message() + } + if event.Code == datatransfer.Open { + opened <- struct{}{} + } + } + dt1.SubscribeToEvents(subscriber) + dt2.SubscribeToEvents(subscriber) + voucher := testutil.FakeDTType{Data: "applesauce"} + + root, _ := testutil.LoadUnixFSFile(ctx, t, gsData.DagService1) + rootCid := root.(cidlink.Link).Cid + + if isPull { + _, err = dt2.OpenPullDataChannel(ctx, host1.ID(), &voucher, rootCid, gsData.AllSelector) + } else { + _, err = dt1.OpenPushDataChannel(ctx, host2.ID(), &voucher, rootCid, gsData.AllSelector) + } + require.NoError(t, err) + opens := 0 + var errMessages []string + finishes := 0 + for opens < 1 || finishes < 1 { + select { + case <-ctx.Done(): + t.Fatal("Did not complete succcessful data transfer") + case <-finished: + finishes++ + case <-opened: + opens++ + case errMessage := <-errChan: + require.Equal(t, errMessage, datatransfer.ErrRejected.Error()) + errMessages = append(errMessages, errMessage) + if len(errMessages) > 1 { + t.Fatal("too many errors") + } + } + } + }) + } +} + func TestDataTransferSubscribing(t *testing.T) { // create network ctx := context.Background()