Skip to content

Commit

Permalink
feat: limit consecutive restarts with no data transfer
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Dec 17, 2020
1 parent de0face commit cb9fa71
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 78 deletions.
16 changes: 11 additions & 5 deletions impl/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,20 +95,26 @@ func ChannelRemoveTimeout(timeout time.Duration) DataTransferOption {
// restarting push channels
// - interval is the time over which minBytesSent must have been sent
// - checksPerInterval is the number of times to check per interval
// - minBytesSent is the minimum amount of data that must have been sent over the interval
// - minBytesSent is the minimum amount of data that must have been sent over
// the interval
// - restartBackoff is the time to wait before checking again for restarts
// - maxConsecutiveRestarts is the maximum number of restarts in a row to
// attempt where no data is transferred. When the limit is reached the
// channel is closed.
func PushChannelRestartConfig(
interval time.Duration,
checksPerInterval uint32,
minBytesSent uint64,
restartBackoff time.Duration,
maxConsecutiveRestarts uint32,
) DataTransferOption {
return func(m *manager) {
m.pushChannelMonitorCfg = &pushchannelmonitor.Config{
Interval: interval,
ChecksPerInterval: checksPerInterval,
MinBytesSent: minBytesSent,
RestartBackoff: restartBackoff,
Interval: interval,
ChecksPerInterval: checksPerInterval,
MinBytesSent: minBytesSent,
RestartBackoff: restartBackoff,
MaxConsecutiveRestarts: maxConsecutiveRestarts,
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ func TestPushRequestAutoRestart(t *testing.T) {
tp1 := gsData.SetupGSTransportHost1()
tp2 := gsData.SetupGSTransportHost2()

restartConf := PushChannelRestartConfig(100*time.Millisecond, 1, 10, 200*time.Millisecond)
restartConf := PushChannelRestartConfig(100*time.Millisecond, 1, 10, 200*time.Millisecond, 5)
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1, restartConf)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)
Expand Down
68 changes: 48 additions & 20 deletions pushchannelmonitor/pushchannelmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ type Monitor struct {
}

type Config struct {
Interval time.Duration
MinBytesSent uint64
ChecksPerInterval uint32
RestartBackoff time.Duration
Interval time.Duration
MinBytesSent uint64
ChecksPerInterval uint32
RestartBackoff time.Duration
MaxConsecutiveRestarts uint32
}

func NewMonitor(mgr monitorAPI, cfg *Config) *Monitor {
Expand Down Expand Up @@ -66,6 +67,9 @@ func checkConfig(cfg *Config) {
if cfg.MinBytesSent == 0 {
panic(fmt.Sprintf(prefix+"MinBytesSent is %d but must be > 0", cfg.MinBytesSent))
}
if cfg.MaxConsecutiveRestarts == 0 {
panic(fmt.Sprintf(prefix+"MaxConsecutiveRestarts is %d but must be > 0", cfg.MaxConsecutiveRestarts))
}
}

// AddChannel adds a channel to the push channel monitor
Expand Down Expand Up @@ -158,10 +162,11 @@ type monitoredChannel struct {
unsub datatransfer.Unsubscribe
onShutdown func(*monitoredChannel)

statsLk sync.RWMutex
queued uint64
sent uint64
dataRatePoints chan *dataRatePoint
statsLk sync.RWMutex
queued uint64
sent uint64
dataRatePoints chan *dataRatePoint
consecutiveRestarts int

restartLk sync.RWMutex
restarting bool
Expand Down Expand Up @@ -220,6 +225,8 @@ func (mc *monitoredChannel) start() {
case datatransfer.DataSent:
// Keep track of the amount of data sent
mc.sent = channelState.Sent()
// Some data was sent so reset the consecutive restart counter
mc.consecutiveRestarts = 0
}
})
}
Expand Down Expand Up @@ -277,31 +284,52 @@ func (mc *monitoredChannel) restartChannel() {
return
}

mc.statsLk.Lock()
mc.consecutiveRestarts++
restartCount := mc.consecutiveRestarts
mc.statsLk.Unlock()

if uint32(restartCount) > mc.cfg.MaxConsecutiveRestarts {
// If no data has been transferred since the last transfer, and we've
// reached the consecutive restart limit, close the channel and
// shutdown the monitor
log.Errorf("Closing channel after %d consecutive restarts for push data-channel %s", restartCount, mc.chid)
mc.closeChannelAndShutdown()
return
}

defer func() {
// Backoff a little time after a restart before attempting another
select {
case <-time.After(mc.cfg.RestartBackoff):
case <-mc.ctx.Done():
if mc.cfg.RestartBackoff > 0 {
// Backoff a little time after a restart before attempting another
select {
case <-time.After(mc.cfg.RestartBackoff):
case <-mc.ctx.Done():
}
}

mc.restartLk.Lock()
mc.restarting = false
mc.restartLk.Unlock()
}()

// Send a restart message for the channel
// Send a restart message for the channel.
// Note that at the networking layer there is logic to retry if a network
// connection cannot be established, so this may take some time.
log.Infof("Sending restart message for push data-channel %s", mc.chid)
err := mc.mgr.RestartDataTransferChannel(mc.ctx, mc.chid)
if err != nil {
log.Warnf("closing channel after failing to send restart message for push data-channel %s: %s", mc.chid, err)

// If it wasn't possible to restart the channel, close the channel
// and shut down the monitor
defer mc.Shutdown()
log.Errorf("Closing channel after failing to send restart message for push data-channel %s: %s", mc.chid, err)
mc.closeChannelAndShutdown()
}
}

err := mc.mgr.CloseDataTransferChannel(mc.ctx, mc.chid)
if err != nil {
log.Errorf("error closing data transfer channel %s: %w", mc.chid, err)
}
func (mc *monitoredChannel) closeChannelAndShutdown() {
err := mc.mgr.CloseDataTransferChannel(mc.ctx, mc.chid)
if err != nil {
log.Errorf("Error closing data transfer channel %s: %w", mc.chid, err)
}

mc.Shutdown()
}
Loading

0 comments on commit cb9fa71

Please sign in to comment.