Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: gRPC instrument #4

Merged
merged 5 commits into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ go.work
# test output
coverage-diff.html
coverage-diff.xml
coverage.txt
coverage*.*
3 changes: 2 additions & 1 deletion metrics/common/rpc_enum.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package common
const (
RPCSDKGin = "github.com/gin-gonic/gin"
RPCSDKResty = "github.com/go-resty/resty/v2"
RPCSDKGRPC = "google.golang.org/grpc"
)

const (
RPCProtocolHTTP = "http"
RPCProtocolRPC = "rpc"
RPCProtocolGRPC = "grpc"
)
8 changes: 6 additions & 2 deletions metrics/common/rpc_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
const (
DefaultRPCReceiveRequestMetricName = "apm_rpc_receive_request_duration_seconds"
DefaultRPCSendRequestMetricName = "apm_rpc_send_request_duration_seconds"

DefaultGRPCReceiveMessageTotalName = "apm_grpc_receive_message_total"
DefaultGRPCSendMessageTotalName = "apm_grpc_send_message_total"
)

var (
Expand All @@ -20,7 +23,7 @@ var (
NativeHistogramMinResetDuration: 5 * time.Minute,
NativeHistogramMaxZeroThreshold: 0.05,
NativeHistogramMaxBucketNumber: 20,
}, []string{"sdk", "request_protocol", "endpoint", "rpc_status", "response_code"})
}, []string{"sdk", "request_protocol", "endpoint", "rpc_status_code", "http_status_code"})

DefaultRPCSendRequestMetric = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: DefaultRPCSendRequestMetricName,
Expand All @@ -30,11 +33,12 @@ var (
NativeHistogramMinResetDuration: 5 * time.Minute,
NativeHistogramMaxZeroThreshold: 0.05,
NativeHistogramMaxBucketNumber: 20,
}, []string{"sdk", "request_protocol", "endpoint", "rpc_status", "response_code"})
}, []string{"sdk", "request_protocol", "endpoint", "rpc_status_code", "http_status_code"})
)

func init() {
prometheus.MustRegister(
// common request
DefaultRPCReceiveRequestMetric,
DefaultRPCSendRequestMetric,
)
Expand Down
4 changes: 2 additions & 2 deletions metrics/gin/gin.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ func NewMetricsMiddleware() gin.HandlerFunc {
"sdk": common.RPCSDKGin,
"request_protocol": common.RPCProtocolHTTP,
"endpoint": endpoint,
"rpc_status": strconv.Itoa(int(grpc.OK)),
"response_code": strconv.Itoa(responseCode),
"rpc_status_code": strconv.Itoa(int(grpc.OK)),
"http_status_code": strconv.Itoa(responseCode),
}).Observe(latency.Seconds())
return
}
Expand Down
Empty file added metrics/grpc/README.md
Empty file.
220 changes: 220 additions & 0 deletions metrics/grpc/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
package grpc

import (
"context"
"io"
"strconv"
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/quwan-sre/observability-go-contrib/metrics/common"
)

func NewUnaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
startTime := time.Now()

defer func() {
latency := time.Now().Sub(startTime)
code := codes.OK

if err != nil {
s, _ := status.FromError(err)
code = s.Code()
}

common.DefaultRPCReceiveRequestMetric.With(prometheus.Labels{
"sdk": common.RPCSDKGRPC,
"request_protocol": common.RPCProtocolGRPC,
"endpoint": ParseFullMethod(info.FullMethod),
"rpc_status_code": strconv.Itoa(int(code)),
"http_status_code": "0",
}).Observe(latency.Seconds())
}()

resp, err = handler(ctx, req)

return resp, err
}
}

func NewStreamServerInterceptor() grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
err = handler(srv, &wrapServerStream{ss, ParseFullMethod(info.FullMethod)})
return err
}
}

func NewUnaryClientInterceptor() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) (err error) {
startTime := time.Now()

defer func() {
latency := time.Now().Sub(startTime)
code := codes.OK

if err != nil {
s, _ := status.FromError(err)
code = s.Code()
}

common.DefaultRPCSendRequestMetric.With(prometheus.Labels{
"sdk": common.RPCSDKGRPC,
"request_protocol": common.RPCProtocolGRPC,
"endpoint": ParseFullMethod(method),
"rpc_status_code": strconv.Itoa(int(code)),
"http_status_code": "0",
}).Observe(latency.Seconds())
}()

err = invoker(ctx, method, req, reply, cc, opts...)
return err
}
}

func NewStreamClientInterceptor() grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
s, err := streamer(ctx, desc, cc, method, opts...)
return &wrapClientStream{
ClientStream: s,
method: ParseFullMethod(method),
}, err
}
}

// wrapClientStream wraps grpc.ClientStream to record each Sent/Recv of message in histogram.
type wrapClientStream struct {
grpc.ClientStream
method string
}

func (w *wrapClientStream) SendMsg(m interface{}) error {
startTime := time.Now()
var err error

defer func() {
latency := time.Now().Sub(startTime)
code := codes.OK

if err != nil && err != io.EOF {
s, _ := status.FromError(err)
code = s.Code()
}

Check warning on line 107 in metrics/grpc/grpc.go

View check run for this annotation

Codecov / codecov/patch

metrics/grpc/grpc.go#L105-L107

Added lines #L105 - L107 were not covered by tests

common.DefaultRPCSendRequestMetric.With(prometheus.Labels{
"sdk": common.RPCSDKGRPC,
"request_protocol": common.RPCProtocolGRPC,
"endpoint": w.method,
"rpc_status_code": strconv.Itoa(int(code)),
"http_status_code": "0",
}).Observe(latency.Seconds())
}()

err = w.ClientStream.SendMsg(m)
return err
}

func (w *wrapClientStream) RecvMsg(m interface{}) error {
var err error
startTime := time.Now()

defer func() {
latency := time.Now().Sub(startTime)
code := codes.OK

if err != nil && err != io.EOF {
s, _ := status.FromError(err)
code = s.Code()
}

Check warning on line 133 in metrics/grpc/grpc.go

View check run for this annotation

Codecov / codecov/patch

metrics/grpc/grpc.go#L131-L133

Added lines #L131 - L133 were not covered by tests

common.DefaultRPCReceiveRequestMetric.With(prometheus.Labels{
"sdk": common.RPCSDKGRPC,
"request_protocol": common.RPCProtocolGRPC,
"endpoint": w.method,
"rpc_status_code": strconv.Itoa(int(code)),
"http_status_code": "0",
}).Observe(latency.Seconds())
}()

err = w.ClientStream.RecvMsg(m)
return err
}

// wrapServerStream wraps grpc.ServerStream to record each Sent/Recv of message in histogram.
type wrapServerStream struct {
grpc.ServerStream
method string
}

func (w *wrapServerStream) SendMsg(m interface{}) error {
startTime := time.Now()
var err error

defer func() {
latency := time.Now().Sub(startTime)
code := codes.OK

if err != nil && err != io.EOF {
s, _ := status.FromError(err)
code = s.Code()
}

Check warning on line 165 in metrics/grpc/grpc.go

View check run for this annotation

Codecov / codecov/patch

metrics/grpc/grpc.go#L163-L165

Added lines #L163 - L165 were not covered by tests

common.DefaultRPCSendRequestMetric.With(prometheus.Labels{
"sdk": common.RPCSDKGRPC,
"request_protocol": common.RPCProtocolGRPC,
"endpoint": w.method,
"rpc_status_code": strconv.Itoa(int(code)),
"http_status_code": "0",
}).Observe(latency.Seconds())
}()

err = w.ServerStream.SendMsg(m)
return err
}

func (w *wrapServerStream) RecvMsg(m interface{}) error {
var err error
startTime := time.Now()

defer func() {
latency := time.Now().Sub(startTime)
code := codes.OK

if err != nil && err != io.EOF {
s, _ := status.FromError(err)
code = s.Code()
}

Check warning on line 191 in metrics/grpc/grpc.go

View check run for this annotation

Codecov / codecov/patch

metrics/grpc/grpc.go#L189-L191

Added lines #L189 - L191 were not covered by tests

common.DefaultRPCReceiveRequestMetric.With(prometheus.Labels{
"sdk": common.RPCSDKGRPC,
"request_protocol": common.RPCProtocolGRPC,
"endpoint": w.method,
"rpc_status_code": strconv.Itoa(int(code)),
"http_status_code": "0",
}).Observe(latency.Seconds())
}()

err = w.ServerStream.RecvMsg(m)
return err
}

// ParseFullMethod returns a "service/method" as endpoint following the OpenTelemetry semantic
// conventions.
//
// Parsing is consistent with grpc-go implementation:
// https://github.com/grpc/grpc-go/blob/v1.57.0/internal/grpcutil/method.go#L26-L39
func ParseFullMethod(fullMethod string) string {
if !strings.HasPrefix(fullMethod, "/") {
// Invalid format, does not follow `/package.service/method`.
return fullMethod
}

Check warning on line 215 in metrics/grpc/grpc.go

View check run for this annotation

Codecov / codecov/patch

metrics/grpc/grpc.go#L213-L215

Added lines #L213 - L215 were not covered by tests
name := fullMethod[1:]

// return in service/method format
return name
}
6 changes: 3 additions & 3 deletions metrics/resty/resty.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ func NewAfterResponse() func(c *resty.Client, r *resty.Response) error {
latency := time.Now().Sub(t)
endpoint := ""
if req.RawRequest != nil && req.RawRequest.URL != nil {
endpoint = req.RawRequest.URL.Path
endpoint = req.RawRequest.URL.Host + req.RawRequest.URL.Path
}

common.DefaultRPCSendRequestMetric.With(prometheus.Labels{
"sdk": common.RPCSDKResty,
"request_protocol": common.RPCProtocolHTTP,
"endpoint": endpoint,
"rpc_status": strconv.Itoa(int(grpc.OK)),
"response_code": strconv.Itoa(r.StatusCode()),
"rpc_status_code": strconv.Itoa(int(grpc.OK)),
"http_status_code": strconv.Itoa(r.StatusCode()),
}).Observe(latency.Seconds())
return nil
}
Expand Down
Loading