fix: disable restart ack timeout
dirkmc committed Jun 1, 2021
1 parent 3a130c3 commit bd98d5b
Showing 2 changed files with 1 addition and 93 deletions.
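This commit removes the restart-acknowledgement wait from the channel monitor: a restart request is now fire-and-forget, and the monitor retries only when sending the restart message itself fails. The visible API change is that Config no longer has a RestartAckTimeout field. As a rough sketch of what a post-change configuration looks like (the api value and all duration values below are illustrative placeholders, not taken from this commit):

    // Minimal post-change setup sketch: RestartAckTimeout is simply omitted,
    // since the field no longer exists. The other fields all appear in this
    // diff; the concrete values here are made up for illustration.
    m := NewMonitor(api, &Config{
        AcceptTimeout:          30 * time.Second,
        RestartDebounce:        10 * time.Millisecond,
        RestartBackoff:         time.Second,
        MaxConsecutiveRestarts: 3,
        CompleteTimeout:        time.Minute,
    })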
66 changes: 1 addition & 65 deletions channelmonitor/channelmonitor.go
@@ -46,9 +46,6 @@ type Config struct {
     RestartBackoff time.Duration
     // Number of times to try to restart before failing
     MaxConsecutiveRestarts uint32
-    // Max time to wait for the peer to acknowledge a restart request.
-    // Note: Does not include the time taken to reconnect to the peer.
-    RestartAckTimeout time.Duration
     // Max time to wait for the responder to send a Complete message once all
     // data has been sent
     CompleteTimeout time.Duration
@@ -80,9 +77,6 @@ func checkConfig(cfg *Config) {
     if cfg.MaxConsecutiveRestarts == 0 {
         panic(fmt.Sprintf(prefix+"MaxConsecutiveRestarts is %d but must be > 0", cfg.MaxConsecutiveRestarts))
     }
-    if cfg.RestartAckTimeout <= 0 {
-        panic(fmt.Sprintf(prefix+"RestartAckTimeout is %s but must be > 0", cfg.RestartAckTimeout))
-    }
     if cfg.CompleteTimeout <= 0 {
         panic(fmt.Sprintf(prefix+"CompleteTimeout is %s but must be > 0", cfg.CompleteTimeout))
     }
@@ -414,8 +408,7 @@ func (mc *monitoredChannel) doRestartChannel() error {
     err := mc.sendRestartMessage(restartCount)
     if err != nil {
         log.Warnf("%s: restart failed, trying again: %s", mc.chid, err)
-        // If the restart message could not be sent, or there was a timeout
-        // waiting for the restart to be acknowledged, try again
+        // If the restart message could not be sent, try again
        return mc.doRestartChannel()
     }
     log.Infof("%s: restart completed successfully", mc.chid)
@@ -438,25 +431,12 @@ func (mc *monitoredChannel) sendRestartMessage(restartCount int) error {
     log.Infof("%s: re-established connection to %s in %s", mc.chid, p, time.Since(start))

     // Send a restart message for the channel
-    restartResult := mc.waitForRestartResponse()
     log.Infof("%s: sending restart message to %s (%d consecutive restarts)", mc.chid, p, restartCount)
     err = mc.mgr.RestartDataTransferChannel(mc.ctx, mc.chid)
     if err != nil {
         return xerrors.Errorf("%s: failed to send restart message to %s: %w", mc.chid, p, err)
     }

-    // The restart message is fire and forget, so we need to watch for a
-    // restart response to know that the restart message reached the peer.
-    select {
-    case <-mc.ctx.Done():
-        return nil // channel shutdown so just bail out
-    case err = <-restartResult:
-        if err != nil {
-            return xerrors.Errorf("%s: failed to send restart message to %s: %w", mc.chid, p, err)
-        }
-    }
-    log.Infof("%s: received restart response from %s", mc.chid, p)
-
     // The restart message was sent successfully.
     // If a restart backoff is configured, backoff after a restart before
     // attempting another.
@@ -490,47 +470,3 @@ func (mc *monitoredChannel) closeChannelAndShutdown(cherr error) {
         log.Errorf("error closing data-transfer channel %s: %s", mc.chid, err)
     }
 }
-
-// Wait for the peer to send an acknowledgement to the restart request
-func (mc *monitoredChannel) waitForRestartResponse() chan error {
-    restartFired := make(chan struct{})
-    restarted := make(chan error, 1)
-    timer := time.NewTimer(mc.cfg.RestartAckTimeout)
-
-    unsub := mc.mgr.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
-        if channelState.ChannelID() != mc.chid {
-            return
-        }
-
-        // The Restart event is fired when we receive an acknowledgement
-        // from the peer that it has received a restart request
-        if event.Code == datatransfer.Restart {
-            close(restartFired)
-        }
-    })
-
-    go func() {
-        defer unsub()
-        defer timer.Stop()
-
-        select {
-
-        // Restart ack received from peer
-        case <-restartFired:
-            restarted <- nil
-
-        // Channel monitor shutdown, just bail out
-        case <-mc.ctx.Done():
-            restarted <- nil
-
-        // Timer expired before receiving a restart ack from peer
-        case <-timer.C:
-            p := mc.chid.OtherParty(mc.mgr.PeerID())
-            err := xerrors.Errorf("did not receive response to restart request from %s after %s",
-                p, mc.cfg.RestartAckTimeout)
-            restarted <- err
-        }
-    }()
-
-    return restarted
-}
28 changes: 0 additions & 28 deletions channelmonitor/channelmonitor_test.go
@@ -27,7 +27,6 @@ func TestChannelMonitorAutoRestart(t *testing.T) {
         name              string
         errReconnect      bool
         errSendRestartMsg bool
-        timeoutRestartAck bool
     }
     testCases := []testCase{{
         name: "attempt restart",
@@ -37,9 +36,6 @@ func TestChannelMonitorAutoRestart(t *testing.T) {
     }, {
         name:              "fail to send restart message",
         errSendRestartMsg: true,
-    }, {
-        name:              "timeout waiting for restart message ack from peer",
-        timeoutRestartAck: true,
     }}

     runTest := func(name string, isPush bool) {
@@ -59,7 +55,6 @@ func TestChannelMonitorAutoRestart(t *testing.T) {
         m := NewMonitor(mockAPI, &Config{
             AcceptTimeout:          time.Hour,
             MaxConsecutiveRestarts: 1,
-            RestartAckTimeout:      20 * time.Millisecond,
             CompleteTimeout:        time.Hour,
         })

@@ -96,16 +91,6 @@ func TestChannelMonitorAutoRestart(t *testing.T) {
         err := mockAPI.awaitRestartSent()
         require.NoError(t, err)

-        // If simulating a restart ack timeout, don't fire the restart
-        // ack event and expect the channel to be closed with an error
-        if tc.timeoutRestartAck {
-            mockAPI.verifyChannelClosed(t, true)
-            return
-        }
-
-        // Simulate receiving restart message ack from responder
-        mockAPI.restartEvent()
-
         if isPush {
             // Simulate sending the remaining data
             mockAPI.dataSent(5)
@@ -145,7 +130,6 @@ func TestChannelMonitorMaxConsecutiveRestarts(t *testing.T) {
     m := NewMonitor(mockAPI, &Config{
         AcceptTimeout:          time.Hour,
         MaxConsecutiveRestarts: uint32(maxConsecutiveRestarts),
-        RestartAckTimeout:      time.Hour,
         CompleteTimeout:        time.Hour,
     })

@@ -169,9 +153,6 @@ func TestChannelMonitorMaxConsecutiveRestarts(t *testing.T) {
         err := mockAPI.awaitRestartSent()
         require.NoError(t, err)

-        // Simulate receiving restart ack from peer
-        mockAPI.restartEvent()
-
         err = awaitRestartComplete(mch)
         require.NoError(t, err)
     }
@@ -232,7 +213,6 @@ func TestChannelMonitorQueuedRestart(t *testing.T) {
         AcceptTimeout:          time.Hour,
         RestartDebounce:        10 * time.Millisecond,
         MaxConsecutiveRestarts: 3,
-        RestartAckTimeout:      time.Hour,
         CompleteTimeout:        time.Hour,
     })

@@ -256,9 +236,6 @@ func TestChannelMonitorQueuedRestart(t *testing.T) {
     // Trigger another error event before the restart has completed
     triggerErrorEvent()

-    // Simulate receiving restart ack from peer (for first restart)
-    mockAPI.restartEvent()
-
     // A second restart should be sent because of the second error
     err = mockAPI.awaitRestartSent()
     require.NoError(t, err)
@@ -312,7 +289,6 @@ func TestChannelMonitorTimeouts(t *testing.T) {
         m := NewMonitor(mockAPI, &Config{
             AcceptTimeout:          acceptTimeout,
             MaxConsecutiveRestarts: 1,
-            RestartAckTimeout:      time.Hour,
             CompleteTimeout:        completeTimeout,
         })

@@ -520,10 +496,6 @@ func (m *mockMonitorAPI) receiveDataErrorEvent() {
     m.fireEvent(datatransfer.Event{Code: datatransfer.ReceiveDataError}, m.ch)
 }

-func (m *mockMonitorAPI) restartEvent() {
-    m.fireEvent(datatransfer.Event{Code: datatransfer.Restart}, m.ch)
-}
-
 type mockChannelState struct {
     chid   datatransfer.ChannelID
     queued uint64