
Commit

Finish tests and simplify config
liustanley committed Aug 14, 2024
1 parent 6f5747e commit 95a2151
Showing 2 changed files with 54 additions and 221 deletions.
185 changes: 9 additions & 176 deletions e2e-tests/tests/config.yaml
@@ -1,27 +1,4 @@
receivers:
jaeger: null
zipkin: null
hostmetrics:
scrapers:
paging:
metrics:
system.paging.utilization:
enabled: true
cpu:
metrics:
system.cpu.utilization:
enabled: true
system.cpu.physical.count:
enabled: true
system.cpu.logical.count:
enabled: true
system.cpu.frequency:
enabled: true
disk:
load:
memory:
network:
processes:
otlp:
protocols:
grpc:
@@ -32,172 +9,28 @@
debug:
verbosity: detailed
datadog:
host_metadata:
tags: ['env:${env:OTEL_K8S_NAMESPACE}']
metrics:
resource_attributes_as_tags: true
histograms:
mode: counters
send_count_sum_metrics: true
traces:
span_name_as_resource_name: true
compute_stats_by_span_kind: true
peer_service_aggregation: true
trace_buffer: 1000
api:
key: "${DD_API_KEY}"
fail_on_invalid_key: false
metrics:
resource_attributes_as_tags: true
processors:
attributes:
actions:
- key: log.file.path
pattern: \/var\/log\/pods\/otel_opentelemetry-demo-(?P<service_name>.*?)-.*
action: extract
- key: log.file.path
pattern: \/var\/log\/pods\/otel_opentelemetry-demo-(?P<source>.*?)-.*
action: extract
- key: service
from_attribute: service_name
action: upsert
transform:
metric_statements: &statements
- context: resource
statements:
- set(attributes["datadog.host.use_as_metadata"],true)
- set(attributes["datadog.host.tag.foo"],"bar")
trace_statements: *statements
log_statements: *statements
attributes/kafkasrc:
include:
match_type: regexp
attributes:
- {key: "source", value: 'kafka|frauddetectionservice|orderproducer'}
actions:
# this makes sure the source for logs that come from kafka|frauddetectionservice|orderproducer is
# set to kafka. The OOTB kafka dashboard filters logs on source:kafka. This will still not work
# as it will be a log attribute and can only be searched by @source:kafka. We would need to support
# attributes_as_tags or find another solution for this to work.
- key: source
action: update
value: "kafka"
memory_limiter:
check_interval: 1s
limit_mib: 500
resourcedetection:
# ensures host.name and other important resource tags
# get picked up
detectors: [env, gcp, ecs, ec2, azure, system]
timeout: 5s
override: false
system:
# Enable optional system attributes
resource_attributes:
os.type:
enabled: true
os.description:
enabled: true
host.ip:
enabled: true
host.mac:
enabled: true
host.arch:
enabled: true
host.cpu.vendor.id:
enabled: true
host.cpu.model.name:
enabled: true
host.cpu.family:
enabled: true
host.cpu.model.id:
enabled: true
host.cpu.stepping:
enabled: true
host.cpu.cache.l2.size:
enabled: true
host.id:
enabled: false
# adds various tags related to k8s
k8sattributes:
passthrough: false
auth_type: "serviceAccount"
pod_association:
- sources:
- from: resource_attribute
name: k8s.pod.ip
extract:
metadata:
- k8s.pod.name
- k8s.pod.uid
- k8s.deployment.name
- k8s.node.name
- k8s.namespace.name
- k8s.pod.start_time
- k8s.replicaset.name
- k8s.replicaset.uid
- k8s.daemonset.name
- k8s.daemonset.uid
- k8s.job.name
- k8s.job.uid
- k8s.cronjob.name
- k8s.statefulset.name
- k8s.statefulset.uid
- container.image.name
- container.image.tag
- container.id
- k8s.container.name
- container.image.name
- container.image.tag
- container.id
labels:
- tag_name: kube_app_name
key: app.kubernetes.io/name
from: pod
- tag_name: kube_app_instance
key: app.kubernetes.io/instance
from: pod
- tag_name: kube_app_version
key: app.kubernetes.io/version
from: pod
- tag_name: kube_app_component
key: app.kubernetes.io/component
from: pod
- tag_name: kube_app_part_of
key: app.kubernetes.io/part-of
from: pod
- tag_name: kube_app_managed_by
key: app.kubernetes.io/managed-by
from: pod
batch:
send_batch_max_size: 1000
send_batch_size: 100
timeout: 10s
probabilistic_sampler:
hash_seed: 22
sampling_percentage: 15.3
connectors:
datadog/connector:
traces:
span_name_as_resource_name: true
service:
telemetry:
logs:
encoding: "json"
initial_fields:
- service: "otel-collector"
pipelines:
metrics:
receivers: [otlp, hostmetrics, prometheus, datadog/connector]
processors: [resourcedetection, k8sattributes, transform, batch]
exporters: [datadog]
receivers: [otlp, datadog/connector]
processors: [batch]
exporters: [datadog, debug]
traces:
receivers: [otlp]
processors: [resourcedetection, k8sattributes, transform, batch]
exporters: [datadog/connector]
traces/sampled:
receivers: [datadog/connector]
processors: [probabilistic_sampler, batch]
exporters: [datadog]
processors: [batch]
exporters: [datadog/connector, datadog]
logs:
processors: [memory_limiter, resourcedetection, k8sattributes, attributes, attributes/kafkasrc, transform, batch]
receivers: [otlp]
processors: [batch]
exporters: [datadog]
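
Pieced together from the lines that survive this change, the simplified collector configuration keeps a single OTLP receiver, the batch processor, the datadog and debug exporters, and the datadog connector, with each pipeline reduced to a single processing stage. Below is a rough sketch of the resulting e2e-tests/tests/config.yaml, reconstructed from the visible hunks rather than copied from the committed file, so treat it as an approximation:

receivers:
  otlp:
    protocols:
      grpc:
exporters:
  debug:
    verbosity: detailed
  datadog:
    api:
      key: "${DD_API_KEY}"
      fail_on_invalid_key: false
    metrics:
      resource_attributes_as_tags: true
processors:
  batch:
    send_batch_max_size: 1000
    send_batch_size: 100
    timeout: 10s
connectors:
  datadog/connector:
    traces:
      span_name_as_resource_name: true
service:
  telemetry:
    logs:
      encoding: "json"
      initial_fields:
        - service: "otel-collector"
  pipelines:
    metrics:
      receivers: [otlp, datadog/connector]
      processors: [batch]
      exporters: [datadog, debug]
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [datadog/connector, datadog]
    logs:
      receivers: [otlp]
      processors: [batch]
      exporters: [datadog]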
90 changes: 45 additions & 45 deletions e2e-tests/tests/otel_test.go
@@ -17,6 +17,7 @@ import (
"github.com/DataDog/datadog-agent/test/new-e2e/pkg/e2e"
"github.com/DataDog/datadog-agent/test/new-e2e/pkg/runner"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -54,32 +55,21 @@ image:
`, pipelineID, commitSHA)
otelOptions = append(otelOptions, otelparams.WithHelmValues(values))
}

suiteParams := []e2e.SuiteOption{e2e.WithProvisioner(otelcollector.Provisioner(otelcollector.WithOTelOptions(otelOptions...), otelcollector.WithExtraParams(extraParams)))}

e2e.Run(t, &otelSuite{}, suiteParams...)
}

// TODO write a test that actually tests something
func (v *otelSuite) TestExecute() {
res, _ := v.Env().KubernetesCluster.Client().CoreV1().Pods("default").List(context.TODO(), v1.ListOptions{})
for _, pod := range res.Items {
v.T().Logf("Pod: %s", pod.Name)
}
assert.EventuallyWithT(v.T(), func(t *assert.CollectT) {
metricsName, err := v.Env().FakeIntake.Client().GetMetricNames()
assert.NoError(t, err)
fmt.Printf("metriiiics: %v", metricsName)
logs, err := v.Env().FakeIntake.Client().FilterLogs("")
for _, l := range logs {
fmt.Printf("logs : %v", l.Tags)
func (s *otelSuite) TestCollectorReady() {
s.T().Log("Waiting for collector pod startup")
assert.EventuallyWithT(s.T(), func(t *assert.CollectT) {
res, _ := s.Env().KubernetesCluster.Client().CoreV1().Pods("default").List(context.TODO(), v1.ListOptions{})
for _, pod := range res.Items {
for _, containerStatus := range append(pod.Status.InitContainerStatuses, pod.Status.ContainerStatuses...) {
assert.Truef(t, containerStatus.Ready, "Container %s of pod %s isn’t ready", containerStatus.Name, pod.Name)
assert.Zerof(t, containerStatus.RestartCount, "Container %s of pod %s has restarted", containerStatus.Name, pod.Name)
}
}

assert.NoError(t, err)
assert.NotEmpty(t, metricsName)
assert.NotEmpty(t, logs)

}, 1*time.Minute, 10*time.Second)
}, 2*time.Minute, 10*time.Second)
}

func (s *otelSuite) TestOTLPTraces() {
@@ -91,23 +81,27 @@ func (s *otelSuite) TestOTLPTraces() {
s.T().Log("Starting telemetrygen")
s.createTelemetrygenJob(ctx, "traces", []string{"--service", service, "--traces", fmt.Sprint(numTraces)})

var traces []*aggregator.TracePayload
var err error
s.T().Log("Waiting for traces")
s.EventuallyWithT(func(c *assert.CollectT) {
traces, err := s.Env().FakeIntake.Client().GetTraces()
require.EventuallyWithT(s.T(), func(c *assert.CollectT) {
traces, err = s.Env().FakeIntake.Client().GetTraces()
assert.NoError(c, err)
assert.NotEmpty(c, traces)
trace := traces[0]
assert.Equal(c, "none", trace.Env)
assert.NotEmpty(c, trace.TracerPayloads)
tp := trace.TracerPayloads[0]
assert.NotEmpty(c, tp.Chunks)
assert.NotEmpty(c, tp.Chunks[0].Spans)
spans := tp.Chunks[0].Spans
for _, sp := range spans {
assert.Equal(c, service, sp.Service)
assert.Equal(c, "telemetrygen", sp.Meta["otel.library.name"])
}
}, 2*time.Minute, 10*time.Second)

require.NotEmpty(s.T(), traces)
trace := traces[0]
assert.Equal(s.T(), "none", trace.Env)
require.NotEmpty(s.T(), trace.TracerPayloads)
tp := trace.TracerPayloads[0]
require.NotEmpty(s.T(), tp.Chunks)
require.NotEmpty(s.T(), tp.Chunks[0].Spans)
spans := tp.Chunks[0].Spans
for _, sp := range spans {
assert.Equal(s.T(), service, sp.Service)
assert.Equal(s.T(), "telemetrygen", sp.Meta["otel.library.name"])
}
}

func (s *otelSuite) TestOTLPMetrics() {
@@ -121,7 +115,7 @@ func (s *otelSuite) TestOTLPMetrics() {
s.createTelemetrygenJob(ctx, "metrics", []string{"--metrics", fmt.Sprint(numMetrics), "--otlp-attributes", serviceAttribute})

s.T().Log("Waiting for metrics")
s.EventuallyWithT(func(c *assert.CollectT) {
require.EventuallyWithT(s.T(), func(c *assert.CollectT) {
serviceTag := "service:" + service
metrics, err := s.Env().FakeIntake.Client().FilterMetrics("gen", fakeintake.WithTags[*aggregator.MetricSeries]([]string{serviceTag}))
assert.NoError(c, err)
@@ -140,23 +134,25 @@ func (s *otelSuite) TestOTLPLogs() {
s.T().Log("Starting telemetrygen")
s.createTelemetrygenJob(ctx, "logs", []string{"--logs", fmt.Sprint(numLogs), "--otlp-attributes", serviceAttribute, "--body", logBody})

var logs []*aggregator.Log
var err error
s.T().Log("Waiting for logs")
s.EventuallyWithT(func(c *assert.CollectT) {
logs, err := s.Env().FakeIntake.Client().FilterLogs(service)
require.EventuallyWithT(s.T(), func(c *assert.CollectT) {
logs, err = s.Env().FakeIntake.Client().FilterLogs(service)
assert.NoError(c, err)
assert.NotEmpty(c, logs)
for _, log := range logs {
assert.Contains(c, log.Message, logBody)
}
}, 2*time.Minute, 10*time.Second)

require.NotEmpty(s.T(), logs)
for _, log := range logs {
assert.Contains(s.T(), log.Message, logBody)
}
}

func (s *otelSuite) createTelemetrygenJob(ctx context.Context, telemetry string, options []string) {
var ttlSecondsAfterFinished int32 = 0 //nolint:revive // We want to see this is explicitly set to 0
var ttlSecondsAfterFinished int32 = 60 //nolint:revive // explicitly set so finished telemetrygen jobs are cleaned up after 60 seconds
var backOffLimit int32 = 4

otlpEndpoint := fmt.Sprintf("%v:4317", s.Env().OTelCollector.LabelSelectors["app.kubernetes.io/name"])
s.T().Log("otlpEndpoint", otlpEndpoint)
jobSpec := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("telemetrygen-job-%v", telemetry),
@@ -168,9 +164,13 @@ func (s *otelSuite) createTelemetrygenJob(ctx context.Context, telemetry string,
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Env: []corev1.EnvVar{{
Name: "HOST_IP",
ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "status.hostIP"}},
}},
Name: "telemetrygen-job",
Image: "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen:latest",
Command: append([]string{"/telemetrygen", telemetry, "--otlp-endpoint", otlpEndpoint, "--otlp-insecure"}, options...),
Command: append([]string{"/telemetrygen", telemetry, "--otlp-endpoint", "$(HOST_IP):4317", "--otlp-insecure"}, options...),
},
},
RestartPolicy: corev1.RestartPolicyNever,
@@ -181,5 +181,5 @@
}

_, err := s.Env().KubernetesCluster.Client().BatchV1().Jobs("default").Create(ctx, jobSpec, metav1.CreateOptions{})
assert.NoError(s.T(), err, "Could not properly start job")
require.NoError(s.T(), err, "Could not properly start job")
}
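
For reference, the Job that createTelemetrygenJob builds corresponds roughly to the manifest below. The HOST_IP variable comes from the Kubernetes downward API and is expanded by the $(HOST_IP) reference in the container command, so telemetrygen sends OTLP gRPC to port 4317 on the node where it runs. This is a sketch for a traces run with no extra options; the Go code above is the source of truth:

apiVersion: batch/v1
kind: Job
metadata:
  name: telemetrygen-job-traces
spec:
  ttlSecondsAfterFinished: 60   # finished jobs are cleaned up after a minute
  backoffLimit: 4
  template:
    spec:
      restartPolicy: Never
      containers:
        - name: telemetrygen-job
          image: ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen:latest
          env:
            - name: HOST_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.hostIP   # IP of the node exposing the collector's OTLP endpoint
          # per-test flags (for example --traces N or --otlp-attributes ...) are appended after these
          command: ["/telemetrygen", "traces", "--otlp-endpoint", "$(HOST_IP):4317", "--otlp-insecure"]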
