Skip to content

Commit

Permalink
fix timestamps from prometheus instruments
Browse files Browse the repository at this point in the history
  • Loading branch information
dashpole committed Jul 25, 2023
1 parent ab50b0f commit c528092
Showing 1 changed file with 27 additions and 14 deletions.
41 changes: 27 additions & 14 deletions bridge/prometheus/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func NewMetricProducer(opts ...Option) metric.Producer {
}

func (p *producer) Produce(context.Context) ([]metricdata.ScopeMetrics, error) {
now := time.Now()
var errs multierr
otelMetrics := make([]metricdata.Metrics, 0)
for _, gatherer := range p.gatherers {
Expand All @@ -62,7 +63,7 @@ func (p *producer) Produce(context.Context) ([]metricdata.ScopeMetrics, error) {
errs = append(errs, err)
continue
}
m, err := convertPrometheusMetricsInto(promMetrics)
m, err := convertPrometheusMetricsInto(promMetrics, now)
otelMetrics = append(otelMetrics, m...)
if err != nil {
errs = append(errs, err)
Expand All @@ -82,7 +83,7 @@ func (p *producer) Produce(context.Context) ([]metricdata.ScopeMetrics, error) {
}}, nil
}

func convertPrometheusMetricsInto(promMetrics []*dto.MetricFamily) ([]metricdata.Metrics, error) {
func convertPrometheusMetricsInto(promMetrics []*dto.MetricFamily, now time.Time) ([]metricdata.Metrics, error) {
var errs multierr
otelMetrics := make([]metricdata.Metrics, 0)
for _, pm := range promMetrics {
Expand All @@ -92,11 +93,11 @@ func convertPrometheusMetricsInto(promMetrics []*dto.MetricFamily) ([]metricdata
}
switch pm.GetType() {
case dto.MetricType_GAUGE:
newMetric.Data = convertGauge(pm.GetMetric())
newMetric.Data = convertGauge(pm.GetMetric(), now)
case dto.MetricType_COUNTER:
newMetric.Data = convertCounter(pm.GetMetric())
newMetric.Data = convertCounter(pm.GetMetric(), now)
case dto.MetricType_HISTOGRAM:
newMetric.Data = convertHistogram(pm.GetMetric())
newMetric.Data = convertHistogram(pm.GetMetric(), now)
default:
// MetricType_GAUGE_HISTOGRAM, MetricType_SUMMARY, MetricType_UNTYPED
errs = append(errs, fmt.Errorf("%w: %v for metric %v", errUnsupportedType, pm.GetType(), pm.GetName()))
Expand All @@ -107,55 +108,67 @@ func convertPrometheusMetricsInto(promMetrics []*dto.MetricFamily) ([]metricdata
return otelMetrics, errs.errOrNil()
}

func convertGauge(metrics []*dto.Metric) metricdata.Gauge[float64] {
func convertGauge(metrics []*dto.Metric, now time.Time) metricdata.Gauge[float64] {
otelGauge := metricdata.Gauge[float64]{
DataPoints: make([]metricdata.DataPoint[float64], len(metrics)),
}
for i, m := range metrics {
otelGauge.DataPoints[i] = metricdata.DataPoint[float64]{
dp := metricdata.DataPoint[float64]{
Attributes: convertLabels(m.GetLabel()),
Time: time.UnixMilli(m.GetTimestampMs()),
Time: now,
Value: m.GetGauge().GetValue(),
}
if m.GetTimestampMs() != 0 {
dp.Time = time.UnixMilli(m.GetTimestampMs())
}
otelGauge.DataPoints[i] = dp
}
return otelGauge
}

func convertCounter(metrics []*dto.Metric) metricdata.Sum[float64] {
func convertCounter(metrics []*dto.Metric, now time.Time) metricdata.Sum[float64] {
otelCounter := metricdata.Sum[float64]{
DataPoints: make([]metricdata.DataPoint[float64], len(metrics)),
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
}
for i, m := range metrics {
otelCounter.DataPoints[i] = metricdata.DataPoint[float64]{
dp := metricdata.DataPoint[float64]{
Attributes: convertLabels(m.GetLabel()),
StartTime: m.GetCounter().GetCreatedTimestamp().AsTime(),
Time: time.UnixMilli(m.GetTimestampMs()),
Time: now,
Value: m.GetCounter().GetValue(),
Exemplars: []metricdata.Exemplar[float64]{convertExemplar(m.GetCounter().GetExemplar())},
}
if m.GetTimestampMs() != 0 {
dp.Time = time.UnixMilli(m.GetTimestampMs())
}
otelCounter.DataPoints[i] = dp
}
return otelCounter
}

func convertHistogram(metrics []*dto.Metric) metricdata.Histogram[float64] {
func convertHistogram(metrics []*dto.Metric, now time.Time) metricdata.Histogram[float64] {
otelHistogram := metricdata.Histogram[float64]{
DataPoints: make([]metricdata.HistogramDataPoint[float64], len(metrics)),
Temporality: metricdata.CumulativeTemporality,
}
for i, m := range metrics {
bounds, bucketCounts, exemplars := convertBuckets(m.GetHistogram().GetBucket())
otelHistogram.DataPoints[i] = metricdata.HistogramDataPoint[float64]{
dp := metricdata.HistogramDataPoint[float64]{
Attributes: convertLabels(m.GetLabel()),
StartTime: m.GetHistogram().GetCreatedTimestamp().AsTime(),
Time: time.UnixMilli(m.GetTimestampMs()),
Time: now,
Count: m.GetHistogram().GetSampleCount(),
Sum: m.GetHistogram().GetSampleSum(),
Bounds: bounds,
BucketCounts: bucketCounts,
Exemplars: exemplars,
}
if m.GetTimestampMs() != 0 {
dp.Time = time.UnixMilli(m.GetTimestampMs())
}
otelHistogram.DataPoints[i] = dp
}
return otelHistogram
}
Expand Down

0 comments on commit c528092

Please sign in to comment.