Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use pdata for prometheus monitor #5545

Merged
Merged 1 commit on
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@ import (
"strconv"

dto "github.com/prometheus/client_model/go"
"github.com/signalfx/golib/v3/datapoint"
"github.com/signalfx/golib/v3/sfxclient"
"github.com/signalfx/signalfx-agent/pkg/utils"
"go.opentelemetry.io/collector/pdata/pmetric"
)

type extractor func(m *dto.Metric) float64
type dpFactory func(string, map[string]string, float64) *datapoint.Datapoint

func gaugeExtractor(m *dto.Metric) float64 {
return m.GetGauge().GetValue()
Expand All @@ -24,18 +22,18 @@ func counterExtractor(m *dto.Metric) float64 {
return m.GetCounter().GetValue()
}

func convertMetricFamily(mf *dto.MetricFamily) []*datapoint.Datapoint {
func convertMetricFamily(mf *dto.MetricFamily) []pmetric.Metric {
//nolint:protogetter
if mf.Type == nil || mf.Name == nil {
return nil
}
switch *mf.Type { //nolint:protogetter
case dto.MetricType_GAUGE:
return makeSimpleDatapoints(mf.GetName(), mf.GetMetric(), sfxclient.GaugeF, gaugeExtractor)
return makeSimpleDatapoints(mf.GetName(), mf.GetMetric(), gaugeExtractor)
case dto.MetricType_COUNTER:
return makeSimpleDatapoints(mf.GetName(), mf.GetMetric(), sfxclient.CumulativeF, counterExtractor)
return makeSimpleCumulativeSum(mf.GetName(), mf.GetMetric(), counterExtractor)
case dto.MetricType_UNTYPED:
return makeSimpleDatapoints(mf.GetName(), mf.GetMetric(), sfxclient.GaugeF, untypedExtractor)
return makeSimpleDatapoints(mf.GetName(), mf.GetMetric(), untypedExtractor)
case dto.MetricType_SUMMARY:
return makeSummaryDatapoints(mf.GetName(), mf.GetMetric())
// TODO: figure out how to best convert histograms, in particular the
Expand All @@ -47,16 +45,38 @@ func convertMetricFamily(mf *dto.MetricFamily) []*datapoint.Datapoint {
}
}

func makeSimpleDatapoints(name string, ms []*dto.Metric, dpf dpFactory, e extractor) []*datapoint.Datapoint {
dps := make([]*datapoint.Datapoint, len(ms))
func makeSimpleCumulativeSum(name string, ms []*dto.Metric, e extractor) []pmetric.Metric {
dps := make([]pmetric.Metric, len(ms))
for i, m := range ms {
dps[i] = dpf(name, labelsToDims(m.GetLabel()), e(m))
metric := pmetric.NewMetric()
metric.SetName(name)
s := metric.SetEmptySum()
s.SetIsMonotonic(true)
s.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
dp := s.DataPoints().AppendEmpty()
dp.SetDoubleValue(e(m))
_ = dp.Attributes().FromRaw(labelsToDims(m.GetLabel()))
dps[i] = metric
}
return dps
}

func makeSummaryDatapoints(name string, ms []*dto.Metric) []*datapoint.Datapoint {
var dps []*datapoint.Datapoint
// makeSimpleDatapoints converts each Prometheus metric in ms into one pdata
// gauge metric named name. e extracts the double sample value from each
// *dto.Metric, and the metric's labels become data point attributes. One
// pmetric.Metric with a single data point is produced per input series.
func makeSimpleDatapoints(name string, ms []*dto.Metric, e extractor) []pmetric.Metric {
	out := make([]pmetric.Metric, 0, len(ms))
	for _, m := range ms {
		gaugeMetric := pmetric.NewMetric()
		gaugeMetric.SetName(name)
		point := gaugeMetric.SetEmptyGauge().DataPoints().AppendEmpty()
		point.SetDoubleValue(e(m))
		// Error deliberately ignored: labelsToDims yields only string values,
		// which FromRaw accepts.
		_ = point.Attributes().FromRaw(labelsToDims(m.GetLabel()))
		out = append(out, gaugeMetric)
	}
	return out
}

func makeSummaryDatapoints(name string, ms []*dto.Metric) []pmetric.Metric {
var dps []pmetric.Metric
for _, m := range ms {
dims := labelsToDims(m.GetLabel())
s := m.GetSummary()
Expand All @@ -66,27 +86,49 @@ func makeSummaryDatapoints(name string, ms []*dto.Metric) []*datapoint.Datapoint

//nolint:protogetter
if s.SampleCount != nil {
dps = append(dps, sfxclient.Cumulative(name+"_count", dims, int64(s.GetSampleCount()))) //nolint:gosec
metric := pmetric.NewMetric()
metric.SetName(name + "_count")
sum := metric.SetEmptySum()
sum.SetIsMonotonic(true)
sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
dp := sum.DataPoints().AppendEmpty()
dp.SetIntValue(int64(s.GetSampleCount())) //nolint:gosec // disable G115
_ = dp.Attributes().FromRaw(dims)
dps = append(dps, metric)
}

//nolint:protogetter
if s.SampleSum != nil {
dps = append(dps, sfxclient.CumulativeF(name, dims, s.GetSampleSum()))
metric := pmetric.NewMetric()
metric.SetName(name)
sum := metric.SetEmptySum()
sum.SetIsMonotonic(true)
sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
dp := sum.DataPoints().AppendEmpty()
dp.SetDoubleValue(s.GetSampleSum()) //nolint:gosec // disable G115
_ = dp.Attributes().FromRaw(dims)
dps = append(dps, metric)
}

qs := s.GetQuantile()
for i := range qs {
quantileDims := utils.MergeStringMaps(dims, map[string]string{
quantileDims := utils.MergeMaps(dims, map[string]any{
"quantile": strconv.FormatFloat(qs[i].GetQuantile(), 'f', 6, 64),
})
dps = append(dps, sfxclient.GaugeF(name+"_quantile", quantileDims, qs[i].GetValue()))
metric := pmetric.NewMetric()
metric.SetName(name + "_quantile")
g := metric.SetEmptyGauge()
dp := g.DataPoints().AppendEmpty()
dp.SetDoubleValue(qs[i].GetValue())
_ = dp.Attributes().FromRaw(quantileDims)
dps = append(dps, metric)
}
}
return dps
}

func makeHistogramDatapoints(name string, ms []*dto.Metric) []*datapoint.Datapoint {
var dps []*datapoint.Datapoint
func makeHistogramDatapoints(name string, ms []*dto.Metric) []pmetric.Metric {
var dps []pmetric.Metric
for _, m := range ms {
dims := labelsToDims(m.GetLabel())
h := m.GetHistogram()
Expand All @@ -96,27 +138,51 @@ func makeHistogramDatapoints(name string, ms []*dto.Metric) []*datapoint.Datapoi

//nolint:protogetter
if h.SampleCount != nil {
dps = append(dps, sfxclient.Cumulative(name+"_count", dims, int64(h.GetSampleCount()))) //nolint:gosec
metric := pmetric.NewMetric()
metric.SetName(name + "_count")
sum := metric.SetEmptySum()
sum.SetIsMonotonic(true)
sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
dp := sum.DataPoints().AppendEmpty()
dp.SetIntValue(int64(h.GetSampleCount())) //nolint:gosec // disable G115
_ = dp.Attributes().FromRaw(dims)
dps = append(dps, metric)
Comment on lines +141 to +149
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, we are creating one metric with one data point for each Prometheus time series. Is that correct? I don't see any combining logic down the road...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like the existing sfxDatapointsToPDataMetrics implementation does pretty much the same though...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is a simple follow through to make sure we get the same behavior for now.

}

//nolint:protogetter
if h.SampleSum != nil {
dps = append(dps, sfxclient.CumulativeF(name, dims, h.GetSampleSum()))
metric := pmetric.NewMetric()
metric.SetName(name)
sum := metric.SetEmptySum()
sum.SetIsMonotonic(true)
sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
dp := sum.DataPoints().AppendEmpty()
dp.SetDoubleValue(h.GetSampleSum()) //nolint:gosec // disable G115
_ = dp.Attributes().FromRaw(dims)
dps = append(dps, metric)
}

buckets := h.GetBucket()
for i := range buckets {
bucketDims := utils.MergeStringMaps(dims, map[string]string{
bucketDims := utils.MergeMaps(dims, map[string]any{
"upper_bound": strconv.FormatFloat(buckets[i].GetUpperBound(), 'f', 6, 64),
})
dps = append(dps, sfxclient.Cumulative(name+"_bucket", bucketDims, int64(buckets[i].GetCumulativeCount()))) //nolint:gosec
metric := pmetric.NewMetric()
metric.SetName(name + "_bucket")
sum := metric.SetEmptySum()
sum.SetIsMonotonic(true)
sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
dp := sum.DataPoints().AppendEmpty()
dp.SetIntValue(int64(buckets[i].GetCumulativeCount())) //nolint:gosec // disable G115
_ = dp.Attributes().FromRaw(bucketDims)
dps = append(dps, metric)
}
}
return dps
}

func labelsToDims(labels []*dto.LabelPair) map[string]string {
dims := map[string]string{}
func labelsToDims(labels []*dto.LabelPair) map[string]any {
dims := map[string]any{}
for i := range labels {
dims[labels[i].GetName()] = labels[i].GetValue()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (

dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/signalfx/golib/v3/datapoint"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/collector/pdata/pmetric"
"k8s.io/client-go/rest"

"github.com/signalfx/signalfx-agent/pkg/core/common/auth"
Expand Down Expand Up @@ -155,19 +155,19 @@ func (m *Monitor) Configure(conf *Config) error {
return
}

m.Output.SendDatapoints(dps...)
m.Output.SendMetrics(dps...)
}, time.Duration(conf.IntervalSeconds)*time.Second)

return nil
}

func fetchPrometheusMetrics(fetch fetcher) ([]*datapoint.Datapoint, error) {
func fetchPrometheusMetrics(fetch fetcher) ([]pmetric.Metric, error) {
metricFamilies, err := doFetch(fetch)
if err != nil {
return nil, err
}

var dps []*datapoint.Datapoint
var dps []pmetric.Metric
for i := range metricFamilies {
dps = append(dps, convertMetricFamily(metricFamilies[i])...)
}
Expand Down
30 changes: 9 additions & 21 deletions pkg/receiver/smartagentreceiver/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,34 +325,22 @@ func (out *output) filterMetrics(metrics []pmetric.Metric) []pmetric.Metric {
filteredMetrics := make([]pmetric.Metric, 0, len(metrics))
for _, m := range metrics {
atLeastOneDataPoint := false
newM := pmetric.NewMetric()
newM.SetName(m.Name())
newM.SetDescription(m.Description())
newM.SetUnit(m.Unit())
switch m.Type() {
case pmetric.MetricTypeGauge:
newM.SetEmptyGauge()
for i := 0; i < m.Gauge().DataPoints().Len(); i++ {
dp := m.Gauge().DataPoints().At(i)
if !out.monitorFiltering.filterSet.MatchesMetricDataPoint(m.Name(), dp.Attributes()) {
atLeastOneDataPoint = true
dp.CopyTo(newM.Gauge().DataPoints().AppendEmpty())
}
}
m.Gauge().DataPoints().RemoveIf(func(point pmetric.NumberDataPoint) bool {
return out.monitorFiltering.filterSet.MatchesMetricDataPoint(m.Name(), point.Attributes())
})
atLeastOneDataPoint = m.Gauge().DataPoints().Len() > 0
case pmetric.MetricTypeSum:
newM.SetEmptySum()
for i := 0; i < m.Sum().DataPoints().Len(); i++ {
dp := m.Sum().DataPoints().At(i)
if !out.monitorFiltering.filterSet.MatchesMetricDataPoint(m.Name(), dp.Attributes()) {
atLeastOneDataPoint = true
dp.CopyTo(newM.Sum().DataPoints().AppendEmpty())
}
}
m.Sum().DataPoints().RemoveIf(func(point pmetric.NumberDataPoint) bool {
return out.monitorFiltering.filterSet.MatchesMetricDataPoint(m.Name(), point.Attributes())
})
atLeastOneDataPoint = m.Sum().DataPoints().Len() > 0
default:
panic("unsupported metric type")
}
if atLeastOneDataPoint {
filteredMetrics = append(filteredMetrics, newM)
filteredMetrics = append(filteredMetrics, m)
}
}
return filteredMetrics
Expand Down
Loading