Skip to content

Commit

Permalink
fix: option to disable accept and complete timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Jun 1, 2021
1 parent bd98d5b commit 7e07047
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 10 deletions.
26 changes: 19 additions & 7 deletions channelmonitor/channelmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ type Monitor struct {
}

type Config struct {
// Max time to wait for other side to accept open channel request before attempting restart
// Max time to wait for other side to accept open channel request before attempting restart.
// Set to 0 to disable timeout.
AcceptTimeout time.Duration
// Debounce when restart is triggered by multiple errors
RestartDebounce time.Duration
Expand All @@ -47,7 +48,8 @@ type Config struct {
// Number of times to try to restart before failing
MaxConsecutiveRestarts uint32
// Max time to wait for the responder to send a Complete message once all
// data has been sent
// data has been sent.
// Set to 0 to disable timeout.
CompleteTimeout time.Duration
// Called when a restart completes successfully
OnRestartComplete func(id datatransfer.ChannelID)
Expand All @@ -71,14 +73,14 @@ func checkConfig(cfg *Config) {
}

prefix := "data-transfer channel monitor config "
if cfg.AcceptTimeout <= 0 {
panic(fmt.Sprintf(prefix+"AcceptTimeout is %s but must be > 0", cfg.AcceptTimeout))
if cfg.AcceptTimeout < 0 {
panic(fmt.Sprintf(prefix+"AcceptTimeout is %s but must be >= 0", cfg.AcceptTimeout))
}
if cfg.MaxConsecutiveRestarts == 0 {
panic(fmt.Sprintf(prefix+"MaxConsecutiveRestarts is %d but must be > 0", cfg.MaxConsecutiveRestarts))
}
if cfg.CompleteTimeout <= 0 {
panic(fmt.Sprintf(prefix+"CompleteTimeout is %s but must be > 0", cfg.CompleteTimeout))
if cfg.CompleteTimeout < 0 {
panic(fmt.Sprintf(prefix+"CompleteTimeout is %s but must be >= 0", cfg.CompleteTimeout))
}
}

Expand Down Expand Up @@ -269,6 +271,11 @@ func (mc *monitoredChannel) start() {
// an Accept to our open channel request before the accept timeout.
// Returns a function that can be used to cancel the timer.
func (mc *monitoredChannel) watchForResponderAccept() func() {
// Check if the accept timeout is disabled
if mc.cfg.AcceptTimeout == 0 {
return func() {}
}

// Start a timer for the accept timeout
timer := time.NewTimer(mc.cfg.AcceptTimeout)

Expand All @@ -291,6 +298,11 @@ func (mc *monitoredChannel) watchForResponderAccept() func() {

// Wait up to the configured timeout for the responder to send a Complete message
func (mc *monitoredChannel) watchForResponderComplete() {
// Check if the complete timeout is disabled
if mc.cfg.CompleteTimeout == 0 {
return
}

// Start a timer for the complete timeout
timer := time.NewTimer(mc.cfg.CompleteTimeout)
defer timer.Stop()
Expand All @@ -302,7 +314,7 @@ func (mc *monitoredChannel) watchForResponderComplete() {
case <-timer.C:
// Timer expired before we received a Complete message from the responder
err := xerrors.Errorf("%s: timed out waiting %s for Complete message from remote peer",
mc.chid, mc.cfg.AcceptTimeout)
mc.chid, mc.cfg.CompleteTimeout)
mc.closeChannelAndShutdown(err)
}
}
Expand Down
23 changes: 20 additions & 3 deletions channelmonitor/channelmonitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,11 @@ func TestChannelMonitorQueuedRestart(t *testing.T) {

func TestChannelMonitorTimeouts(t *testing.T) {
type testCase struct {
name string
expectAccept bool
expectComplete bool
name string
expectAccept bool
expectComplete bool
acceptTimeoutDisabled bool
completeTimeoutDisabled bool
}
testCases := []testCase{{
name: "accept in time",
Expand All @@ -261,6 +263,10 @@ func TestChannelMonitorTimeouts(t *testing.T) {
}, {
name: "accept too late",
expectAccept: false,
}, {
name: "disable accept timeout",
acceptTimeoutDisabled: true,
expectAccept: true,
}, {
name: "complete in time",
expectAccept: true,
Expand All @@ -269,6 +275,11 @@ func TestChannelMonitorTimeouts(t *testing.T) {
name: "complete too late",
expectAccept: true,
expectComplete: false,
}, {
name: "disable complete timeout",
completeTimeoutDisabled: true,
expectAccept: true,
expectComplete: true,
}}

runTest := func(name string, isPush bool) {
Expand All @@ -286,6 +297,12 @@ func TestChannelMonitorTimeouts(t *testing.T) {

acceptTimeout := 10 * time.Millisecond
completeTimeout := 10 * time.Millisecond
if tc.acceptTimeoutDisabled {
acceptTimeout = 0
}
if tc.completeTimeoutDisabled {
completeTimeout = 0
}
m := NewMonitor(mockAPI, &Config{
AcceptTimeout: acceptTimeout,
MaxConsecutiveRestarts: 1,
Expand Down

0 comments on commit 7e07047

Please sign in to comment.