Skip to content

Commit

Permalink
Add integration module for gRPC-go (#81)
Browse files Browse the repository at this point in the history
  • Loading branch information
gorexlv authored Mar 16, 2020
1 parent 2fd68d5 commit 9e57090
Show file tree
Hide file tree
Showing 9 changed files with 457 additions and 17 deletions.
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ atlassian-ide-plugin.xml
*.out

# Dependency directories (remove the comment below to include it)
# vendor/
vendor/

### macOS template
.DS_Store
Expand Down Expand Up @@ -63,4 +63,4 @@ Temporary Items
*.log

# coverage file
coverage.html
coverage.html
78 changes: 78 additions & 0 deletions adapter/grpc/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package grpc

import (
"context"

sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/base"
"google.golang.org/grpc"
)

// SentinelUnaryClientIntercept returns new grpc.UnaryClientInterceptor instance
func SentinelUnaryClientIntercept(opts ...Option) grpc.UnaryClientInterceptor {
options := evaluateOptions(opts)
return func(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
// method as resource name by default
resourceName := method
if options.unaryClientResourceExtract != nil {
resourceName = options.unaryClientResourceExtract(ctx, method, req, cc)
}

entry, err := sentinel.Entry(
resourceName,
sentinel.WithResourceType(base.ResTypeRPC),
sentinel.WithTrafficType(base.Outbound),
)
if err != nil {
if options.unaryClientBlockFallback != nil {
return options.unaryClientBlockFallback(ctx, method, req, cc, err)
}
return err
}
defer entry.Exit()

return invoker(ctx, method, req, reply, cc, opts...)
}
}

// SentinelStreamClientIntercept returns new grpc.StreamClientInterceptor instance
func SentinelStreamClientIntercept(opts ...Option) grpc.StreamClientInterceptor {
options := evaluateOptions(opts)
return func(
ctx context.Context,
desc *grpc.StreamDesc,
cc *grpc.ClientConn,
method string,
streamer grpc.Streamer,
opts ...grpc.CallOption,
) (grpc.ClientStream, error) {
// method as resource name by default
resourceName := method
if options.streamClientResourceExtract != nil {
resourceName = options.streamClientResourceExtract(ctx, desc, cc, method)
}

entry, err := sentinel.Entry(
resourceName,
sentinel.WithResourceType(base.ResTypeRPC),
sentinel.WithTrafficType(base.Outbound),
)
if err != nil { // blocked
if options.streamClientBlockFallback != nil {
return options.streamClientBlockFallback(ctx, desc, cc, method, err)
}
return nil, err
}

defer entry.Exit()

return streamer(ctx, desc, cc, method, opts...)
}
}
97 changes: 97 additions & 0 deletions adapter/grpc/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package grpc

import (
"context"
"errors"
"testing"

"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/core/flow"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
)

func TestUnaryClientIntercept(t *testing.T) {
const errMsgFake = "fake error"
interceptor := SentinelUnaryClientIntercept()
invoker := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn,
opts ...grpc.CallOption) error {
return errors.New(errMsgFake)
}
method := "/grpc.testing.TestService/UnaryCall"
t.Run("success", func(t *testing.T) {
var _, err = flow.LoadRules([]*flow.FlowRule{
{
Resource: "/grpc.testing.TestService/UnaryCall",
MetricType: flow.QPS,
Count: 1,
ControlBehavior: flow.Reject,
},
})
assert.Nil(t, err)
err = interceptor(nil, method, nil, nil, nil, invoker)
assert.EqualError(t, err, errMsgFake)
t.Run("second fail", func(t *testing.T) {
err = interceptor(nil, method, nil, nil, nil, invoker)
assert.IsType(t, &base.BlockError{}, err)
})
})

t.Run("fail", func(t *testing.T) {
var _, err = flow.LoadRules([]*flow.FlowRule{
{
Resource: "/grpc.testing.TestService/UnaryCall",
MetricType: flow.QPS,
Count: 0,
ControlBehavior: flow.Reject,
},
})
assert.Nil(t, err)
err = interceptor(nil, method, nil, nil, nil, invoker)
assert.IsType(t, &base.BlockError{}, err)
})
}

func TestStreamClientIntercept(t *testing.T) {
const errMsgFake = "fake error"
interceptor := SentinelStreamClientIntercept()
streamer := func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string,
opts ...grpc.CallOption) (grpc.ClientStream, error) {
return nil, errors.New(errMsgFake)
}
method := "/grpc.testing.TestService/StreamingOutputCall"
t.Run("success", func(t *testing.T) {
var _, err = flow.LoadRules([]*flow.FlowRule{
{
Resource: "/grpc.testing.TestService/StreamingOutputCall",
MetricType: flow.QPS,
Count: 1,
ControlBehavior: flow.Reject,
},
})
assert.Nil(t, err)
rep, err := interceptor(nil, nil, nil, method, streamer)
assert.EqualError(t, err, errMsgFake)
assert.Nil(t, rep)
t.Run("second fail", func(t *testing.T) {
rep, err := interceptor(nil, nil, nil, method, streamer)
assert.IsType(t, &base.BlockError{}, err)
assert.Nil(t, rep)
})
})

t.Run("fail", func(t *testing.T) {
var _, err = flow.LoadRules([]*flow.FlowRule{
{
Resource: "/grpc.testing.TestService/StreamingOutputCall",
MetricType: flow.QPS,
Count: 0,
ControlBehavior: flow.Reject,
},
})
assert.Nil(t, err)
rep, err := interceptor(nil, nil, nil, method, streamer)
assert.IsType(t, &base.BlockError{}, err)
assert.Nil(t, rep)
})
}
90 changes: 90 additions & 0 deletions adapter/grpc/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package grpc

import (
"context"

"github.com/alibaba/sentinel-golang/core/base"
"google.golang.org/grpc"
)

type (
Option func(*options)

options struct {
unaryClientResourceExtract func(context.Context, string, interface{}, *grpc.ClientConn) string
unaryServerResourceExtract func(context.Context, interface{}, *grpc.UnaryServerInfo) string

streamClientResourceExtract func(context.Context, *grpc.StreamDesc, *grpc.ClientConn, string) string
streamServerResourceExtract func(interface{}, grpc.ServerStream, *grpc.StreamServerInfo) string

unaryClientBlockFallback func(context.Context, string, interface{}, *grpc.ClientConn, *base.BlockError) error
unaryServerBlockFallback func(context.Context, interface{}, *grpc.UnaryServerInfo, *base.BlockError) (interface{}, error)

streamClientBlockFallback func(context.Context, *grpc.StreamDesc, *grpc.ClientConn, string, *base.BlockError) (grpc.ClientStream, error)
streamServerBlockFallback func(interface{}, grpc.ServerStream, *grpc.StreamServerInfo, *base.BlockError) error
}
)

// WithUnaryClientResourceExtractor set unaryClientResourceExtract
func WithUnaryClientResourceExtractor(fn func(context.Context, string, interface{}, *grpc.ClientConn) string) Option {
return func(opts *options) {
opts.unaryClientResourceExtract = fn
}
}

// WithUnaryServerResourceExtractor set unaryServerResourceExtract
func WithUnaryServerResourceExtractor(fn func(context.Context, interface{}, *grpc.UnaryServerInfo) string) Option {
return func(opts *options) {
opts.unaryServerResourceExtract = fn
}
}

// WithStreamClientResourceExtractor set streamClientResourceExtract
func WithStreamClientResourceExtractor(fn func(context.Context, *grpc.StreamDesc, *grpc.ClientConn, string) string) Option {
return func(opts *options) {
opts.streamClientResourceExtract = fn
}
}

// WithStreamServerResourceExtractor set streamServerResourceExtract
func WithStreamServerResourceExtractor(fn func(interface{}, grpc.ServerStream, *grpc.StreamServerInfo) string) Option {
return func(opts *options) {
opts.streamServerResourceExtract = fn
}
}

// WithUnaryClientBlockFallback set unaryClientBlockFallback
func WithUnaryClientBlockFallback(fn func(context.Context, string, interface{}, *grpc.ClientConn, *base.BlockError) error) Option {
return func(opts *options) {
opts.unaryClientBlockFallback = fn
}
}

// WithUnaryServerBlockFallback set unaryServerBlockFallback
func WithUnaryServerBlockFallback(fn func(context.Context, interface{}, *grpc.UnaryServerInfo, *base.BlockError) (interface{}, error)) Option {
return func(opts *options) {
opts.unaryServerBlockFallback = fn
}
}

// WithStreamClientBlockFallback set streamClientBlockFallback
func WithStreamClientBlockFallback(fn func(context.Context, *grpc.StreamDesc, *grpc.ClientConn, string, *base.BlockError) (grpc.ClientStream, error)) Option {
return func(opts *options) {
opts.streamClientBlockFallback = fn
}
}

// WithStreamServerBlockFallback set streamServerBlockFallback
func WithStreamServerBlockFallback(fn func(interface{}, grpc.ServerStream, *grpc.StreamServerInfo, *base.BlockError) error) Option {
return func(opts *options) {
opts.streamServerBlockFallback = fn
}
}

func evaluateOptions(opts []Option) *options {
optCopy := &options{}
for _, o := range opts {
o(optCopy)
}
return optCopy
}
69 changes: 69 additions & 0 deletions adapter/grpc/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package grpc

import (
"context"

sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/base"
"google.golang.org/grpc"
)

// SentinelUnaryServerIntercept implements gRPC unary server interceptor interface
func SentinelUnaryServerIntercept(opts ...Option) grpc.UnaryServerInterceptor {
options := evaluateOptions(opts)
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
// method as resource name by default
resourceName := info.FullMethod
if options.unaryServerResourceExtract != nil {
resourceName = options.unaryServerResourceExtract(ctx, req, info)
}
entry, err := sentinel.Entry(
resourceName,
sentinel.WithResourceType(base.ResTypeRPC),
sentinel.WithTrafficType(base.Inbound),
)
if err != nil {
if options.unaryServerBlockFallback != nil {
return options.unaryServerBlockFallback(ctx, req, info, err)
}
return nil, err
}
defer entry.Exit()
return handler(ctx, req)
}
}

// SentinelStreamServerIntercept implements gRPC stream server interceptor interface
func SentinelStreamServerIntercept(opts ...Option) grpc.StreamServerInterceptor {
options := evaluateOptions(opts)
return func(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
// method as resource name by default
resourceName := info.FullMethod
if options.streamServerResourceExtract != nil {
resourceName = options.streamServerResourceExtract(srv, ss, info)
}
entry, err := sentinel.Entry(
resourceName,
sentinel.WithResourceType(base.ResTypeRPC),
sentinel.WithTrafficType(base.Inbound),
)
if err != nil { // blocked
if options.streamServerBlockFallback != nil {
return options.streamServerBlockFallback(srv, ss, info, err)
}
return err
}
defer entry.Exit()
return handler(srv, ss)
}
}
Loading

0 comments on commit 9e57090

Please sign in to comment.