Skip to content

Commit

Permalink
Support gRPC status for versions >=1.40
Browse files Browse the repository at this point in the history
  • Loading branch information
damemi committed Nov 1, 2024
1 parent 77e8de9 commit fd51eb9
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ volatile const u64 error_status_pos;
volatile const u64 status_s_pos;
volatile const u64 status_code_pos;

volatile const bool write_status_supported;

// This instrumentation attaches uprobe to the following function:
// func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error
SEC("uprobe/ClientConn_Invoke")
Expand Down Expand Up @@ -122,6 +124,9 @@ int uprobe_ClientConn_Invoke_Returns(struct pt_regs *ctx) {
return 0;
}

if(!write_status_supported) {
goto done;
}
// Getting the returned response (error)
// The status code is embedded 3 layers deep:
// Invoke() error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,18 @@ import (
"strings"

"github.com/cilium/ebpf"
"github.com/hashicorp/go-version"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sys/unix"

"go.opentelemetry.io/auto/internal/pkg/inject"
"go.opentelemetry.io/auto/internal/pkg/instrumentation/context"
"go.opentelemetry.io/auto/internal/pkg/instrumentation/probe"
"go.opentelemetry.io/auto/internal/pkg/instrumentation/utils"
"go.opentelemetry.io/auto/internal/pkg/process"
"go.opentelemetry.io/auto/internal/pkg/structfield"
)

Expand All @@ -29,6 +32,22 @@ const (
pkg = "google.golang.org/grpc"
)

type writeStatusConst struct{}

var writeStatus = false

func (w writeStatusConst) InjectOption(td *process.TargetDetails) (inject.Option, error) {
writeStatusVersion := version.Must(version.NewVersion("1.40.0"))
ver, ok := td.Libraries[pkg]
if !ok {
return nil, fmt.Errorf("unknown module version: %s", pkg)
}
if ver.GreaterThanOrEqual(writeStatusVersion) {
writeStatus = true
}
return inject.WithKeyValue("write_status_supported", writeStatus), nil
}

// New returns a new [probe.Probe].
func New(logger *slog.Logger) probe.Probe {
id := probe.ID{
Expand All @@ -41,6 +60,7 @@ func New(logger *slog.Logger) probe.Probe {
Consts: []probe.Const{
probe.RegistersABIConst{},
probe.AllocationConst{},
writeStatusConst{},
probe.StructFieldConst{
Key: "clientconn_target_ptr_pos",
Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc", "ClientConn", "target"),
Expand Down Expand Up @@ -124,8 +144,6 @@ func convertEvent(e *event) []*probe.SpanEvent {
semconv.RPCServiceKey.String(method),
semconv.ServerAddress(target))

attrs = append(attrs, semconv.RPCGRPCStatusCodeKey.Int(int(e.StatusCode)))

sc := trace.NewSpanContext(trace.SpanContextConfig{
TraceID: e.SpanContext.TraceID,
SpanID: e.SpanContext.SpanID,
Expand Down Expand Up @@ -155,8 +173,12 @@ func convertEvent(e *event) []*probe.SpanEvent {
TracerSchema: semconv.SchemaURL,
}

if e.StatusCode > 0 {
event.Status = probe.Status{Code: codes.Error}
if writeStatus {
event.Attributes = append(event.Attributes, semconv.RPCGRPCStatusCodeKey.Int(int(e.StatusCode)))

if e.StatusCode > 0 {
event.Status = probe.Status{Code: codes.Error}
}
}

return []*probe.SpanEvent{event}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ volatile const bool is_new_frame_pos;
volatile const u64 status_s_pos;
volatile const u64 status_code_pos;

volatile const bool write_status_supported;

static __always_inline long dummy_extract_span_context_from_headers(void *stream_id, struct span_context *parent_span_context) {
return 0;
}
Expand Down Expand Up @@ -175,6 +177,10 @@ int uprobe_http2Server_operateHeader(struct pt_regs *ctx)
// https://github.com/grpc/grpc-go/blob/bcf9171a20e44ed81a6eb152e3ca9e35b2c02c5d/internal/transport/http2_server.go#L1049
SEC("uprobe/http2Server_WriteStatus")
int uprobe_http2Server_WriteStatus(struct pt_regs *ctx) {
if(!write_status_supported) {
bpf_printk("status probe not supported for this version of gRPC");
return 0;
}
struct go_iface go_context = {0};
get_Go_context(ctx, 2, stream_ctx_pos, true, &go_context);
void *key = get_consistent_key(ctx, go_context.data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func New(logger *slog.Logger) probe.Probe {
Consts: []probe.Const{
probe.RegistersABIConst{},
probe.AllocationConst{},
writeStatusConst{},
probe.StructFieldConst{
Key: "stream_method_ptr_pos",
Val: structfield.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/transport", "Stream", "method"),
Expand Down Expand Up @@ -112,6 +113,22 @@ func (c framePosConst) InjectOption(td *process.TargetDetails) (inject.Option, e
return inject.WithKeyValue("is_new_frame_pos", ver.GreaterThanOrEqual(paramChangeVer)), nil
}

type writeStatusConst struct{}

var writeStatus = false

func (w writeStatusConst) InjectOption(td *process.TargetDetails) (inject.Option, error) {
writeStatusVersion := version.Must(version.NewVersion("1.40.0"))
ver, ok := td.Libraries[pkg]
if !ok {
return nil, fmt.Errorf("unknown module version: %s", pkg)
}
if ver.GreaterThanOrEqual(writeStatusVersion) {
writeStatus = true
}
return inject.WithKeyValue("write_status_supported", writeStatus), nil
}

// event represents an event in the gRPC server during a gRPC request.
type event struct {
context.BaseSpanProperties
Expand Down Expand Up @@ -141,29 +158,33 @@ func convertEvent(e *event) []*probe.SpanEvent {
pscPtr = nil
}

attrs := []attribute.KeyValue{
semconv.RPCSystemKey.String("grpc"),
semconv.RPCServiceKey.String(method),
}
event := &probe.SpanEvent{
SpanName: method,
StartTime: utils.BootOffsetToTime(e.StartTime),
EndTime: utils.BootOffsetToTime(e.EndTime),
Attributes: []attribute.KeyValue{
semconv.RPCSystemKey.String("grpc"),
semconv.RPCServiceKey.String(method),
semconv.RPCGRPCStatusCodeKey.Int(int(e.StatusCode)),
},
SpanName: method,
StartTime: utils.BootOffsetToTime(e.StartTime),
EndTime: utils.BootOffsetToTime(e.EndTime),
Attributes: attrs,
ParentSpanContext: pscPtr,
SpanContext: &sc,
TracerSchema: semconv.SchemaURL,
}

// Set server status codes per semconv:
// See https://github.com/open-telemetry/semantic-conventions/blob/02ecf0c71e9fa74d09d81c48e04a132db2b7060b/docs/rpc/grpc.md#grpc-status
if e.StatusCode == int32(codes.Unknown) ||
e.StatusCode == int32(codes.DeadlineExceeded) ||
e.StatusCode == int32(codes.Unimplemented) ||
e.StatusCode == int32(codes.Internal) ||
e.StatusCode == int32(codes.Unavailable) ||
e.StatusCode == int32(codes.DataLoss) {
event.Status = probe.Status{Code: otelcodes.Error}
if writeStatus {
event.Attributes = append(event.Attributes, semconv.RPCGRPCStatusCodeKey.Int(int(e.StatusCode)))
// Set server status codes per semconv:
// See https://github.com/open-telemetry/semantic-conventions/blob/02ecf0c71e9fa74d09d81c48e04a132db2b7060b/docs/rpc/grpc.md#grpc-status
if e.StatusCode == int32(codes.Unknown) ||
e.StatusCode == int32(codes.DeadlineExceeded) ||
e.StatusCode == int32(codes.Unimplemented) ||
e.StatusCode == int32(codes.Internal) ||
e.StatusCode == int32(codes.Unavailable) ||
e.StatusCode == int32(codes.DataLoss) {
event.Status = probe.Status{Code: otelcodes.Error}
}
}

return []*probe.SpanEvent{event}
}

0 comments on commit fd51eb9

Please sign in to comment.