Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metric prefix filtering #4878

Merged
merged 5 commits into from
Nov 15, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ func convertServerConfig(agentConfig *Config, logOutput io.Writer) (*nomad.Confi
// Setup telemetry related config
conf.StatsCollectionInterval = agentConfig.Telemetry.collectionInterval
conf.DisableTaggedMetrics = agentConfig.Telemetry.DisableTaggedMetrics
conf.DisableDispatchedJobSummaryMetrics = agentConfig.Telemetry.DisableDispatchedJobSummaryMetrics
conf.BackwardsCompatibleMetrics = agentConfig.Telemetry.BackwardsCompatibleMetrics

return conf, nil
Expand Down
13 changes: 13 additions & 0 deletions command/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,18 @@ func (c *Command) setupTelemetry(config *Config) (*metrics.InmemSink, error) {
metricsConf.EnableHostname = true
}

allowedPrefixes, blockedPrefixes, err := telConfig.PrefixFilters()
if err != nil {
return inm, err
}

metricsConf.AllowedPrefixes = allowedPrefixes
metricsConf.BlockedPrefixes = blockedPrefixes

if telConfig.FilterDefault != nil {
metricsConf.FilterDefault = *telConfig.FilterDefault
}

// Configure the statsite sink
var fanout metrics.FanoutSink
if telConfig.StatsiteAddr != "" {
Expand Down Expand Up @@ -895,6 +907,7 @@ func (c *Command) setupTelemetry(config *Config) (*metrics.InmemSink, error) {
metricsConf.EnableHostname = false
metrics.NewGlobal(metricsConf, inm)
}

return inm, nil
}

Expand Down
43 changes: 43 additions & 0 deletions command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,19 @@ type Telemetry struct {
// key/value structure as done in older versions of Nomad
BackwardsCompatibleMetrics bool `mapstructure:"backwards_compatible_metrics"`

// PrefixFilter allows for filtering out metrics from being collected
nickethier marked this conversation as resolved.
Show resolved Hide resolved
PrefixFilter []string `mapstructure:"prefix_filter"`

// FilterDefault controls whether to allow metrics that have not been specified
// by the filter
FilterDefault *bool `mapstructure:"filter_default"`

// DisableDispatchedJobSummaryMetrics allows for ignore dispatched jobs when
// publishing Job summary metrics. This is useful in environment that produce
// high numbers of single count dispatch jobs as the metrics for each take up
// a small memory overhead.
DisableDispatchedJobSummaryMetrics bool `mapstructure:"disable_dispatched_job_summary_metrics"`
nickethier marked this conversation as resolved.
Show resolved Hide resolved

// Circonus: see https://github.com/circonus-labs/circonus-gometrics
// for more details on the various configuration options.
// Valid configuration combinations:
Expand Down Expand Up @@ -506,6 +519,24 @@ type Telemetry struct {
CirconusBrokerSelectTag string `mapstructure:"circonus_broker_select_tag"`
}

// PrefixFilters parses the PrefixFilter field and returns a list of allowed and blocked filters
func (t *Telemetry) PrefixFilters() (allowed, blocked []string, err error) {
for _, rule := range t.PrefixFilter {
if rule == "" {
continue
}
switch rule[0] {
case '+':
allowed = append(allowed, rule[1:])
case '-':
blocked = append(blocked, rule[1:])
default:
return nil, nil, fmt.Errorf("Filter rule must begin with either '+' or '-': %q", rule)
}
}
return allowed, blocked, nil
}

// Ports encapsulates the various ports we bind to for network services. If any
// are not specified then the defaults are used instead.
type Ports struct {
Expand Down Expand Up @@ -1323,6 +1354,18 @@ func (a *Telemetry) Merge(b *Telemetry) *Telemetry {
result.BackwardsCompatibleMetrics = b.BackwardsCompatibleMetrics
}

if b.PrefixFilter != nil {
result.PrefixFilter = b.PrefixFilter
}

if b.FilterDefault != nil {
result.FilterDefault = b.FilterDefault
}

if b.DisableDispatchedJobSummaryMetrics {
result.DisableDispatchedJobSummaryMetrics = b.DisableDispatchedJobSummaryMetrics
}

return &result
}

Expand Down
3 changes: 3 additions & 0 deletions command/agent/config_parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,9 @@ func parseTelemetry(result **Telemetry, list *ast.ObjectList) error {
"circonus_broker_select_tag",
"disable_tagged_metrics",
"backwards_compatible_metrics",
"prefix_filter",
"filter_default",
"disable_dispatched_job_summary_metrics",
}
if err := helper.CheckHCLKeys(listVal, valid); err != nil {
return err
Expand Down
44 changes: 44 additions & 0 deletions command/agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func TestConfig_Merge(t *testing.T) {
CirconusCheckTags: "cat1:tag1,cat2:tag2",
CirconusBrokerID: "0",
CirconusBrokerSelectTag: "dc:dc1",
PrefixFilter: []string{"filter1", "filter2"},
nickethier marked this conversation as resolved.
Show resolved Hide resolved
},
Client: &ClientConfig{
Enabled: false,
Expand Down Expand Up @@ -215,6 +216,7 @@ func TestConfig_Merge(t *testing.T) {
CirconusCheckTags: "cat1:tag1,cat2:tag2",
CirconusBrokerID: "1",
CirconusBrokerSelectTag: "dc:dc2",
PrefixFilter: []string{"prefix1", "prefix2"},
},
Client: &ClientConfig{
Enabled: true,
Expand Down Expand Up @@ -1013,3 +1015,45 @@ func TestMergeServerJoin(t *testing.T) {
require.Equal(result.RetryInterval, retryInterval)
}
}

func TestTelemetry_PrefixFilters(t *testing.T) {
t.Parallel()
cases := []struct {
in []string
expAllow []string
expBlock []string
expErr bool
}{
{
in: []string{"+foo"},
expAllow: []string{"foo"},
},
{
in: []string{"-foo"},
expBlock: []string{"foo"},
},
{
in: []string{"+a.b.c", "-x.y.z"},
expAllow: []string{"a.b.c"},
expBlock: []string{"x.y.z"},
},
{
in: []string{"+foo", "bad", "-bar"},
expErr: true,
},
}

for i, c := range cases {
t.Run(fmt.Sprintf("PrefixCase%d", i), func(t *testing.T) {
require := require.New(t)
tel := &Telemetry{
PrefixFilter: c.in,
}

allow, block, err := tel.PrefixFilters()
require.Exactly(c.expAllow, allow)
require.Exactly(c.expBlock, block)
require.Equal(c.expErr, err != nil)
})
}
}
4 changes: 4 additions & 0 deletions nomad/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,10 @@ type Config struct {
// key/value/tag format, or simply a key/value format
DisableTaggedMetrics bool

// DisableDispatchedJobSummaryMetrics allows for ignore dispatched jobs when
// publishing Job summary metrics
DisableDispatchedJobSummaryMetrics bool

// BackwardsCompatibleMetrics determines whether to show methods of
// displaying metrics for older versions, or to only show the new format
BackwardsCompatibleMetrics bool
Expand Down
77 changes: 44 additions & 33 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,45 +613,56 @@ func (s *Server) publishJobSummaryMetrics(stopCh chan struct{}) {
break
}
summary := raw.(*structs.JobSummary)
for name, tgSummary := range summary.Summary {
if !s.config.DisableTaggedMetrics {
labels := []metrics.Label{
{
Name: "job",
Value: summary.JobID,
},
{
Name: "task_group",
Value: name,
},
}
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "queued"},
float32(tgSummary.Queued), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "complete"},
float32(tgSummary.Complete), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "failed"},
float32(tgSummary.Failed), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "running"},
float32(tgSummary.Running), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "starting"},
float32(tgSummary.Starting), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "lost"},
float32(tgSummary.Lost), labels)
}
if s.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "queued"}, float32(tgSummary.Queued))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "complete"}, float32(tgSummary.Complete))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "failed"}, float32(tgSummary.Failed))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "running"}, float32(tgSummary.Running))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "starting"}, float32(tgSummary.Starting))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "lost"}, float32(tgSummary.Lost))
}
job, err := state.JobByID(ws, summary.Namespace, summary.JobID)
if err != nil {
s.logger.Printf("[ERR] nomad: failed to lookup job for summary: %v", err)
}
if s.config.DisableDispatchedJobSummaryMetrics && job.Dispatched {
nickethier marked this conversation as resolved.
Show resolved Hide resolved
continue
}
s.iterateJobSummaryMetrics(summary)
}
}
}
}

func (s *Server) iterateJobSummaryMetrics(summary *structs.JobSummary) {
for name, tgSummary := range summary.Summary {
if !s.config.DisableTaggedMetrics {
labels := []metrics.Label{
{
Name: "job",
Value: summary.JobID,
},
{
Name: "task_group",
Value: name,
},
}
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "queued"},
float32(tgSummary.Queued), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "complete"},
float32(tgSummary.Complete), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "failed"},
float32(tgSummary.Failed), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "running"},
float32(tgSummary.Running), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "starting"},
float32(tgSummary.Starting), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "lost"},
float32(tgSummary.Lost), labels)
}
if s.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "queued"}, float32(tgSummary.Queued))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "complete"}, float32(tgSummary.Complete))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "failed"}, float32(tgSummary.Failed))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "running"}, float32(tgSummary.Running))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "starting"}, float32(tgSummary.Starting))
metrics.SetGauge([]string{"nomad", "job_summary", summary.JobID, name, "lost"}, float32(tgSummary.Lost))
}
}
}

// revokeLeadership is invoked once we step down as leader.
// This is used to cleanup any state that may be specific to a leader.
func (s *Server) revokeLeadership() error {
Expand Down
14 changes: 7 additions & 7 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1574,13 +1574,13 @@ func (n *Node) Stub() *NodeListStub {
addr, _, _ := net.SplitHostPort(n.HTTPAddr)

return &NodeListStub{
Address: addr,
ID: n.ID,
Datacenter: n.Datacenter,
Name: n.Name,
NodeClass: n.NodeClass,
Version: n.Attributes["nomad.version"],
Drain: n.Drain,
Address: addr,
ID: n.ID,
Datacenter: n.Datacenter,
Name: n.Name,
NodeClass: n.NodeClass,
Version: n.Attributes["nomad.version"],
Drain: n.Drain,
SchedulingEligibility: n.SchedulingEligibility,
Status: n.Status,
StatusDescription: n.StatusDescription,
Expand Down
25 changes: 24 additions & 1 deletion website/source/docs/agent/configuration/telemetry.html.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,36 @@ The following options are available on all telemetry configurations.
only be added to tagged metrics. Note that this option is used to transition
monitoring to tagged metrics and will eventually be deprecated.


- `disable_tagged_metrics` `(bool: false)` - Specifies if Nomad should not emit
tagged metrics and only emit metrics compatible with versions below Nomad
0.7. Note that this option is used to transition monitoring to tagged
metrics and will eventually be deprecated.

- `filter_default` `(bool: true)` - This controls whether to allow metrics that
have not been specified by the filter. Defaults to true, which will allow all
metrics when no filters are provided. When set to false with no filters, no
metrics will be sent.

- `prefix_filter` `(list: [])` - This is a list of filter rules to apply for
allowing/blocking metrics by prefix. A leading "<b>+</b>" will enable any
metrics with the given prefix, and a leading "<b>-</b>" will block them. If
there is overlap between two rules, the more specific rule will take
precedence. Blocking will take priority if the same prefix is listed multiple
times.

```javascript
[
"-nomad.raft",
"+nomad.raft.apply",
"-nomad.memberlist",
]
```

- `disable_dispatched_job_summary_metrics` `(bool: false)` - Specifies if Nomad
should ignore jobs dispatched from a parameterized job when publishing job
summary statistics. Since each job has a small memory overhead for tracking
summary statistics, it is sometimes desired to trade these statistics for
more memory when dispatching high volumes of jobs.

### `statsite`

Expand Down