Remove restart ack timeout #211

Merged (2 commits, Jun 2, 2021)
channelmonitor/channelmonitor.go (20 additions, 72 deletions)

@@ -38,19 +38,18 @@ type Monitor struct {
 }
 
 type Config struct {
-	// Max time to wait for other side to accept open channel request before attempting restart
+	// Max time to wait for other side to accept open channel request before attempting restart.
+	// Set to 0 to disable timeout.
 	AcceptTimeout time.Duration
 	// Debounce when restart is triggered by multiple errors
 	RestartDebounce time.Duration
 	// Backoff after restarting
 	RestartBackoff time.Duration
 	// Number of times to try to restart before failing
 	MaxConsecutiveRestarts uint32
-	// Max time to wait for the peer to acknowledge a restart request.
-	// Note: Does not include the time taken to reconnect to the peer.
-	RestartAckTimeout time.Duration
 	// Max time to wait for the responder to send a Complete message once all
-	// data has been sent
+	// data has been sent.
+	// Set to 0 to disable timeout.
 	CompleteTimeout time.Duration
 	// Called when a restart completes successfully
 	OnRestartComplete func(id datatransfer.ChannelID)
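With this change a zero value means "timeout disabled" rather than "invalid config". A minimal in-package sketch of a caller opting out of both watchdogs (the helper name and import path are assumptions, not taken from this diff):

```go
package channelmonitor

import (
	"time"

	datatransfer "github.com/filecoin-project/go-data-transfer"
)

// lenientConfig disables both watchdog timers but still caps restarts.
func lenientConfig() *Config {
	return &Config{
		AcceptTimeout:          0, // never time out waiting for the responder's Accept
		RestartDebounce:        time.Second,
		RestartBackoff:         time.Second,
		MaxConsecutiveRestarts: 5,
		CompleteTimeout:        0, // never time out waiting for Complete
		OnRestartComplete:      func(id datatransfer.ChannelID) {},
	}
}
```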
@@ -74,17 +73,14 @@ func checkConfig(cfg *Config) {
 	}
 
 	prefix := "data-transfer channel monitor config "
-	if cfg.AcceptTimeout <= 0 {
-		panic(fmt.Sprintf(prefix+"AcceptTimeout is %s but must be > 0", cfg.AcceptTimeout))
+	if cfg.AcceptTimeout < 0 {
+		panic(fmt.Sprintf(prefix+"AcceptTimeout is %s but must be >= 0", cfg.AcceptTimeout))
 	}
 	if cfg.MaxConsecutiveRestarts == 0 {
 		panic(fmt.Sprintf(prefix+"MaxConsecutiveRestarts is %d but must be > 0", cfg.MaxConsecutiveRestarts))
 	}
-	if cfg.RestartAckTimeout <= 0 {
-		panic(fmt.Sprintf(prefix+"RestartAckTimeout is %s but must be > 0", cfg.RestartAckTimeout))
-	}
-	if cfg.CompleteTimeout <= 0 {
-		panic(fmt.Sprintf(prefix+"CompleteTimeout is %s but must be > 0", cfg.CompleteTimeout))
+	if cfg.CompleteTimeout < 0 {
+		panic(fmt.Sprintf(prefix+"CompleteTimeout is %s but must be >= 0", cfg.CompleteTimeout))
 	}
 }
 
@@ -275,6 +271,11 @@ func (mc *monitoredChannel) start() {
 // an Accept to our open channel request before the accept timeout.
 // Returns a function that can be used to cancel the timer.
 func (mc *monitoredChannel) watchForResponderAccept() func() {
+	// Check if the accept timeout is disabled
+	if mc.cfg.AcceptTimeout == 0 {
+		return func() {}
+	}
+
 	// Start a timer for the accept timeout
 	timer := time.NewTimer(mc.cfg.AcceptTimeout)
 
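Returning a no-op cancel function in the disabled case keeps every call site uniform: callers can defer or invoke the cancel unconditionally. The same pattern in isolation (a generic sketch, not code from this repository):

```go
package watchdog

import "time"

// start arms a timer unless the duration is zero, in which case the
// feature is disabled and a no-op cancel is returned.
func start(d time.Duration, onExpire func()) (cancel func()) {
	if d == 0 {
		return func() {} // disabled: nothing to stop
	}
	t := time.AfterFunc(d, onExpire)
	return func() { t.Stop() }
}
```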
@@ -297,6 +298,11 @@ func (mc *monitoredChannel) watchForResponderAccept() func() {

 // Wait up to the configured timeout for the responder to send a Complete message
 func (mc *monitoredChannel) watchForResponderComplete() {
+	// Check if the complete timeout is disabled
+	if mc.cfg.CompleteTimeout == 0 {
+		return
+	}
+
 	// Start a timer for the complete timeout
 	timer := time.NewTimer(mc.cfg.CompleteTimeout)
 	defer timer.Stop()
@@ -308,7 +314,7 @@ func (mc *monitoredChannel) watchForResponderComplete() {
 	case <-timer.C:
 		// Timer expired before we received a Complete message from the responder
 		err := xerrors.Errorf("%s: timed out waiting %s for Complete message from remote peer",
-			mc.chid, mc.cfg.AcceptTimeout)
+			mc.chid, mc.cfg.CompleteTimeout)
 		mc.closeChannelAndShutdown(err)
 	}
 }
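This hunk also corrects the error message, which previously reported AcceptTimeout instead of CompleteTimeout. Note that the early return above is one of two idiomatic ways to express "zero disables the timer"; the other is a nil channel, which blocks forever in a select (a generic sketch, not code from this repository):

```go
package watchdog

import "time"

// timeoutChan returns a channel that fires after d, or nil when d is zero;
// receiving from a nil channel blocks forever, so the timeout arm of a
// select is effectively disabled without any special-casing.
func timeoutChan(d time.Duration) <-chan time.Time {
	if d == 0 {
		return nil
	}
	return time.After(d)
}
```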
@@ -414,8 +420,7 @@ func (mc *monitoredChannel) doRestartChannel() error {
 	err := mc.sendRestartMessage(restartCount)
 	if err != nil {
 		log.Warnf("%s: restart failed, trying again: %s", mc.chid, err)
-		// If the restart message could not be sent, or there was a timeout
-		// waiting for the restart to be acknowledged, try again
+		// If the restart message could not be sent, try again
 		return mc.doRestartChannel()
 	}
 	log.Infof("%s: restart completed successfully", mc.chid)
@@ -438,25 +443,12 @@ func (mc *monitoredChannel) sendRestartMessage(restartCount int) error {
log.Infof("%s: re-established connection to %s in %s", mc.chid, p, time.Since(start))

// Send a restart message for the channel
restartResult := mc.waitForRestartResponse()
log.Infof("%s: sending restart message to %s (%d consecutive restarts)", mc.chid, p, restartCount)
err = mc.mgr.RestartDataTransferChannel(mc.ctx, mc.chid)
if err != nil {
return xerrors.Errorf("%s: failed to send restart message to %s: %w", mc.chid, p, err)
}

// The restart message is fire and forget, so we need to watch for a
// restart response to know that the restart message reached the peer.
select {
case <-mc.ctx.Done():
return nil // channel shutdown so just bail out
case err = <-restartResult:
if err != nil {
return xerrors.Errorf("%s: failed to send restart message to %s: %w", mc.chid, p, err)
}
}
log.Infof("%s: received restart response from %s", mc.chid, p)

// The restart message was sent successfully.
// If a restart backoff is configured, backoff after a restart before
// attempting another.
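Without the ack wait, sending a restart reduces to "reconnect, send, back off". A condensed sketch of the resulting flow (the monitorAPI interface name and the exact backoff shape are assumptions; this is not the verbatim file contents):

```go
func sendRestartSketch(ctx context.Context, mgr monitorAPI, chid datatransfer.ChannelID, backoff time.Duration) error {
	// Fire and forget: send the restart message without waiting for an ack.
	if err := mgr.RestartDataTransferChannel(ctx, chid); err != nil {
		return xerrors.Errorf("%s: failed to send restart message: %w", chid, err)
	}
	// Optional backoff before another restart may be attempted.
	if backoff > 0 {
		select {
		case <-time.After(backoff):
		case <-ctx.Done(): // monitor shut down; bail out
		}
	}
	return nil
}
```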
@@ -490,47 +482,3 @@ func (mc *monitoredChannel) closeChannelAndShutdown(cherr error) {
 		log.Errorf("error closing data-transfer channel %s: %s", mc.chid, err)
 	}
 }
-
-// Wait for the peer to send an acknowledgement to the restart request
-func (mc *monitoredChannel) waitForRestartResponse() chan error {
-	restartFired := make(chan struct{})
-	restarted := make(chan error, 1)
-	timer := time.NewTimer(mc.cfg.RestartAckTimeout)
-
-	unsub := mc.mgr.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
-		if channelState.ChannelID() != mc.chid {
-			return
-		}
-
-		// The Restart event is fired when we receive an acknowledgement
-		// from the peer that it has received a restart request
-		if event.Code == datatransfer.Restart {
-			close(restartFired)
-		}
-	})
-
-	go func() {
-		defer unsub()
-		defer timer.Stop()
-
-		select {
-
-		// Restart ack received from peer
-		case <-restartFired:
-			restarted <- nil
-
-		// Channel monitor shutdown, just bail out
-		case <-mc.ctx.Done():
-			restarted <- nil
-
-		// Timer expired before receiving a restart ack from peer
-		case <-timer.C:
-			p := mc.chid.OtherParty(mc.mgr.PeerID())
-			err := xerrors.Errorf("did not receive response to restart request from %s after %s",
-				p, mc.cfg.RestartAckTimeout)
-			restarted <- err
-		}
-	}()
-
-	return restarted
-}
channelmonitor/channelmonitor_test.go (20 additions, 31 deletions)

@@ -27,7 +27,6 @@ func TestChannelMonitorAutoRestart(t *testing.T) {
 		name              string
 		errReconnect      bool
 		errSendRestartMsg bool
-		timeoutRestartAck bool
 	}
 	testCases := []testCase{{
 		name: "attempt restart",
@@ -37,9 +36,6 @@ func TestChannelMonitorAutoRestart(t *testing.T) {
 	}, {
 		name:              "fail to send restart message",
 		errSendRestartMsg: true,
-	}, {
-		name:              "timeout waiting for restart message ack from peer",
-		timeoutRestartAck: true,
 	}}
 
 	runTest := func(name string, isPush bool) {
@@ -59,7 +55,6 @@ func TestChannelMonitorAutoRestart(t *testing.T) {
 			m := NewMonitor(mockAPI, &Config{
 				AcceptTimeout:          time.Hour,
 				MaxConsecutiveRestarts: 1,
-				RestartAckTimeout:      20 * time.Millisecond,
 				CompleteTimeout:        time.Hour,
 			})
 
@@ -96,16 +91,6 @@ func TestChannelMonitorAutoRestart(t *testing.T) {
 			err := mockAPI.awaitRestartSent()
 			require.NoError(t, err)
 
-			// If simulating a restart ack timeout, don't fire the restart
-			// ack event and expect the channel to be closed with an error
-			if tc.timeoutRestartAck {
-				mockAPI.verifyChannelClosed(t, true)
-				return
-			}
-
-			// Simulate receiving restart message ack from responder
-			mockAPI.restartEvent()
-
 			if isPush {
 				// Simulate sending the remaining data
 				mockAPI.dataSent(5)
@@ -145,7 +130,6 @@ func TestChannelMonitorMaxConsecutiveRestarts(t *testing.T) {
 	m := NewMonitor(mockAPI, &Config{
 		AcceptTimeout:          time.Hour,
 		MaxConsecutiveRestarts: uint32(maxConsecutiveRestarts),
-		RestartAckTimeout:      time.Hour,
 		CompleteTimeout:        time.Hour,
 	})
 
@@ -169,9 +153,6 @@ func TestChannelMonitorMaxConsecutiveRestarts(t *testing.T) {
 		err := mockAPI.awaitRestartSent()
 		require.NoError(t, err)
 
-		// Simulate receiving restart ack from peer
-		mockAPI.restartEvent()
-
 		err = awaitRestartComplete(mch)
 		require.NoError(t, err)
 	}
@@ -232,7 +213,6 @@ func TestChannelMonitorQueuedRestart(t *testing.T) {
 		AcceptTimeout:          time.Hour,
 		RestartDebounce:        10 * time.Millisecond,
 		MaxConsecutiveRestarts: 3,
-		RestartAckTimeout:      time.Hour,
 		CompleteTimeout:        time.Hour,
 	})
 
@@ -256,9 +236,6 @@ func TestChannelMonitorQueuedRestart(t *testing.T) {
 	// Trigger another error event before the restart has completed
 	triggerErrorEvent()
 
-	// Simulate receiving restart ack from peer (for first restart)
-	mockAPI.restartEvent()
-
 	// A second restart should be sent because of the second error
 	err = mockAPI.awaitRestartSent()
 	require.NoError(t, err)
@@ -273,9 +250,11 @@ func TestChannelMonitorQueuedRestart(t *testing.T) {

 func TestChannelMonitorTimeouts(t *testing.T) {
 	type testCase struct {
-		name           string
-		expectAccept   bool
-		expectComplete bool
+		name                    string
+		expectAccept            bool
+		expectComplete          bool
+		acceptTimeoutDisabled   bool
+		completeTimeoutDisabled bool
 	}
 	testCases := []testCase{{
 		name: "accept in time",
@@ -284,6 +263,10 @@ func TestChannelMonitorTimeouts(t *testing.T) {
 	}, {
 		name:         "accept too late",
 		expectAccept: false,
+	}, {
+		name:                  "disable accept timeout",
+		acceptTimeoutDisabled: true,
+		expectAccept:          true,
 	}, {
 		name:         "complete in time",
 		expectAccept: true,
@@ -292,6 +275,11 @@ func TestChannelMonitorTimeouts(t *testing.T) {
name: "complete too late",
expectAccept: true,
expectComplete: false,
}, {
name: "disable complete timeout",
completeTimeoutDisabled: true,
expectAccept: true,
expectComplete: true,
}}

runTest := func(name string, isPush bool) {
@@ -309,10 +297,15 @@ func TestChannelMonitorTimeouts(t *testing.T) {

 			acceptTimeout := 10 * time.Millisecond
 			completeTimeout := 10 * time.Millisecond
+			if tc.acceptTimeoutDisabled {
+				acceptTimeout = 0
+			}
+			if tc.completeTimeoutDisabled {
+				completeTimeout = 0
+			}
 			m := NewMonitor(mockAPI, &Config{
 				AcceptTimeout:          acceptTimeout,
 				MaxConsecutiveRestarts: 1,
-				RestartAckTimeout:      time.Hour,
 				CompleteTimeout:        completeTimeout,
 			})
 
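For the two disabled cases the watchdog must never fire. A sketch of the corresponding assertion (reusing the mock helper seen elsewhere in this file; the exact call site and the meaning of the boolean argument are assumptions):

```go
if tc.acceptTimeoutDisabled || tc.completeTimeoutDisabled {
	// No timer was armed, so the channel must never be closed.
	mockAPI.verifyChannelClosed(t, false)
}
```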

@@ -520,10 +513,6 @@ func (m *mockMonitorAPI) receiveDataErrorEvent() {
 	m.fireEvent(datatransfer.Event{Code: datatransfer.ReceiveDataError}, m.ch)
 }
 
-func (m *mockMonitorAPI) restartEvent() {
-	m.fireEvent(datatransfer.Event{Code: datatransfer.Restart}, m.ch)
-}
-
 type mockChannelState struct {
 	chid   datatransfer.ChannelID
 	queued uint64
impl/integration_test.go (0 additions, 1 deletion)

@@ -731,7 +731,6 @@ func TestAutoRestart(t *testing.T) {
 		RestartDebounce:        500 * time.Millisecond,
 		RestartBackoff:         500 * time.Millisecond,
 		MaxConsecutiveRestarts: 10,
-		RestartAckTimeout:      100 * time.Millisecond,
 		CompleteTimeout:        100 * time.Millisecond,
 	})
 	initiator, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, initiatorGSTspt, restartConf)