diff --git a/.github/wordlist.txt b/.github/wordlist.txt index dceddff46..c200c60b4 100644 --- a/.github/wordlist.txt +++ b/.github/wordlist.txt @@ -1,4 +1,5 @@ ACLs +APIs autoload autoloader autoloading @@ -46,9 +47,11 @@ runtime SHA sharding SETNAME +SpellCheck SSL struct stunnel +SynDump TCP TLS uri diff --git a/README.md b/README.md index c7951a4d4..37714a979 100644 --- a/README.md +++ b/README.md @@ -183,6 +183,9 @@ rdb := redis.NewClient(&redis.Options{ }) ``` +#### Unstable RESP3 Structures for RediSearch Commands +When integrating Redis with application functionalities using RESP3, it's important to note that some response structures aren't final yet. This is especially true for more complex structures like search and query results. We recommend using RESP2 when using the search and query capabilities, but we plan to stabilize the RESP3-based API-s in the coming versions. You can find more guidance in the upcoming release notes. + ## Contributing Please see [out contributing guidelines](CONTRIBUTING.md) to help us improve this library! diff --git a/command.go b/command.go index 9ae97a95a..4ced2979d 100644 --- a/command.go +++ b/command.go @@ -40,7 +40,7 @@ type Cmder interface { readTimeout() *time.Duration readReply(rd *proto.Reader) error - + readRawReply(rd *proto.Reader) error SetErr(error) Err() error } @@ -122,11 +122,11 @@ func cmdString(cmd Cmder, val interface{}) string { //------------------------------------------------------------------------------ type baseCmd struct { - ctx context.Context - args []interface{} - err error - keyPos int8 - + ctx context.Context + args []interface{} + err error + keyPos int8 + rawVal interface{} _readTimeout *time.Duration } @@ -197,6 +197,11 @@ func (cmd *baseCmd) setReadTimeout(d time.Duration) { cmd._readTimeout = &d } +func (cmd *baseCmd) readRawReply(rd *proto.Reader) (err error) { + cmd.rawVal, err = rd.ReadReply() + return err +} + //------------------------------------------------------------------------------ type Cmd struct { diff --git a/options.go b/options.go index 6ed693a0b..8ba74ccd1 100644 --- a/options.go +++ b/options.go @@ -153,6 +153,9 @@ type Options struct { // Add suffix to client name. Default is empty. IdentitySuffix string + + // Enable Unstable mode for Redis Search module with RESP3. + UnstableResp3 bool } func (opt *Options) init() { diff --git a/redis.go b/redis.go index 527afb677..c8b500809 100644 --- a/redis.go +++ b/redis.go @@ -412,6 +412,19 @@ func (c *baseClient) process(ctx context.Context, cmd Cmder) error { return lastErr } +func (c *baseClient) assertUnstableCommand(cmd Cmder) bool { + switch cmd.(type) { + case *AggregateCmd, *FTInfoCmd, *FTSpellCheckCmd, *FTSearchCmd, *FTSynDumpCmd: + if c.opt.UnstableResp3 { + return true + } else { + panic("RESP3 responses for this command are disabled because they may still change. Please set the flag UnstableResp3 . See the [README](https://github.com/redis/go-redis/blob/master/README.md) and the release notes for guidance.") + } + default: + return false + } +} + func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool, error) { if attempt > 0 { if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { @@ -427,8 +440,12 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool atomic.StoreUint32(&retryTimeout, 1) return err } - - if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), cmd.readReply); err != nil { + readReplyFunc := cmd.readReply + // Apply unstable RESP3 search module. + if c.opt.Protocol != 2 && c.assertUnstableCommand(cmd) { + readReplyFunc = cmd.readRawReply + } + if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), readReplyFunc); err != nil { if cmd.readTimeout() == nil { atomic.StoreUint32(&retryTimeout, 1) } else { diff --git a/ring.go b/ring.go index 4ae00542b..b40221734 100644 --- a/ring.go +++ b/ring.go @@ -100,6 +100,7 @@ type RingOptions struct { DisableIndentity bool IdentitySuffix string + UnstableResp3 bool } func (opt *RingOptions) init() { @@ -168,6 +169,7 @@ func (opt *RingOptions) clientOptions() *Options { DisableIndentity: opt.DisableIndentity, IdentitySuffix: opt.IdentitySuffix, + UnstableResp3: opt.UnstableResp3, } } diff --git a/search_commands.go b/search_commands.go index f5118c77e..1a8a4cfef 100644 --- a/search_commands.go +++ b/search_commands.go @@ -638,6 +638,14 @@ func (cmd *AggregateCmd) Result() (*FTAggregateResult, error) { return cmd.val, cmd.err } +func (cmd *AggregateCmd) RawVal() interface{} { + return cmd.rawVal +} + +func (cmd *AggregateCmd) RawResult() (interface{}, error) { + return cmd.rawVal, cmd.err +} + func (cmd *AggregateCmd) String() string { return cmdString(cmd, cmd.val) } @@ -1337,6 +1345,13 @@ func (cmd *FTInfoCmd) Val() FTInfoResult { return cmd.val } +func (cmd *FTInfoCmd) RawVal() interface{} { + return cmd.rawVal +} + +func (cmd *FTInfoCmd) RawResult() (interface{}, error) { + return cmd.rawVal, cmd.err +} func (cmd *FTInfoCmd) readReply(rd *proto.Reader) (err error) { n, err := rd.ReadMapLen() if err != nil { @@ -1447,6 +1462,14 @@ func (cmd *FTSpellCheckCmd) Val() []SpellCheckResult { return cmd.val } +func (cmd *FTSpellCheckCmd) RawVal() interface{} { + return cmd.rawVal +} + +func (cmd *FTSpellCheckCmd) RawResult() (interface{}, error) { + return cmd.rawVal, cmd.err +} + func (cmd *FTSpellCheckCmd) readReply(rd *proto.Reader) (err error) { data, err := rd.ReadSlice() if err != nil { @@ -1628,6 +1651,14 @@ func (cmd *FTSearchCmd) Val() FTSearchResult { return cmd.val } +func (cmd *FTSearchCmd) RawVal() interface{} { + return cmd.rawVal +} + +func (cmd *FTSearchCmd) RawResult() (interface{}, error) { + return cmd.rawVal, cmd.err +} + func (cmd *FTSearchCmd) readReply(rd *proto.Reader) (err error) { data, err := rd.ReadSlice() if err != nil { @@ -1904,6 +1935,14 @@ func (cmd *FTSynDumpCmd) Result() ([]FTSynDumpResult, error) { return cmd.val, cmd.err } +func (cmd *FTSynDumpCmd) RawVal() interface{} { + return cmd.rawVal +} + +func (cmd *FTSynDumpCmd) RawResult() (interface{}, error) { + return cmd.rawVal, cmd.err +} + func (cmd *FTSynDumpCmd) readReply(rd *proto.Reader) error { termSynonymPairs, err := rd.ReadSlice() if err != nil { diff --git a/search_test.go b/search_test.go index 0e1a473b8..93859a4e7 100644 --- a/search_test.go +++ b/search_test.go @@ -18,11 +18,13 @@ func WaitForIndexing(c *redis.Client, index string) { return } time.Sleep(100 * time.Millisecond) + } else { + return } } } -var _ = Describe("RediSearch commands", Label("search"), func() { +var _ = Describe("RediSearch commands Resp 2", Label("search"), func() { ctx := context.TODO() var client *redis.Client @@ -1415,3 +1417,187 @@ func _assert_geosearch_result(result *redis.FTSearchResult, expectedDocIDs []str // Expect(results0["id"]).To(BeEquivalentTo("a")) // Expect(results0["extra_attributes"].(map[interface{}]interface{})["__v_score"]).To(BeEquivalentTo("0")) // }) + +var _ = Describe("RediSearch commands Resp 3", Label("search"), func() { + ctx := context.TODO() + var client *redis.Client + var client2 *redis.Client + + BeforeEach(func() { + client = redis.NewClient(&redis.Options{Addr: ":6379", Protocol: 3, UnstableResp3: true}) + client2 = redis.NewClient(&redis.Options{Addr: ":6379", Protocol: 3}) + Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) + }) + + AfterEach(func() { + Expect(client.Close()).NotTo(HaveOccurred()) + }) + + It("should handle FTAggregate with Unstable RESP3 Search Module and without stability", Label("search", "ftcreate", "ftaggregate"), func() { + text1 := &redis.FieldSchema{FieldName: "PrimaryKey", FieldType: redis.SearchFieldTypeText, Sortable: true} + num1 := &redis.FieldSchema{FieldName: "CreatedDateTimeUTC", FieldType: redis.SearchFieldTypeNumeric, Sortable: true} + val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{}, text1, num1).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(BeEquivalentTo("OK")) + WaitForIndexing(client, "idx1") + + client.HSet(ctx, "doc1", "PrimaryKey", "9::362330", "CreatedDateTimeUTC", "637387878524969984") + client.HSet(ctx, "doc2", "PrimaryKey", "9::362329", "CreatedDateTimeUTC", "637387875859270016") + + options := &redis.FTAggregateOptions{Apply: []redis.FTAggregateApply{{Field: "@CreatedDateTimeUTC * 10", As: "CreatedDateTimeUTC"}}} + res, err := client.FTAggregateWithArgs(ctx, "idx1", "*", options).RawResult() + rawVal := client.FTAggregateWithArgs(ctx, "idx1", "*", options).RawVal() + + Expect(err).NotTo(HaveOccurred()) + Expect(rawVal).To(BeEquivalentTo(res)) + results := res.(map[interface{}]interface{})["results"].([]interface{}) + Expect(results[0].(map[interface{}]interface{})["extra_attributes"].(map[interface{}]interface{})["CreatedDateTimeUTC"]). + To(Or(BeEquivalentTo("6373878785249699840"), BeEquivalentTo("6373878758592700416"))) + Expect(results[1].(map[interface{}]interface{})["extra_attributes"].(map[interface{}]interface{})["CreatedDateTimeUTC"]). + To(Or(BeEquivalentTo("6373878785249699840"), BeEquivalentTo("6373878758592700416"))) + + // Test with UnstableResp3 false + Expect(func() { + options = &redis.FTAggregateOptions{Apply: []redis.FTAggregateApply{{Field: "@CreatedDateTimeUTC * 10", As: "CreatedDateTimeUTC"}}} + rawRes, _ := client2.FTAggregateWithArgs(ctx, "idx1", "*", options).RawResult() + rawVal = client2.FTAggregateWithArgs(ctx, "idx1", "*", options).RawVal() + Expect(rawRes).To(BeNil()) + Expect(rawVal).To(BeNil()) + }).Should(Panic()) + + }) + + It("should handle FTInfo with Unstable RESP3 Search Module and without stability", Label("search", "ftcreate", "ftinfo"), func() { + val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{}, &redis.FieldSchema{FieldName: "txt", FieldType: redis.SearchFieldTypeText, Sortable: true, NoStem: true}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(BeEquivalentTo("OK")) + WaitForIndexing(client, "idx1") + + resInfo, err := client.FTInfo(ctx, "idx1").RawResult() + Expect(err).NotTo(HaveOccurred()) + attributes := resInfo.(map[interface{}]interface{})["attributes"].([]interface{}) + flags := attributes[0].(map[interface{}]interface{})["flags"].([]interface{}) + Expect(flags).To(ConsistOf("SORTABLE", "NOSTEM")) + + valInfo := client.FTInfo(ctx, "idx1").RawVal() + attributes = valInfo.(map[interface{}]interface{})["attributes"].([]interface{}) + flags = attributes[0].(map[interface{}]interface{})["flags"].([]interface{}) + Expect(flags).To(ConsistOf("SORTABLE", "NOSTEM")) + + // Test with UnstableResp3 false + Expect(func() { + rawResInfo, _ := client2.FTInfo(ctx, "idx1").RawResult() + rawValInfo := client2.FTInfo(ctx, "idx1").RawVal() + Expect(rawResInfo).To(BeNil()) + Expect(rawValInfo).To(BeNil()) + }).Should(Panic()) + }) + + It("should handle FTSpellCheck with Unstable RESP3 Search Module and without stability", Label("search", "ftcreate", "ftspellcheck"), func() { + text1 := &redis.FieldSchema{FieldName: "f1", FieldType: redis.SearchFieldTypeText} + text2 := &redis.FieldSchema{FieldName: "f2", FieldType: redis.SearchFieldTypeText} + val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{}, text1, text2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(BeEquivalentTo("OK")) + WaitForIndexing(client, "idx1") + + client.HSet(ctx, "doc1", "f1", "some valid content", "f2", "this is sample text") + client.HSet(ctx, "doc2", "f1", "very important", "f2", "lorem ipsum") + + resSpellCheck, err := client.FTSpellCheck(ctx, "idx1", "impornant").RawResult() + valSpellCheck := client.FTSpellCheck(ctx, "idx1", "impornant").RawVal() + Expect(err).NotTo(HaveOccurred()) + Expect(valSpellCheck).To(BeEquivalentTo(resSpellCheck)) + results := resSpellCheck.(map[interface{}]interface{})["results"].(map[interface{}]interface{}) + Expect(results["impornant"].([]interface{})[0].(map[interface{}]interface{})["important"]).To(BeEquivalentTo(0.5)) + + // Test with UnstableResp3 false + Expect(func() { + rawResSpellCheck, _ := client2.FTSpellCheck(ctx, "idx1", "impornant").RawResult() + rawValSpellCheck := client2.FTSpellCheck(ctx, "idx1", "impornant").RawVal() + Expect(rawResSpellCheck).To(BeNil()) + Expect(rawValSpellCheck).To(BeNil()) + }).Should(Panic()) + }) + + It("should handle FTSearch with Unstable RESP3 Search Module and without stability", Label("search", "ftcreate", "ftsearch"), func() { + val, err := client.FTCreate(ctx, "txt", &redis.FTCreateOptions{StopWords: []interface{}{"foo", "bar", "baz"}}, &redis.FieldSchema{FieldName: "txt", FieldType: redis.SearchFieldTypeText}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(BeEquivalentTo("OK")) + WaitForIndexing(client, "txt") + client.HSet(ctx, "doc1", "txt", "foo baz") + client.HSet(ctx, "doc2", "txt", "hello world") + res1, err := client.FTSearchWithArgs(ctx, "txt", "foo bar", &redis.FTSearchOptions{NoContent: true}).RawResult() + val1 := client.FTSearchWithArgs(ctx, "txt", "foo bar", &redis.FTSearchOptions{NoContent: true}).RawVal() + Expect(err).NotTo(HaveOccurred()) + Expect(val1).To(BeEquivalentTo(res1)) + totalResults := res1.(map[interface{}]interface{})["total_results"] + Expect(totalResults).To(BeEquivalentTo(int64(0))) + res2, err := client.FTSearchWithArgs(ctx, "txt", "foo bar hello world", &redis.FTSearchOptions{NoContent: true}).RawResult() + Expect(err).NotTo(HaveOccurred()) + totalResults2 := res2.(map[interface{}]interface{})["total_results"] + Expect(totalResults2).To(BeEquivalentTo(int64(1))) + + // Test with UnstableResp3 false + Expect(func() { + rawRes2, _ := client2.FTSearchWithArgs(ctx, "txt", "foo bar hello world", &redis.FTSearchOptions{NoContent: true}).RawResult() + rawVal2 := client2.FTSearchWithArgs(ctx, "txt", "foo bar hello world", &redis.FTSearchOptions{NoContent: true}).RawVal() + Expect(rawRes2).To(BeNil()) + Expect(rawVal2).To(BeNil()) + }).Should(Panic()) + }) + It("should handle FTSynDump with Unstable RESP3 Search Module and without stability", Label("search", "ftsyndump"), func() { + text1 := &redis.FieldSchema{FieldName: "title", FieldType: redis.SearchFieldTypeText} + text2 := &redis.FieldSchema{FieldName: "body", FieldType: redis.SearchFieldTypeText} + val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{OnHash: true}, text1, text2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(BeEquivalentTo("OK")) + WaitForIndexing(client, "idx1") + + resSynUpdate, err := client.FTSynUpdate(ctx, "idx1", "id1", []interface{}{"boy", "child", "offspring"}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resSynUpdate).To(BeEquivalentTo("OK")) + + resSynUpdate, err = client.FTSynUpdate(ctx, "idx1", "id1", []interface{}{"baby", "child"}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resSynUpdate).To(BeEquivalentTo("OK")) + + resSynUpdate, err = client.FTSynUpdate(ctx, "idx1", "id1", []interface{}{"tree", "wood"}).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resSynUpdate).To(BeEquivalentTo("OK")) + + resSynDump, err := client.FTSynDump(ctx, "idx1").RawResult() + valSynDump := client.FTSynDump(ctx, "idx1").RawVal() + Expect(err).NotTo(HaveOccurred()) + Expect(valSynDump).To(BeEquivalentTo(resSynDump)) + Expect(resSynDump.(map[interface{}]interface{})["baby"]).To(BeEquivalentTo([]interface{}{"id1"})) + + // Test with UnstableResp3 false + Expect(func() { + rawResSynDump, _ := client2.FTSynDump(ctx, "idx1").RawResult() + rawValSynDump := client2.FTSynDump(ctx, "idx1").RawVal() + Expect(rawResSynDump).To(BeNil()) + Expect(rawValSynDump).To(BeNil()) + }).Should(Panic()) + }) + + It("should test not affected Resp 3 Search method - FTExplain", Label("search", "ftexplain"), func() { + text1 := &redis.FieldSchema{FieldName: "f1", FieldType: redis.SearchFieldTypeText} + text2 := &redis.FieldSchema{FieldName: "f2", FieldType: redis.SearchFieldTypeText} + text3 := &redis.FieldSchema{FieldName: "f3", FieldType: redis.SearchFieldTypeText} + val, err := client.FTCreate(ctx, "txt", &redis.FTCreateOptions{}, text1, text2, text3).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(BeEquivalentTo("OK")) + WaitForIndexing(client, "txt") + res1, err := client.FTExplain(ctx, "txt", "@f3:f3_val @f2:f2_val @f1:f1_val").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res1).ToNot(BeEmpty()) + + // Test with UnstableResp3 false + Expect(func() { + res2, err := client2.FTExplain(ctx, "txt", "@f3:f3_val @f2:f2_val @f1:f1_val").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res2).ToNot(BeEmpty()) + }).ShouldNot(Panic()) + }) +}) diff --git a/sentinel.go b/sentinel.go index 188f88494..315695544 100644 --- a/sentinel.go +++ b/sentinel.go @@ -82,6 +82,7 @@ type FailoverOptions struct { DisableIndentity bool IdentitySuffix string + UnstableResp3 bool } func (opt *FailoverOptions) clientOptions() *Options { @@ -119,6 +120,7 @@ func (opt *FailoverOptions) clientOptions() *Options { DisableIndentity: opt.DisableIndentity, IdentitySuffix: opt.IdentitySuffix, + UnstableResp3: opt.UnstableResp3, } } @@ -156,6 +158,7 @@ func (opt *FailoverOptions) sentinelOptions(addr string) *Options { DisableIndentity: opt.DisableIndentity, IdentitySuffix: opt.IdentitySuffix, + UnstableResp3: opt.UnstableResp3, } } diff --git a/universal.go b/universal.go index 275bef3d6..f4d2d7598 100644 --- a/universal.go +++ b/universal.go @@ -68,6 +68,7 @@ type UniversalOptions struct { DisableIndentity bool IdentitySuffix string + UnstableResp3 bool } // Cluster returns cluster options created from the universal options. @@ -160,6 +161,7 @@ func (o *UniversalOptions) Failover() *FailoverOptions { DisableIndentity: o.DisableIndentity, IdentitySuffix: o.IdentitySuffix, + UnstableResp3: o.UnstableResp3, } } @@ -203,6 +205,7 @@ func (o *UniversalOptions) Simple() *Options { DisableIndentity: o.DisableIndentity, IdentitySuffix: o.IdentitySuffix, + UnstableResp3: o.UnstableResp3, } }