Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Modify grpc gateway to be concurrent #11234

Merged
merged 5 commits into from
Mar 9, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* (gov) [\#11036](https://github.com/cosmos/cosmos-sdk/pull/11036) Add in-place migrations for 0.43->0.46. Add a `migrate v0.46` CLI command for v0.43->0.46 JSON genesis migration.
* [\#11006](https://github.com/cosmos/cosmos-sdk/pull/11006) Add `debug pubkey-raw` command to allow inspecting of pubkeys in legacy bech32 format
* (x/authz) [\#10714](https://github.com/cosmos/cosmos-sdk/pull/10714) Add support for pruning expired authorizations
* [\#11234](https://github.com/cosmos/cosmos-sdk/pull/11234) Add `GRPCClient` field to Client Context. If `GRPCClient` field is set to nil, the `Invoke` method would use ABCI query, otherwise use gprc.

### API Breaking Changes

Expand Down
10 changes: 10 additions & 0 deletions client/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (

"sigs.k8s.io/yaml"

"google.golang.org/grpc"

Comment on lines +12 to +13
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"google.golang.org/grpc"
"google.golang.org/grpc"

"github.com/gogo/protobuf/proto"
rpcclient "github.com/tendermint/tendermint/rpc/client"

Expand All @@ -23,6 +25,7 @@ import (
type Context struct {
FromAddress sdk.AccAddress
Client rpcclient.Client
GRPCClient *grpc.ClientConn
ChainID string
Codec codec.Codec
InterfaceRegistry codectypes.InterfaceRegistry
Expand Down Expand Up @@ -128,6 +131,13 @@ func (ctx Context) WithClient(client rpcclient.Client) Context {
return ctx
}

// WithGRPCClient returns a copy of the context with an updated GRPC client
// instance.
func (ctx Context) WithGRPCClient(grpcClient *grpc.ClientConn) Context {
ctx.GRPCClient = grpcClient
return ctx
}

// WithUseLedger returns a copy of the context with an updated UseLedger flag.
func (ctx Context) WithUseLedger(useLedger bool) Context {
ctx.UseLedger = useLedger
Expand Down
93 changes: 51 additions & 42 deletions client/grpc_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ var fallBackCodec = codec.NewProtoCodec(failingInterfaceRegistry{})
func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply interface{}, opts ...grpc.CallOption) (err error) {
// Two things can happen here:
// 1. either we're broadcasting a Tx, in which call we call Tendermint's broadcast endpoint directly,
// 2. or we are querying for state, in which case we call ABCI's Query.
// 2-1. or we are querying for state, in which case we call ABCI's Query if grpc client not set.
// 2-2. or we are querying for state, in which case we call grpc if grpc client set.

// In both cases, we don't allow empty request args (it will panic unexpectedly).
if reflect.ValueOf(req).IsNil() {
Expand All @@ -55,56 +56,64 @@ func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply i
return err
}

reqBz, err := ctx.gRPCCodec().Marshal(req)
if err != nil {
return err
}

// parse height header
md, _ := metadata.FromOutgoingContext(grpcCtx)
if heights := md.Get(grpctypes.GRPCBlockHeightHeader); len(heights) > 0 {
height, err := strconv.ParseInt(heights[0], 10, 64)
if ctx.GRPCClient == nil {
// Case 2-1. Querying state via abci query.
reqBz, err := ctx.gRPCCodec().Marshal(req)
if err != nil {
return err
}
if height < 0 {
return sdkerrors.Wrapf(
sdkerrors.ErrInvalidRequest,
"client.Context.Invoke: height (%d) from %q must be >= 0", height, grpctypes.GRPCBlockHeightHeader)
}

ctx = ctx.WithHeight(height)
}

abciReq := abci.RequestQuery{
Path: method,
Data: reqBz,
Height: ctx.Height,
}
// parse height header
md, _ := metadata.FromOutgoingContext(grpcCtx)
if heights := md.Get(grpctypes.GRPCBlockHeightHeader); len(heights) > 0 {
height, err := strconv.ParseInt(heights[0], 10, 64)
if err != nil {
return err
}
if height < 0 {
return sdkerrors.Wrapf(
sdkerrors.ErrInvalidRequest,
"client.Context.Invoke: height (%d) from %q must be >= 0", height, grpctypes.GRPCBlockHeightHeader)
}

ctx = ctx.WithHeight(height)
}

res, err := ctx.QueryABCI(abciReq)
if err != nil {
return err
}
abciReq := abci.RequestQuery{
Path: method,
Data: reqBz,
Height: ctx.Height,
}

err = ctx.gRPCCodec().Unmarshal(res.Value, reply)
if err != nil {
return err
}
res, err := ctx.QueryABCI(abciReq)
if err != nil {
return err
}

// Create header metadata. For now the headers contain:
// - block height
// We then parse all the call options, if the call option is a
// HeaderCallOption, then we manually set the value of that header to the
// metadata.
md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(res.Height, 10))
for _, callOpt := range opts {
header, ok := callOpt.(grpc.HeaderCallOption)
if !ok {
continue
err = ctx.gRPCCodec().Unmarshal(res.Value, reply)
if err != nil {
return err
}

*header.HeaderAddr = md
// Create header metadata. For now the headers contain:
// - block height
// We then parse all the call options, if the call option is a
// HeaderCallOption, then we manually set the value of that header to the
// metadata.
md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(res.Height, 10))
for _, callOpt := range opts {
header, ok := callOpt.(grpc.HeaderCallOption)
if !ok {
continue
}

*header.HeaderAddr = md
}
} else {
// Case 2-2. Invoke grpc.
if err = ctx.GRPCClient.Invoke(grpcCtx, method, req, reply, opts...); err != nil {
tac0turtle marked this conversation as resolved.
Show resolved Hide resolved
return err
}
tac0turtle marked this conversation as resolved.
Show resolved Hide resolved
}

if ctx.InterfaceRegistry != nil {
Expand Down
10 changes: 10 additions & 0 deletions server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,16 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
WithHomeDir(home).
WithChainID(genDoc.ChainID)

if config.GRPC.Enable {
// If grpc is enabled, configure grpc client for grpc gateway.
grpcClient, err := grpc.Dial(config.GRPC.Address, grpc.WithInsecure())
tac0turtle marked this conversation as resolved.
Show resolved Hide resolved
tac0turtle marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
clientCtx = clientCtx.WithGRPCClient(grpcClient)
ctx.Logger.Debug("grpc client assigned to client context", "target", config.GRPC.Address)
}

apiSrv = api.New(clientCtx, ctx.Logger.With("module", "api-server"))
app.RegisterAPIRoutes(apiSrv, config.API)
errCh := make(chan error)
Expand Down