Revert "Revert "Merge pull request grafana#971 from grafana/batch-index-lookups""

This reverts commit 8b74f9053037c4590ec1512b65288e4869d5e748.

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
tomwilkie committed Sep 11, 2018
1 parent 13d488b commit 65281a1
Showing 9 changed files with 329 additions and 206 deletions.
32 changes: 22 additions & 10 deletions aws/storage_client.go
@@ -30,6 +30,7 @@ import (
 	"github.com/weaveworks/common/instrument"
 	"github.com/weaveworks/common/user"
 	"github.com/weaveworks/cortex/pkg/chunk"
+	chunk_util "github.com/weaveworks/cortex/pkg/chunk/util"
 	"github.com/weaveworks/cortex/pkg/util"
 )

@@ -301,7 +302,11 @@ func (a storageClient) BatchWrite(ctx context.Context, input chunk.WriteBatch) error {
 	return backoff.Err()
 }
 
-func (a storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error {
+func (a storageClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) bool) error {
+	return chunk_util.DoParallelQueries(ctx, a.query, queries, callback)
+}
+
+func (a storageClient) query(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error {
 	sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages", ot.Tag{Key: "tableName", Value: query.TableName}, ot.Tag{Key: "hashValue", Value: query.HashValue})
 	defer sp.Finish()
 
@@ -371,7 +376,7 @@ func (a storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error {
 	return nil
 }
 
-func (a storageClient) queryPage(ctx context.Context, input *dynamodb.QueryInput, page dynamoDBRequest) (dynamoDBReadResponse, error) {
+func (a storageClient) queryPage(ctx context.Context, input *dynamodb.QueryInput, page dynamoDBRequest) (*dynamoDBReadResponse, error) {
 	backoff := util.NewBackoff(ctx, a.cfg.backoffConfig)
 	defer func() {
 		dynamoQueryRetryCount.WithLabelValues("queryPage").Observe(float64(backoff.NumRetries()))
@@ -401,7 +406,10 @@ func (a storageClient) queryPage(ctx context.Context, input *dynamodb.QueryInput, page dynamoDBRequest) (dynamoDBReadResponse, error) {
 		}
 
 		queryOutput := page.Data().(*dynamodb.QueryOutput)
-		return dynamoDBReadResponse(queryOutput.Items), nil
+		return &dynamoDBReadResponse{
+			i:     -1,
+			items: queryOutput.Items,
+		}, nil
 	}
 	return nil, fmt.Errorf("QueryPage error: %s for table %v, last error %v", backoff.Err(), *input.TableName, err)
 }
@@ -785,18 +793,22 @@ func (a storageClient) putS3Chunk(ctx context.Context, key string, buf []byte) error {
 }
 
 // Slice of values returned; map key is attribute name
-type dynamoDBReadResponse []map[string]*dynamodb.AttributeValue
+type dynamoDBReadResponse struct {
+	i     int
+	items []map[string]*dynamodb.AttributeValue
+}
 
-func (b dynamoDBReadResponse) Len() int {
-	return len(b)
+func (b *dynamoDBReadResponse) Next() bool {
+	b.i++
+	return b.i < len(b.items)
 }
 
-func (b dynamoDBReadResponse) RangeValue(i int) []byte {
-	return b[i][rangeKey].B
+func (b *dynamoDBReadResponse) RangeValue() []byte {
+	return b.items[b.i][rangeKey].B
 }
 
-func (b dynamoDBReadResponse) Value(i int) []byte {
-	chunkValue, ok := b[i][valueKey]
+func (b *dynamoDBReadResponse) Value() []byte {
+	chunkValue, ok := b.items[b.i][valueKey]
 	if !ok {
 		return nil
 	}
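Note on the new iterator contract: chunk.ReadBatch moves from indexed access (Len, RangeValue(i), Value(i)) to a cursor, and dynamoDBReadResponse starts that cursor at i: -1 so the first Next() call advances to item 0. A standalone sketch of the contract, using a toy in-memory batch rather than the real DynamoDB type (the ReadBatch shape here is inferred from the methods above):

package main

import "fmt"

// ReadBatch mirrors the cursor-style interface this commit introduces
// (assumed shape; the real interface lives in pkg/chunk).
type ReadBatch interface {
	Next() bool
	RangeValue() []byte
	Value() []byte
}

// memBatch is a toy batch laid out like dynamoDBReadResponse above:
// the cursor starts at -1 so the first Next() lands on item 0.
type memBatch struct {
	i     int
	items [][2][]byte
}

func (m *memBatch) Next() bool {
	m.i++
	return m.i < len(m.items)
}

func (m *memBatch) RangeValue() []byte { return m.items[m.i][0] }
func (m *memBatch) Value() []byte      { return m.items[m.i][1] }

func main() {
	var b ReadBatch = &memBatch{i: -1, items: [][2][]byte{
		{[]byte("rv1"), []byte("v1")},
		{[]byte("rv2"), []byte("v2")},
	}}
	for b.Next() {
		fmt.Printf("%s -> %s\n", b.RangeValue(), b.Value())
	}
}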
28 changes: 16 additions & 12 deletions cassandra/storage_client.go
@@ -12,6 +12,7 @@ import (
 	"github.com/prometheus/common/model"
 
 	"github.com/weaveworks/cortex/pkg/chunk"
+	"github.com/weaveworks/cortex/pkg/chunk/util"
 )
 
 const (
@@ -185,7 +186,11 @@ func (s *storageClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch) error {
 	return nil
 }
 
-func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error {
+func (s *storageClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) bool) error {
+	return util.DoParallelQueries(ctx, s.query, queries, callback)
+}
+
+func (s *storageClient) query(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error {
 	var q *gocql.Query
 
 	switch {
@@ -218,7 +223,7 @@ func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error {
 	defer iter.Close()
 	scanner := iter.Scanner()
 	for scanner.Next() {
-		var b readBatch
+		b := &readBatch{}
 		if err := scanner.Scan(&b.rangeValue, &b.value); err != nil {
 			return errors.WithStack(err)
 		}
@@ -231,27 +236,26 @@ func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error {
 
 // readBatch represents a batch of rows read from Cassandra.
 type readBatch struct {
+	consumed   bool
 	rangeValue []byte
 	value      []byte
 }
 
-// Len implements chunk.ReadBatch; in Cassandra we 'stream' results back
-// one-by-one, so this always returns 1.
-func (readBatch) Len() int {
-	return 1
+func (b *readBatch) Next() bool {
+	if b.consumed {
+		return false
+	}
+	b.consumed = true
+	return true
 }
 
-func (b readBatch) RangeValue(index int) []byte {
-	if index != 0 {
-		panic("index != 0")
-	}
+func (b *readBatch) RangeValue() []byte {
 	return b.rangeValue
 }
 
-func (b readBatch) Value(index int) []byte {
-	if index != 0 {
-		panic("index != 0")
-	}
+func (b *readBatch) Value() []byte {
 	return b.value
 }
 
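Because Cassandra streams rows back one at a time, each readBatch wraps a single row, and the consumed flag makes Next() return true exactly once. A quick standalone illustration of that one-shot contract (a sketch mirroring the type above, not code from this commit):

package main

import "fmt"

// readBatch mirrors the Cassandra type above: one row per batch.
type readBatch struct {
	consumed   bool
	rangeValue []byte
	value      []byte
}

// Next is true exactly once: the first call consumes the single row.
func (b *readBatch) Next() bool {
	if b.consumed {
		return false
	}
	b.consumed = true
	return true
}

func main() {
	b := &readBatch{rangeValue: []byte("rv"), value: []byte("v")}
	fmt.Println(b.Next()) // true: the one row is now current
	fmt.Println(b.Next()) // false: already consumed
}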
45 changes: 7 additions & 38 deletions chunk_store.go
@@ -347,53 +347,22 @@ func (c *store) lookupChunksByMetricName(ctx context.Context, from, through model.Time
 }
 
 func (c *store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery) ([]IndexEntry, error) {
-	incomingEntries := make(chan []IndexEntry)
-	incomingErrors := make(chan error)
-	for _, query := range queries {
-		go func(query IndexQuery) {
-			entries, err := c.lookupEntriesByQuery(ctx, query)
-			if err != nil {
-				incomingErrors <- err
-			} else {
-				incomingEntries <- entries
-			}
-		}(query)
-	}
-
-	// Combine the results into one slice
-	var entries []IndexEntry
-	var lastErr error
-	for i := 0; i < len(queries); i++ {
-		select {
-		case incoming := <-incomingEntries:
-			entries = append(entries, incoming...)
-		case err := <-incomingErrors:
-			lastErr = err
-		}
-	}
-
-	return entries, lastErr
-}
-
-func (c *store) lookupEntriesByQuery(ctx context.Context, query IndexQuery) ([]IndexEntry, error) {
 	var entries []IndexEntry
-
-	if err := c.storage.QueryPages(ctx, query, func(resp ReadBatch) (shouldContinue bool) {
-		for i := 0; i < resp.Len(); i++ {
+	err := c.storage.QueryPages(ctx, queries, func(query IndexQuery, resp ReadBatch) bool {
+		for resp.Next() {
 			entries = append(entries, IndexEntry{
 				TableName:  query.TableName,
 				HashValue:  query.HashValue,
-				RangeValue: resp.RangeValue(i),
-				Value:      resp.Value(i),
+				RangeValue: resp.RangeValue(),
+				Value:      resp.Value(),
 			})
 		}
 		return true
-	}); err != nil {
+	})
+	if err != nil {
 		level.Error(util.WithContext(ctx, util.Logger)).Log("msg", "error querying storage", "err", err)
 		return nil, err
 	}
 
-	return entries, nil
+	return entries, err
 }
 
 func (c *store) parseIndexEntries(ctx context.Context, entries []IndexEntry, matcher *labels.Matcher) ([]string, error) {
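The goroutine fan-out deleted above presumably moved into the shared helper that both storage clients now call (chunk_util.DoParallelQueries in aws, util.DoParallelQueries in cassandra); that file is among the changed files not shown in this view. A minimal sketch of such a helper, reconstructed from the deleted fan-out code and the two call sites (layout and details are assumptions, not the committed implementation):

package util

import (
	"context"

	"github.com/weaveworks/cortex/pkg/chunk"
)

// DoParallelQueries runs one goroutine per IndexQuery, adapts the
// per-query callback to the batched callback shape, and returns the
// last error seen, mirroring the fan-out removed from chunk_store.go.
func DoParallelQueries(
	ctx context.Context,
	doSingleQuery func(context.Context, chunk.IndexQuery, func(chunk.ReadBatch) bool) error,
	queries []chunk.IndexQuery,
	callback func(chunk.IndexQuery, chunk.ReadBatch) bool,
) error {
	incomingErrors := make(chan error)
	for _, query := range queries {
		go func(query chunk.IndexQuery) {
			incomingErrors <- doSingleQuery(ctx, query, func(r chunk.ReadBatch) bool {
				return callback(query, r)
			})
		}(query)
	}

	var lastErr error
	for i := 0; i < len(queries); i++ {
		if err := <-incomingErrors; err != nil {
			lastErr = err
		}
	}
	return lastErr
}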