From 539d9e979585b58e9fe1b431d442e52da8697c8e Mon Sep 17 00:00:00 2001 From: Ron Federman Date: Sun, 30 Jun 2024 10:58:53 +0300 Subject: [PATCH] Refactor probe loading to use ebpf.Collection --- .../instrumentation/bpf/database/sql/probe.go | 84 ++-------- .../segmentio/kafka-go/consumer/probe.go | 45 +----- .../segmentio/kafka-go/producer/probe.go | 45 +----- .../otel/traceglobal/probe.go | 136 ++-------------- .../google.golang.org/grpc/client/probe.go | 72 ++------- .../google.golang.org/grpc/server/probe.go | 62 +------ .../bpf/net/http/client/probe.go | 68 +------- .../bpf/net/http/server/probe.go | 44 +---- internal/pkg/instrumentation/probe/probe.go | 152 +++++++++++++----- internal/pkg/instrumentation/utils/ebpf.go | 8 +- 10 files changed, 177 insertions(+), 539 deletions(-) diff --git a/internal/pkg/instrumentation/bpf/database/sql/probe.go b/internal/pkg/instrumentation/bpf/database/sql/probe.go index d6de1f9fa..bbceb7178 100644 --- a/internal/pkg/instrumentation/bpf/database/sql/probe.go +++ b/internal/pkg/instrumentation/bpf/database/sql/probe.go @@ -18,8 +18,6 @@ import ( "os" "strconv" - "github.com/cilium/ebpf/link" - "github.com/cilium/ebpf/perf" "github.com/go-logr/logr" "go.opentelemetry.io/otel/attribute" semconv "go.opentelemetry.io/otel/semconv/v1.21.0" @@ -28,7 +26,6 @@ import ( "go.opentelemetry.io/auto/internal/pkg/instrumentation/context" "go.opentelemetry.io/auto/internal/pkg/instrumentation/probe" - "go.opentelemetry.io/auto/internal/pkg/process" ) //go:generate go run github.com/cilium/ebpf/cmd/bpf2go -target amd64,arm64 -cc clang -cflags $CFLAGS bpf ./bpf/probe.bpf.c @@ -58,89 +55,26 @@ func New(logger logr.Logger) probe.Probe { Val: shouldIncludeDBStatement(), }, }, - Uprobes: []probe.Uprobe[bpfObjects]{ + Uprobes: []probe.Uprobe{ { - Sym: "database/sql.(*DB).queryDC", - Fn: uprobeQueryDC, - Optional: true, + Sym: "database/sql.(*DB).queryDC", + EntryProbe: "uprobe_queryDC", + ReturnProbe: "uprobe_queryDC_Returns", + Optional: true, }, { - Sym: "database/sql.(*DB).execDC", - Fn: uprobeExecDC, - Optional: true, + Sym: "database/sql.(*DB).execDC", + EntryProbe: "uprobe_execDC", + ReturnProbe: "uprobe_execDC_Returns", + Optional: true, }, }, - ReaderFn: func(obj bpfObjects) (*perf.Reader, error) { - return perf.NewReader(obj.Events, os.Getpagesize()) - }, SpecFn: loadBpf, ProcessFn: convertEvent, } } -func uprobeQueryDC(name string, exec *link.Executable, target *process.TargetDetails, obj *bpfObjects) ([]link.Link, error) { - offset, err := target.GetFunctionOffset(name) - if err != nil { - return nil, err - } - - opts := &link.UprobeOptions{Address: offset, PID: target.PID} - l, err := exec.Uprobe("", obj.UprobeQueryDC, opts) - if err != nil { - return nil, err - } - - links := []link.Link{l} - - retOffsets, err := target.GetFunctionReturns(name) - if err != nil { - return nil, err - } - - for _, ret := range retOffsets { - opts := &link.UprobeOptions{Address: ret} - l, err := exec.Uprobe("", obj.UprobeQueryDC_Returns, opts) - if err != nil { - return nil, err - } - links = append(links, l) - } - - return links, nil -} - -func uprobeExecDC(name string, exec *link.Executable, target *process.TargetDetails, obj *bpfObjects) ([]link.Link, error) { - offset, err := target.GetFunctionOffset(name) - if err != nil { - return nil, err - } - - opts := &link.UprobeOptions{Address: offset, PID: target.PID} - l, err := exec.Uprobe("", obj.UprobeExecDC, opts) - if err != nil { - return nil, err - } - - links := []link.Link{l} - - retOffsets, err := target.GetFunctionReturns(name) - if err != nil { - return nil, err - } - - for _, ret := range retOffsets { - opts := &link.UprobeOptions{Address: ret} - l, err := exec.Uprobe("", obj.UprobeExecDC_Returns, opts) - if err != nil { - return nil, err - } - links = append(links, l) - } - - return links, nil -} - // event represents an event in an SQL database // request-response. type event struct { diff --git a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer/probe.go b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer/probe.go index f7de027ab..c665129d4 100644 --- a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer/probe.go +++ b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer/probe.go @@ -16,10 +16,7 @@ package consumer import ( "fmt" - "os" - "github.com/cilium/ebpf/link" - "github.com/cilium/ebpf/perf" "github.com/go-logr/logr" "go.opentelemetry.io/otel/attribute" semconv "go.opentelemetry.io/otel/semconv/v1.24.0" @@ -28,7 +25,6 @@ import ( "go.opentelemetry.io/auto/internal/pkg/instrumentation/context" "go.opentelemetry.io/auto/internal/pkg/instrumentation/probe" - "go.opentelemetry.io/auto/internal/pkg/process" "go.opentelemetry.io/auto/internal/pkg/structfield" ) @@ -80,51 +76,18 @@ func New(logger logr.Logger) probe.Probe { Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "ReaderConfig", "GroupID"), }, }, - Uprobes: []probe.Uprobe[bpfObjects]{ + Uprobes: []probe.Uprobe{ { - Sym: "github.com/segmentio/kafka-go.(*Reader).FetchMessage", - Fn: uprobeFetchMessage, + Sym: "github.com/segmentio/kafka-go.(*Reader).FetchMessage", + EntryProbe: "uprobe_FetchMessage", + ReturnProbe: "uprobe_FetchMessage_Returns", }, }, - ReaderFn: func(obj bpfObjects) (*perf.Reader, error) { - return perf.NewReader(obj.Events, os.Getpagesize()*100) - }, SpecFn: loadBpf, ProcessFn: convertEvent, } } -func uprobeFetchMessage(name string, exec *link.Executable, target *process.TargetDetails, obj *bpfObjects) ([]link.Link, error) { - offset, err := target.GetFunctionOffset(name) - if err != nil { - return nil, err - } - - opts := &link.UprobeOptions{Address: offset} - l, err := exec.Uprobe("", obj.UprobeFetchMessage, opts) - if err != nil { - return nil, err - } - - links := []link.Link{l} - - retOffsets, err := target.GetFunctionReturns(name) - if err != nil { - return nil, err - } - - for _, ret := range retOffsets { - opts := &link.UprobeOptions{Address: ret} - l, err := exec.Uprobe("", obj.UprobeFetchMessageReturns, opts) - if err != nil { - return nil, err - } - links = append(links, l) - } - - return links, nil -} - // event represents a kafka message received by the consumer. type event struct { context.BaseSpanProperties diff --git a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe.go b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe.go index e2620d1e0..4b4ca64a8 100644 --- a/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe.go +++ b/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer/probe.go @@ -16,10 +16,7 @@ package producer import ( "fmt" - "os" - "github.com/cilium/ebpf/link" - "github.com/cilium/ebpf/perf" "github.com/go-logr/logr" "go.opentelemetry.io/otel/attribute" semconv "go.opentelemetry.io/otel/semconv/v1.24.0" @@ -28,7 +25,6 @@ import ( "go.opentelemetry.io/auto/internal/pkg/instrumentation/context" "go.opentelemetry.io/auto/internal/pkg/instrumentation/probe" - "go.opentelemetry.io/auto/internal/pkg/process" "go.opentelemetry.io/auto/internal/pkg/structfield" ) @@ -68,51 +64,18 @@ func New(logger logr.Logger) probe.Probe { Val: structfield.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Time"), }, }, - Uprobes: []probe.Uprobe[bpfObjects]{ + Uprobes: []probe.Uprobe{ { - Sym: "github.com/segmentio/kafka-go.(*Writer).WriteMessages", - Fn: uprobeWriteMessages, + Sym: "github.com/segmentio/kafka-go.(*Writer).WriteMessages", + EntryProbe: "uprobe_WriteMessages", + ReturnProbe: "uprobe_WriteMessages_Returns", }, }, - ReaderFn: func(obj bpfObjects) (*perf.Reader, error) { - return perf.NewReader(obj.Events, os.Getpagesize()*100) - }, SpecFn: loadBpf, ProcessFn: convertEvent, } } -func uprobeWriteMessages(name string, exec *link.Executable, target *process.TargetDetails, obj *bpfObjects) ([]link.Link, error) { - offset, err := target.GetFunctionOffset(name) - if err != nil { - return nil, err - } - - opts := &link.UprobeOptions{Address: offset} - l, err := exec.Uprobe("", obj.UprobeWriteMessages, opts) - if err != nil { - return nil, err - } - - links := []link.Link{l} - - retOffsets, err := target.GetFunctionReturns(name) - if err != nil { - return nil, err - } - - for _, ret := range retOffsets { - opts := &link.UprobeOptions{Address: ret} - l, err := exec.Uprobe("", obj.UprobeWriteMessagesReturns, opts) - if err != nil { - return nil, err - } - links = append(links, l) - } - - return links, nil -} - type messageAttributes struct { SpaID trace.SpanID Topic [256]byte diff --git a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/probe.go b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/probe.go index f0b090971..78eea4045 100644 --- a/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/probe.go +++ b/internal/pkg/instrumentation/bpf/go.opentelemetry.io/otel/traceglobal/probe.go @@ -17,13 +17,10 @@ package global import ( "encoding/binary" "math" - "os" "go.opentelemetry.io/auto/internal/pkg/instrumentation/probe" "go.opentelemetry.io/auto/internal/pkg/structfield" - "github.com/cilium/ebpf/link" - "github.com/cilium/ebpf/perf" "github.com/go-logr/logr" "golang.org/x/sys/unix" @@ -32,7 +29,6 @@ import ( "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/auto/internal/pkg/instrumentation/context" - "go.opentelemetry.io/auto/internal/pkg/process" ) //go:generate go run github.com/cilium/ebpf/cmd/bpf2go -target amd64,arm64 -cc clang -cflags $CFLAGS bpf ./bpf/probe.bpf.c @@ -95,139 +91,37 @@ func New(logger logr.Logger) probe.Probe { Val: structfield.NewID("go.opentelemetry.io/otel", "go.opentelemetry.io/otel/internal/global", "tracer", "delegate"), }, }, - Uprobes: []probe.Uprobe[bpfObjects]{ + Uprobes: []probe.Uprobe{ { - Sym: "go.opentelemetry.io/otel/internal/global.(*tracer).Start", - Fn: uprobeTracerStart, + Sym: "go.opentelemetry.io/otel/internal/global.(*tracer).Start", + EntryProbe: "uprobe_Start", + ReturnProbe: "uprobe_Start_Returns", }, { - Sym: "go.opentelemetry.io/otel/internal/global.(*nonRecordingSpan).End", - Fn: uprobeSpanEnd, + Sym: "go.opentelemetry.io/otel/internal/global.(*nonRecordingSpan).End", + EntryProbe: "uprobe_End", }, { - Sym: "go.opentelemetry.io/otel/internal/global.(*nonRecordingSpan).SetAttributes", - Fn: uprobeSetAttributes, - Optional: true, + Sym: "go.opentelemetry.io/otel/internal/global.(*nonRecordingSpan).SetAttributes", + EntryProbe: "uprobe_SetAttributes", + Optional: true, }, { - Sym: "go.opentelemetry.io/otel/internal/global.(*nonRecordingSpan).SetStatus", - Fn: uprobeSetStatus, - Optional: true, + Sym: "go.opentelemetry.io/otel/internal/global.(*nonRecordingSpan).SetStatus", + EntryProbe: "uprobe_SetStatus", + Optional: true, }, { - Sym: "go.opentelemetry.io/otel/internal/global.(*nonRecordingSpan).SetName", - Fn: uprobeSpanSetName, - Optional: true, + Sym: "go.opentelemetry.io/otel/internal/global.(*nonRecordingSpan).SetName", + EntryProbe: "uprobe_SetName", + Optional: true, }, }, - - ReaderFn: func(obj bpfObjects) (*perf.Reader, error) { - return perf.NewReader(obj.Events, os.Getpagesize()*8) - }, SpecFn: loadBpf, ProcessFn: convertEvent, } } -func uprobeTracerStart(name string, exec *link.Executable, target *process.TargetDetails, obj *bpfObjects) ([]link.Link, error) { - offset, err := target.GetFunctionOffset(name) - if err != nil { - return nil, err - } - - opts := &link.UprobeOptions{Address: offset, PID: target.PID} - l, err := exec.Uprobe("", obj.UprobeStart, opts) - if err != nil { - return nil, err - } - - links := []link.Link{l} - - retOffsets, err := target.GetFunctionReturns(name) - if err != nil { - return nil, err - } - - for _, ret := range retOffsets { - opts := &link.UprobeOptions{Address: ret} - l, err := exec.Uprobe("", obj.UprobeStartReturns, opts) - if err != nil { - return nil, err - } - links = append(links, l) - } - - return links, nil -} - -func uprobeSetAttributes(name string, exec *link.Executable, target *process.TargetDetails, obj *bpfObjects) ([]link.Link, error) { - offset, err := target.GetFunctionOffset(name) - if err != nil { - return nil, err - } - - opts := &link.UprobeOptions{Address: offset, PID: target.PID} - l, err := exec.Uprobe("", obj.UprobeSetAttributes, opts) - if err != nil { - return nil, err - } - - links := []link.Link{l} - - return links, nil -} - -func uprobeSpanSetName(name string, exec *link.Executable, target *process.TargetDetails, obj *bpfObjects) ([]link.Link, error) { - offset, err := target.GetFunctionOffset(name) - if err != nil { - return nil, err - } - - opts := &link.UprobeOptions{Address: offset, PID: target.PID} - l, err := exec.Uprobe("", obj.UprobeSetName, opts) - if err != nil { - return nil, err - } - - links := []link.Link{l} - - return links, nil -} - -func uprobeSetStatus(name string, exec *link.Executable, target *process.TargetDetails, obj *bpfObjects) ([]link.Link, error) { - offset, err := target.GetFunctionOffset(name) - if err != nil { - return nil, err - } - - opts := &link.UprobeOptions{Address: offset, PID: target.PID} - l, err := exec.Uprobe("", obj.UprobeSetStatus, opts) - if err != nil { - return nil, err - } - - links := []link.Link{l} - - return links, nil -} - -func uprobeSpanEnd(name string, exec *link.Executable, target *process.TargetDetails, obj *bpfObjects) ([]link.Link, error) { - offset, err := target.GetFunctionOffset(name) - if err != nil { - return nil, err - } - - opts := &link.UprobeOptions{Address: offset, PID: target.PID} - l, err := exec.Uprobe("", obj.UprobeEnd, opts) - if err != nil { - return nil, err - } - - links := []link.Link{l} - - return links, nil -} - type attributeKeyVal struct { ValLength uint16 Vtype uint8 diff --git a/internal/pkg/instrumentation/bpf/google.golang.org/grpc/client/probe.go b/internal/pkg/instrumentation/bpf/google.golang.org/grpc/client/probe.go index 878ff5f76..6ad4e50ca 100644 --- a/internal/pkg/instrumentation/bpf/google.golang.org/grpc/client/probe.go +++ b/internal/pkg/instrumentation/bpf/google.golang.org/grpc/client/probe.go @@ -16,13 +16,10 @@ package grpc import ( "fmt" - "os" "strconv" "strings" "github.com/cilium/ebpf" - "github.com/cilium/ebpf/link" - "github.com/cilium/ebpf/perf" "github.com/go-logr/logr" "go.opentelemetry.io/otel/attribute" semconv "go.opentelemetry.io/otel/semconv/v1.24.0" @@ -32,7 +29,6 @@ import ( "go.opentelemetry.io/auto/internal/pkg/instrumentation/context" "go.opentelemetry.io/auto/internal/pkg/instrumentation/probe" "go.opentelemetry.io/auto/internal/pkg/instrumentation/utils" - "go.opentelemetry.io/auto/internal/pkg/process" "go.opentelemetry.io/auto/internal/pkg/structfield" ) @@ -72,27 +68,20 @@ func New(logger logr.Logger) probe.Probe { Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/transport", "headerFrame", "streamID"), }, }, - Uprobes: []probe.Uprobe[bpfObjects]{ + Uprobes: []probe.Uprobe{ { - Sym: "google.golang.org/grpc.(*ClientConn).Invoke", - Fn: uprobeInvoke, - }, { - Sym: "google.golang.org/grpc/internal/transport.(*http2Client).NewStream", - Fn: func(name string, exec *link.Executable, target *process.TargetDetails, obj *bpfObjects) ([]link.Link, error) { - prog := obj.UprobeHttp2ClientNewStream - return uprobeFn(name, exec, target, prog) - }, - }, { - Sym: "google.golang.org/grpc/internal/transport.(*loopyWriter).headerHandler", - Fn: func(name string, exec *link.Executable, target *process.TargetDetails, obj *bpfObjects) ([]link.Link, error) { - prog := obj.UprobeLoopyWriterHeaderHandler - return uprobeFn(name, exec, target, prog) - }, + Sym: "google.golang.org/grpc.(*ClientConn).Invoke", + EntryProbe: "uprobe_ClientConn_Invoke", + ReturnProbe: "uprobe_ClientConn_Invoke_Returns", + }, + { + Sym: "google.golang.org/grpc/internal/transport.(*http2Client).NewStream", + EntryProbe: "uprobe_http2Client_NewStream", + }, + { + Sym: "google.golang.org/grpc/internal/transport.(*loopyWriter).headerHandler", + EntryProbe: "uprobe_LoopyWriter_HeaderHandler", }, - }, - - ReaderFn: func(obj bpfObjects) (*perf.Reader, error) { - return perf.NewReader(obj.Events, os.Getpagesize()) }, SpecFn: verifyAndLoadBpf, ProcessFn: convertEvent, @@ -107,43 +96,6 @@ func verifyAndLoadBpf() (*ebpf.CollectionSpec, error) { return loadBpf() } -func uprobeFn(name string, exec *link.Executable, target *process.TargetDetails, prog *ebpf.Program) ([]link.Link, error) { - offset, err := target.GetFunctionOffset(name) - if err != nil { - return nil, err - } - - opts := &link.UprobeOptions{Address: offset, PID: target.PID} - l, err := exec.Uprobe("", prog, opts) - if err != nil { - return nil, err - } - return []link.Link{l}, nil -} - -func uprobeInvoke(name string, exec *link.Executable, target *process.TargetDetails, obj *bpfObjects) ([]link.Link, error) { - links, err := uprobeFn(name, exec, target, obj.UprobeClientConnInvoke) - if err != nil { - return nil, err - } - - retOffsets, err := target.GetFunctionReturns(name) - if err != nil { - return nil, err - } - - for _, ret := range retOffsets { - opts := &link.UprobeOptions{Address: ret} - l, err := exec.Uprobe("", obj.UprobeClientConnInvokeReturns, opts) - if err != nil { - return nil, err - } - links = append(links, l) - } - - return links, nil -} - // event represents an event in the gRPC client during a gRPC request. type event struct { context.BaseSpanProperties diff --git a/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/probe.go b/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/probe.go index 51dfae61b..74052564c 100644 --- a/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/probe.go +++ b/internal/pkg/instrumentation/bpf/google.golang.org/grpc/server/probe.go @@ -16,10 +16,7 @@ package server import ( "fmt" - "os" - "github.com/cilium/ebpf/link" - "github.com/cilium/ebpf/perf" "github.com/go-logr/logr" "github.com/hashicorp/go-version" "go.opentelemetry.io/otel/attribute" @@ -75,20 +72,17 @@ func New(logger logr.Logger) probe.Probe { }, framePosConst{}, }, - Uprobes: []probe.Uprobe[bpfObjects]{ + Uprobes: []probe.Uprobe{ { - Sym: "google.golang.org/grpc.(*Server).handleStream", - Fn: uprobeHandleStream, + Sym: "google.golang.org/grpc.(*Server).handleStream", + EntryProbe: "uprobe_server_handleStream", + ReturnProbe: "uprobe_server_handleStream_Returns", }, { - Sym: "google.golang.org/grpc/internal/transport.(*http2Server).operateHeaders", - Fn: uprobeOperateHeaders, + Sym: "google.golang.org/grpc/internal/transport.(*http2Server).operateHeaders", + EntryProbe: "uprobe_http2Server_operateHeader", }, }, - - ReaderFn: func(obj bpfObjects) (*perf.Reader, error) { - return perf.NewReader(obj.Events, os.Getpagesize()) - }, SpecFn: loadBpf, ProcessFn: convertEvent, } @@ -113,50 +107,6 @@ func (c framePosConst) InjectOption(td *process.TargetDetails) (inject.Option, e return inject.WithKeyValue("is_new_frame_pos", ver.GreaterThanOrEqual(paramChangeVer)), nil } -func uprobeHandleStream(name string, exec *link.Executable, target *process.TargetDetails, obj *bpfObjects) ([]link.Link, error) { - offset, err := target.GetFunctionOffset(name) - if err != nil { - return nil, err - } - - opts := &link.UprobeOptions{Address: offset, PID: target.PID} - l, err := exec.Uprobe("", obj.UprobeServerHandleStream, opts) - if err != nil { - return nil, err - } - links := []link.Link{l} - - retOffsets, err := target.GetFunctionReturns(name) - if err != nil { - return nil, err - } - - for _, ret := range retOffsets { - opts := &link.UprobeOptions{Address: ret} - l, err := exec.Uprobe("", obj.UprobeServerHandleStreamReturns, opts) - if err != nil { - return nil, err - } - links = append(links, l) - } - - return links, nil -} - -func uprobeOperateHeaders(name string, exec *link.Executable, target *process.TargetDetails, obj *bpfObjects) ([]link.Link, error) { - offset, err := target.GetFunctionOffset(name) - if err != nil { - return nil, err - } - - opts := &link.UprobeOptions{Address: offset, PID: target.PID} - l, err := exec.Uprobe("", obj.UprobeHttp2ServerOperateHeader, opts) - if err != nil { - return nil, err - } - return []link.Link{l}, nil -} - // event represents an event in the gRPC server during a gRPC request. type event struct { context.BaseSpanProperties diff --git a/internal/pkg/instrumentation/bpf/net/http/client/probe.go b/internal/pkg/instrumentation/bpf/net/http/client/probe.go index 90f96b6fc..de0050f21 100644 --- a/internal/pkg/instrumentation/bpf/net/http/client/probe.go +++ b/internal/pkg/instrumentation/bpf/net/http/client/probe.go @@ -21,8 +21,6 @@ import ( "strings" "github.com/cilium/ebpf" - "github.com/cilium/ebpf/link" - "github.com/cilium/ebpf/perf" "github.com/go-logr/logr" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" @@ -34,7 +32,6 @@ import ( "go.opentelemetry.io/auto/internal/pkg/instrumentation/context" "go.opentelemetry.io/auto/internal/pkg/instrumentation/probe" "go.opentelemetry.io/auto/internal/pkg/instrumentation/utils" - "go.opentelemetry.io/auto/internal/pkg/process" "go.opentelemetry.io/auto/internal/pkg/structfield" ) @@ -53,10 +50,11 @@ func New(logger logr.Logger) probe.Probe { InstrumentedPkg: pkg, } - uprobes := []probe.Uprobe[bpfObjects]{ + uprobes := []probe.Uprobe{ { - Sym: "net/http.(*Transport).roundTrip", - Fn: uprobeRoundTrip, + Sym: "net/http.(*Transport).roundTrip", + EntryProbe: "uprobe_Transport_roundTrip", + ReturnProbe: "uprobe_Transport_roundTrip_Returns", }, } @@ -64,9 +62,9 @@ func New(logger logr.Logger) probe.Probe { // probe which writes the data in the outgoing buffer. if utils.SupportsContextPropagation() { uprobes = append(uprobes, - probe.Uprobe[bpfObjects]{ - Sym: "net/http.Header.writeSubset", - Fn: uprobeWriteSubset, + probe.Uprobe{ + Sym: "net/http.Header.writeSubset", + EntryProbe: "uprobe_writeSubset", // We mark this probe as dependent on roundTrip, so we don't accidentally // enable this bpf program, if the executable has compiled in writeSubset, // but doesn't have any http roundTrip. @@ -170,10 +168,7 @@ func New(logger logr.Logger) probe.Probe { Val: structfield.NewID("std", "net/url", "URL", "Host"), }, }, - Uprobes: uprobes, - ReaderFn: func(obj bpfObjects) (*perf.Reader, error) { - return perf.NewReader(obj.Events, os.Getpagesize()) - }, + Uprobes: uprobes, SpecFn: verifyAndLoadBpf, ProcessFn: convertEvent, } @@ -188,53 +183,6 @@ func verifyAndLoadBpf() (*ebpf.CollectionSpec, error) { return loadBpf() } -func uprobeRoundTrip(name string, exec *link.Executable, target *process.TargetDetails, obj *bpfObjects) ([]link.Link, error) { - offset, err := target.GetFunctionOffset(name) - if err != nil { - return nil, err - } - - opts := &link.UprobeOptions{Address: offset, PID: target.PID} - l, err := exec.Uprobe("", obj.UprobeTransportRoundTrip, opts) - if err != nil { - return nil, err - } - links := []link.Link{l} - - retOffsets, err := target.GetFunctionReturns(name) - if err != nil { - return nil, err - } - - for _, ret := range retOffsets { - opts := &link.UprobeOptions{Address: ret} - l, err := exec.Uprobe("", obj.UprobeTransportRoundTripReturns, opts) - if err != nil { - return nil, err - } - links = append(links, l) - } - - return links, nil -} - -func uprobeWriteSubset(name string, exec *link.Executable, target *process.TargetDetails, obj *bpfObjects) ([]link.Link, error) { - offset, err := target.GetFunctionOffset(name) - if err != nil { - return nil, err - } - - opts := &link.UprobeOptions{Address: offset} - l, err := exec.Uprobe("", obj.UprobeWriteSubset, opts) - if err != nil { - return nil, err - } - - return []link.Link{l}, nil -} - -// event represents an event in an HTTP server during an HTTP -// request-response. type event struct { context.BaseSpanProperties Host [128]byte diff --git a/internal/pkg/instrumentation/bpf/net/http/server/probe.go b/internal/pkg/instrumentation/bpf/net/http/server/probe.go index dba1a0208..dccc489d2 100644 --- a/internal/pkg/instrumentation/bpf/net/http/server/probe.go +++ b/internal/pkg/instrumentation/bpf/net/http/server/probe.go @@ -15,11 +15,8 @@ package server import ( - "os" "strings" - "github.com/cilium/ebpf/link" - "github.com/cilium/ebpf/perf" "github.com/go-logr/logr" "github.com/hashicorp/go-version" "go.opentelemetry.io/otel/attribute" @@ -114,16 +111,13 @@ func New(logger logr.Logger) probe.Probe { }, patternPathSupportedConst{}, }, - Uprobes: []probe.Uprobe[bpfObjects]{ + Uprobes: []probe.Uprobe{ { - Sym: "net/http.serverHandler.ServeHTTP", - Fn: uprobeServeHTTP, + Sym: "net/http.serverHandler.ServeHTTP", + EntryProbe: "uprobe_serverHandler_ServeHTTP", + ReturnProbe: "uprobe_serverHandler_ServeHTTP_Returns", }, }, - - ReaderFn: func(obj bpfObjects) (*perf.Reader, error) { - return perf.NewReader(obj.Events, os.Getpagesize()) - }, SpecFn: loadBpf, ProcessFn: convertEvent, } @@ -141,36 +135,6 @@ func (c patternPathSupportedConst) InjectOption(td *process.TargetDetails) (inje return inject.WithKeyValue("pattern_path_supported", isPatternPathSupported), nil } -func uprobeServeHTTP(name string, exec *link.Executable, target *process.TargetDetails, obj *bpfObjects) ([]link.Link, error) { - offset, err := target.GetFunctionOffset(name) - if err != nil { - return nil, err - } - - opts := &link.UprobeOptions{Address: offset, PID: target.PID} - l, err := exec.Uprobe("", obj.UprobeServerHandlerServeHTTP, opts) - if err != nil { - return nil, err - } - links := []link.Link{l} - - retOffsets, err := target.GetFunctionReturns(name) - if err != nil { - return nil, err - } - - for _, ret := range retOffsets { - opts := &link.UprobeOptions{Address: ret} - l, err := exec.Uprobe("", obj.UprobeServerHandlerServeHTTP_Returns, opts) - if err != nil { - return nil, err - } - links = append(links, l) - } - - return links, nil -} - // event represents an event in an HTTP server during an HTTP // request-response. type event struct { diff --git a/internal/pkg/instrumentation/probe/probe.go b/internal/pkg/instrumentation/probe/probe.go index f2d0fcdbd..367f74c42 100644 --- a/internal/pkg/instrumentation/probe/probe.go +++ b/internal/pkg/instrumentation/probe/probe.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "io" + "os" "github.com/cilium/ebpf" "github.com/cilium/ebpf/link" @@ -68,21 +69,28 @@ type Base[BPFObj any, BPFEvent any] struct { Consts []Const // Uprobes is a the collection of eBPF programs that need to be attached to // the target process. - Uprobes []Uprobe[BPFObj] + Uprobes []Uprobe - // ReaderFn is a creation function for a perf.Reader based on the passed - // BPFObj related to the probe. - ReaderFn func(BPFObj) (*perf.Reader, error) // SpecFn is a creation function for an eBPF CollectionSpec related to the // probe. SpecFn func() (*ebpf.CollectionSpec, error) // ProcessFn processes probe events into a uniform Event type. ProcessFn func(*BPFEvent) []*SpanEvent - reader *perf.Reader - closers []io.Closer + reader *perf.Reader + collection *ebpf.Collection + closers []io.Closer } +const ( + // The default size of the perf buffer in pages. + // We will need to make this configurable in the future. + PerfBufferDefaultSizeInPages = 128 + // The default name of the eBPF map used to pass events from the eBPF program + // to userspace. + DefaultBufferMapName = "events" +) + // Manifest returns the Probe's instrumentation Manifest. func (i *Base[BPFObj, BPFEvent]) Manifest() Manifest { structfields := consts(i.Consts).structFields() @@ -107,12 +115,17 @@ func (i *Base[BPFObj, BPFEvent]) Load(exec *link.Executable, td *process.TargetD return err } - obj, err := i.buildObj(exec, td, spec) + i.collection, err = i.buildEBPFCollection(td, spec) if err != nil { return err } - i.reader, err = i.ReaderFn(*obj) + err = i.loadUprobes(exec, td) + if err != nil { + return err + } + + err = i.initReader() if err != nil { return err } @@ -129,7 +142,38 @@ func (i *Base[BPFObj, BPFEvent]) injectConsts(td *process.TargetDetails, spec *e return inject.Constants(spec, opts...) } -func (i *Base[BPFObj, BPFEvent]) buildObj(exec *link.Executable, td *process.TargetDetails, spec *ebpf.CollectionSpec) (*BPFObj, error) { +func (i *Base[BPFObj, BPFEvent]) loadUprobes(exec *link.Executable, td *process.TargetDetails) error { + for _, up := range i.Uprobes { + links, err := up.load(exec, td, i.collection) + if err != nil { + if up.Optional { + i.Logger.Info("failed to attach optional uprobe", "probe", i.ID, "symbol", up.Sym, "error", err) + continue + } + return err + } + for _, l := range links { + i.closers = append(i.closers, l) + } + } + return nil +} + +func (i *Base[BPFObj, BPFEvent]) initReader() error { + buf, ok := i.collection.Maps[DefaultBufferMapName] + if !ok { + return fmt.Errorf("%s map not found", DefaultBufferMapName) + } + var err error + i.reader, err = perf.NewReader(buf, PerfBufferDefaultSizeInPages*os.Getpagesize()) + if err != nil { + return err + } + i.closers = append(i.closers, i.reader) + return nil +} + +func (i *Base[BPFObj, BPFEvent]) buildEBPFCollection(td *process.TargetDetails, spec *ebpf.CollectionSpec) (*ebpf.Collection, error) { obj := new(BPFObj) if c, ok := ((interface{})(obj)).(io.Closer); ok { i.closers = append(i.closers, c) @@ -140,26 +184,12 @@ func (i *Base[BPFObj, BPFEvent]) buildObj(exec *link.Executable, td *process.Tar PinPath: bpffs.PathForTargetApplication(td), }, } - err := utils.LoadEBPFObjects(spec, obj, sOpts) + c, err := utils.InitializeEBPFCollection(spec, sOpts) if err != nil { return nil, err } - for _, up := range i.Uprobes { - links, err := up.Fn(up.Sym, exec, td, obj) - if err != nil { - if up.Optional { - i.Logger.Info("failed to attach optional uprobe", "probe", i.ID, "symbol", up.Sym, "error", err) - continue - } - return nil, err - } - for _, l := range links { - i.closers = append(i.closers, l) - } - } - - return obj, nil + return c, nil } // Run runs the events processing loop. @@ -206,6 +236,9 @@ func (i *Base[BPFObj, BPFEvent]) processRecord(record perf.Record) ([]*SpanEvent // Close stops the Probe. func (i *Base[BPFObj, BPFEvent]) Close() error { + if i.collection != nil { + i.collection.Close() + } var err error for _, c := range i.closers { err = errors.Join(err, c.Close()) @@ -216,28 +249,65 @@ func (i *Base[BPFObj, BPFEvent]) Close() error { return err } -// UprobeFunc is a function that will attach a eBPF program to a perf event -// that fires when the given symbol starts executing in exec. -// -// It is expected the symbol belongs to are shared library and its offset can -// be determined using target. -// -// Losing the reference to the resulting Link (up) will close the Uprobe and -// prevent further execution of prog. The Link must be Closed during program -// shutdown to avoid leaking system resources. -type UprobeFunc[BPFObj any] func(symbol string, exec *link.Executable, target *process.TargetDetails, obj *BPFObj) ([]link.Link, error) - // Uprobe is an eBPF program that is attached in the entry point and/or the return of a function. -type Uprobe[BPFObj any] struct { +type Uprobe struct { // Sym is the symbol name of the function to attach the eBPF program to. Sym string - // Fn is the function that will attach the eBPF program to the function. - Fn UprobeFunc[BPFObj] // Optional is a boolean flag informing if the Uprobe is optional. If the // Uprobe is optional and fails to attach, the error is logged and // processing continues. - Optional bool - DependsOn []string + Optional bool + // EntryProbe is the name of the eBPF program to attach to the entry of the + // function specified by Sym. If EntryProbe is empty, no eBPF program will be attached to the entry of the function. + EntryProbe string + // ReturnProbe is the name of the eBPF program to attach to the return of the + // function specified by Sym. If ReturnProbe is empty, no eBPF program will be attached to the return of the function. + ReturnProbe string + DependsOn []string +} + +func (u *Uprobe) load(exec *link.Executable, target *process.TargetDetails, c *ebpf.Collection) ([]link.Link, error) { + offset, err := target.GetFunctionOffset(u.Sym) + if err != nil { + return nil, err + } + + var links []link.Link + + if u.EntryProbe != "" { + entryProg, ok := c.Programs[u.EntryProbe] + if !ok { + return nil, fmt.Errorf("entry probe %s not found", u.EntryProbe) + } + opts := &link.UprobeOptions{Address: offset, PID: target.PID} + l, err := exec.Uprobe("", entryProg, opts) + if err != nil { + return nil, err + } + links = append(links, l) + } + + if u.ReturnProbe != "" { + retProg, ok := c.Programs[u.ReturnProbe] + if !ok { + return nil, fmt.Errorf("return probe %s not found", u.ReturnProbe) + } + retOffsets, err := target.GetFunctionReturns(u.Sym) + if err != nil { + return nil, err + } + + for _, ret := range retOffsets { + opts := &link.UprobeOptions{Address: ret, PID: target.PID} + l, err := exec.Uprobe("", retProg, opts) + if err != nil { + return nil, err + } + links = append(links, l) + } + } + + return links, nil } // Const is an constant that needs to be injected into an eBPF program. diff --git a/internal/pkg/instrumentation/utils/ebpf.go b/internal/pkg/instrumentation/utils/ebpf.go index dd582222e..1767ab566 100644 --- a/internal/pkg/instrumentation/utils/ebpf.go +++ b/internal/pkg/instrumentation/utils/ebpf.go @@ -28,9 +28,9 @@ const ( showVerifierLogEnvVar = "OTEL_GO_AUTO_SHOW_VERIFIER_LOG" ) -// LoadEBPFObjects loads eBPF objects from the given spec into the given interface. +// InitializeEBPFCollection loads eBPF objects from the given spec and returns a collection corresponding to the spec. // If the environment variable OTEL_GO_AUTO_SHOW_VERIFIER_LOG is set to true, the verifier log will be printed. -func LoadEBPFObjects(spec *ebpf.CollectionSpec, to interface{}, opts *ebpf.CollectionOptions) error { +func InitializeEBPFCollection(spec *ebpf.CollectionSpec, opts *ebpf.CollectionOptions) (*ebpf.Collection, error) { // Getting full verifier log is expensive, so we only do it if the user explicitly asks for it. showVerifierLogs := shouldShowVerifierLogs() if showVerifierLogs { @@ -38,7 +38,7 @@ func LoadEBPFObjects(spec *ebpf.CollectionSpec, to interface{}, opts *ebpf.Colle opts.Programs.LogLevel = ebpf.LogLevelInstruction | ebpf.LogLevelBranch | ebpf.LogLevelStats } - err := spec.LoadAndAssign(to, opts) + c, err := ebpf.NewCollectionWithOptions(spec, *opts) if err != nil && showVerifierLogs { var ve *ebpf.VerifierError if errors.As(err, &ve) { @@ -46,7 +46,7 @@ func LoadEBPFObjects(spec *ebpf.CollectionSpec, to interface{}, opts *ebpf.Colle } } - return err + return c, err } // shouldShowVerifierLogs returns if the user has configured verifier logs to be emitted.