From eee8d1cde5d5d3074ff824e9f0661d98c4c499fd Mon Sep 17 00:00:00 2001 From: Hannah Howard Date: Tue, 13 Oct 2020 02:20:17 -0700 Subject: [PATCH] Handle network errors/stalls (#101) * feat(datatransfer): handle network errors Handle errors when network messages don't go through * fix(datatransfer): cleanups lint, remove unused status --- channels/channels_fsm.go | 10 ++- channels/channels_test.go | 2 +- errors.go | 6 ++ go.mod | 2 +- go.sum | 4 +- impl/events.go | 38 ++++++++-- impl/impl.go | 26 ++++++- impl/initiating_test.go | 23 ++++-- impl/restart.go | 5 +- statuses.go | 3 - testutil/fakegraphsync.go | 14 ++++ transport.go | 4 + transport/graphsync/graphsync.go | 14 ++++ transport/graphsync/graphsync_test.go | 104 +++++++++++++++++++++++++- 14 files changed, 225 insertions(+), 30 deletions(-) diff --git a/channels/channels_fsm.go b/channels/channels_fsm.go index f1e7ae74..8d0812e1 100644 --- a/channels/channels_fsm.go +++ b/channels/channels_fsm.go @@ -17,7 +17,10 @@ var log = logging.Logger("data-transfer") var ChannelEvents = fsm.Events{ fsm.Event(datatransfer.Open).FromAny().To(datatransfer.Requested), fsm.Event(datatransfer.Accept).From(datatransfer.Requested).To(datatransfer.Ongoing), - fsm.Event(datatransfer.Restart).FromAny().To(datatransfer.Ongoing), + fsm.Event(datatransfer.Restart).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error { + chst.Message = "" + return nil + }), fsm.Event(datatransfer.Cancel).FromAny().To(datatransfer.Cancelling), @@ -46,7 +49,10 @@ var ChannelEvents = fsm.Events{ return nil }), - fsm.Event(datatransfer.Disconnected).FromAny().To(datatransfer.PeerDisconnected), + fsm.Event(datatransfer.Disconnected).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error { + chst.Message = datatransfer.ErrDisconnected.Error() + return nil + }), fsm.Event(datatransfer.Error).FromAny().To(datatransfer.Failing).Action(func(chst *internal.ChannelState, err error) error { chst.Message = err.Error() diff --git a/channels/channels_test.go b/channels/channels_test.go index 123b5f06..0f8d715c 100644 --- a/channels/channels_test.go +++ b/channels/channels_test.go @@ -303,7 +303,7 @@ func TestChannels(t *testing.T) { err = channelList.Disconnected(chid) require.NoError(t, err) state = checkEvent(ctx, t, received, datatransfer.Disconnected) - require.Equal(t, datatransfer.PeerDisconnected, state.Status()) + require.Equal(t, datatransfer.ErrDisconnected.Error(), state.Message()) }) t.Run("test self peer and other peer", func(t *testing.T) { diff --git a/errors.go b/errors.go index cd2e1cf1..be3b5462 100644 --- a/errors.go +++ b/errors.go @@ -33,3 +33,9 @@ const ErrRejected = errorType("response rejected") // ErrUnsupported indicates an operation is not supported by the transport protocol const ErrUnsupported = errorType("unsupported") + +// ErrDisconnected indicates the other peer may have hung up and you should try restarting the channel. +const ErrDisconnected = errorType("other peer appears to have hung up. restart Channel") + +// ErrRemoved indicates the channel was inactive long enough that it was put in a permaneant error state +const ErrRemoved = errorType("channel removed due to inactivity") diff --git a/go.mod b/go.mod index f7cb1319..cfe552ef 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/ipfs/go-blockservice v0.1.3 github.com/ipfs/go-cid v0.0.7 github.com/ipfs/go-datastore v0.4.5 - github.com/ipfs/go-graphsync v0.2.1 + github.com/ipfs/go-graphsync v0.2.1-0.20201013053840-5d8ea8076a2c github.com/ipfs/go-ipfs-blockstore v1.0.1 github.com/ipfs/go-ipfs-blocksutil v0.0.1 github.com/ipfs/go-ipfs-chunker v0.0.5 diff --git a/go.sum b/go.sum index d8a04ff7..cc38b6ac 100644 --- a/go.sum +++ b/go.sum @@ -198,8 +198,8 @@ github.com/ipfs/go-ds-badger v0.0.5/go.mod h1:g5AuuCGmr7efyzQhLL8MzwqcauPojGPUaH github.com/ipfs/go-ds-badger v0.2.1/go.mod h1:Tx7l3aTph3FMFrRS838dcSJh+jjA7cX9DrGVwx/NOwE= github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc= github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s= -github.com/ipfs/go-graphsync v0.2.1 h1:MdehhqBSuTI2LARfKLkpYnt0mUrqHs/mtuDnESXHBfU= -github.com/ipfs/go-graphsync v0.2.1/go.mod h1:gEBvJUNelzMkaRPJTpg/jaKN4AQW/7wDWu0K92D8o10= +github.com/ipfs/go-graphsync v0.2.1-0.20201013053840-5d8ea8076a2c h1:De/AZGvRa3WMyw5zdMMhcvRcho46BVo+C0NRud+T4io= +github.com/ipfs/go-graphsync v0.2.1-0.20201013053840-5d8ea8076a2c/go.mod h1:gEBvJUNelzMkaRPJTpg/jaKN4AQW/7wDWu0K92D8o10= github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08= github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw= github.com/ipfs/go-ipfs-blockstore v0.1.4 h1:2SGI6U1B44aODevza8Rde3+dY30Pb+lbcObe1LETxOQ= diff --git a/impl/events.go b/impl/events.go index 57badec9..2f96bb27 100644 --- a/impl/events.go +++ b/impl/events.go @@ -17,8 +17,6 @@ import ( "github.com/filecoin-project/go-data-transfer/registry" ) -var ChannelRemoveTimeout = 1 * time.Hour - func (m *manager) OnChannelOpened(chid datatransfer.ChannelID) error { has, err := m.channels.HasChannel(chid) if err != nil { @@ -170,12 +168,42 @@ func (m *manager) OnRequestTimedOut(ctx context.Context, chid datatransfer.Chann go func() { select { case <-ctx.Done(): - case <-time.After(ChannelRemoveTimeout): + case <-time.After(m.channelRemoveTimeout): + channel, err := m.channels.GetByID(ctx, chid) + if err == nil { + if !(channels.IsChannelTerminated(channel.Status()) || + channels.IsChannelCleaningUp(channel.Status())) { + if err := m.channels.Error(chid, datatransfer.ErrRemoved); err != nil { + log.Errorf("failed to cancel timed-out channel: %v", err) + return + } + log.Warnf("channel %+v has ben cancelled because of timeout", chid) + } + } + } + }() + + return nil +} + +func (m *manager) OnRequestDisconnected(ctx context.Context, chid datatransfer.ChannelID) error { + log.Warnf("channel %+v has stalled or disconnected", chid) + + // mark peer disconnected for informational purposes + err := m.channels.Disconnected(chid) + if err != nil { + return err + } + + go func() { + select { + case <-ctx.Done(): + case <-time.After(m.channelRemoveTimeout): channel, err := m.channels.GetByID(ctx, chid) if err == nil { if !(channels.IsChannelTerminated(channel.Status()) || channels.IsChannelCleaningUp(channel.Status())) { - if err := m.channels.Cancel(chid); err != nil { + if err := m.channels.Error(chid, datatransfer.ErrRemoved); err != nil { log.Errorf("failed to cancel timed-out channel: %v", err) return } @@ -198,7 +226,7 @@ func (m *manager) OnChannelCompleted(chid datatransfer.ChannelID, success bool) if msg != nil { if err := m.dataTransferNetwork.SendMessage(context.TODO(), chid.Initiator, msg); err != nil { log.Warnf("failed to send completion message, err : %v", err) - return m.channels.Disconnected(chid) + return m.OnRequestDisconnected(context.TODO(), chid) } } if msg.Accepted() { diff --git a/impl/impl.go b/impl/impl.go index a2a7ef16..633d900e 100644 --- a/impl/impl.go +++ b/impl/impl.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "time" "github.com/hannahhoward/go-pubsub" "github.com/hashicorp/go-multierror" @@ -39,6 +40,7 @@ type manager struct { peerID peer.ID transport datatransfer.Transport storedCounter *storedcounter.StoredCounter + channelRemoveTimeout time.Duration } type internalEvent struct { @@ -72,8 +74,20 @@ func readyDispatcher(evt pubsub.Event, fn pubsub.SubscriberFn) error { return nil } +// DataTransferOption configures the data transfer manager +type DataTransferOption func(*manager) + +// ChannelRemoveTimeout sets the timeout after which channels are removed from the manager +func ChannelRemoveTimeout(timeout time.Duration) DataTransferOption { + return func(m *manager) { + m.channelRemoveTimeout = timeout + } +} + +const defaultChannelRemoveTimeout = 1 * time.Hour + // NewDataTransfer initializes a new instance of a data transfer manager -func NewDataTransfer(ds datastore.Batching, dataTransferNetwork network.DataTransferNetwork, transport datatransfer.Transport, storedCounter *storedcounter.StoredCounter) (datatransfer.Manager, error) { +func NewDataTransfer(ds datastore.Batching, dataTransferNetwork network.DataTransferNetwork, transport datatransfer.Transport, storedCounter *storedcounter.StoredCounter, options ...DataTransferOption) (datatransfer.Manager, error) { m := &manager{ dataTransferNetwork: dataTransferNetwork, validatedTypes: registry.NewRegistry(), @@ -85,12 +99,16 @@ func NewDataTransfer(ds datastore.Batching, dataTransferNetwork network.DataTran peerID: dataTransferNetwork.ID(), transport: transport, storedCounter: storedCounter, + channelRemoveTimeout: defaultChannelRemoveTimeout, } channels, err := channels.New(ds, m.notifier, m.voucherDecoder, m.resultTypes.Decoder, &channelEnvironment{m}, dataTransferNetwork.ID()) if err != nil { return nil, err } m.channels = channels + for _, option := range options { + option(m) + } return m, nil } @@ -230,7 +248,7 @@ func (m *manager) SendVoucher(ctx context.Context, channelID datatransfer.Channe } if err := m.dataTransferNetwork.SendMessage(ctx, chst.OtherPeer(), updateRequest); err != nil { err = fmt.Errorf("Unable to send request: %w", err) - _ = m.channels.Error(channelID, err) + _ = m.OnRequestDisconnected(ctx, channelID) return err } return m.channels.NewVoucher(channelID, voucher) @@ -249,7 +267,7 @@ func (m *manager) CloseDataTransferChannel(ctx context.Context, chid datatransfe if err := m.dataTransferNetwork.SendMessage(ctx, chst.OtherPeer(), m.cancelMessage(chid)); err != nil { err = fmt.Errorf("Unable to send cancel message: %w", err) - _ = m.channels.Error(chid, err) + _ = m.OnRequestDisconnected(ctx, chid) return err } @@ -271,7 +289,7 @@ func (m *manager) PauseDataTransferChannel(ctx context.Context, chid datatransfe if err := m.dataTransferNetwork.SendMessage(ctx, chid.OtherParty(m.peerID), m.pauseMessage(chid)); err != nil { err = fmt.Errorf("Unable to send pause message: %w", err) - _ = m.channels.Error(chid, err) + _ = m.OnRequestDisconnected(ctx, chid) return err } diff --git a/impl/initiating_test.go b/impl/initiating_test.go index 29c729c6..c2141c69 100644 --- a/impl/initiating_test.go +++ b/impl/initiating_test.go @@ -28,6 +28,7 @@ func TestDataTransferInitiating(t *testing.T) { ctx := context.Background() testCases := map[string]struct { expectedEvents []datatransfer.EventCode + options []DataTransferOption verify func(t *testing.T, h *harness) }{ "OpenPushDataTransfer": { @@ -83,14 +84,9 @@ func TestDataTransferInitiating(t *testing.T) { }, }, "Remove Timed-out request": { - expectedEvents: []datatransfer.EventCode{datatransfer.Open, datatransfer.Cancel, datatransfer.CleanupComplete}, + expectedEvents: []datatransfer.EventCode{datatransfer.Open, datatransfer.Error, datatransfer.CleanupComplete}, + options: []DataTransferOption{ChannelRemoveTimeout(10 * time.Millisecond)}, verify: func(t *testing.T, h *harness) { - orig := ChannelRemoveTimeout - ChannelRemoveTimeout = 10 * time.Millisecond - defer func() { - ChannelRemoveTimeout = orig - }() - channelID, err := h.dt.OpenPullDataChannel(h.ctx, h.peers[1], h.voucher, h.baseCid, h.stor) require.NoError(t, err) require.NoError(t, h.transport.EventHandler.OnRequestTimedOut(ctx, channelID)) @@ -98,6 +94,17 @@ func TestDataTransferInitiating(t *testing.T) { time.Sleep(1 * time.Second) }, }, + "Remove disconnected request": { + expectedEvents: []datatransfer.EventCode{datatransfer.Open, datatransfer.Disconnected, datatransfer.Error, datatransfer.CleanupComplete}, + options: []DataTransferOption{ChannelRemoveTimeout(10 * time.Millisecond)}, + verify: func(t *testing.T, h *harness) { + channelID, err := h.dt.OpenPullDataChannel(h.ctx, h.peers[1], h.voucher, h.baseCid, h.stor) + require.NoError(t, err) + require.NoError(t, h.transport.EventHandler.OnRequestDisconnected(ctx, channelID)) + // need time for the events to take place + time.Sleep(1 * time.Second) + }, + }, "SendVoucher with no channel open": { verify: func(t *testing.T, h *harness) { err := h.dt.SendVoucher(h.ctx, datatransfer.ChannelID{Initiator: h.peers[1], Responder: h.peers[0], ID: 999999}, h.voucher) @@ -344,7 +351,7 @@ func TestDataTransferInitiating(t *testing.T) { h.transport = testutil.NewFakeTransport() h.ds = dss.MutexWrap(datastore.NewMapDatastore()) h.storedCounter = storedcounter.New(h.ds, datastore.NewKey("counter")) - dt, err := NewDataTransfer(h.ds, h.network, h.transport, h.storedCounter) + dt, err := NewDataTransfer(h.ds, h.network, h.transport, h.storedCounter, verify.options...) require.NoError(t, err) testutil.StartAndWaitForReady(ctx, t, dt) h.dt = dt diff --git a/impl/restart.go b/impl/restart.go index 21b1fa85..79dc17f2 100644 --- a/impl/restart.go +++ b/impl/restart.go @@ -14,6 +14,7 @@ import ( "github.com/filecoin-project/go-data-transfer/message" ) +// ChannelDataTransferType identifies the type of a data transfer channel for the purposes of a restart type ChannelDataTransferType int const ( @@ -23,10 +24,10 @@ const ( // ManagerPeerCreatePush is the type of a channel wherein the manager peer created a Push Data Transfer ManagerPeerCreatePush - // ManagerPeerCreatePush is the type of a channel wherein the manager peer received a Pull Data Transfer Request + // ManagerPeerReceivePull is the type of a channel wherein the manager peer received a Pull Data Transfer Request ManagerPeerReceivePull - // ManagerPeerCreatePush is the type of a channel wherein the manager peer received a Push Data Transfer Request + // ManagerPeerReceivePush is the type of a channel wherein the manager peer received a Push Data Transfer Request ManagerPeerReceivePush ) diff --git a/statuses.go b/statuses.go index d3173979..6a4c89be 100644 --- a/statuses.go +++ b/statuses.go @@ -58,9 +58,6 @@ const ( // ChannelNotFoundError means the searched for data transfer does not exist ChannelNotFoundError - - // PeerDisconnected means that we do NOT have a connection to the other peer - PeerDisconnected ) // Statuses are human readable names for data transfer states diff --git a/testutil/fakegraphsync.go b/testutil/fakegraphsync.go index e825e813..299e9db0 100644 --- a/testutil/fakegraphsync.go +++ b/testutil/fakegraphsync.go @@ -110,6 +110,8 @@ type FakeGraphSync struct { RequestUpdatedHook graphsync.OnRequestUpdatedHook IncomingResponseHook graphsync.OnIncomingResponseHook RequestorCancelledListener graphsync.OnRequestorCancelledListener + BlockSentListener graphsync.OnBlockSentListener + NetworkErrorListener graphsync.OnNetworkErrorListener } // NewFakeGraphSync returns a new fake graphsync implementation @@ -352,6 +354,18 @@ func (fgs *FakeGraphSync) RegisterRequestorCancelledListener(listener graphsync. return nil } +// RegisterBlockSentListener adds a listener on the responder as blocks go out +func (fgs *FakeGraphSync) RegisterBlockSentListener(listener graphsync.OnBlockSentListener) graphsync.UnregisterHookFunc { + fgs.BlockSentListener = listener + return nil +} + +// RegisterNetworkErrorListener adds a listener on the responder as blocks go out +func (fgs *FakeGraphSync) RegisterNetworkErrorListener(listener graphsync.OnNetworkErrorListener) graphsync.UnregisterHookFunc { + fgs.NetworkErrorListener = listener + return nil +} + var _ graphsync.GraphExchange = &FakeGraphSync{} type fakeBlkData struct { diff --git a/transport.go b/transport.go index 95b939e4..3ca94ba6 100644 --- a/transport.go +++ b/transport.go @@ -51,6 +51,10 @@ type EventsHandler interface { // OnRequestTimedOut is called when a request we opened (with the given channel Id) to receive data times out. // Error returns are logged but otherwise have no effect OnRequestTimedOut(ctx context.Context, chid ChannelID) error + + // OnRequestDisconnected is called when a network error occurs in a graphsync request + // or we appear to stall while receiving data + OnRequestDisconnected(ctx context.Context, chid ChannelID) error } /* diff --git a/transport/graphsync/graphsync.go b/transport/graphsync/graphsync.go index 738ef5b6..431ea7e0 100644 --- a/transport/graphsync/graphsync.go +++ b/transport/graphsync/graphsync.go @@ -284,6 +284,7 @@ func (t *Transport) SetEventHandler(events datatransfer.EventsHandler) error { t.gs.RegisterIncomingResponseHook(t.gsIncomingResponseHook) t.gs.RegisterRequestUpdatedHook(t.gsRequestUpdatedHook) t.gs.RegisterRequestorCancelledListener(t.gsRequestorCancelledListener) + t.gs.RegisterNetworkErrorListener(t.gsNetworkErrorListener) return nil } @@ -597,3 +598,16 @@ func (t *Transport) gsRequestorCancelledListener(p peer.ID, request graphsync.Re t.requestorCancelledMap[chid] = struct{}{} } } + +func (t *Transport) gsNetworkErrorListener(p peer.ID, request graphsync.RequestData, err error) { + t.dataLock.Lock() + defer t.dataLock.Unlock() + + chid, ok := t.graphsyncRequestMap[graphsyncKey{request.ID(), p}] + if ok { + err := t.events.OnRequestDisconnected(context.TODO(), chid) + if err != nil { + log.Error(err) + } + } +} diff --git a/transport/graphsync/graphsync_test.go b/transport/graphsync/graphsync_test.go index 138c509a..df35a60c 100644 --- a/transport/graphsync/graphsync_test.go +++ b/transport/graphsync/graphsync_test.go @@ -628,6 +628,16 @@ func TestManager(t *testing.T) { assertDecodesToMessage(t, gsData.incomingRequestHookActions.SentExtension.Data, gsData.incoming) }, }, + "recognized incoming request will record network error": { + action: func(gsData *harness) { + gsData.incomingRequestHook() + gsData.networkErrorListener(errors.New("something went wrong")) + }, + check: func(t *testing.T, events *fakeEvents, gsData *harness) { + require.Equal(t, 1, events.OnRequestReceivedCallCount) + require.True(t, events.OnRequestDisconnectedCalled) + }, + }, "open channel adds doNotSendCids to the DoNotSend extension": { action: func(gsData *harness) { cids := testutil.GenerateCids(2) @@ -686,6 +696,85 @@ func TestManager(t *testing.T) { require.NoError(t, requestReceived2.Ctx.Err()) }, }, + "OnChannelCompleted called when outgoing request completes successfully": { + action: func(gsData *harness) { + gsData.fgs.LeaveRequestsOpen() + stor, _ := gsData.outgoing.Selector() + + _ = gsData.transport.OpenChannel( + gsData.ctx, + gsData.other, + datatransfer.ChannelID{ID: gsData.transferID, Responder: gsData.other, Initiator: gsData.self}, + cidlink.Link{Cid: gsData.outgoing.BaseCid()}, + stor, + nil, + gsData.outgoing) + }, + check: func(t *testing.T, events *fakeEvents, gsData *harness) { + requestReceived := gsData.fgs.AssertRequestReceived(gsData.ctx, t) + close(requestReceived.ResponseErrChan) + + require.Eventually(t, func() bool { + return events.OnChannelCompletedCalled == true + }, 2*time.Second, 100*time.Millisecond) + require.True(t, events.ChannelCompletedSuccess) + }, + }, + "OnChannelCompleted called when outgoing request completes with error": { + action: func(gsData *harness) { + gsData.fgs.LeaveRequestsOpen() + stor, _ := gsData.outgoing.Selector() + + _ = gsData.transport.OpenChannel( + gsData.ctx, + gsData.other, + datatransfer.ChannelID{ID: gsData.transferID, Responder: gsData.other, Initiator: gsData.self}, + cidlink.Link{Cid: gsData.outgoing.BaseCid()}, + stor, + nil, + gsData.outgoing) + }, + check: func(t *testing.T, events *fakeEvents, gsData *harness) { + requestReceived := gsData.fgs.AssertRequestReceived(gsData.ctx, t) + requestReceived.ResponseErrChan <- graphsync.RequestFailedUnknownErr{} + close(requestReceived.ResponseErrChan) + + require.Eventually(t, func() bool { + return events.OnChannelCompletedCalled == true + }, 2*time.Second, 100*time.Millisecond) + require.False(t, events.ChannelCompletedSuccess) + }, + }, + "OnChannelComplete when outgoing request cancelled by caller": { + action: func(gsData *harness) { + gsData.fgs.LeaveRequestsOpen() + stor, _ := gsData.outgoing.Selector() + + _ = gsData.transport.OpenChannel( + gsData.ctx, + gsData.other, + datatransfer.ChannelID{ID: gsData.transferID, Responder: gsData.other, Initiator: gsData.self}, + cidlink.Link{Cid: gsData.outgoing.BaseCid()}, + stor, + nil, + gsData.outgoing) + + gsData.outgoingRequestHook() + }, + check: func(t *testing.T, events *fakeEvents, gsData *harness) { + requestReceived := gsData.fgs.AssertRequestReceived(gsData.ctx, t) + extensions := make(map[graphsync.ExtensionName][]byte) + for _, ext := range requestReceived.Extensions { + extensions[ext.Name] = ext.Data + } + request := testutil.NewFakeRequest(graphsync.RequestID(rand.Int31()), extensions) + gsData.fgs.OutgoingRequestHook(gsData.other, request, gsData.outgoingRequestHookActions) + _ = gsData.transport.CloseChannel(gsData.ctx, datatransfer.ChannelID{ID: gsData.transferID, Responder: gsData.other, Initiator: gsData.self}) + require.Eventually(t, func() bool { + return requestReceived.Ctx.Err() != nil + }, 2*time.Second, 100*time.Millisecond) + }, + }, "request times out if we get request context cancelled error": { action: func(gsData *harness) { gsData.fgs.LeaveRequestsOpen() @@ -849,8 +938,10 @@ type fakeEvents struct { OnChannelCompletedCalled bool OnChannelCompletedErr error - OnRequestTimedOutCalled bool - OnRequestTimedOutChannelId datatransfer.ChannelID + OnRequestTimedOutCalled bool + OnRequestTimedOutChannelId datatransfer.ChannelID + OnRequestDisconnectedCalled bool + OnRequestDisconnectedChannelID datatransfer.ChannelID ChannelCompletedSuccess bool DataSentMessage datatransfer.Message @@ -866,6 +957,12 @@ func (fe *fakeEvents) OnRequestTimedOut(_ context.Context, chid datatransfer.Cha return nil } +func (fe *fakeEvents) OnRequestDisconnected(_ context.Context, chid datatransfer.ChannelID) error { + fe.OnRequestDisconnectedCalled = true + fe.OnRequestDisconnectedChannelID = chid + return nil +} + func (fe *fakeEvents) OnChannelOpened(chid datatransfer.ChannelID) error { fe.ChannelOpenedChannelID = chid return fe.OnChannelOpenedError @@ -954,6 +1051,9 @@ func (ha *harness) responseCompletedListener() { func (ha *harness) requestorCancelledListener() { ha.fgs.RequestorCancelledListener(ha.other, ha.request) } +func (ha *harness) networkErrorListener(err error) { + ha.fgs.NetworkErrorListener(ha.other, ha.request, err) +} type dtConfig struct { dtExtensionMissing bool