diff --git a/c_test.go b/c_test.go index 90fc26bf..c930cd46 100644 --- a/c_test.go +++ b/c_test.go @@ -15,6 +15,8 @@ import ( "github.com/DoNewsCode/core/di" "github.com/DoNewsCode/core/events" "github.com/DoNewsCode/core/otgorm" + "github.com/DoNewsCode/core/srvgrpc" + "github.com/DoNewsCode/core/srvhttp" "github.com/spf13/cobra" "github.com/stretchr/testify/assert" @@ -28,6 +30,9 @@ func TestC_Serve(t *testing.T) { WithInline("grpc.addr", ":19999"), ) c.ProvideEssentials() + c.AddModule(srvhttp.HealthCheckModule{}) + c.AddModule(srvgrpc.HealthCheckModule{}) + c.Invoke(func(dispatcher contract.Dispatcher) { dispatcher.Subscribe(events.Listen(events.From(OnHTTPServerStart{}), func(ctx context.Context, start contract.Event) error { atomic.AddInt32(&called, 1) @@ -71,6 +76,9 @@ func TestC_ServeDisable(t *testing.T) { WithInline("cron.disable", "true"), ) c.ProvideEssentials() + c.AddModule(srvhttp.HealthCheckModule{}) + c.AddModule(srvgrpc.HealthCheckModule{}) + c.Invoke(func(dispatcher contract.Dispatcher) { dispatcher.Subscribe(events.Listen(events.From(OnHTTPServerStart{}), func(ctx context.Context, start contract.Event) error { atomic.AddInt32(&called, 1) diff --git a/contract/container.go b/contract/container.go index fb521d89..435d634a 100644 --- a/contract/container.go +++ b/contract/container.go @@ -13,10 +13,10 @@ import ( type Container interface { ApplyRouter(router *mux.Router) ApplyGRPCServer(server *grpc.Server) - Shutdown() - ApplyRunGroup(g *run.Group) - Modules() ifilter.Collection ApplyCron(crontab *cron.Cron) + ApplyRunGroup(g *run.Group) ApplyRootCommand(command *cobra.Command) + Shutdown() + Modules() ifilter.Collection AddModule(module interface{}) } diff --git a/go.mod b/go.mod index 696d4b44..ccf7fa29 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/golang/protobuf v1.4.3 github.com/gorilla/handlers v1.5.1 github.com/gorilla/mux v1.8.0 + github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect github.com/hashicorp/go-multierror v1.1.0 github.com/hashicorp/go-version v1.3.0 // indirect github.com/heptiolabs/healthcheck v0.0.0-20180807145615-6ff867650f40 diff --git a/go.sum b/go.sum index 72ce62ed..41e61365 100644 --- a/go.sum +++ b/go.sum @@ -250,6 +250,7 @@ github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= +github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= diff --git a/otgorm/module_test.go b/otgorm/module_test.go index 13c9df83..0897c002 100644 --- a/otgorm/module_test.go +++ b/otgorm/module_test.go @@ -133,9 +133,6 @@ func TestModule_ProvideRunGroup(t *testing.T) { core.WithInline("gorm.default.dsn", "file::memory:?cache=shared"), core.WithInline("gormMetrics.interval", "1ms"), core.WithInline("log.level", "none"), - core.WithInline("http.disable", "true"), - core.WithInline("grpc.disable", "true"), - core.WithInline("cron.disable", "true"), ) c.ProvideEssentials() c.Provide(di.Deps{func() *Gauges { diff --git a/otkafka/module_test.go b/otkafka/module_test.go index 96b9d9e5..10874d17 100644 --- a/otkafka/module_test.go +++ b/otkafka/module_test.go @@ -40,9 +40,6 @@ func TestModule_ProvideRunGroup(t *testing.T) { core.WithInline("kafka.reader.default.topic", "test"), core.WithInline("kafkaMetrics.interval", "10ms"), core.WithInline("log.level", "none"), - core.WithInline("http.disable", "true"), - core.WithInline("grpc.disable", "true"), - core.WithInline("cron.disable", "true"), ) c.ProvideEssentials() c.Provide(di.Deps{func() *ReaderStats { @@ -160,9 +157,6 @@ func TestCollector(t *testing.T) { core.WithInline("kafka.reader.default.topic", "test"), core.WithInline("kafkaMetrics.interval", "1ms"), core.WithInline("log.level", "none"), - core.WithInline("http.disable", "true"), - core.WithInline("grpc.disable", "true"), - core.WithInline("cron.disable", "true"), ) c.ProvideEssentials() c.Provide(di.Deps{func() *ReaderStats { diff --git a/otredis/module_test.go b/otredis/module_test.go index a15f7f3f..e510582d 100644 --- a/otredis/module_test.go +++ b/otredis/module_test.go @@ -42,9 +42,6 @@ func TestModule_ProvideRunGroup(t *testing.T) { core.WithInline("redis.default.addrs", envDefaultRedisAddrs), core.WithInline("redisMetrics.interval", "1ms"), core.WithInline("log.level", "none"), - core.WithInline("http.disable", "true"), - core.WithInline("grpc.disable", "true"), - core.WithInline("cron.disable", "true"), ) c.ProvideEssentials() c.Provide(di.Deps{func() *Gauges { diff --git a/serve.go b/serve.go index a788d55b..a9151379 100644 --- a/serve.go +++ b/serve.go @@ -2,6 +2,7 @@ package core import ( "context" + "fmt" "net" "net/http" "os" @@ -15,6 +16,7 @@ import ( "github.com/DoNewsCode/core/events" "github.com/DoNewsCode/core/logging" "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/gorilla/mux" "github.com/oklog/run" "github.com/pkg/errors" @@ -51,7 +53,121 @@ func (s serveModule) ProvideCommand(command *cobra.Command) { command.AddCommand(newServeCmd(s.in)) } -func newServeCmd(p serveIn) *cobra.Command { +type runGroupFunc func(ctx context.Context, logger logging.LevelLogger) (func() error, func(err error), error) + +func (s serveIn) httpServe(ctx context.Context, logger logging.LevelLogger) (func() error, func(err error), error) { + if s.Config.Bool("http.disable") { + return nil, nil, nil + } + + if s.HTTPServer == nil { + s.HTTPServer = &http.Server{} + } + router := mux.NewRouter() + s.Container.ApplyRouter(router) + + router.Walk(func(route *mux.Route, router *mux.Router, ancestors []*mux.Route) error { + tpl, _ := route.GetPathTemplate() + level.Debug(logger).Log("service", "http", "path", tpl) + return nil + }) + + s.HTTPServer.Handler = router + + httpAddr := s.Config.String("http.addr") + ln, err := net.Listen("tcp", httpAddr) + if err != nil { + return nil, nil, errors.Wrap(err, "failed start http server") + } + return func() error { + logger.Infof("http service is listening at %s", ln.Addr()) + s.Dispatcher.Dispatch( + ctx, + events.Of(OnHTTPServerStart{s.HTTPServer, ln}), + ) + defer s.Dispatcher.Dispatch( + ctx, + events.Of(OnHTTPServerShutdown{s.HTTPServer, ln}), + ) + return s.HTTPServer.Serve(ln) + }, func(err error) { + _ = s.HTTPServer.Shutdown(context.Background()) + _ = ln.Close() + }, nil +} + +func (s serveIn) grpcServe(ctx context.Context, logger logging.LevelLogger) (func() error, func(err error), error) { + if s.Config.Bool("grpc.disable") { + return nil, nil, nil + } + if s.GRPCServer == nil { + s.GRPCServer = grpc.NewServer() + } + s.Container.ApplyGRPCServer(s.GRPCServer) + + for module, info := range s.GRPCServer.GetServiceInfo() { + for _, method := range info.Methods { + level.Debug(logger).Log("service", "grpc", "path", fmt.Sprintf("%s/%s", module, method.Name)) + } + } + + grpcAddr := s.Config.String("grpc.addr") + ln, err := net.Listen("tcp", grpcAddr) + if err != nil { + return nil, nil, errors.Wrap(err, "failed start grpc server") + } + return func() error { + logger.Infof("gRPC service is listening at %s", ln.Addr()) + s.Dispatcher.Dispatch( + ctx, + events.Of(OnGRPCServerStart{s.GRPCServer, ln}), + ) + defer s.Dispatcher.Dispatch( + ctx, + events.Of(OnGRPCServerShutdown{s.GRPCServer, ln}), + ) + return s.GRPCServer.Serve(ln) + }, func(err error) { + s.GRPCServer.GracefulStop() + _ = ln.Close() + }, nil +} + +func (s serveIn) cronServe(ctx context.Context, logger logging.LevelLogger) (func() error, func(err error), error) { + if s.Config.Bool("cron.disable") { + return nil, nil, nil + } + if s.Cron == nil { + s.Cron = cron.New(cron.WithLogger(cronopts.CronLogAdapter{Logging: s.Logger})) + } + s.Container.ApplyCron(s.Cron) + + return func() error { + logger.Infof("cron runner started") + s.Cron.Run() + return nil + }, func(err error) { + <-s.Cron.Stop().Done() + }, nil +} + +func (s serveIn) signalWatch(ctx context.Context, logger logging.LevelLogger) (func() error, func(err error), error) { + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) + return func() error { + select { + case n := <-sig: + logger.Errf("signal received: %s", n) + case <-ctx.Done(): + logger.Errf(ctx.Err().Error()) + } + return nil + }, func(err error) { + close(sig) + }, nil +} + +func newServeCmd(s serveIn) *cobra.Command { var serveCmd = &cobra.Command{ Use: "serve", Short: "Start the server", @@ -60,108 +176,40 @@ func newServeCmd(p serveIn) *cobra.Command { var ( g run.Group - l = logging.WithLevel(p.Logger) + l = logging.WithLevel(s.Logger) ) - // Start HTTP server - if !p.Config.Bool("http.disable") { - httpAddr := p.Config.String("http.addr") - ln, err := net.Listen("tcp", httpAddr) - if err != nil { - return errors.Wrap(err, "failed start http server") - } - if p.HTTPServer == nil { - p.HTTPServer = &http.Server{} - } - router := mux.NewRouter() - p.Container.ApplyRouter(router) - p.HTTPServer.Handler = router - g.Add(func() error { - l.Infof("http service is listening at %s", ln.Addr()) - p.Dispatcher.Dispatch( - cmd.Context(), - events.Of(OnHTTPServerStart{p.HTTPServer, ln}), - ) - defer p.Dispatcher.Dispatch( - cmd.Context(), - events.Of(OnHTTPServerShutdown{p.HTTPServer, ln}), - ) - return p.HTTPServer.Serve(ln) - }, func(err error) { - _ = p.HTTPServer.Shutdown(context.Background()) - _ = ln.Close() - }) + for _, m := range s.Container.Modules() { + l.Debugf("load module: %T", m) } - // Start gRPC server - if !p.Config.Bool("grpc.disable") { - grpcAddr := p.Config.String("grpc.addr") - ln, err := net.Listen("tcp", grpcAddr) - if err != nil { - return errors.Wrap(err, "failed start grpc server") - } - if p.GRPCServer == nil { - p.GRPCServer = grpc.NewServer() - } - p.Container.ApplyGRPCServer(p.GRPCServer) - g.Add(func() error { - l.Infof("gRPC service is listening at %s", ln.Addr()) - p.Dispatcher.Dispatch( - cmd.Context(), - events.Of(OnGRPCServerStart{p.GRPCServer, ln}), - ) - defer p.Dispatcher.Dispatch( - cmd.Context(), - events.Of(OnGRPCServerShutdown{p.GRPCServer, ln}), - ) - return p.GRPCServer.Serve(ln) - }, func(err error) { - p.GRPCServer.GracefulStop() - _ = ln.Close() - }) + // Add serve and signalWatch + serves := []runGroupFunc{ + s.httpServe, + s.grpcServe, + s.cronServe, + s.signalWatch, } - // Start cron runner - if !p.Config.Bool("cron.disable") { - if p.Cron == nil { - p.Cron = cron.New(cron.WithLogger(cronopts.CronLogAdapter{Logging: l})) + for _, serve := range serves { + execute, interrupt, err := serve(cmd.Context(), l) + if err != nil { + return err } - - p.Container.ApplyCron(p.Cron) - g.Add(func() error { - l.Info("cron runner started") - p.Cron.Run() - return nil - }, func(err error) { - <-p.Cron.Stop().Done() - }) - } - - // Graceful shutdown - { - sig := make(chan os.Signal, 1) - signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) - g.Add(func() error { - select { - case s := <-sig: - l.Errf("signal received: %s", s) - case <-cmd.Context().Done(): - l.Errf(cmd.Context().Err().Error()) - } - return nil - }, func(err error) { - close(sig) - }) + if execute == nil { + continue + } + g.Add(execute, interrupt) } // Additional run groups - p.Container.ApplyRunGroup(&g) + s.Container.ApplyRunGroup(&g) if err := g.Run(); err != nil { return err } - l.Infof("graceful shutdown complete; see you next time :)") + l.Info("graceful shutdown complete; see you next time :)") return nil }, } diff --git a/srvgrpc/healthcheck.go b/srvgrpc/healthcheck.go new file mode 100644 index 00000000..38fab1a0 --- /dev/null +++ b/srvgrpc/healthcheck.go @@ -0,0 +1,17 @@ +package srvgrpc + +import ( + "google.golang.org/grpc" + "google.golang.org/grpc/health" + healthpb "google.golang.org/grpc/health/grpc_health_v1" +) + +// HealthCheckModule defines a grpc provider for container.Container. +type HealthCheckModule struct{} + +// ProvideGRPC implements container.GRPCProvider +func (h HealthCheckModule) ProvideGRPC(server *grpc.Server) { + srv := health.NewServer() + srv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING) + healthpb.RegisterHealthServer(server, srv) +} diff --git a/srvgrpc/healthcheck.proto b/srvgrpc/healthcheck.proto new file mode 100644 index 00000000..56785eab --- /dev/null +++ b/srvgrpc/healthcheck.proto @@ -0,0 +1,23 @@ +syntax = "proto3"; + +package grpc.health.v1; + +message HealthCheckRequest { + string service = 1; +} + +message HealthCheckResponse { + enum ServingStatus { + UNKNOWN = 0; + SERVING = 1; + NOT_SERVING = 2; + SERVICE_UNKNOWN = 3; // Used only by the Watch method. + } + ServingStatus status = 1; +} + +service Health { + rpc Check(HealthCheckRequest) returns (HealthCheckResponse); + + rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse); +} \ No newline at end of file diff --git a/srvgrpc/metrics.go b/srvgrpc/metrics.go new file mode 100644 index 00000000..82f3f060 --- /dev/null +++ b/srvgrpc/metrics.go @@ -0,0 +1,22 @@ +package srvgrpc + +import ( + "github.com/grpc-ecosystem/go-grpc-prometheus" + "google.golang.org/grpc" +) + +// MetricsModule exposes prometheus metrics. Here only provides a simple call, +// more complex use, please refer to github.com/grpc-ecosystem/go-grpc-prometheus. +// +// Need to actively provide grpc.Server: +// opts := []grpc.ServerOption{ +// grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor), +// grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor), +// } +// server = grpc.NewServer(opts...) +type MetricsModule struct{} + +// ProvideGRPC implements container.GRPCProvider +func (m MetricsModule) ProvideGRPC(server *grpc.Server) { + grpc_prometheus.Register(server) +}