Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: cleanup server logic #15041

Merged
merged 19 commits into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 38 additions & 15 deletions server/grpc/server.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package grpc

import (
"context"
"fmt"
"net"
"time"

"cosmossdk.io/log"
"google.golang.org/grpc"

"github.com/cosmos/cosmos-sdk/client"
Expand All @@ -17,8 +18,9 @@ import (
_ "github.com/cosmos/cosmos-sdk/types/tx/amino" // Import amino.proto file for reflection
)

// StartGRPCServer starts a gRPC server on the given address.
func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, error) {
// NewGRPCServer returns a correctly configured and initialized gRPC server.
// Note, the caller is responsible for starting the server. See StartGRPCServer.
func NewGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, error) {
maxSendMsgSize := cfg.MaxSendMsgSize
if maxSendMsgSize == 0 {
maxSendMsgSize = config.DefaultGRPCMaxSendMsgSize
Expand Down Expand Up @@ -46,6 +48,7 @@ func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config
for _, m := range clientCtx.TxConfig.SignModeHandler().Modes() {
modes[m.String()] = (int32)(m)
}

return modes
}(),
ChainID: clientCtx.ChainID,
Expand All @@ -60,25 +63,45 @@ func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config
// the gRPC server exposes.
gogoreflection.Register(grpcSrv)

return grpcSrv, nil
}

// StartGRPCServer starts the provided gRPC server on the address specified in cfg.
// Note, this creates a blocking process if the server is started successfully.
// Otherwise, an error is returned. The caller is expected to provide a Context
// that is properly canceled or closed to indicate the server should be stopped.
func StartGRPCServer(ctx context.Context, logger log.Logger, cfg config.GRPCConfig, grpcSrv *grpc.Server) error {
listener, err := net.Listen("tcp", cfg.Address)
if err != nil {
return nil, err
return fmt.Errorf("failed to listen on address %s: %w", cfg.Address, err)
}

errCh := make(chan error)
errCh := make(chan error, 1)
alexanderbez marked this conversation as resolved.
Show resolved Hide resolved

// Start the gRPC in an external goroutine as Serve is blocking and will return
// an error upon failure, which we'll send on the error channel that will be
// consumed by the for block below.
go func() {
err = grpcSrv.Serve(listener)
if err != nil {
errCh <- fmt.Errorf("failed to serve: %w", err)
}
logger.Info("starting gRPC server...", "address", cfg.Address)
errCh <- grpcSrv.Serve(listener)
}()

select {
case err := <-errCh:
return nil, err
// Start a block loop to wait for an indication to stop the server or that
// the server failed to start properly.
for {
select {
case <-ctx.Done():
// The calling process cancelled or closed the provided context, so we must
// gracefully stop the gRPC server.

logger.Info("stopping gRPC server...", "address", cfg.Address)
grpcSrv.GracefulStop()
alexanderbez marked this conversation as resolved.
Show resolved Hide resolved

case <-time.After(types.ServerStartTime):
// assume server started successfully
return grpcSrv, nil
return nil

case err := <-errCh:
logger.Error("failed to start gRPC server", "err", err)
return err
}
}
}
41 changes: 25 additions & 16 deletions server/start.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package server

import (
"context"
"fmt"
"net"
"os"
Expand All @@ -14,6 +15,7 @@ import (
pvm "github.com/cometbft/cometbft/privval"
"github.com/cometbft/cometbft/proxy"
"github.com/cometbft/cometbft/rpc/client/local"
"github.com/neilotoole/errgroup"
alexanderbez marked this conversation as resolved.
Show resolved Hide resolved
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"google.golang.org/grpc"
Expand Down Expand Up @@ -260,33 +262,34 @@ func startStandAlone(ctx *Context, appCreator types.AppCreator) error {
return WaitForQuitSignals()
}

func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.AppCreator) error {
cfg := ctx.Config
func startInProcess(svrCtx *Context, clientCtx client.Context, appCreator types.AppCreator) error {
cfg := svrCtx.Config
home := cfg.RootDir

db, err := openDB(home, GetAppDBBackend(ctx.Viper))
db, err := openDB(home, GetAppDBBackend(svrCtx.Viper))
if err != nil {
return err
}

traceWriterFile := ctx.Viper.GetString(flagTraceStore)
traceWriterFile := svrCtx.Viper.GetString(flagTraceStore)
traceWriter, err := openTraceWriter(traceWriterFile)
if err != nil {
return err
}

// Clean up the traceWriter when the server is shutting down.
// clean up the traceWriter when the server is shutting down
var traceWriterCleanup func()

// if flagTraceStore is not used then traceWriter is nil
if traceWriter != nil {
traceWriterCleanup = func() {
if err = traceWriter.Close(); err != nil {
ctx.Logger.Error("failed to close trace writer", "err", err)
svrCtx.Logger.Error("failed to close trace writer", "err", err)
}
}
}

config, err := serverconfig.GetConfig(ctx.Viper)
config, err := serverconfig.GetConfig(svrCtx.Viper)
if err != nil {
return err
}
Expand All @@ -295,24 +298,25 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
return err
}

app := appCreator(ctx.Logger, db, traceWriter, ctx.Viper)
app := appCreator(svrCtx.Logger, db, traceWriter, svrCtx.Viper)

nodeKey, err := p2p.LoadOrGenNodeKey(cfg.NodeKeyFile())
if err != nil {
return err
}

genDocProvider := node.DefaultGenesisDocProviderFunc(cfg)

var (
tmNode *node.Node
gRPCOnly = ctx.Viper.GetBool(flagGRPCOnly)
gRPCOnly = svrCtx.Viper.GetBool(flagGRPCOnly)
)

if gRPCOnly {
ctx.Logger.Info("starting node in gRPC only mode; CometBFT is disabled")
svrCtx.Logger.Info("starting node in gRPC only mode; CometBFT is disabled")
config.GRPC.Enable = true
} else {
ctx.Logger.Info("starting node with ABCI CometBFT in-process")
svrCtx.Logger.Info("starting node with ABCI CometBFT in-process")

tmNode, err = node.NewNode(
cfg,
Expand All @@ -322,7 +326,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
genDocProvider,
node.DefaultDBProvider,
node.DefaultMetricsProvider(cfg.Instrumentation),
ctx.Logger,
svrCtx.Logger,
)
if err != nil {
return err
Expand Down Expand Up @@ -356,6 +360,9 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
grpcSrv *grpc.Server
)

ctx, cancel := context.WithCancel(context.Background())
Fixed Show fixed Hide fixed
g, ctx := errgroup.WithContext(ctx)
Fixed Show fixed Hide fixed
Fixed Show fixed Hide fixed

if config.API.Enable {
genDoc, err := genDocProvider()
if err != nil {
Expand All @@ -382,7 +389,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App

grpcAddress := fmt.Sprintf("127.0.0.1:%s", port)

// If grpc is enabled, configure grpc client for grpc gateway.
// if gRPC is enabled, configure gRPC client for gRPC gateway
grpcClient, err := grpc.Dial(
grpcAddress,
grpc.WithTransportCredentials(insecure.NewCredentials()),
Expand All @@ -397,18 +404,19 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
}

clientCtx = clientCtx.WithGRPCClient(grpcClient)
ctx.Logger.Debug("grpc client assigned to client context", "target", grpcAddress)
svrCtx.Logger.Debug("grpc client assigned to client context", "target", grpcAddress)

// start grpc server
grpcSrv, err = servergrpc.StartGRPCServer(clientCtx, app, config.GRPC)
if err != nil {
return err
}

defer grpcSrv.Stop()
}

// configure api server
apiSrv = api.New(clientCtx, ctx.Logger.With("module", "api-server"), grpcSrv)
apiSrv = api.New(clientCtx, svrCtx.Logger.With("module", "api-server"), grpcSrv)
app.RegisterAPIRoutes(apiSrv, config.API)
if config.Telemetry.Enabled {
apiSrv.SetTelemetry(metrics)
Expand Down Expand Up @@ -460,7 +468,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
_ = apiSrv.Close()
}

ctx.Logger.Info("exiting...")
svrCtx.Logger.Info("exiting...")
}()

// wait for signal capture and gracefully return
Expand All @@ -471,6 +479,7 @@ func startTelemetry(cfg serverconfig.Config) (*telemetry.Metrics, error) {
if !cfg.Telemetry.Enabled {
return nil, nil
}

return telemetry.New(cfg.Telemetry)
}

Expand Down