RedisLabs Enterprise Support #16
Comments
@rueian Is Redis Enterprise support on your roadmap? Love your client – we currently use it for prototyping. However, we are not able to use this client in our test/prod environments.
Hi @lgothard, thank you for trying rueidis. I have done some research on RedisLabs Enterprise, but unfortunately, the only way to let rueidis work with it is to make rueidis support the old RESP2, unless newer RedisLabs Enterprise releases support RESP3. Supporting the old RESP2 in rueidis is indeed on my roadmap, but it will take some months.
It appears RESP3/client-side caching support in Redis Enterprise is in flight. Full support for RESP3 is currently slated for early next year.
Thank you for the update. I am really surprised that RESP3, available since 2020, has not yet been adopted even in Redis Enterprise.
Hi @lgothard, v0.0.77 is the first version that can work with RESP2 and the RedisLabs Enterprise Docker image:

c, err := rueidis.NewClient(rueidis.ClientOption{
InitAddress: []string{"127.0.0.1:12000"},
DisableCache: true,
})
if err != nil {
panic(err)
}
defer c.Close()

Please note that client-side caching and pubsub are not yet supported in RESP2 mode.
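For completeness, a minimal usage sketch continuing the snippet above (my own illustration, not from the original thread): since client-side caching is unavailable in RESP2 mode, plain Do calls with the command builder are used. The key and value are placeholders, and the context and fmt imports are assumed.

ctx := context.Background()

// SET then GET through the command builder; DoCache is not used here
// because client-side caching is unavailable in RESP2 mode.
if err := c.Do(ctx, c.B().Set().Key("greeting").Value("hello").Build()).Error(); err != nil {
    panic(err)
}
v, err := c.Do(ctx, c.B().Get().Key("greeting").Build()).ToString()
if err != nil {
    panic(err)
}
fmt.Println(v) // hello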
@rueian hello, do you think the rest of RESP2 (Pub/Sub and Sentinel) may be implemented in rueidis?

Just giving some context of what I am doing. As you remember, I was migrating Centrifuge to rueidis, and I ran into this while experimenting with it.

Also, one thing I am concerned about when choosing the implementation (besides the number of dependencies and performance) is the possibility to seamlessly run the Redis code with KeyDB, DragonflyDB, and other Redis-compatible stores. Redigo works fine with those, but I still have to check rueidis.
Hi @FZambia, I am sure the rest of the RESP2 functions will be implemented at some point, maybe this month or next month. I just need some time to figure out how to do it. Thank you for mentioning other Redis-compatible stores. I tested with KeyDB a few months ago and it worked fine at that time. DragonflyDB looks very promising but I haven't tried it. Anyway, I think I can add some integration tests with them.
Hi @FZambia, as you already know, rueidis v0.0.80 was released with RESP2 PubSub and Sentinel support. In terms of compatibility, those other stores all support PubSub: KeyDB supports RESP3 and client-side caching, while DragonflyDB v0.9.1 and Kvrocks v2.0.6 only support RESP2.
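As a rough illustration of the PubSub support mentioned above, here is a sketch based on the rueidis Receive API (the address and channel name are placeholders, the context/fmt imports are assumed, and the exact API surface at v0.0.80 is assumed rather than verified):

c, err := rueidis.NewClient(rueidis.ClientOption{
    InitAddress:  []string{"127.0.0.1:6379"},
    DisableCache: true, // needed when the server only speaks RESP2
})
if err != nil {
    panic(err)
}
defer c.Close()

// Receive blocks, subscribes to the channel, and invokes the callback
// for every message until the subscription ends or the context is done.
err = c.Receive(context.Background(), c.B().Subscribe().Channel("news").Build(),
    func(m rueidis.PubSubMessage) {
        fmt.Println(m.Channel, m.Message)
    })
if err != nil {
    fmt.Println("subscription ended:", err)
}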
Yep, many many thanks! This is awesome, and we already have a PR – Rueidis shows great results on Centrifuge benchmarks, especially in allocation efficiency.
@rueian btw, I have a small concern, a bit unrelated to this topic, but I was thinking about it during the migration to rueidis. Currently rueidis relies on TCP keepalives and its background PING goroutine to detect broken connections.

Regarding TCP keepalives – I am still not sure those are enough in cases where some proxy between the app and Redis is used: keepalives only check that the connection between the app and the proxy is alive, and then, depending on the proxy, I suppose it's not 100% guaranteed that keepalives from the proxy to Redis are sent. From this perspective, having a global read timeout for requests looks attractive.

To be honest, I am not sure whether this can be improved or whether the background PING goroutine is enough. Also, just to mention – I am trying to avoid cancelling requests to Redis via context, as it adds notable overhead when a context has to be created for each request, so I prefer having a single globally defined timeout for all requests from the application to Redis.
Hi @FZambia,
I am now sure that TCP keepalive is not enough to detect broken links, even without proxies in between, because the kernel only starts keepalive probes when the TCP write buffer is empty. If there is outgoing data not yet acked by the receiver, keepalive will never kick off.
I haven't checked how Redigo implements ReadTimeout, but this is quite surprising to me. I actually thought there were many cases where a ReadTimeout couldn't be applied – for example, blocking commands like BLPOP. In other words, ReadTimeout makes sense to me only when I expect there will be a response before it times out. That is why rueidis relies on the background PING instead.
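To make the keepalive limitation above concrete, here is a generic sketch of an application-level health check (my own illustration, not rueidis internals): it sets explicit deadlines on the connection, so a dead peer is detected even while data is in flight, which plain TCP keepalive cannot guarantee. It uses the standard Redis inline PING/PONG exchange and is meant for an otherwise idle connection.

package redischeck

import (
    "bufio"
    "fmt"
    "net"
    "time"
)

// Ping writes an inline PING and expects +PONG within the timeout.
// Any write error, read error, or deadline expiry signals a broken link.
func Ping(conn net.Conn, timeout time.Duration) error {
    if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil {
        return err
    }
    defer conn.SetDeadline(time.Time{}) // clear the deadline afterwards

    if _, err := conn.Write([]byte("PING\r\n")); err != nil {
        return err
    }
    reply, err := bufio.NewReader(conn).ReadString('\n')
    if err != nil {
        return err // includes i/o timeout when the peer is gone
    }
    if reply != "+PONG\r\n" {
        return fmt.Errorf("unexpected reply: %q", reply)
    }
    return nil
}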
In Redigo's case it is possible to override the read timeout for an individual command, so blocking commands can be given a larger one. As I said, I am not sure something should be changed in rueidis.
Hey @FZambia,
Thank you for letting me know how Redigo uses ReadTimeout. I haven't tried the example yet, but I have one small guess about how it behaves.
Though I believe the current background PING mechanism is enough, I also believe there is room for improvement – for instance, making better use of the built-in connection deadlines. Another direction for improvement I am thinking of is to reduce the number of PINGs: given that their purpose is to detect broken connections, there is no need to send PINGs on a connection that keeps fulfilling other normal requests.
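A generic sketch (my own, not rueidis code) of the "skip PINGs on busy connections" idea described above: record the time of the last completed request and only send a health-check PING when the connection has been idle for a full interval.

package pinger

import (
    "sync/atomic"
    "time"
)

type idlePinger struct {
    lastActive atomic.Int64 // unix nanoseconds of the last completed request
    ping       func() error // sends a PING on the connection; supplied by the caller
}

// markActive is called after every successful request on the connection.
func (p *idlePinger) markActive() { p.lastActive.Store(time.Now().UnixNano()) }

// run sends a PING only when the connection has been idle for a whole interval;
// regular traffic already proves the link is alive, so those PINGs are skipped.
func (p *idlePinger) run(interval time.Duration, stop <-chan struct{}) {
    t := time.NewTicker(interval)
    defer t.Stop()
    for {
        select {
        case <-stop:
            return
        case <-t.C:
            if time.Since(time.Unix(0, p.lastActive.Load())) < interval {
                continue
            }
            if err := p.ping(); err != nil {
                return // treat a failed or timed-out PING as a broken connection
            }
        }
    }
}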
Yep (and that was something I struggled with when I first integrated Redigo). In general, with all those timeouts my main goal is to have the possibility to set some global value which won't be exceeded by any operation if something goes wrong; it seems that with the current background PING this can be achieved – so it looks good.

As more general feedback, I really enjoyed building commands with Rueidis – it drastically reduces the chance of making a stupid mistake while constructing a command. Instead of always opening the Redis docs to check a command's syntax, it's now possible to just start typing and quickly get to the result.
Hi @FZambia, just to let you know that v0.0.82 (#114) contains a fix for a case where the background PING could get stuck and not close the connection as expected.
Any updates on this, peeps?
Hi @nicolastakashi, I don't have an Azure account, but I have tried the Redis Enterprise Docker image. The following setup should work with it:

c, err := rueidis.NewClient(rueidis.ClientOption{
InitAddress: []string{"127.0.0.1:12000"},
DisableCache: true,
})
if err != nil {
panic(err)
}
defer c.Close()
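Since Azure was mentioned, here is a hedged variant of the same setup for a managed, TLS-enabled Redis Enterprise endpoint. The host, port, and password are placeholders, the crypto/tls import is assumed, and the Password/TLSConfig field names come from rueidis.ClientOption as I understand it rather than from this thread.

c, err := rueidis.NewClient(rueidis.ClientOption{
    InitAddress:  []string{"example.redis.cache.windows.net:10000"}, // placeholder endpoint
    Password:     "your-access-key",                                 // placeholder credential
    TLSConfig:    &tls.Config{MinVersion: tls.VersionTLS12},
    DisableCache: true, // still required while the server lacks RESP3/HELLO
})
if err != nil {
    panic(err)
}
defer c.Close()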
@rueian hello! I just finished a draft of a blog post about our migration from Redigo to Rueidis in Centrifuge/Centrifugo – maybe you would be interested to take a look and leave some comments? The Rueidis migration is still not merged, but I hope it will be at some point soon. Here is the blog post draft I am talking about: centrifugal/centrifugal.dev#18
Hi @FZambia, I have read the great post – thank you for sharing the benchmark results of rueidis. It is my honor that my work is listed in the Centrifugo blog, and I am glad to know that rueidis really helps others push their applications and Redis to the limit.

I just learned from your previous blog post about scaling WebSocket that you had already done smart pipelining three years ago. I wish I had read that post earlier – rueidis might have been born earlier too, since the pipelining technique is basically the same as what rueidis uses.

I also love the video you made to demonstrate the command builder. Can I put the video in the readme of rueidis as well once you publish the post?

One thing that caught my eye is the subscribe benchmark, where rueidis appears to be slower.
Of course, and no need to wait for post publication.
Yep, this is a good question – why is it slower, given that I mostly do the same things as in the Redigo implementation? I spent some time trying to find an obvious reason. The only thing I found is that, in my case, maintaining the internal subscription registry in pipe.go adds some overhead, so I tried an option to turn it off. The diff is like this:

diff --git a/pipe.go b/pipe.go
index ec7407b..899004c 100644
--- a/pipe.go
+++ b/pipe.go
@@ -495,60 +495,88 @@ func (p *pipe) handlePush(values []RedisMessage) (reply bool) {
case "message":
if len(values) >= 3 {
m := PubSubMessage{Channel: values[1].string, Message: values[2].string}
- p.nsubs.Publish(values[1].string, m)
- p.pshks.Load().(*pshks).hooks.OnMessage(m)
+ hooks := p.pshks.Load().(*pshks).hooks
+ if !hooks.DisableInternalSubscriptionRegistry {
+ p.nsubs.Publish(values[1].string, m)
+ }
+ hooks.OnMessage(m)
}
case "pmessage":
if len(values) >= 4 {
m := PubSubMessage{Pattern: values[1].string, Channel: values[2].string, Message: values[3].string}
- p.psubs.Publish(values[1].string, m)
- p.pshks.Load().(*pshks).hooks.OnMessage(m)
+ hooks := p.pshks.Load().(*pshks).hooks
+ if !hooks.DisableInternalSubscriptionRegistry {
+ p.psubs.Publish(values[1].string, m)
+ }
+ hooks.OnMessage(m)
}
case "smessage":
if len(values) >= 3 {
m := PubSubMessage{Channel: values[1].string, Message: values[2].string}
- p.ssubs.Publish(values[1].string, m)
- p.pshks.Load().(*pshks).hooks.OnMessage(m)
+ hooks := p.pshks.Load().(*pshks).hooks
+ if !hooks.DisableInternalSubscriptionRegistry {
+ p.ssubs.Publish(values[1].string, m)
+ }
+ hooks.OnMessage(m)
}
case "unsubscribe":
- p.nsubs.Unsubscribe(values[1].string)
+ hooks := p.pshks.Load().(*pshks).hooks
+ if !hooks.DisableInternalSubscriptionRegistry {
+ p.nsubs.Unsubscribe(values[1].string)
+ }
if len(values) >= 3 {
- p.pshks.Load().(*pshks).hooks.OnSubscription(PubSubSubscription{Kind: values[0].string, Channel: values[1].string, Count: values[2].integer})
+ hooks.OnSubscription(PubSubSubscription{Kind: values[0].string, Channel: values[1].string, Count: values[2].integer})
}
return true
case "punsubscribe":
- p.psubs.Unsubscribe(values[1].string)
+ hooks := p.pshks.Load().(*pshks).hooks
+ if !hooks.DisableInternalSubscriptionRegistry {
+ p.psubs.Unsubscribe(values[1].string)
+ }
if len(values) >= 3 {
- p.pshks.Load().(*pshks).hooks.OnSubscription(PubSubSubscription{Kind: values[0].string, Channel: values[1].string, Count: values[2].integer})
+ hooks.OnSubscription(PubSubSubscription{Kind: values[0].string, Channel: values[1].string, Count: values[2].integer})
}
return true
case "sunsubscribe":
- p.ssubs.Unsubscribe(values[1].string)
+ hooks := p.pshks.Load().(*pshks).hooks
+ if !hooks.DisableInternalSubscriptionRegistry {
+ p.ssubs.Unsubscribe(values[1].string)
+ }
if len(values) >= 3 {
- p.pshks.Load().(*pshks).hooks.OnSubscription(PubSubSubscription{Kind: values[0].string, Channel: values[1].string, Count: values[2].integer})
+ hooks.OnSubscription(PubSubSubscription{Kind: values[0].string, Channel: values[1].string, Count: values[2].integer})
}
return true
case "subscribe":
- p.nsubs.Confirm(values[1].string)
+ hooks := p.pshks.Load().(*pshks).hooks
+ if !hooks.DisableInternalSubscriptionRegistry {
+ p.nsubs.Confirm(values[1].string)
+ }
if len(values) >= 3 {
- p.pshks.Load().(*pshks).hooks.OnSubscription(PubSubSubscription{Kind: values[0].string, Channel: values[1].string, Count: values[2].integer})
+ hooks.OnSubscription(PubSubSubscription{Kind: values[0].string, Channel: values[1].string, Count: values[2].integer})
}
return true
case "psubscribe":
- p.psubs.Confirm(values[1].string)
+ hooks := p.pshks.Load().(*pshks).hooks
+ if !hooks.DisableInternalSubscriptionRegistry {
+ p.psubs.Confirm(values[1].string)
+ }
if len(values) >= 3 {
- p.pshks.Load().(*pshks).hooks.OnSubscription(PubSubSubscription{Kind: values[0].string, Channel: values[1].string, Count: values[2].integer})
+ hooks.OnSubscription(PubSubSubscription{Kind: values[0].string, Channel: values[1].string, Count: values[2].integer})
}
return true
case "ssubscribe":
- p.ssubs.Confirm(values[1].string)
+ hooks := p.pshks.Load().(*pshks).hooks
+ if !hooks.DisableInternalSubscriptionRegistry {
+ p.ssubs.Confirm(values[1].string)
+ }
if len(values) >= 3 {
- p.pshks.Load().(*pshks).hooks.OnSubscription(PubSubSubscription{Kind: values[0].string, Channel: values[1].string, Count: values[2].integer})
+ hooks.OnSubscription(PubSubSubscription{Kind: values[0].string, Channel: values[1].string, Count: values[2].integer})
}
return true
}
return false
}
+
func (p *pipe) _r2pipe() (r2p *pipe) {
p.r2mu.Lock()
if p.r2pipe != nil {
@@ -843,15 +871,20 @@ abort:
}
func (p *pipe) syncDo(dl time.Time, dlOk bool, cmd cmds.Completed) (resp RedisResult) {
+ var msg RedisMessage
if dlOk {
- p.conn.SetDeadline(dl)
- defer p.conn.SetDeadline(time.Time{})
+ err := p.conn.SetDeadline(dl)
+ if err != nil {
+ return newResult(msg, err)
+ }
+ defer func() { _ = p.conn.SetDeadline(time.Time{}) }()
} else if p.timeout > 0 && !cmd.IsBlock() {
- p.conn.SetDeadline(time.Now().Add(p.timeout))
- defer p.conn.SetDeadline(time.Time{})
+ err := p.conn.SetDeadline(time.Now().Add(p.timeout))
+ if err != nil {
+ return newResult(msg, err)
+ }
+ defer func() { _ = p.conn.SetDeadline(time.Time{}) }()
}
-
- var msg RedisMessage
err := writeCmd(p.w, cmd.Commands())
if err == nil {
if err = p.w.Flush(); err == nil {
@@ -870,22 +903,27 @@ func (p *pipe) syncDo(dl time.Time, dlOk bool, cmd cmds.Completed) (resp RedisRe
}
func (p *pipe) syncDoMulti(dl time.Time, dlOk bool, resp []RedisResult, multi []cmds.Completed) []RedisResult {
+ var err error
+ var msg RedisMessage
if dlOk {
- p.conn.SetDeadline(dl)
- defer p.conn.SetDeadline(time.Time{})
+ err = p.conn.SetDeadline(dl)
+ if err != nil {
+ goto abort
+ }
+ defer func() { _ = p.conn.SetDeadline(time.Time{}) }()
} else if p.timeout > 0 {
for _, cmd := range multi {
if cmd.IsBlock() {
goto process
}
}
- p.conn.SetDeadline(time.Now().Add(p.timeout))
- defer p.conn.SetDeadline(time.Time{})
+ err = p.conn.SetDeadline(time.Now().Add(p.timeout))
+ if err != nil {
+ goto abort
+ }
+ defer func() { _ = p.conn.SetDeadline(time.Time{}) }()
}
process:
- var err error
- var msg RedisMessage
-
for _, cmd := range multi {
_ = writeCmd(p.w, cmd.Commands())
}
@@ -1160,7 +1198,7 @@ func (p *pipe) Close() {
atomic.AddInt32(&p.waits, -1)
atomic.AddInt32(&p.blcksig, -1)
if p.conn != nil {
- p.conn.Close()
+ _ = p.conn.Close()
}
p.r2mu.Lock()
if p.r2pipe != nil {
diff --git a/pubsub.go b/pubsub.go
index 5784ca0..cefeb51 100644
--- a/pubsub.go
+++ b/pubsub.go
@@ -28,6 +28,9 @@ type PubSubHooks struct {
OnMessage func(m PubSubMessage)
// OnSubscription will be called when receiving "subscribe", "unsubscribe", "psubscribe" and "punsubscribe" event.
OnSubscription func(s PubSubSubscription)
+ // DisableInternalSubscriptionRegistry disables keeping subscription registry for connection. In this case
+ // subscription management is left fully up to caller.
+ DisableInternalSubscriptionRegistry bool
}
func (h *PubSubHooks) isZero() bool {
With registry disabled I got bench results like this (old is with registry, new is with disabled registry):
I thought about whether it's possible to always disable the registry when someone uses PubSubHooks, since in that case subscription management can be left fully up to the caller. Re-running with the changes above and with 30 repetitions gives the following comparison between the Redigo case and the Rueidis case:
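For context, the sketch below shows roughly how the proposed flag from the diff above could be used by a caller. It is hypothetical: DisableInternalSubscriptionRegistry is only part of the experimental diff and never landed in rueidis, and the Dedicate/SetPubSubHooks calls and the context import are assumed from the rueidis dedicated-client API.

dc, cancel := c.Dedicate()
defer cancel()

wait := dc.SetPubSubHooks(rueidis.PubSubHooks{
    OnMessage: func(m rueidis.PubSubMessage) {
        // The application does its own channel bookkeeping here.
    },
    // Hypothetical field from the experimental diff above, not part of rueidis:
    // DisableInternalSubscriptionRegistry: true,
})
if err := dc.Do(context.Background(), dc.B().Subscribe().Channel("news").Build()).Error(); err != nil {
    panic(err)
}
<-wait // closed when the hooks are replaced or the connection is closed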
Wow, this result is amazing – how does it even reduce the alloc/op? But, unfortunately, the internal registry is still needed by rueidis. However, I think it is still possible to make it unnecessary in this case.
Hi @FZambia, I just removed the need to track subscribed channels internally. Here is the benchmark code and the result on my macbook:

func BenchmarkSubscribe(b *testing.B) {
c, _ := NewClient(ClientOption{InitAddress: []string{"127.0.0.1:6379"}})
defer c.Close()
b.SetParallelism(128)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
if err := c.Do(context.Background(), c.B().Subscribe().Channel(strconv.Itoa(rand.Int())).Build()).Error(); err != nil {
panic(err)
}
}
})
}
It indeed allocates less now because there is no need to track subscribed channels anymore.
Thanks! For my bench I also got a speedup, comparable to the case where I just blindly removed the registry. Of course, as you remember, my bench involves some code outside rueidis.
Hi all, I am going to close this issue since rueidis has supported RESP2 since v0.0.80 and should work with Redis Enterprise. Please feel free to open new issues if you want to.
When using the RedisLabs Enterprise Docker image, I'm not able to make a connection due to the HELLO command not being supported in the enterprise edition ...
$ /opt/redislabs/bin/redis-cli -p 12000
127.0.0.1:12000> HELLO
(error) ERR unknown command 'HELLO'
I don't see this called out on the compatibility list, but I confirmed it with RedisLabs.
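For anyone wanting to check a server before picking a client mode, a quick way to see whether HELLO (and therefore RESP3) is available is to send it as an inline command over a raw TCP connection. This is a generic sketch with a placeholder address, independent of any client library:

package main

import (
    "bufio"
    "fmt"
    "net"
    "strings"
    "time"
)

func main() {
    conn, err := net.DialTimeout("tcp", "127.0.0.1:12000", 5*time.Second)
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    // Redis accepts inline commands terminated by CRLF.
    if _, err := conn.Write([]byte("HELLO 3\r\n")); err != nil {
        panic(err)
    }
    reply, err := bufio.NewReader(conn).ReadString('\n')
    if err != nil {
        panic(err)
    }
    if strings.HasPrefix(reply, "-ERR unknown command") {
        fmt.Println("HELLO not supported; the server is RESP2 only")
        return
    }
    fmt.Println("HELLO accepted; RESP3 should be available")
}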