Skip to content

Commit

Permalink
other: server 包部分内容可读性优化,增加健壮度
Browse files Browse the repository at this point in the history
  • Loading branch information
kercylan98 committed Dec 29, 2023
1 parent 5b53e8a commit 472fdc3
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 26 deletions.
5 changes: 3 additions & 2 deletions server/listener.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package server

import (
"github.com/kercylan98/minotaur/utils/super"
"github.com/xtaci/kcp-go/v5"
"net"
"sync"
Expand All @@ -21,14 +22,14 @@ func (l *listener) init() *listener {

func (l *listener) Accept() (net.Conn, error) {
l.once.Do(func() {
l.state <- nil
super.TryWriteChannel(l.state, nil)
})
return l.Listener.Accept()
}

func (l *listener) AcceptKCP() (*kcp.UDPSession, error) {
l.once.Do(func() {
l.state <- nil
super.TryWriteChannel(l.state, nil)
})
return l.kcpListener.AcceptKCP()
}
14 changes: 7 additions & 7 deletions server/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (n Network) gNetMode(state chan<- error, srv *Server) {
gnet.WithTicker(true),
gnet.WithMulticore(true),
); err != nil {
srv.gServer.state <- err
super.TryWriteChannel(srv.gServer.state, err)
}
}(srv)
}
Expand All @@ -143,7 +143,7 @@ func (n Network) grpcMode(state chan<- error, srv *Server) {
lis := (&listener{srv: srv, Listener: l, state: state}).init()
go func(srv *Server, lis *listener) {
if err = srv.grpcServer.Serve(lis); err != nil {
lis.state <- err
super.TryWriteChannel(lis.state, err)
}
}(srv, lis)
}
Expand All @@ -152,7 +152,7 @@ func (n Network) grpcMode(state chan<- error, srv *Server) {
func (n Network) kcpMode(state chan<- error, srv *Server) {
l, err := kcp.ListenWithOptions(srv.addr, nil, 0, 0)
if err != nil {
state <- err
super.TryWriteChannel(state, err)
return
}
lis := (&listener{srv: srv, kcpListener: l, state: state}).init()
Expand Down Expand Up @@ -195,7 +195,7 @@ func (n Network) httpMode(state chan<- error, srv *Server) {
srv.httpServer.Addr = srv.addr
l, err := net.Listen(string(NetworkTcp), srv.addr)
if err != nil {
state <- err
super.TryWriteChannel(state, err)
return
}
gin.SetMode(gin.ReleaseMode)
Expand All @@ -215,7 +215,7 @@ func (n Network) httpMode(state chan<- error, srv *Server) {
err = lis.srv.httpServer.Serve(lis)
}
if err != nil {
lis.state <- err
super.TryWriteChannel(lis.state, err)
}
}((&listener{srv: srv, Listener: l, state: state}).init())
}
Expand All @@ -224,7 +224,7 @@ func (n Network) httpMode(state chan<- error, srv *Server) {
func (n Network) websocketMode(state chan<- error, srv *Server) {
l, err := net.Listen(string(NetworkTcp), srv.addr)
if err != nil {
state <- err
super.TryWriteChannel(state, err)
return
}
var pattern string
Expand Down Expand Up @@ -302,7 +302,7 @@ func (n Network) websocketMode(state chan<- error, srv *Server) {
err = http.Serve(lis, nil)
}
if err != nil {
lis.state <- err
super.TryWriteChannel(lis.state, err)
}
}((&listener{srv: srv, Listener: l, state: state}).init())
}
30 changes: 13 additions & 17 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (srv *Server) Ticker() *timer.Ticker {

// Shutdown 主动停止运行服务器
func (srv *Server) Shutdown() {
srv.systemSignal <- syscall.SIGQUIT
super.TryWriteChannel[os.Signal](srv.systemSignal, syscall.SIGQUIT)
}

// shutdown 停止运行服务器
Expand All @@ -204,11 +204,7 @@ func (srv *Server) shutdown(err error) {
if srv.multiple == nil {
srv.OnStopEvent()
}
defer func() {
if srv.multipleRuntimeErrorChan != nil {
srv.multipleRuntimeErrorChan <- err
}
}()
defer super.TryWriteChannel(srv.multipleRuntimeErrorChan, err)
srv.cancel()
if srv.gServer != nil {
if shutdownErr := gnet.Stop(context.Background(), fmt.Sprintf("%s://%s", srv.network, srv.addr)); err != nil {
Expand Down Expand Up @@ -263,7 +259,7 @@ func (srv *Server) shutdown(err error) {
log.Info("Server", log.Any("network", srv.network), log.String("listen", srv.addr),
log.String("action", "shutdown"), log.String("state", "normal"))
}
srv.closeChannel <- struct{}{}
super.TryWriteChannel(srv.closeChannel, struct{}{})
}

// GRPCServer 当网络类型为 NetworkGRPC 时将被允许获取 grpc 服务器,否则将会发生 panic
Expand Down Expand Up @@ -456,27 +452,27 @@ func (srv *Server) low(message *Message, present time.Time, expect time.Duration
}

// dispatchMessage 消息分发
func (srv *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) {
func (srv *Server) dispatchMessage(dispatcherIns *dispatcher, msg *Message) {
var (
ctx context.Context
cancel context.CancelFunc
)
if srv.deadlockDetect > 0 {
ctx, cancel = context.WithTimeout(context.Background(), srv.deadlockDetect)
go func(ctx context.Context, msg *Message) {
go func(ctx context.Context, srv *Server, msg *Message) {
select {
case <-ctx.Done():
if err := ctx.Err(); errors.Is(err, context.DeadlineExceeded) {
log.Warn("Server", log.String("MessageType", messageNames[msg.t]), log.String("Info", msg.String()), log.Any("SuspectedDeadlock", msg))
srv.OnDeadlockDetectEvent(msg)
}
}
}(ctx, msg)
}(ctx, srv, msg)
}

present := time.Now()
if msg.t != MessageTypeAsync && msg.t != MessageTypeUniqueAsync && msg.t != MessageTypeShuntAsync && msg.t != MessageTypeUniqueShuntAsync {
defer func(msg *Message) {
defer func(cancel context.CancelFunc, srv *Server, dispatcherIns *dispatcher, msg *Message, present time.Time) {
super.Handle(cancel)
if err := super.RecoverTransform(recover()); err != nil {
stack := string(debug.Stack())
Expand All @@ -485,7 +481,7 @@ func (srv *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) {
srv.OnMessageErrorEvent(msg, err)
}
if msg.t == MessageTypeUniqueAsyncCallback || msg.t == MessageTypeUniqueShuntAsyncCallback {
dispatcher.antiUnique(msg.name)
dispatcherIns.antiUnique(msg.name)
}

srv.low(msg, present, time.Millisecond*100)
Expand All @@ -494,7 +490,7 @@ func (srv *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) {
if atomic.CompareAndSwapUint32(&srv.closed, 0, 0) {
srv.messagePool.Release(msg)
}
}(msg)
}(cancel, srv, dispatcherIns, msg, present)
} else {
if cancel != nil {
defer cancel()
Expand All @@ -512,10 +508,10 @@ func (srv *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) {
msg.ordinaryHandler()
case MessageTypeAsync, MessageTypeShuntAsync, MessageTypeUniqueAsync, MessageTypeUniqueShuntAsync:
if err := srv.ants.Submit(func() {
defer func() {
defer func(cancel context.CancelFunc, srv *Server, dispatcherIns *dispatcher, msg *Message, present time.Time) {
if err := super.RecoverTransform(recover()); err != nil {
if msg.t == MessageTypeUniqueAsync || msg.t == MessageTypeUniqueShuntAsync {
dispatcher.antiUnique(msg.name)
dispatcherIns.antiUnique(msg.name)
}
stack := string(debug.Stack())
log.Error("Server", log.String("MessageType", messageNames[msg.t]), log.Any("error", err), log.String("stack", stack))
Expand All @@ -529,7 +525,7 @@ func (srv *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) {
if atomic.CompareAndSwapUint32(&srv.closed, 0, 0) {
srv.messagePool.Release(msg)
}
}()
}(cancel, srv, dispatcherIns, msg, present)
var err error
if msg.exceptionHandler != nil {
err = msg.exceptionHandler()
Expand All @@ -550,7 +546,7 @@ func (srv *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) {
srv.PushShuntAsyncCallbackMessage(msg.conn, err, msg.errHandler)
return
}
dispatcher.antiUnique(msg.name)
dispatcherIns.antiUnique(msg.name)
if err != nil {
log.Error("Server", log.String("MessageType", messageNames[msg.t]), log.Any("error", err), log.String("stack", string(debug.Stack())))
}
Expand Down

0 comments on commit 472fdc3

Please sign in to comment.