-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(admin): add otelgrpc metric interceptor
This change adds a new gRPC interceptor, pkg/rpc/interceptors/otelgrpc. Currently, it only supports UnaryServerInterceptor. It follows [OTel 1.22.0 specification](https://opentelemetry.io/docs/specs/otel/metrics/semantic_conventions/rpc-metrics/) except for the unit of rpc.server.duration, which converts from milliseconds to microseconds for high resolution.
- Loading branch information
Showing
5 changed files
with
173 additions
and
20 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
package interceptors | ||
|
||
import ( | ||
"context" | ||
"net" | ||
"strings" | ||
|
||
"google.golang.org/grpc/peer" | ||
) | ||
|
||
func PeerAddress(ctx context.Context) string { | ||
p, ok := peer.FromContext(ctx) | ||
if !ok { | ||
return "" | ||
} | ||
|
||
host, _, err := net.SplitHostPort(p.Addr.String()) | ||
if err != nil { | ||
return "" | ||
} | ||
|
||
if host == "" { | ||
return "127.0.0.1" | ||
} | ||
return host | ||
} | ||
|
||
// ParseFullMethod returns service and method extracted from gRPC full method | ||
// string, i.e., /package.service/method. | ||
func ParseFullMethod(fullMethod string) (service, method string) { | ||
name, found := strings.CutPrefix(fullMethod, "/") | ||
if !found { | ||
// Invalid format, does not follow `/package.service/method`. | ||
return "", "" | ||
} | ||
service, method, found = strings.Cut(name, "/") | ||
if !found { | ||
// Invalid format, does not follow `/package.service/method`. | ||
return "", "" | ||
} | ||
if strings.Contains(method, "/") { | ||
// Invalid format, does not follow `/package.service/method`. | ||
return "", "" | ||
} | ||
return service, method | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
package interceptors | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestParseFullMethod(t *testing.T) { | ||
tcs := []struct { | ||
fullMethod string | ||
service string | ||
method string | ||
}{ | ||
{ | ||
fullMethod: "/varlog.admpb.ClusterManager/ListStorageNodes", | ||
service: "varlog.admpb.ClusterManager", | ||
method: "ListStorageNodes", | ||
}, | ||
{ | ||
fullMethod: "varlog.admpb.ClusterManager/OuterMethod/InnerMethod", | ||
service: "", | ||
method: "", | ||
}, | ||
{ | ||
fullMethod: "/varlog.admpb.ClusterManager/OuterMethod/InnerMethod", | ||
service: "", | ||
method: "", | ||
}, | ||
{ | ||
fullMethod: "varlog.admpb.ClusterManager/ListStorageNodes", | ||
service: "", | ||
method: "", | ||
}, | ||
{ | ||
fullMethod: "/varlog.admpb.ClusterManager", | ||
service: "", | ||
method: "", | ||
}, | ||
{ | ||
fullMethod: "ListStorageNodes", | ||
service: "", | ||
method: "", | ||
}, | ||
} | ||
|
||
for _, tc := range tcs { | ||
tc := tc | ||
t.Run(tc.fullMethod, func(t *testing.T) { | ||
service, method := ParseFullMethod(tc.fullMethod) | ||
require.Equal(t, tc.service, service) | ||
require.Equal(t, tc.method, method) | ||
}) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
package otelgrpc | ||
|
||
import ( | ||
"context" | ||
"time" | ||
|
||
"go.opentelemetry.io/otel" | ||
"go.opentelemetry.io/otel/attribute" | ||
"go.opentelemetry.io/otel/metric" | ||
semconv "go.opentelemetry.io/otel/semconv/v1.20.0" | ||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/status" | ||
|
||
"github.com/kakao/varlog/pkg/rpc/interceptors" | ||
) | ||
|
||
const ( | ||
instrumentationName = "github.com/kakao/varlog/pkg/rpc/interceptors/otelgrpc" | ||
) | ||
|
||
// UnaryServerInterceptor returns a new unary server interceptor that records | ||
// OpenTelemetry metrics for gRPC. | ||
// It follows [OTel 1.22.0](https://opentelemetry.io/docs/specs/otel/metrics/semantic_conventions/rpc-metrics/) | ||
// specification except for the unit of rpc.server.duration, which converts | ||
// from milliseconds to microseconds for high resolution. | ||
func UnaryServerInterceptor(meterProvider metric.MeterProvider) grpc.UnaryServerInterceptor { | ||
meter := meterProvider.Meter( | ||
instrumentationName, | ||
metric.WithSchemaURL(semconv.SchemaURL), | ||
) | ||
rpcServerDuration, err := meter.Int64Histogram( | ||
"rpc.server.duration", | ||
metric.WithDescription("measures duration of inbound RPC in microseconds"), | ||
metric.WithUnit("us"), | ||
) | ||
if err != nil { | ||
otel.Handle(err) | ||
} | ||
|
||
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { | ||
code := codes.OK | ||
|
||
defer func(start time.Time) { | ||
elapsedTime := time.Since(start) / time.Microsecond | ||
|
||
attrs := make([]attribute.KeyValue, 0, 4) | ||
attrs = append(attrs, semconv.RPCSystemGRPC, semconv.RPCGRPCStatusCodeKey.Int64(int64(code))) | ||
service, method := interceptors.ParseFullMethod(info.FullMethod) | ||
if service != "" { | ||
attrs = append(attrs, semconv.RPCServiceKey.String(service)) | ||
} | ||
if method != "" { | ||
attrs = append(attrs, semconv.RPCMethodKey.String(method)) | ||
} | ||
|
||
rpcServerDuration.Record(ctx, int64(elapsedTime), metric.WithAttributes(attrs...)) | ||
}(time.Now()) | ||
|
||
resp, err = handler(ctx, req) | ||
if err != nil { | ||
code = status.Code(err) | ||
} | ||
return resp, err | ||
} | ||
} |