From 35bf69730537e26ab0f6c0166cbd83f83078df7d Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Thu, 19 Dec 2024 18:38:37 -0800 Subject: [PATCH 1/5] `XADD` Signed-off-by: Yury-Fridlyand --- go/api/base_client.go | 22 ++++++++++++++++++++ go/api/stream_commands.go | 29 +++++++++++++++++++++++++++ go/integTest/shared_commands_test.go | 30 +++++++++++++++++++++++++++- 3 files changed, 80 insertions(+), 1 deletion(-) create mode 100644 go/api/stream_commands.go diff --git a/go/api/base_client.go b/go/api/base_client.go index 0bb250fd07..8126e81d26 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -11,6 +11,7 @@ import "C" import ( "errors" + "fmt" "math" "strconv" "unsafe" @@ -26,6 +27,7 @@ type BaseClient interface { HashCommands ListCommands SetCommands + StreamCommands ConnectionManagementCommands GenericBaseCommands // Close terminates the client by closing all associated resources. @@ -1204,3 +1206,23 @@ func (client *baseClient) Renamenx(key string, newKey string) (Result[bool], err } return handleBooleanResponse(result) } + +func (client *baseClient) XAdd(key string, values [][]string) (Result[string], error) { + args := make([]string, 0, 2+2*len(values)) + args = append(args, key, "*") + for _, pair := range values { + if len(pair) != 2 { + return CreateNilStringResult(), fmt.Errorf( + "Array entry had the wrong length. Expected length 2 but got length %d", + len(pair), + ) + } + args = append(args, pair...) + } + + result, err := client.executeCommand(C.XAdd, args) + if err != nil { + return CreateNilStringResult(), err + } + return handleStringResponse(result) +} diff --git a/go/api/stream_commands.go b/go/api/stream_commands.go new file mode 100644 index 0000000000..851bb88ba3 --- /dev/null +++ b/go/api/stream_commands.go @@ -0,0 +1,29 @@ +// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 + +package api + +// Supports commands and transactions for the "Stream Commands" group for standalone and cluster clients. +// +// See [valkey.io] for details. +// +// [valkey.io]: https://valkey.io/commands/#stream +type StreamCommands interface { + // Adds an entry to the specified stream stored at `key`. If the `key` doesn't exist, the stream is created. + // + // See [valkey.io] for details. + // + // Parameters: + // key - The key of the stream. + // values - Field-value pairs to be added to the entry. + // + // Return value: + // The id of the added entry. + // + // For example: + // result, err := client.XAdd("myStream", [][]string{{"field1", "value1"}, {"field2", "value2"}}) + // result.IsNil(): false + // result.Value(): "1526919030474-55" + // + // [valkey.io]: https://valkey.io/commands/xadd/ + XAdd(key string, values [][]string) (Result[string], error) +} diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index 63f0e39ec7..9e6473e205 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -3743,7 +3743,7 @@ func (suite *GlideTestSuite) TestUnlink() { }) } -func (suite *GlideTestSuite) Test_Rename() { +func (suite *GlideTestSuite) TestRename() { suite.runWithDefaultClients(func(client api.BaseClient) { // Test 1 Check if the command successfully renamed key := "{keyName}" + uuid.NewString() @@ -3781,3 +3781,31 @@ func (suite *GlideTestSuite) TestRenamenx() { assert.Equal(suite.T(), false, res2.Value()) }) } + +func (suite *GlideTestSuite) TestXAdd() { + suite.runWithDefaultClients(func(client api.BaseClient) { + key := uuid.NewString() + // stream does not exist + res, err := client.XAdd(key, [][]string{{"field1", "value1"}, {"field1", "value2"}}) + assert.Nil(suite.T(), err) + assert.False(suite.T(), res.IsNil()) + // don't check the value, because it contains server's timestamp + + // adding data to existing stream + res, err = client.XAdd(key, [][]string{{"field3", "value3"}}) + assert.Nil(suite.T(), err) + assert.False(suite.T(), res.IsNil()) + + // incorrect input + _, err = client.XAdd(key, [][]string{}) + assert.NotNil(suite.T(), err) + _, err = client.XAdd(key, [][]string{{"1", "2", "3"}}) + assert.NotNil(suite.T(), err) + + // key is not a string + key = uuid.NewString() + client.Set(key, "abc") + _, err = client.XAdd(key, [][]string{{"f", "v"}}) + assert.NotNil(suite.T(), err) + }) +} From cd84514bd176e27383499b606116f52cc7678f57 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Mon, 23 Dec 2024 17:37:15 -0800 Subject: [PATCH 2/5] code Signed-off-by: Yury-Fridlyand --- go/api/base_client.go | 31 ++-- go/api/command_options.go | 220 ++++++++++++++++++++++++-- go/api/options/zadd_options.go | 106 ------------- go/api/sorted_set_commands.go | 8 +- go/api/stream_commands.go | 23 ++- go/integTest/glide_test_suite_test.go | 4 +- go/integTest/shared_commands_test.go | 40 ++++- 7 files changed, 287 insertions(+), 145 deletions(-) delete mode 100644 go/api/options/zadd_options.go diff --git a/go/api/base_client.go b/go/api/base_client.go index 4513efd745..877c570e1e 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -16,7 +16,6 @@ import ( "strconv" "unsafe" - "github.com/valkey-io/valkey-glide/go/glide/api/options" "github.com/valkey-io/valkey-glide/go/glide/protobuf" "github.com/valkey-io/valkey-glide/go/glide/utils" "google.golang.org/protobuf/proto" @@ -1234,12 +1233,22 @@ func (client *baseClient) Renamenx(key string, newKey string) (Result[bool], err } func (client *baseClient) XAdd(key string, values [][]string) (Result[string], error) { - args := make([]string, 0, 2+2*len(values)) - args = append(args, key, "*") + return client.XAddWithOptions(key, values, NewXAddOptions()) +} + +func (client *baseClient) XAddWithOptions(key string, values [][]string, options *XAddOptions) (Result[string], error) { + args := []string{} + args = append(args, key) + optionArgs, err := options.toArgs() + if err != nil { + return CreateNilStringResult(), err + } else { + args = append(args, optionArgs...) + } for _, pair := range values { if len(pair) != 2 { return CreateNilStringResult(), fmt.Errorf( - "Array entry had the wrong length. Expected length 2 but got length %d", + "array entry had the wrong length. Expected length 2 but got length %d", len(pair), ) } @@ -1250,7 +1259,7 @@ func (client *baseClient) XAdd(key string, values [][]string) (Result[string], e if err != nil { return CreateNilStringResult(), err } - return handleStringResponse(result) + return handleStringOrNullResponse(result) } func (client *baseClient) ZAdd( @@ -1271,9 +1280,9 @@ func (client *baseClient) ZAdd( func (client *baseClient) ZAddWithOptions( key string, membersScoreMap map[string]float64, - opts *options.ZAddOptions, + opts *ZAddOptions, ) (Result[int64], error) { - optionArgs, err := opts.ToArgs() + optionArgs, err := opts.toArgs() if err != nil { return CreateNilInt64Result(), err } @@ -1289,8 +1298,8 @@ func (client *baseClient) ZAddWithOptions( return handleLongResponse(result) } -func (client *baseClient) zAddIncrBase(key string, opts *options.ZAddOptions) (Result[float64], error) { - optionArgs, err := opts.ToArgs() +func (client *baseClient) zAddIncrBase(key string, opts *ZAddOptions) (Result[float64], error) { + optionArgs, err := opts.toArgs() if err != nil { return CreateNilFloat64Result(), err } @@ -1308,7 +1317,7 @@ func (client *baseClient) ZAddIncr( member string, increment float64, ) (Result[float64], error) { - options, err := options.NewZAddOptionsBuilder().SetIncr(true, increment, member) + options, err := NewZAddOptionsBuilder().SetIncr(true, increment, member) if err != nil { return CreateNilFloat64Result(), err } @@ -1320,7 +1329,7 @@ func (client *baseClient) ZAddIncrWithOptions( key string, member string, increment float64, - opts *options.ZAddOptions, + opts *ZAddOptions, ) (Result[float64], error) { incrOpts, err := opts.SetIncr(true, increment, member) if err != nil { diff --git a/go/api/command_options.go b/go/api/command_options.go index bbfaf982a0..00fff04b98 100644 --- a/go/api/command_options.go +++ b/go/api/command_options.go @@ -3,6 +3,7 @@ package api import ( + "errors" "strconv" "github.com/valkey-io/valkey-glide/go/glide/utils" @@ -282,19 +283,7 @@ func (listDirection ListDirection) toString() (string, error) { // This base option struct represents the common set of optional arguments for the SCAN family of commands. // Concrete implementations of this class are tied to specific SCAN commands (`SCAN`, `SSCAN`). type BaseScanOptions struct { - /** - * The match filter is applied to the result of the command and will only include - * strings that match the pattern specified. If the sorted set is large enough for scan commands to return - * only a subset of the sorted set then there could be a case where the result is empty although there are - * items that match the pattern specified. This is due to the default `COUNT` being `10` which indicates - * that it will only fetch and match `10` items from the list. - */ match string - /** - * `COUNT` is a just a hint for the command for how many elements to fetch from the - * sorted set. `COUNT` could be ignored until the sorted set is large enough for the `SCAN` commands to - * represent the results as compact single-allocation packed encoding. - */ count int64 } @@ -302,11 +291,19 @@ func NewBaseScanOptionsBuilder() *BaseScanOptions { return &BaseScanOptions{} } +// The match filter is applied to the result of the command and will only include +// strings that match the pattern specified. If the sorted set is large enough for scan commands to return +// only a subset of the sorted set then there could be a case where the result is empty although there are +// items that match the pattern specified. This is due to the default `COUNT` being `10` which indicates +// that it will only fetch and match `10` items from the list. func (scanOptions *BaseScanOptions) SetMatch(m string) *BaseScanOptions { scanOptions.match = m return scanOptions } +// `COUNT` is a just a hint for the command for how many elements to fetch from the +// sorted set. `COUNT` could be ignored until the sorted set is large enough for the `SCAN` commands to +// represent the results as compact single-allocation packed encoding. func (scanOptions *BaseScanOptions) SetCount(c int64) *BaseScanOptions { scanOptions.count = c return scanOptions @@ -325,3 +322,202 @@ func (opts *BaseScanOptions) toArgs() ([]string, error) { return args, err } + +// Optional arguments to `ZAdd` in [SortedSetCommands] +type ZAddOptions struct { + conditionalChange ConditionalSet + updateOptions UpdateOptions + changed bool + incr bool + increment float64 + member string +} + +func NewZAddOptionsBuilder() *ZAddOptions { + return &ZAddOptions{} +} + +// `conditionalChange` defines conditions for updating or adding elements with `ZAdd` command. +func (options *ZAddOptions) SetConditionalChange(c ConditionalSet) *ZAddOptions { + options.conditionalChange = c + return options +} + +// `updateOptions` specifies conditions for updating scores with `ZAdd` command. +func (options *ZAddOptions) SetUpdateOptions(u UpdateOptions) *ZAddOptions { + options.updateOptions = u + return options +} + +// `Changed` changes the return value from the number of new elements added to the total number of elements changed. +func (options *ZAddOptions) SetChanged(ch bool) (*ZAddOptions, error) { + if options.incr { + return nil, errors.New("changed cannot be set when incr is true") + } + options.changed = ch + return options, nil +} + +// `INCR` sets the increment value to use when incr is true. +func (options *ZAddOptions) SetIncr(incr bool, increment float64, member string) (*ZAddOptions, error) { + if options.changed { + return nil, errors.New("incr cannot be set when changed is true") + } + options.incr = incr + options.increment = increment + options.member = member + return options, nil +} + +// `toArgs` converts the options to a list of arguments. +func (opts *ZAddOptions) toArgs() ([]string, error) { + args := []string{} + var err error + + if opts.conditionalChange == OnlyIfExists || opts.conditionalChange == OnlyIfDoesNotExist { + args = append(args, string(opts.conditionalChange)) + } + + if opts.updateOptions == ScoreGreaterThanCurrent || opts.updateOptions == ScoreLessThanCurrent { + args = append(args, string(opts.updateOptions)) + } + + if opts.changed { + args = append(args, ChangedKeyword) + } + + if opts.incr { + args = append(args, IncrKeyword, utils.FloatToString(opts.increment), opts.member) + } + + return args, err +} + +type UpdateOptions string + +const ( + // Only update existing elements if the new score is less than the current score. Equivalent to + // "LT" in the Valkey API. + ScoreLessThanCurrent UpdateOptions = "LT" + // Only update existing elements if the new score is greater than the current score. Equivalent + // to "GT" in the Valkey API. + ScoreGreaterThanCurrent UpdateOptions = "GT" +) + +const ( + ChangedKeyword string = "CH" // Valkey API keyword used to return total number of elements changed + IncrKeyword string = "INCR" // Valkey API keyword to make zadd act like ZINCRBY. +) + +type triStateBool int + +// Tri-state bool for use option builders. We cannot rely on the default value of an non-initialized variable. +const ( + triStateBoolUndefined triStateBool = iota + triStateBoolTrue + triStateBoolFalse +) + +// Optional arguments to `XAdd` in [StreamCommands] +type XAddOptions struct { + id string + makeStream triStateBool + trimOptions *XTrimOptions +} + +// Create new empty `XAddOptions` +func NewXAddOptions() *XAddOptions { + return &XAddOptions{} +} + +// New entry will be added with this `id“. +func (xao *XAddOptions) SetId(id string) *XAddOptions { + xao.id = id + return xao +} + +// If set, a new stream won't be created if no stream matches the given key. +func (xao *XAddOptions) SetDontMakeNewStream() *XAddOptions { + xao.makeStream = triStateBoolFalse + return xao +} + +// If set, add operation will also trim the older entries in the stream. +func (xao *XAddOptions) SetTrimOptions(options *XTrimOptions) *XAddOptions { + xao.trimOptions = options + return xao +} + +func (xao *XAddOptions) toArgs() ([]string, error) { + args := []string{} + var err error + if xao.makeStream == triStateBoolFalse { + args = append(args, "NOMKSTREAM") + } + if xao.trimOptions != nil { + moreArgs, err := xao.trimOptions.toArgs() + if err != nil { + return args, err + } + args = append(args, moreArgs...) + } + if xao.id != "" { + args = append(args, xao.id) + } else { + args = append(args, "*") + } + return args, err +} + +// Optional arguments for `XTrim` and `XAdd` in [StreamCommands] +type XTrimOptions struct { + exact triStateBool + limit int64 + method string + threshold string +} + +// Option to trim the stream according to minimum ID. +func NewXTrimOptionsWithMinId(threshold string) *XTrimOptions { + return &XTrimOptions{threshold: threshold, method: "MINID"} +} + +// Option to trim the stream according to maximum stream length. +func NewXTrimOptionsWithMaxLen(threshold int64) *XTrimOptions { + return &XTrimOptions{threshold: utils.IntToString(threshold), method: "MAXLEN"} +} + +// Match exactly on the threshold. +func (xto *XTrimOptions) SetExactTrimming() *XTrimOptions { + xto.exact = triStateBoolTrue + return xto +} + +// Trim in a near-exact manner, which is more efficient. +func (xto *XTrimOptions) SetNearlyExactTrimming() *XTrimOptions { + xto.exact = triStateBoolFalse + return xto +} + +// Max number of stream entries to be trimmed for non-exact match. +func (xto *XTrimOptions) SetNearlyExactTrimmingAndLimit(limit int64) *XTrimOptions { + xto.exact = triStateBoolFalse + xto.limit = limit + return xto +} + +func (xto *XTrimOptions) toArgs() ([]string, error) { + args := []string{} + args = append(args, xto.method) + if xto.exact == triStateBoolTrue { + args = append(args, "=") + } else if xto.exact == triStateBoolFalse { + args = append(args, "~") + } + args = append(args, xto.threshold) + if xto.limit > 0 { + args = append(args, "LIMIT", utils.IntToString(xto.limit)) + } + var err error + return args, err +} diff --git a/go/api/options/zadd_options.go b/go/api/options/zadd_options.go deleted file mode 100644 index 7926b346cc..0000000000 --- a/go/api/options/zadd_options.go +++ /dev/null @@ -1,106 +0,0 @@ -// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 - -package options - -import ( - "errors" - - "github.com/valkey-io/valkey-glide/go/glide/utils" -) - -// Optional arguments to `ZAdd` in [SortedSetCommands] -type ZAddOptions struct { - conditionalChange ConditionalChange - updateOptions UpdateOptions - changed bool - incr bool - increment float64 - member string -} - -func NewZAddOptionsBuilder() *ZAddOptions { - return &ZAddOptions{} -} - -// `conditionalChange“ defines conditions for updating or adding elements with {@link SortedSetBaseCommands#zadd} -// command. -func (options *ZAddOptions) SetConditionalChange(c ConditionalChange) *ZAddOptions { - options.conditionalChange = c - return options -} - -// `updateOptions` specifies conditions for updating scores with zadd command. -func (options *ZAddOptions) SetUpdateOptions(u UpdateOptions) *ZAddOptions { - options.updateOptions = u - return options -} - -// `Changed` changes the return value from the number of new elements added to the total number of elements changed. -func (options *ZAddOptions) SetChanged(ch bool) (*ZAddOptions, error) { - if options.incr { - return nil, errors.New("changed cannot be set when incr is true") - } - options.changed = ch - return options, nil -} - -// `INCR` sets the increment value to use when incr is true. -func (options *ZAddOptions) SetIncr(incr bool, increment float64, member string) (*ZAddOptions, error) { - if options.changed { - return nil, errors.New("incr cannot be set when changed is true") - } - options.incr = incr - options.increment = increment - options.member = member - return options, nil -} - -// `ToArgs` converts the options to a list of arguments. -func (opts *ZAddOptions) ToArgs() ([]string, error) { - args := []string{} - var err error - - if opts.conditionalChange == OnlyIfExists || opts.conditionalChange == OnlyIfDoesNotExist { - args = append(args, string(opts.conditionalChange)) - } - - if opts.updateOptions == ScoreGreaterThanCurrent || opts.updateOptions == ScoreLessThanCurrent { - args = append(args, string(opts.updateOptions)) - } - - if opts.changed { - args = append(args, ChangedKeyword) - } - - if opts.incr { - args = append(args, IncrKeyword, utils.FloatToString(opts.increment), opts.member) - } - - return args, err -} - -// A ConditionalSet defines whether a new value should be set or not. -type ConditionalChange string - -const ( - // Only update elements that already exist. Don't add new elements. Equivalent to "XX" in the Valkey API. - OnlyIfExists ConditionalChange = "XX" - // Only add new elements. Don't update already existing elements. Equivalent to "NX" in the Valkey API. - OnlyIfDoesNotExist ConditionalChange = "NX" -) - -type UpdateOptions string - -const ( - // Only update existing elements if the new score is less than the current score. Equivalent to - // "LT" in the Valkey API. - ScoreLessThanCurrent UpdateOptions = "LT" - // Only update existing elements if the new score is greater than the current score. Equivalent - // to "GT" in the Valkey API. - ScoreGreaterThanCurrent UpdateOptions = "GT" -) - -const ( - ChangedKeyword string = "CH" // Valkey API keyword used to return total number of elements changed - IncrKeyword string = "INCR" // Valkey API keyword to make zadd act like ZINCRBY. -) diff --git a/go/api/sorted_set_commands.go b/go/api/sorted_set_commands.go index 02a9697e6d..39e1449d87 100644 --- a/go/api/sorted_set_commands.go +++ b/go/api/sorted_set_commands.go @@ -2,10 +2,6 @@ package api -import ( - "github.com/valkey-io/valkey-glide/go/glide/api/options" -) - // SortedSetCommands supports commands and transactions for the "Sorted Set Commands" group for standalone and cluster clients. // // See [valkey.io] for details. @@ -48,7 +44,7 @@ type SortedSetCommands interface { // fmt.Println(res.Value()) // Output: 3 // // [valkey.io]: https://valkey.io/commands/zadd/ - ZAddWithOptions(key string, membersScoreMap map[string]float64, opts *options.ZAddOptions) (Result[int64], error) + ZAddWithOptions(key string, membersScoreMap map[string]float64, opts *ZAddOptions) (Result[int64], error) // Adds one or more members to a sorted set, or updates their scores. Creates the key if it doesn't exist. // @@ -87,7 +83,7 @@ type SortedSetCommands interface { // fmt.Println(res.Value()) // Output: 1.0 // // [valkey.io]: https://valkey.io/commands/zadd/ - ZAddIncrWithOptions(key string, member string, increment float64, opts *options.ZAddOptions) (Result[float64], error) + ZAddIncrWithOptions(key string, member string, increment float64, opts *ZAddOptions) (Result[float64], error) // Increments the score of member in the sorted set stored at key by increment. // If member does not exist in the sorted set, it is added with increment as its score. diff --git a/go/api/stream_commands.go b/go/api/stream_commands.go index 851bb88ba3..45676aa857 100644 --- a/go/api/stream_commands.go +++ b/go/api/stream_commands.go @@ -2,7 +2,7 @@ package api -// Supports commands and transactions for the "Stream Commands" group for standalone and cluster clients. +// Supports commands and transactions for the "Stream" group of commands for standalone and cluster clients. // // See [valkey.io] for details. // @@ -26,4 +26,25 @@ type StreamCommands interface { // // [valkey.io]: https://valkey.io/commands/xadd/ XAdd(key string, values [][]string) (Result[string], error) + + // Adds an entry to the specified stream stored at `key`. If the `key` doesn't exist, the stream is created. + // + // See [valkey.io] for details. + // + // Parameters: + // key - The key of the stream. + // values - Field-value pairs to be added to the entry. + // options - Stream add options. + // + // Return value: + // The id of the added entry. + // + // For example: + // options := NewXAddOptions().WithId("0-1").WithDontMakeNewStream() + // result, err := client.XAdd("myStream", [][]string{{"field1", "value1"}, {"field2", "value2"}}, options) + // result.IsNil(): false + // result.Value(): "0-1" + // + // [valkey.io]: https://valkey.io/commands/xadd/ + XAddWithOptions(key string, values [][]string, options *XAddOptions) (Result[string], error) } diff --git a/go/integTest/glide_test_suite_test.go b/go/integTest/glide_test_suite_test.go index 51efe6d7fd..eb80993d9d 100644 --- a/go/integTest/glide_test_suite_test.go +++ b/go/integTest/glide_test_suite_test.go @@ -114,9 +114,9 @@ func extractAddresses(suite *GlideTestSuite, output string) []api.NodeAddress { func runClusterManager(suite *GlideTestSuite, args []string, ignoreExitCode bool) string { pythonArgs := append([]string{"../../utils/cluster_manager.py"}, args...) - output, err := exec.Command("python3", pythonArgs...).Output() + output, err := exec.Command("python3", pythonArgs...).CombinedOutput() if len(output) > 0 { - suite.T().Logf("cluster_manager.py stdout:\n====\n%s\n====\n", string(output)) + suite.T().Logf("cluster_manager.py output:\n====\n%s\n====\n", string(output)) } if err != nil { diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index 1a7a6f8883..a495ad9ef3 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -11,7 +11,6 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/valkey-io/valkey-glide/go/glide/api" - "github.com/valkey-io/valkey-glide/go/glide/api/options" ) const ( @@ -3877,6 +3876,33 @@ func (suite *GlideTestSuite) TestXAdd() { }) } +func (suite *GlideTestSuite) TestXAddWithOptions() { + suite.runWithDefaultClients(func(client api.BaseClient) { + key := uuid.NewString() + // stream does not exist + res, err := client.XAddWithOptions(key, [][]string{{"field1", "value1"}}, api.NewXAddOptions().SetDontMakeNewStream()) + assert.Nil(suite.T(), err) + assert.True(suite.T(), res.IsNil()) + + // adding data to with given ID + res, err = client.XAddWithOptions(key, [][]string{{"field1", "value1"}}, api.NewXAddOptions().SetId("0-1")) + assert.Nil(suite.T(), err) + assert.Equal(suite.T(), "0-1", res.Value()) + + client.XAdd(key, [][]string{{"field2", "value2"}}) + // TODO run XLen there + // this will trim the first entry. + res, err = client.XAddWithOptions( + key, + [][]string{{"field3", "value3"}}, + api.NewXAddOptions().SetTrimOptions(api.NewXTrimOptionsWithMaxLen(2).SetExactTrimming()), + ) + assert.Nil(suite.T(), err) + assert.False(suite.T(), res.IsNil()) + // TODO run XLen there + }) +} + func (suite *GlideTestSuite) TestZAddAndZAddIncr() { suite.runWithDefaultClients(func(client api.BaseClient) { key := uuid.New().String() @@ -3913,8 +3939,8 @@ func (suite *GlideTestSuite) TestZAddAndZAddIncr() { assert.IsType(suite.T(), &api.RequestError{}, err) // with NX & XX - onlyIfExistsOpts := options.NewZAddOptionsBuilder().SetConditionalChange(options.OnlyIfExists) - onlyIfDoesNotExistOpts := options.NewZAddOptionsBuilder().SetConditionalChange(options.OnlyIfDoesNotExist) + onlyIfExistsOpts := api.NewZAddOptionsBuilder().SetConditionalChange(api.OnlyIfExists) + onlyIfDoesNotExistOpts := api.NewZAddOptionsBuilder().SetConditionalChange(api.OnlyIfDoesNotExist) res, err = client.ZAddWithOptions(key3, membersScoreMap, onlyIfExistsOpts) assert.Nil(suite.T(), err) @@ -3945,10 +3971,10 @@ func (suite *GlideTestSuite) TestZAddAndZAddIncr() { membersScoreMap2["one"] = 10.0 - gtOpts := options.NewZAddOptionsBuilder().SetUpdateOptions(options.ScoreGreaterThanCurrent) - ltOpts := options.NewZAddOptionsBuilder().SetUpdateOptions(options.ScoreLessThanCurrent) - gtOptsChanged, _ := options.NewZAddOptionsBuilder().SetUpdateOptions(options.ScoreGreaterThanCurrent).SetChanged(true) - ltOptsChanged, _ := options.NewZAddOptionsBuilder().SetUpdateOptions(options.ScoreLessThanCurrent).SetChanged(true) + gtOpts := api.NewZAddOptionsBuilder().SetUpdateOptions(api.ScoreGreaterThanCurrent) + ltOpts := api.NewZAddOptionsBuilder().SetUpdateOptions(api.ScoreLessThanCurrent) + gtOptsChanged, _ := api.NewZAddOptionsBuilder().SetUpdateOptions(api.ScoreGreaterThanCurrent).SetChanged(true) + ltOptsChanged, _ := api.NewZAddOptionsBuilder().SetUpdateOptions(api.ScoreLessThanCurrent).SetChanged(true) res, err = client.ZAddWithOptions(key4, membersScoreMap2, gtOptsChanged) assert.Nil(suite.T(), err) From 308de786e28aced58fbe5b7b3384e672d11be101 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Tue, 24 Dec 2024 12:51:11 -0800 Subject: [PATCH 3/5] options Signed-off-by: Yury-Fridlyand --- go/api/base_client.go | 23 +-- go/api/command_options.go | 200 --------------------------- go/api/options/zadd_options.go | 106 ++++++++++++++ go/api/sorted_set_commands.go | 8 +- go/api/stream_commands.go | 10 +- go/integTest/shared_commands_test.go | 23 +-- 6 files changed, 146 insertions(+), 224 deletions(-) create mode 100644 go/api/options/zadd_options.go diff --git a/go/api/base_client.go b/go/api/base_client.go index 877c570e1e..d0ef062629 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -16,6 +16,7 @@ import ( "strconv" "unsafe" + "github.com/valkey-io/valkey-glide/go/glide/api/options" "github.com/valkey-io/valkey-glide/go/glide/protobuf" "github.com/valkey-io/valkey-glide/go/glide/utils" "google.golang.org/protobuf/proto" @@ -1233,13 +1234,17 @@ func (client *baseClient) Renamenx(key string, newKey string) (Result[bool], err } func (client *baseClient) XAdd(key string, values [][]string) (Result[string], error) { - return client.XAddWithOptions(key, values, NewXAddOptions()) + return client.XAddWithOptions(key, values, options.NewXAddOptions()) } -func (client *baseClient) XAddWithOptions(key string, values [][]string, options *XAddOptions) (Result[string], error) { +func (client *baseClient) XAddWithOptions( + key string, + values [][]string, + options *options.XAddOptions, +) (Result[string], error) { args := []string{} args = append(args, key) - optionArgs, err := options.toArgs() + optionArgs, err := options.ToArgs() if err != nil { return CreateNilStringResult(), err } else { @@ -1280,9 +1285,9 @@ func (client *baseClient) ZAdd( func (client *baseClient) ZAddWithOptions( key string, membersScoreMap map[string]float64, - opts *ZAddOptions, + opts *options.ZAddOptions, ) (Result[int64], error) { - optionArgs, err := opts.toArgs() + optionArgs, err := opts.ToArgs() if err != nil { return CreateNilInt64Result(), err } @@ -1298,8 +1303,8 @@ func (client *baseClient) ZAddWithOptions( return handleLongResponse(result) } -func (client *baseClient) zAddIncrBase(key string, opts *ZAddOptions) (Result[float64], error) { - optionArgs, err := opts.toArgs() +func (client *baseClient) zAddIncrBase(key string, opts *options.ZAddOptions) (Result[float64], error) { + optionArgs, err := opts.ToArgs() if err != nil { return CreateNilFloat64Result(), err } @@ -1317,7 +1322,7 @@ func (client *baseClient) ZAddIncr( member string, increment float64, ) (Result[float64], error) { - options, err := NewZAddOptionsBuilder().SetIncr(true, increment, member) + options, err := options.NewZAddOptionsBuilder().SetIncr(true, increment, member) if err != nil { return CreateNilFloat64Result(), err } @@ -1329,7 +1334,7 @@ func (client *baseClient) ZAddIncrWithOptions( key string, member string, increment float64, - opts *ZAddOptions, + opts *options.ZAddOptions, ) (Result[float64], error) { incrOpts, err := opts.SetIncr(true, increment, member) if err != nil { diff --git a/go/api/command_options.go b/go/api/command_options.go index 00fff04b98..d2934b869e 100644 --- a/go/api/command_options.go +++ b/go/api/command_options.go @@ -3,7 +3,6 @@ package api import ( - "errors" "strconv" "github.com/valkey-io/valkey-glide/go/glide/utils" @@ -322,202 +321,3 @@ func (opts *BaseScanOptions) toArgs() ([]string, error) { return args, err } - -// Optional arguments to `ZAdd` in [SortedSetCommands] -type ZAddOptions struct { - conditionalChange ConditionalSet - updateOptions UpdateOptions - changed bool - incr bool - increment float64 - member string -} - -func NewZAddOptionsBuilder() *ZAddOptions { - return &ZAddOptions{} -} - -// `conditionalChange` defines conditions for updating or adding elements with `ZAdd` command. -func (options *ZAddOptions) SetConditionalChange(c ConditionalSet) *ZAddOptions { - options.conditionalChange = c - return options -} - -// `updateOptions` specifies conditions for updating scores with `ZAdd` command. -func (options *ZAddOptions) SetUpdateOptions(u UpdateOptions) *ZAddOptions { - options.updateOptions = u - return options -} - -// `Changed` changes the return value from the number of new elements added to the total number of elements changed. -func (options *ZAddOptions) SetChanged(ch bool) (*ZAddOptions, error) { - if options.incr { - return nil, errors.New("changed cannot be set when incr is true") - } - options.changed = ch - return options, nil -} - -// `INCR` sets the increment value to use when incr is true. -func (options *ZAddOptions) SetIncr(incr bool, increment float64, member string) (*ZAddOptions, error) { - if options.changed { - return nil, errors.New("incr cannot be set when changed is true") - } - options.incr = incr - options.increment = increment - options.member = member - return options, nil -} - -// `toArgs` converts the options to a list of arguments. -func (opts *ZAddOptions) toArgs() ([]string, error) { - args := []string{} - var err error - - if opts.conditionalChange == OnlyIfExists || opts.conditionalChange == OnlyIfDoesNotExist { - args = append(args, string(opts.conditionalChange)) - } - - if opts.updateOptions == ScoreGreaterThanCurrent || opts.updateOptions == ScoreLessThanCurrent { - args = append(args, string(opts.updateOptions)) - } - - if opts.changed { - args = append(args, ChangedKeyword) - } - - if opts.incr { - args = append(args, IncrKeyword, utils.FloatToString(opts.increment), opts.member) - } - - return args, err -} - -type UpdateOptions string - -const ( - // Only update existing elements if the new score is less than the current score. Equivalent to - // "LT" in the Valkey API. - ScoreLessThanCurrent UpdateOptions = "LT" - // Only update existing elements if the new score is greater than the current score. Equivalent - // to "GT" in the Valkey API. - ScoreGreaterThanCurrent UpdateOptions = "GT" -) - -const ( - ChangedKeyword string = "CH" // Valkey API keyword used to return total number of elements changed - IncrKeyword string = "INCR" // Valkey API keyword to make zadd act like ZINCRBY. -) - -type triStateBool int - -// Tri-state bool for use option builders. We cannot rely on the default value of an non-initialized variable. -const ( - triStateBoolUndefined triStateBool = iota - triStateBoolTrue - triStateBoolFalse -) - -// Optional arguments to `XAdd` in [StreamCommands] -type XAddOptions struct { - id string - makeStream triStateBool - trimOptions *XTrimOptions -} - -// Create new empty `XAddOptions` -func NewXAddOptions() *XAddOptions { - return &XAddOptions{} -} - -// New entry will be added with this `id“. -func (xao *XAddOptions) SetId(id string) *XAddOptions { - xao.id = id - return xao -} - -// If set, a new stream won't be created if no stream matches the given key. -func (xao *XAddOptions) SetDontMakeNewStream() *XAddOptions { - xao.makeStream = triStateBoolFalse - return xao -} - -// If set, add operation will also trim the older entries in the stream. -func (xao *XAddOptions) SetTrimOptions(options *XTrimOptions) *XAddOptions { - xao.trimOptions = options - return xao -} - -func (xao *XAddOptions) toArgs() ([]string, error) { - args := []string{} - var err error - if xao.makeStream == triStateBoolFalse { - args = append(args, "NOMKSTREAM") - } - if xao.trimOptions != nil { - moreArgs, err := xao.trimOptions.toArgs() - if err != nil { - return args, err - } - args = append(args, moreArgs...) - } - if xao.id != "" { - args = append(args, xao.id) - } else { - args = append(args, "*") - } - return args, err -} - -// Optional arguments for `XTrim` and `XAdd` in [StreamCommands] -type XTrimOptions struct { - exact triStateBool - limit int64 - method string - threshold string -} - -// Option to trim the stream according to minimum ID. -func NewXTrimOptionsWithMinId(threshold string) *XTrimOptions { - return &XTrimOptions{threshold: threshold, method: "MINID"} -} - -// Option to trim the stream according to maximum stream length. -func NewXTrimOptionsWithMaxLen(threshold int64) *XTrimOptions { - return &XTrimOptions{threshold: utils.IntToString(threshold), method: "MAXLEN"} -} - -// Match exactly on the threshold. -func (xto *XTrimOptions) SetExactTrimming() *XTrimOptions { - xto.exact = triStateBoolTrue - return xto -} - -// Trim in a near-exact manner, which is more efficient. -func (xto *XTrimOptions) SetNearlyExactTrimming() *XTrimOptions { - xto.exact = triStateBoolFalse - return xto -} - -// Max number of stream entries to be trimmed for non-exact match. -func (xto *XTrimOptions) SetNearlyExactTrimmingAndLimit(limit int64) *XTrimOptions { - xto.exact = triStateBoolFalse - xto.limit = limit - return xto -} - -func (xto *XTrimOptions) toArgs() ([]string, error) { - args := []string{} - args = append(args, xto.method) - if xto.exact == triStateBoolTrue { - args = append(args, "=") - } else if xto.exact == triStateBoolFalse { - args = append(args, "~") - } - args = append(args, xto.threshold) - if xto.limit > 0 { - args = append(args, "LIMIT", utils.IntToString(xto.limit)) - } - var err error - return args, err -} diff --git a/go/api/options/zadd_options.go b/go/api/options/zadd_options.go new file mode 100644 index 0000000000..7926b346cc --- /dev/null +++ b/go/api/options/zadd_options.go @@ -0,0 +1,106 @@ +// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 + +package options + +import ( + "errors" + + "github.com/valkey-io/valkey-glide/go/glide/utils" +) + +// Optional arguments to `ZAdd` in [SortedSetCommands] +type ZAddOptions struct { + conditionalChange ConditionalChange + updateOptions UpdateOptions + changed bool + incr bool + increment float64 + member string +} + +func NewZAddOptionsBuilder() *ZAddOptions { + return &ZAddOptions{} +} + +// `conditionalChange“ defines conditions for updating or adding elements with {@link SortedSetBaseCommands#zadd} +// command. +func (options *ZAddOptions) SetConditionalChange(c ConditionalChange) *ZAddOptions { + options.conditionalChange = c + return options +} + +// `updateOptions` specifies conditions for updating scores with zadd command. +func (options *ZAddOptions) SetUpdateOptions(u UpdateOptions) *ZAddOptions { + options.updateOptions = u + return options +} + +// `Changed` changes the return value from the number of new elements added to the total number of elements changed. +func (options *ZAddOptions) SetChanged(ch bool) (*ZAddOptions, error) { + if options.incr { + return nil, errors.New("changed cannot be set when incr is true") + } + options.changed = ch + return options, nil +} + +// `INCR` sets the increment value to use when incr is true. +func (options *ZAddOptions) SetIncr(incr bool, increment float64, member string) (*ZAddOptions, error) { + if options.changed { + return nil, errors.New("incr cannot be set when changed is true") + } + options.incr = incr + options.increment = increment + options.member = member + return options, nil +} + +// `ToArgs` converts the options to a list of arguments. +func (opts *ZAddOptions) ToArgs() ([]string, error) { + args := []string{} + var err error + + if opts.conditionalChange == OnlyIfExists || opts.conditionalChange == OnlyIfDoesNotExist { + args = append(args, string(opts.conditionalChange)) + } + + if opts.updateOptions == ScoreGreaterThanCurrent || opts.updateOptions == ScoreLessThanCurrent { + args = append(args, string(opts.updateOptions)) + } + + if opts.changed { + args = append(args, ChangedKeyword) + } + + if opts.incr { + args = append(args, IncrKeyword, utils.FloatToString(opts.increment), opts.member) + } + + return args, err +} + +// A ConditionalSet defines whether a new value should be set or not. +type ConditionalChange string + +const ( + // Only update elements that already exist. Don't add new elements. Equivalent to "XX" in the Valkey API. + OnlyIfExists ConditionalChange = "XX" + // Only add new elements. Don't update already existing elements. Equivalent to "NX" in the Valkey API. + OnlyIfDoesNotExist ConditionalChange = "NX" +) + +type UpdateOptions string + +const ( + // Only update existing elements if the new score is less than the current score. Equivalent to + // "LT" in the Valkey API. + ScoreLessThanCurrent UpdateOptions = "LT" + // Only update existing elements if the new score is greater than the current score. Equivalent + // to "GT" in the Valkey API. + ScoreGreaterThanCurrent UpdateOptions = "GT" +) + +const ( + ChangedKeyword string = "CH" // Valkey API keyword used to return total number of elements changed + IncrKeyword string = "INCR" // Valkey API keyword to make zadd act like ZINCRBY. +) diff --git a/go/api/sorted_set_commands.go b/go/api/sorted_set_commands.go index 39e1449d87..02a9697e6d 100644 --- a/go/api/sorted_set_commands.go +++ b/go/api/sorted_set_commands.go @@ -2,6 +2,10 @@ package api +import ( + "github.com/valkey-io/valkey-glide/go/glide/api/options" +) + // SortedSetCommands supports commands and transactions for the "Sorted Set Commands" group for standalone and cluster clients. // // See [valkey.io] for details. @@ -44,7 +48,7 @@ type SortedSetCommands interface { // fmt.Println(res.Value()) // Output: 3 // // [valkey.io]: https://valkey.io/commands/zadd/ - ZAddWithOptions(key string, membersScoreMap map[string]float64, opts *ZAddOptions) (Result[int64], error) + ZAddWithOptions(key string, membersScoreMap map[string]float64, opts *options.ZAddOptions) (Result[int64], error) // Adds one or more members to a sorted set, or updates their scores. Creates the key if it doesn't exist. // @@ -83,7 +87,7 @@ type SortedSetCommands interface { // fmt.Println(res.Value()) // Output: 1.0 // // [valkey.io]: https://valkey.io/commands/zadd/ - ZAddIncrWithOptions(key string, member string, increment float64, opts *ZAddOptions) (Result[float64], error) + ZAddIncrWithOptions(key string, member string, increment float64, opts *options.ZAddOptions) (Result[float64], error) // Increments the score of member in the sorted set stored at key by increment. // If member does not exist in the sorted set, it is added with increment as its score. diff --git a/go/api/stream_commands.go b/go/api/stream_commands.go index 45676aa857..1696a168c2 100644 --- a/go/api/stream_commands.go +++ b/go/api/stream_commands.go @@ -2,6 +2,8 @@ package api +import "github.com/valkey-io/valkey-glide/go/glide/api/options" + // Supports commands and transactions for the "Stream" group of commands for standalone and cluster clients. // // See [valkey.io] for details. @@ -40,11 +42,11 @@ type StreamCommands interface { // The id of the added entry. // // For example: - // options := NewXAddOptions().WithId("0-1").WithDontMakeNewStream() - // result, err := client.XAdd("myStream", [][]string{{"field1", "value1"}, {"field2", "value2"}}, options) + // options := options.NewXAddOptions().SetId("100-500").SetDontMakeNewStream() + // result, err := client.XAddWithOptions("myStream", [][]string{{"field1", "value1"}, {"field2", "value2"}}, options) // result.IsNil(): false - // result.Value(): "0-1" + // result.Value(): "100-500" // // [valkey.io]: https://valkey.io/commands/xadd/ - XAddWithOptions(key string, values [][]string, options *XAddOptions) (Result[string], error) + XAddWithOptions(key string, values [][]string, options *options.XAddOptions) (Result[string], error) } diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index a495ad9ef3..7a7c0c50e9 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -11,6 +11,7 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/valkey-io/valkey-glide/go/glide/api" + "github.com/valkey-io/valkey-glide/go/glide/api/options" ) const ( @@ -3880,12 +3881,16 @@ func (suite *GlideTestSuite) TestXAddWithOptions() { suite.runWithDefaultClients(func(client api.BaseClient) { key := uuid.NewString() // stream does not exist - res, err := client.XAddWithOptions(key, [][]string{{"field1", "value1"}}, api.NewXAddOptions().SetDontMakeNewStream()) + res, err := client.XAddWithOptions( + key, + [][]string{{"field1", "value1"}}, + options.NewXAddOptions().SetDontMakeNewStream(), + ) assert.Nil(suite.T(), err) assert.True(suite.T(), res.IsNil()) // adding data to with given ID - res, err = client.XAddWithOptions(key, [][]string{{"field1", "value1"}}, api.NewXAddOptions().SetId("0-1")) + res, err = client.XAddWithOptions(key, [][]string{{"field1", "value1"}}, options.NewXAddOptions().SetId("0-1")) assert.Nil(suite.T(), err) assert.Equal(suite.T(), "0-1", res.Value()) @@ -3895,7 +3900,7 @@ func (suite *GlideTestSuite) TestXAddWithOptions() { res, err = client.XAddWithOptions( key, [][]string{{"field3", "value3"}}, - api.NewXAddOptions().SetTrimOptions(api.NewXTrimOptionsWithMaxLen(2).SetExactTrimming()), + options.NewXAddOptions().SetTrimOptions(options.NewXTrimOptionsWithMaxLen(2).SetExactTrimming()), ) assert.Nil(suite.T(), err) assert.False(suite.T(), res.IsNil()) @@ -3939,8 +3944,8 @@ func (suite *GlideTestSuite) TestZAddAndZAddIncr() { assert.IsType(suite.T(), &api.RequestError{}, err) // with NX & XX - onlyIfExistsOpts := api.NewZAddOptionsBuilder().SetConditionalChange(api.OnlyIfExists) - onlyIfDoesNotExistOpts := api.NewZAddOptionsBuilder().SetConditionalChange(api.OnlyIfDoesNotExist) + onlyIfExistsOpts := options.NewZAddOptionsBuilder().SetConditionalChange(options.OnlyIfExists) + onlyIfDoesNotExistOpts := options.NewZAddOptionsBuilder().SetConditionalChange(options.OnlyIfDoesNotExist) res, err = client.ZAddWithOptions(key3, membersScoreMap, onlyIfExistsOpts) assert.Nil(suite.T(), err) @@ -3971,10 +3976,10 @@ func (suite *GlideTestSuite) TestZAddAndZAddIncr() { membersScoreMap2["one"] = 10.0 - gtOpts := api.NewZAddOptionsBuilder().SetUpdateOptions(api.ScoreGreaterThanCurrent) - ltOpts := api.NewZAddOptionsBuilder().SetUpdateOptions(api.ScoreLessThanCurrent) - gtOptsChanged, _ := api.NewZAddOptionsBuilder().SetUpdateOptions(api.ScoreGreaterThanCurrent).SetChanged(true) - ltOptsChanged, _ := api.NewZAddOptionsBuilder().SetUpdateOptions(api.ScoreLessThanCurrent).SetChanged(true) + gtOpts := options.NewZAddOptionsBuilder().SetUpdateOptions(options.ScoreGreaterThanCurrent) + ltOpts := options.NewZAddOptionsBuilder().SetUpdateOptions(options.ScoreLessThanCurrent) + gtOptsChanged, _ := options.NewZAddOptionsBuilder().SetUpdateOptions(options.ScoreGreaterThanCurrent).SetChanged(true) + ltOptsChanged, _ := options.NewZAddOptionsBuilder().SetUpdateOptions(options.ScoreLessThanCurrent).SetChanged(true) res, err = client.ZAddWithOptions(key4, membersScoreMap2, gtOptsChanged) assert.Nil(suite.T(), err) From 06c0145576125d5ec947c6405d325905702638bf Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Tue, 24 Dec 2024 13:06:33 -0800 Subject: [PATCH 4/5] add missing file Signed-off-by: Yury-Fridlyand --- go/api/options/stream_options.go | 120 +++++++++++++++++++++++++++++++ 1 file changed, 120 insertions(+) create mode 100644 go/api/options/stream_options.go diff --git a/go/api/options/stream_options.go b/go/api/options/stream_options.go new file mode 100644 index 0000000000..2a07c0ad2c --- /dev/null +++ b/go/api/options/stream_options.go @@ -0,0 +1,120 @@ +// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 + +package options + +import ( + "github.com/valkey-io/valkey-glide/go/glide/utils" +) + +type triStateBool int + +// Tri-state bool for use option builders. We cannot rely on the default value of an non-initialized variable. +const ( + triStateBoolUndefined triStateBool = iota + triStateBoolTrue + triStateBoolFalse +) + +// Optional arguments to `XAdd` in [StreamCommands] +type XAddOptions struct { + id string + makeStream triStateBool + trimOptions *XTrimOptions +} + +// Create new empty `XAddOptions` +func NewXAddOptions() *XAddOptions { + return &XAddOptions{} +} + +// New entry will be added with this `id“. +func (xao *XAddOptions) SetId(id string) *XAddOptions { + xao.id = id + return xao +} + +// If set, a new stream won't be created if no stream matches the given key. +func (xao *XAddOptions) SetDontMakeNewStream() *XAddOptions { + xao.makeStream = triStateBoolFalse + return xao +} + +// If set, add operation will also trim the older entries in the stream. +func (xao *XAddOptions) SetTrimOptions(options *XTrimOptions) *XAddOptions { + xao.trimOptions = options + return xao +} + +func (xao *XAddOptions) ToArgs() ([]string, error) { + args := []string{} + var err error + if xao.makeStream == triStateBoolFalse { + args = append(args, "NOMKSTREAM") + } + if xao.trimOptions != nil { + moreArgs, err := xao.trimOptions.ToArgs() + if err != nil { + return args, err + } + args = append(args, moreArgs...) + } + if xao.id != "" { + args = append(args, xao.id) + } else { + args = append(args, "*") + } + return args, err +} + +// Optional arguments for `XTrim` and `XAdd` in [StreamCommands] +type XTrimOptions struct { + exact triStateBool + limit int64 + method string + threshold string +} + +// Option to trim the stream according to minimum ID. +func NewXTrimOptionsWithMinId(threshold string) *XTrimOptions { + return &XTrimOptions{threshold: threshold, method: "MINID"} +} + +// Option to trim the stream according to maximum stream length. +func NewXTrimOptionsWithMaxLen(threshold int64) *XTrimOptions { + return &XTrimOptions{threshold: utils.IntToString(threshold), method: "MAXLEN"} +} + +// Match exactly on the threshold. +func (xto *XTrimOptions) SetExactTrimming() *XTrimOptions { + xto.exact = triStateBoolTrue + return xto +} + +// Trim in a near-exact manner, which is more efficient. +func (xto *XTrimOptions) SetNearlyExactTrimming() *XTrimOptions { + xto.exact = triStateBoolFalse + return xto +} + +// Max number of stream entries to be trimmed for non-exact match. +func (xto *XTrimOptions) SetNearlyExactTrimmingAndLimit(limit int64) *XTrimOptions { + xto.exact = triStateBoolFalse + xto.limit = limit + return xto +} + +func (xto *XTrimOptions) ToArgs() ([]string, error) { + args := []string{} + args = append(args, xto.method) + if xto.exact == triStateBoolTrue { + args = append(args, "=") + } else if xto.exact == triStateBoolFalse { + args = append(args, "~") + } + args = append(args, xto.threshold) + if xto.limit > 0 { + args = append(args, "LIMIT", utils.IntToString(xto.limit)) + } + var err error + return args, err +} From fc05a12be8bfe4f748bf145c6ae27b18ade0f39e Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Fri, 27 Dec 2024 14:54:34 -0800 Subject: [PATCH 5/5] Address PR comments. Signed-off-by: Yury-Fridlyand --- go/api/base_client.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/go/api/base_client.go b/go/api/base_client.go index 1a3855ac04..3de2a5170f 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -1269,9 +1269,8 @@ func (client *baseClient) XAddWithOptions( optionArgs, err := options.ToArgs() if err != nil { return CreateNilStringResult(), err - } else { - args = append(args, optionArgs...) } + args = append(args, optionArgs...) for _, pair := range values { if len(pair) != 2 { return CreateNilStringResult(), fmt.Errorf(