Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor span output to a common function #918

Merged
merged 4 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,14 @@ fixtures/%:
kubectl wait --for=condition=Ready --timeout=60s pod/test-opentelemetry-collector-0
kubectl -n default create -f .github/workflows/e2e/k8s/sample-job.yml
if kubectl wait --for=condition=Complete --timeout=60s job/sample-job; then \
rm -f ./internal/test/e2e/$(LIBRARY)/traces-orig.json; \
kubectl cp -c filecp default/test-opentelemetry-collector-0:tmp/trace.json ./internal/test/e2e/$(LIBRARY)/traces-orig.json; \
rm -f ./internal/test/e2e/$(LIBRARY)/traces.json; \
bats ./internal/test/e2e/$(LIBRARY)/verify.bats; \
else \
kubectl logs -l app=sample -c auto-instrumentation; \
fi
kind delete cluster
kind delete cluster

.PHONY: prerelease
prerelease: | $(MULTIMOD)
Expand Down
7 changes: 6 additions & 1 deletion internal/include/span_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,14 @@ static __always_inline void w3c_string_to_span_context(char *str, struct span_co
hex_string_to_bytes(str + trace_flags_start_pos, TRACE_FLAGS_STRING_SIZE, &ctx->TraceFlags);
}

static __always_inline bool trace_flags_is_sampled(u8 flags)
{
return ((flags & FLAG_SAMPLED) == FLAG_SAMPLED);
}

static __always_inline bool is_sampled(struct span_context *ctx)
{
return ((ctx->TraceFlags & FLAG_SAMPLED) == FLAG_SAMPLED);
return trace_flags_is_sampled(ctx->TraceFlags);
}

#endif
37 changes: 37 additions & 0 deletions internal/include/span_output.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "span_context.h"
#include "common.h"
#include "span_context.h"

#ifndef _SPAN_OUTPUT_H_
#define _SPAN_OUTPUT_H_

struct
{
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
} events SEC(".maps");

// Output a record to the perf buffer. If the span context is sampled, the record is outputted.
// Returns 0 on success, negative error code on failure.
static __always_inline long output_span_event(void *ctx, void *data, u64 size, struct span_context *sc) {
bool sampled = (sc != NULL && is_sampled(sc));
if (sampled) {
return bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, data, size);
}
return 0;
}

#endif
14 changes: 7 additions & 7 deletions internal/include/uprobe.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "span_context.h"
#include "go_context.h"
#include "go_types.h"
#include "span_output.h"

#define BASE_SPAN_PROPERTIES \
u64 start_time; \
Expand All @@ -38,16 +39,15 @@ SEC("uprobe/##name##")
int uprobe_##name##_Returns(struct pt_regs *ctx) { \
void *ctx_address = get_Go_context(ctx, context_pos, context_offset, passed_as_arg); \
void *key = get_consistent_key(ctx, ctx_address); \
void *req_ptr_map = bpf_map_lookup_elem(&uprobe_context_map, &key); \
if (req_ptr_map == NULL) { \
event_type *event = bpf_map_lookup_elem(&uprobe_context_map, &key); \
if (event == NULL) { \
bpf_printk("event is NULL in ret probe"); \
return 0; \
} \
event_type tmpReq = {0}; \
bpf_probe_read(&tmpReq, sizeof(tmpReq), req_ptr_map); \
tmpReq.end_time = bpf_ktime_get_ns(); \
bpf_perf_event_output(ctx, &events_map, BPF_F_CURRENT_CPU, &tmpReq, sizeof(tmpReq)); \
event->end_time = bpf_ktime_get_ns(); \
output_span_event(ctx, event, sizeof(event_type), &event->sc); \
bpf_map_delete_elem(&uprobe_context_map, &key); \
stop_tracking_span(&tmpReq.sc, &tmpReq.psc); \
stop_tracking_span(&event->sc, &event->psc); \
return 0; \
}

Expand Down
14 changes: 4 additions & 10 deletions internal/pkg/instrumentation/bpf/database/sql/bpf/probe.bpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ struct {
__uint(max_entries, MAX_CONCURRENT);
} sql_events SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
} events SEC(".maps");

// Injected in init
volatile const bool should_include_db_statement;

Expand Down Expand Up @@ -68,10 +64,9 @@ int uprobe_queryDC(struct pt_regs *ctx) {
if (span_ctx != NULL) {
// Set the parent context
bpf_probe_read(&sql_request.psc, sizeof(sql_request.psc), span_ctx);
copy_byte_arrays(sql_request.psc.TraceID, sql_request.sc.TraceID, TRACE_ID_SIZE);
generate_random_bytes(sql_request.sc.SpanID, SPAN_ID_SIZE);
get_span_context_from_parent(&sql_request.psc, &sql_request.sc);
} else {
sql_request.sc = generate_span_context();
get_root_span_context(&sql_request.sc);
}

// Get key
Expand Down Expand Up @@ -112,10 +107,9 @@ int uprobe_execDC(struct pt_regs *ctx) {
if (span_ctx != NULL) {
// Set the parent context
bpf_probe_read(&sql_request.psc, sizeof(sql_request.psc), span_ctx);
copy_byte_arrays(sql_request.psc.TraceID, sql_request.sc.TraceID, TRACE_ID_SIZE);
generate_random_bytes(sql_request.sc.SpanID, SPAN_ID_SIZE);
get_span_context_from_parent(&sql_request.psc, &sql_request.sc);
} else {
sql_request.sc = generate_span_context();
get_root_span_context(&sql_request.sc);
}

// Get key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "go_context.h"
#include "go_types.h"
#include "uprobe.h"
#include "span_output.h"

char __license[] SEC("license") = "Dual MIT/GPL";

Expand Down Expand Up @@ -73,10 +74,6 @@ struct
__uint(max_entries, 1);
} parent_span_context_storage_map SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
} events SEC(".maps");

// https://github.com/segmentio/kafka-go/blob/main/protocol/record.go#L48
struct kafka_header_t {
struct go_string key;
Expand Down Expand Up @@ -163,7 +160,7 @@ int uprobe_FetchMessage(struct pt_regs *ctx) {
get_go_string_from_user_ptr((void *)(reader + reader_config_pos + reader_config_group_id_pos), kafka_request->consumer_group, sizeof(kafka_request->consumer_group));
kafka_request->end_time = bpf_ktime_get_ns();

bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, kafka_request, sizeof(*kafka_request));
output_span_event(ctx, kafka_request, sizeof(*kafka_request), &kafka_request->sc);
stop_tracking_span(&kafka_request->sc, &kafka_request->psc);
bpf_map_delete_elem(&kafka_events, &goroutine);

Expand Down Expand Up @@ -198,10 +195,9 @@ int uprobe_FetchMessage_Returns(struct pt_regs *ctx) {
if (parent_span_ctx != NULL) {
// Set the parent context
bpf_probe_read(&kafka_request->psc, sizeof(kafka_request->psc), parent_span_ctx);
copy_byte_arrays(kafka_request->psc.TraceID, kafka_request->sc.TraceID, TRACE_ID_SIZE);
generate_random_bytes(kafka_request->sc.SpanID, SPAN_ID_SIZE);
get_span_context_from_parent(parent_span_ctx, &kafka_request->sc);
} else {
kafka_request->sc = generate_span_context();
get_root_span_context(&kafka_request->sc);
}

// Collecting message attributes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "go_context.h"
#include "go_types.h"
#include "uprobe.h"
#include "span_output.h"

char __license[] SEC("license") = "Dual MIT/GPL";

Expand All @@ -33,7 +34,7 @@ char __license[] SEC("license") = "Dual MIT/GPL";
#define MAX_KEY_SIZE 256

struct message_attributes_t {
unsigned char SpanID[SPAN_ID_SIZE];
struct span_context sc;
char topic[MAX_TOPIC_SIZE];
char key[MAX_KEY_SIZE];
};
Expand All @@ -43,7 +44,6 @@ struct kafka_request_t {
u64 start_time;
u64 end_time;
struct span_context psc;
unsigned char TraceID[TRACE_ID_SIZE];
// attributes per message
struct message_attributes_t msgs[MAX_BATCH_SIZE];
char global_topic[MAX_TOPIC_SIZE];
Expand All @@ -65,10 +65,6 @@ struct
__uint(max_entries, 1);
} kafka_request_storage_map SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
} events SEC(".maps");

// https://github.com/segmentio/kafka-go/blob/main/protocol/record.go#L48
struct kafka_header_t {
struct go_string key;
Expand Down Expand Up @@ -170,19 +166,17 @@ int uprobe_WriteMessages(struct pt_regs *ctx) {
// Get parent if exists
struct span_context *parent_span_ctx = get_parent_span_context(context_data_ptr);
if (parent_span_ctx != NULL) {
// Set the parent context
bpf_probe_read(&kafka_request->psc, sizeof(kafka_request->psc), parent_span_ctx);
copy_byte_arrays(kafka_request->psc.TraceID, kafka_request->TraceID, TRACE_ID_SIZE);
kafka_request->psc = *parent_span_ctx;
get_span_context_from_parent(&kafka_request->psc, &kafka_request->msgs[0].sc);
} else {
generate_random_bytes(kafka_request->TraceID, TRACE_ID_SIZE);
get_root_span_context(&kafka_request->msgs[0].sc);
}

// Try to get a global topic from Writer
bool global_topic = get_go_string_from_user_ptr((void *)(writer + writer_topic_pos), kafka_request->global_topic, sizeof(kafka_request->global_topic));

void *msg_ptr = msgs_array;
struct kafka_header_t header = {0};
struct span_context current_sc = {0};
// This is hack to get the message size. This calculation is based on the following assumptions:
// 1. "Time" is the last field in the message struct. This looks to be correct for all the versions according to
// https://github.com/segmentio/kafka-go/blob/v0.2.3/message.go#L24C2-L24C6
Expand All @@ -191,7 +185,6 @@ int uprobe_WriteMessages(struct pt_regs *ctx) {
// In the future if more libraries will need to get structs sizes we probably want to have similar
// mechanism to the one we have for the offsets
u16 msg_size = message_time_pos + 8 + 8 + 8;
__builtin_memcpy(current_sc.TraceID, kafka_request->TraceID, TRACE_ID_SIZE);
kafka_request->valid_messages = 0;
// Iterate over the messages
for (u64 i = 0; i < MAX_BATCH_SIZE; i++) {
Expand All @@ -201,10 +194,16 @@ int uprobe_WriteMessages(struct pt_regs *ctx) {
// Optionally collect the topic, and always collect key
collect_kafka_attributes(msg_ptr, &kafka_request->msgs[i], !global_topic);
// Generate span id for each message
generate_random_bytes(kafka_request->msgs[i].SpanID, SPAN_ID_SIZE);
__builtin_memcpy(current_sc.SpanID, kafka_request->msgs[i].SpanID, SPAN_ID_SIZE);
if (i > 0) {
generate_random_bytes(kafka_request->msgs[i].sc.SpanID, SPAN_ID_SIZE);
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
// Copy the trace id and trace flags from the first message. This means the sampling decision is done on the first message,
// and all the messages in the batch will have the same trace id and trace flags.
kafka_request->msgs[i].sc.TraceFlags = kafka_request->msgs[0].sc.TraceFlags;
__builtin_memcpy(kafka_request->msgs[i].sc.TraceID, kafka_request->msgs[0].sc.TraceID, TRACE_ID_SIZE);
}

// Build the header
if (build_contxet_header(&header, &current_sc) != 0) {
if (build_contxet_header(&header, &kafka_request->msgs[i].sc) != 0) {
bpf_printk("uprobe/WriteMessages: Failed to build header");
return 0;
}
Expand Down Expand Up @@ -235,7 +234,7 @@ int uprobe_WriteMessages_Returns(struct pt_regs *ctx) {
}
kafka_request->end_time = end_time;

bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, kafka_request, sizeof(*kafka_request));
output_span_event(ctx, kafka_request, sizeof(*kafka_request), &kafka_request->msgs[0].sc);
bpf_map_delete_elem(&kafka_events, &key);
// don't need to stop tracking the span, as we don't have a context to propagate locally
return 0;
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,16 @@ func New(logger logr.Logger) probe.Probe {
}

type messageAttributes struct {
SpaID trace.SpanID
Topic [256]byte
Key [256]byte
SpanContext context.EBPFSpanContext
Topic [256]byte
Key [256]byte
}

// event represents a batch of kafka messages being sent.
type event struct {
StartTime uint64
EndTime uint64
ParentSpanContext context.EBPFSpanContext
// Same trace id for all the batch
TraceID trace.TraceID
// Message specific attributes
Messages [10]messageAttributes
// Global topic for the batch
Expand All @@ -99,7 +97,7 @@ type event struct {

func convertEvent(e *event) []*probe.SpanEvent {
tsc := trace.SpanContextConfig{
TraceID: e.TraceID,
TraceID: e.Messages[0].SpanContext.TraceID,
TraceFlags: trace.FlagsSampled,
}

Expand Down Expand Up @@ -130,7 +128,7 @@ func convertEvent(e *event) []*probe.SpanEvent {
var res []*probe.SpanEvent
var msgTopic string
for i := uint64(0); i < e.ValidMessages; i++ {
tsc.SpanID = e.Messages[i].SpaID
tsc.SpanID = e.Messages[i].SpanContext.SpanID
sc := trace.NewSpanContext(tsc)
key := unix.ByteSliceToString(e.Messages[i].Key[:])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
"go.opentelemetry.io/otel/trace"

"go.opentelemetry.io/auto/internal/pkg/instrumentation/context"
"go.opentelemetry.io/auto/internal/pkg/instrumentation/probe"
)

Expand All @@ -36,21 +37,26 @@ func TestProbeConvertEvent(t *testing.T) {
got := convertEvent(&event{
StartTime: uint64(start.UnixNano()),
EndTime: uint64(end.UnixNano()),
TraceID: traceID,
Messages: [10]messageAttributes{
{
// topic1
Topic: [256]byte{0x74, 0x6f, 0x70, 0x69, 0x63, 0x31},
// key1
Key: [256]byte{0x6b, 0x65, 0x79, 0x31},
SpaID: trace.SpanID{1},
Key: [256]byte{0x6b, 0x65, 0x79, 0x31},
SpanContext: context.EBPFSpanContext{
TraceID: traceID,
SpanID: trace.SpanID{1},
},
},
{
// topic2
Topic: [256]byte{0x74, 0x6f, 0x70, 0x69, 0x63, 0x32},
// key2
Key: [256]byte{0x6b, 0x65, 0x79, 0x32},
SpaID: trace.SpanID{2},
Key: [256]byte{0x6b, 0x65, 0x79, 0x32},
SpanContext: context.EBPFSpanContext{
TraceID: traceID,
SpanID: trace.SpanID{2},
},
},
},
ValidMessages: 2,
Expand Down
Loading
Loading