Skip to content

Commit

Permalink
[redisCluster] fix race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
ehsannm committed Jul 15, 2024
1 parent 48ef74d commit d1db69e
Show file tree
Hide file tree
Showing 13 changed files with 242 additions and 81 deletions.
67 changes: 31 additions & 36 deletions std/clusters/rediscluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ import (
const KeepTTL = -1

type cluster struct {
rc *redis.Client
ps *redis.PubSub
msgChan <-chan *redis.Message
id string
d kit.ClusterDelegate
shutdownChan chan struct{}
prefix string
idleTime time.Duration
gcPeriod time.Duration
rc *redis.Client
ps *redis.PubSub
msgChan <-chan *redis.Message
id string
d kit.ClusterDelegate
shutdownFn context.CancelFunc
prefix string
idleTime time.Duration
gcPeriod time.Duration
}

var (
Expand All @@ -32,17 +32,14 @@ var (

func New(name string, opts ...Option) (kit.Cluster, error) {
c := &cluster{
prefix: name,
idleTime: time.Minute * 10,
gcPeriod: time.Minute,
shutdownChan: make(chan struct{}, 1),
prefix: name,
idleTime: time.Minute * 10,
gcPeriod: time.Minute,
}
for _, o := range opts {
o(c)
}

go c.gc()

return c, nil
}

Expand All @@ -55,44 +52,42 @@ func MustNew(name string, opts ...Option) kit.Cluster {
return c
}

func (c *cluster) gc() {
func (c *cluster) gc(ctx context.Context) {
key := fmt.Sprintf("%s:gc", c.prefix)
instancesKey := fmt.Sprintf("%s:instances", c.prefix)
ctx := context.Background()
idleSec := int64(c.idleTime / time.Second)
for {
if c.id != "" {
c.rc.HSet(ctx, instancesKey, c.id, utils.TimeUnix())
}

time.Sleep(c.gcPeriod)

ok, _ := c.rc.SetNX(context.Background(), key, "running", c.idleTime).Result() //nolint:errcheck
if ok {
members, _ := c.rc.HGetAll(ctx, instancesKey).Result() //nolint:errcheck
now := utils.TimeUnix()
for k, v := range members {
if now-utils.StrToInt64(v) > idleSec {
c.rc.HDel(ctx, instancesKey, k)
}
}
}
}
_ = luaGC.Run(
ctx, c.rc,
[]string{
key,
instancesKey,
},
c.id,
idleSec,
utils.TimeUnix(),
).Err()
}

func (c *cluster) Start(ctx context.Context) error {
runCtx, cf := context.WithCancel(context.Background())
c.shutdownFn = cf

c.ps = c.rc.Subscribe(ctx, fmt.Sprintf("%s:chan:%s", c.prefix, c.id))
c.rc.HSet(ctx, fmt.Sprintf("%s:instances", c.prefix), c.id, utils.TimeUnix())
c.msgChan = c.ps.Channel()
gcTimer := time.NewTimer(c.gcPeriod)
go func() {
for {
select {
case <-gcTimer.C:
c.gc(runCtx)
case msg, ok := <-c.msgChan:
if !ok {
return
}
go c.d.OnMessage(utils.S2B(msg.Payload))
case <-c.shutdownChan:
case <-runCtx.Done():
_ = c.ps.Close()

return
Expand All @@ -104,7 +99,7 @@ func (c *cluster) Start(ctx context.Context) error {
}

func (c *cluster) Shutdown(ctx context.Context) error {
c.shutdownChan <- struct{}{}
c.shutdownFn()

return c.rc.HDel(ctx, fmt.Sprintf("%s:instances", c.prefix), c.id).Err()
}
Expand Down
2 changes: 1 addition & 1 deletion std/clusters/rediscluster/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.20

require (
github.com/clubpay/ronykit/kit v0.16.6
github.com/redis/go-redis/v9 v9.5.1
github.com/redis/go-redis/v9 v9.5.4
)

require (
Expand Down
4 changes: 2 additions & 2 deletions std/clusters/rediscluster/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh
github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA=
github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8=
github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/redis/go-redis/v9 v9.5.4 h1:vOFYDKKVgrI5u++QvnMT7DksSMYg7Aw/Np4vLJLKLwY=
github.com/redis/go-redis/v9 v9.5.4/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.3 h1:utMvzDsuh3suAEnhH0RdHmoPbU648o6CvXxTx4SBMOw=
github.com/rivo/uniseg v0.4.3/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
Expand Down
13 changes: 13 additions & 0 deletions std/clusters/rediscluster/lua.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package rediscluster

import (
_ "embed"

"github.com/redis/go-redis/v9"
)

var (
//go:embed lua/gc.lua
luaScript string
luaGC = redis.NewScript(luaScript)
)
19 changes: 19 additions & 0 deletions std/clusters/rediscluster/lua/gc.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
local key = KEYS[1]
local instancesKey = KEYS[2]
local id = ARGV[1]
local idleSec = tonumber(ARGV[2])
local timeNow = tonumber(ARGV[3])

if id ~= "" then
redis.call("HSET", instancesKey, id, timeNow)
end

local ok = redis.call("SETNX", key, "running")
if ok == 1 then
local members = redis.call("HGETALL", instancesKey)
for i=1,#members,2 do
if timeNow - tonumber(members[i+1]) > idleSec then
redis.call("HDEL", instancesKey, members[i])
end
end
end
12 changes: 11 additions & 1 deletion std/clusters/rediscluster/options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package rediscluster

import "github.com/redis/go-redis/v9"
import (
"time"

"github.com/redis/go-redis/v9"
)

type Option func(c *cluster)

Expand All @@ -26,3 +30,9 @@ func WithRedisURL(url string) Option {
c.rc = redis.NewClient(opt)
}
}

func WithGCPeriod(d time.Duration) Option {
return func(c *cluster) {
c.gcPeriod = d
}
}
3 changes: 2 additions & 1 deletion std/gateways/fastws/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,9 @@ func (wsc *wsConn) executeMessages(c gnet.Conn, d kit.GatewayDelegate) error {
}

if wsc.currHead.Fin {
go wsc.execMessage(d, wsc.msgBuff)
msgBuff := wsc.msgBuff
wsc.msgBuff = buf.GetCap(wsc.msgBuff.Cap())
go wsc.execMessage(d, msgBuff)
}

// reset the current head
Expand Down
5 changes: 4 additions & 1 deletion stub/stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package stub
import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"net/http"
"net/url"
Expand Down Expand Up @@ -75,7 +76,7 @@ func HTTP(rawURL string, opts ...Option) (*RESTCtx, error) {

switch u.Scheme {
default:
return nil, fmt.Errorf("unsupported scheme: %s", u.Scheme)
return nil, errUnsupportedScheme
case "http":
case "https":
opts = append(opts, Secure())
Expand Down Expand Up @@ -170,3 +171,5 @@ func (s *Stub) Websocket(opts ...WebsocketOption) *WebsocketCtx {

return ctx
}

var errUnsupportedScheme = errors.New("unsupported scheme")
9 changes: 0 additions & 9 deletions testenv/bench_fasthttp_test.go

This file was deleted.

10 changes: 5 additions & 5 deletions testenv/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestKitWithCluster(t *testing.T) {
Convey("Kit with Cluster", t, func(c C) {
testCases := map[string]fx.Option{
"KeyValue Store - With Redis": fx.Options(
fx.Invoke(invokeRedisMonitor),
// fx.Invoke(invokeRedisMonitor),
invokeEdgeServerWithRedis("edge1", 8082, services.SimpleKeyValueService),
invokeEdgeServerWithRedis("edge2", 8083, services.SimpleKeyValueService),
),
Expand All @@ -49,6 +49,7 @@ func invokeEdgeServerWithRedis(_ string, port int, desc ...kit.ServiceBuilder) f
rediscluster.MustNew(
"testCluster",
rediscluster.WithRedisClient(utils.Must(getRedis())),
rediscluster.WithGCPeriod(time.Second*3),
),
),
kit.WithLogger(common.NewStdLogger()),
Expand Down Expand Up @@ -142,9 +143,9 @@ func kitWithCluster(t *testing.T, opt fx.Option) func(c C) {
time.Sleep(time.Second * 15)

// Set Key to instance 1
restCtx := stub.New("localhost:8082").REST()

resp := &services.KeyValue{}
err := restCtx.
err := stub.New("localhost:8082").REST().
SetMethod("POST").
DefaultResponseHandler(
func(ctx context.Context, r stub.RESTResponse) *stub.Error {
Expand All @@ -160,8 +161,7 @@ func kitWithCluster(t *testing.T, opt fx.Option) func(c C) {
c.So(resp.Value, ShouldEqual, "testValue")

// Get Key from instance 2
restCtx = stub.New("localhost:8083").REST()
err = restCtx.
err = stub.New("localhost:8083").REST().
SetMethod("GET").
SetHeader("Conn-Hdr-In", "MyValue").
SetHeader("Envelope-Hdr-In", "EnvelopeValue").
Expand Down
2 changes: 1 addition & 1 deletion testenv/fastws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func fastwsWithHugePayload(t *testing.T, opt fx.Option) func(c C) {

wsCtx := stub.New(
"localhost:8082",
// stub.WithLogger(common.NewStdLogger()),
stub.WithLogger(common.NewStdLogger()),
).
Websocket(
stub.WithPredicateKey("cmd"),
Expand Down
21 changes: 13 additions & 8 deletions testenv/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ module ronykit/testenv
go 1.22

require (
github.com/clubpay/ronykit/kit v0.14.3
github.com/clubpay/ronykit/std/clusters/p2pcluster v0.14.3
github.com/clubpay/ronykit/std/clusters/rediscluster v0.14.3
github.com/clubpay/ronykit/std/gateways/fasthttp v0.14.3
github.com/clubpay/ronykit/kit v0.16.6
github.com/clubpay/ronykit/std/clusters/p2pcluster v0.16.6
github.com/clubpay/ronykit/std/clusters/rediscluster v0.16.6
github.com/clubpay/ronykit/std/gateways/fasthttp v0.16.6
github.com/clubpay/ronykit/std/gateways/fastws v0.16.6
github.com/clubpay/ronykit/stub v0.16.6
github.com/orlangure/gnomock v0.30.0
github.com/redis/go-redis/v9 v9.5.1
github.com/redis/go-redis/v9 v9.5.4
github.com/smartystreets/goconvey v1.8.1
go.uber.org/fx v1.21.1
go.uber.org/fx v1.22.1
)

require (
Expand All @@ -35,10 +37,11 @@ require (
github.com/fasthttp/websocket v1.5.9 // indirect
github.com/flynn/noise v1.1.0 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-redis/redis/v7 v7.4.1 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/gobwas/httphead v0.1.0 // indirect
github.com/gobwas/pool v0.2.1 // indirect
github.com/gobwas/ws v1.4.0 // indirect
github.com/goccy/go-json v0.10.3 // indirect
github.com/goccy/go-reflect v1.2.0 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
Expand Down Expand Up @@ -92,6 +95,7 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/opencontainers/runtime-spec v1.2.0 // indirect
github.com/panjf2000/gnet/v2 v2.5.2 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pion/datachannel v1.5.6 // indirect
github.com/pion/dtls/v2 v2.2.11 // indirect
Expand Down Expand Up @@ -140,6 +144,7 @@ require (
golang.org/x/text v0.16.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
google.golang.org/protobuf v1.34.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
)
Loading

0 comments on commit d1db69e

Please sign in to comment.