From 5356a868255b7ad27ca418eb846ccb89a98d8bfd Mon Sep 17 00:00:00 2001 From: JungHwan Tony Yun Date: Wed, 9 Mar 2022 22:09:36 +0900 Subject: [PATCH] feat: Modify grpc gateway to be concurrent (#11234) Current grpc happens to be concurrent, while the grpc gateway itself is not, since it always uses abci query. Therefore, as the current queries are not concurrent, throughput has the room for improvement. This PR changes the grpc gateway so that when server is ran by a node daemon, it directly calls grpc to make queries concurrent. Any services that uses grpc gateway could improve throughput by fundamental amount, which has been tested and ensured in the process of running an Osmosis node using the current chagnes. The code base has the following changes: - GRPCClient field has been added to Client Context. - The `Invoke` method in Client Context would use ABCI query when GRPCClient field is set to nil, otherwise use the GRPC Client to return results that have used grpc. - If GRPC is set to enable in `startInProcess`, it sets the GRPC Client field in Client Context. --- CHANGELOG.md | 1 + client/context.go | 10 ++++++++++ client/grpc_query.go | 9 ++++++++- server/start.go | 21 +++++++++++++++++++++ 4 files changed, 40 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ac0a13dded..ada2c583a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -70,6 +70,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ * [\#11240](https://github.com/cosmos/cosmos-sdk/pull/11240) Replace various modules `ModuleCdc` with the global `legacy.Cdc` * [#11179](https://github.com/cosmos/cosmos-sdk/pull/11179) Add state rollback command. * [\#10794](https://github.com/cosmos/cosmos-sdk/pull/10794) ADR-040: Add State Sync to V2 Store +* [\#11234](https://github.com/cosmos/cosmos-sdk/pull/11234) Add `GRPCClient` field to Client Context. If `GRPCClient` field is set to nil, the `Invoke` method would use ABCI query, otherwise use gprc. ### API Breaking Changes diff --git a/client/context.go b/client/context.go index 0357c25d68..380b08924a 100644 --- a/client/context.go +++ b/client/context.go @@ -9,6 +9,8 @@ import ( "sigs.k8s.io/yaml" + "google.golang.org/grpc" + "github.com/gogo/protobuf/proto" rpcclient "github.com/tendermint/tendermint/rpc/client" @@ -23,6 +25,7 @@ import ( type Context struct { FromAddress sdk.AccAddress Client rpcclient.Client + GRPCClient *grpc.ClientConn ChainID string Codec codec.Codec InterfaceRegistry codectypes.InterfaceRegistry @@ -128,6 +131,13 @@ func (ctx Context) WithClient(client rpcclient.Client) Context { return ctx } +// WithGRPCClient returns a copy of the context with an updated GRPC client +// instance. +func (ctx Context) WithGRPCClient(grpcClient *grpc.ClientConn) Context { + ctx.GRPCClient = grpcClient + return ctx +} + // WithUseLedger returns a copy of the context with an updated UseLedger flag. func (ctx Context) WithUseLedger(useLedger bool) Context { ctx.UseLedger = useLedger diff --git a/client/grpc_query.go b/client/grpc_query.go index 4ba896700d..15c0725f2e 100644 --- a/client/grpc_query.go +++ b/client/grpc_query.go @@ -32,7 +32,8 @@ var fallBackCodec = codec.NewProtoCodec(failingInterfaceRegistry{}) func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply interface{}, opts ...grpc.CallOption) (err error) { // Two things can happen here: // 1. either we're broadcasting a Tx, in which call we call Tendermint's broadcast endpoint directly, - // 2. or we are querying for state, in which case we call ABCI's Query. + // 2-1. or we are querying for state, in which case we call grpc if grpc client set. + // 2-2. or we are querying for state, in which case we call ABCI's Query if grpc client not set. // In both cases, we don't allow empty request args (it will panic unexpectedly). if reflect.ValueOf(req).IsNil() { @@ -55,6 +56,12 @@ func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply i return err } + if ctx.GRPCClient != nil { + // Case 2-1. Invoke grpc. + return ctx.GRPCClient.Invoke(grpcCtx, method, req, reply, opts...) + } + + // Case 2-2. Querying state via abci query. reqBz, err := ctx.gRPCCodec().Marshal(req) if err != nil { return err diff --git a/server/start.go b/server/start.go index e63ae27326..52e9e63a3d 100644 --- a/server/start.go +++ b/server/start.go @@ -4,6 +4,7 @@ package server import ( "fmt" + "net" "net/http" "os" "runtime/pprof" @@ -18,6 +19,7 @@ import ( "github.com/tendermint/tendermint/rpc/client/local" tmtypes "github.com/tendermint/tendermint/types" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/client/flags" @@ -296,6 +298,25 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App WithHomeDir(home). WithChainID(genDoc.ChainID) + if config.GRPC.Enable { + _, port, err := net.SplitHostPort(config.GRPC.Address) + if err != nil { + return err + } + grpcAddress := fmt.Sprintf("127.0.0.1:%s", port) + // If grpc is enabled, configure grpc client for grpc gateway. + grpcClient, err := grpc.Dial( + grpcAddress, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions(grpc.ForceCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec())), + ) + if err != nil { + return err + } + clientCtx = clientCtx.WithGRPCClient(grpcClient) + ctx.Logger.Debug("grpc client assigned to client context", "target", grpcAddress) + } + apiSrv = api.New(clientCtx, ctx.Logger.With("module", "api-server")) app.RegisterAPIRoutes(apiSrv, config.API) errCh := make(chan error)