From 5ef43167e2857d8610ab681dc37bcd46e97ba919 Mon Sep 17 00:00:00 2001 From: Ehsan Noureddin Moosa Date: Sun, 13 Feb 2022 18:03:31 +0400 Subject: [PATCH] implement websocket for fasthttp bundle, still does not pass stress test --- exmples/mixed-jsonrpc-rest/bench.sh | 2 +- exmples/mixed-jsonrpc-rest/server/main.go | 4 +-- go.mod | 2 +- go.sum | 2 ++ std/bundle/fasthttp/bundle.go | 38 ++++++++++++++++++----- std/bundle/fasthttp/conn_ws.go | 2 +- 6 files changed, 37 insertions(+), 13 deletions(-) diff --git a/exmples/mixed-jsonrpc-rest/bench.sh b/exmples/mixed-jsonrpc-rest/bench.sh index 773d693c..4cbc9157 100644 --- a/exmples/mixed-jsonrpc-rest/bench.sh +++ b/exmples/mixed-jsonrpc-rest/bench.sh @@ -1,3 +1,3 @@ #tcpkali --ws --dump-one -c 1000 -m '{"hdr": {"cmd": "echoRequest"}, "payload": {"randomID": 1234}}' -r 100 127.0.0.1:7080 #tcpkali --ws -c 1000 -m '{"hdr": {"cmd": "echoRequest"}, "payload": {"randomID": 1234}}' -r 100 127.0.0.1:7080 -T 30 -tcpkali --ws -c 100 -m '{"hdr": {"cmd": "echoRequest"}, "payload": {"randomID": 1234}}' -r 100 127.0.0.1:7080 -T 30 \ No newline at end of file +tcpkali --ws -c 100 -m '{"hdr": {"cmd": "echoRequest"}, "payload": {"randomID": 1234}}' -r 100 127.0.0.1:80 -T 30 \ No newline at end of file diff --git a/exmples/mixed-jsonrpc-rest/server/main.go b/exmples/mixed-jsonrpc-rest/server/main.go index 31a1b1d8..7e9d5fd0 100644 --- a/exmples/mixed-jsonrpc-rest/server/main.go +++ b/exmples/mixed-jsonrpc-rest/server/main.go @@ -19,11 +19,11 @@ func main() { ), ronykit.RegisterBundle( fastws.MustNew( - fastws.Listen("tcp4://0.0.0.0:7080"), + fastws.Listen("tcp4://0.0.0.0:80"), fastws.WithPredicateKey("cmd"), ), fasthttp.MustNew( - fasthttp.Listen(":7070"), + fasthttp.Listen(":81"), ), ), ronykit.RegisterService(NewSample().Desc().Generate()), diff --git a/go.mod b/go.mod index e325123c..e90d7945 100644 --- a/go.mod +++ b/go.mod @@ -5,9 +5,9 @@ go 1.17 require ( github.com/brianvoe/gofakeit/v6 v6.10.0 github.com/fasthttp/router v1.4.6 + github.com/fasthttp/websocket v1.4.6 github.com/gobwas/ws v1.1.0 github.com/goccy/go-json v0.9.4 - github.com/gorilla/websocket v1.4.1 github.com/panjf2000/gnet v1.6.4 github.com/smartystreets/goconvey v1.7.2 github.com/valyala/fasthttp v1.33.0 diff --git a/go.sum b/go.sum index 2e5368b0..d3315b89 100644 --- a/go.sum +++ b/go.sum @@ -11,6 +11,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fasthttp/router v1.4.6 h1:KfETdHGBnvoBfBHeRe/8TVYz8Bp/mASBVC5UXO9CpZI= github.com/fasthttp/router v1.4.6/go.mod h1:Iv800u3hYFNuBBcmJNs/VBVpub+JfBihGBp5spSocbw= +github.com/fasthttp/websocket v1.4.6 h1:Zi0Z6sUUvLmtxXd/kMLVBVJPvck3bqPqMWUbwhUy0R8= +github.com/fasthttp/websocket v1.4.6/go.mod h1:n0BlOQvJdPbTuBkZT0O5+jk/sp/1/VCzquR1BehI2F4= github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU= github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM= github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og= diff --git a/std/bundle/fasthttp/bundle.go b/std/bundle/fasthttp/bundle.go index 10159f9a..35d9abc5 100644 --- a/std/bundle/fasthttp/bundle.go +++ b/std/bundle/fasthttp/bundle.go @@ -1,12 +1,13 @@ package fasthttp import ( + "encoding/json" "fmt" "net" "sync" "github.com/fasthttp/router" - "github.com/goccy/go-json" + "github.com/fasthttp/websocket" "github.com/ronaksoft/ronykit" "github.com/ronaksoft/ronykit/utils" "github.com/valyala/fasthttp" @@ -28,9 +29,10 @@ type bundle struct { httpMux *mux - wsRoutes map[string]*routeData - predicateKey string - wsEndpoint string + wsRoutes map[string]*routeData + wsSwitchProtocol websocket.FastHTTPUpgrader + wsEndpoint string + predicateKey string } func New(opts ...Option) (*bundle, error) { @@ -52,7 +54,7 @@ func New(opts ...Option) (*bundle, error) { if r.wsEndpoint != "" { entryRouter := router.New() entryRouter.GET(r.wsEndpoint, r.wsHandler) - entryRouter.Handle(router.MethodWild, "/", r.httpHandler) + entryRouter.NotFound = r.httpHandler r.srv.Handler = entryRouter.Handler } else { r.srv.Handler = r.httpHandler @@ -85,7 +87,27 @@ func (b *bundle) httpHandler(ctx *fasthttp.RequestCtx) { b.connPool.Put(c) } -func (b *bundle) wsHandler(ctx *fasthttp.RequestCtx) {} +func (b *bundle) wsHandler(ctx *fasthttp.RequestCtx) { + _ = b.wsSwitchProtocol.Upgrade(ctx, + func(conn *websocket.Conn) { + wsc := &wsConn{ + kv: map[string]string{}, + id: 0, + clientIP: conn.RemoteAddr().String(), + c: conn, + } + b.d.OnOpen(wsc) + for { + _, in, err := conn.ReadMessage() + if err != nil { + break + } + go b.d.OnMessage(wsc, in) + } + b.d.OnClose(wsc.id) + }, + ) +} func (b *bundle) Register(svc ronykit.Service) { for _, contract := range svc.Contracts() { @@ -145,13 +167,13 @@ func (b *bundle) Dispatch(c ronykit.Conn, in []byte) (ronykit.DispatchFunc, erro case *httpConn: return b.dispatchHTTP(c, in) case *wsConn: - return b.dispatchWS(c, in) + return b.dispatchWS(in) default: panic("BUG!! incorrect connection") } } -func (b *bundle) dispatchWS(c *wsConn, in []byte) (ronykit.DispatchFunc, error) { +func (b *bundle) dispatchWS(in []byte) (ronykit.DispatchFunc, error) { inputMsgContainer := &incomingMessage{} err := json.Unmarshal(in, inputMsgContainer) if err != nil { diff --git a/std/bundle/fasthttp/conn_ws.go b/std/bundle/fasthttp/conn_ws.go index 73b6002a..6f902f8f 100644 --- a/std/bundle/fasthttp/conn_ws.go +++ b/std/bundle/fasthttp/conn_ws.go @@ -3,7 +3,7 @@ package fasthttp import ( "sync" - "github.com/gorilla/websocket" + "github.com/fasthttp/websocket" ) type wsConn struct {