Add design doc for processing query language. #4444
# OpenTelemetry Collector Processor Exploration

## Objective

To describe a user experience and strategies for configuring processors in the OpenTelemetry collector.

This work is being prototyped in opentelemetry-collector-contrib; the design doc is here for broader discussion.

## Summary

The OpenTelemetry (OTel) collector is a tool to set up pipelines to receive telemetry from an application and export it
to an observability backend. Part of the pipeline can include processing stages, which execute various business logic
on incoming telemetry before it is exported.

Over time, the collector has added various processors to satisfy different use cases, generally in an ad-hoc way to
support each feature independently. We can improve the experience for users of the collector by consolidating processing
patterns in terms of user experience. This can be supported by defining a querying model for processors
within the collector core, and likely also for use in SDKs, to simplify implementation and promote a consistent user
experience and best practices.
## Goals and non-goals

Goals:

- List out use cases for processing within the collector
- Consider what could be an ideal configuration experience for users

Non-goals:

- Merge every processor into one. Many use cases overlap and generalize, but not all of them
- Technical design or implementation of the configuration experience. This doc is currently focused on user experience.
## Use cases for processing

### Telemetry mutation

Processors can be used to mutate the telemetry in the collector pipeline. OpenTelemetry SDKs collect detailed telemetry
from applications, and it is common to have to reshape it into a form appropriate for an individual use case.

Some types of mutation include:

- Remove a forbidden attribute such as `http.request.header.authorization`
- Reduce cardinality of an attribute, such as translating an `http.target` value of `/user/123451/profile` to `/user/{userId}/profile`
- Decrease the size of the telemetry payload by removing large resource attributes such as `process.command_line`
- Filter out signals, such as by removing all telemetry with an `http.target` of `/health`
- Attach information from the resource to telemetry, for example adding certain resource fields as metric dimensions

The processors implementing this use case are `attributesprocessor`, `filterprocessor`, `metricstransformprocessor`,
`resourceprocessor`, and `spanprocessor`.
### Metric generation

The collector may generate new metrics based on incoming telemetry. This can be for covering gaps in SDK coverage of
metrics vs. spans, or to create new metrics based on existing ones to model the data better for backend-specific
expectations.

- Create new metrics based on information in spans, for example to create a duration metric that is not implemented in the SDK yet
- Apply arithmetic between multiple incoming metrics to produce an output one, for example divide an `amount` and a `capacity` to create a `utilization` metric

The processors implementing this use case are `metricsgenerationprocessor` and `spanmetricsprocessor`.
### Grouping

Some processors are stateful, grouping telemetry over a window of time based on either a trace ID or an attribute value,
or just performing general batching.

- Batch incoming telemetry before sending to exporters to reduce export requests
- Group spans by trace ID to allow doing tail sampling
- Group telemetry for the same path

The processors implementing this use case are `batchprocessor`, `groupbyattrprocessor`, and `groupbytraceprocessor`.
### Metric temporality

Two processors convert between the two types of temporality, cumulative and delta. The conversion is generally expected
to happen as close to the source data as possible, for example within receivers themselves. The same configuration
mechanism could be used for selecting metrics for temporality conversion as in other cases, but it is expected that in
practice configuration will be limited.

The processors implementing this use case are `cumulativetodeltaprocessor` and `deltatorateprocessor`.
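To make the conversion concrete, here is a minimal sketch of the cumulative-to-delta calculation: keep the last cumulative value seen per metric stream and emit differences. The string stream key and the reset handling are simplified assumptions for illustration, not the actual `cumulativetodeltaprocessor` implementation.

```go
package main

import "fmt"

// deltaConverter tracks the last cumulative value seen per metric stream
// and converts each new cumulative point into a delta.
type deltaConverter struct {
	last map[string]float64 // keyed by a simplified stream identity
}

func newDeltaConverter() *deltaConverter {
	return &deltaConverter{last: map[string]float64{}}
}

// Convert returns the delta for a new cumulative point. A value lower than
// the previous one is treated as a counter reset, so the point itself is
// the delta.
func (c *deltaConverter) Convert(streamKey string, cumulative float64) float64 {
	prev, seen := c.last[streamKey]
	c.last[streamKey] = cumulative
	if !seen || cumulative < prev {
		return cumulative
	}
	return cumulative - prev
}

func main() {
	conv := newDeltaConverter()
	fmt.Println(conv.Convert("http.requests|{method=GET}", 10)) // first point: 10
	fmt.Println(conv.Convert("http.requests|{method=GET}", 25)) // delta: 15
	fmt.Println(conv.Convert("http.requests|{method=GET}", 3))  // reset: 3
}
```

A rate calculation (as in `deltatorateprocessor`) would additionally divide the delta by the time interval between points.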
### Telemetry enrichment

OpenTelemetry SDKs focus on collecting application-specific data. They may also include resource detectors to populate
environment-specific data, but the collector is commonly used to fill gaps in coverage of environment-specific data.

- Add information about the cloud provider environment to the `Resource` of all incoming telemetry

The processors implementing this use case are `k8sattributesprocessor` and `resourcedetectionprocessor`.
## Telemetry query language

Looking at the use cases, there are certain common features for telemetry mutation and metric generation.

- Identify the type of signal (span, metric, log, resource), or apply to all signals
- Navigate to a path within the telemetry to operate on it
- Define an operation, and possibly operation arguments

We can model these as a query language, in particular allowing the first two points to be shared among all
processing operations, so that implementations of individual types of processing only need to supply the operators
the user can use within an expression.
Telemetry is modeled in the collector as [`pdata`](https://github.com/open-telemetry/opentelemetry-collector/tree/main/model/pdata),
which is roughly a 1:1 mapping of the [OTLP protocol](https://github.com/open-telemetry/opentelemetry-proto/tree/main/opentelemetry/proto).
This data can be navigated using field expressions, which are fields within the protocol separated by dots. For example,
the status message of a span is `status.message`. A map lookup can include the key as a string, for example `attributes["http.status_code"]`.
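As an illustration of how such field expressions could be resolved, here is a sketch over a generic nested-map stand-in for `pdata`. Real navigation would use the generated `pdata` accessors; `splitPath` and `resolve` are hypothetical helpers, and the parsing is deliberately simplified.

```go
package main

import (
	"fmt"
	"strings"
)

// splitPath splits a field expression on dots that are not inside a quoted
// map key, so `attributes["http.status_code"]` stays a single segment.
func splitPath(expr string) []string {
	var parts []string
	var cur strings.Builder
	inQuote := false
	for _, r := range expr {
		switch {
		case r == '"':
			inQuote = !inQuote
			cur.WriteRune(r)
		case r == '.' && !inQuote:
			parts = append(parts, cur.String())
			cur.Reset()
		default:
			cur.WriteRune(r)
		}
	}
	return append(parts, cur.String())
}

// resolve walks a field expression over a nested map and returns the value,
// or nil if any segment is missing or not navigable.
func resolve(telemetry map[string]any, expr string) any {
	var cur any = telemetry
	for _, part := range splitPath(expr) {
		// Separate a trailing map lookup like attributes["http.status_code"].
		field, key := part, ""
		if i := strings.Index(part, `["`); i >= 0 {
			field = part[:i]
			key = strings.TrimSuffix(part[i+2:], `"]`)
		}
		m, ok := cur.(map[string]any)
		if !ok {
			return nil
		}
		cur = m[field]
		if key != "" {
			attrs, ok := cur.(map[string]any)
			if !ok {
				return nil
			}
			cur = attrs[key]
		}
	}
	return cur
}

func main() {
	span := map[string]any{
		"status":     map[string]any{"message": "OK"},
		"attributes": map[string]any{"http.status_code": int64(200)},
	}
	fmt.Println(resolve(span, "status.message"))                 // OK
	fmt.Println(resolve(span, `attributes["http.status_code"]`)) // 200
}
```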
Operations can be scoped to the type of a signal (`span`, `metric`, `log`), with all of the flattened points of that
signal being part of a query space. Virtual fields are added to access data from a higher level before flattening, for
`resource` and `library_info`. For metrics, the structure presented for processing is the actual data points, e.g. `NumberDataPoint` or
`HistogramDataPoint`, with the information from higher levels like `Metric` or the data type available as virtual fields.
Navigation can then be used with a simple expression language for identifying telemetry to operate on.

- `... where name = "GET /cats"`
- `... from span where attributes["http.target"] = "/health"`
- `... where resource.attributes["deployment"] = "canary"`
- `... from metric where descriptor.metric_type = gauge`
- `... from metric where descriptor.metric_name = "http.active_requests"`
Fields should always be fully specified - for example, `attributes` refers to the `attributes` field in the telemetry, not
the `resource`. In the future, we may allow shorthand for accessing scoped information where it is not ambiguous.

Having selected telemetry to operate on, any needed operations can be defined as functions. Known useful functions should
be implemented within the collector itself, with registration provided from extension modules to allow customization with
contrib components; in the future we could even allow user plugins, possibly through WASM, similar to work in
[HTTP proxies](https://github.com/proxy-wasm/spec). The arguments to operations will primarily be field expressions,
allowing the operation to mutate telemetry as needed.
### Examples

Remove a forbidden attribute such as `http.request.header.authorization` from all telemetry.

`delete(attributes["http.request.header.authorization"])`

Remove a forbidden attribute from spans only

`delete(attributes["http.request.header.authorization"]) from span`

Remove all attributes except for some

`keep(attributes, "http.method", "http.status_code") from metric`
Reduce cardinality of an attribute

`replace_wildcards("/user/*/list/*", "/user/{userId}/list/{listId}", attributes["http.target"])`

Reduce cardinality of a span name

`replace_wildcards("GET /user/*/list/*", "GET /user/{userId}/list/{listId}", name) from span`

Decrease the size of the telemetry payload by removing large resource attributes

`delete(resource.attributes["process.command_line"])`

Filter out signals, such as by removing all telemetry with an `http.target` of `/health`

`drop() where attributes["http.target"] = "/health"`

Attach information from the resource to telemetry, for example adding certain resource fields as metric attributes

`set(attributes["k8s_pod"], resource.attributes["k8s.pod.name"]) from metric`
Stateful processing can also be modeled by the language. The processor implementation would set up the state while
parsing the configuration.

Create a duration metric with attributes copied from a span

```
create_histogram("duration", end_time_nanos - start_time_nanos) from span
keep(attributes, "http.method") from metric where descriptor.metric_name = "duration"
```

Group spans by trace ID

`group_by(trace_id, 2m) from span`
Create a utilization metric from base metrics. Because navigation expressions only operate on a single piece of telemetry,
helper functions for reading values from other metrics need to be provided.

`create_gauge("pod.cpu.utilized", read_gauge("pod.cpu.usage") / read_gauge("node.cpu.limit")) from metric`
A full configuration with many processing queries. Queries are executed in order. While initially performance may degrade compared to more specialized
processors, the expectation is that over time the query processor's engine would improve to be able to apply optimizations
across queries, compile into machine code, etc.
```yaml
receivers:
  otlp:

exporters:
  otlp:

processors:
  query:
    # Assuming group_by is defined in a contrib extension module, not baked into the "query" processor
    extensions: [group_by]
    expressions:
      - drop() where attributes["http.target"] = "/health"
      - delete(attributes["http.request.header.authorization"])
      - replace_wildcards("/user/*/list/*", "/user/{userId}/list/{listId}", attributes["http.target"])
      - set(attributes["k8s_pod"], resource.attributes["k8s.pod.name"]) from metric
      - group_by(trace_id, 2m) from span

pipelines:
  - receivers: [otlp]
    exporters: [otlp]
    processors: [query]
```
The expressions would be executed in order, with each expression either mutating an input telemetry, dropping input
telemetry, or adding additional telemetry (usually for stateful processors like the batch processor, which will hold telemetry
for a window and then add them all at the same time). One caveat to note is that we would like to implement optimizations
in the query engine, for example to only apply filtering once for multiple operations with a shared filter. Functions
with unknown side effects may cause issues with optimization that we will need to explore.
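The in-order execution model can be sketched as a simple interpreter loop over parsed queries. Everything here (`Telemetry`, `step`, `run`) is a hypothetical stand-in to show the ordering and drop semantics, not a proposed API.

```go
package main

import "fmt"

// Telemetry is a stand-in for a pdata item; attributes only, for brevity.
type Telemetry struct {
	Attributes map[string]string
}

// A step either mutates the telemetry in place or drops it (returns false).
type step func(t *Telemetry) (keep bool)

// run applies each step in order, stopping as soon as one drops the item.
func run(t *Telemetry, steps []step) bool {
	for _, s := range steps {
		if !s(t) {
			return false
		}
	}
	return true
}

func main() {
	steps := []step{
		// drop() where attributes["http.target"] = "/health"
		func(t *Telemetry) bool { return t.Attributes["http.target"] != "/health" },
		// delete(attributes["http.request.header.authorization"])
		func(t *Telemetry) bool { delete(t.Attributes, "http.request.header.authorization"); return true },
	}
	span := &Telemetry{Attributes: map[string]string{
		"http.target":                       "/users",
		"http.request.header.authorization": "Bearer secret",
	}}
	fmt.Println(run(span, steps), span.Attributes)
}
```

A real engine could reorder or fuse steps with a shared filter, which is exactly where functions with unknown side effects become a problem.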
## Declarative configuration

The telemetry query language presents an SQL-like experience for defining telemetry transformations. It is made up of
the three primary components described above, however, and can be presented declaratively instead, depending on what makes
sense as a user experience.
```yaml
- type: span
  filter:
    match:
      path: status.code
      value: OK
  operation:
    name: drop
- type: all
  operation:
    name: delete
    args:
      - attributes["http.request.header.authorization"]
```
An implementation of the query language would likely parse expressions into this sort of structure, so given an SQL-like
implementation, it would likely be little overhead to support a YAML approach in addition.
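As a sketch, the intermediate structure such a parser might target could look like the following Go types. The field names mirror the YAML above but are illustrative assumptions, not an agreed schema.

```go
package main

import "fmt"

// Query is the parsed form of one expression, whether it came from the
// SQL-like syntax or the declarative YAML. Field names are illustrative.
type Query struct {
	Type      string // "span", "metric", "log", or "all"
	Filter    *Match // nil when the query applies unconditionally
	Operation Operation
}

// Match is a single path/value condition (the `where` clause).
type Match struct {
	Path  string
	Value string
}

// Operation is the function to apply, with its arguments.
type Operation struct {
	Name string
	Args []string
}

func main() {
	// The two declarative entries from the YAML example above.
	queries := []Query{
		{
			Type:      "span",
			Filter:    &Match{Path: "status.code", Value: "OK"},
			Operation: Operation{Name: "drop"},
		},
		{
			Type:      "all",
			Operation: Operation{Name: "delete", Args: []string{`attributes["http.request.header.authorization"]`}},
		},
	}
	for _, q := range queries {
		fmt.Printf("%s: %s\n", q.Type, q.Operation.Name)
	}
}
```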
## Implementing a processor function

The `replace_wildcards` function may look like this.
```go
package replacewildcards

import (
	"regexp"

	"github.com/open-telemetry/opentelemetry/processors"
)

// Assuming this is not in "core"
func init() {
	processors.Register("replace_wildcards", replace_wildcards)
}

func replace_wildcards(pattern regexp.Regexp, replacement string, path processors.TelemetryPath) processors.Result {
	val := path.Get()
	if val == nil {
		return processors.CONTINUE
	}

	// replace finds placeholders in "replacement" and swaps them in for regex-matched substrings.
	replaced := replace(val, pattern, replacement)
	path.Set(replaced)
	return processors.CONTINUE
}
```
Here, the processor framework recognizes that the first parameter of the function is `regexp.Regexp`, so it will compile the string
provided by the user in the config when processing it. Similarly for `path`, it recognizes parameters of type `TelemetryPath`
and will resolve it to the path within a matched telemetry during execution and pass it to the function. The path allows
scalar operations on the field within the telemetry. The processor does not need to be aware of telemetry filtering,
the `where ...` clause, as that will be handled by the framework before passing to the function.
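One way to get this compile-once behavior without a sophisticated optimizer is a factory split: configuration-time arguments are processed once, producing the closure that runs per matched telemetry. A hedged sketch follows; `newReplaceWildcards` is hypothetical, and the wildcard-to-regexp translation is assumed to happen before the factory is called.

```go
package main

import (
	"fmt"
	"regexp"
)

// newReplaceWildcards runs once at config-parse time; the returned closure
// runs per matched telemetry. This gives compile-once semantics for
// arguments such as regular expressions.
func newReplaceWildcards(pattern, replacement string) (func(val string) string, error) {
	re, err := regexp.Compile(pattern) // compiled once, at config time
	if err != nil {
		return nil, err
	}
	return func(val string) string {
		return re.ReplaceAllString(val, replacement)
	}, nil
}

func main() {
	// The wildcard pattern from the earlier examples, already translated
	// into a regular expression.
	fn, err := newReplaceWildcards(`^/user/([^/]+)/list/([^/]+)$`, "/user/{userId}/list/{listId}")
	if err != nil {
		panic(err)
	}
	fmt.Println(fn("/user/123/list/456")) // /user/{userId}/list/{listId}
}
```

The trade-off, raised during design discussion, is that factory arguments must be compile-time constants: a replacement value computed from another attribute at runtime cannot go through this path.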
## Embedded processors

The above describes a query language for configuring processing logic in the OpenTelemetry collector. There will be a
single processor that exposes the processing logic in the collector config; however, the logic will be implemented
within core packages rather than directly inside a processor. This is to ensure that, where appropriate, processing
can be embedded into other components - for example, metric processing is often most appropriate to execute within a
receiver based on receiver-specific requirements.
## Limitations

There are some known issues and limitations that we hope to address while iterating on this idea.

- Handling array-typed attributes
- Working on an array of points, rather than a single point
- Metric alignment - for example, defining an expression on two metrics that may not be at the same timestamp
- The collector has separate pipelines per signal - while the query language could apply cross-signal, we will need
  to remain single-signal for now