From bd98d5be5319409cff3128f9d888a6ab0f2f328f Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Tue, 1 Jun 2021 09:45:46 -0600 Subject: [PATCH] fix: disable restart ack timeout --- channelmonitor/channelmonitor.go | 66 +-------------------------- channelmonitor/channelmonitor_test.go | 28 ------------ 2 files changed, 1 insertion(+), 93 deletions(-) diff --git a/channelmonitor/channelmonitor.go b/channelmonitor/channelmonitor.go index 5c53624b..02a29034 100644 --- a/channelmonitor/channelmonitor.go +++ b/channelmonitor/channelmonitor.go @@ -46,9 +46,6 @@ type Config struct { RestartBackoff time.Duration // Number of times to try to restart before failing MaxConsecutiveRestarts uint32 - // Max time to wait for the peer to acknowledge a restart request. - // Note: Does not include the time taken to reconnect to the peer. - RestartAckTimeout time.Duration // Max time to wait for the responder to send a Complete message once all // data has been sent CompleteTimeout time.Duration @@ -80,9 +77,6 @@ func checkConfig(cfg *Config) { if cfg.MaxConsecutiveRestarts == 0 { panic(fmt.Sprintf(prefix+"MaxConsecutiveRestarts is %d but must be > 0", cfg.MaxConsecutiveRestarts)) } - if cfg.RestartAckTimeout <= 0 { - panic(fmt.Sprintf(prefix+"RestartAckTimeout is %s but must be > 0", cfg.RestartAckTimeout)) - } if cfg.CompleteTimeout <= 0 { panic(fmt.Sprintf(prefix+"CompleteTimeout is %s but must be > 0", cfg.CompleteTimeout)) } @@ -414,8 +408,7 @@ func (mc *monitoredChannel) doRestartChannel() error { err := mc.sendRestartMessage(restartCount) if err != nil { log.Warnf("%s: restart failed, trying again: %s", mc.chid, err) - // If the restart message could not be sent, or there was a timeout - // waiting for the restart to be acknowledged, try again + // If the restart message could not be sent, try again return mc.doRestartChannel() } log.Infof("%s: restart completed successfully", mc.chid) @@ -438,25 +431,12 @@ func (mc *monitoredChannel) sendRestartMessage(restartCount int) error { log.Infof("%s: re-established connection to %s in %s", mc.chid, p, time.Since(start)) // Send a restart message for the channel - restartResult := mc.waitForRestartResponse() log.Infof("%s: sending restart message to %s (%d consecutive restarts)", mc.chid, p, restartCount) err = mc.mgr.RestartDataTransferChannel(mc.ctx, mc.chid) if err != nil { return xerrors.Errorf("%s: failed to send restart message to %s: %w", mc.chid, p, err) } - // The restart message is fire and forget, so we need to watch for a - // restart response to know that the restart message reached the peer. - select { - case <-mc.ctx.Done(): - return nil // channel shutdown so just bail out - case err = <-restartResult: - if err != nil { - return xerrors.Errorf("%s: failed to send restart message to %s: %w", mc.chid, p, err) - } - } - log.Infof("%s: received restart response from %s", mc.chid, p) - // The restart message was sent successfully. // If a restart backoff is configured, backoff after a restart before // attempting another. @@ -490,47 +470,3 @@ func (mc *monitoredChannel) closeChannelAndShutdown(cherr error) { log.Errorf("error closing data-transfer channel %s: %s", mc.chid, err) } } - -// Wait for the peer to send an acknowledgement to the restart request -func (mc *monitoredChannel) waitForRestartResponse() chan error { - restartFired := make(chan struct{}) - restarted := make(chan error, 1) - timer := time.NewTimer(mc.cfg.RestartAckTimeout) - - unsub := mc.mgr.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) { - if channelState.ChannelID() != mc.chid { - return - } - - // The Restart event is fired when we receive an acknowledgement - // from the peer that it has received a restart request - if event.Code == datatransfer.Restart { - close(restartFired) - } - }) - - go func() { - defer unsub() - defer timer.Stop() - - select { - - // Restart ack received from peer - case <-restartFired: - restarted <- nil - - // Channel monitor shutdown, just bail out - case <-mc.ctx.Done(): - restarted <- nil - - // Timer expired before receiving a restart ack from peer - case <-timer.C: - p := mc.chid.OtherParty(mc.mgr.PeerID()) - err := xerrors.Errorf("did not receive response to restart request from %s after %s", - p, mc.cfg.RestartAckTimeout) - restarted <- err - } - }() - - return restarted -} diff --git a/channelmonitor/channelmonitor_test.go b/channelmonitor/channelmonitor_test.go index 81f4177b..70f95642 100644 --- a/channelmonitor/channelmonitor_test.go +++ b/channelmonitor/channelmonitor_test.go @@ -27,7 +27,6 @@ func TestChannelMonitorAutoRestart(t *testing.T) { name string errReconnect bool errSendRestartMsg bool - timeoutRestartAck bool } testCases := []testCase{{ name: "attempt restart", @@ -37,9 +36,6 @@ func TestChannelMonitorAutoRestart(t *testing.T) { }, { name: "fail to send restart message", errSendRestartMsg: true, - }, { - name: "timeout waiting for restart message ack from peer", - timeoutRestartAck: true, }} runTest := func(name string, isPush bool) { @@ -59,7 +55,6 @@ func TestChannelMonitorAutoRestart(t *testing.T) { m := NewMonitor(mockAPI, &Config{ AcceptTimeout: time.Hour, MaxConsecutiveRestarts: 1, - RestartAckTimeout: 20 * time.Millisecond, CompleteTimeout: time.Hour, }) @@ -96,16 +91,6 @@ func TestChannelMonitorAutoRestart(t *testing.T) { err := mockAPI.awaitRestartSent() require.NoError(t, err) - // If simulating a restart ack timeout, don't fire the restart - // ack event and expect the channel to be closed with an error - if tc.timeoutRestartAck { - mockAPI.verifyChannelClosed(t, true) - return - } - - // Simulate receiving restart message ack from responder - mockAPI.restartEvent() - if isPush { // Simulate sending the remaining data mockAPI.dataSent(5) @@ -145,7 +130,6 @@ func TestChannelMonitorMaxConsecutiveRestarts(t *testing.T) { m := NewMonitor(mockAPI, &Config{ AcceptTimeout: time.Hour, MaxConsecutiveRestarts: uint32(maxConsecutiveRestarts), - RestartAckTimeout: time.Hour, CompleteTimeout: time.Hour, }) @@ -169,9 +153,6 @@ func TestChannelMonitorMaxConsecutiveRestarts(t *testing.T) { err := mockAPI.awaitRestartSent() require.NoError(t, err) - // Simulate receiving restart ack from peer - mockAPI.restartEvent() - err = awaitRestartComplete(mch) require.NoError(t, err) } @@ -232,7 +213,6 @@ func TestChannelMonitorQueuedRestart(t *testing.T) { AcceptTimeout: time.Hour, RestartDebounce: 10 * time.Millisecond, MaxConsecutiveRestarts: 3, - RestartAckTimeout: time.Hour, CompleteTimeout: time.Hour, }) @@ -256,9 +236,6 @@ func TestChannelMonitorQueuedRestart(t *testing.T) { // Trigger another error event before the restart has completed triggerErrorEvent() - // Simulate receiving restart ack from peer (for first restart) - mockAPI.restartEvent() - // A second restart should be sent because of the second error err = mockAPI.awaitRestartSent() require.NoError(t, err) @@ -312,7 +289,6 @@ func TestChannelMonitorTimeouts(t *testing.T) { m := NewMonitor(mockAPI, &Config{ AcceptTimeout: acceptTimeout, MaxConsecutiveRestarts: 1, - RestartAckTimeout: time.Hour, CompleteTimeout: completeTimeout, }) @@ -520,10 +496,6 @@ func (m *mockMonitorAPI) receiveDataErrorEvent() { m.fireEvent(datatransfer.Event{Code: datatransfer.ReceiveDataError}, m.ch) } -func (m *mockMonitorAPI) restartEvent() { - m.fireEvent(datatransfer.Event{Code: datatransfer.Restart}, m.ch) -} - type mockChannelState struct { chid datatransfer.ChannelID queued uint64