Skip to content

Commit

Permalink
metrics transformation + tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jpinsonneau committed Jul 1, 2024
1 parent 6d16b88 commit 6b556fb
Show file tree
Hide file tree
Showing 4 changed files with 350 additions and 181 deletions.
161 changes: 0 additions & 161 deletions controllers/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
package constants

import (
"github.com/netobserv/flowlogs-pipeline/pkg/api"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
)
Expand Down Expand Up @@ -58,163 +57,3 @@ var EnvNoHTTP2 = corev1.EnvVar{
Name: "GODEBUG",
Value: "http2server=0",
}

// OpenTelemetryDefaultTransformRules defined the default Open Telemetry format
// See https://github.com/rhobs/observability-data-model/blob/main/network-observability.md#format-proposal
var OpenTelemetryDefaultTransformRules = []api.GenericTransformRule{{
Input: "SrcAddr",
Output: "source.address",
}, {
Input: "SrcMac",
Output: "source.mac",
}, {
Input: "SrcHostIP",
Output: "source.host.address",
}, {
Input: "SrcK8S_HostName",
Output: "source.k8s.node.name",
}, {
Input: "SrcPort",
Output: "source.port",
}, {
Input: "SrcK8S_Name",
Output: "source.k8s.name",
}, {
Input: "SrcK8S_Type",
Output: "source.k8s.kind",
}, {
Input: "SrcK8S_OwnerName",
Output: "source.k8s.owner.name",
}, {
Input: "SrcK8S_OwnerType",
Output: "source.k8s.owner.kind",
}, {
Input: "SrcK8S_Namespace",
Output: "source.k8s.namespace.name",
}, {
Input: "SrcK8S_HostIP",
Output: "source.k8s.host.address",
}, {
Input: "SrcK8S_HostName",
Output: "source.k8s.host.name",
}, {
Input: "SrcK8S_Zone",
Output: "source.zone",
}, {
Input: "DstAddr",
Output: "destination.address",
}, {
Input: "DstMac",
Output: "destination.mac",
}, {
Input: "DstHostIP",
Output: "destination.host.address",
}, {
Input: "DstK8S_HostName",
Output: "destination.k8s.node.name",
}, {
Input: "DstPort",
Output: "destination.port",
}, {
Input: "DstK8S_Name",
Output: "destination.k8s.name",
}, {
Input: "DstK8S_Type",
Output: "destination.k8s.kind",
}, {
Input: "DstK8S_OwnerName",
Output: "destination.k8s.owner.name",
}, {
Input: "DstK8S_OwnerType",
Output: "destination.k8s.owner.kind",
}, {
Input: "DstK8S_Namespace",
Output: "destination.k8s.namespace.name",
}, {
Input: "DstK8S_HostIP",
Output: "destination.k8s.host.address",
}, {
Input: "DstK8S_HostName",
Output: "destination.k8s.host.name",
}, {
Input: "DstK8S_Zone",
Output: "destination.zone",
}, {
Input: "Bytes",
Output: "bytes",
}, {
Input: "Packets",
Output: "packets",
}, {
Input: "Proto",
Output: "protocol",
}, {
Input: "Flags",
Output: "tcp.flags",
}, {
Input: "TimeFlowRttNs",
Output: "tcp.rtt",
}, {
Input: "Interfaces",
Output: "interface.names",
}, {
Input: "IfDirections",
Output: "interface.directions",
}, {
Input: "FlowDirection",
Output: "host.direction",
}, {
Input: "DnsErrno",
Output: "dns.errno",
}, {
Input: "DnsFlags",
Output: "dns.flags",
}, {
Input: "DnsFlagsResponseCode",
Output: "dns.responsecode",
}, {
Input: "DnsId",
Output: "dns.id",
}, {
Input: "DnsLatencyMs",
Output: "dns.latency",
}, {
Input: "Dscp",
Output: "dscp",
}, {
Input: "IcmpCode",
Output: "icmp.code",
}, {
Input: "IcmpType",
Output: "icmp.type",
}, {
Input: "K8S_ClusterName",
Output: "k8s.cluster.name",
}, {
Input: "K8S_FlowLayer",
Output: "k8s.layer",
}, {
Input: "PktDropBytes",
Output: "drops.bytes",
}, {
Input: "PktDropPackets",
Output: "drops.packets",
}, {
Input: "PktDropLatestDropCause",
Output: "drops.latestcause",
}, {
Input: "PktDropLatestFlags",
Output: "drops.latestflags",
}, {
Input: "PktDropLatestState",
Output: "drops.lateststate",
}, {
Input: "TimeFlowEndMs",
Output: "timeflowend",
}, {
Input: "TimeFlowStartMs",
Output: "timeflowstart",
}, {
Input: "TimeReceived",
Output: "timereceived",
}}
24 changes: 4 additions & 20 deletions controllers/flp/flp_pipeline_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,23 +495,8 @@ func (b *PipelineBuilder) createOpenTelemetryStage(name string, spec *flowslates

// otel logs config
if spec.Logs.Enable != nil && *spec.Logs.Enable {
transformConfig := api.TransformGeneric{
Policy: "replace_keys",
Rules: constants.OpenTelemetryDefaultTransformRules,
}
// set custom rules if specified
if spec.Rules != nil {
transformConfig.Rules = []api.GenericTransformRule{}
for _, r := range *spec.Rules {
transformConfig.Rules = append(transformConfig.Rules, api.GenericTransformRule{
Input: r.Input,
Output: r.Output,
Multiplier: r.Multiplier,
})
}
}
// add transform stage
transformStage := fromStage.TransformGeneric(fmt.Sprintf("%s-transform", name), transformConfig)
transformStage := fromStage.TransformGeneric(fmt.Sprintf("%s-transform", name), helper.GetOtelTransformConfig(spec.Rules))
// add encode stage(s)
transformStage.EncodeOtelLogs(fmt.Sprintf("%s-logs", name), api.EncodeOtlpLogs{
OtlpConnectionInfo: &conn,
Expand All @@ -523,10 +508,9 @@ func (b *PipelineBuilder) createOpenTelemetryStage(name string, spec *flowslates
fromStage.EncodeOtelMetrics(fmt.Sprintf("%s-metrics", name), api.EncodeOtlpMetrics{
OtlpConnectionInfo: &conn,
Prefix: "netobserv",
// TODO: rewrite flpMetrics to map labels / filters with the otel semantic
Metrics: flpMetrics,
PushTimeInterval: api.Duration{Duration: spec.Metrics.PushTimeInterval.Duration},
ExpiryTime: api.Duration{Duration: 2 * time.Minute},
Metrics: helper.GetOtelMetrics(flpMetrics),
PushTimeInterval: api.Duration{Duration: spec.Metrics.PushTimeInterval.Duration},
ExpiryTime: api.Duration{Duration: 2 * time.Minute},
})
}

Expand Down
Loading

0 comments on commit 6b556fb

Please sign in to comment.