Skip to content

Commit

Permalink
Implement "split" metric action
Browse files Browse the repository at this point in the history
  • Loading branch information
tigrannajaryan committed Jul 7, 2022
1 parent 886822b commit 2566b44
Show file tree
Hide file tree
Showing 8 changed files with 246 additions and 105 deletions.
1 change: 0 additions & 1 deletion .idea/watcherTasks.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions schema/ast/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ type LabelMapForMetrics struct {
}

type SplitMetric struct {
ApplyToMetric types.MetricName `yaml:"apply_to_metric"`
ByLabel string `yaml:"by_label"`
LabelsToMetrics map[types.LabelValue]types.MetricName `yaml:"labels_to_metrics"`
ApplyToMetric types.MetricName `yaml:"apply_to_metric"`
ByAttribute string `yaml:"by_attribute"`
AttributesToMetrics map[types.LabelValue]types.MetricName `yaml:"attributes_to_metrics"`
}

type MergeMetric struct {
Expand Down
79 changes: 45 additions & 34 deletions schema/compiled/compiled_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
otlpmetric "github.com/open-telemetry/opentelemetry-proto/gen/go/metrics/v1"
otlpresource "github.com/open-telemetry/opentelemetry-proto/gen/go/resource/v1"
otlptrace "github.com/open-telemetry/opentelemetry-proto/gen/go/trace/v1"

"github.com/tigrannajaryan/telemetry-schema/schema/types"
)

Expand Down Expand Up @@ -39,24 +38,25 @@ func (acts ResourceActions) Apply(resource *otlpresource.Resource) error {
}

type MetricActions struct {
ByName map[types.MetricName][]MetricAction
OtherMetrics []MetricAction
//ByName map[types.MetricName][]MetricAction
Actions []MetricAction
}

func (acts MetricActions) Apply(metric *otlpmetric.Metric) error {
metricName := metric.MetricDescriptor.Name
actions, exists := acts.ByName[types.MetricName(metricName)]
if !exists {
actions = acts.OtherMetrics
}
func (acts MetricActions) Apply(metrics []*otlpmetric.Metric) ([]*otlpmetric.Metric, error) {
//metricName := metric.MetricDescriptor.Name
//actions, exists := acts.ByName[types.MetricName(metricName)]
//if !exists {
// actions = acts.OtherMetrics
//}

for _, a := range actions {
err := a.Apply(metric)
for _, a := range acts.Actions {
var err error
metrics, err = a.Apply(metrics)
if err != nil {
return err
return metrics, err
}
}
return nil
return metrics, nil
}

type ResourceAction interface {
Expand All @@ -82,7 +82,7 @@ func (acts SpanActions) Apply(span *otlptrace.Span) error {
}

type MetricAction interface {
Apply(metric *otlpmetric.Metric) error
Apply(metrics []*otlpmetric.Metric) ([]*otlpmetric.Metric, error)
}

//type LogRecordAction interface {
Expand All @@ -101,11 +101,15 @@ func (afv ActionsForVersions) Swap(i, j int) {
afv[i], afv[j] = afv[j], afv[i]
}

func (s *Schema) ConvertResourceToLatest(fromVersion types.TelemetryVersion, resource *otlpresource.Resource) error {
startIndex := sort.Search(len(s.Versions), func(i int) bool {
// TODO: use proper semver comparison.
return s.Versions[i].VersionNum > fromVersion
})
func (s *Schema) ConvertResourceToLatest(
fromVersion types.TelemetryVersion, resource *otlpresource.Resource,
) error {
startIndex := sort.Search(
len(s.Versions), func(i int) bool {
// TODO: use proper semver comparison.
return s.Versions[i].VersionNum > fromVersion
},
)
if startIndex > len(s.Versions) {
// Nothing to do
return nil
Expand All @@ -120,11 +124,15 @@ func (s *Schema) ConvertResourceToLatest(fromVersion types.TelemetryVersion, res
return nil
}

func (s *Schema) ConvertSpansToLatest(fromVersion types.TelemetryVersion, spans []*otlptrace.Span) error {
startIndex := sort.Search(len(s.Versions), func(i int) bool {
// TODO: use proper semver comparison.
return s.Versions[i].VersionNum > fromVersion
})
func (s *Schema) ConvertSpansToLatest(
fromVersion types.TelemetryVersion, spans []*otlptrace.Span,
) error {
startIndex := sort.Search(
len(s.Versions), func(i int) bool {
// TODO: use proper semver comparison.
return s.Versions[i].VersionNum > fromVersion
},
)
if startIndex > len(s.Versions) {
// Nothing to do
return nil
Expand All @@ -142,22 +150,25 @@ func (s *Schema) ConvertSpansToLatest(fromVersion types.TelemetryVersion, spans
return nil
}

func (s *Schema) ConvertMetricsToLatest(fromVersion types.TelemetryVersion, metrics []*otlpmetric.Metric) error {
startIndex := sort.Search(len(s.Versions), func(i int) bool {
// TODO: use proper semver comparison.
return s.Versions[i].VersionNum > fromVersion
})
func (s *Schema) ConvertMetricsToLatest(
fromVersion types.TelemetryVersion, metrics *[]*otlpmetric.Metric,
) error {
startIndex := sort.Search(
len(s.Versions), func(i int) bool {
// TODO: use proper semver comparison.
return s.Versions[i].VersionNum > fromVersion
},
)
if startIndex > len(s.Versions) {
// Nothing to do
return nil
}

for i := startIndex; i < len(s.Versions); i++ {
for j := 0; j < len(metrics); j++ {
metric := metrics[j]
if err := s.Versions[i].Metrics.Apply(metric); err != nil {
return err
}
var err error
*metrics, err = s.Versions[i].Metrics.Apply(*metrics)
if err != nil {
return err
}
}

Expand Down
110 changes: 89 additions & 21 deletions schema/compiled/metric_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,19 @@ import (

otlpcommon "github.com/open-telemetry/opentelemetry-proto/gen/go/common/v1"
otlpmetric "github.com/open-telemetry/opentelemetry-proto/gen/go/metrics/v1"

"github.com/tigrannajaryan/telemetry-schema/schema/types"
)

type MetricRenameAction map[types.MetricName]types.MetricName

func (act MetricRenameAction) Apply(metric *otlpmetric.Metric) error {
newName, exists := act[types.MetricName(metric.MetricDescriptor.Name)]
if exists {
metric.MetricDescriptor.Name = string(newName)
func (act MetricRenameAction) Apply(metrics []*otlpmetric.Metric) ([]*otlpmetric.Metric, error) {
for _, metric := range metrics {
newName, exists := act[types.MetricName(metric.MetricDescriptor.Name)]
if exists {
metric.MetricDescriptor.Name = string(newName)
}
}
return nil
return metrics, nil
}

type MetricLabelRenameAction struct {
Expand All @@ -26,27 +27,34 @@ type MetricLabelRenameAction struct {
LabelMap map[string]string
}

func (act MetricLabelRenameAction) Apply(metric *otlpmetric.Metric) error {
if len(act.ApplyOnlyToMetrics) > 0 {
if _, exists := act.ApplyOnlyToMetrics[types.MetricName(metric.MetricDescriptor.Name)]; !exists {
return nil
func (act MetricLabelRenameAction) Apply(metrics []*otlpmetric.Metric) (
[]*otlpmetric.Metric, error,
) {
var retErr error
for _, metric := range metrics {

if len(act.ApplyOnlyToMetrics) > 0 {
if _, exists := act.ApplyOnlyToMetrics[types.MetricName(metric.MetricDescriptor.Name)]; !exists {
continue
}
}
}

dt := metric.MetricDescriptor.Type
switch dt {
case otlpmetric.MetricDescriptor_INT64:
dps := metric.Int64DataPoints
for i := 0; i < len(dps); i++ {
dp := dps[i]
err := renameLabels(dp.Labels, act.LabelMap)
if err != nil {
return err
dt := metric.MetricDescriptor.Type
switch dt {
case otlpmetric.MetricDescriptor_INT64:
dps := metric.Int64DataPoints
for i := 0; i < len(dps); i++ {
dp := dps[i]
err := renameLabels(dp.Labels, act.LabelMap)
if err != nil {
retErr = err
}
}
}

}

return nil
return metrics, retErr
}

func renameLabels(labels []*otlpcommon.StringKeyValue, renameRules map[string]string) error {
Expand Down Expand Up @@ -74,3 +82,63 @@ func renameLabels(labels []*otlpcommon.StringKeyValue, renameRules map[string]st
}
return err
}

type MetricSplitAction struct {
// ApplyOnlyToMetrics limits which metrics this action should apply to. If empty then
// there is no limitation.
MetricName types.MetricName
AttributeName string
SplitMap map[types.LabelValue]types.MetricName
}

func (act MetricSplitAction) Apply(metrics []*otlpmetric.Metric) ([]*otlpmetric.Metric, error) {
for i := 0; i < len(metrics); i++ {
metric := metrics[i]
if act.MetricName != types.MetricName(metric.MetricDescriptor.Name) {
continue
}

var outputMetrics []*otlpmetric.Metric
dt := metric.MetricDescriptor.Type
switch dt {
case otlpmetric.MetricDescriptor_INT64:
dps := metric.Int64DataPoints
for j := 0; j < len(dps); j++ {
dp := dps[j]
outputMetric := splitMetric(act.AttributeName, act.SplitMap, metric, dp)
outputMetrics = append(outputMetrics, outputMetric)
}
}

metrics = append(append(metrics[0:i], outputMetrics...), metrics[i+1:]...)
}

return metrics, nil
}

func splitMetric(
splitByAttr string,
splitRules map[types.LabelValue]types.MetricName,
input *otlpmetric.Metric,
inputDp *otlpmetric.Int64DataPoint,
) *otlpmetric.Metric {
output := &otlpmetric.Metric{}
descr := *input.MetricDescriptor
output.MetricDescriptor = &descr

outputDp := *inputDp
outputDp.Labels = nil

for _, label := range inputDp.Labels {
if label.Key == splitByAttr {
if convertTo, exists := splitRules[types.LabelValue(label.Value)]; exists {
newMetricName := string(convertTo)
output.MetricDescriptor.Name = newMetricName
}
continue
}
outputDp.Labels = append(outputDp.Labels, label)
}
output.Int64DataPoints = []*otlpmetric.Int64DataPoint{&outputDp}
return output
}
51 changes: 18 additions & 33 deletions schema/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,15 @@ func Compile(schema *ast.Schema) *compiled.Schema {
compiledActionsForVersion[versionNum] = actionsForVer
}

actionsForVer.Resource = compileResourceActions(versionDescr.All.Changes, versionDescr.Resources.Changes)
actionsForVer.Metrics = compileMetricActions(versionDescr.All.Changes, versionDescr.Metrics.Changes)
actionsForVer.Spans = compileSpanActions(versionDescr.All.Changes, versionDescr.Spans.Changes)
actionsForVer.Resource = compileResourceActions(
versionDescr.All.Changes, versionDescr.Resources.Changes,
)
actionsForVer.Metrics = compileMetricActions(
versionDescr.All.Changes, versionDescr.Metrics.Changes,
)
actionsForVer.Spans = compileSpanActions(
versionDescr.All.Changes, versionDescr.Spans.Changes,
)
}

// Convert map by version to a slice.
Expand Down Expand Up @@ -69,63 +75,42 @@ func compileMetricActions(
metricActions []ast.MetricTranslationAction,
) (result compiled.MetricActions) {

var compiledActionSeq []compiled.MetricAction

// First add actions in "all" section.
for _, action := range allActions {
if action.RenameAttributes != nil {
compiledAction := compiled.MetricLabelRenameAction{
LabelMap: *action.RenameAttributes,
}
compiledActionSeq = append(compiledActionSeq, compiledAction)
// Should apply to all metrics.
result.OtherMetrics = append(result.OtherMetrics, compiledAction)
result.Actions = append(result.Actions, compiledAction)
}
}

// Now compile metric actions and add one by one.
affectedMetrics := map[types.MetricName]bool{}
for _, srcAction := range metricActions {
var compiledAction compiled.MetricAction

if srcAction.RenameMetrics != nil {
compiledAction = compiled.MetricRenameAction(srcAction.RenameMetrics)
for metricName := range srcAction.RenameMetrics {
affectedMetrics[metricName] = true
}
result.Actions = append(result.Actions, compiledAction)
} else if srcAction.RenameLabels != nil {
compiledAction = compiled.MetricLabelRenameAction{
ApplyOnlyToMetrics: metricNamesToMap(srcAction.RenameLabels.ApplyToMetrics),
LabelMap: srcAction.RenameLabels.LabelMap,
}

if len(srcAction.RenameLabels.ApplyToMetrics) == 0 {
// Should apply to all metrics.
result.OtherMetrics = append(result.OtherMetrics, compiledAction)
} else {
// Applies to specific metrics only.
for _, metricName := range srcAction.RenameLabels.ApplyToMetrics {
affectedMetrics[metricName] = true
}
result.Actions = append(result.Actions, compiledAction)
} else if srcAction.Split != nil {
compiledAction = compiled.MetricSplitAction{
MetricName: srcAction.Split.ApplyToMetric,
AttributeName: srcAction.Split.ByAttribute,
SplitMap: srcAction.Split.AttributesToMetrics,
}
}

if compiledAction != nil {
compiledActionSeq = append(compiledActionSeq, compiledAction)
result.Actions = append(result.Actions, compiledAction)
}
}

result.ByName = map[types.MetricName][]compiled.MetricAction{}

for metricName := range affectedMetrics {
result.ByName[metricName] = compiledActionSeq
// TODO: optimize compiledActionSeq by checking if metricName is in the
// ApplyOnlyToMetrics map that limits the application of particular action
// then ApplyOnlyToMetrics can be deleted since it has no effect. That will
// speed up the action execution since we no longer need to lookup the metric
// name in the limit map.
}

return result
}

Expand Down
Loading

0 comments on commit 2566b44

Please sign in to comment.