Skip to content

Commit

Permalink
Feat/cleanup errors (#90)
Browse files Browse the repository at this point in the history
* feat(impl): cleanup errors

cleanup errors -- move more error codes to proper types and insure single error dispatched

* test(impl): add unit test for incomplete response
  • Loading branch information
hannahhoward authored Sep 29, 2020
1 parent 8a1e460 commit d6c2eaf
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 4 deletions.
9 changes: 9 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,12 @@ const ErrPause = errorType("pause channel")
// ErrResume is a special error that the RequestReceived / ResponseReceived hooks can
// use to resume the channel
const ErrResume = errorType("resume channel")

// ErrIncomplete indicates a channel did not finish transferring data successfully
const ErrIncomplete = errorType("incomplete response")

// ErrRejected indicates a request was not accepted
const ErrRejected = errorType("response rejected")

// ErrUnsupported indicates an operation is not supported by the transport protocol
const ErrUnsupported = errorType("unsupported")
12 changes: 10 additions & 2 deletions impl/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response datat
}
}
if !response.Accepted() {
return m.channels.Error(chid, errors.New("Response Rejected"))
return m.channels.Error(chid, datatransfer.ErrRejected)
}
if response.IsNew() {
err := m.channels.Accept(chid)
Expand Down Expand Up @@ -170,7 +170,15 @@ func (m *manager) OnChannelCompleted(chid datatransfer.ChannelID, success bool)
}
return m.channels.FinishTransfer(chid)
}
return m.channels.Error(chid, errors.New("incomplete response"))
chst, err := m.channels.GetByID(context.TODO(), chid)
if err != nil {
return err
}
// send an error, but only if we haven't already errored for some reason
if chst.Status() != datatransfer.Failing && chst.Status() != datatransfer.Failed {
return m.channels.Error(chid, datatransfer.ErrIncomplete)
}
return nil
}

func (m *manager) receiveNewRequest(
Expand Down
4 changes: 2 additions & 2 deletions impl/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (m *manager) PauseDataTransferChannel(ctx context.Context, chid datatransfe

pausable, ok := m.transport.(datatransfer.PauseableTransport)
if !ok {
return errors.New("unsupported")
return datatransfer.ErrUnsupported
}

err := pausable.PauseChannel(ctx, chid)
Expand All @@ -252,7 +252,7 @@ func (m *manager) PauseDataTransferChannel(ctx context.Context, chid datatransfe
func (m *manager) ResumeDataTransferChannel(ctx context.Context, chid datatransfer.ChannelID) error {
pausable, ok := m.transport.(datatransfer.PauseableTransport)
if !ok {
return errors.New("unsupported")
return datatransfer.ErrUnsupported
}

err := pausable.ResumeChannel(ctx, m.resumeMessage(chid), chid)
Expand Down
77 changes: 77 additions & 0 deletions impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,83 @@ func TestPauseAndResume(t *testing.T) {
}
}

func TestUnrecognizedVoucherRoundTrip(t *testing.T) {
ctx := context.Background()
testCases := map[string]bool{
"push requests": false,
"pull requests": true,
}
for testCase, isPull := range testCases {
t.Run(testCase, func(t *testing.T) {
// ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
// defer cancel()

gsData := testutil.NewGraphsyncTestingData(ctx, t)
host1 := gsData.Host1 // initiator, data sender
host2 := gsData.Host2 // data recipient

tp1 := gsData.SetupGSTransportHost1()
tp2 := gsData.SetupGSTransportHost2()

dt1, err := NewDataTransfer(gsData.DtDs1, gsData.DtNet1, tp1, gsData.StoredCounter1)
require.NoError(t, err)
err = dt1.Start(ctx)
require.NoError(t, err)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.DtNet2, tp2, gsData.StoredCounter2)
require.NoError(t, err)
err = dt2.Start(ctx)
require.NoError(t, err)

finished := make(chan struct{}, 2)
errChan := make(chan string, 2)
opened := make(chan struct{}, 2)
var subscriber datatransfer.Subscriber = func(event datatransfer.Event, channelState datatransfer.ChannelState) {
if channelState.Status() == datatransfer.Failed {
finished <- struct{}{}
}
if event.Code == datatransfer.Error {
errChan <- channelState.Message()
}
if event.Code == datatransfer.Open {
opened <- struct{}{}
}
}
dt1.SubscribeToEvents(subscriber)
dt2.SubscribeToEvents(subscriber)
voucher := testutil.FakeDTType{Data: "applesauce"}

root, _ := testutil.LoadUnixFSFile(ctx, t, gsData.DagService1)
rootCid := root.(cidlink.Link).Cid

if isPull {
_, err = dt2.OpenPullDataChannel(ctx, host1.ID(), &voucher, rootCid, gsData.AllSelector)
} else {
_, err = dt1.OpenPushDataChannel(ctx, host2.ID(), &voucher, rootCid, gsData.AllSelector)
}
require.NoError(t, err)
opens := 0
var errMessages []string
finishes := 0
for opens < 1 || finishes < 1 {
select {
case <-ctx.Done():
t.Fatal("Did not complete succcessful data transfer")
case <-finished:
finishes++
case <-opened:
opens++
case errMessage := <-errChan:
require.Equal(t, errMessage, datatransfer.ErrRejected.Error())
errMessages = append(errMessages, errMessage)
if len(errMessages) > 1 {
t.Fatal("too many errors")
}
}
}
})
}
}

func TestDataTransferSubscribing(t *testing.T) {
// create network
ctx := context.Background()
Expand Down
21 changes: 21 additions & 0 deletions impl/responding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,27 @@ func TestDataTransferResponding(t *testing.T) {
require.False(t, response.IsPaused())
},
},
"validated, incomplete response": {
expectedEvents: []datatransfer.EventCode{
datatransfer.Open,
datatransfer.NewVoucherResult,
datatransfer.Accept,
datatransfer.Error,
datatransfer.CleanupComplete,
},
configureValidator: func(sv *testutil.StubbedValidator) {
sv.ExpectSuccessPull()
sv.StubResult(testutil.NewFakeDTType())
},
configureRevalidator: func(srv *testutil.StubbedRevalidator) {
},
verify: func(t *testing.T, h *receiverHarness) {
_, err := h.transport.EventHandler.OnRequestReceived(channelID(h.id, h.peers), h.pullRequest)
require.NoError(t, err)
err = h.transport.EventHandler.OnChannelCompleted(channelID(h.id, h.peers), false)
require.NoError(t, err)
},
},
"new push request, customized transport": {
expectedEvents: []datatransfer.EventCode{datatransfer.Open, datatransfer.NewVoucherResult, datatransfer.Accept},
configureValidator: func(sv *testutil.StubbedValidator) {
Expand Down

0 comments on commit d6c2eaf

Please sign in to comment.