forked from grpc-ecosystem/go-grpc-middleware
-
Notifications
You must be signed in to change notification settings - Fork 0
/
payload_interceptors.go
144 lines (128 loc) · 5.62 KB
/
payload_interceptors.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package grpc_logrus
import (
"bytes"
"fmt"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"github.com/grpc-ecosystem/go-grpc-middleware/logging"
"github.com/grpc-ecosystem/go-grpc-middleware/tags/logrus"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
// JsonPbMarshaller is the jsonpb marshaler this package uses to render protobuf
// payloads as JSON when they are logged. It is read each time a payload is
// marshaled, so replacing it affects every payload serialized afterwards.
var JsonPbMarshaller = &jsonpb.Marshaler{}
// PayloadUnaryServerInterceptor returns a new unary server interceptor that logs the payloads of requests and
// responses.
//
// This *only* works when placed *after* the `grpc_logrus.UnaryServerInterceptor`. However, the logging can be done to a
// separate instance of the logger.
//
// The decider is consulted on every call; when it returns false the handler is invoked directly and nothing is
// logged. Otherwise the request is logged under "grpc.request.content" before the handler runs, and the response is
// logged under "grpc.response.content" only when the handler returns a nil error.
func PayloadUnaryServerInterceptor(entry *logrus.Entry, decider grpc_logging.ServerPayloadLoggingDecider) grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		if !decider(ctx, info.FullMethod, info.Server) {
			return handler(ctx, req)
		}
		// Use the provided logrus.Entry for logging but use the fields from context.
		logEntry := entry.WithFields(ctx_logrus.Extract(ctx).Data)
		logProtoMessageAsJson(logEntry, req, "grpc.request.content", "server request payload logged as grpc.request.content field")
		resp, err := handler(ctx, req)
		if err == nil {
			// The message must name the field the response is actually stored under.
			logProtoMessageAsJson(logEntry, resp, "grpc.response.content", "server response payload logged as grpc.response.content field")
		}
		return resp, err
	}
}
// PayloadStreamServerInterceptor returns a new streaming server interceptor that logs the payloads of the messages
// exchanged on the stream.
//
// This *only* works when placed *after* the `grpc_logrus.StreamServerInterceptor`. However, the logging can be done to a
// separate instance of the logger.
//
// When the decider approves the call, the handler receives a wrapped stream that logs each message; otherwise the
// handler receives the original stream untouched.
func PayloadStreamServerInterceptor(entry *logrus.Entry, decider grpc_logging.ServerPayloadLoggingDecider) grpc.StreamServerInterceptor {
	return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
		if decider(stream.Context(), info.FullMethod, srv) {
			// Log through the caller-supplied entry, enriched with the fields carried by the stream context.
			ctxFields := Extract(stream.Context()).Data
			wrapped := &loggingServerStream{
				ServerStream: stream,
				entry:        entry.WithFields(ctxFields),
			}
			return handler(srv, wrapped)
		}
		return handler(srv, stream)
	}
}
// PayloadUnaryClientInterceptor returns a new unary client interceptor that logs the payloads of requests and
// responses.
//
// Calls rejected by the decider are passed straight to the invoker. For accepted calls the request is logged before
// the invocation and the reply is logged only if the invocation returns a nil error.
func PayloadUnaryClientInterceptor(entry *logrus.Entry, decider grpc_logging.ClientPayloadLoggingDecider) grpc.UnaryClientInterceptor {
	return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		if decider(ctx, method) {
			payloadLog := entry.WithFields(newClientLoggerFields(ctx, method))
			logProtoMessageAsJson(payloadLog, req, "grpc.request.content", "client request payload logged as grpc.request.content")
			if err := invoker(ctx, method, req, reply, cc, opts...); err != nil {
				return err
			}
			logProtoMessageAsJson(payloadLog, reply, "grpc.response.content", "client response payload logged as grpc.response.content")
			return nil
		}
		return invoker(ctx, method, req, reply, cc, opts...)
	}
}
// PayloadStreamClientInterceptor returns a new streaming client interceptor that logs the payloads of requests and
// responses.
//
// Calls rejected by the decider are passed straight to the streamer. For accepted calls the stream returned by the
// streamer is wrapped so that every successfully sent or received message is logged. If the streamer fails, its error
// is returned with a nil stream rather than a wrapper around a nil grpc.ClientStream.
func PayloadStreamClientInterceptor(entry *logrus.Entry, decider grpc_logging.ClientPayloadLoggingDecider) grpc.StreamClientInterceptor {
	return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
		if !decider(ctx, method) {
			return streamer(ctx, desc, cc, method, opts...)
		}
		logEntry := entry.WithFields(newClientLoggerFields(ctx, method))
		clientStream, err := streamer(ctx, desc, cc, method, opts...)
		if err != nil {
			// Do not hand back a wrapper whose embedded stream is nil; any method call on it would panic.
			return nil, err
		}
		return &loggingClientStream{ClientStream: clientStream, entry: logEntry}, nil
	}
}
// loggingClientStream wraps a grpc.ClientStream and carries the logrus entry that its SendMsg and RecvMsg
// overrides use to log message payloads.
type loggingClientStream struct {
	// ClientStream is the wrapped stream; methods not overridden here are promoted from it.
	grpc.ClientStream
	// entry is the logger that payload records are written to.
	entry *logrus.Entry
}
// SendMsg forwards m to the wrapped client stream and, only if the send succeeds, logs m under
// "grpc.request.content". The error from the wrapped stream is returned unchanged.
func (l *loggingClientStream) SendMsg(m interface{}) error {
	err := l.ClientStream.SendMsg(m)
	if err == nil {
		// This is the client side of the call, so label the record as a client payload, matching the unary
		// client interceptor.
		logProtoMessageAsJson(l.entry, m, "grpc.request.content", "client request payload logged as grpc.request.content field")
	}
	return err
}
// RecvMsg receives into m from the wrapped client stream and, only if the receive succeeds, logs m under
// "grpc.response.content". The error from the wrapped stream is returned unchanged.
func (l *loggingClientStream) RecvMsg(m interface{}) error {
	err := l.ClientStream.RecvMsg(m)
	if err == nil {
		// This is the client side of the call, so label the record as a client payload, matching the unary
		// client interceptor.
		logProtoMessageAsJson(l.entry, m, "grpc.response.content", "client response payload logged as grpc.response.content field")
	}
	return err
}
// loggingServerStream wraps a grpc.ServerStream and carries the logrus entry that its SendMsg and RecvMsg
// overrides use to log message payloads.
type loggingServerStream struct {
	// ServerStream is the wrapped stream; methods not overridden here are promoted from it.
	grpc.ServerStream
	// entry is the logger that payload records are written to.
	entry *logrus.Entry
}
// SendMsg hands m to the wrapped server stream. A failed send is returned as-is without logging; a successful send
// is followed by a log record of m under "grpc.response.content".
func (l *loggingServerStream) SendMsg(m interface{}) error {
	if sendErr := l.ServerStream.SendMsg(m); sendErr != nil {
		return sendErr
	}
	logProtoMessageAsJson(l.entry, m, "grpc.response.content", "server response payload logged as grpc.response.content field")
	return nil
}
// RecvMsg reads the next message from the wrapped server stream into m. A failed receive is returned as-is without
// logging; a successful receive is followed by a log record of m under "grpc.request.content".
func (l *loggingServerStream) RecvMsg(m interface{}) error {
	if recvErr := l.ServerStream.RecvMsg(m); recvErr != nil {
		return recvErr
	}
	logProtoMessageAsJson(l.entry, m, "grpc.request.content", "server request payload logged as grpc.request.content field")
	return nil
}
// logProtoMessageAsJson writes an Info-level record with message msg to entry, attaching pbMsg under the field key.
// Values that do not implement proto.Message are silently skipped. The message is wrapped in jsonpbMarshalleble so
// that its MarshalJSON method, backed by JsonPbMarshaller, produces the JSON form.
func logProtoMessageAsJson(entry *logrus.Entry, pbMsg interface{}, key string, msg string) {
	message, isProto := pbMsg.(proto.Message)
	if !isProto {
		return
	}
	payload := &jsonpbMarshalleble{message}
	entry.WithField(key, payload).Info(msg)
}
// jsonpbMarshalleble wraps a proto.Message so that it can be given a MarshalJSON method that serializes the
// message with JsonPbMarshaller.
type jsonpbMarshalleble struct {
	proto.Message
}
// MarshalJSON serializes the wrapped protobuf message with JsonPbMarshaller and returns the resulting bytes.
// If the marshaler fails, a nil slice is returned together with an error describing the failure.
func (j *jsonpbMarshalleble) MarshalJSON() ([]byte, error) {
	var buf bytes.Buffer
	err := JsonPbMarshaller.Marshal(&buf, j.Message)
	if err != nil {
		return nil, fmt.Errorf("jsonpb serializer failed: %v", err)
	}
	return buf.Bytes(), nil
}