Skip to content

Commit

Permalink
implement websocket for fasthttp bundle, still does not pass stress test
Browse files Browse the repository at this point in the history
  • Loading branch information
ehsannm committed Feb 13, 2022
1 parent 562db44 commit 5ef4316
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 13 deletions.
2 changes: 1 addition & 1 deletion exmples/mixed-jsonrpc-rest/bench.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#tcpkali --ws --dump-one -c 1000 -m '{"hdr": {"cmd": "echoRequest"}, "payload": {"randomID": 1234}}' -r 100 127.0.0.1:7080
#tcpkali --ws -c 1000 -m '{"hdr": {"cmd": "echoRequest"}, "payload": {"randomID": 1234}}' -r 100 127.0.0.1:7080 -T 30
tcpkali --ws -c 100 -m '{"hdr": {"cmd": "echoRequest"}, "payload": {"randomID": 1234}}' -r 100 127.0.0.1:7080 -T 30
tcpkali --ws -c 100 -m '{"hdr": {"cmd": "echoRequest"}, "payload": {"randomID": 1234}}' -r 100 127.0.0.1:80 -T 30
4 changes: 2 additions & 2 deletions exmples/mixed-jsonrpc-rest/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ func main() {
),
ronykit.RegisterBundle(
fastws.MustNew(
fastws.Listen("tcp4://0.0.0.0:7080"),
fastws.Listen("tcp4://0.0.0.0:80"),
fastws.WithPredicateKey("cmd"),
),
fasthttp.MustNew(
fasthttp.Listen(":7070"),
fasthttp.Listen(":81"),
),
),
ronykit.RegisterService(NewSample().Desc().Generate()),
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ go 1.17
require (
github.com/brianvoe/gofakeit/v6 v6.10.0
github.com/fasthttp/router v1.4.6
github.com/fasthttp/websocket v1.4.6
github.com/gobwas/ws v1.1.0
github.com/goccy/go-json v0.9.4
github.com/gorilla/websocket v1.4.1
github.com/panjf2000/gnet v1.6.4
github.com/smartystreets/goconvey v1.7.2
github.com/valyala/fasthttp v1.33.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fasthttp/router v1.4.6 h1:KfETdHGBnvoBfBHeRe/8TVYz8Bp/mASBVC5UXO9CpZI=
github.com/fasthttp/router v1.4.6/go.mod h1:Iv800u3hYFNuBBcmJNs/VBVpub+JfBihGBp5spSocbw=
github.com/fasthttp/websocket v1.4.6 h1:Zi0Z6sUUvLmtxXd/kMLVBVJPvck3bqPqMWUbwhUy0R8=
github.com/fasthttp/websocket v1.4.6/go.mod h1:n0BlOQvJdPbTuBkZT0O5+jk/sp/1/VCzquR1BehI2F4=
github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU=
github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM=
github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og=
Expand Down
38 changes: 30 additions & 8 deletions std/bundle/fasthttp/bundle.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package fasthttp

import (
"encoding/json"
"fmt"
"net"
"sync"

"github.com/fasthttp/router"
"github.com/goccy/go-json"
"github.com/fasthttp/websocket"
"github.com/ronaksoft/ronykit"
"github.com/ronaksoft/ronykit/utils"
"github.com/valyala/fasthttp"
Expand All @@ -28,9 +29,10 @@ type bundle struct {

httpMux *mux

wsRoutes map[string]*routeData
predicateKey string
wsEndpoint string
wsRoutes map[string]*routeData
wsSwitchProtocol websocket.FastHTTPUpgrader
wsEndpoint string
predicateKey string
}

func New(opts ...Option) (*bundle, error) {
Expand All @@ -52,7 +54,7 @@ func New(opts ...Option) (*bundle, error) {
if r.wsEndpoint != "" {
entryRouter := router.New()
entryRouter.GET(r.wsEndpoint, r.wsHandler)
entryRouter.Handle(router.MethodWild, "/", r.httpHandler)
entryRouter.NotFound = r.httpHandler
r.srv.Handler = entryRouter.Handler
} else {
r.srv.Handler = r.httpHandler
Expand Down Expand Up @@ -85,7 +87,27 @@ func (b *bundle) httpHandler(ctx *fasthttp.RequestCtx) {
b.connPool.Put(c)
}

func (b *bundle) wsHandler(ctx *fasthttp.RequestCtx) {}
func (b *bundle) wsHandler(ctx *fasthttp.RequestCtx) {
_ = b.wsSwitchProtocol.Upgrade(ctx,
func(conn *websocket.Conn) {
wsc := &wsConn{
kv: map[string]string{},
id: 0,
clientIP: conn.RemoteAddr().String(),
c: conn,
}
b.d.OnOpen(wsc)
for {
_, in, err := conn.ReadMessage()
if err != nil {
break
}
go b.d.OnMessage(wsc, in)
}
b.d.OnClose(wsc.id)
},
)
}

func (b *bundle) Register(svc ronykit.Service) {
for _, contract := range svc.Contracts() {
Expand Down Expand Up @@ -145,13 +167,13 @@ func (b *bundle) Dispatch(c ronykit.Conn, in []byte) (ronykit.DispatchFunc, erro
case *httpConn:
return b.dispatchHTTP(c, in)
case *wsConn:
return b.dispatchWS(c, in)
return b.dispatchWS(in)
default:
panic("BUG!! incorrect connection")
}
}

func (b *bundle) dispatchWS(c *wsConn, in []byte) (ronykit.DispatchFunc, error) {
func (b *bundle) dispatchWS(in []byte) (ronykit.DispatchFunc, error) {
inputMsgContainer := &incomingMessage{}
err := json.Unmarshal(in, inputMsgContainer)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion std/bundle/fasthttp/conn_ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package fasthttp
import (
"sync"

"github.com/gorilla/websocket"
"github.com/fasthttp/websocket"
)

type wsConn struct {
Expand Down

0 comments on commit 5ef4316

Please sign in to comment.