Skip to content

Commit

Permalink
Merge pull request #15
Browse files Browse the repository at this point in the history
* chore: small refactor

* chore: cleanup and refactor testenv tests
  • Loading branch information
ehsannm authored Jul 22, 2024
1 parent d8efd6f commit 5de1f90
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 188 deletions.
9 changes: 6 additions & 3 deletions kit/bridge_south.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,13 @@ func (sb *southBridge) OnMessage(data []byte) {
func (sb *southBridge) createSenderConn(
carrier *envelopeCarrier, timeout time.Duration, callbackFn func(*envelopeCarrier),
) *clusterConn {
rxCtx, cancelFn := context.WithCancel(context.Background())
ctx, cancelFn := context.WithCancel(context.Background())
if timeout > 0 {
rxCtx, cancelFn = context.WithTimeout(rxCtx, timeout)
ctx, cancelFn = context.WithTimeout(ctx, timeout)
}

conn := &clusterConn{
ctx: rxCtx,
ctx: ctx,
cf: cancelFn,
callbackFn: callbackFn,
cluster: sb.cb,
Expand Down Expand Up @@ -253,6 +253,8 @@ func (sb *southBridge) wrapWithCoordinator(c Contract) Contract {

func (sb *southBridge) genForwarderHandler(sel EdgeSelectorFunc) HandlerFunc {
return func(ctx *Context) {
// if it is already a forwarded request, we should forward it again.
// This is required to avoid infinite loops.
if ctx.forwarded {
return
}
Expand Down Expand Up @@ -287,6 +289,7 @@ func (sb *southBridge) genForwarderHandler(sel EdgeSelectorFunc) HandlerFunc {
ctx.Error(conn.Err())
case <-ctx.ctx.Done():
ctx.Error(ctx.ctx.Err())
conn.cf()
}

// We should stop executing next handlers, since our request has been executed on
Expand Down
171 changes: 171 additions & 0 deletions testenv/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ import (
"testing"
"time"

"github.com/clubpay/ronykit/kit"
"github.com/clubpay/ronykit/kit/common"
"github.com/clubpay/ronykit/kit/utils"
"github.com/clubpay/ronykit/std/clusters/p2pcluster"
"github.com/clubpay/ronykit/std/clusters/rediscluster"
"github.com/clubpay/ronykit/std/gateways/fasthttp"
"github.com/clubpay/ronykit/std/gateways/fastws"
"github.com/orlangure/gnomock"
redisContainer "github.com/orlangure/gnomock/preset/redis"
"github.com/redis/go-redis/v9"
Expand Down Expand Up @@ -105,6 +112,170 @@ func invokeRedisMonitor(lc fx.Lifecycle, _ *redis.Client) {
}()
}

func invokeEdgeServerFastHttp(_ string, port int, desc ...kit.ServiceBuilder) fx.Option {
return fx.Invoke(
func(lc fx.Lifecycle, _ *redis.Client) {
edge := kit.NewServer(
kit.WithLogger(common.NewStdLogger()),
kit.WithErrorHandler(
func(ctx *kit.Context, err error) {
fmt.Println("EdgeError: ", err)
},
),
kit.WithGateway(
fasthttp.MustNew(
fasthttp.WithDisableHeaderNamesNormalizing(),
fasthttp.Listen(fmt.Sprintf(":%d", port)),
),
),
kit.WithServiceBuilder(desc...),
)

lc.Append(
fx.Hook{
OnStart: func(ctx context.Context) error {
edge.Start(ctx)

return nil
},
OnStop: func(ctx context.Context) error {
edge.Shutdown(ctx)

return nil
},
},
)
},
)
}

func invokeEdgeServerWithFastWS(port int, desc ...kit.ServiceBuilder) fx.Option {
return fx.Invoke(
func(lc fx.Lifecycle) {
edge := kit.NewServer(
kit.ReusePort(true),
kit.WithLogger(common.NewStdLogger()),
kit.WithErrorHandler(
func(ctx *kit.Context, err error) {
fmt.Println("EdgeError: ", err)
},
),
kit.WithGateway(
fastws.MustNew(
fastws.WithPredicateKey("cmd"),
fastws.Listen(fmt.Sprintf("tcp4://0.0.0.0:%d", port)),
fastws.WithLogger(common.NewStdLogger()),
),
),
kit.WithServiceBuilder(desc...),
)

lc.Append(
fx.Hook{
OnStart: func(ctx context.Context) error {
edge.Start(ctx)

return nil
},
OnStop: func(ctx context.Context) error {
edge.Shutdown(ctx)

return nil
},
},
)
},
)
}

func invokeEdgeServerWithRedis(_ string, port int, desc ...kit.ServiceBuilder) fx.Option {
return fx.Invoke(
func(lc fx.Lifecycle, _ *redis.Client) {
edge := kit.NewServer(
kit.WithCluster(
rediscluster.MustNew(
"testCluster",
rediscluster.WithRedisClient(utils.Must(getRedis())),
rediscluster.WithGCPeriod(time.Second*3),
),
),
kit.WithLogger(common.NewStdLogger()),
kit.WithErrorHandler(
func(ctx *kit.Context, err error) {
fmt.Println("EdgeError: ", err)
},
),
kit.WithGateway(
fasthttp.MustNew(
fasthttp.WithDisableHeaderNamesNormalizing(),
fasthttp.Listen(fmt.Sprintf(":%d", port)),
),
),
kit.WithServiceBuilder(desc...),
)

lc.Append(
fx.Hook{
OnStart: func(ctx context.Context) error {
edge.Start(ctx)

return nil
},
OnStop: func(ctx context.Context) error {
edge.Shutdown(ctx)

return nil
},
},
)
},
)
}

func invokeEdgeServerWithP2P(_ string, port int, desc ...kit.ServiceBuilder) fx.Option {
return fx.Invoke(
func(lc fx.Lifecycle, _ *redis.Client) {
edge := kit.NewServer(
kit.WithCluster(
p2pcluster.New(
"testCluster",
p2pcluster.WithLogger(common.NewStdLogger()),
p2pcluster.WithBroadcastInterval(time.Second),
),
),
kit.WithLogger(common.NewStdLogger()),
kit.WithErrorHandler(
func(ctx *kit.Context, err error) {
fmt.Println("EdgeError: ", err)
},
),
kit.WithGateway(
fasthttp.MustNew(
fasthttp.WithDisableHeaderNamesNormalizing(),
fasthttp.Listen(fmt.Sprintf(":%d", port)),
),
),
kit.WithServiceBuilder(desc...),
)

lc.Append(
fx.Hook{
OnStart: func(ctx context.Context) error {
edge.Start(ctx)

return nil
},
OnStop: func(ctx context.Context) error {
edge.Shutdown(ctx)

return nil
},
},
)
},
)
}

func Prepare(t *testing.T, c C, option ...fx.Option) {
_, _ = c.Println('\n')
opts := []fx.Option{
Expand Down
94 changes: 0 additions & 94 deletions testenv/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,15 @@ package testenv
import (
"context"
"encoding/json"
"fmt"
"net/http"
"testing"
"time"

"ronykit/testenv/services"

"github.com/clubpay/ronykit/kit"
"github.com/clubpay/ronykit/kit/common"
"github.com/clubpay/ronykit/kit/utils"
"github.com/clubpay/ronykit/std/clusters/p2pcluster"
"github.com/clubpay/ronykit/std/clusters/rediscluster"
"github.com/clubpay/ronykit/std/gateways/fasthttp"
"github.com/clubpay/ronykit/stub"
"github.com/redis/go-redis/v9"
. "github.com/smartystreets/goconvey/convey"
"go.uber.org/fx"
)
Expand All @@ -41,94 +35,6 @@ func TestKitWithCluster(t *testing.T) {
})
}

func invokeEdgeServerWithRedis(_ string, port int, desc ...kit.ServiceBuilder) fx.Option {
return fx.Invoke(
func(lc fx.Lifecycle, _ *redis.Client) {
edge := kit.NewServer(
kit.WithCluster(
rediscluster.MustNew(
"testCluster",
rediscluster.WithRedisClient(utils.Must(getRedis())),
rediscluster.WithGCPeriod(time.Second*3),
),
),
kit.WithLogger(common.NewStdLogger()),
kit.WithErrorHandler(
func(ctx *kit.Context, err error) {
fmt.Println("EdgeError: ", err)
},
),
kit.WithGateway(
fasthttp.MustNew(
fasthttp.WithDisableHeaderNamesNormalizing(),
fasthttp.Listen(fmt.Sprintf(":%d", port)),
),
),
kit.WithServiceBuilder(desc...),
)

lc.Append(
fx.Hook{
OnStart: func(ctx context.Context) error {
edge.Start(ctx)

return nil
},
OnStop: func(ctx context.Context) error {
edge.Shutdown(ctx)

return nil
},
},
)
},
)
}

func invokeEdgeServerWithP2P(_ string, port int, desc ...kit.ServiceBuilder) fx.Option {
return fx.Invoke(
func(lc fx.Lifecycle, _ *redis.Client) {
edge := kit.NewServer(
kit.WithCluster(
p2pcluster.New(
"testCluster",
p2pcluster.WithLogger(common.NewStdLogger()),
p2pcluster.WithBroadcastInterval(time.Second),
),
),
kit.WithLogger(common.NewStdLogger()),
kit.WithErrorHandler(
func(ctx *kit.Context, err error) {
fmt.Println("EdgeError: ", err)
},
),
kit.WithGateway(
fasthttp.MustNew(
fasthttp.WithDisableHeaderNamesNormalizing(),
fasthttp.Listen(fmt.Sprintf(":%d", port)),
),
),
kit.WithServiceBuilder(desc...),
)

lc.Append(
fx.Hook{
OnStart: func(ctx context.Context) error {
edge.Start(ctx)

return nil
},
OnStop: func(ctx context.Context) error {
edge.Shutdown(ctx)

return nil
},
},
)
},
)
}

func kitWithCluster(t *testing.T, opt fx.Option) func(c C) {
ctx := context.Background()

Expand Down
41 changes: 0 additions & 41 deletions testenv/fastws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package testenv

import (
"context"
"fmt"
"testing"
"time"

Expand All @@ -11,7 +10,6 @@ import (
"github.com/clubpay/ronykit/kit"
"github.com/clubpay/ronykit/kit/common"
"github.com/clubpay/ronykit/kit/utils"
"github.com/clubpay/ronykit/std/gateways/fastws"
"github.com/clubpay/ronykit/stub"
. "github.com/smartystreets/goconvey/convey"
"go.uber.org/fx"
Expand All @@ -34,45 +32,6 @@ func TestFastWS(t *testing.T) {
})
}

func invokeEdgeServerWithFastWS(port int, desc ...kit.ServiceBuilder) fx.Option {
return fx.Invoke(
func(lc fx.Lifecycle) {
edge := kit.NewServer(
kit.ReusePort(true),
kit.WithLogger(common.NewStdLogger()),
kit.WithErrorHandler(
func(ctx *kit.Context, err error) {
fmt.Println("EdgeError: ", err)
},
),
kit.WithGateway(
fastws.MustNew(
fastws.WithPredicateKey("cmd"),
fastws.Listen(fmt.Sprintf("tcp4://0.0.0.0:%d", port)),
fastws.WithLogger(common.NewStdLogger()),
),
),
kit.WithServiceBuilder(desc...),
)

lc.Append(
fx.Hook{
OnStart: func(ctx context.Context) error {
edge.Start(ctx)

return nil
},
OnStop: func(ctx context.Context) error {
edge.Shutdown(ctx)

return nil
},
},
)
},
)
}

func fastwsWithHugePayload(t *testing.T, opt fx.Option) func(c C) {
ctx := context.Background()

Expand Down
Loading

0 comments on commit 5de1f90

Please sign in to comment.