Skip to content

Commit

Permalink
New Aggregate function: return map[string]interface{} (#157)
Browse files Browse the repository at this point in the history
  • Loading branch information
Avital-Fine authored Apr 27, 2022
1 parent cc7ddce commit ef3e8f4
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 27 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ jobs:
TLS_CACERT=redis/tests/tls/ca.crt
build: # test with redisearch:latest
build:
docker:
- image: circleci/golang:1.16
- image: redislabs/redisearch:edge
Expand Down
69 changes: 61 additions & 8 deletions redisearch/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func (q AggregateQuery) Serialize() redis.Args {

// Deprecated: Please use processAggReply() instead
func ProcessAggResponse(res []interface{}) [][]string {
aggregateReply := make([][]string, len(res), len(res))
aggregateReply := make([][]string, len(res))
for i := 0; i < len(res); i++ {
if d, e := redis.Strings(res[i], nil); e == nil {
aggregateReply[i] = d
Expand All @@ -293,14 +293,15 @@ func ProcessAggResponse(res []interface{}) [][]string {
return aggregateReply
}

// Deprecated: Please use processAggQueryReply() instead
func processAggReply(res []interface{}) (total int, aggregateReply [][]string, err error) {
aggregateReply = [][]string{}
total = 0
aggregate_results := len(res) - 1
if aggregate_results > 0 {
total = aggregate_results
aggregateReply = make([][]string, aggregate_results, aggregate_results)
for i := 0; i < aggregate_results; i++ {
aggregateResults := len(res) - 1
if aggregateResults > 0 {
total = aggregateResults
aggregateReply = make([][]string, aggregateResults)
for i := 0; i < aggregateResults; i++ {
if d, e := redis.Strings(res[i+1], nil); e == nil {
aggregateReply[i] = d
} else {
Expand All @@ -312,13 +313,33 @@ func processAggReply(res []interface{}) (total int, aggregateReply [][]string, e
return
}

// New Aggregate reply processor
func processAggQueryReply(res []interface{}) (total int, aggregateReply []map[string]interface{}, err error) {
aggregateReply = []map[string]interface{}{}
total = 0
aggregateResults := len(res) - 1
if aggregateResults > 0 {
total = aggregateResults
aggregateReply = make([]map[string]interface{}, aggregateResults)
for i := 0; i < aggregateResults; i++ {
if d, e := mapToStrings(res[i+1], nil); e == nil {
aggregateReply[i] = d
} else {
err = fmt.Errorf("Error parsing Aggregate Reply: %v on reply position %d", e, i)
aggregateReply[i] = nil
}
}
}
return
}

func ProcessAggResponseSS(res []interface{}) [][]string {
var lout = len(res)
aggregateReply := make([][]string, lout, lout)
aggregateReply := make([][]string, lout)
for i := 0; i < lout; i++ {
reply := res[i].([]interface{})
linner := len(reply)
aggregateReply[i] = make([]string, linner, linner)
aggregateReply[i] = make([]string, linner)
for j := 0; j < linner; j++ {
if reply[j] == nil {
log.Print(fmt.Sprintf("Error parsing Aggregate Reply on position (%d,%d)", i, j))
Expand All @@ -330,3 +351,35 @@ func ProcessAggResponseSS(res []interface{}) [][]string {
}
return aggregateReply
}

// mapToStrings is a helper that converts an array (alternating key, value) into a map[string]interface{}.
// The value can be string or []string. Numbers will be treated as strings. Requires an even number of
// values in result.
func mapToStrings(result interface{}, err error) (map[string]interface{}, error) {
values, err := redis.Values(result, err)
if err != nil {
return nil, err
}
if len(values)%2 != 0 {
return nil, fmt.Errorf("redigo: mapToStrings expects even number of values result")
}
m := make(map[string]interface{}, len(values)/2)
for i := 0; i < len(values); i += 2 {
key, okKey := redis.String(values[i], err)
if okKey != nil {
return nil, fmt.Errorf("mapToStrings key not a bulk string value")
}

var value interface{}
value, okValue := redis.String(values[i+1], err)
if okValue != nil {
value, okValue = redis.Strings(values[i+1], err)
}
if okValue != nil && okValue != redis.ErrNil {
return nil, fmt.Errorf("mapToStrings value got unexpected element type: %T", values[i+1])
}

m[string(key)] = value
}
return m, nil
}
45 changes: 45 additions & 0 deletions redisearch/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func Init() {

AddValues(c)
}

func TestAggregateGroupBy(t *testing.T) {
Init()
c := createClient("docs-games-idx1")
Expand All @@ -161,6 +162,10 @@ func TestAggregateGroupBy(t *testing.T) {
_, count, err := c.Aggregate(q1)
assert.Nil(t, err)
assert.Equal(t, 5, count)

count, _, err = c.AggregateQuery(q1)
assert.Nil(t, err)
assert.Equal(t, 5, count)
}

func TestAggregateMinMax(t *testing.T) {
Expand All @@ -181,6 +186,13 @@ func TestAggregateMinMax(t *testing.T) {
assert.GreaterOrEqual(t, f, 88.0)
assert.Less(t, f, 89.0)

_, rep, err := c.AggregateQuery(q1)
assert.Nil(t, err)
fmt.Println(rep[0])
f, _ = strconv.ParseFloat(rep[0]["minPrice"].(string), 64)
assert.GreaterOrEqual(t, f, 88.0)
assert.Less(t, f, 89.0)

q2 := NewAggregateQuery().SetQuery(NewQuery("sony")).
GroupBy(*NewGroupBy().AddFields("@brand").
Reduce(*NewReducer(GroupByReducerCount, []string{})).
Expand All @@ -193,6 +205,12 @@ func TestAggregateMinMax(t *testing.T) {
f, _ = strconv.ParseFloat(row[5], 64)
assert.GreaterOrEqual(t, f, 695.0)
assert.Less(t, f, 696.0)

_, rep, err = c.AggregateQuery(q2)
assert.Nil(t, err)
f, _ = strconv.ParseFloat(rep[0]["maxPrice"].(string), 64)
assert.GreaterOrEqual(t, f, 695.0)
assert.Less(t, f, 696.0)
}

func TestAggregateCountDistinct(t *testing.T) {
Expand All @@ -208,6 +226,27 @@ func TestAggregateCountDistinct(t *testing.T) {
assert.Nil(t, err)
row := res[0]
assert.Equal(t, "1484", row[3])

_, rep, err := c.AggregateQuery(q1)
assert.Nil(t, err)
assert.Equal(t, "1484", rep[0]["count_distinct(title)"])
}

func TestAggregateToList(t *testing.T) {
Init()
c := createClient("docs-games-idx1")

q1 := NewAggregateQuery().
GroupBy(*NewGroupBy().AddFields("@brand").
Reduce(*NewReducer(GroupByReducerToList, []string{"@brand"})))

total, reply, err := c.AggregateQuery(q1) // Can't be used with Aggregate when using ToList!
assert.Nil(t, err)
assert.Equal(t, 292, total)
_, ok := reply[0]["brand"].(string)
assert.True(t, ok)
_, ok = reply[0]["__generated_aliastolistbrand"].([]string)
assert.True(t, ok)
}

func TestAggregateFilter(t *testing.T) {
Expand All @@ -226,6 +265,12 @@ func TestAggregateFilter(t *testing.T) {
assert.Greater(t, f, 5.0)
}

_, rep, err := c.AggregateQuery(q1)
assert.Nil(t, err)
for _, row := range rep {
f, _ := strconv.ParseFloat(row["count"].(string), 64)
assert.Greater(t, f, 5.0)
}
}

func makeAggResponseInterface(seed int64, nElements int, responseSizes []int) (res []interface{}) {
Expand Down
61 changes: 43 additions & 18 deletions redisearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,26 +245,12 @@ func (i *Client) SpellCheck(q *Query, s *SpellCheckOptions) (suggs []MisspelledT
return
}

// Aggregate
// Deprecated: Use AggregateQuery() instead.
func (i *Client) Aggregate(q *AggregateQuery) (aggregateReply [][]string, total int, err error) {
conn := i.pool.Get()
defer conn.Close()
hasCursor := q.WithCursor
validCursor := q.CursorHasResults()
var res []interface{} = nil
if !validCursor {
args := redis.Args{i.name}
args = append(args, q.Serialize()...)
res, err = redis.Values(conn.Do("FT.AGGREGATE", args...))
} else {
args := redis.Args{"READ", i.name, q.Cursor.Id}
res, err = redis.Values(conn.Do("FT.CURSOR", args...))
}
if err != nil {
return
}
res, err := i.aggregate(q)

// has no cursor
if !hasCursor {
if !q.WithCursor {
total, aggregateReply, err = processAggReply(res)
// has cursor
} else {
Expand All @@ -278,7 +264,46 @@ func (i *Client) Aggregate(q *AggregateQuery) (aggregateReply [][]string, total
}
total, aggregateReply, err = processAggReply(partialResults)
}
return
}

// AggregateQuery - New version to Aggregate() function. The values in each map can be string or []string.
func (i *Client) AggregateQuery(q *AggregateQuery) (total int, aggregateReply []map[string]interface{}, err error) {
res, err := i.aggregate(q)

// has no cursor
if !q.WithCursor {
total, aggregateReply, err = processAggQueryReply(res)
// has cursor
} else {
var partialResults, err = redis.Values(res[0], nil)
if err != nil {
return total, aggregateReply, err
}
q.Cursor.Id, err = redis.Int(res[1], nil)
if err != nil {
return total, aggregateReply, err
}
total, aggregateReply, err = processAggQueryReply(partialResults)
}
return
}

func (i *Client) aggregate(q *AggregateQuery) (res []interface{}, err error) {
conn := i.pool.Get()
defer conn.Close()
validCursor := q.CursorHasResults()
if !validCursor {
args := redis.Args{i.name}
args = append(args, q.Serialize()...)
res, err = redis.Values(conn.Do("FT.AGGREGATE", args...))
} else {
args := redis.Args{"READ", i.name, q.Cursor.Id}
res, err = redis.Values(conn.Do("FT.CURSOR", args...))
}
if err != nil {
return
}
return
}

Expand Down

0 comments on commit ef3e8f4

Please sign in to comment.