Skip to content

Commit

Permalink
Cherry-pick #18691 to 7.x: Move service config under stackdriver.metr…
Browse files Browse the repository at this point in the history
…ics and simplify metric_types (#18981)

* Move service config under stackdriver.metrics and simplify metric_types (#18691)

* Move service name config under stackdriver.metrics and simplify metric_types
* change config stackdriver.metrics to metrics

(cherry picked from commit be501ae)
  • Loading branch information
kaiyan-sheng committed Jun 5, 2020
1 parent da19adc commit 4d6cd60
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 104 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- kubernetes.container.cpu.limit.cores and kubernetes.container.cpu.requests.cores are now floats. {issue}11975[11975]
- Update cloudwatch metricset mapping for both metrics and dimensions. {pull}15245[15245]
- Make use of secure port when accessing Kubelet API {pull}16063[16063]
- Move service config under metrics and simplify metric types. {pull}18691[18691]

*Packetbeat*

Expand Down
17 changes: 8 additions & 9 deletions metricbeat/docs/modules/googlecloud.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -245,15 +245,14 @@ metricbeat.modules:
credentials_file_path: "your JSON credentials file path"
exclude_labels: false
period: 1m
stackdriver:
service: compute
metrics:
- aligner: ALIGN_NONE
metric_types:
- "compute.googleapis.com/instance/cpu/reserved_cores"
- "compute.googleapis.com/instance/cpu/usage_time"
- "compute.googleapis.com/instance/cpu/utilization"
- "compute.googleapis.com/instance/uptime"
metrics:
- aligner: ALIGN_NONE
service: compute
metric_types:
- "instance/cpu/reserved_cores"
- "instance/cpu/usage_time"
- "instance/cpu/utilization"
- "instance/uptime"
----

[float]
Expand Down
17 changes: 8 additions & 9 deletions x-pack/metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -531,15 +531,14 @@ metricbeat.modules:
credentials_file_path: "your JSON credentials file path"
exclude_labels: false
period: 1m
stackdriver:
service: compute
metrics:
- aligner: ALIGN_NONE
metric_types:
- "compute.googleapis.com/instance/cpu/reserved_cores"
- "compute.googleapis.com/instance/cpu/usage_time"
- "compute.googleapis.com/instance/cpu/utilization"
- "compute.googleapis.com/instance/uptime"
metrics:
- aligner: ALIGN_NONE
service: compute
metric_types:
- "instance/cpu/reserved_cores"
- "instance/cpu/usage_time"
- "instance/cpu/utilization"
- "instance/uptime"

#------------------------------- Graphite Module -------------------------------
- module: graphite
Expand Down
17 changes: 8 additions & 9 deletions x-pack/metricbeat/module/googlecloud/_meta/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,11 @@
credentials_file_path: "your JSON credentials file path"
exclude_labels: false
period: 1m
stackdriver:
service: compute
metrics:
- aligner: ALIGN_NONE
metric_types:
- "compute.googleapis.com/instance/cpu/reserved_cores"
- "compute.googleapis.com/instance/cpu/usage_time"
- "compute.googleapis.com/instance/cpu/utilization"
- "compute.googleapis.com/instance/uptime"
metrics:
- aligner: ALIGN_NONE
service: compute
metric_types:
- "instance/cpu/reserved_cores"
- "instance/cpu/usage_time"
- "instance/cpu/utilization"
- "instance/uptime"
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,16 @@ given aggregation aligner applied for each metric type.
credentials_file_path: "your JSON credentials file path"
exclude_labels: false
period: 300s
stackdriver:
service: compute
metrics:
- aligner: ALIGN_MEAN
metric_types:
- "compute.googleapis.com/instance/cpu/usage_time"
- "compute.googleapis.com/instance/cpu/utilization"
- aligner: ALIGN_SUM
metric_types:
- "compute.googleapis.com/instance/uptime"
metrics:
- aligner: ALIGN_MEAN
service: compute
metric_types:
- "instance/cpu/usage_time"
- "instance/cpu/utilization"
- aligner: ALIGN_SUM
service: compute
metric_types:
- "instance/uptime"
----

Expand All @@ -83,14 +83,14 @@ ignored.
credentials_file_path: "your JSON credentials file path"
exclude_labels: false
period: 60s
stackdriver:
service: compute
metrics:
- aligner: ALIGN_MEAN
metric_types:
- "compute.googleapis.com/instance/cpu/usage_time"
- "compute.googleapis.com/instance/cpu/utilization"
- aligner: ALIGN_SUM
metric_types:
- "compute.googleapis.com/instance/uptime"
metrics:
- aligner: ALIGN_MEAN
service: compute
metric_types:
- "instance/cpu/usage_time"
- "instance/cpu/utilization"
- aligner: ALIGN_SUM
service: compute
metric_types:
- "instance/uptime"
----
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ func GetConfigForTest(t *testing.T, metricSetName string) map[string]interface{}
}

if metricSetName == "stackdriver" {
config["stackdriver.service"] = "compute"
stackDriverConfig := stackDriverConfig{
ServiceName: "compute",
Aligner: "ALIGN_NONE",
MetricTypes: []string{"compute.googleapis.com/instance/uptime"},
MetricTypes: []string{"instance/uptime"},
}
config["stackdriver.metrics"] = stackDriverConfig
config["metrics"] = stackDriverConfig
}
}
return config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (

// NewMetadataServiceForConfig returns a service to fetch metadata from a config struct. It must return the Compute
// abstraction to fetch metadata, the pubsub abstraction, etc.
func NewMetadataServiceForConfig(c config) (googlecloud.MetadataService, error) {
switch c.ServiceName {
func NewMetadataServiceForConfig(c config, serviceName string) (googlecloud.MetadataService, error) {
switch serviceName {
case googlecloud.ServiceCompute:
return compute.NewMetadataService(c.ProjectID, c.Zone, c.Region, c.opt...)
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,29 +70,27 @@ func (r *stackdriverMetricsRequester) Metric(ctx context.Context, metricType str
return
}

func (r *stackdriverMetricsRequester) Metrics(ctx context.Context, stackDriverConfigs []stackDriverConfig, metricsMeta map[string]metricMeta) ([]timeSeriesWithAligner, error) {
func (r *stackdriverMetricsRequester) Metrics(ctx context.Context, sdc stackDriverConfig, metricsMeta map[string]metricMeta) ([]timeSeriesWithAligner, error) {
var lock sync.Mutex
var wg sync.WaitGroup
results := make([]timeSeriesWithAligner, 0)

for _, sdc := range stackDriverConfigs {
aligner := sdc.Aligner
for _, mt := range sdc.MetricTypes {
metricType := mt
wg.Add(1)

go func(metricType string) {
defer wg.Done()

metricMeta := metricsMeta[metricType]
interval, aligner := getTimeIntervalAligner(metricMeta.ingestDelay, metricMeta.samplePeriod, r.config.period, aligner)
ts := r.Metric(ctx, metricType, interval, aligner)

lock.Lock()
defer lock.Unlock()
results = append(results, ts)
}(metricType)
}
aligner := sdc.Aligner
serviceName := sdc.ServiceName
for _, mt := range sdc.MetricTypes {
wg.Add(1)

go func(mt string) {
defer wg.Done()

metricMeta := metricsMeta[mt]
r.logger.Debugf("For metricType %s, metricMeta = %s", mt, metricMeta)
interval, aligner := getTimeIntervalAligner(metricMeta.ingestDelay, metricMeta.samplePeriod, r.config.period, aligner)
ts := r.Metric(ctx, serviceName+".googleapis.com/"+mt, interval, aligner)
lock.Lock()
defer lock.Unlock()
results = append(results, ts)
}(mt)
}

wg.Wait()
Expand Down Expand Up @@ -139,6 +137,7 @@ func (r *stackdriverMetricsRequester) getFilterForMetric(m string) (f string) {
f = fmt.Sprintf(`%s AND resource.labels.zone = starts_with("%s")`, f, zone)
}
}
r.logger.Debugf("ListTimeSeries API filter = %s", f)
return
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,37 +25,37 @@ func TestGetFilterForMetric(t *testing.T) {
{
"compute service with zone in config",
"compute.googleapis.com/firewall/dropped_bytes_count",
stackdriverMetricsRequester{config: config{Zone: "us-central1-a"}},
stackdriverMetricsRequester{config: config{Zone: "us-central1-a"}, logger: logger},
"metric.type=\"compute.googleapis.com/firewall/dropped_bytes_count\" AND resource.labels.zone = starts_with(\"us-central1-a\")",
},
{
"pubsub service with zone in config",
"pubsub.googleapis.com/subscription/ack_message_count",
stackdriverMetricsRequester{config: config{Zone: "us-central1-a"}},
stackdriverMetricsRequester{config: config{Zone: "us-central1-a"}, logger: logger},
"metric.type=\"pubsub.googleapis.com/subscription/ack_message_count\"",
},
{
"loadbalancing service with zone in config",
"loadbalancing.googleapis.com/https/backend_latencies",
stackdriverMetricsRequester{config: config{Zone: "us-central1-a"}},
stackdriverMetricsRequester{config: config{Zone: "us-central1-a"}, logger: logger},
"metric.type=\"loadbalancing.googleapis.com/https/backend_latencies\"",
},
{
"compute service with region in config",
"compute.googleapis.com/firewall/dropped_bytes_count",
stackdriverMetricsRequester{config: config{Region: "us-east1"}},
stackdriverMetricsRequester{config: config{Region: "us-east1"}, logger: logger},
"metric.type=\"compute.googleapis.com/firewall/dropped_bytes_count\" AND resource.labels.zone = starts_with(\"us-east1\")",
},
{
"pubsub service with region in config",
"pubsub.googleapis.com/subscription/ack_message_count",
stackdriverMetricsRequester{config: config{Region: "us-east1"}},
stackdriverMetricsRequester{config: config{Region: "us-east1"}, logger: logger},
"metric.type=\"pubsub.googleapis.com/subscription/ack_message_count\"",
},
{
"loadbalancing service with region in config",
"loadbalancing.googleapis.com/https/backend_latencies",
stackdriverMetricsRequester{config: config{Region: "us-east1"}},
stackdriverMetricsRequester{config: config{Region: "us-east1"}, logger: logger},
"metric.type=\"loadbalancing.googleapis.com/https/backend_latencies\"",
},
{
Expand Down Expand Up @@ -91,7 +91,7 @@ func TestGetFilterForMetric(t *testing.T) {
{
"compute service with no region/zone in config",
"compute.googleapis.com/firewall/dropped_bytes_count",
stackdriverMetricsRequester{config: config{}},
stackdriverMetricsRequester{config: config{}, logger: logger},
"metric.type=\"compute.googleapis.com/firewall/dropped_bytes_count\"",
},
}
Expand Down
44 changes: 27 additions & 17 deletions x-pack/metricbeat/module/googlecloud/stackdriver/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type MetricSet struct {

//stackDriverConfig holds a configuration specific for stackdriver metricset.
type stackDriverConfig struct {
ServiceName string `config:"service" validate:"required"`
MetricTypes []string `config:"metric_types" validate:"required"`
Aligner string `config:"aligner"`
}
Expand All @@ -65,7 +66,6 @@ type config struct {
Region string `config:"region"`
ProjectID string `config:"project_id" validate:"required"`
ExcludeLabels bool `config:"exclude_labels"`
ServiceName string `config:"stackdriver.service" validate:"required"`
CredentialsFilePath string `config:"credentials_file_path"`

opt []option.ClientOption
Expand All @@ -84,14 +84,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
}

stackDriverConfigs := struct {
StackDriverMetrics []stackDriverConfig `config:"stackdriver.metrics" validate:"nonzero,required"`
StackDriverMetrics []stackDriverConfig `config:"metrics" validate:"nonzero,required"`
}{}

if err := base.Module().UnpackConfig(&stackDriverConfigs); err != nil {
return nil, err
}

m.stackDriverConfig = stackDriverConfigs.StackDriverMetrics
fmt.Println("m.stackDriverConfig = ", m.stackDriverConfig)
m.config.opt = []option.ClientOption{option.WithCredentialsFile(m.config.CredentialsFilePath)}
m.config.period = &duration.Duration{
Seconds: int64(m.Module().Config().Period.Seconds()),
Expand Down Expand Up @@ -125,31 +126,38 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(ctx context.Context, reporter mb.ReporterV2) (err error) {
responses, err := m.requester.Metrics(ctx, m.stackDriverConfig, m.metricsMeta)
if err != nil {
return errors.Wrapf(err, "error trying to get metrics for project '%s' and zone '%s' or region '%s'", m.config.ProjectID, m.config.Zone, m.config.Region)
}
for _, sdc := range m.stackDriverConfig {
m.Logger().Debugf("stackdriver config: %v", sdc)
responses, err := m.requester.Metrics(ctx, sdc, m.metricsMeta)
if err != nil {
err = errors.Wrapf(err, "error trying to get metrics for project '%s' and zone '%s' or region '%s'", m.config.ProjectID, m.config.Zone, m.config.Region)
m.Logger().Error(err)
return err
}

events, err := m.eventMapping(ctx, responses)
if err != nil {
return err
}
events, err := m.eventMapping(ctx, responses, sdc.ServiceName)
if err != nil {
err = errors.Wrap(err, "eventMapping failed")
m.Logger().Error(err)
return err
}

for _, event := range events {
reporter.Event(event)
m.Logger().Debugf("Total %d of events are created for service name = %s and metric type = %s.", len(events), sdc.ServiceName, sdc.MetricTypes)
for _, event := range events {
reporter.Event(event)
}
}

return nil
}

func (m *MetricSet) eventMapping(ctx context.Context, tss []timeSeriesWithAligner) ([]mb.Event, error) {
func (m *MetricSet) eventMapping(ctx context.Context, tss []timeSeriesWithAligner, serviceName string) ([]mb.Event, error) {
e := newIncomingFieldExtractor(m.Logger())

var gcpService = googlecloud.NewStackdriverMetadataServiceForTimeSeries(nil)
var err error

if !m.config.ExcludeLabels {
if gcpService, err = NewMetadataServiceForConfig(m.config); err != nil {
if gcpService, err = NewMetadataServiceForConfig(m.config, serviceName); err != nil {
return nil, errors.Wrap(err, "error trying to create metadata service")
}
}
Expand Down Expand Up @@ -214,13 +222,15 @@ func (m *MetricSet) metricDescriptor(ctx context.Context, client *monitoring.Met
for _, mt := range sdc.MetricTypes {
req := &monitoringpb.ListMetricDescriptorsRequest{
Name: "projects/" + m.config.ProjectID,
Filter: fmt.Sprintf(`metric.type = "%s"`, mt),
Filter: fmt.Sprintf(`metric.type = "%s"`, sdc.ServiceName+".googleapis.com/"+mt),
}

it := client.ListMetricDescriptors(ctx, req)
out, err := it.Next()
if err != nil {
return metricsWithMeta, errors.Errorf("Could not make ListMetricDescriptors request: %s: %v", mt, err)
err = errors.Errorf("Could not make ListMetricDescriptors request: %s: %v", mt, err)
m.Logger().Error(err)
return metricsWithMeta, err
}

// Set samplePeriod default to 60 seconds and ingestDelay default to 0.
Expand Down
17 changes: 8 additions & 9 deletions x-pack/metricbeat/modules.d/googlecloud.yml.disabled
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,11 @@
credentials_file_path: "your JSON credentials file path"
exclude_labels: false
period: 1m
stackdriver:
service: compute
metrics:
- aligner: ALIGN_NONE
metric_types:
- "compute.googleapis.com/instance/cpu/reserved_cores"
- "compute.googleapis.com/instance/cpu/usage_time"
- "compute.googleapis.com/instance/cpu/utilization"
- "compute.googleapis.com/instance/uptime"
metrics:
- aligner: ALIGN_NONE
service: compute
metric_types:
- "instance/cpu/reserved_cores"
- "instance/cpu/usage_time"
- "instance/cpu/utilization"
- "instance/uptime"

0 comments on commit 4d6cd60

Please sign in to comment.