From 09ee8caee24715961a0c542794eb754705ba08c2 Mon Sep 17 00:00:00 2001 From: Ron Federman Date: Sat, 6 Jul 2024 16:22:05 +0300 Subject: [PATCH 1/2] Refactor span output in eBPF to a common function, which will only send the span to user space if the span is sampled --- Makefile | 3 +- internal/include/span_context.h | 7 +++- internal/include/span_output.h | 37 ++++++++++++++++++ internal/include/uprobe.h | 14 +++---- .../bpf/database/sql/bpf/probe.bpf.c | 14 ++----- .../kafka-go/consumer/bpf/probe.bpf.c | 12 ++---- .../kafka-go/producer/bpf/probe.bpf.c | 30 +++++++------- .../kafka-go/producer/bpf_arm64_bpfel.go | 7 ++-- .../kafka-go/producer/bpf_x86_bpfel.go | 7 ++-- .../segmentio/kafka-go/producer/probe.go | 12 +++--- .../segmentio/kafka-go/producer/probe_test.go | 16 +++++--- .../otel/traceglobal/bpf/probe.bpf.c | 12 ++---- .../grpc/client/bpf/probe.bpf.c | 10 +---- .../grpc/server/bpf/probe.bpf.c | 39 +++++++++++-------- .../grpc/server/bpf_arm64_bpfel.go | 3 ++ .../grpc/server/bpf_x86_bpfel.go | 3 ++ .../bpf/net/http/client/bpf/probe.bpf.c | 7 +--- .../bpf/net/http/server/bpf/probe.bpf.c | 14 ++----- 18 files changed, 135 insertions(+), 112 deletions(-) create mode 100644 internal/include/span_output.h diff --git a/Makefile b/Makefile index f7d0c44cb..c6db15fbb 100644 --- a/Makefile +++ b/Makefile @@ -181,13 +181,14 @@ fixtures/%: kubectl wait --for=condition=Ready --timeout=60s pod/test-opentelemetry-collector-0 kubectl -n default create -f .github/workflows/e2e/k8s/sample-job.yml if kubectl wait --for=condition=Complete --timeout=60s job/sample-job; then \ + rm -f ./internal/test/e2e/$(LIBRARY)/traces-orig.json; \ kubectl cp -c filecp default/test-opentelemetry-collector-0:tmp/trace.json ./internal/test/e2e/$(LIBRARY)/traces-orig.json; \ rm -f ./internal/test/e2e/$(LIBRARY)/traces.json; \ bats ./internal/test/e2e/$(LIBRARY)/verify.bats; \ else \ kubectl logs -l app=sample -c auto-instrumentation; \ fi - kind delete cluster + kind delete cluster .PHONY: prerelease prerelease: | $(MULTIMOD) diff --git a/internal/include/span_context.h b/internal/include/span_context.h index f4087b193..9bc32a6d2 100644 --- a/internal/include/span_context.h +++ b/internal/include/span_context.h @@ -96,9 +96,14 @@ static __always_inline void w3c_string_to_span_context(char *str, struct span_co hex_string_to_bytes(str + trace_flags_start_pos, TRACE_FLAGS_STRING_SIZE, &ctx->TraceFlags); } +static __always_inline bool trace_flags_is_sampled(u8 flags) +{ + return ((flags & FLAG_SAMPLED) == FLAG_SAMPLED); +} + static __always_inline bool is_sampled(struct span_context *ctx) { - return ((ctx->TraceFlags & FLAG_SAMPLED) == FLAG_SAMPLED); + return trace_flags_is_sampled(ctx->TraceFlags); } #endif diff --git a/internal/include/span_output.h b/internal/include/span_output.h new file mode 100644 index 000000000..54404f411 --- /dev/null +++ b/internal/include/span_output.h @@ -0,0 +1,37 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "span_context.h" +#include "common.h" +#include "span_context.h" + +#ifndef _SPAN_OUTPUT_H_ +#define _SPAN_OUTPUT_H_ + +struct +{ + __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); +} events SEC(".maps"); + +// Output a record to the perf buffer. If the span context is sampled, the record is outputted. +// Returns 0 on success, negative error code on failure. +static __always_inline long output_span_event(void *ctx, void *data, u64 size, struct span_context *sc) { + bool sampled = (sc != NULL && is_sampled(sc)); + if (sampled) { + return bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, data, size); + } + return 0; +} + +#endif \ No newline at end of file diff --git a/internal/include/uprobe.h b/internal/include/uprobe.h index 967822df0..ce19c028c 100644 --- a/internal/include/uprobe.h +++ b/internal/include/uprobe.h @@ -19,6 +19,7 @@ #include "span_context.h" #include "go_context.h" #include "go_types.h" +#include "span_output.h" #define BASE_SPAN_PROPERTIES \ u64 start_time; \ @@ -38,16 +39,15 @@ SEC("uprobe/##name##") int uprobe_##name##_Returns(struct pt_regs *ctx) { \ void *ctx_address = get_Go_context(ctx, context_pos, context_offset, passed_as_arg); \ void *key = get_consistent_key(ctx, ctx_address); \ - void *req_ptr_map = bpf_map_lookup_elem(&uprobe_context_map, &key); \ - if (req_ptr_map == NULL) { \ + event_type *event = bpf_map_lookup_elem(&uprobe_context_map, &key); \ + if (event == NULL) { \ + bpf_printk("event is NULL in ret probe"); \ return 0; \ } \ - event_type tmpReq = {0}; \ - bpf_probe_read(&tmpReq, sizeof(tmpReq), req_ptr_map); \ - tmpReq.end_time = bpf_ktime_get_ns(); \ - bpf_perf_event_output(ctx, &events_map, BPF_F_CURRENT_CPU, &tmpReq, sizeof(tmpReq)); \ + event->end_time = bpf_ktime_get_ns(); \ + output_span_event(ctx, event, sizeof(event_type), &event->sc); \ bpf_map_delete_elem(&uprobe_context_map, &key); \ - stop_tracking_span(&tmpReq.sc, &tmpReq.psc); \ + stop_tracking_span(&event->sc, &event->psc); \ return 0; \ } diff --git a/internal/pkg/instrumentation/bpf/database/sql/bpf/probe.bpf.c b/internal/pkg/instrumentation/bpf/database/sql/bpf/probe.bpf.c index fab3d75a8..d48585e42 100644 --- a/internal/pkg/instrumentation/bpf/database/sql/bpf/probe.bpf.c +++ b/internal/pkg/instrumentation/bpf/database/sql/bpf/probe.bpf.c @@ -35,10 +35,6 @@ struct { __uint(max_entries, MAX_CONCURRENT); } sql_events SEC(".maps"); -struct { - __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); -} events SEC(".maps"); - // Injected in init volatile const bool should_include_db_statement; @@ -68,10 +64,9 @@ int uprobe_queryDC(struct pt_regs *ctx) { if (span_ctx != NULL) { // Set the parent context bpf_probe_read(&sql_request.psc, sizeof(sql_request.psc), span_ctx); - copy_byte_arrays(sql_request.psc.TraceID, sql_request.sc.TraceID, TRACE_ID_SIZE); - generate_random_bytes(sql_request.sc.SpanID, SPAN_ID_SIZE); + get_span_context_from_parent(&sql_request.psc, &sql_request.sc); } else { - sql_request.sc = generate_span_context(); + get_root_span_context(&sql_request.sc); } // Get key @@ -112,10 +107,9 @@ int uprobe_execDC(struct pt_regs *ctx) { if (span_ctx != NULL) { // Set the parent context bpf_probe_read(&sql_request.psc, sizeof(sql_request.psc), span_ctx); - copy_byte_arrays(sql_request.psc.TraceID, sql_request.sc.TraceID, TRACE_ID_SIZE); - generate_random_bytes(sql_request.sc.SpanID, SPAN_ID_SIZE); + get_span_context_from_parent(&sql_request.psc, &sql_request.sc); } else { - sql_request.sc = generate_span_context(); + get_root_span_context(&sql_request.sc); } // Get key diff --git a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer/bpf/probe.bpf.c b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer/bpf/probe.bpf.c index dfc3032b2..85d3a89c5 100644 --- a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer/bpf/probe.bpf.c +++ b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer/bpf/probe.bpf.c @@ -17,6 +17,7 @@ #include "go_context.h" #include "go_types.h" #include "uprobe.h" +#include "span_output.h" char __license[] SEC("license") = "Dual MIT/GPL"; @@ -73,10 +74,6 @@ struct __uint(max_entries, 1); } parent_span_context_storage_map SEC(".maps"); -struct { - __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); -} events SEC(".maps"); - // https://github.com/segmentio/kafka-go/blob/main/protocol/record.go#L48 struct kafka_header_t { struct go_string key; @@ -163,7 +160,7 @@ int uprobe_FetchMessage(struct pt_regs *ctx) { get_go_string_from_user_ptr((void *)(reader + reader_config_pos + reader_config_group_id_pos), kafka_request->consumer_group, sizeof(kafka_request->consumer_group)); kafka_request->end_time = bpf_ktime_get_ns(); - bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, kafka_request, sizeof(*kafka_request)); + output_span_event(ctx, kafka_request, sizeof(*kafka_request), &kafka_request->sc); stop_tracking_span(&kafka_request->sc, &kafka_request->psc); bpf_map_delete_elem(&kafka_events, &goroutine); @@ -198,10 +195,9 @@ int uprobe_FetchMessage_Returns(struct pt_regs *ctx) { if (parent_span_ctx != NULL) { // Set the parent context bpf_probe_read(&kafka_request->psc, sizeof(kafka_request->psc), parent_span_ctx); - copy_byte_arrays(kafka_request->psc.TraceID, kafka_request->sc.TraceID, TRACE_ID_SIZE); - generate_random_bytes(kafka_request->sc.SpanID, SPAN_ID_SIZE); + get_span_context_from_parent(parent_span_ctx, &kafka_request->sc); } else { - kafka_request->sc = generate_span_context(); + get_root_span_context(&kafka_request->sc); } // Collecting message attributes diff --git a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/bpf/probe.bpf.c b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/bpf/probe.bpf.c index dcebb24a4..dec5e8496 100644 --- a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/bpf/probe.bpf.c +++ b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/bpf/probe.bpf.c @@ -17,6 +17,7 @@ #include "go_context.h" #include "go_types.h" #include "uprobe.h" +#include "span_output.h" char __license[] SEC("license") = "Dual MIT/GPL"; @@ -33,7 +34,7 @@ char __license[] SEC("license") = "Dual MIT/GPL"; #define MAX_KEY_SIZE 256 struct message_attributes_t { - unsigned char SpanID[SPAN_ID_SIZE]; + struct span_context sc; char topic[MAX_TOPIC_SIZE]; char key[MAX_KEY_SIZE]; }; @@ -43,7 +44,6 @@ struct kafka_request_t { u64 start_time; u64 end_time; struct span_context psc; - unsigned char TraceID[TRACE_ID_SIZE]; // attributes per message struct message_attributes_t msgs[MAX_BATCH_SIZE]; char global_topic[MAX_TOPIC_SIZE]; @@ -65,10 +65,6 @@ struct __uint(max_entries, 1); } kafka_request_storage_map SEC(".maps"); -struct { - __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); -} events SEC(".maps"); - // https://github.com/segmentio/kafka-go/blob/main/protocol/record.go#L48 struct kafka_header_t { struct go_string key; @@ -170,11 +166,9 @@ int uprobe_WriteMessages(struct pt_regs *ctx) { // Get parent if exists struct span_context *parent_span_ctx = get_parent_span_context(context_data_ptr); if (parent_span_ctx != NULL) { - // Set the parent context - bpf_probe_read(&kafka_request->psc, sizeof(kafka_request->psc), parent_span_ctx); - copy_byte_arrays(kafka_request->psc.TraceID, kafka_request->TraceID, TRACE_ID_SIZE); + get_span_context_from_parent(parent_span_ctx, &kafka_request->msgs[0].sc); } else { - generate_random_bytes(kafka_request->TraceID, TRACE_ID_SIZE); + get_root_span_context(&kafka_request->msgs[0].sc); } // Try to get a global topic from Writer @@ -182,7 +176,6 @@ int uprobe_WriteMessages(struct pt_regs *ctx) { void *msg_ptr = msgs_array; struct kafka_header_t header = {0}; - struct span_context current_sc = {0}; // This is hack to get the message size. This calculation is based on the following assumptions: // 1. "Time" is the last field in the message struct. This looks to be correct for all the versions according to // https://github.com/segmentio/kafka-go/blob/v0.2.3/message.go#L24C2-L24C6 @@ -191,7 +184,6 @@ int uprobe_WriteMessages(struct pt_regs *ctx) { // In the future if more libraries will need to get structs sizes we probably want to have similar // mechanism to the one we have for the offsets u16 msg_size = message_time_pos + 8 + 8 + 8; - __builtin_memcpy(current_sc.TraceID, kafka_request->TraceID, TRACE_ID_SIZE); kafka_request->valid_messages = 0; // Iterate over the messages for (u64 i = 0; i < MAX_BATCH_SIZE; i++) { @@ -201,10 +193,16 @@ int uprobe_WriteMessages(struct pt_regs *ctx) { // Optionally collect the topic, and always collect key collect_kafka_attributes(msg_ptr, &kafka_request->msgs[i], !global_topic); // Generate span id for each message - generate_random_bytes(kafka_request->msgs[i].SpanID, SPAN_ID_SIZE); - __builtin_memcpy(current_sc.SpanID, kafka_request->msgs[i].SpanID, SPAN_ID_SIZE); + if (i > 0) { + generate_random_bytes(kafka_request->msgs[i].sc.SpanID, SPAN_ID_SIZE); + // Copy the trace id and trace flags from the first message. This means the sampling decision is done on the first message, + // and all the messages in the batch will have the same trace id and trace flags. + kafka_request->msgs[i].sc.TraceFlags = kafka_request->msgs[0].sc.TraceFlags; + __builtin_memcpy(kafka_request->msgs[i].sc.TraceID, kafka_request->msgs[0].sc.TraceID, TRACE_ID_SIZE); + } + // Build the header - if (build_contxet_header(&header, ¤t_sc) != 0) { + if (build_contxet_header(&header, &kafka_request->msgs[i].sc) != 0) { bpf_printk("uprobe/WriteMessages: Failed to build header"); return 0; } @@ -235,7 +233,7 @@ int uprobe_WriteMessages_Returns(struct pt_regs *ctx) { } kafka_request->end_time = end_time; - bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, kafka_request, sizeof(*kafka_request)); + output_span_event(ctx, kafka_request, sizeof(*kafka_request), &kafka_request->msgs[0].sc); bpf_map_delete_elem(&kafka_events, &key); // don't need to stop tracking the span, as we don't have a context to propagate locally return 0; diff --git a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/bpf_arm64_bpfel.go b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/bpf_arm64_bpfel.go index f2b3eef5b..d20c7c88a 100644 --- a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/bpf_arm64_bpfel.go +++ b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/bpf_arm64_bpfel.go @@ -16,11 +16,10 @@ type bpfKafkaRequestT struct { StartTime uint64 EndTime uint64 Psc bpfSpanContext - TraceID [16]uint8 Msgs [10]struct { - SpanID [8]uint8 - Topic [256]int8 - Key [256]int8 + Sc bpfSpanContext + Topic [256]int8 + Key [256]int8 } GlobalTopic [256]int8 ValidMessages uint64 diff --git a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/bpf_x86_bpfel.go b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/bpf_x86_bpfel.go index 71f83ba96..bf5a45abf 100644 --- a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/bpf_x86_bpfel.go +++ b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/bpf_x86_bpfel.go @@ -16,11 +16,10 @@ type bpfKafkaRequestT struct { StartTime uint64 EndTime uint64 Psc bpfSpanContext - TraceID [16]uint8 Msgs [10]struct { - SpanID [8]uint8 - Topic [256]int8 - Key [256]int8 + Sc bpfSpanContext + Topic [256]int8 + Key [256]int8 } GlobalTopic [256]int8 ValidMessages uint64 diff --git a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe.go b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe.go index 4b4ca64a8..f8cf444fa 100644 --- a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe.go +++ b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe.go @@ -77,9 +77,9 @@ func New(logger logr.Logger) probe.Probe { } type messageAttributes struct { - SpaID trace.SpanID - Topic [256]byte - Key [256]byte + SpanContext context.EBPFSpanContext + Topic [256]byte + Key [256]byte } // event represents a batch of kafka messages being sent. @@ -87,8 +87,6 @@ type event struct { StartTime uint64 EndTime uint64 ParentSpanContext context.EBPFSpanContext - // Same trace id for all the batch - TraceID trace.TraceID // Message specific attributes Messages [10]messageAttributes // Global topic for the batch @@ -99,7 +97,7 @@ type event struct { func convertEvent(e *event) []*probe.SpanEvent { tsc := trace.SpanContextConfig{ - TraceID: e.TraceID, + TraceID: e.Messages[0].SpanContext.TraceID, TraceFlags: trace.FlagsSampled, } @@ -130,7 +128,7 @@ func convertEvent(e *event) []*probe.SpanEvent { var res []*probe.SpanEvent var msgTopic string for i := uint64(0); i < e.ValidMessages; i++ { - tsc.SpanID = e.Messages[i].SpaID + tsc.SpanID = e.Messages[i].SpanContext.SpanID sc := trace.NewSpanContext(tsc) key := unix.ByteSliceToString(e.Messages[i].Key[:]) diff --git a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe_test.go b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe_test.go index ac43bf56c..f60711790 100644 --- a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe_test.go +++ b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe_test.go @@ -24,6 +24,7 @@ import ( semconv "go.opentelemetry.io/otel/semconv/v1.24.0" "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/auto/internal/pkg/instrumentation/context" "go.opentelemetry.io/auto/internal/pkg/instrumentation/probe" ) @@ -36,21 +37,26 @@ func TestProbeConvertEvent(t *testing.T) { got := convertEvent(&event{ StartTime: uint64(start.UnixNano()), EndTime: uint64(end.UnixNano()), - TraceID: traceID, Messages: [10]messageAttributes{ { // topic1 Topic: [256]byte{0x74, 0x6f, 0x70, 0x69, 0x63, 0x31}, // key1 - Key: [256]byte{0x6b, 0x65, 0x79, 0x31}, - SpaID: trace.SpanID{1}, + Key: [256]byte{0x6b, 0x65, 0x79, 0x31}, + SpanContext: context.EBPFSpanContext{ + TraceID: traceID, + SpanID: trace.SpanID{1}, + }, }, { // topic2 Topic: [256]byte{0x74, 0x6f, 0x70, 0x69, 0x63, 0x32}, // key2 - Key: [256]byte{0x6b, 0x65, 0x79, 0x32}, - SpaID: trace.SpanID{2}, + Key: [256]byte{0x6b, 0x65, 0x79, 0x32}, + SpanContext: context.EBPFSpanContext{ + TraceID: traceID, + SpanID: trace.SpanID{2}, + }, }, }, ValidMessages: 2, diff --git a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/bpf/probe.bpf.c b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/bpf/probe.bpf.c index 61aa67beb..d60af651f 100644 --- a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/bpf/probe.bpf.c +++ b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/bpf/probe.bpf.c @@ -18,6 +18,7 @@ #include "go_types.h" #include "uprobe.h" #include "otel_types.h" +#include "span_output.h" char __license[] SEC("license") = "Dual MIT/GPL"; @@ -68,10 +69,6 @@ struct __uint(max_entries, 2); } otel_span_storage_map SEC(".maps"); -struct { - __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); -} events SEC(".maps"); - // Injected in init volatile const u64 tracer_delegate_pos; @@ -147,10 +144,9 @@ int uprobe_Start_Returns(struct pt_regs *ctx) { if (span_ctx != NULL) { // Set the parent context bpf_probe_read(&otel_span->psc, sizeof(otel_span->psc), span_ctx); - copy_byte_arrays(otel_span->psc.TraceID, otel_span->sc.TraceID, TRACE_ID_SIZE); - generate_random_bytes(otel_span->sc.SpanID, SPAN_ID_SIZE); + get_span_context_from_parent(&otel_span->psc, &otel_span->sc); } else { - otel_span->sc = generate_span_context(); + get_root_span_context(&otel_span->sc); } bpf_map_update_elem(&active_spans_by_span_ptr, &span_ptr_val, otel_span, 0); @@ -257,7 +253,7 @@ int uprobe_End(struct pt_regs *ctx) { span->end_time = bpf_ktime_get_ns(); stop_tracking_span(&span->sc, &span->psc); - bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, span, sizeof(*span)); + output_span_event(ctx, span, sizeof(*span), &span->sc); bpf_map_delete_elem(&active_spans_by_span_ptr, &non_recording_span_ptr); return 0; diff --git a/internal/pkg/instrumentation/bpf/google.golang.org/grpc/client/bpf/probe.bpf.c b/internal/pkg/instrumentation/bpf/google.golang.org/grpc/client/bpf/probe.bpf.c index 48edcde0e..5617133ce 100644 --- a/internal/pkg/instrumentation/bpf/google.golang.org/grpc/client/bpf/probe.bpf.c +++ b/internal/pkg/instrumentation/bpf/google.golang.org/grpc/client/bpf/probe.bpf.c @@ -53,11 +53,6 @@ struct __uint(max_entries, MAX_CONCURRENT); } streamid_to_span_contexts SEC(".maps"); -struct -{ - __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); -} events SEC(".maps"); - // Injected in init volatile const u64 clientconn_target_ptr_pos; volatile const u64 httpclient_nextid_pos; @@ -111,12 +106,11 @@ int uprobe_ClientConn_Invoke(struct pt_regs *ctx) if (parent_span_ctx != NULL) { bpf_probe_read(&grpcReq.psc, sizeof(grpcReq.psc), parent_span_ctx); - copy_byte_arrays(grpcReq.psc.TraceID, grpcReq.sc.TraceID, TRACE_ID_SIZE); - generate_random_bytes(grpcReq.sc.SpanID, SPAN_ID_SIZE); + get_span_context_from_parent(parent_span_ctx, &grpcReq.sc); } else { - grpcReq.sc = generate_span_context(); + get_root_span_context(&grpcReq.sc); } // Write event diff --git a/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/bpf/probe.bpf.c b/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/bpf/probe.bpf.c index 0714b0ac4..c8748181b 100644 --- a/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/bpf/probe.bpf.c +++ b/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/bpf/probe.bpf.c @@ -49,8 +49,11 @@ struct struct { - __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); -} events SEC(".maps"); + __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY); + __uint(key_size, sizeof(u32)); + __uint(value_size, sizeof(struct grpc_request_t)); + __uint(max_entries, 1); +} grpc_storage_map SEC(".maps"); struct hpack_header_field { @@ -91,31 +94,33 @@ int uprobe_server_handleStream(struct pt_regs *ctx) // Get parent context if exists u32 stream_id = 0; bpf_probe_read(&stream_id, sizeof(stream_id), (void *)(stream_ptr + stream_id_pos)); - void *grpcReq_ptr = bpf_map_lookup_elem(&streamid_to_grpc_events, &stream_id); - struct grpc_request_t grpcReq = {}; - if (grpcReq_ptr != NULL) - { - bpf_probe_read(&grpcReq, sizeof(grpcReq), grpcReq_ptr); + struct grpc_request_t *grpcReq = bpf_map_lookup_elem(&streamid_to_grpc_events, &stream_id); + if (grpcReq == NULL) { + // No parent span context, generate new span context + u32 map_id = 0; + grpcReq = bpf_map_lookup_elem(&grpc_storage_map, &map_id); + if (grpcReq == NULL) { + bpf_printk("failed to get grpcReq from storage map"); + return 0; + } + get_root_span_context(&grpcReq->sc); + } else { + // found parent span context + get_span_context_from_parent(&grpcReq->psc, &grpcReq->sc); bpf_map_delete_elem(&streamid_to_grpc_events, &stream_id); - copy_byte_arrays(grpcReq.psc.TraceID, grpcReq.sc.TraceID, TRACE_ID_SIZE); - generate_random_bytes(grpcReq.sc.SpanID, SPAN_ID_SIZE); - } - else - { - grpcReq.sc = generate_span_context(); } - grpcReq.start_time = bpf_ktime_get_ns(); + grpcReq->start_time = bpf_ktime_get_ns(); // Set attributes - if (!get_go_string_from_user_ptr((void *)(stream_ptr + stream_method_ptr_pos), grpcReq.method, sizeof(grpcReq.method))) + if (!get_go_string_from_user_ptr((void *)(stream_ptr + stream_method_ptr_pos), grpcReq->method, sizeof(grpcReq->method))) { bpf_printk("method write failed, aborting ebpf probe"); return 0; } // Write event - bpf_map_update_elem(&grpc_events, &key, &grpcReq, 0); - start_tracking_span(ctx_iface, &grpcReq.sc); + bpf_map_update_elem(&grpc_events, &key, grpcReq, 0); + start_tracking_span(ctx_iface, &grpcReq->sc); return 0; } diff --git a/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/bpf_arm64_bpfel.go b/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/bpf_arm64_bpfel.go index 71c75fa73..d2e6ff7b9 100644 --- a/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/bpf_arm64_bpfel.go +++ b/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/bpf_arm64_bpfel.go @@ -83,6 +83,7 @@ type bpfMapSpecs struct { AllocMap *ebpf.MapSpec `ebpf:"alloc_map"` Events *ebpf.MapSpec `ebpf:"events"` GrpcEvents *ebpf.MapSpec `ebpf:"grpc_events"` + GrpcStorageMap *ebpf.MapSpec `ebpf:"grpc_storage_map"` SliceArrayBuffMap *ebpf.MapSpec `ebpf:"slice_array_buff_map"` StreamidToGrpcEvents *ebpf.MapSpec `ebpf:"streamid_to_grpc_events"` TrackedSpans *ebpf.MapSpec `ebpf:"tracked_spans"` @@ -111,6 +112,7 @@ type bpfMaps struct { AllocMap *ebpf.Map `ebpf:"alloc_map"` Events *ebpf.Map `ebpf:"events"` GrpcEvents *ebpf.Map `ebpf:"grpc_events"` + GrpcStorageMap *ebpf.Map `ebpf:"grpc_storage_map"` SliceArrayBuffMap *ebpf.Map `ebpf:"slice_array_buff_map"` StreamidToGrpcEvents *ebpf.Map `ebpf:"streamid_to_grpc_events"` TrackedSpans *ebpf.Map `ebpf:"tracked_spans"` @@ -122,6 +124,7 @@ func (m *bpfMaps) Close() error { m.AllocMap, m.Events, m.GrpcEvents, + m.GrpcStorageMap, m.SliceArrayBuffMap, m.StreamidToGrpcEvents, m.TrackedSpans, diff --git a/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/bpf_x86_bpfel.go b/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/bpf_x86_bpfel.go index 8d4332805..508904aee 100644 --- a/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/bpf_x86_bpfel.go +++ b/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/bpf_x86_bpfel.go @@ -83,6 +83,7 @@ type bpfMapSpecs struct { AllocMap *ebpf.MapSpec `ebpf:"alloc_map"` Events *ebpf.MapSpec `ebpf:"events"` GrpcEvents *ebpf.MapSpec `ebpf:"grpc_events"` + GrpcStorageMap *ebpf.MapSpec `ebpf:"grpc_storage_map"` SliceArrayBuffMap *ebpf.MapSpec `ebpf:"slice_array_buff_map"` StreamidToGrpcEvents *ebpf.MapSpec `ebpf:"streamid_to_grpc_events"` TrackedSpans *ebpf.MapSpec `ebpf:"tracked_spans"` @@ -111,6 +112,7 @@ type bpfMaps struct { AllocMap *ebpf.Map `ebpf:"alloc_map"` Events *ebpf.Map `ebpf:"events"` GrpcEvents *ebpf.Map `ebpf:"grpc_events"` + GrpcStorageMap *ebpf.Map `ebpf:"grpc_storage_map"` SliceArrayBuffMap *ebpf.Map `ebpf:"slice_array_buff_map"` StreamidToGrpcEvents *ebpf.Map `ebpf:"streamid_to_grpc_events"` TrackedSpans *ebpf.Map `ebpf:"tracked_spans"` @@ -122,6 +124,7 @@ func (m *bpfMaps) Close() error { m.AllocMap, m.Events, m.GrpcEvents, + m.GrpcStorageMap, m.SliceArrayBuffMap, m.StreamidToGrpcEvents, m.TrackedSpans, diff --git a/internal/pkg/instrumentation/bpf/net/http/client/bpf/probe.bpf.c b/internal/pkg/instrumentation/bpf/net/http/client/bpf/probe.bpf.c index 39dc19452..6ce061fc2 100644 --- a/internal/pkg/instrumentation/bpf/net/http/client/bpf/probe.bpf.c +++ b/internal/pkg/instrumentation/bpf/net/http/client/bpf/probe.bpf.c @@ -3,6 +3,7 @@ #include "go_context.h" #include "go_types.h" #include "uprobe.h" +#include "span_output.h" char __license[] SEC("license") = "Dual MIT/GPL"; @@ -66,10 +67,6 @@ struct { __uint(max_entries, MAX_CONCURRENT); } http_headers SEC(".maps"); -struct { - __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); -} events SEC(".maps"); - // Injected in init volatile const u64 method_ptr_pos; volatile const u64 url_ptr_pos; @@ -239,7 +236,7 @@ int uprobe_Transport_roundTrip_Returns(struct pt_regs *ctx) { http_req_span->end_time = end_time; - bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, http_req_span, sizeof(*http_req_span)); + output_span_event(ctx, http_req_span, sizeof(*http_req_span), &http_req_span->sc); stop_tracking_span(&http_req_span->sc, &http_req_span->psc); bpf_map_delete_elem(&http_events, &key); diff --git a/internal/pkg/instrumentation/bpf/net/http/server/bpf/probe.bpf.c b/internal/pkg/instrumentation/bpf/net/http/server/bpf/probe.bpf.c index 35390da10..00cc93649 100644 --- a/internal/pkg/instrumentation/bpf/net/http/server/bpf/probe.bpf.c +++ b/internal/pkg/instrumentation/bpf/net/http/server/bpf/probe.bpf.c @@ -17,6 +17,7 @@ #include "go_context.h" #include "go_types.h" #include "uprobe.h" +#include "span_output.h" char __license[] SEC("license") = "Dual MIT/GPL"; @@ -73,11 +74,6 @@ struct __uint(max_entries, 1); } http_server_uprobe_storage_map SEC(".maps"); -struct -{ - __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); -} events SEC(".maps"); - // Injected in init volatile const u64 method_ptr_pos; volatile const u64 url_ptr_pos; @@ -263,9 +259,6 @@ int uprobe_serverHandler_ServeHTTP_Returns(struct pt_regs *ctx) { } struct http_server_span_t *http_server_span = &uprobe_data->span; - if (!is_sampled(&http_server_span->sc)) { - goto done; - } void *resp_ptr = (void *)uprobe_data->resp_ptr; void *req_ptr = NULL; @@ -292,9 +285,8 @@ int uprobe_serverHandler_ServeHTTP_Returns(struct pt_regs *ctx) { // status code bpf_probe_read(&http_server_span->status_code, sizeof(http_server_span->status_code), (void *)(resp_ptr + status_code_pos)); - bpf_perf_event_output(ctx, &events, BPF_F_CURRENT_CPU, http_server_span, sizeof(*http_server_span)); - -done: + output_span_event(ctx, http_server_span, sizeof(*http_server_span), &http_server_span->sc); + stop_tracking_span(&http_server_span->sc, &http_server_span->psc); bpf_map_delete_elem(&http_server_uprobes, &key); return 0; From 615c69e8d42f42c264a6f8acb4a2a3b09a343da3 Mon Sep 17 00:00:00 2001 From: Ron Federman Date: Sat, 6 Jul 2024 17:00:40 +0300 Subject: [PATCH 2/2] small fixes --- .../github.com/segmentio/kafka-go/producer/bpf/probe.bpf.c | 3 ++- .../bpf/google.golang.org/grpc/server/bpf/probe.bpf.c | 7 ++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/bpf/probe.bpf.c b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/bpf/probe.bpf.c index dec5e8496..d62c50e55 100644 --- a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/bpf/probe.bpf.c +++ b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/bpf/probe.bpf.c @@ -166,7 +166,8 @@ int uprobe_WriteMessages(struct pt_regs *ctx) { // Get parent if exists struct span_context *parent_span_ctx = get_parent_span_context(context_data_ptr); if (parent_span_ctx != NULL) { - get_span_context_from_parent(parent_span_ctx, &kafka_request->msgs[0].sc); + kafka_request->psc = *parent_span_ctx; + get_span_context_from_parent(&kafka_request->psc, &kafka_request->msgs[0].sc); } else { get_root_span_context(&kafka_request->msgs[0].sc); } diff --git a/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/bpf/probe.bpf.c b/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/bpf/probe.bpf.c index c8748181b..84104fd29 100644 --- a/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/bpf/probe.bpf.c +++ b/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/bpf/probe.bpf.c @@ -107,20 +107,21 @@ int uprobe_server_handleStream(struct pt_regs *ctx) } else { // found parent span context get_span_context_from_parent(&grpcReq->psc, &grpcReq->sc); - bpf_map_delete_elem(&streamid_to_grpc_events, &stream_id); } grpcReq->start_time = bpf_ktime_get_ns(); // Set attributes if (!get_go_string_from_user_ptr((void *)(stream_ptr + stream_method_ptr_pos), grpcReq->method, sizeof(grpcReq->method))) { - bpf_printk("method write failed, aborting ebpf probe"); - return 0; + bpf_printk("Failed to read gRPC method from stream"); + goto done; } // Write event bpf_map_update_elem(&grpc_events, &key, grpcReq, 0); start_tracking_span(ctx_iface, &grpcReq->sc); +done: + bpf_map_delete_elem(&streamid_to_grpc_events, &stream_id); return 0; }