diff --git a/config/config_template.toml b/config/config_template.toml index ddd808360..7ed48b8cc 100644 --- a/config/config_template.toml +++ b/config/config_template.toml @@ -60,14 +60,14 @@ Modules = ["epoch", "bucket", "object", "payment"] Dsn = "localhost:3308" [P2PCfg] -ListenAddress = "127.0.0.1:9933" +ListenAddress = "127.0.0.1:9833" P2PPrivateKey = "" Bootstrap = [] PingPeriod = 2 [MetricsCfg] Enabled = false -HTTPAddress = "localhost:9833" +HTTPAddress = "localhost:24036" [LogCfg] Level = "debug" diff --git a/pkg/middleware/grpc/metrics.go b/pkg/middleware/grpc/metrics.go new file mode 100644 index 000000000..bd6133420 --- /dev/null +++ b/pkg/middleware/grpc/metrics.go @@ -0,0 +1,39 @@ +package grpc + +import ( + "runtime/debug" + + openmetrics "github.com/grpc-ecosystem/go-grpc-middleware/providers/openmetrics/v2" + grpcrecovery "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/bnb-chain/greenfield-storage-provider/pkg/log" + "github.com/bnb-chain/greenfield-storage-provider/pkg/metrics" +) + +var ( + gRPCPanicRecoveryHandler = func(p interface{}) (err error) { + metrics.PanicsTotal.WithLabelValues().Inc() + log.Errorw("recovered from panic", "panic", p, "stack", debug.Stack()) + return status.Errorf(codes.Internal, "%s", p) + } +) + +// GetDefaultServerInterceptor returns default gRPC server interceptor +func GetDefaultServerInterceptor() []grpc.ServerOption { + options := []grpc.ServerOption{} + options = append(options, grpc.ChainUnaryInterceptor(openmetrics.UnaryServerInterceptor(metrics.DefaultGRPCServerMetrics), + grpcrecovery.UnaryServerInterceptor(grpcrecovery.WithRecoveryHandler(gRPCPanicRecoveryHandler)))) + options = append(options, grpc.ChainStreamInterceptor(openmetrics.StreamServerInterceptor(metrics.DefaultGRPCServerMetrics))) + return options +} + +// GetDefaultClientInterceptor returns default gRPC client interceptor +func GetDefaultClientInterceptor() []grpc.DialOption { + options := []grpc.DialOption{} + options = append(options, grpc.WithChainUnaryInterceptor(openmetrics.UnaryClientInterceptor(metrics.DefaultGRPCClientMetrics))) + options = append(options, grpc.WithChainStreamInterceptor(openmetrics.StreamClientInterceptor(metrics.DefaultGRPCClientMetrics))) + return options +} diff --git a/pkg/p2p/node.go b/pkg/p2p/node.go index 14d50cfd9..5cdf30fcf 100644 --- a/pkg/p2p/node.go +++ b/pkg/p2p/node.go @@ -5,12 +5,12 @@ import ( "strings" "time" - signerclient "github.com/bnb-chain/greenfield-storage-provider/service/signer/client" + storagetypes "github.com/bnb-chain/greenfield/x/storage/types" ggio "github.com/gogo/protobuf/io" "github.com/gogo/protobuf/proto" ds "github.com/ipfs/go-datastore" leveldb "github.com/ipfs/go-ds-leveldb" - libp2p "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" @@ -19,7 +19,7 @@ import ( "github.com/bnb-chain/greenfield-storage-provider/pkg/log" "github.com/bnb-chain/greenfield-storage-provider/pkg/p2p/types" - storagetypes "github.com/bnb-chain/greenfield/x/storage/types" + signerclient "github.com/bnb-chain/greenfield-storage-provider/service/signer/client" ) // Node defines the p2p protocol node, encapsulates the go-lib.p2p @@ -102,7 +102,7 @@ func (n *Node) Name() string { // Start runs background task that trigger broadcast ping request func (n *Node) Start(ctx context.Context) error { - go n.eventloop() + go n.eventLoop() return nil } @@ -168,8 +168,8 @@ func (n *Node) GetApproval(object *storagetypes.ObjectInfo, expectedAccept int, } } -// eventloop run the background task -func (n *Node) eventloop() { +// eventLoop run the background task +func (n *Node) eventLoop() { ticker := time.NewTicker(time.Duration(n.config.PingPeriod) * time.Second) for { select { diff --git a/service/challenge/challenge.go b/service/challenge/challenge.go index 35186b335..86e2e72e1 100644 --- a/service/challenge/challenge.go +++ b/service/challenge/challenge.go @@ -4,15 +4,18 @@ import ( "context" "net" - "github.com/bnb-chain/greenfield-storage-provider/pkg/lifecycle" "google.golang.org/grpc" "google.golang.org/grpc/reflection" "github.com/bnb-chain/greenfield-storage-provider/model" + "github.com/bnb-chain/greenfield-storage-provider/pkg/lifecycle" "github.com/bnb-chain/greenfield-storage-provider/pkg/log" + "github.com/bnb-chain/greenfield-storage-provider/pkg/metrics" + mdgrpc "github.com/bnb-chain/greenfield-storage-provider/pkg/middleware/grpc" "github.com/bnb-chain/greenfield-storage-provider/service/challenge/types" psclient "github.com/bnb-chain/greenfield-storage-provider/store/piecestore/client" "github.com/bnb-chain/greenfield-storage-provider/store/sqldb" + utilgrpc "github.com/bnb-chain/greenfield-storage-provider/util/grpc" ) var _ lifecycle.Service = &Challenge{} @@ -23,6 +26,7 @@ type Challenge struct { config *ChallengeConfig spDB sqldb.SPDB pieceStore *psclient.StoreClient + grpcServer *grpc.Server } // NewChallengeService returns an instance of Challenge that implementation of @@ -56,28 +60,34 @@ func (challenge *Challenge) Name() string { // Start the challenge gRPC service func (challenge *Challenge) Start(ctx context.Context) error { errCh := make(chan error) - - go func(errCh chan error) { - lis, err := net.Listen("tcp", challenge.config.GRPCAddress) - errCh <- err - if err != nil { - log.Errorw("failed to listen", "error", err) - return - } - grpcServer := grpc.NewServer(grpc.MaxRecvMsgSize(model.MaxCallMsgSize), grpc.MaxSendMsgSize(model.MaxCallMsgSize)) - types.RegisterChallengeServiceServer(grpcServer, challenge) - reflection.Register(grpcServer) - if err = grpcServer.Serve(lis); err != nil { - log.Errorw("failed to serve", "error", err) - return - } - }(errCh) - + go challenge.serve(errCh) err := <-errCh return err } // Stop the challenge gRPC service and recycle the resources func (challenge *Challenge) Stop(ctx context.Context) error { + challenge.grpcServer.GracefulStop() return nil } + +func (challenge *Challenge) serve(errCh chan error) { + lis, err := net.Listen("tcp", challenge.config.GRPCAddress) + errCh <- err + if err != nil { + log.Errorw("failed to listen", "error", err) + return + } + + options := utilgrpc.GetDefaultServerOptions() + if metrics.GetMetrics().Enabled() { + options = append(options, mdgrpc.GetDefaultServerInterceptor()...) + } + challenge.grpcServer = grpc.NewServer(options...) + types.RegisterChallengeServiceServer(challenge.grpcServer, challenge) + reflection.Register(challenge.grpcServer) + if err = challenge.grpcServer.Serve(lis); err != nil { + log.Errorw("failed to serve", "error", err) + return + } +} diff --git a/service/challenge/client/challenge_client.go b/service/challenge/client/challenge_client.go index b0d1790cc..2ea0b0c37 100644 --- a/service/challenge/client/challenge_client.go +++ b/service/challenge/client/challenge_client.go @@ -3,13 +3,14 @@ package client import ( "context" - "github.com/bnb-chain/greenfield-storage-provider/model" - merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors" "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" + merrors "github.com/bnb-chain/greenfield-storage-provider/model/errors" "github.com/bnb-chain/greenfield-storage-provider/pkg/log" + "github.com/bnb-chain/greenfield-storage-provider/pkg/metrics" + mdgrpc "github.com/bnb-chain/greenfield-storage-provider/pkg/middleware/grpc" "github.com/bnb-chain/greenfield-storage-provider/service/challenge/types" + utilgrpc "github.com/bnb-chain/greenfield-storage-provider/util/grpc" ) // ChallengeClient is a challenge gRPC service client wrapper @@ -21,10 +22,11 @@ type ChallengeClient struct { // NewChallengeClient return a ChallengeClient instance func NewChallengeClient(address string) (*ChallengeClient, error) { - conn, err := grpc.DialContext(context.Background(), address, - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(model.MaxCallMsgSize)), - grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(model.MaxCallMsgSize))) + options := utilgrpc.GetDefaultClientOptions() + if metrics.GetMetrics().Enabled() { + options = append(options, mdgrpc.GetDefaultClientInterceptor()...) + } + conn, err := grpc.DialContext(context.Background(), address, options...) if err != nil { log.Errorw("failed to dial challenge", "error", err) return nil, err diff --git a/service/downloader/client/downloader_client.go b/service/downloader/client/downloader_client.go index 951b7401c..4ec484e19 100644 --- a/service/downloader/client/downloader_client.go +++ b/service/downloader/client/downloader_client.go @@ -3,13 +3,14 @@ package client import ( "context" - "github.com/bnb-chain/greenfield-storage-provider/model" + storagetypes "github.com/bnb-chain/greenfield/x/storage/types" "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" "github.com/bnb-chain/greenfield-storage-provider/pkg/log" + "github.com/bnb-chain/greenfield-storage-provider/pkg/metrics" + mdgrpc "github.com/bnb-chain/greenfield-storage-provider/pkg/middleware/grpc" "github.com/bnb-chain/greenfield-storage-provider/service/downloader/types" - storagetypes "github.com/bnb-chain/greenfield/x/storage/types" + utilgrpc "github.com/bnb-chain/greenfield-storage-provider/util/grpc" ) // DownloaderClient is a downloader gRPC service client wrapper @@ -21,10 +22,11 @@ type DownloaderClient struct { // NewDownloaderClient returns a DownloaderClient instance func NewDownloaderClient(address string) (*DownloaderClient, error) { - conn, err := grpc.DialContext(context.Background(), address, - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(model.MaxCallMsgSize)), - grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(model.MaxCallMsgSize))) + options := utilgrpc.GetDefaultClientOptions() + if metrics.GetMetrics().Enabled() { + options = append(options, mdgrpc.GetDefaultClientInterceptor()...) + } + conn, err := grpc.DialContext(context.Background(), address, options...) if err != nil { log.Errorw("failed to dial downloader", "error", err) return nil, err diff --git a/service/downloader/downloader.go b/service/downloader/downloader.go index 44f0e6f18..5e381064b 100644 --- a/service/downloader/downloader.go +++ b/service/downloader/downloader.go @@ -4,15 +4,18 @@ import ( "context" "net" - "github.com/bnb-chain/greenfield-storage-provider/pkg/lifecycle" - "github.com/bnb-chain/greenfield-storage-provider/store/sqldb" "google.golang.org/grpc" "google.golang.org/grpc/reflection" "github.com/bnb-chain/greenfield-storage-provider/model" + "github.com/bnb-chain/greenfield-storage-provider/pkg/lifecycle" "github.com/bnb-chain/greenfield-storage-provider/pkg/log" + "github.com/bnb-chain/greenfield-storage-provider/pkg/metrics" + mdgrpc "github.com/bnb-chain/greenfield-storage-provider/pkg/middleware/grpc" "github.com/bnb-chain/greenfield-storage-provider/service/downloader/types" psclient "github.com/bnb-chain/greenfield-storage-provider/store/piecestore/client" + "github.com/bnb-chain/greenfield-storage-provider/store/sqldb" + utilgrpc "github.com/bnb-chain/greenfield-storage-provider/util/grpc" ) var _ lifecycle.Service = &Downloader{} @@ -23,6 +26,7 @@ type Downloader struct { config *DownloaderConfig spDB sqldb.SPDB pieceStore *psclient.StoreClient + grpcServer *grpc.Server } // NewDownloaderService returns an instance of Downloader that implementation of @@ -56,28 +60,34 @@ func (downloader *Downloader) Name() string { // Start the downloader gRPC service func (downloader *Downloader) Start(ctx context.Context) error { errCh := make(chan error) - - go func(errCh chan error) { - lis, err := net.Listen("tcp", downloader.config.GRPCAddress) - errCh <- err - if err != nil { - log.Errorw("failed to listen", "error", err) - return - } - grpcServer := grpc.NewServer(grpc.MaxRecvMsgSize(model.MaxCallMsgSize), grpc.MaxSendMsgSize(model.MaxCallMsgSize)) - types.RegisterDownloaderServiceServer(grpcServer, downloader) - reflection.Register(grpcServer) - if err = grpcServer.Serve(lis); err != nil { - log.Errorw("failed to serve", "error", err) - return - } - }(errCh) - + go downloader.serve(errCh) err := <-errCh return err } // Stop the downloader gRPC service and recycle the resources func (downloader *Downloader) Stop(ctx context.Context) error { + downloader.grpcServer.GracefulStop() return nil } + +func (downloader *Downloader) serve(errCh chan error) { + lis, err := net.Listen("tcp", downloader.config.GRPCAddress) + errCh <- err + if err != nil { + log.Errorw("failed to listen", "error", err) + return + } + + options := utilgrpc.GetDefaultServerOptions() + if metrics.GetMetrics().Enabled() { + options = append(options, mdgrpc.GetDefaultServerInterceptor()...) + } + downloader.grpcServer = grpc.NewServer(options...) + types.RegisterDownloaderServiceServer(downloader.grpcServer, downloader) + reflection.Register(downloader.grpcServer) + if err = downloader.grpcServer.Serve(lis); err != nil { + log.Errorw("failed to serve", "error", err) + return + } +} diff --git a/service/p2p/client/p2p_client.go b/service/p2p/client/p2p_client.go index d62276e57..82f490a7a 100644 --- a/service/p2p/client/p2p_client.go +++ b/service/p2p/client/p2p_client.go @@ -3,6 +3,8 @@ package client import ( "context" + "github.com/bnb-chain/greenfield-storage-provider/pkg/metrics" + mdgrpc "github.com/bnb-chain/greenfield-storage-provider/pkg/middleware/grpc" storagetypes "github.com/bnb-chain/greenfield/x/storage/types" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -22,6 +24,10 @@ type P2PClient struct { // NewP2PClient return a P2PClient instance func NewP2PClient(address string) (*P2PClient, error) { + options := []grpc.DialOption{} + if metrics.GetMetrics().Enabled() { + options = append(options, mdgrpc.GetDefaultClientInterceptor()...) + } conn, err := grpc.DialContext(context.Background(), address, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(model.MaxCallMsgSize)), diff --git a/service/p2p/p2p.go b/service/p2p/p2p.go index 5034ee5f4..f23d72da5 100644 --- a/service/p2p/p2p.go +++ b/service/p2p/p2p.go @@ -8,6 +8,8 @@ import ( "time" "github.com/bnb-chain/greenfield-common/go/hash" + "github.com/bnb-chain/greenfield-storage-provider/pkg/metrics" + mdgrpc "github.com/bnb-chain/greenfield-storage-provider/pkg/middleware/grpc" "google.golang.org/grpc" "google.golang.org/grpc/reflection" @@ -92,11 +94,14 @@ func (p *P2PServer) serve(errCh chan error) { return } - grpcServer := grpc.NewServer(grpc.MaxRecvMsgSize(model.MaxCallMsgSize)) - p2ptypes.RegisterP2PServiceServer(grpcServer, p) - p.grpcServer = grpcServer - reflection.Register(grpcServer) - if err := grpcServer.Serve(lis); err != nil { + options := []grpc.ServerOption{} + if metrics.GetMetrics().Enabled() { + options = append(options, mdgrpc.GetDefaultServerInterceptor()...) + } + p.grpcServer = grpc.NewServer(options...) + p2ptypes.RegisterP2PServiceServer(p.grpcServer, p) + reflection.Register(p.grpcServer) + if err := p.grpcServer.Serve(lis); err != nil { log.Errorw("failed to start grpc server", "err", err) return } diff --git a/service/uploader/client/uploader_client.go b/service/uploader/client/uploader_client.go index 63e4dda75..d87a2b6d3 100644 --- a/service/uploader/client/uploader_client.go +++ b/service/uploader/client/uploader_client.go @@ -3,15 +3,14 @@ package client import ( "context" - openmetrics "github.com/grpc-ecosystem/go-grpc-middleware/providers/openmetrics/v2" "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - "github.com/bnb-chain/greenfield-storage-provider/model" "github.com/bnb-chain/greenfield-storage-provider/pkg/log" "github.com/bnb-chain/greenfield-storage-provider/pkg/metrics" + mdgrpc "github.com/bnb-chain/greenfield-storage-provider/pkg/middleware/grpc" servicetypes "github.com/bnb-chain/greenfield-storage-provider/service/types" "github.com/bnb-chain/greenfield-storage-provider/service/uploader/types" + utilgrpc "github.com/bnb-chain/greenfield-storage-provider/util/grpc" ) // UploaderClient is an uploader gRPC service client wrapper @@ -22,13 +21,9 @@ type UploaderClient struct { // NewUploaderClient return an UploaderClient instance func NewUploaderClient(address string) (*UploaderClient, error) { - var options []grpc.DialOption - options = append(options, grpc.WithTransportCredentials(insecure.NewCredentials())) - options = append(options, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(model.MaxCallMsgSize))) - options = append(options, grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(model.MaxCallMsgSize))) + options := utilgrpc.GetDefaultClientOptions() if metrics.GetMetrics().Enabled() { - options = append(options, grpc.WithChainUnaryInterceptor(openmetrics.UnaryClientInterceptor(metrics.DefaultGRPCClientMetrics))) - options = append(options, grpc.WithChainStreamInterceptor(openmetrics.StreamClientInterceptor(metrics.DefaultGRPCClientMetrics))) + options = append(options, mdgrpc.GetDefaultClientInterceptor()...) } conn, err := grpc.DialContext(context.Background(), address, options...) if err != nil { diff --git a/service/uploader/uploader.go b/service/uploader/uploader.go index b997cf809..1364d34e9 100644 --- a/service/uploader/uploader.go +++ b/service/uploader/uploader.go @@ -3,25 +3,22 @@ package uploader import ( "context" "net" - "runtime/debug" - openmetrics "github.com/grpc-ecosystem/go-grpc-middleware/providers/openmetrics/v2" - grpcrecovery "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery" lru "github.com/hashicorp/golang-lru" "google.golang.org/grpc" - "google.golang.org/grpc/codes" "google.golang.org/grpc/reflection" - "google.golang.org/grpc/status" "github.com/bnb-chain/greenfield-storage-provider/model" "github.com/bnb-chain/greenfield-storage-provider/pkg/lifecycle" "github.com/bnb-chain/greenfield-storage-provider/pkg/log" "github.com/bnb-chain/greenfield-storage-provider/pkg/metrics" + mdgrpc "github.com/bnb-chain/greenfield-storage-provider/pkg/middleware/grpc" signerclient "github.com/bnb-chain/greenfield-storage-provider/service/signer/client" tasknodeclient "github.com/bnb-chain/greenfield-storage-provider/service/tasknode/client" "github.com/bnb-chain/greenfield-storage-provider/service/uploader/types" psclient "github.com/bnb-chain/greenfield-storage-provider/store/piecestore/client" "github.com/bnb-chain/greenfield-storage-provider/store/sqldb" + utilgrpc "github.com/bnb-chain/greenfield-storage-provider/util/grpc" ) var _ lifecycle.Service = &Uploader{} @@ -104,19 +101,9 @@ func (uploader *Uploader) serve(errCh chan error) { return } - gRPCPanicRecoveryHandler := func(p interface{}) (err error) { - metrics.PanicsTotal.WithLabelValues().Inc() - log.Errorw("recovered from panic", "panic", p, "stack", debug.Stack()) - return status.Errorf(codes.Internal, "%s", p) - } - - var options []grpc.ServerOption - options = append(options, grpc.MaxRecvMsgSize(model.MaxCallMsgSize)) - options = append(options, grpc.MaxSendMsgSize(model.MaxCallMsgSize)) + options := utilgrpc.GetDefaultServerOptions() if metrics.GetMetrics().Enabled() { - options = append(options, grpc.ChainUnaryInterceptor(openmetrics.UnaryServerInterceptor(metrics.DefaultGRPCServerMetrics), - grpcrecovery.UnaryServerInterceptor(grpcrecovery.WithRecoveryHandler(gRPCPanicRecoveryHandler)))) - options = append(options, grpc.ChainStreamInterceptor(openmetrics.StreamServerInterceptor(metrics.DefaultGRPCServerMetrics))) + options = append(options, mdgrpc.GetDefaultServerInterceptor()...) } uploader.grpcServer = grpc.NewServer(options...) types.RegisterUploaderServiceServer(uploader.grpcServer, uploader) diff --git a/util/grpc/options.go b/util/grpc/options.go new file mode 100644 index 000000000..296f97ed2 --- /dev/null +++ b/util/grpc/options.go @@ -0,0 +1,24 @@ +package grpc + +import ( + "github.com/bnb-chain/greenfield-storage-provider/model" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +// GetDefaultServerOptions returns default gRPC server options +func GetDefaultServerOptions() []grpc.ServerOption { + options := []grpc.ServerOption{} + options = append(options, grpc.MaxRecvMsgSize(model.MaxCallMsgSize)) + options = append(options, grpc.MaxSendMsgSize(model.MaxCallMsgSize)) + return options +} + +// GetDefaultClientOptions returns default gRPC client options +func GetDefaultClientOptions() []grpc.DialOption { + options := []grpc.DialOption{} + options = append(options, grpc.WithTransportCredentials(insecure.NewCredentials())) + options = append(options, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(model.MaxCallMsgSize))) + options = append(options, grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(model.MaxCallMsgSize))) + return options +}