Skip to content

Commit

Permalink
feat: Serve run (#137)
Browse files Browse the repository at this point in the history
* fix: if no serve to run then exit

* Update go.yml

* feat: if no services to run, log warn info

* feat: add runGroup count

* feat: add info log for no service to apply

* feat: add grpc healthcheck

* test: if ETCD_ADDR is empty, then skip test

* feat: srvgrpc add metrics module

* fix: default grpcServe add Interceptor

* feat: add debug info when serve run

* fix: delete notes

* test: use env value

* test: not need to manually disable

* test: replace Skipf to Skip

* fix: merge master conflict

* fix: remove applies count check

* test: remove TestC_NoServe

* test: revoke 9539448 for e13c343

Co-authored-by: 谷溪 <guxi99@gmail.com>
  • Loading branch information
GGXXLL and Reasno authored Jun 10, 2021
1 parent ee103d5 commit 8910fc1
Show file tree
Hide file tree
Showing 11 changed files with 211 additions and 103 deletions.
8 changes: 8 additions & 0 deletions c_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/DoNewsCode/core/di"
"github.com/DoNewsCode/core/events"
"github.com/DoNewsCode/core/otgorm"
"github.com/DoNewsCode/core/srvgrpc"
"github.com/DoNewsCode/core/srvhttp"

"github.com/spf13/cobra"
"github.com/stretchr/testify/assert"
Expand All @@ -28,6 +30,9 @@ func TestC_Serve(t *testing.T) {
WithInline("grpc.addr", ":19999"),
)
c.ProvideEssentials()
c.AddModule(srvhttp.HealthCheckModule{})
c.AddModule(srvgrpc.HealthCheckModule{})

c.Invoke(func(dispatcher contract.Dispatcher) {
dispatcher.Subscribe(events.Listen(events.From(OnHTTPServerStart{}), func(ctx context.Context, start contract.Event) error {
atomic.AddInt32(&called, 1)
Expand Down Expand Up @@ -71,6 +76,9 @@ func TestC_ServeDisable(t *testing.T) {
WithInline("cron.disable", "true"),
)
c.ProvideEssentials()
c.AddModule(srvhttp.HealthCheckModule{})
c.AddModule(srvgrpc.HealthCheckModule{})

c.Invoke(func(dispatcher contract.Dispatcher) {
dispatcher.Subscribe(events.Listen(events.From(OnHTTPServerStart{}), func(ctx context.Context, start contract.Event) error {
atomic.AddInt32(&called, 1)
Expand Down
6 changes: 3 additions & 3 deletions contract/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import (
type Container interface {
ApplyRouter(router *mux.Router)
ApplyGRPCServer(server *grpc.Server)
Shutdown()
ApplyRunGroup(g *run.Group)
Modules() ifilter.Collection
ApplyCron(crontab *cron.Cron)
ApplyRunGroup(g *run.Group)
ApplyRootCommand(command *cobra.Command)
Shutdown()
Modules() ifilter.Collection
AddModule(module interface{})
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/golang/protobuf v1.4.3
github.com/gorilla/handlers v1.5.1
github.com/gorilla/mux v1.8.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/hashicorp/go-multierror v1.1.0
github.com/hashicorp/go-version v1.3.0 // indirect
github.com/heptiolabs/healthcheck v0.0.0-20180807145615-6ff867650f40
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
Expand Down
3 changes: 0 additions & 3 deletions otgorm/module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,6 @@ func TestModule_ProvideRunGroup(t *testing.T) {
core.WithInline("gorm.default.dsn", "file::memory:?cache=shared"),
core.WithInline("gormMetrics.interval", "1ms"),
core.WithInline("log.level", "none"),
core.WithInline("http.disable", "true"),
core.WithInline("grpc.disable", "true"),
core.WithInline("cron.disable", "true"),
)
c.ProvideEssentials()
c.Provide(di.Deps{func() *Gauges {
Expand Down
6 changes: 0 additions & 6 deletions otkafka/module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ func TestModule_ProvideRunGroup(t *testing.T) {
core.WithInline("kafka.reader.default.topic", "test"),
core.WithInline("kafkaMetrics.interval", "10ms"),
core.WithInline("log.level", "none"),
core.WithInline("http.disable", "true"),
core.WithInline("grpc.disable", "true"),
core.WithInline("cron.disable", "true"),
)
c.ProvideEssentials()
c.Provide(di.Deps{func() *ReaderStats {
Expand Down Expand Up @@ -160,9 +157,6 @@ func TestCollector(t *testing.T) {
core.WithInline("kafka.reader.default.topic", "test"),
core.WithInline("kafkaMetrics.interval", "1ms"),
core.WithInline("log.level", "none"),
core.WithInline("http.disable", "true"),
core.WithInline("grpc.disable", "true"),
core.WithInline("cron.disable", "true"),
)
c.ProvideEssentials()
c.Provide(di.Deps{func() *ReaderStats {
Expand Down
3 changes: 0 additions & 3 deletions otredis/module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ func TestModule_ProvideRunGroup(t *testing.T) {
core.WithInline("redis.default.addrs", envDefaultRedisAddrs),
core.WithInline("redisMetrics.interval", "1ms"),
core.WithInline("log.level", "none"),
core.WithInline("http.disable", "true"),
core.WithInline("grpc.disable", "true"),
core.WithInline("cron.disable", "true"),
)
c.ProvideEssentials()
c.Provide(di.Deps{func() *Gauges {
Expand Down
224 changes: 136 additions & 88 deletions serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package core

import (
"context"
"fmt"
"net"
"net/http"
"os"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/DoNewsCode/core/events"
"github.com/DoNewsCode/core/logging"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gorilla/mux"
"github.com/oklog/run"
"github.com/pkg/errors"
Expand Down Expand Up @@ -51,7 +53,121 @@ func (s serveModule) ProvideCommand(command *cobra.Command) {
command.AddCommand(newServeCmd(s.in))
}

func newServeCmd(p serveIn) *cobra.Command {
type runGroupFunc func(ctx context.Context, logger logging.LevelLogger) (func() error, func(err error), error)

func (s serveIn) httpServe(ctx context.Context, logger logging.LevelLogger) (func() error, func(err error), error) {
if s.Config.Bool("http.disable") {
return nil, nil, nil
}

if s.HTTPServer == nil {
s.HTTPServer = &http.Server{}
}
router := mux.NewRouter()
s.Container.ApplyRouter(router)

router.Walk(func(route *mux.Route, router *mux.Router, ancestors []*mux.Route) error {
tpl, _ := route.GetPathTemplate()
level.Debug(logger).Log("service", "http", "path", tpl)
return nil
})

s.HTTPServer.Handler = router

httpAddr := s.Config.String("http.addr")
ln, err := net.Listen("tcp", httpAddr)
if err != nil {
return nil, nil, errors.Wrap(err, "failed start http server")
}
return func() error {
logger.Infof("http service is listening at %s", ln.Addr())
s.Dispatcher.Dispatch(
ctx,
events.Of(OnHTTPServerStart{s.HTTPServer, ln}),
)
defer s.Dispatcher.Dispatch(
ctx,
events.Of(OnHTTPServerShutdown{s.HTTPServer, ln}),
)
return s.HTTPServer.Serve(ln)
}, func(err error) {
_ = s.HTTPServer.Shutdown(context.Background())
_ = ln.Close()
}, nil
}

func (s serveIn) grpcServe(ctx context.Context, logger logging.LevelLogger) (func() error, func(err error), error) {
if s.Config.Bool("grpc.disable") {
return nil, nil, nil
}
if s.GRPCServer == nil {
s.GRPCServer = grpc.NewServer()
}
s.Container.ApplyGRPCServer(s.GRPCServer)

for module, info := range s.GRPCServer.GetServiceInfo() {
for _, method := range info.Methods {
level.Debug(logger).Log("service", "grpc", "path", fmt.Sprintf("%s/%s", module, method.Name))
}
}

grpcAddr := s.Config.String("grpc.addr")
ln, err := net.Listen("tcp", grpcAddr)
if err != nil {
return nil, nil, errors.Wrap(err, "failed start grpc server")
}
return func() error {
logger.Infof("gRPC service is listening at %s", ln.Addr())
s.Dispatcher.Dispatch(
ctx,
events.Of(OnGRPCServerStart{s.GRPCServer, ln}),
)
defer s.Dispatcher.Dispatch(
ctx,
events.Of(OnGRPCServerShutdown{s.GRPCServer, ln}),
)
return s.GRPCServer.Serve(ln)
}, func(err error) {
s.GRPCServer.GracefulStop()
_ = ln.Close()
}, nil
}

func (s serveIn) cronServe(ctx context.Context, logger logging.LevelLogger) (func() error, func(err error), error) {
if s.Config.Bool("cron.disable") {
return nil, nil, nil
}
if s.Cron == nil {
s.Cron = cron.New(cron.WithLogger(cronopts.CronLogAdapter{Logging: s.Logger}))
}
s.Container.ApplyCron(s.Cron)

return func() error {
logger.Infof("cron runner started")
s.Cron.Run()
return nil
}, func(err error) {
<-s.Cron.Stop().Done()
}, nil
}

func (s serveIn) signalWatch(ctx context.Context, logger logging.LevelLogger) (func() error, func(err error), error) {
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
return func() error {
select {
case n := <-sig:
logger.Errf("signal received: %s", n)
case <-ctx.Done():
logger.Errf(ctx.Err().Error())
}
return nil
}, func(err error) {
close(sig)
}, nil
}

func newServeCmd(s serveIn) *cobra.Command {
var serveCmd = &cobra.Command{
Use: "serve",
Short: "Start the server",
Expand All @@ -60,108 +176,40 @@ func newServeCmd(p serveIn) *cobra.Command {

var (
g run.Group
l = logging.WithLevel(p.Logger)
l = logging.WithLevel(s.Logger)
)

// Start HTTP server
if !p.Config.Bool("http.disable") {
httpAddr := p.Config.String("http.addr")
ln, err := net.Listen("tcp", httpAddr)
if err != nil {
return errors.Wrap(err, "failed start http server")
}
if p.HTTPServer == nil {
p.HTTPServer = &http.Server{}
}
router := mux.NewRouter()
p.Container.ApplyRouter(router)
p.HTTPServer.Handler = router
g.Add(func() error {
l.Infof("http service is listening at %s", ln.Addr())
p.Dispatcher.Dispatch(
cmd.Context(),
events.Of(OnHTTPServerStart{p.HTTPServer, ln}),
)
defer p.Dispatcher.Dispatch(
cmd.Context(),
events.Of(OnHTTPServerShutdown{p.HTTPServer, ln}),
)
return p.HTTPServer.Serve(ln)
}, func(err error) {
_ = p.HTTPServer.Shutdown(context.Background())
_ = ln.Close()
})
for _, m := range s.Container.Modules() {
l.Debugf("load module: %T", m)
}

// Start gRPC server
if !p.Config.Bool("grpc.disable") {
grpcAddr := p.Config.String("grpc.addr")
ln, err := net.Listen("tcp", grpcAddr)
if err != nil {
return errors.Wrap(err, "failed start grpc server")
}
if p.GRPCServer == nil {
p.GRPCServer = grpc.NewServer()
}
p.Container.ApplyGRPCServer(p.GRPCServer)
g.Add(func() error {
l.Infof("gRPC service is listening at %s", ln.Addr())
p.Dispatcher.Dispatch(
cmd.Context(),
events.Of(OnGRPCServerStart{p.GRPCServer, ln}),
)
defer p.Dispatcher.Dispatch(
cmd.Context(),
events.Of(OnGRPCServerShutdown{p.GRPCServer, ln}),
)
return p.GRPCServer.Serve(ln)
}, func(err error) {
p.GRPCServer.GracefulStop()
_ = ln.Close()
})
// Add serve and signalWatch
serves := []runGroupFunc{
s.httpServe,
s.grpcServe,
s.cronServe,
s.signalWatch,
}

// Start cron runner
if !p.Config.Bool("cron.disable") {
if p.Cron == nil {
p.Cron = cron.New(cron.WithLogger(cronopts.CronLogAdapter{Logging: l}))
for _, serve := range serves {
execute, interrupt, err := serve(cmd.Context(), l)
if err != nil {
return err
}

p.Container.ApplyCron(p.Cron)
g.Add(func() error {
l.Info("cron runner started")
p.Cron.Run()
return nil
}, func(err error) {
<-p.Cron.Stop().Done()
})
}

// Graceful shutdown
{
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
g.Add(func() error {
select {
case s := <-sig:
l.Errf("signal received: %s", s)
case <-cmd.Context().Done():
l.Errf(cmd.Context().Err().Error())
}
return nil
}, func(err error) {
close(sig)
})
if execute == nil {
continue
}
g.Add(execute, interrupt)
}

// Additional run groups
p.Container.ApplyRunGroup(&g)
s.Container.ApplyRunGroup(&g)

if err := g.Run(); err != nil {
return err
}

l.Infof("graceful shutdown complete; see you next time :)")
l.Info("graceful shutdown complete; see you next time :)")
return nil
},
}
Expand Down
17 changes: 17 additions & 0 deletions srvgrpc/healthcheck.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package srvgrpc

import (
"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

// HealthCheckModule defines a grpc provider for container.Container.
type HealthCheckModule struct{}

// ProvideGRPC implements container.GRPCProvider
func (h HealthCheckModule) ProvideGRPC(server *grpc.Server) {
srv := health.NewServer()
srv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
healthpb.RegisterHealthServer(server, srv)
}
Loading

0 comments on commit 8910fc1

Please sign in to comment.