Skip to content

Commit

Permalink
Update gnet to v2.6.2 and add WebSocket stability check
Browse files Browse the repository at this point in the history
Upgraded gnet dependency from v2.6.1 to v2.6.2 across relevant modules. Introduced a new test for WebSocket stability and implemented a timeout mechanism for WebSocket requests. Adjusted Echo service response handling to improve clarity.
  • Loading branch information
ehsannm committed Nov 28, 2024
1 parent 63bb1bc commit 2cbc4e1
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 15 deletions.
2 changes: 1 addition & 1 deletion std/gateways/fastws/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.20
require (
github.com/clubpay/ronykit/kit v0.17.19
github.com/gobwas/ws v1.4.0
github.com/panjf2000/gnet/v2 v2.6.1
github.com/panjf2000/gnet/v2 v2.6.2
)

require (
Expand Down
4 changes: 2 additions & 2 deletions std/gateways/fastws/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ github.com/onsi/ginkgo/v2 v2.20.0 h1:PE84V2mHqoT1sglvHc8ZdQtPcwmvvt29WLEEO3xmdZw
github.com/onsi/gomega v1.34.1 h1:EUMJIKUjM8sKjYbtxQI9A4z2o+rruxnzNvpknOXie6k=
github.com/panjf2000/ants/v2 v2.10.0 h1:zhRg1pQUtkyRiOFo2Sbqwjp0GfBNo9cUY2/Grpx1p+8=
github.com/panjf2000/ants/v2 v2.10.0/go.mod h1:7ZxyxsqE4vvW0M7LSD8aI3cKwgFhBHbxnlN8mDqHa1I=
github.com/panjf2000/gnet/v2 v2.6.1 h1:IPkv4ZHwdPSqdKve6A4v0PsbieeK6gooIhFfk5GFBgo=
github.com/panjf2000/gnet/v2 v2.6.1/go.mod h1:HpNv+iQrIOeil1eyhdnKDlui7jivyMf0K3xwaeHKnh8=
github.com/panjf2000/gnet/v2 v2.6.2 h1:f6WOlfiaMtblK5RvuiXiAraDlawS0RvoI2LSE4ZaAWc=
github.com/panjf2000/gnet/v2 v2.6.2/go.mod h1:HpNv+iQrIOeil1eyhdnKDlui7jivyMf0K3xwaeHKnh8=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
Expand Down
17 changes: 15 additions & 2 deletions stub/stub_ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stub

import (
"context"
"errors"
"fmt"
"net"
"strings"
Expand Down Expand Up @@ -188,7 +189,7 @@ func (wCtx *WebsocketCtx) receiver(c *websocket.Conn) {
continue
}

// if this is a reply message we return it to the pending channel
// if this is a reply message, we return it to the pending channel
wCtx.pendingMtx.Lock()
ch, ok := wCtx.pending[rpcIn.GetID()]
wCtx.pendingMtx.Unlock()
Expand Down Expand Up @@ -322,6 +323,9 @@ type WebsocketRequest struct {
// If this is nil, the response will be ignored. However, the response will be caught by
// the default handler if it is set.
Callback RPCMessageHandler
// Timeout if is set, then the callback will be called with ErrTimeout, in case of we didn't
// receive the response in time.
Timeout time.Duration
}

const (
Expand Down Expand Up @@ -369,26 +373,34 @@ func (wCtx *WebsocketCtx) Do(ctx context.Context, req WebsocketRequest) error {
outC.Release()

if req.Callback != nil {
go wCtx.waitForMessage(ctx, req.ID, req.ResMsg, req.Callback)
go wCtx.waitForMessage(ctx, req.ID, req.ResMsg, req.Callback, req.Timeout)
}

return nil
}

func (wCtx *WebsocketCtx) waitForMessage(
ctx context.Context, id string, res kit.Message, cb RPCMessageHandler,
timeout time.Duration,
) {
resCh := make(chan kit.IncomingRPCContainer, 1)
wCtx.pendingMtx.Lock()
wCtx.pending[id] = resCh
wCtx.pendingMtx.Unlock()

if timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}

select {
case c := <-resCh:
err := c.ExtractMessage(res)
cb(ctx, res, c.GetHdrMap(), err)

case <-ctx.Done():
cb(ctx, res, nil, ErrTimeout)
}

wCtx.pendingMtx.Lock()
Expand All @@ -412,4 +424,5 @@ func (c containerTraceCarrier) Set(key string, value string) {
var (
ErrBadHandshake = websocket.ErrBadHandshake
_ = ErrBadHandshake
ErrTimeout = errors.New("timeout")
)
16 changes: 8 additions & 8 deletions testenv/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ module ronykit/testenv
go 1.22

require (
github.com/clubpay/ronykit/kit v0.17.10
github.com/clubpay/ronykit/std/clusters/p2pcluster v0.17.10
github.com/clubpay/ronykit/std/clusters/rediscluster v0.17.10
github.com/clubpay/ronykit/std/gateways/fasthttp v0.17.10
github.com/clubpay/ronykit/std/gateways/fastws v0.17.10
github.com/clubpay/ronykit/stub v0.17.10
github.com/clubpay/ronykit/kit v0.17.19
github.com/clubpay/ronykit/std/clusters/p2pcluster v0.17.19
github.com/clubpay/ronykit/std/clusters/rediscluster v0.17.19
github.com/clubpay/ronykit/std/gateways/fasthttp v0.17.19
github.com/clubpay/ronykit/std/gateways/fastws v0.17.19
github.com/clubpay/ronykit/stub v0.17.19
github.com/orlangure/gnomock v0.31.0
github.com/redis/go-redis/v9 v9.6.1
github.com/redis/go-redis/v9 v9.7.0
github.com/smartystreets/goconvey v1.8.1
go.uber.org/fx v1.22.2
go.uber.org/fx v1.23.0
)

2 changes: 1 addition & 1 deletion testenv/services/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var EchoService kit.ServiceBuilder = desc.NewService("EchoService").
func(ctx *kit.Context) {
req, _ := ctx.In().GetMsg().(*EchoRequest)

ctx.Out().
ctx.In().Reply().
SetMsg(
&EchoResponse{
Embedded: req.Embedded,
Expand Down
68 changes: 67 additions & 1 deletion testenv/stub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"fmt"
"math/rand"
"net/http"
"sync"
"testing"
"time"

"ronykit/testenv/services"

Expand Down Expand Up @@ -175,7 +177,8 @@ func stubWithAutoRun2(t *testing.T, opt fx.Option) func(c C) {
func TestWebsocket(t *testing.T) {
Convey("Websocket", t, func(c C) {
testCases := map[string]func(t *testing.T, opt fx.Option) func(c C){
"Websocket Stub [Connect, Reconnect, Disconnect]": stubWebsocket,
//"Websocket Stub [Connect, Reconnect, Disconnect]": stubWebsocket,
"Stability Check": stubWebsocketStability,
}
for title, fn := range testCases {
Convey(title+"FastHTTP",
Expand Down Expand Up @@ -238,6 +241,69 @@ func stubWebsocket(t *testing.T, opt fx.Option) func(c C) {
}
}

func stubWebsocketStability(t *testing.T, opt fx.Option) func(c C) {
ctx := context.Background()

return func(c C) {
Prepare(
t, c,
fx.Options(
opt,
),
)

time.Sleep(1 * time.Second)
wsCtx := stub.New("127.0.0.1:8082", stub.WithLogger(&kitLogger{})).
Websocket(
stub.WithPredicateKey("cmd"),
stub.WithPingTime(time.Second*5),
)

err := wsCtx.Connect(ctx, "/agent/ws")
c.So(err, ShouldBeNil)

wg := sync.WaitGroup{}
for range 200 {
X := utils.RandomID(10)
XP := utils.RandomID(10)

// Set Key to instance 1
resp := &services.EchoResponse{}

wg.Add(1)
err = wsCtx.Do(
ctx,
stub.WebsocketRequest{
Predicate: "echo",
MessageType: stub.WebsocketText,
ReqMsg: &services.EchoRequest{
Embedded: services.Embedded{
X: X,
XP: XP,
},
Input: XP,
},
ResMsg: resp,
ReqHdr: nil,
Callback: func(ctx context.Context, msg kit.Message, hdr stub.Header, err error) {
if err != nil {
c.So(err, ShouldEqual, stub.ErrTimeout)
} else {
c.So(resp.X, ShouldEqual, X)
c.So(resp.XP, ShouldEqual, XP)
}

wg.Done()
},
Timeout: time.Second,
},
)
c.So(err, ShouldBeNil)
wg.Wait()
}
}
}

func TestHttp(t *testing.T) {
Convey("HTTP", t, func(c C) {
testCases := map[string]func(t *testing.T, opt fx.Option) func(c C){
Expand Down

0 comments on commit 2cbc4e1

Please sign in to comment.