From b2c0bb0da3dd87520fa5fcf574d88c47f5a26a4a Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Wed, 20 Mar 2024 23:48:46 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=96=B0=E7=89=88=20server=20=E5=8C=85?= =?UTF-8?q?=20HTTP=20=E5=9F=BA=E7=A1=80=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 3 + go.sum | 6 + server/v2/conn.go | 59 +++++++ server/v2/connections.go | 111 +++++++++++++ server/v2/core.go | 9 + server/v2/event_handler.go | 69 -------- server/v2/network.go | 11 ++ server/v2/network/http.go | 53 ++++++ server/v2/network/http_serve.go | 7 + server/v2/network/websocket.go | 57 +++++++ server/v2/options.go | 4 - server/v2/server.go | 59 +++++-- server/v2/server_test.go | 13 +- server/v2/traffickers/http.go | 56 ------- server/v2/traffickers/http_recorder.go | 211 ------------------------ server/v2/traffickers/websocket.go | 64 ------- server/v2/traffickers/websocket_conn.go | 16 -- utils/super/context.go | 20 +++ 18 files changed, 390 insertions(+), 438 deletions(-) create mode 100644 server/v2/conn.go create mode 100644 server/v2/connections.go create mode 100644 server/v2/core.go delete mode 100644 server/v2/event_handler.go create mode 100644 server/v2/network.go create mode 100644 server/v2/network/http.go create mode 100644 server/v2/network/http_serve.go create mode 100644 server/v2/network/websocket.go delete mode 100644 server/v2/options.go delete mode 100644 server/v2/traffickers/http.go delete mode 100644 server/v2/traffickers/http_recorder.go delete mode 100644 server/v2/traffickers/websocket.go delete mode 100644 server/v2/traffickers/websocket_conn.go create mode 100644 utils/super/context.go diff --git a/go.mod b/go.mod index 5719bdd5..1d09c2d7 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,9 @@ require ( github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-playground/validator/v10 v10.14.0 // indirect + github.com/gobwas/httphead v0.1.0 // indirect + github.com/gobwas/pool v0.2.1 // indirect + github.com/gobwas/ws v1.3.2 // indirect github.com/goccy/go-json v0.10.2 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/gopherjs/gopherjs v1.17.2 // indirect diff --git a/go.sum b/go.sum index 3155bcd6..47cfead9 100644 --- a/go.sum +++ b/go.sum @@ -45,6 +45,12 @@ github.com/go-playground/validator/v10 v10.14.0 h1:vgvQWe3XCz3gIeFDm/HnTIbj6UGmg github.com/go-playground/validator/v10 v10.14.0/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU= github.com/go-resty/resty/v2 v2.11.0 h1:i7jMfNOJYMp69lq7qozJP+bjgzfAzeOhuGlyDrqxT/8= github.com/go-resty/resty/v2 v2.11.0/go.mod h1:iiP/OpA0CkcL3IGt1O0+/SIItFUbkkyw5BGXiVdTu+A= +github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU= +github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM= +github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og= +github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= +github.com/gobwas/ws v1.3.2 h1:zlnbNHxumkRvfPWgfXu8RBwyNR1x8wh9cf5PTOCqs9Q= +github.com/gobwas/ws v1.3.2/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY= github.com/goccy/go-json v0.9.7/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= diff --git a/server/v2/conn.go b/server/v2/conn.go new file mode 100644 index 00000000..f908e655 --- /dev/null +++ b/server/v2/conn.go @@ -0,0 +1,59 @@ +package server + +import ( + "context" + "github.com/kercylan98/minotaur/utils/log" + "net" + "unsafe" +) + +type Conn interface { + net.Conn +} + +type conn struct { + net.Conn + cs *connections + ctx context.Context + cancel context.CancelFunc + idx int +} + +func (c *conn) init(ctx context.Context, cs *connections, conn net.Conn, idx int) *conn { + c.Conn = conn + c.cs = cs + c.ctx, c.cancel = context.WithCancel(ctx) + c.idx = idx + return c +} + +func (c *conn) awaitRead() { + defer func() { _ = c.Close() }() + + const bufferSize = 4096 + buf := make([]byte, bufferSize) // 避免频繁的内存分配,初始化一个固定大小的缓冲区 + for { + select { + case <-c.ctx.Done(): + return + default: + ptr := unsafe.Pointer(&buf[0]) + n, err := c.Read((*[bufferSize]byte)(ptr)[:]) + if err != nil { + log.Error("READ", err) + return + } + + if n > 0 { + if _, err := c.Write(buf[:n]); err != nil { + log.Error("Write", err) + } + } + } + } +} + +func (c *conn) Close() (err error) { + c.cs.Event() <- c + return +} diff --git a/server/v2/connections.go b/server/v2/connections.go new file mode 100644 index 00000000..9681aa73 --- /dev/null +++ b/server/v2/connections.go @@ -0,0 +1,111 @@ +package server + +import ( + "context" + "github.com/kercylan98/minotaur/utils/log" + "net" + "time" +) + +// connections 结构体用于管理连接 +type connections struct { + ctx context.Context // 上下文对象,用于取消连接管理器 + ch chan any // 事件通道,用于接收连接管理器的操作事件 + items []*conn // 连接列表,存储所有打开的连接 + gap []int // 连接空隙,记录已关闭的连接索引,用于重用索引 +} + +// 初始化连接管理器 +func (cs *connections) init(ctx context.Context) *connections { + cs.ctx = ctx + cs.ch = make(chan any, 1024) + cs.items = make([]*conn, 0, 128) + go cs.awaitRun() + return cs +} + +// 清理连接列表中的空隙 +func (cs *connections) clearGap() { + cs.gap = cs.gap[:0] + var gap = make([]int, 0, len(cs.items)) + for i, c := range cs.items { + if c == nil { + continue + } + c.idx = i + gap = append(gap, i) + } + + cs.gap = gap +} + +// 打开新连接 +func (cs *connections) open(c net.Conn) error { + // 如果存在连接空隙,则重用连接空隙中的索引,否则分配新的索引 + var idx int + var reuse bool + if len(cs.gap) > 0 { + idx = cs.gap[0] + cs.gap = cs.gap[1:] + reuse = true + } else { + idx = len(cs.items) + } + conn := new(conn).init(cs.ctx, cs, c, idx) + if reuse { + cs.items[idx] = conn + } else { + cs.items = append(cs.items, conn) + } + go conn.awaitRead() + return nil +} + +// 关闭连接 +func (cs *connections) close(c *conn) error { + if c == nil { + return nil + } + defer c.cancel() + // 如果连接索引是连接列表的最后一个索引,则直接删除连接对象,否则将连接对象置空,并将索引添加到连接空隙中 + if c.idx == len(cs.items)-1 { + cs.items = cs.items[:c.idx] + } else { + cs.items[c.idx] = nil + cs.gap = append(cs.gap, c.idx) + } + return c.Conn.Close() +} + +// 等待连接管理器的事件并处理 +func (cs *connections) awaitRun() { + clearGapTicker := time.NewTicker(time.Second * 30) + defer clearGapTicker.Stop() + + for { + select { + case <-cs.ctx.Done(): + return + case <-clearGapTicker.C: + cs.clearGap() + case a := <-cs.ch: + var err error + + switch v := a.(type) { + case *conn: + err = cs.close(v) + case net.Conn: + err = cs.open(v) + } + + if err != nil { + log.Error("connections.awaitRun", log.Any("err", err)) + } + } + } +} + +// Event 获取连接管理器的事件通道 +func (cs *connections) Event() chan<- any { + return cs.ch +} diff --git a/server/v2/core.go b/server/v2/core.go new file mode 100644 index 00000000..b1a32721 --- /dev/null +++ b/server/v2/core.go @@ -0,0 +1,9 @@ +package server + +type Core interface { + connectionManager +} + +type connectionManager interface { + Event() chan<- any +} diff --git a/server/v2/event_handler.go b/server/v2/event_handler.go deleted file mode 100644 index b21fc149..00000000 --- a/server/v2/event_handler.go +++ /dev/null @@ -1,69 +0,0 @@ -package server - -import ( - "github.com/panjf2000/ants/v2" - "github.com/panjf2000/gnet/v2" - "time" -) - -func newEventHandler(options *Options, trafficker Trafficker) (handler *eventHandler, err error) { - var wp *ants.Pool - if wp, err = ants.NewPool(ants.DefaultAntsPoolSize, ants.WithNonblocking(true)); err != nil { - return - } - handler = &eventHandler{ - options: options, - trafficker: trafficker, - workerPool: wp, - } - return -} - -type ( - Trafficker interface { - OnBoot(options *Options) error - OnTraffic(c gnet.Conn, packet []byte) - } - eventHandler struct { - options *Options - trafficker Trafficker - workerPool *ants.Pool - } -) - -func (e *eventHandler) OnBoot(eng gnet.Engine) (action gnet.Action) { - if err := e.trafficker.OnBoot(e.options); err != nil { - action = gnet.Shutdown - } - return -} - -func (e *eventHandler) OnShutdown(eng gnet.Engine) { - return -} - -func (e *eventHandler) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) { - return -} - -func (e *eventHandler) OnClose(c gnet.Conn, err error) (action gnet.Action) { - return -} - -func (e *eventHandler) OnTraffic(c gnet.Conn) (action gnet.Action) { - buf, err := c.Next(-1) - if err != nil { - return - } - var packet = make([]byte, len(buf)) - copy(packet, buf) - err = e.workerPool.Submit(func() { - e.trafficker.OnTraffic(c, packet) - }) - - return -} - -func (e *eventHandler) OnTick() (delay time.Duration, action gnet.Action) { - return -} diff --git a/server/v2/network.go b/server/v2/network.go new file mode 100644 index 00000000..0b2f2871 --- /dev/null +++ b/server/v2/network.go @@ -0,0 +1,11 @@ +package server + +import "context" + +type Network interface { + OnSetup(ctx context.Context, core Core) error + + OnRun(ctx context.Context) error + + OnShutdown() error +} diff --git a/server/v2/network/http.go b/server/v2/network/http.go new file mode 100644 index 00000000..36baac30 --- /dev/null +++ b/server/v2/network/http.go @@ -0,0 +1,53 @@ +package network + +import ( + "context" + "github.com/kercylan98/minotaur/server/v2" + "github.com/pkg/errors" + "net" + "net/http" + "time" +) + +func Http(addr string) server.Network { + return HttpWithHandler(addr, &HttpServe{ServeMux: http.NewServeMux()}) +} + +func HttpWithHandler[H http.Handler](addr string, handler H) server.Network { + c := &httpCore[H]{ + addr: addr, + handler: handler, + srv: &http.Server{ + Addr: addr, + Handler: handler, + DisableGeneralOptionsHandler: false, + }, + } + return c +} + +type httpCore[H http.Handler] struct { + addr string + handler H + srv *http.Server +} + +func (h *httpCore[H]) OnSetup(ctx context.Context, core server.Core) (err error) { + h.srv.BaseContext = func(listener net.Listener) context.Context { + return ctx + } + return +} + +func (h *httpCore[H]) OnRun(ctx context.Context) (err error) { + if err = h.srv.ListenAndServe(); errors.Is(err, http.ErrServerClosed) { + err = nil + } + return +} + +func (h *httpCore[H]) OnShutdown() error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + return h.srv.Shutdown(ctx) +} diff --git a/server/v2/network/http_serve.go b/server/v2/network/http_serve.go new file mode 100644 index 00000000..57cfc5ff --- /dev/null +++ b/server/v2/network/http_serve.go @@ -0,0 +1,7 @@ +package network + +import "net/http" + +type HttpServe struct { + *http.ServeMux +} diff --git a/server/v2/network/websocket.go b/server/v2/network/websocket.go new file mode 100644 index 00000000..aa97dea9 --- /dev/null +++ b/server/v2/network/websocket.go @@ -0,0 +1,57 @@ +package network + +import ( + "context" + "fmt" + "github.com/gobwas/ws" + "github.com/kercylan98/minotaur/server/v2" + "net/http" +) + +func WebSocket(addr, pattern string) server.Network { + return WebSocketWithHandler[*HttpServe](addr, &HttpServe{ServeMux: http.NewServeMux()}, func(handler *HttpServe, ws http.HandlerFunc) { + handler.Handle(fmt.Sprintf("GET %s", pattern), ws) + }) +} + +func WebSocketWithHandler[H http.Handler](addr string, handler H, upgraderHandlerFunc WebSocketUpgraderHandlerFunc[H]) server.Network { + c := &websocketCore[H]{ + httpCore: HttpWithHandler(addr, handler).(*httpCore[H]), + upgraderHandlerFunc: upgraderHandlerFunc, + } + return c +} + +type WebSocketUpgraderHandlerFunc[H http.Handler] func(handler H, ws http.HandlerFunc) + +type websocketCore[H http.Handler] struct { + *httpCore[H] + upgraderHandlerFunc WebSocketUpgraderHandlerFunc[H] + core server.Core +} + +func (w *websocketCore[H]) OnSetup(ctx context.Context, core server.Core) (err error) { + w.core = core + if err = w.httpCore.OnSetup(ctx, core); err != nil { + return + } + w.upgraderHandlerFunc(w.handler, w.onUpgrade) + return +} + +func (w *websocketCore[H]) OnRun(ctx context.Context) error { + return w.httpCore.OnRun(ctx) +} + +func (w *websocketCore[H]) OnShutdown() error { + return w.httpCore.OnShutdown() +} + +func (w *websocketCore[H]) onUpgrade(writer http.ResponseWriter, request *http.Request) { + conn, _, _, err := ws.UpgradeHTTP(request, writer) + if err != nil { + return + } + + w.core.Event() <- conn +} diff --git a/server/v2/options.go b/server/v2/options.go deleted file mode 100644 index 01b105dc..00000000 --- a/server/v2/options.go +++ /dev/null @@ -1,4 +0,0 @@ -package server - -type Options struct { -} diff --git a/server/v2/server.go b/server/v2/server.go index 6e0c7225..e8fa6a45 100644 --- a/server/v2/server.go +++ b/server/v2/server.go @@ -1,20 +1,59 @@ package server -import "github.com/panjf2000/gnet/v2" +import ( + "context" + "github.com/kercylan98/minotaur/utils/super" + "sync" +) -func NewServer(trafficker Trafficker) *Server { - srv := &Server{ - trafficker: trafficker, +type Server interface { + Run() error + Shutdown() error +} + +type server struct { + ctx *super.CancelContext + networks []Network + connections *connections +} + +func NewServer(networks ...Network) Server { + srv := &server{ + ctx: super.WithCancelContext(context.Background()), + networks: networks, } + srv.connections = new(connections).init(srv.ctx) return srv } -type Server struct { - trafficker Trafficker +func (s *server) Run() (err error) { + for _, network := range s.networks { + if err = network.OnSetup(s.ctx, s.connections); err != nil { + return + } + } + + var group = new(sync.WaitGroup) + for _, network := range s.networks { + group.Add(1) + go func(ctx *super.CancelContext, group *sync.WaitGroup, network Network) { + defer group.Done() + if err = network.OnRun(ctx); err != nil { + panic(err) + } + }(s.ctx, group, network) + } + group.Wait() + + return } -func (s *Server) Run(protoAddr string) (err error) { - var handler *eventHandler - handler, err = newEventHandler(new(Options), s.trafficker) - return gnet.Run(handler, protoAddr) +func (s *server) Shutdown() (err error) { + defer s.ctx.Cancel() + for _, network := range s.networks { + if err = network.OnShutdown(); err != nil { + return + } + } + return } diff --git a/server/v2/server_test.go b/server/v2/server_test.go index 14d989c1..c2df0d57 100644 --- a/server/v2/server_test.go +++ b/server/v2/server_test.go @@ -3,27 +3,24 @@ package server_test import ( "github.com/gin-gonic/gin" "github.com/kercylan98/minotaur/server/v2" - "github.com/kercylan98/minotaur/server/v2/traffickers" + "github.com/kercylan98/minotaur/server/v2/network" "net/http" "testing" ) func TestNewServer(t *testing.T) { - r := gin.New() + r := gin.Default() r.GET("/", func(context *gin.Context) { context.JSON(200, gin.H{ "ping": "pong", }) }) - srv := server.NewServer(traffickers.WebSocket(r, func(handler *gin.Engine, upgradeHandler func(writer http.ResponseWriter, request *http.Request) error) { + srv := server.NewServer(network.WebSocketWithHandler(":9999", r, func(handler *gin.Engine, ws http.HandlerFunc) { handler.GET("/ws", func(context *gin.Context) { - if err := upgradeHandler(context.Writer, context.Request); err != nil { - context.AbortWithError(500, err) - } + ws(context.Writer, context.Request) }) })) - - if err := srv.Run("tcp://:8080"); err != nil { + if err := srv.Run(); err != nil { panic(err) } } diff --git a/server/v2/traffickers/http.go b/server/v2/traffickers/http.go deleted file mode 100644 index d506eaf3..00000000 --- a/server/v2/traffickers/http.go +++ /dev/null @@ -1,56 +0,0 @@ -package traffickers - -import ( - "bufio" - "bytes" - "github.com/kercylan98/minotaur/server/v2" - "github.com/kercylan98/minotaur/utils/hub" - "github.com/panjf2000/gnet/v2" - netHttp "net/http" -) - -func Http[H netHttp.Handler](handler H) server.Trafficker { - return &http[H]{ - handler: handler, - ncb: func(c gnet.Conn, err error) error { - return nil - }, - } -} - -type http[H netHttp.Handler] struct { - handler H - rwp *hub.ObjectPool[*httpResponseWriter] - ncb func(c gnet.Conn, err error) error -} - -func (h *http[H]) OnBoot(options *server.Options) error { - h.rwp = hub.NewObjectPool[httpResponseWriter](func() *httpResponseWriter { - return new(httpResponseWriter) - }, func(data *httpResponseWriter) { - data.reset() - }) - return nil -} - -func (h *http[H]) OnTraffic(c gnet.Conn, packet []byte) { - var responseWriter *httpResponseWriter - defer func() { - if responseWriter == nil || !responseWriter.isHijack { - _ = c.Close() - } - }() - httpRequest, err := netHttp.ReadRequest(bufio.NewReader(bytes.NewReader(packet))) - if err != nil { - return - } - - responseWriter = h.rwp.Get() - responseWriter.init(c) - - h.handler.ServeHTTP(responseWriter, httpRequest) - if responseWriter.isHijack { - return - } - _ = responseWriter.Result().Write(c) -} diff --git a/server/v2/traffickers/http_recorder.go b/server/v2/traffickers/http_recorder.go deleted file mode 100644 index 031a1b17..00000000 --- a/server/v2/traffickers/http_recorder.go +++ /dev/null @@ -1,211 +0,0 @@ -package traffickers - -import ( - "bufio" - "bytes" - "fmt" - "github.com/panjf2000/gnet/v2" - "io" - "net" - netHttp "net/http" - "net/textproto" - "strconv" - "strings" - - "golang.org/x/net/http/httpguts" -) - -type httpResponseWriter struct { - Code int - HeaderMap netHttp.Header - Body *bytes.Buffer - Flushed bool - - conn *websocketConn - result *netHttp.Response - snapHeader netHttp.Header - wroteHeader bool - isHijack bool -} - -func (rw *httpResponseWriter) init(c gnet.Conn) { - rw.conn = &websocketConn{Conn: c} - rw.Code = 200 - rw.Body = new(bytes.Buffer) - rw.HeaderMap = make(netHttp.Header) - rw.isHijack = false -} - -func (rw *httpResponseWriter) reset() { - rw.conn = nil - rw.Code = 200 - rw.Body = nil - rw.HeaderMap = nil - rw.isHijack = false -} - -func (rw *httpResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { - if !rw.isHijack { - return rw.conn, bufio.NewReadWriter(bufio.NewReader(rw.conn), bufio.NewWriter(rw.conn)), nil - } - return nil, nil, netHttp.ErrHijacked -} - -func (rw *httpResponseWriter) Header() netHttp.Header { - m := rw.HeaderMap - if m == nil { - m = make(netHttp.Header) - rw.HeaderMap = m - } - return m -} - -func (rw *httpResponseWriter) writeHeader(b []byte, str string) { - if rw.wroteHeader { - return - } - if len(str) > 512 { - str = str[:512] - } - - m := rw.Header() - - _, hasType := m["Content-Type"] - hasTE := m.Get("Transfer-Encoding") != "" - if !hasType && !hasTE { - if b == nil { - b = []byte(str) - } - m.Set("Content-Type", netHttp.DetectContentType(b)) - } - - rw.WriteHeader(200) -} - -func (rw *httpResponseWriter) Write(buf []byte) (n int, err error) { - if rw.isHijack { - n = len(buf) - var wait = make(chan error) - if err = rw.conn.AsyncWrite(buf, func(c gnet.Conn, err error) error { - if err != nil { - wait <- err - } - return nil - }); err != nil { - return - } - err = <-wait - return - } - rw.writeHeader(buf, "") - if rw.Body != nil { - rw.Body.Write(buf) - } - return len(buf), nil -} - -func (rw *httpResponseWriter) WriteString(str string) (int, error) { - rw.writeHeader(nil, str) - if rw.Body != nil { - rw.Body.WriteString(str) - } - return len(str), nil -} - -func checkWriteHeaderCode(code int) { - if code < 100 || code > 999 { - panic(fmt.Sprintf("invalid WriteHeader code %v", code)) - } -} - -func (rw *httpResponseWriter) WriteHeader(code int) { - if rw.wroteHeader { - return - } - - checkWriteHeaderCode(code) - rw.Code = code - rw.wroteHeader = true - if rw.HeaderMap == nil { - rw.HeaderMap = make(netHttp.Header) - } - rw.snapHeader = rw.HeaderMap.Clone() -} - -func (rw *httpResponseWriter) Flush() { - if !rw.wroteHeader { - rw.WriteHeader(200) - } - rw.Flushed = true -} - -func (rw *httpResponseWriter) Result() *netHttp.Response { - if rw.result != nil { - return rw.result - } - if rw.snapHeader == nil { - rw.snapHeader = rw.HeaderMap.Clone() - } - res := &netHttp.Response{ - Proto: "HTTP/1.1", - ProtoMajor: 1, - ProtoMinor: 1, - StatusCode: rw.Code, - Header: rw.snapHeader, - } - rw.result = res - if res.StatusCode == 0 { - res.StatusCode = 200 - } - res.Status = fmt.Sprintf("%03d %s", res.StatusCode, netHttp.StatusText(res.StatusCode)) - if rw.Body != nil { - res.Body = io.NopCloser(bytes.NewReader(rw.Body.Bytes())) - } else { - res.Body = netHttp.NoBody - } - res.ContentLength = parseContentLength(res.Header.Get("Content-Length")) - - if trailers, ok := rw.snapHeader["Trailer"]; ok { - res.Trailer = make(netHttp.Header, len(trailers)) - for _, k := range trailers { - for _, k := range strings.Split(k, ",") { - k = netHttp.CanonicalHeaderKey(textproto.TrimString(k)) - if !httpguts.ValidTrailerHeader(k) { - // Ignore since forbidden by RFC 7230, section 4.1.2. - continue - } - vv, ok := rw.HeaderMap[k] - if !ok { - continue - } - vv2 := make([]string, len(vv)) - copy(vv2, vv) - res.Trailer[k] = vv2 - } - } - } - for k, vv := range rw.HeaderMap { - if !strings.HasPrefix(k, netHttp.TrailerPrefix) { - continue - } - if res.Trailer == nil { - res.Trailer = make(netHttp.Header) - } - for _, v := range vv { - res.Trailer.Add(strings.TrimPrefix(k, netHttp.TrailerPrefix), v) - } - } - return res -} - -func parseContentLength(cl string) int64 { - cl = textproto.TrimString(cl) - if cl == "" { - return -1 - } - n, err := strconv.ParseUint(cl, 10, 63) - if err != nil { - return -1 - } - return int64(n) -} diff --git a/server/v2/traffickers/websocket.go b/server/v2/traffickers/websocket.go deleted file mode 100644 index 4b8c7b1a..00000000 --- a/server/v2/traffickers/websocket.go +++ /dev/null @@ -1,64 +0,0 @@ -package traffickers - -import ( - "fmt" - ws "github.com/gorilla/websocket" - "github.com/kercylan98/minotaur/server/v2" - "github.com/panjf2000/gnet/v2" - netHttp "net/http" -) - -func WebSocket[H netHttp.Handler](handler H, binder func(handler H, upgradeHandler func(writer netHttp.ResponseWriter, request *netHttp.Request) error)) server.Trafficker { - w := &websocket[H]{ - http: Http(handler).(*http[H]), - binder: binder, - upgrader: &ws.Upgrader{ - ReadBufferSize: 4096, - WriteBufferSize: 4096, - CheckOrigin: func(r *netHttp.Request) bool { - return true - }, - }, - } - binder(handler, w.OnUpgrade) - return w -} - -type websocket[H netHttp.Handler] struct { - *http[H] - binder func(handler H, upgradeHandler func(writer netHttp.ResponseWriter, request *netHttp.Request) error) - upgrader *ws.Upgrader -} - -func (w *websocket[H]) OnBoot(options *server.Options) error { - return w.http.OnBoot(options) -} - -func (w *websocket[H]) OnTraffic(c gnet.Conn, packet []byte) { - w.http.OnTraffic(c, packet) -} - -func (w *websocket[H]) OnUpgrade(writer netHttp.ResponseWriter, request *netHttp.Request) (err error) { - var ( - ip string - conn *ws.Conn - ) - - ip = request.Header.Get("X-Real-IP") - conn, err = w.upgrader.Upgrade(writer, request, nil) - if err != nil { - return - } - - fmt.Println("opened", ip) - go func() { - for { - mt, data, err := conn.ReadMessage() - if err != nil { - continue - } - conn.WriteMessage(mt, data) - } - }() - return nil -} diff --git a/server/v2/traffickers/websocket_conn.go b/server/v2/traffickers/websocket_conn.go deleted file mode 100644 index c27dccb0..00000000 --- a/server/v2/traffickers/websocket_conn.go +++ /dev/null @@ -1,16 +0,0 @@ -package traffickers - -import ( - "github.com/panjf2000/gnet/v2" - "time" -) - -type websocketConn struct { - gnet.Conn - deadline time.Time -} - -func (c *websocketConn) SetDeadline(t time.Time) error { - c.deadline = t - return nil -} diff --git a/utils/super/context.go b/utils/super/context.go new file mode 100644 index 00000000..09a90c2a --- /dev/null +++ b/utils/super/context.go @@ -0,0 +1,20 @@ +package super + +import "context" + +func WithCancelContext(ctx context.Context) *CancelContext { + ctx, cancel := context.WithCancel(ctx) + return &CancelContext{ + Context: ctx, + cancel: cancel, + } +} + +type CancelContext struct { + context.Context + cancel func() +} + +func (c *CancelContext) Cancel() { + c.cancel() +}