Skip to content

Commit

Permalink
feat(inputs): Add option to choose the metric time source (#15917)
Browse files Browse the repository at this point in the history
  • Loading branch information
LarsStegman authored Oct 8, 2024
1 parent b029889 commit 56f2d6e
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 6 deletions.
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1488,6 +1488,8 @@ func (c *Config) buildInput(name string, tbl *ast.Table) (*models.InputConfig, e
cp.CollectionJitter, _ = c.getFieldDuration(tbl, "collection_jitter")
cp.CollectionOffset, _ = c.getFieldDuration(tbl, "collection_offset")
cp.StartupErrorBehavior = c.getFieldString(tbl, "startup_error_behavior")
cp.TimeSource = c.getFieldString(tbl, "time_source")

cp.MeasurementPrefix = c.getFieldString(tbl, "name_prefix")
cp.MeasurementSuffix = c.getFieldString(tbl, "name_suffix")
cp.NameOverride = c.getFieldString(tbl, "name_override")
Expand Down
8 changes: 8 additions & 0 deletions docs/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,14 @@ Parameters that can be used with any input plugin:

When this value is set on a service input, multiple events occurring at the
same timestamp may be merged by the output database.
- **time_source**:
Specifies the source of the timestamp on metrics. Possible values are:
- `metric` will not alter the metric (default)
- `collection_start` sets the timestamp to when collection started
- `collection_end` set the timestamp to when collection finished

`time_source` will NOT be used for service inputs. It is up to each individual
service input to set the timestamp.
- **collection_jitter**:
Overrides the `collection_jitter` setting of the [agent][Agent] for the
plugin. Collection jitter is used to jitter the collection by a random
Expand Down
32 changes: 26 additions & 6 deletions models/running_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ type RunningInput struct {
log telegraf.Logger
defaultTags map[string]string

startAcc telegraf.Accumulator
started bool
retries uint64
startAcc telegraf.Accumulator
started bool
retries uint64
gatherStart time.Time
gatherEnd time.Time

MetricsGathered selfstat.Stat
GatherTime selfstat.Stat
Expand Down Expand Up @@ -87,6 +89,7 @@ type InputConfig struct {
CollectionJitter time.Duration
CollectionOffset time.Duration
Precision time.Duration
TimeSource string
StartupErrorBehavior string
LogLevel string

Expand Down Expand Up @@ -114,6 +117,14 @@ func (r *RunningInput) Init() error {
return fmt.Errorf("invalid 'startup_error_behavior' setting %q", r.Config.StartupErrorBehavior)
}

switch r.Config.TimeSource {
case "":
r.Config.TimeSource = "metric"
case "metric", "collection_start", "collection_end":
default:
return fmt.Errorf("invalid 'time_source' setting %q", r.Config.TimeSource)
}

if p, ok := r.Input.(telegraf.Initializer); ok {
return p.Init()
}
Expand Down Expand Up @@ -206,6 +217,14 @@ func (r *RunningInput) MakeMetric(metric telegraf.Metric) telegraf.Metric {
makemetric(metric, "", "", "", local, global)
}

switch r.Config.TimeSource {
case "collection_start":
metric.SetTime(r.gatherStart)
case "collection_end":
metric.SetTime(r.gatherEnd)
default:
}

r.MetricsGathered.Incr(1)
GlobalMetricsGathered.Incr(1)
return metric
Expand All @@ -228,10 +247,11 @@ func (r *RunningInput) Gather(acc telegraf.Accumulator) error {
}
}

start := time.Now()
r.gatherStart = time.Now()
err := r.Input.Gather(acc)
elapsed := time.Since(start)
r.GatherTime.Incr(elapsed.Nanoseconds())
r.gatherEnd = time.Now()

r.GatherTime.Incr(r.gatherEnd.Sub(r.gatherStart).Nanoseconds())
return err
}

Expand Down
59 changes: 59 additions & 0 deletions models/running_input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,65 @@ func TestMakeMetricWithAlwaysKeepingPluginTagsEnabled(t *testing.T) {
require.Equal(t, expected, actual)
}

func TestMakeMetricWithGatherMetricTimeSource(t *testing.T) {
ri := NewRunningInput(&testInput{}, &InputConfig{
Name: "TestRunningInput",
Tags: make(map[string]string),
Filter: Filter{},
AlwaysIncludeLocalTags: false,
AlwaysIncludeGlobalTags: false,
TimeSource: "metric",
})
start := time.Now()
ri.gatherStart = start
ri.gatherEnd = start.Add(time.Second)

expected := testutil.MockMetrics()[0]

m := testutil.MockMetrics()[0]
actual := ri.MakeMetric(m)

require.Equal(t, expected, actual)
}

func TestMakeMetricWithGatherStartTimeSource(t *testing.T) {
start := time.Now()
ri := NewRunningInput(&testInput{}, &InputConfig{
Name: "TestRunningInput",
Tags: make(map[string]string),
Filter: Filter{},
AlwaysIncludeLocalTags: false,
AlwaysIncludeGlobalTags: false,
TimeSource: "collection_start",
})
ri.gatherStart = start

expected := testutil.MockMetrics()[0]
expected.SetTime(start)

m := testutil.MockMetrics()[0]
actual := ri.MakeMetric(m)

require.Equal(t, expected, actual)
}

func TestMakeMetricWithGatherEndTimeSource(t *testing.T) {
end := time.Now()
ri := NewRunningInput(&testInput{}, &InputConfig{
Name: "TestRunningInput",
TimeSource: "collection_end",
})
ri.gatherEnd = end

expected := testutil.MockMetrics()[0]
expected.SetTime(end)

m := testutil.MockMetrics()[0]
actual := ri.MakeMetric(m)

require.Equal(t, expected, actual)
}

type testInput struct{}

func (t *testInput) Description() string { return "" }
Expand Down

0 comments on commit 56f2d6e

Please sign in to comment.