Skip to content

Commit

Permalink
feat(admin): add otelgrpc metric interceptor (#509)
Browse files Browse the repository at this point in the history
### What this PR does

This change adds a new gRPC interceptor, pkg/rpc/interceptors/otelgrpc. Currently, it only supports UnaryServerInterceptor. It follows [OTel 1.22.0 specification](https://opentelemetry.io/docs/specs/otel/metrics/semantic_conventions/rpc-metrics/) except for the unit of rpc.server.duration, which converts from milliseconds to microseconds for high resolution.
  • Loading branch information
ijsong committed Jul 28, 2023
2 parents e9b46ad + d9ca9aa commit db7a1a2
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 20 deletions.
3 changes: 3 additions & 0 deletions internal/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (

"github.com/kakao/varlog/internal/admin/snwatcher"
"github.com/kakao/varlog/pkg/rpc/interceptors/logging"
"github.com/kakao/varlog/pkg/rpc/interceptors/otelgrpc"
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/util/netutil"
"github.com/kakao/varlog/pkg/util/telemetry"
"github.com/kakao/varlog/pkg/verrors"
"github.com/kakao/varlog/proto/admpb"
"github.com/kakao/varlog/proto/mrpb"
Expand Down Expand Up @@ -72,6 +74,7 @@ func New(ctx context.Context, opts ...Option) (*Admin, error) {
grpcServer := grpc.NewServer(
grpc.ChainUnaryInterceptor(
logging.UnaryServerInterceptor(cfg.logger),
otelgrpc.UnaryServerInterceptor(telemetry.GetGlobalMeterProvider()),
),
)

Expand Down
46 changes: 46 additions & 0 deletions pkg/rpc/interceptors/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package interceptors

import (
"context"
"net"
"strings"

"google.golang.org/grpc/peer"
)

func PeerAddress(ctx context.Context) string {
p, ok := peer.FromContext(ctx)
if !ok {
return ""
}

host, _, err := net.SplitHostPort(p.Addr.String())
if err != nil {
return ""
}

if host == "" {
return "127.0.0.1"
}
return host
}

// ParseFullMethod returns service and method extracted from gRPC full method
// string, i.e., /package.service/method.
func ParseFullMethod(fullMethod string) (service, method string) {
name, found := strings.CutPrefix(fullMethod, "/")
if !found {
// Invalid format, does not follow `/package.service/method`.
return "", ""
}
service, method, found = strings.Cut(name, "/")
if !found {
// Invalid format, does not follow `/package.service/method`.
return "", ""
}
if strings.Contains(method, "/") {
// Invalid format, does not follow `/package.service/method`.
return "", ""
}
return service, method
}
55 changes: 55 additions & 0 deletions pkg/rpc/interceptors/context_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package interceptors

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestParseFullMethod(t *testing.T) {
tcs := []struct {
fullMethod string
service string
method string
}{
{
fullMethod: "/varlog.admpb.ClusterManager/ListStorageNodes",
service: "varlog.admpb.ClusterManager",
method: "ListStorageNodes",
},
{
fullMethod: "varlog.admpb.ClusterManager/OuterMethod/InnerMethod",
service: "",
method: "",
},
{
fullMethod: "/varlog.admpb.ClusterManager/OuterMethod/InnerMethod",
service: "",
method: "",
},
{
fullMethod: "varlog.admpb.ClusterManager/ListStorageNodes",
service: "",
method: "",
},
{
fullMethod: "/varlog.admpb.ClusterManager",
service: "",
method: "",
},
{
fullMethod: "ListStorageNodes",
service: "",
method: "",
},
}

for _, tc := range tcs {
tc := tc
t.Run(tc.fullMethod, func(t *testing.T) {
service, method := ParseFullMethod(tc.fullMethod)
require.Equal(t, tc.service, service)
require.Equal(t, tc.method, method)
})
}
}
23 changes: 3 additions & 20 deletions pkg/rpc/interceptors/logging/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package logging
import (
"context"
"fmt"
"net"
"time"

"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"

"github.com/kakao/varlog/pkg/rpc/interceptors"
)

// UnaryServerInterceptor returns a new unary server interceptor that logs a
Expand All @@ -31,27 +31,10 @@ func UnaryServerInterceptor(logger *zap.Logger) grpc.UnaryServerInterceptor {
zap.Duration("duration", duration),
zap.Stringer("request", req.(fmt.Stringer)),
zap.Stringer("response", resp.(fmt.Stringer)),
zap.String("peer", peerAddr(ctx)),
zap.String("peer", interceptors.PeerAddress(ctx)),
zap.Error(err),
)
}
return resp, err
}
}

func peerAddr(ctx context.Context) string {
p, ok := peer.FromContext(ctx)
if !ok {
return ""
}

host, _, err := net.SplitHostPort(p.Addr.String())
if err != nil {
return ""
}

if host == "" {
return "127.0.0.1"
}
return host
}
66 changes: 66 additions & 0 deletions pkg/rpc/interceptors/otelgrpc/interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package otelgrpc

import (
"context"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/kakao/varlog/pkg/rpc/interceptors"
)

const (
instrumentationName = "github.com/kakao/varlog/pkg/rpc/interceptors/otelgrpc"
)

// UnaryServerInterceptor returns a new unary server interceptor that records
// OpenTelemetry metrics for gRPC.
// It follows [OTel 1.22.0](https://opentelemetry.io/docs/specs/otel/metrics/semantic_conventions/rpc-metrics/)
// specification except for the unit of rpc.server.duration, which converts
// from milliseconds to microseconds for high resolution.
func UnaryServerInterceptor(meterProvider metric.MeterProvider) grpc.UnaryServerInterceptor {
meter := meterProvider.Meter(
instrumentationName,
metric.WithSchemaURL(semconv.SchemaURL),
)
rpcServerDuration, err := meter.Int64Histogram(
"rpc.server.duration",
metric.WithDescription("measures duration of inbound RPC in microseconds"),
metric.WithUnit("us"),
)
if err != nil {
otel.Handle(err)
}

return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
code := codes.OK

defer func(start time.Time) {
elapsedTime := time.Since(start) / time.Microsecond

attrs := make([]attribute.KeyValue, 0, 4)
attrs = append(attrs, semconv.RPCSystemGRPC, semconv.RPCGRPCStatusCodeKey.Int64(int64(code)))
service, method := interceptors.ParseFullMethod(info.FullMethod)
if service != "" {
attrs = append(attrs, semconv.RPCServiceKey.String(service))
}
if method != "" {
attrs = append(attrs, semconv.RPCMethodKey.String(method))
}

rpcServerDuration.Record(ctx, int64(elapsedTime), metric.WithAttributes(attrs...))
}(time.Now())

resp, err = handler(ctx, req)
if err != nil {
code = status.Code(err)
}
return resp, err
}
}

0 comments on commit db7a1a2

Please sign in to comment.