Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework plugin tickers to prevent drift and spread write ticks #7390

Merged
merged 3 commits into from
May 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 34 additions & 65 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,57 +265,45 @@ func (a *Agent) runInputs(
interval = input.Config.Interval
}

var ticker Ticker
if a.Config.Agent.RoundInterval {
ticker = NewAlignedTicker(startTime, interval, jitter)
} else {
ticker = NewUnalignedTicker(interval, jitter)
reimda marked this conversation as resolved.
Show resolved Hide resolved
}
defer ticker.Stop()
ssoroka marked this conversation as resolved.
Show resolved Hide resolved

acc := NewAccumulator(input, dst)
acc.SetPrecision(a.Precision())

wg.Add(1)
go func(input *models.RunningInput) {
defer wg.Done()

if a.Config.Agent.RoundInterval {
err := internal.SleepContext(
ctx, internal.AlignDuration(startTime, interval))
if err != nil {
return
}
}

a.gatherOnInterval(ctx, acc, input, interval, jitter)
a.gatherLoop(ctx, acc, input, ticker)
}(input)
}
wg.Wait()

wg.Wait()
return nil
}

// gatherLoop runs an input's gather function each time the ticker elapses,
// until the context is done.
func (a *Agent) gatherOnInterval(
func (a *Agent) gatherLoop(
ctx context.Context,
acc telegraf.Accumulator,
input *models.RunningInput,
interval time.Duration,
jitter time.Duration,
ticker Ticker,
) {
defer panicRecover(input)

ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
err := internal.SleepContext(ctx, internal.RandomDuration(jitter))
if err != nil {
return
}

err = a.gatherOnce(acc, input, interval)
if err != nil {
acc.AddError(err)
}

select {
case <-ticker.C:
continue
case <-ticker.Elapsed():
err := a.gatherOnce(acc, input, ticker)
if err != nil {
acc.AddError(err)
}
case <-ctx.Done():
return
}
Expand All @@ -327,11 +315,8 @@ func (a *Agent) gatherOnInterval(
func (a *Agent) gatherOnce(
acc telegraf.Accumulator,
input *models.RunningInput,
timeout time.Duration,
ticker Ticker,
) error {
ticker := time.NewTicker(timeout)
defer ticker.Stop()

done := make(chan error)
go func() {
done <- input.Gather(acc)
Expand All @@ -341,7 +326,7 @@ func (a *Agent) gatherOnce(
select {
case err := <-done:
return err
case <-ticker.C:
case <-ticker.Elapsed():
log.Printf("W! [agent] [%s] did not complete within its interval",
input.LogName())
}
Expand Down Expand Up @@ -514,10 +499,13 @@ func (a *Agent) runOutputs(
jitter = *output.Config.FlushJitter
}

ticker := NewRollingTicker(interval, jitter)
defer ticker.Stop()

wg.Add(1)
go func(output *models.RunningOutput) {
defer wg.Done()
a.flushLoop(ctx, startTime, output, interval, jitter)
a.flushLoop(ctx, output, ticker)
}(output)
}

Expand All @@ -542,10 +530,8 @@ func (a *Agent) runOutputs(
// done.
func (a *Agent) flushLoop(
ctx context.Context,
startTime time.Time,
output *models.RunningOutput,
interval time.Duration,
jitter time.Duration,
ticker Ticker,
) {
logError := func(err error) {
if err != nil {
Expand All @@ -558,44 +544,30 @@ func (a *Agent) flushLoop(
watchForFlushSignal(flushRequested)
defer stopListeningForFlushSignal(flushRequested)

// align to round interval
if a.Config.Agent.RoundInterval {
err := internal.SleepContext(
ctx, internal.AlignDuration(startTime, interval))
if err != nil {
return
}
}

// since we are watching two channels we need a ticker with the jitter
// integrated.
ticker := NewTicker(interval, jitter)
defer ticker.Stop()

for {
// Favor shutdown over other methods.
select {
case <-ctx.Done():
logError(a.flushOnce(output, interval, output.Write))
logError(a.flushOnce(output, ticker, output.Write))
return
default:
}

select {
case <-ctx.Done():
logError(a.flushOnce(output, interval, output.Write))
logError(a.flushOnce(output, ticker, output.Write))
return
case <-ticker.C:
logError(a.flushOnce(output, interval, output.Write))
case <-ticker.Elapsed():
logError(a.flushOnce(output, ticker, output.Write))
case <-flushRequested:
logError(a.flushOnce(output, interval, output.Write))
logError(a.flushOnce(output, ticker, output.Write))
case <-output.BatchReady:
// Favor the ticker over batch ready
select {
case <-ticker.C:
logError(a.flushOnce(output, interval, output.Write))
case <-ticker.Elapsed():
logError(a.flushOnce(output, ticker, output.Write))
default:
logError(a.flushOnce(output, interval, output.WriteBatch))
logError(a.flushOnce(output, ticker, output.WriteBatch))
}
}
}
Expand All @@ -605,12 +577,9 @@ func (a *Agent) flushLoop(
// interval it fails to complete before.
func (a *Agent) flushOnce(
output *models.RunningOutput,
timeout time.Duration,
ticker Ticker,
writeFunc func() error,
) error {
ticker := time.NewTicker(timeout)
defer ticker.Stop()

done := make(chan error)
go func() {
done <- writeFunc()
Expand All @@ -621,7 +590,7 @@ func (a *Agent) flushOnce(
case err := <-done:
output.LogBufferStatus()
return err
case <-ticker.C:
case <-ticker.Elapsed():
log.Printf("W! [agent] [%q] did not complete within its flush interval",
output.LogName())
output.LogBufferStatus()
Expand Down
Loading