From 191cb2449b2a4aa6f90ba1e0cb07ba0d0ccef83b Mon Sep 17 00:00:00 2001 From: Lukasz Cwik Date: Wed, 25 Oct 2023 10:25:43 -0700 Subject: [PATCH] [CLOB-930] Add ctx.Done() support to shutdown, make start-up wait time configurable, and add support for domain sockets for gRPC server and gRPC web server. ctx.Done() support and was added in 0.50 with https://github.com/cosmos/cosmos-sdk/pull/15041 server start up time was removed in 0.50 with https://github.com/cosmos/cosmos-sdk/pull/15041 --- server/grpc/grpc_web.go | 20 +++++++- server/grpc/server.go | 23 ++++++--- server/start.go | 101 ++++++++++++++++++++++----------------- server/types/app.go | 7 ++- server/util.go | 11 +++-- testutil/network/util.go | 2 +- 6 files changed, 103 insertions(+), 61 deletions(-) diff --git a/server/grpc/grpc_web.go b/server/grpc/grpc_web.go index 99040ae263025..6942cb64c3409 100644 --- a/server/grpc/grpc_web.go +++ b/server/grpc/grpc_web.go @@ -2,7 +2,9 @@ package grpc import ( "fmt" + "net" "net/http" + "strings" "time" "github.com/improbable-eng/grpc-web/go/grpcweb" @@ -23,16 +25,30 @@ func StartGRPCWeb(grpcSrv *grpc.Server, config config.Config) (*http.Server, err ) } + var proto, addr string + parts := strings.SplitN(config.GRPCWeb.Address, "://", 2) + // Default to using 'tcp' to maintain backwards compatibility with configurations that don't specify + // the network to use. + if len(parts) != 2 { + proto = "tcp" + addr = config.GRPCWeb.Address + } else { + proto, addr = parts[0], parts[1] + } + listener, err := net.Listen(proto, addr) + if err != nil { + return nil, err + } + wrappedServer := grpcweb.WrapServer(grpcSrv, options...) grpcWebSrv := &http.Server{ - Addr: config.GRPCWeb.Address, Handler: wrappedServer, ReadHeaderTimeout: 500 * time.Millisecond, } errCh := make(chan error) go func() { - if err := grpcWebSrv.ListenAndServe(); err != nil { + if err := grpcWebSrv.Serve(listener); err != nil { errCh <- fmt.Errorf("[grpc] failed to serve: %w", err) } }() diff --git a/server/grpc/server.go b/server/grpc/server.go index 79a9be3dca241..07b633dd67248 100644 --- a/server/grpc/server.go +++ b/server/grpc/server.go @@ -3,6 +3,7 @@ package grpc import ( "fmt" "net" + "strings" "time" "google.golang.org/grpc" @@ -18,7 +19,7 @@ import ( ) // StartGRPCServer starts a gRPC server on the given address. -func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, error) { +func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, net.Addr, error) { maxSendMsgSize := cfg.MaxSendMsgSize if maxSendMsgSize == 0 { maxSendMsgSize = config.DefaultGRPCMaxSendMsgSize @@ -53,16 +54,26 @@ func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config InterfaceRegistry: clientCtx.InterfaceRegistry, }) if err != nil { - return nil, err + return nil, nil, err } // Reflection allows external clients to see what services and methods // the gRPC server exposes. gogoreflection.Register(grpcSrv) - listener, err := net.Listen("tcp", cfg.Address) + var proto, addr string + parts := strings.SplitN(cfg.Address, "://", 2) + // Default to using 'tcp' to maintain backwards compatibility with configurations that don't specify + // the network to use. + if len(parts) != 2 { + proto = "tcp" + addr = cfg.Address + } else { + proto, addr = parts[0], parts[1] + } + listener, err := net.Listen(proto, addr) if err != nil { - return nil, err + return nil, nil, err } errCh := make(chan error) @@ -75,10 +86,10 @@ func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config select { case err := <-errCh: - return nil, err + return nil, nil, err case <-time.After(types.ServerStartTime): // assume server started successfully - return grpcSrv, nil + return grpcSrv, listener.Addr(), nil } } diff --git a/server/start.go b/server/start.go index f3f4f657d6743..a3a5ccd3b7984 100644 --- a/server/start.go +++ b/server/start.go @@ -3,6 +3,7 @@ package server // DONTCOVER import ( + "context" "errors" "fmt" "net" @@ -141,14 +142,14 @@ is performed. Note, when enabled, gRPC will also be automatically enabled. withTM, _ := cmd.Flags().GetBool(flagWithTendermint) if !withTM { serverCtx.Logger.Info("starting ABCI without Tendermint") - return wrapCPUProfile(serverCtx, func() error { - return startStandAlone(serverCtx, appCreator) + return wrapCPUProfile(cmd.Context(), serverCtx, func() error { + return startStandAlone(cmd.Context(), serverCtx, appCreator) }) } // amino is needed here for backwards compatibility of REST routes - err = wrapCPUProfile(serverCtx, func() error { - return startInProcess(serverCtx, clientCtx, appCreator) + err = wrapCPUProfile(cmd.Context(), serverCtx, func() error { + return startInProcess(cmd.Context(), serverCtx, clientCtx, appCreator) }) errCode, ok := err.(ErrorCode) if !ok { @@ -206,7 +207,7 @@ is performed. Note, when enabled, gRPC will also be automatically enabled. return cmd } -func startStandAlone(ctx *Context, appCreator types.AppCreator) error { +func startStandAlone(parentCtx context.Context, ctx *Context, appCreator types.AppCreator) error { addr := ctx.Viper.GetString(flagAddress) transport := ctx.Viper.GetString(flagTransport) home := ctx.Viper.GetString(flags.FlagHome) @@ -260,10 +261,10 @@ func startStandAlone(ctx *Context, appCreator types.AppCreator) error { }() // Wait for SIGINT or SIGTERM signal - return WaitForQuitSignals() + return WaitForQuitSignals(parentCtx) } -func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.AppCreator) error { +func startInProcess(parentCtx context.Context, ctx *Context, clientCtx client.Context, appCreator types.AppCreator) error { cfg := ctx.Config home := cfg.RootDir @@ -354,6 +355,32 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App return err } + var ( + grpcSrv *grpc.Server + grpcSrvAddr net.Addr + grpcWebSrv *http.Server + ) + + if config.GRPC.Enable { + grpcSrv, grpcSrvAddr, err = servergrpc.StartGRPCServer(clientCtx, app, config.GRPC) + if err != nil { + return err + } + defer grpcSrv.Stop() + if config.GRPCWeb.Enable { + grpcWebSrv, err = servergrpc.StartGRPCWeb(grpcSrv, config) + if err != nil { + ctx.Logger.Error("failed to start grpc-web http server: ", err) + return err + } + defer func() { + if err := grpcWebSrv.Close(); err != nil { + ctx.Logger.Error("failed to close grpc-web http server: ", err) + } + }() + } + } + var apiSrv *api.Server if config.API.Enable { genDoc, err := genDocProvider() @@ -364,11 +391,6 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App clientCtx := clientCtx.WithHomeDir(home).WithChainID(genDoc.ChainID) if config.GRPC.Enable { - _, port, err := net.SplitHostPort(config.GRPC.Address) - if err != nil { - return err - } - maxSendMsgSize := config.GRPC.MaxSendMsgSize if maxSendMsgSize == 0 { maxSendMsgSize = serverconfig.DefaultGRPCMaxSendMsgSize @@ -379,11 +401,10 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App maxRecvMsgSize = serverconfig.DefaultGRPCMaxRecvMsgSize } - grpcAddress := fmt.Sprintf("127.0.0.1:%s", port) - + grpcSrvAddrString := fmt.Sprintf("%s://%s", grpcSrvAddr.Network(), grpcSrvAddr.String()) // If grpc is enabled, configure grpc client for grpc gateway. grpcClient, err := grpc.Dial( - grpcAddress, + grpcSrvAddrString, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultCallOptions( grpc.ForceCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()), @@ -396,7 +417,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App } clientCtx = clientCtx.WithGRPCClient(grpcClient) - ctx.Logger.Debug("grpc client assigned to client context", "target", grpcAddress) + ctx.Logger.Debug("grpc client assigned to client context", "target", grpcSrvAddrString) } apiSrv = api.New(clientCtx, ctx.Logger.With("module", "api-server")) @@ -420,36 +441,26 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App } } - var ( - grpcSrv *grpc.Server - grpcWebSrv *http.Server - ) - - if config.GRPC.Enable { - grpcSrv, err = servergrpc.StartGRPCServer(clientCtx, app, config.GRPC) - if err != nil { - return err - } - defer grpcSrv.Stop() - if config.GRPCWeb.Enable { - grpcWebSrv, err = servergrpc.StartGRPCWeb(grpcSrv, config) - if err != nil { - ctx.Logger.Error("failed to start grpc-web http server: ", err) - return err - } - defer func() { - if err := grpcWebSrv.Close(); err != nil { - ctx.Logger.Error("failed to close grpc-web http server: ", err) - } - }() - } - } - // At this point it is safe to block the process if we're in gRPC only mode as // we do not need to start Rosetta or handle any Tendermint related processes. if gRPCOnly { + // Fix application shutdown + defer func() { + _ = app.Close() + + if traceWriterCleanup != nil { + traceWriterCleanup() + } + + if apiSrv != nil { + _ = apiSrv.Close() + } + + ctx.Logger.Info("exiting...") + }() + // wait for signal capture and gracefully return - return WaitForQuitSignals() + return WaitForQuitSignals(parentCtx) } var rosettaSrv crgserver.Server @@ -520,7 +531,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App }() // wait for signal capture and gracefully return - return WaitForQuitSignals() + return WaitForQuitSignals(parentCtx) } func startTelemetry(cfg serverconfig.Config) (*telemetry.Metrics, error) { @@ -531,7 +542,7 @@ func startTelemetry(cfg serverconfig.Config) (*telemetry.Metrics, error) { } // wrapCPUProfile runs callback in a goroutine, then wait for quit signals. -func wrapCPUProfile(ctx *Context, callback func() error) error { +func wrapCPUProfile(parentCtx context.Context, ctx *Context, callback func() error) error { if cpuProfile := ctx.Viper.GetString(flagCPUProfile); cpuProfile != "" { f, err := os.Create(cpuProfile) if err != nil { @@ -564,5 +575,5 @@ func wrapCPUProfile(ctx *Context, callback func() error) error { case <-time.After(types.ServerStartTime): } - return WaitForQuitSignals() + return WaitForQuitSignals(parentCtx) } diff --git a/server/types/app.go b/server/types/app.go index 727f767fc35e6..caa735d327309 100644 --- a/server/types/app.go +++ b/server/types/app.go @@ -2,9 +2,6 @@ package types import ( "encoding/json" - "io" - "time" - dbm "github.com/cometbft/cometbft-db" abci "github.com/cometbft/cometbft/abci/types" "github.com/cometbft/cometbft/libs/log" @@ -12,6 +9,8 @@ import ( tmtypes "github.com/cometbft/cometbft/types" "github.com/cosmos/gogoproto/grpc" "github.com/spf13/cobra" + "io" + "time" "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/server/api" @@ -22,7 +21,7 @@ import ( // ServerStartTime defines the time duration that the server need to stay running after startup // for the startup be considered successful -const ServerStartTime = 5 * time.Second +var ServerStartTime = 5 * time.Second type ( // AppOptions defines an interface that is passed into an application diff --git a/server/util.go b/server/util.go index 58323b0d2108f..7e4e16983d018 100644 --- a/server/util.go +++ b/server/util.go @@ -1,6 +1,7 @@ package server import ( + "context" "errors" "fmt" "io" @@ -381,11 +382,15 @@ func TrapSignal(cleanupFunc func()) { } // WaitForQuitSignals waits for SIGINT and SIGTERM and returns. -func WaitForQuitSignals() ErrorCode { +func WaitForQuitSignals(ctx context.Context) error { sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - sig := <-sigs - return ErrorCode{Code: int(sig.(syscall.Signal)) + 128} + select { + case sig := <-sigs: + return ErrorCode{Code: int(sig.(syscall.Signal)) + 128} + case <-ctx.Done(): + return nil + } } // GetAppDBBackend gets the backend type to use for the application DBs. diff --git a/testutil/network/util.go b/testutil/network/util.go index b7e7e7475acd8..cb5f754461de4 100644 --- a/testutil/network/util.go +++ b/testutil/network/util.go @@ -96,7 +96,7 @@ func startInProcess(cfg Config, val *Validator) error { } if val.AppConfig.GRPC.Enable { - grpcSrv, err := servergrpc.StartGRPCServer(val.ClientCtx, app, val.AppConfig.GRPC) + grpcSrv, _, err := servergrpc.StartGRPCServer(val.ClientCtx, app, val.AppConfig.GRPC) if err != nil { return err }