Skip to content

Commit

Permalink
Separate gRPC server from API activation
Browse files Browse the repository at this point in the history
  • Loading branch information
tkxkd0159 committed Jun 12, 2024
1 parent cfbba38 commit 97c524f
Showing 1 changed file with 109 additions and 78 deletions.
187 changes: 109 additions & 78 deletions server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,12 +265,12 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
return err
}

srvcfg, err := serverconfig.GetConfig(ctx.Viper)
svrCfg, err := serverconfig.GetConfig(ctx.Viper)
if err != nil {
return err
}

if err := srvcfg.ValidateBasic(); err != nil {
if err := svrCfg.ValidateBasic(); err != nil {
ctx.Logger.Error("WARNING: The minimum-gas-prices config in app.toml is set to the empty string. " +
"This defaults to 0 in the current version, but will error in the next version " +
"(SDK v0.45). Please explicitly put the desired minimum-gas-prices in your app.toml.")
Expand All @@ -292,7 +292,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App

if gRPCOnly {
ctx.Logger.Info("starting node in gRPC only mode; Tendermint is disabled")
srvcfg.GRPC.Enable = true
svrCfg.GRPC.Enable = true
} else {
ctx.Logger.Info("starting node with ABCI Tendermint in-process")

Expand Down Expand Up @@ -321,7 +321,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
// Add the tx service to the gRPC router. We only need to register this
// service if API or gRPC is enabled, and avoid doing so in the general
// case, because it spawns a new local tendermint RPC client.
if (srvcfg.API.Enable || srvcfg.GRPC.Enable) && tmNode != nil {
if (svrCfg.API.Enable || svrCfg.GRPC.Enable) && tmNode != nil {
clientCtx = clientCtx.WithClient(local.New(tmNode))

app.RegisterTxService(clientCtx)
Expand All @@ -332,86 +332,25 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
}
}

metrics, err := startTelemetry(srvcfg)
metrics, err := startTelemetry(svrCfg)
if err != nil {
return err
}

var (
apiSrv *api.Server
grpcSrv *grpc.Server
)

if srvcfg.API.Enable {
genDoc, err := genDocProvider()
if err != nil {
return err
}

clientCtx := clientCtx.WithHomeDir(home).WithChainID(genDoc.ChainID)

if srvcfg.GRPC.Enable {
_, port, err := net.SplitHostPort(srvcfg.GRPC.Address)
if err != nil {
return err
}

maxSendMsgSize := srvcfg.GRPC.MaxSendMsgSize
if maxSendMsgSize == 0 {
maxSendMsgSize = serverconfig.DefaultGRPCMaxSendMsgSize
}

maxRecvMsgSize := srvcfg.GRPC.MaxRecvMsgSize
if maxRecvMsgSize == 0 {
maxRecvMsgSize = serverconfig.DefaultGRPCMaxRecvMsgSize
}

grpcAddress := fmt.Sprintf("127.0.0.1:%s", port)

// If grpc is enabled, configure grpc client for grpc gateway.
grpcClient, err := grpc.NewClient(
grpcAddress,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.ForceCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()),
grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
grpc.MaxCallSendMsgSize(maxSendMsgSize),
),
)
if err != nil {
return err
}

clientCtx = clientCtx.WithGRPCClient(grpcClient)
ctx.Logger.Debug("grpc client assigned to client context", "target", grpcAddress)

// start grpc server
grpcSrv, err = servergrpc.StartGRPCServer(clientCtx, app, srvcfg.GRPC)
if err != nil {
return err
}
defer grpcSrv.Stop()
}

apiSrv = api.New(clientCtx, ctx.Logger.With("module", "api-server"))
app.RegisterAPIRoutes(apiSrv, srvcfg.API)
if srvcfg.Telemetry.Enabled {
apiSrv.SetTelemetry(metrics)
}
errCh := make(chan error)

go func() {
if err := apiSrv.Start(srvcfg); err != nil {
errCh <- err
}
}()
grpcSrv, clientCtx, err := startGrpcServer(svrCfg.GRPC, clientCtx, ctx, app)
if err != nil {
return err
}

select {
case err := <-errCh:
return err
genDoc, err := genDocProvider()
if err != nil {
return err
}
clientCtx.WithChainID(genDoc.ChainID)

case <-time.After(types.ServerStartTime): // assume server started successfully
}
apiSrv, err := startAPIServer(svrCfg, clientCtx, ctx, app, cfg.RootDir, metrics)
if err != nil {
return err
}

// At this point it is safe to block the process if we're in gRPC only mode as
Expand All @@ -430,6 +369,10 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
cpuProfileCleanup()
}

if grpcSrv != nil {
grpcSrv.Stop()
}

if apiSrv != nil {
_ = apiSrv.Close()
}
Expand All @@ -448,6 +391,94 @@ func genPvFileOnlyWhenKmsAddressEmpty(cfg *config.Config) *pvm.FilePV {
return nil
}

func startGrpcServer(
config serverconfig.GRPCConfig,
clientCtx client.Context,
svrCtx *Context,
app types.Application,
) (*grpc.Server, client.Context, error) {
if !config.Enable {
// return grpcServer as nil if gRPC is disabled
return nil, clientCtx, nil
}
_, _, err := net.SplitHostPort(config.Address)
if err != nil {
return nil, clientCtx, err
}

maxSendMsgSize := config.MaxSendMsgSize
if maxSendMsgSize == 0 {
maxSendMsgSize = serverconfig.DefaultGRPCMaxSendMsgSize
}

maxRecvMsgSize := config.MaxRecvMsgSize
if maxRecvMsgSize == 0 {
maxRecvMsgSize = serverconfig.DefaultGRPCMaxRecvMsgSize
}

// if gRPC is enabled, configure gRPC client for gRPC gateway
grpcClient, err := grpc.NewClient(
config.Address,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.ForceCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()),
grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
grpc.MaxCallSendMsgSize(maxSendMsgSize),
),
)
if err != nil {
return nil, clientCtx, err
}

clientCtx = clientCtx.WithGRPCClient(grpcClient)
svrCtx.Logger.Debug("gRPC client assigned to client context", "target", config.Address)

grpcSrv, err := servergrpc.StartGRPCServer(clientCtx, app, config)
if err != nil {
return nil, clientCtx, err
}

return grpcSrv, clientCtx, nil
}

func startAPIServer(
svrCfg serverconfig.Config,
clientCtx client.Context,
svrCtx *Context,
app types.Application,
home string,
metrics *telemetry.Metrics,
) (*api.Server, error) {
if !svrCfg.API.Enable {
return nil, nil
}

clientCtx = clientCtx.WithHomeDir(home)

apiSrv := api.New(clientCtx, svrCtx.Logger.With("module", "api-server"))
app.RegisterAPIRoutes(apiSrv, svrCfg.API)

if svrCfg.Telemetry.Enabled {
apiSrv.SetTelemetry(metrics)
}

errCh := make(chan error)
go func() {
if err := apiSrv.Start(svrCfg); err != nil {
errCh <- err
}
}()

select {
case err := <-errCh:
return nil, err

case <-time.After(types.ServerStartTime): // assume server started successfully
}

return apiSrv, nil
}

func startTelemetry(cfg serverconfig.Config) (*telemetry.Metrics, error) {
if !cfg.Telemetry.Enabled {
return nil, nil
Expand Down

0 comments on commit 97c524f

Please sign in to comment.