Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

limit consecutive restarts with no data transfer #129

Merged
merged 1 commit into from
Dec 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions impl/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,20 +95,26 @@ func ChannelRemoveTimeout(timeout time.Duration) DataTransferOption {
// restarting push channels
// - interval is the time over which minBytesSent must have been sent
// - checksPerInterval is the number of times to check per interval
// - minBytesSent is the minimum amount of data that must have been sent over the interval
// - minBytesSent is the minimum amount of data that must have been sent over
// the interval
// - restartBackoff is the time to wait before checking again for restarts
// - maxConsecutiveRestarts is the maximum number of restarts in a row to
// attempt where no data is transferred. When the limit is reached the
// channel is closed.
func PushChannelRestartConfig(
interval time.Duration,
checksPerInterval uint32,
minBytesSent uint64,
restartBackoff time.Duration,
maxConsecutiveRestarts uint32,
) DataTransferOption {
return func(m *manager) {
m.pushChannelMonitorCfg = &pushchannelmonitor.Config{
Interval: interval,
ChecksPerInterval: checksPerInterval,
MinBytesSent: minBytesSent,
RestartBackoff: restartBackoff,
Interval: interval,
ChecksPerInterval: checksPerInterval,
MinBytesSent: minBytesSent,
RestartBackoff: restartBackoff,
MaxConsecutiveRestarts: maxConsecutiveRestarts,
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ func TestPushRequestAutoRestart(t *testing.T) {
tp1 := gsData.SetupGSTransportHost1()
tp2 := gsData.SetupGSTransportHost2()

restartConf := PushChannelRestartConfig(100*time.Millisecond, 1, 10, 200*time.Millisecond)
restartConf := PushChannelRestartConfig(100*time.Millisecond, 1, 10, 200*time.Millisecond, 5)
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1, restartConf)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)
Expand Down
68 changes: 48 additions & 20 deletions pushchannelmonitor/pushchannelmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ type Monitor struct {
}

type Config struct {
Interval time.Duration
MinBytesSent uint64
ChecksPerInterval uint32
RestartBackoff time.Duration
Interval time.Duration
MinBytesSent uint64
ChecksPerInterval uint32
RestartBackoff time.Duration
MaxConsecutiveRestarts uint32
}

func NewMonitor(mgr monitorAPI, cfg *Config) *Monitor {
Expand Down Expand Up @@ -66,6 +67,9 @@ func checkConfig(cfg *Config) {
if cfg.MinBytesSent == 0 {
panic(fmt.Sprintf(prefix+"MinBytesSent is %d but must be > 0", cfg.MinBytesSent))
}
if cfg.MaxConsecutiveRestarts == 0 {
panic(fmt.Sprintf(prefix+"MaxConsecutiveRestarts is %d but must be > 0", cfg.MaxConsecutiveRestarts))
}
}

// AddChannel adds a channel to the push channel monitor
Expand Down Expand Up @@ -158,10 +162,11 @@ type monitoredChannel struct {
unsub datatransfer.Unsubscribe
onShutdown func(*monitoredChannel)

statsLk sync.RWMutex
queued uint64
sent uint64
dataRatePoints chan *dataRatePoint
statsLk sync.RWMutex
queued uint64
sent uint64
dataRatePoints chan *dataRatePoint
consecutiveRestarts int

restartLk sync.RWMutex
restarting bool
Expand Down Expand Up @@ -220,6 +225,8 @@ func (mc *monitoredChannel) start() {
case datatransfer.DataSent:
// Keep track of the amount of data sent
mc.sent = channelState.Sent()
// Some data was sent so reset the consecutive restart counter
mc.consecutiveRestarts = 0
}
})
}
Expand Down Expand Up @@ -277,31 +284,52 @@ func (mc *monitoredChannel) restartChannel() {
return
}

mc.statsLk.Lock()
mc.consecutiveRestarts++
restartCount := mc.consecutiveRestarts
mc.statsLk.Unlock()

if uint32(restartCount) > mc.cfg.MaxConsecutiveRestarts {
// If no data has been transferred since the last restart, and we've
// reached the consecutive restart limit, close the channel and
// shut down the monitor
log.Errorf("Closing channel after %d consecutive restarts for push data-channel %s", restartCount, mc.chid)
mc.closeChannelAndShutdown()
return
}

defer func() {
// Backoff a little time after a restart before attempting another
select {
case <-time.After(mc.cfg.RestartBackoff):
case <-mc.ctx.Done():
if mc.cfg.RestartBackoff > 0 {
// Backoff a little time after a restart before attempting another
select {
case <-time.After(mc.cfg.RestartBackoff):
case <-mc.ctx.Done():
}
}

mc.restartLk.Lock()
mc.restarting = false
mc.restartLk.Unlock()
}()

// Send a restart message for the channel
// Send a restart message for the channel.
// Note that at the networking layer there is logic to retry if a network
// connection cannot be established, so this may take some time.
log.Infof("Sending restart message for push data-channel %s", mc.chid)
err := mc.mgr.RestartDataTransferChannel(mc.ctx, mc.chid)
if err != nil {
log.Warnf("closing channel after failing to send restart message for push data-channel %s: %s", mc.chid, err)

// If it wasn't possible to restart the channel, close the channel
// and shut down the monitor
defer mc.Shutdown()
log.Errorf("Closing channel after failing to send restart message for push data-channel %s: %s", mc.chid, err)
mc.closeChannelAndShutdown()
}
}

err := mc.mgr.CloseDataTransferChannel(mc.ctx, mc.chid)
if err != nil {
log.Errorf("error closing data transfer channel %s: %w", mc.chid, err)
}
// closeChannelAndShutdown asks the manager to close the monitored data
// transfer channel and then shuts down this channel's monitor.
//
// A failure to close the channel is logged but does not prevent the
// shutdown: mc.Shutdown() is always called.
func (mc *monitoredChannel) closeChannelAndShutdown() {
	err := mc.mgr.CloseDataTransferChannel(mc.ctx, mc.chid)
	if err != nil {
		// Use %s rather than %w: the error-wrapping verb is only understood
		// by fmt.Errorf, and would otherwise be rendered as "%!w(...)".
		log.Errorf("Error closing data transfer channel %s: %s", mc.chid, err)
	}

	mc.Shutdown()
}
Loading