diff --git a/internal/admin/admin.go b/internal/admin/admin.go index 413aac41a..9d24e48ce 100644 --- a/internal/admin/admin.go +++ b/internal/admin/admin.go @@ -19,8 +19,10 @@ import ( "github.com/kakao/varlog/internal/admin/snwatcher" "github.com/kakao/varlog/pkg/rpc/interceptors/logging" + "github.com/kakao/varlog/pkg/rpc/interceptors/otelgrpc" "github.com/kakao/varlog/pkg/types" "github.com/kakao/varlog/pkg/util/netutil" + "github.com/kakao/varlog/pkg/util/telemetry" "github.com/kakao/varlog/pkg/verrors" "github.com/kakao/varlog/proto/admpb" "github.com/kakao/varlog/proto/mrpb" @@ -72,6 +74,7 @@ func New(ctx context.Context, opts ...Option) (*Admin, error) { grpcServer := grpc.NewServer( grpc.ChainUnaryInterceptor( logging.UnaryServerInterceptor(cfg.logger), + otelgrpc.UnaryServerInterceptor(telemetry.GetGlobalMeterProvider()), ), ) diff --git a/pkg/rpc/interceptors/context.go b/pkg/rpc/interceptors/context.go new file mode 100644 index 000000000..c99839f74 --- /dev/null +++ b/pkg/rpc/interceptors/context.go @@ -0,0 +1,46 @@ +package interceptors + +import ( + "context" + "net" + "strings" + + "google.golang.org/grpc/peer" +) + +func PeerAddress(ctx context.Context) string { + p, ok := peer.FromContext(ctx) + if !ok { + return "" + } + + host, _, err := net.SplitHostPort(p.Addr.String()) + if err != nil { + return "" + } + + if host == "" { + return "127.0.0.1" + } + return host +} + +// ParseFullMethod returns service and method extracted from gRPC full method +// string, i.e., /package.service/method. +func ParseFullMethod(fullMethod string) (service, method string) { + name, found := strings.CutPrefix(fullMethod, "/") + if !found { + // Invalid format, does not follow `/package.service/method`. + return "", "" + } + service, method, found = strings.Cut(name, "/") + if !found { + // Invalid format, does not follow `/package.service/method`. + return "", "" + } + if strings.Contains(method, "/") { + // Invalid format, does not follow `/package.service/method`. + return "", "" + } + return service, method +} diff --git a/pkg/rpc/interceptors/context_test.go b/pkg/rpc/interceptors/context_test.go new file mode 100644 index 000000000..5c8acb499 --- /dev/null +++ b/pkg/rpc/interceptors/context_test.go @@ -0,0 +1,55 @@ +package interceptors + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestParseFullMethod(t *testing.T) { + tcs := []struct { + fullMethod string + service string + method string + }{ + { + fullMethod: "/varlog.admpb.ClusterManager/ListStorageNodes", + service: "varlog.admpb.ClusterManager", + method: "ListStorageNodes", + }, + { + fullMethod: "varlog.admpb.ClusterManager/OuterMethod/InnerMethod", + service: "", + method: "", + }, + { + fullMethod: "/varlog.admpb.ClusterManager/OuterMethod/InnerMethod", + service: "", + method: "", + }, + { + fullMethod: "varlog.admpb.ClusterManager/ListStorageNodes", + service: "", + method: "", + }, + { + fullMethod: "/varlog.admpb.ClusterManager", + service: "", + method: "", + }, + { + fullMethod: "ListStorageNodes", + service: "", + method: "", + }, + } + + for _, tc := range tcs { + tc := tc + t.Run(tc.fullMethod, func(t *testing.T) { + service, method := ParseFullMethod(tc.fullMethod) + require.Equal(t, tc.service, service) + require.Equal(t, tc.method, method) + }) + } +} diff --git a/pkg/rpc/interceptors/logging/interceptor.go b/pkg/rpc/interceptors/logging/interceptor.go index 2137aa4fb..a81e9a88e 100644 --- a/pkg/rpc/interceptors/logging/interceptor.go +++ b/pkg/rpc/interceptors/logging/interceptor.go @@ -3,13 +3,13 @@ package logging import ( "context" "fmt" - "net" "time" "go.uber.org/zap" "google.golang.org/grpc" - "google.golang.org/grpc/peer" "google.golang.org/grpc/status" + + "github.com/kakao/varlog/pkg/rpc/interceptors" ) // UnaryServerInterceptor returns a new unary server interceptor that logs a @@ -31,27 +31,10 @@ func UnaryServerInterceptor(logger *zap.Logger) grpc.UnaryServerInterceptor { zap.Duration("duration", duration), zap.Stringer("request", req.(fmt.Stringer)), zap.Stringer("response", resp.(fmt.Stringer)), - zap.String("peer", peerAddr(ctx)), + zap.String("peer", interceptors.PeerAddress(ctx)), zap.Error(err), ) } return resp, err } } - -func peerAddr(ctx context.Context) string { - p, ok := peer.FromContext(ctx) - if !ok { - return "" - } - - host, _, err := net.SplitHostPort(p.Addr.String()) - if err != nil { - return "" - } - - if host == "" { - return "127.0.0.1" - } - return host -} diff --git a/pkg/rpc/interceptors/otelgrpc/interceptor.go b/pkg/rpc/interceptors/otelgrpc/interceptor.go new file mode 100644 index 000000000..970491f66 --- /dev/null +++ b/pkg/rpc/interceptors/otelgrpc/interceptor.go @@ -0,0 +1,66 @@ +package otelgrpc + +import ( + "context" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + semconv "go.opentelemetry.io/otel/semconv/v1.20.0" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/kakao/varlog/pkg/rpc/interceptors" +) + +const ( + instrumentationName = "github.com/kakao/varlog/pkg/rpc/interceptors/otelgrpc" +) + +// UnaryServerInterceptor returns a new unary server interceptor that records +// OpenTelemetry metrics for gRPC. +// It follows [OTel 1.22.0](https://opentelemetry.io/docs/specs/otel/metrics/semantic_conventions/rpc-metrics/) +// specification except for the unit of rpc.server.duration, which converts +// from milliseconds to microseconds for high resolution. +func UnaryServerInterceptor(meterProvider metric.MeterProvider) grpc.UnaryServerInterceptor { + meter := meterProvider.Meter( + instrumentationName, + metric.WithSchemaURL(semconv.SchemaURL), + ) + rpcServerDuration, err := meter.Int64Histogram( + "rpc.server.duration", + metric.WithDescription("measures duration of inbound RPC in microseconds"), + metric.WithUnit("us"), + ) + if err != nil { + otel.Handle(err) + } + + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + code := codes.OK + + defer func(start time.Time) { + elapsedTime := time.Since(start) / time.Microsecond + + attrs := make([]attribute.KeyValue, 0, 4) + attrs = append(attrs, semconv.RPCSystemGRPC, semconv.RPCGRPCStatusCodeKey.Int64(int64(code))) + service, method := interceptors.ParseFullMethod(info.FullMethod) + if service != "" { + attrs = append(attrs, semconv.RPCServiceKey.String(service)) + } + if method != "" { + attrs = append(attrs, semconv.RPCMethodKey.String(method)) + } + + rpcServerDuration.Record(ctx, int64(elapsedTime), metric.WithAttributes(attrs...)) + }(time.Now()) + + resp, err = handler(ctx, req) + if err != nil { + code = status.Code(err) + } + return resp, err + } +}