Skip to content

Commit

Permalink
support raw parsing for problematic Redis Search types
Browse files Browse the repository at this point in the history
  • Loading branch information
ofekshenawa committed Aug 31, 2024
1 parent 1b8afbe commit 3408303
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 11 deletions.
22 changes: 16 additions & 6 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Cmder interface {

readTimeout() *time.Duration
readReply(rd *proto.Reader) error

readRawReply(rd *proto.Reader) error
SetErr(error)
Err() error
}
Expand Down Expand Up @@ -122,11 +122,11 @@ func cmdString(cmd Cmder, val interface{}) string {
//------------------------------------------------------------------------------

type baseCmd struct {
ctx context.Context
args []interface{}
err error
keyPos int8

ctx context.Context
args []interface{}
err error
keyPos int8
rawVal interface{}
_readTimeout *time.Duration
}

Expand Down Expand Up @@ -197,6 +197,11 @@ func (cmd *baseCmd) setReadTimeout(d time.Duration) {
cmd._readTimeout = &d
}

func (cmd *baseCmd) readRawReply(rd *proto.Reader) (err error) {
cmd.rawVal, err = rd.ReadReply()
return err
}

//------------------------------------------------------------------------------

type Cmd struct {
Expand Down Expand Up @@ -5550,3 +5555,8 @@ func (cmd *MonitorCmd) Stop() {
defer cmd.mu.Unlock()
cmd.status = monitorStatusStop
}

type SearchCmd struct {
baseCmd
val interface{}
}
23 changes: 20 additions & 3 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"errors"
"fmt"
"net"
"net" // TODO change to import only specific method (Not necesarry in compiling)
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -412,6 +412,20 @@ func (c *baseClient) process(ctx context.Context, cmd Cmder) error {
return lastErr
}

func (c *baseClient) isProblematicMethodsOfSearchResp3(cmd Cmder) bool {
if c.opt.Protocol != 3 {
return false
}

switch cmd.(type) {
case *AggregateCmd, *FTInfoCmd, *FTSpellCheckCmd, *FTSearchCmd, *FTSynDumpCmd:
fmt.Println("Some RESP3 results for Redis Query Engine responses may change. Refer to the readme for guidance")
return true
default:
return false
}
}

func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool, error) {
if attempt > 0 {
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
Expand All @@ -427,8 +441,11 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool
atomic.StoreUint32(&retryTimeout, 1)
return err
}

if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), cmd.readReply); err != nil {
readReplyFunc := cmd.readReply
if c.isProblematicMethodsOfSearchResp3(cmd) {
readReplyFunc = cmd.readRawReply
}
if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), readReplyFunc); err != nil {
if cmd.readTimeout() == nil {
atomic.StoreUint32(&retryTimeout, 1)
} else {
Expand Down
39 changes: 39 additions & 0 deletions search_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,14 @@ func (cmd *AggregateCmd) Result() (*FTAggregateResult, error) {
return cmd.val, cmd.err
}

func (cmd *AggregateCmd) RawVal() interface{} {
return cmd.rawVal
}

func (cmd *AggregateCmd) RawResult() (interface{}, error) {
return cmd.rawVal, cmd.err
}

func (cmd *AggregateCmd) String() string {
return cmdString(cmd, cmd.val)
}
Expand Down Expand Up @@ -1337,6 +1345,13 @@ func (cmd *FTInfoCmd) Val() FTInfoResult {
return cmd.val
}

func (cmd *FTInfoCmd) RawVal() interface{} {
return cmd.rawVal
}

func (cmd *FTInfoCmd) RawResult() (interface{}, error) {
return cmd.rawVal, cmd.err
}
func (cmd *FTInfoCmd) readReply(rd *proto.Reader) (err error) {
n, err := rd.ReadMapLen()
if err != nil {
Expand Down Expand Up @@ -1447,6 +1462,14 @@ func (cmd *FTSpellCheckCmd) Val() []SpellCheckResult {
return cmd.val
}

func (cmd *FTSpellCheckCmd) RawVal() interface{} {
return cmd.rawVal
}

func (cmd *FTSpellCheckCmd) RawResult() (interface{}, error) {
return cmd.rawVal, cmd.err
}

func (cmd *FTSpellCheckCmd) readReply(rd *proto.Reader) (err error) {
data, err := rd.ReadSlice()
if err != nil {
Expand Down Expand Up @@ -1628,6 +1651,14 @@ func (cmd *FTSearchCmd) Val() FTSearchResult {
return cmd.val
}

func (cmd *FTSearchCmd) RawVal() interface{} {
return cmd.rawVal
}

func (cmd *FTSearchCmd) RawResult() (interface{}, error) {
return cmd.rawVal, cmd.err
}

func (cmd *FTSearchCmd) readReply(rd *proto.Reader) (err error) {
data, err := rd.ReadSlice()
if err != nil {
Expand Down Expand Up @@ -1904,6 +1935,14 @@ func (cmd *FTSynDumpCmd) Result() ([]FTSynDumpResult, error) {
return cmd.val, cmd.err
}

func (cmd *FTSynDumpCmd) RawVal() interface{} {
return cmd.rawVal
}

func (cmd *FTSynDumpCmd) RawResult() (interface{}, error) {
return cmd.rawVal, cmd.err
}

func (cmd *FTSynDumpCmd) readReply(rd *proto.Reader) error {
termSynonymPairs, err := rd.ReadSlice()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ func WaitForIndexing(c *redis.Client, index string) {
}
}

var _ = Describe("RediSearch commands", Label("search"), func() {
var _ = Describe("RediSearch commands Resp 2", Label("search"), func() {
ctx := context.TODO()
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(&redis.Options{Addr: ":6379", Protocol: 2})
client = redis.NewClient(&redis.Options{Addr: ":6379", Protocol: 3})
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
})

Expand Down

0 comments on commit 3408303

Please sign in to comment.