Skip to content

Commit

Permalink
refactor to fix rare case of panic
Browse files Browse the repository at this point in the history
  • Loading branch information
ehsannm committed Jul 29, 2022
1 parent 971df06 commit 6b6e39f
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 42 deletions.
5 changes: 3 additions & 2 deletions bridge_north.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type GatewayDelegate interface {
// OnClose must be called whenever the connection is gone.
OnClose(connID uint64)
// OnMessage must be called whenever a new message arrives.
OnMessage(c Conn, msg []byte)
OnMessage(c Conn, wf WriteFunc, msg []byte)
}

// northBridge is a container component that connects EdgeServer with a Gateway type Bundle.
Expand All @@ -51,8 +51,9 @@ func (n *northBridge) OnClose(connID uint64) {
// Maybe later we can do something
}

func (n *northBridge) OnMessage(conn Conn, msg []byte) {
func (n *northBridge) OnMessage(conn Conn, wf WriteFunc, msg []byte) {
ctx := n.acquireCtx(conn)
ctx.wf = wf

arg, err := n.gw.Dispatch(ctx, msg)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ func newContext() *Context {

// execute the Context with the provided ExecuteArg. It implements ExecuteFunc
func (ctx *Context) execute(arg ExecuteArg, c Contract) {
ctx.wf = arg.WriteFunc
ctx.
Set(CtxServiceName, arg.ServiceName).
Set(CtxContractID, arg.ContractID).
Expand Down
25 changes: 14 additions & 11 deletions edge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,20 @@ type testGateway struct {
var _ ronykit.Gateway = (*testGateway)(nil)

func (t *testGateway) Send(c *testConn, msg []byte) {
t.d.OnMessage(c, msg)
t.d.OnMessage(
c,
func(conn ronykit.Conn, e ronykit.Envelope) error {
b, err := ronykit.MarshalMessage(e.GetMsg())
if err != nil {
return err
}

_, err = conn.Write(b)

return err
},
msg,
)
}

func (t testGateway) Start(_ context.Context) error {
Expand All @@ -47,16 +60,6 @@ func (t testGateway) Dispatch(ctx *ronykit.Context, in []byte) (ronykit.ExecuteA
ctx.In().SetMsg(ronykit.RawMessage(in))

return ronykit.ExecuteArg{
WriteFunc: func(conn ronykit.Conn, e ronykit.Envelope) error {
b, err := ronykit.MarshalMessage(e.GetMsg())
if err != nil {
return err
}

_, err = conn.Write(b)

return err
},
ServiceName: "testService",
ContractID: "testService.1",
Route: "someRoute",
Expand Down
1 change: 0 additions & 1 deletion ronykit.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ type Bundle interface {

type (
ExecuteArg struct {
WriteFunc
ServiceName string
ContractID string
Route string
Expand Down
6 changes: 2 additions & 4 deletions std/gateway/fasthttp/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (b *bundle) wsHandler(ctx *fasthttp.RequestCtx) {
}

func (b *bundle) wsHandlerExec(buf *buf.Bytes, wsc *wsConn) {
b.d.OnMessage(wsc, *buf.Bytes())
b.d.OnMessage(wsc, b.wsWriteFunc, *buf.Bytes())
buf.Release()
}

Expand All @@ -228,7 +228,6 @@ func (b *bundle) wsDispatch(ctx *ronykit.Context, in []byte) (ronykit.ExecuteArg
SetMsg(msg)

return ronykit.ExecuteArg{
WriteFunc: b.wsWriteFunc,
ServiceName: routeData.ServiceName,
ContractID: routeData.ContractID,
Route: routeData.Predicate,
Expand Down Expand Up @@ -262,7 +261,7 @@ func (b *bundle) httpHandler(ctx *fasthttp.RequestCtx) {

c.ctx = ctx
b.d.OnOpen(c)
b.d.OnMessage(c, ctx.PostBody())
b.d.OnMessage(c, b.httpWriteFunc, ctx.PostBody())
b.d.OnClose(c.ConnID())

b.connPool.Put(c)
Expand Down Expand Up @@ -316,7 +315,6 @@ func (b *bundle) httpDispatch(ctx *ronykit.Context, in []byte) (ronykit.ExecuteA
SetMsg(m)

return ronykit.ExecuteArg{
WriteFunc: b.httpWriteFunc,
ServiceName: routeData.ServiceName,
ContractID: routeData.ContractID,
Route: fmt.Sprintf("%s %s", routeData.Method, routeData.Path),
Expand Down
20 changes: 0 additions & 20 deletions std/gateway/fastws/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,32 +98,12 @@ func (b *bundle) Dispatch(ctx *ronykit.Context, in []byte) (ronykit.ExecuteArg,
SetMsg(msg)

return ronykit.ExecuteArg{
WriteFunc: b.writeFunc,
ServiceName: routeData.ServiceName,
ContractID: routeData.ContractID,
Route: routeData.Predicate,
}, nil
}

func (b *bundle) writeFunc(conn ronykit.Conn, e ronykit.Envelope) error {
outputMsgContainer := b.rpcOutFactory()
outputMsgContainer.SetPayload(e.GetMsg())
e.WalkHdr(func(key string, val string) bool {
outputMsgContainer.SetHdr(key, val)

return true
})

data, err := outputMsgContainer.Marshal()
if err != nil {
return errors.Wrap(ronykit.ErrEncodeOutgoingMessageFailed, err)
}

_, err = conn.Write(data)

return err
}

func (b *bundle) Start(_ context.Context) error {
go func() {
opts := []gnet.Option{
Expand Down
27 changes: 24 additions & 3 deletions std/gateway/fastws/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package fastws

import (
"bytes"
"errors"
builtinErr "errors"
"io"
"net/http"
"sync"
"sync/atomic"
"time"

"github.com/clubpay/ronykit"
"github.com/clubpay/ronykit/internal/errors"
"github.com/clubpay/ronykit/utils"
"github.com/clubpay/ronykit/utils/buf"
"github.com/gobwas/ws"
Expand All @@ -33,8 +35,27 @@ func newGateway(b *bundle) (*gateway, error) {
return gw, nil
}

func (gw *gateway) writeFunc(conn ronykit.Conn, e ronykit.Envelope) error {
outputMsgContainer := gw.b.rpcOutFactory()
outputMsgContainer.SetPayload(e.GetMsg())
e.WalkHdr(func(key string, val string) bool {
outputMsgContainer.SetHdr(key, val)

return true
})

data, err := outputMsgContainer.Marshal()
if err != nil {
return errors.Wrap(ronykit.ErrEncodeOutgoingMessageFailed, err)
}

_, err = conn.Write(data)

return err
}

func (e *gateway) reactFunc(wsc *wsConn, payload *buf.Bytes, n int) {
e.b.d.OnMessage(wsc, (*payload.Bytes())[:n])
e.b.d.OnMessage(wsc, e.writeFunc, (*payload.Bytes())[:n])
payload.Release()
}

Expand Down Expand Up @@ -143,7 +164,7 @@ func (e *gateway) OnTraffic(c gnet.Conn) gnet.Action {

payloadBuffer := buf.GetLen(int(hdr.Length))
n, err := wsc.r.Read(*payloadBuffer.Bytes())
if err != nil && !errors.Is(err, io.EOF) {
if err != nil && !builtinErr.Is(err, io.EOF) {
return gnet.None
}

Expand Down

0 comments on commit 6b6e39f

Please sign in to comment.