[exporter/loadbalancer] Add a new routing key: streamID (#34086)
**Description:** This adds a new routing option for metrics: `streamID`.
This routes data points based on their stream ID: the unique hash of a
data point's attributes, plus the attributes and identifying information
of its resource, scope, and metric.

**Link to tracking Issue:**
#32513

**Testing:** I added tests for the new routing key to the existing test
suites, as well as a new case to the benchmark suite.

**Documentation:** I updated the README to describe the new routing key,
`streamID`, and how it works.
RichieSams committed Jul 16, 2024
1 parent 438df69 commit 12d071d
Showing 13 changed files with 1,154 additions and 18 deletions.
27 changes: 27 additions & 0 deletions .chloggen/loadbalacingexporter-update-metric-routing-options.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: loadbalancingexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Adds a new streamID routingKey, which will route based on the datapoint ID. See updated README for details"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32513]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
43 changes: 25 additions & 18 deletions exporter/loadbalancingexporter/README.md
@@ -16,14 +16,15 @@

This is an exporter that will consistently export spans, metrics and logs depending on the `routing_key` configured.

- The options for `routing_key` are: `service`, `traceID`, `metric` (metric name), `resource`.
+ The options for `routing_key` are: `service`, `traceID`, `metric` (metric name), `resource`, `streamID`.

- | routing_key | can be used for |
- | ------------- |-----------|
- | service | logs, spans, metrics |
- | traceID | logs, spans |
- | resource | metrics |
- | metric | metrics |
+ | routing_key | can be used for |
+ | ----------- | -------------------- |
+ | service | logs, spans, metrics |
+ | traceID | logs, spans |
+ | resource | metrics |
+ | metric | metrics |
+ | streamID | metrics |

If no `routing_key` is configured, the default routing mechanism is `traceID` for traces, while `service` is the default for metrics. This means that spans belonging to the same `traceID` (or `service.name`, when `service` is used as the `routing_key`) will be sent to the same backend.

@@ -33,7 +34,7 @@ Note that either the Trace ID or Service name is used for the decision on which backend

This load balancer is especially useful for backends configured with tail-based samplers or red-metrics-collectors, which make a decision based on the view of the full trace.

When a list of backends is updated, some of the signals will be rerouted to different backends.
Around R/N of the "routes" will be rerouted differently, where:

* A "route" is either a trace ID or a service name mapped to a certain backend.
@@ -45,10 +46,11 @@ This should be stable enough for most cases, and the larger the number of backen
This also supports service name based exporting for traces. If you have two or more collectors that collect traces and then use spanmetrics connector to generate metrics and push to prometheus, there is a high chance of facing label collisions on prometheus if the routing is based on `traceID` because every collector sees the `service+operation` label. With service name based routing, each collector can only see one service name and can push metrics without any label collisions.

## Resilience and scaling considerations

The `loadbalancingexporter` will, irrespective of the chosen resolver (`static`, `dns`, `k8s`), create one exporter per endpoint. The exporter conforms to its published configuration regarding sending queue and retry mechanisms. Importantly, the `loadbalancingexporter` will not attempt to re-route data to a healthy endpoint on delivery failure, and data loss is therefore possible if the exporter's target remains unavailable once redelivery is exhausted. Due consideration needs to be given to the exporter queue and retry configuration when running in a highly elastic environment.

* When using the `static` resolver and a target is unavailable, all the target's load-balanced telemetry will fail to be delivered until either the target is restored or removed from the static list. The same principle applies to the `dns` resolver.
* When using `k8s`, `dns`, and likely future resolvers, topology changes are eventually reflected in the `loadbalancingexporter`. The `k8s` resolver will update more quickly than `dns`, but a window of time in which the true topology doesn't match the view of the `loadbalancingexporter` remains.
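
To make the queue and retry consideration above concrete, the per-endpoint exporters accept the standard OTLP exporter settings under `protocol.otlp`, which is where queueing and retry behaviour is tuned. The snippet below is a minimal sketch and not part of this change; the timeout, queue, and retry values and the DNS hostname are illustrative assumptions:

```yaml
exporters:
  loadbalancing:
    protocol:
      otlp:
        # These settings are applied to every backend exporter created by the resolver.
        timeout: 5s
        retry_on_failure:
          enabled: true
          max_elapsed_time: 300s
        sending_queue:
          enabled: true
          queue_size: 5000
    resolver:
      dns:
        # Illustrative hostname; point this at the backends' headless service.
        hostname: otelcol-backends.example.com
```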

## Configuration

@@ -72,22 +74,24 @@ Refer to [config.yaml](./testdata/config.yaml) for detailed examples on using th
* `interval` resolver interval in go-Duration format, e.g. `5s`, `1d`, `30m`. If not specified, `30s` will be used.
* `timeout` resolver timeout in go-Duration format, e.g. `5s`, `1d`, `30m`. If not specified, `5s` will be used.
* `port` port to be used for exporting the traces to the addresses resolved from `service`. By default, the port is set in Cloud Map, but can be overridden with a static value in this config
* `health_status` filter in AWS Cloud Map: you can specify the health status of the instances that you want to discover. The `health_status` filter is optional and allows you to query based on the health status of the instances.
* Available values are
* `HEALTHY`: Only return instances that are healthy.
* `UNHEALTHY`: Only return instances that are unhealthy.
* `ALL`: Return all instances, regardless of their health status.
* `HEALTHY_OR_ELSE_ALL`: Returns healthy instances, unless none are reporting a healthy state. In that case, return all instances. This is also called failing open.
* Resolver's default filter is set to `HEALTHY` when none is explicitly defined
* **Notes:**
* This resolver currently returns a maximum of 100 hosts.
* `TODO`: Feature request [29771](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/29771) aims to cover the pagination for this scenario
- * The `routing_key` property is used to route spans to exporters based on different parameters. This functionality is currently enabled only for `trace` pipeline types. It supports one of the following values:
-   * `service`: exports spans based on their service name. This is useful when using processors like the span metrics, so all spans for each service are sent to consistent collector instances for metric collection. Otherwise, metrics for the same services are sent to different collectors, making aggregations inaccurate.
-   * `traceID` (default): exports spans based on their `traceID`.
-   * If not configured, defaults to `traceID` based routing.
+ * The `routing_key` property is used to specify how to route values (spans or metrics) to exporters based on different parameters. This functionality is currently enabled only for `trace` and `metric` pipeline types. It supports one of the following values:
+   * `service`: Routes values based on their service name. This is useful when using processors like the span metrics connector, so all spans for each service are sent to consistent collector instances for metric collection. Otherwise, metrics for the same services are sent to different collectors, making aggregations inaccurate.
+   * `traceID`: Routes spans based on their `traceID`. Invalid for metrics.
+   * `metric`: Routes metrics based on their metric name. Invalid for spans.
+   * `streamID`: Routes metrics based on their data point stream ID: the unique hash of all of the data point's attributes, plus the attributes and identifying information of its resource, scope, and metric.
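
For reference, a minimal sketch of how the new routing key could be configured for a metrics pipeline is below; the backend hostnames are illustrative assumptions, and the fuller examples that follow show complete pipelines:

```yaml
exporters:
  loadbalancing:
    routing_key: streamID
    protocol:
      otlp:
        tls:
          insecure: true
    resolver:
      static:
        hostnames:
          - backend-1:4317
          - backend-2:4317
```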

Simple example

```yaml
receivers:
otlp:
@@ -133,6 +137,7 @@ service:
```

Kubernetes resolver example (For a more specific example: [example/k8s-resolver](./example/k8s-resolver/README.md))

```yaml
receivers:
otlp:
@@ -175,6 +180,7 @@ service:
```

AWS CloudMap resolver example

```yaml
receivers:
otlp:
@@ -214,6 +220,7 @@ service:
```

For testing purposes, the following configuration can be used, where both the load balancer and all backends are running locally:

```yaml
receivers:
otlp/loadbalancer:
2 changes: 2 additions & 0 deletions exporter/loadbalancingexporter/config.go
@@ -17,13 +17,15 @@ const (
svcRouting
metricNameRouting
resourceRouting
streamIDRouting
)

const (
svcRoutingStr = "service"
traceIDRoutingStr = "traceID"
metricNameRoutingStr = "metric"
resourceRoutingStr = "resource"
streamIDRoutingStr = "streamID"
)

// Config defines configuration for the exporter.
132 changes: 132 additions & 0 deletions exporter/loadbalancingexporter/metrics_exporter.go
@@ -65,6 +65,8 @@ func newMetricsExporter(params exporter.Settings, cfg component.Config) (*metric
metricExporter.routingKey = resourceRouting
case metricNameRoutingStr:
metricExporter.routingKey = metricNameRouting
case streamIDRoutingStr:
metricExporter.routingKey = streamIDRouting
default:
return nil, fmt.Errorf("unsupported routing_key: %q", cfg.(*Config).RoutingKey)
}
@@ -101,6 +103,8 @@ func (e *metricExporterImp) ConsumeMetrics(ctx context.Context, md pmetric.Metri
batches = splitMetricsByResourceID(md)
case metricNameRouting:
batches = splitMetricsByMetricName(md)
case streamIDRouting:
batches = splitMetricsByStreamID(md)
}

// Now assign each batch to an exporter, and merge as we go
@@ -221,6 +225,134 @@ func splitMetricsByMetricName(md pmetric.Metrics) map[string]pmetric.Metrics {
return results
}

func splitMetricsByStreamID(md pmetric.Metrics) map[string]pmetric.Metrics {
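// Group incoming data points by their stream identity: the hash of the data point's
// attributes combined with the identity of its metric, scope, and resource. Data points
// that share a stream ID are merged into the same output batch, so they are always
// assigned to the same backend.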
results := map[string]pmetric.Metrics{}

for i := 0; i < md.ResourceMetrics().Len(); i++ {
rm := md.ResourceMetrics().At(i)
res := rm.Resource()

for j := 0; j < rm.ScopeMetrics().Len(); j++ {
sm := rm.ScopeMetrics().At(j)
scope := sm.Scope()

for k := 0; k < sm.Metrics().Len(); k++ {
m := sm.Metrics().At(k)
metricID := identity.OfResourceMetric(res, scope, m)

switch m.Type() {
case pmetric.MetricTypeGauge:
gauge := m.Gauge()

for l := 0; l < gauge.DataPoints().Len(); l++ {
dp := gauge.DataPoints().At(l)

newMD, mClone := cloneMetricWithoutType(rm, sm, m)
gaugeClone := mClone.SetEmptyGauge()

dpClone := gaugeClone.DataPoints().AppendEmpty()
dp.CopyTo(dpClone)

key := identity.OfStream(metricID, dp).String()
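// Data points that hash to the same stream ID are merged into a single batch.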
existing, ok := results[key]
if ok {
metrics.Merge(existing, newMD)
} else {
results[key] = newMD
}
}
case pmetric.MetricTypeSum:
sum := m.Sum()

for l := 0; l < sum.DataPoints().Len(); l++ {
dp := sum.DataPoints().At(l)

newMD, mClone := cloneMetricWithoutType(rm, sm, m)
sumClone := mClone.SetEmptySum()
sumClone.SetIsMonotonic(sum.IsMonotonic())
sumClone.SetAggregationTemporality(sum.AggregationTemporality())

dpClone := sumClone.DataPoints().AppendEmpty()
dp.CopyTo(dpClone)

key := identity.OfStream(metricID, dp).String()
existing, ok := results[key]
if ok {
metrics.Merge(existing, newMD)
} else {
results[key] = newMD
}
}
case pmetric.MetricTypeHistogram:
histogram := m.Histogram()

for l := 0; l < histogram.DataPoints().Len(); l++ {
dp := histogram.DataPoints().At(l)

newMD, mClone := cloneMetricWithoutType(rm, sm, m)
histogramClone := mClone.SetEmptyHistogram()
histogramClone.SetAggregationTemporality(histogram.AggregationTemporality())

dpClone := histogramClone.DataPoints().AppendEmpty()
dp.CopyTo(dpClone)

key := identity.OfStream(metricID, dp).String()
existing, ok := results[key]
if ok {
metrics.Merge(existing, newMD)
} else {
results[key] = newMD
}
}
case pmetric.MetricTypeExponentialHistogram:
expHistogram := m.ExponentialHistogram()

for l := 0; l < expHistogram.DataPoints().Len(); l++ {
dp := expHistogram.DataPoints().At(l)

newMD, mClone := cloneMetricWithoutType(rm, sm, m)
expHistogramClone := mClone.SetEmptyExponentialHistogram()
expHistogramClone.SetAggregationTemporality(expHistogram.AggregationTemporality())

dpClone := expHistogramClone.DataPoints().AppendEmpty()
dp.CopyTo(dpClone)

key := identity.OfStream(metricID, dp).String()
existing, ok := results[key]
if ok {
metrics.Merge(existing, newMD)
} else {
results[key] = newMD
}
}
case pmetric.MetricTypeSummary:
summary := m.Summary()

for l := 0; l < summary.DataPoints().Len(); l++ {
dp := summary.DataPoints().At(l)

newMD, mClone := cloneMetricWithoutType(rm, sm, m)
sumClone := mClone.SetEmptySummary()

dpClone := sumClone.DataPoints().AppendEmpty()
dp.CopyTo(dpClone)

key := identity.OfStream(metricID, dp).String()
existing, ok := results[key]
if ok {
metrics.Merge(existing, newMD)
} else {
results[key] = newMD
}
}
}
}
}
}

return results
}

func cloneMetricWithoutType(rm pmetric.ResourceMetrics, sm pmetric.ScopeMetrics, m pmetric.Metric) (md pmetric.Metrics, mClone pmetric.Metric) {
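// Copy the resource, scope, and metric metadata into a fresh pmetric.Metrics, leaving the
// metric's data type unset so the caller can set the appropriate one for each data point.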
md = pmetric.NewMetrics()

19 changes: 19 additions & 0 deletions exporter/loadbalancingexporter/metrics_exporter_test.go
@@ -279,6 +279,14 @@ func TestSplitMetrics(t *testing.T) {
name: "duplicate_metric_name",
splitFunc: splitMetricsByMetricName,
},
{
name: "basic_stream_id",
splitFunc: splitMetricsByStreamID,
},
{
name: "duplicate_stream_id",
splitFunc: splitMetricsByStreamID,
},
}

for _, tc := range testCases {
@@ -322,6 +330,10 @@ func TestConsumeMetrics_SingleEndpoint(t *testing.T) {
name: "metric_name",
routingKey: metricNameRoutingStr,
},
{
name: "stream_id",
routingKey: streamIDRoutingStr,
},
}

for _, tc := range testCases {
@@ -427,6 +439,10 @@ func TestConsumeMetrics_TripleEndpoint(t *testing.T) {
name: "metric_name",
routingKey: metricNameRoutingStr,
},
{
name: "stream_id",
routingKey: streamIDRoutingStr,
},
}

for _, tc := range testCases {
@@ -969,6 +985,9 @@ func BenchmarkConsumeMetrics(b *testing.B) {
{
routingKey: metricNameRoutingStr,
},
{
routingKey: streamIDRoutingStr,
},
}

for _, tc := range testCases {