Remove restart ack timeout (#211)
* fix: disable restart ack timeout

* fix: option to disable accept and complete timeouts
dirkmc authored Jun 2, 2021
1 parent 3a130c3 commit 2af50e8
Showing 3 changed files with 40 additions and 104 deletions.
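The net effect of this commit: a zero value for AcceptTimeout or CompleteTimeout now disables the corresponding watchdog, and the restart-ack watchdog (RestartAckTimeout) is removed entirely. A minimal configuration sketch under the new semantics, written in the same package-internal style as the tests below; the api argument and the field values are illustrative, not taken from this commit:

// Sketch only: "api" stands in for whatever implements the monitor's API interface
// (the tests below pass a mock).
m := NewMonitor(api, &Config{
	AcceptTimeout:          0,                      // 0 disables the accept timeout
	RestartDebounce:        10 * time.Millisecond,  // coalesce bursts of errors into one restart
	RestartBackoff:         500 * time.Millisecond, // pause after a restart before another attempt
	MaxConsecutiveRestarts: 5,                      // must still be > 0 (checkConfig panics on 0)
	CompleteTimeout:        0,                      // 0 disables the complete timeout
	// RestartAckTimeout no longer exists after this change
})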
92 changes: 20 additions & 72 deletions channelmonitor/channelmonitor.go
@@ -38,19 +38,18 @@ type Monitor struct {
}

type Config struct {
// Max time to wait for other side to accept open channel request before attempting restart
// Max time to wait for other side to accept open channel request before attempting restart.
// Set to 0 to disable timeout.
AcceptTimeout time.Duration
// Debounce when restart is triggered by multiple errors
RestartDebounce time.Duration
// Backoff after restarting
RestartBackoff time.Duration
// Number of times to try to restart before failing
MaxConsecutiveRestarts uint32
// Max time to wait for the peer to acknowledge a restart request.
// Note: Does not include the time taken to reconnect to the peer.
RestartAckTimeout time.Duration
// Max time to wait for the responder to send a Complete message once all
// data has been sent
// data has been sent.
// Set to 0 to disable timeout.
CompleteTimeout time.Duration
// Called when a restart completes successfully
OnRestartComplete func(id datatransfer.ChannelID)
@@ -74,17 +73,14 @@ func checkConfig(cfg *Config) {
}

prefix := "data-transfer channel monitor config "
if cfg.AcceptTimeout <= 0 {
panic(fmt.Sprintf(prefix+"AcceptTimeout is %s but must be > 0", cfg.AcceptTimeout))
if cfg.AcceptTimeout < 0 {
panic(fmt.Sprintf(prefix+"AcceptTimeout is %s but must be >= 0", cfg.AcceptTimeout))
}
if cfg.MaxConsecutiveRestarts == 0 {
panic(fmt.Sprintf(prefix+"MaxConsecutiveRestarts is %d but must be > 0", cfg.MaxConsecutiveRestarts))
}
if cfg.RestartAckTimeout <= 0 {
panic(fmt.Sprintf(prefix+"RestartAckTimeout is %s but must be > 0", cfg.RestartAckTimeout))
}
if cfg.CompleteTimeout <= 0 {
panic(fmt.Sprintf(prefix+"CompleteTimeout is %s but must be > 0", cfg.CompleteTimeout))
if cfg.CompleteTimeout < 0 {
panic(fmt.Sprintf(prefix+"CompleteTimeout is %s but must be >= 0", cfg.CompleteTimeout))
}
}

@@ -275,6 +271,11 @@ func (mc *monitoredChannel) start() {
// an Accept to our open channel request before the accept timeout.
// Returns a function that can be used to cancel the timer.
func (mc *monitoredChannel) watchForResponderAccept() func() {
// Check if the accept timeout is disabled
if mc.cfg.AcceptTimeout == 0 {
return func() {}
}

// Start a timer for the accept timeout
timer := time.NewTimer(mc.cfg.AcceptTimeout)

@@ -297,6 +298,11 @@ func (mc *monitoredChannel) watchForResponderAccept() func() {

// Wait up to the configured timeout for the responder to send a Complete message
func (mc *monitoredChannel) watchForResponderComplete() {
// Check if the complete timeout is disabled
if mc.cfg.CompleteTimeout == 0 {
return
}

// Start a timer for the complete timeout
timer := time.NewTimer(mc.cfg.CompleteTimeout)
defer timer.Stop()
@@ -308,7 +314,7 @@ func (mc *monitoredChannel) watchForResponderComplete() {
case <-timer.C:
// Timer expired before we received a Complete message from the responder
err := xerrors.Errorf("%s: timed out waiting %s for Complete message from remote peer",
mc.chid, mc.cfg.AcceptTimeout)
mc.chid, mc.cfg.CompleteTimeout)
mc.closeChannelAndShutdown(err)
}
}
@@ -414,8 +420,7 @@ func (mc *monitoredChannel) doRestartChannel() error {
err := mc.sendRestartMessage(restartCount)
if err != nil {
log.Warnf("%s: restart failed, trying again: %s", mc.chid, err)
// If the restart message could not be sent, or there was a timeout
// waiting for the restart to be acknowledged, try again
// If the restart message could not be sent, try again
return mc.doRestartChannel()
}
log.Infof("%s: restart completed successfully", mc.chid)
@@ -438,25 +443,12 @@ func (mc *monitoredChannel) sendRestartMessage(restartCount int) error {
log.Infof("%s: re-established connection to %s in %s", mc.chid, p, time.Since(start))

// Send a restart message for the channel
restartResult := mc.waitForRestartResponse()
log.Infof("%s: sending restart message to %s (%d consecutive restarts)", mc.chid, p, restartCount)
err = mc.mgr.RestartDataTransferChannel(mc.ctx, mc.chid)
if err != nil {
return xerrors.Errorf("%s: failed to send restart message to %s: %w", mc.chid, p, err)
}

// The restart message is fire and forget, so we need to watch for a
// restart response to know that the restart message reached the peer.
select {
case <-mc.ctx.Done():
return nil // channel shutdown so just bail out
case err = <-restartResult:
if err != nil {
return xerrors.Errorf("%s: failed to send restart message to %s: %w", mc.chid, p, err)
}
}
log.Infof("%s: received restart response from %s", mc.chid, p)

// The restart message was sent successfully.
// If a restart backoff is configured, backoff after a restart before
// attempting another.
@@ -490,47 +482,3 @@ func (mc *monitoredChannel) closeChannelAndShutdown(cherr error) {
log.Errorf("error closing data-transfer channel %s: %s", mc.chid, err)
}
}

// Wait for the peer to send an acknowledgement to the restart request
func (mc *monitoredChannel) waitForRestartResponse() chan error {
restartFired := make(chan struct{})
restarted := make(chan error, 1)
timer := time.NewTimer(mc.cfg.RestartAckTimeout)

unsub := mc.mgr.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
if channelState.ChannelID() != mc.chid {
return
}

// The Restart event is fired when we receive an acknowledgement
// from the peer that it has received a restart request
if event.Code == datatransfer.Restart {
close(restartFired)
}
})

go func() {
defer unsub()
defer timer.Stop()

select {

// Restart ack received from peer
case <-restartFired:
restarted <- nil

// Channel monitor shutdown, just bail out
case <-mc.ctx.Done():
restarted <- nil

// Timer expired before receiving a restart ack from peer
case <-timer.C:
p := mc.chid.OtherParty(mc.mgr.PeerID())
err := xerrors.Errorf("did not receive response to restart request from %s after %s",
p, mc.cfg.RestartAckTimeout)
restarted <- err
}
}()

return restarted
}
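With waitForRestartResponse removed, sendRestartMessage no longer waits for a Restart event from the peer: RestartDataTransferChannel is treated as fire-and-forget, and the only remaining retry trigger is a failure to send the message. A caller that still wants a positive signal when a restart goes through can rely on the OnRestartComplete hook declared in Config above ("Called when a restart completes successfully"). A hedged sketch, not part of this commit; api is again a stand-in for the monitor's API implementation:

// Sketch: surface successful restarts through the existing OnRestartComplete hook.
restarted := make(chan datatransfer.ChannelID, 1)
m := NewMonitor(api, &Config{
	AcceptTimeout:          time.Hour,
	MaxConsecutiveRestarts: 5,
	CompleteTimeout:        time.Hour,
	OnRestartComplete: func(chid datatransfer.ChannelID) {
		select {
		case restarted <- chid: // notify a waiting goroutine without blocking the monitor
		default:
		}
	},
})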
51 changes: 20 additions & 31 deletions channelmonitor/channelmonitor_test.go
@@ -27,7 +27,6 @@ func TestChannelMonitorAutoRestart(t *testing.T) {
name string
errReconnect bool
errSendRestartMsg bool
timeoutRestartAck bool
}
testCases := []testCase{{
name: "attempt restart",
@@ -37,9 +36,6 @@
}, {
name: "fail to send restart message",
errSendRestartMsg: true,
}, {
name: "timeout waiting for restart message ack from peer",
timeoutRestartAck: true,
}}

runTest := func(name string, isPush bool) {
@@ -59,7 +55,6 @@ func TestChannelMonitorAutoRestart(t *testing.T) {
m := NewMonitor(mockAPI, &Config{
AcceptTimeout: time.Hour,
MaxConsecutiveRestarts: 1,
RestartAckTimeout: 20 * time.Millisecond,
CompleteTimeout: time.Hour,
})

@@ -96,16 +91,6 @@ func TestChannelMonitorAutoRestart(t *testing.T) {
err := mockAPI.awaitRestartSent()
require.NoError(t, err)

// If simulating a restart ack timeout, don't fire the restart
// ack event and expect the channel to be closed with an error
if tc.timeoutRestartAck {
mockAPI.verifyChannelClosed(t, true)
return
}

// Simulate receiving restart message ack from responder
mockAPI.restartEvent()

if isPush {
// Simulate sending the remaining data
mockAPI.dataSent(5)
@@ -145,7 +130,6 @@ func TestChannelMonitorMaxConsecutiveRestarts(t *testing.T) {
m := NewMonitor(mockAPI, &Config{
AcceptTimeout: time.Hour,
MaxConsecutiveRestarts: uint32(maxConsecutiveRestarts),
RestartAckTimeout: time.Hour,
CompleteTimeout: time.Hour,
})

@@ -169,9 +153,6 @@ func TestChannelMonitorMaxConsecutiveRestarts(t *testing.T) {
err := mockAPI.awaitRestartSent()
require.NoError(t, err)

// Simulate receiving restart ack from peer
mockAPI.restartEvent()

err = awaitRestartComplete(mch)
require.NoError(t, err)
}
@@ -232,7 +213,6 @@ func TestChannelMonitorQueuedRestart(t *testing.T) {
AcceptTimeout: time.Hour,
RestartDebounce: 10 * time.Millisecond,
MaxConsecutiveRestarts: 3,
RestartAckTimeout: time.Hour,
CompleteTimeout: time.Hour,
})

@@ -256,9 +236,6 @@ func TestChannelMonitorQueuedRestart(t *testing.T) {
// Trigger another error event before the restart has completed
triggerErrorEvent()

// Simulate receiving restart ack from peer (for first restart)
mockAPI.restartEvent()

// A second restart should be sent because of the second error
err = mockAPI.awaitRestartSent()
require.NoError(t, err)
@@ -273,9 +250,11 @@

func TestChannelMonitorTimeouts(t *testing.T) {
type testCase struct {
name string
expectAccept bool
expectComplete bool
name string
expectAccept bool
expectComplete bool
acceptTimeoutDisabled bool
completeTimeoutDisabled bool
}
testCases := []testCase{{
name: "accept in time",
@@ -284,6 +263,10 @@
}, {
name: "accept too late",
expectAccept: false,
}, {
name: "disable accept timeout",
acceptTimeoutDisabled: true,
expectAccept: true,
}, {
name: "complete in time",
expectAccept: true,
@@ -292,6 +275,11 @@
name: "complete too late",
expectAccept: true,
expectComplete: false,
}, {
name: "disable complete timeout",
completeTimeoutDisabled: true,
expectAccept: true,
expectComplete: true,
}}

runTest := func(name string, isPush bool) {
@@ -309,10 +297,15 @@

acceptTimeout := 10 * time.Millisecond
completeTimeout := 10 * time.Millisecond
if tc.acceptTimeoutDisabled {
acceptTimeout = 0
}
if tc.completeTimeoutDisabled {
completeTimeout = 0
}
m := NewMonitor(mockAPI, &Config{
AcceptTimeout: acceptTimeout,
MaxConsecutiveRestarts: 1,
RestartAckTimeout: time.Hour,
CompleteTimeout: completeTimeout,
})

@@ -520,10 +513,6 @@ func (m *mockMonitorAPI) receiveDataErrorEvent() {
m.fireEvent(datatransfer.Event{Code: datatransfer.ReceiveDataError}, m.ch)
}

func (m *mockMonitorAPI) restartEvent() {
m.fireEvent(datatransfer.Event{Code: datatransfer.Restart}, m.ch)
}

type mockChannelState struct {
chid datatransfer.ChannelID
queued uint64
1 change: 0 additions & 1 deletion impl/integration_test.go
@@ -731,7 +731,6 @@ func TestAutoRestart(t *testing.T) {
RestartDebounce: 500 * time.Millisecond,
RestartBackoff: 500 * time.Millisecond,
MaxConsecutiveRestarts: 10,
RestartAckTimeout: 100 * time.Millisecond,
CompleteTimeout: 100 * time.Millisecond,
})
initiator, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, initiatorGSTspt, restartConf)
