Skip to content

Commit

Permalink
feat(server): support for hot updates
Browse files Browse the repository at this point in the history
test/zerodowntime
  • Loading branch information
kl7sn committed Feb 5, 2025
1 parent 6e6304f commit cb93cf6
Show file tree
Hide file tree
Showing 12 changed files with 346 additions and 30 deletions.
59 changes: 50 additions & 9 deletions ego.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package ego
import (
"context"
"os"
"strconv"
"strings"
"sync"
"time"

"go.uber.org/zap"

// econf/file package should be imported first
_ "github.com/gotomicro/ego/core/econf/file"
"github.com/gotomicro/ego/core/eflag"
Expand All @@ -17,7 +20,6 @@ import (
"github.com/gotomicro/ego/server"
"github.com/gotomicro/ego/task/ecron"
"github.com/gotomicro/ego/task/ejob"
"go.uber.org/zap"
)

// Ego 分为三大部分
Expand All @@ -34,13 +36,14 @@ type Ego struct {
cancel func() // cancel

// 第二部分 运行程序
inits []func() error // 系统初始化函数
invokers []func() error // 用户初始化函数
servers []server.Server // 服务
orderServers []server.OrderServer // 有顺序的服务,需要监听health。成功后,才启动下一步
crons []ecron.Ecron // 定时任务
jobs map[string]ejob.Ejob // 短时任务
registerer eregistry.Registry // 注册中心
inits []func() error // 系统初始化函数
invokers []func() error // 用户初始化函数
servers []server.Server // 服务
orderServers []server.OrderServer // 有顺序的服务,需要监听health。成功后,才启动下一步
reloadServers []server.ReloadServer // 支持热更新的服务
crons []ecron.Ecron // 定时任务
jobs map[string]ejob.Ejob // 短时任务
registerer eregistry.Registry // 注册中心

// 第三部分 可选方法
opts opts
Expand All @@ -51,6 +54,7 @@ type Ego struct {
type stopInfo struct {
stopStartTime time.Time
isGracefulStop bool
isReload bool
}

type opts struct {
Expand Down Expand Up @@ -168,6 +172,14 @@ func (e *Ego) OrderServe(s ...server.OrderServer) *Ego {
return e
}

// ReloadServe 设置服务
func (e *Ego) ReloadServe(s ...server.ReloadServer) *Ego {
e.smu.Lock()
defer e.smu.Unlock()
e.reloadServers = append(e.reloadServers, s...)
return e
}

// Cron 设置定时任务
func (e *Ego) Cron(w ...ecron.Ecron) *Ego {
e.crons = append(e.crons, w...)
Expand Down Expand Up @@ -224,6 +236,7 @@ func (e *Ego) Run() error {

// 当没有job,才启动服务
if len(e.jobs) == 0 {
_ = e.startReloadServers(e.ctx)
_ = e.startServers(e.ctx)
}

Expand Down Expand Up @@ -251,7 +264,7 @@ func (e *Ego) Run() error {
}

// Stop 停止程序
func (e *Ego) Stop(ctx context.Context, isGraceful bool) (err error) {
func (e *Ego) Stop(ctx context.Context, isGraceful, isReload bool) (err error) {
// 运行停止前清理
runSerialFuncLogError(e.opts.beforeStopClean)

Expand All @@ -274,6 +287,29 @@ func (e *Ego) Stop(ctx context.Context, isGraceful bool) (err error) {
})
}(s)
}
if isReload {
// 启动子进程
e.logger.Info("[ego] ... fork child start!", elog.FieldComponent("app"))
pid, err := e.forkChild()
if err != nil {
e.logger.Error("fork child process error", elog.FieldComponent("app"), elog.FieldErr(err))
return err
}
e.SdNotify("MAINPID=" + strconv.Itoa(pid))
e.logger.Info("[ego] ... done!", elog.FieldComponent("app"))
}

e.logger.Info("[ego] ... reloadServers start!", elog.FieldComponent("app"))

for _, s := range e.reloadServers {
func(s server.ReloadServer) {
e.cycle.Run(func() error {
return s.GracefulStop(ctx)
})
}(s)
}
e.logger.Info("[ego] ... reloadServers done!", elog.FieldComponent("app"))

} else {
for _, s := range e.servers {
func(s server.Server) {
Expand All @@ -285,6 +321,11 @@ func (e *Ego) Stop(ctx context.Context, isGraceful bool) (err error) {
e.cycle.Run(s.Stop)
}(s)
}
for _, s := range e.reloadServers {
func(s server.ReloadServer) {
e.cycle.Run(s.Stop)
}(s)
}
}
e.smu.RLock()

Expand Down
103 changes: 101 additions & 2 deletions ego_function.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@ import (
"context"
"errors"
"fmt"
"net"
"os"
"os/exec"
"os/signal"
"runtime"
"strings"
"syscall"
"time"

sentinelmetrics "github.com/alibaba/sentinel-golang/metrics"
"github.com/coreos/go-systemd/daemon"
"github.com/prometheus/client_golang/prometheus"
"github.com/samber/lo"
"go.uber.org/automaxprocs/maxprocs"
"golang.org/x/sync/errgroup"

Expand All @@ -33,18 +38,20 @@ func (e *Ego) waitSignals() {
sig := make(chan os.Signal, 2)
signal.Notify(
sig,
e.opts.shutdownSignals...,
append(e.opts.shutdownSignals, syscall.SIGUSR1)...,
)

go func() {
s := <-sig
// 区分强制退出、优雅退出
grace := s != syscall.SIGQUIT
reload := s == syscall.SIGUSR1
go func() {
// todo 父节点传context待考虑
e.stopInfo = stopInfo{
stopStartTime: time.Now(),
isGracefulStop: grace,
isReload: reload,
}
stopCtx, cancel := context.WithTimeoutCause(context.Background(), e.opts.stopTimeout, fmt.Errorf("stop timeout %v", e.opts.stopTimeout))

Expand All @@ -53,7 +60,7 @@ func (e *Ego) waitSignals() {
cancel()
}()

_ = e.Stop(stopCtx, grace)
_ = e.Stop(stopCtx, grace, reload)
<-stopCtx.Done()
// 记录服务器关闭时候,由于关闭过慢,无法正常关闭,被强制cancel
if errors.Is(stopCtx.Err(), context.DeadlineExceeded) {
Expand Down Expand Up @@ -89,6 +96,28 @@ func (e *Ego) startServers(ctx context.Context) error {
return nil
}

func (e *Ego) startReloadServers(ctx context.Context) error {
// start multi servers
for _, s := range e.reloadServers {
s := s
e.cycle.Run(func() (err error) {
_ = s.Init()
err = e.registerer.RegisterService(ctx, s.Info())
if err != nil {
e.logger.Error("register service err", elog.FieldComponent(s.PackageName()), elog.FieldComponentName(s.Name()), elog.FieldErr(err))
}
defer func() {
_ = e.registerer.UnregisterService(ctx, s.Info())
}()
e.logger.Info("start server", elog.FieldComponent(s.PackageName()), elog.FieldComponentName(s.Name()), elog.FieldAddr(s.Info().Label()))
defer e.logger.Info("stop server", elog.FieldComponent(s.PackageName()), elog.FieldComponentName(s.Name()), elog.FieldErr(err), elog.FieldAddr(s.Info().Label()))
err = s.Start()
return
})
}
return nil
}

func (e *Ego) startOrderServers(ctx context.Context) (err error, isNeedStop bool) {
// start order servers
for _, s := range e.orderServers {
Expand Down Expand Up @@ -290,6 +319,76 @@ func initMaxProcs() error {
return nil
}

func (e *Ego) SdNotify(notify string) {
ok, err := daemon.SdNotify(false, notify)
if !ok {
if err != nil {
// notification supported, but failure happened (e.g. error connecting to NOTIFY_SOCKET or while sending data)
elog.EgoLogger.Error("systemd notification failed", elog.FieldComponent("app"), elog.FieldErr(err))
return
}
// notification not supported (i.e. NOTIFY_SOCKET is unset)
elog.EgoLogger.Info("systemd notification not supported", elog.FieldComponent("app"))
return
}
elog.EgoLogger.Info("systemd notification success", elog.FieldComponent("app"))
}

func (e *Ego) forkChild() (int, error) {
var args []string
var extraFiles []*os.File
var fnames []string
path := os.Args[0]

if len(os.Args) > 1 {
args = os.Args[1:]
}

var lc int
for _, ln := range e.reloadServers {
tl, ok := ln.Listener().(*net.TCPListener)
if !ok {
elog.EgoLogger.Panic("listener is not tcp listener", elog.FieldComponent("app"))
}
f, err := tl.File()
if err != nil {
elog.EgoLogger.Panic("get listener file failed", elog.FieldComponent("app"), elog.FieldErr(err))
}
fnames = append(fnames, f.Name())

elog.EgoLogger.Info("set ExtraFiles", elog.FieldComponent("app"), elog.Any("ExtraFiles", f.Name()))

extraFiles = append(extraFiles, f)
lc++
}

if err := os.Setenv("LISTEN_FDS", fmt.Sprintf("%d", lc)); err != nil {
elog.EgoLogger.Panic("set env LISTEN_FDS failed", elog.FieldComponent("app"), elog.FieldErr(err))
}
if err := os.Setenv("LISTEN_FDNAMES", strings.Join(fnames, ":")); err != nil {
elog.EgoLogger.Panic("set env LISTEN_FDNAMES failed", elog.FieldComponent("app"), elog.FieldErr(err))
}
if err := os.Setenv("FORK_CHILD", "1"); err != nil {
elog.EgoLogger.Panic("set env FORK_CHILD failed", elog.FieldComponent("app"), elog.FieldErr(err))
}
if !lo.Contains(args, "-reload") {
args = append(args, "-reload")
}
cmd := exec.Command(path, args...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.ExtraFiles = extraFiles
cmd.Env = os.Environ()
elog.EgoLogger.Info("cmd", elog.FieldComponent("app"), elog.Any("Path", cmd.Path), elog.Any("Args", cmd.Args))

err := cmd.Start()
if err != nil {
return 0, err
}

return cmd.Process.Pid, nil
}

// printBanner init
func (e *Ego) printBanner() error {
if e.opts.disableBanner {
Expand Down
7 changes: 4 additions & 3 deletions ego_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/gotomicro/ego/core/econf"
"github.com/gotomicro/ego/server"
"github.com/stretchr/testify/assert"
)

func TestEgoRun(t *testing.T) {
Expand All @@ -20,7 +21,7 @@ func TestEgoRun(t *testing.T) {
app.Serve(svc)
go func() {
time.Sleep(time.Millisecond * 100)
err := app.Stop(context.Background(), false)
err := app.Stop(context.Background(), false, false)
assert.Nil(t, err)
}()
err := app.Run()
Expand All @@ -35,7 +36,7 @@ func TestEgoRun(t *testing.T) {
app.Serve(svc)
go func() {
time.Sleep(time.Millisecond * 100)
err := app.Stop(context.Background(), false)
err := app.Stop(context.Background(), false, false)
assert.Nil(t, err)
}()
err := app.Run()
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/RaMin0/gin-health-check v0.0.0-20180807004848-a677317b3f01
github.com/alibaba/sentinel-golang v1.0.3
github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7
github.com/dave/dst v0.26.2
github.com/davecgh/go-spew v1.1.1
github.com/fasthttp/websocket v1.5.2
Expand All @@ -22,6 +23,7 @@ require (
github.com/modern-go/reflect2 v1.0.2
github.com/prometheus/client_golang v1.12.1
github.com/robfig/cron/v3 v3.0.1
github.com/samber/lo v1.39.0
github.com/spf13/cast v1.4.1
github.com/stretchr/testify v1.8.4
github.com/wk8/go-ordered-map v1.0.0
Expand Down Expand Up @@ -78,7 +80,6 @@ require (
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/samber/lo v1.39.0 // indirect
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee // indirect
github.com/shirou/gopsutil/v3 v3.21.6 // indirect
github.com/stoewer/go-strcase v1.2.0 // indirect
Expand Down
Loading

0 comments on commit cb93cf6

Please sign in to comment.