Distributing sum queries #1878

Merged: 26 commits, Feb 21, 2020

Changes from 17 commits

Commits
de8f330
querier.sum-shards
owen-d Jan 16, 2020
5fb7c29
addresses pr comments
owen-d Jan 24, 2020
afdd99d
instruments frontend sharding, splitby
owen-d Jan 28, 2020
6bf5f20
LabelsSeriesID unexported again
owen-d Jan 28, 2020
3e28014
removes unnecessary codec interface in astmapping
owen-d Jan 28, 2020
5aa58bc
simplifies VectorSquasher as we never use matrices
owen-d Jan 28, 2020
26e488b
combines queryrange series & value files
owen-d Jan 28, 2020
ba07d5b
removes noops struct embedding strategy in schema, provides noop impl…
owen-d Jan 28, 2020
025f87d
NewSubtreeFolder no longer can return an error as it inlines the json…
owen-d Jan 28, 2020
09ac713
account for QueryIngestersWithin renaming
owen-d Jan 28, 2020
dc629c1
fixes rebase import collision
owen-d Jan 31, 2020
b9a2b67
fixes rebase conflicts
owen-d Feb 3, 2020
5ac6309
-marks absent as non parallelizable
owen-d Feb 3, 2020
ddd47b4
upstream promql compatibility changes
owen-d Feb 5, 2020
07e894c
addresses pr comments
owen-d Feb 10, 2020
4c6f40b
import collisions
owen-d Feb 10, 2020
42851ff
linting - fixes goimports -local requirement
owen-d Feb 10, 2020
a4dcfff
Merge remote-tracking branch 'upstream/master' into feature/query-sha…
owen-d Feb 19, 2020
7347d45
fixes merge conflicts
owen-d Feb 19, 2020
10dd597
addresses pr comments
owen-d Feb 20, 2020
a309f62
stylistic changes
owen-d Feb 20, 2020
bc3f7f5
s/downstream/sharded/
owen-d Feb 20, 2020
1a159f0
s/sum_shards/parallelise_shardable_queries/
owen-d Feb 20, 2020
65718db
query-audit docs
owen-d Feb 20, 2020
7e56489
notes sharded parallelizations are only supported by chunk store
owen-d Feb 20, 2020
e6cb0b1
doc suggestions
owen-d Feb 20, 2020
11 changes: 11 additions & 0 deletions CHANGELOG.md
@@ -44,6 +44,17 @@ Note that the ruler flags need to be changed in this upgrade. You're moving from
Further, if you're using the configs service, we've upgraded the migration library and this requires some manual intervention. See full instructions below to upgrade your PostgreSQL.

* [CHANGE] The frontend component now does not cache results if it finds a `Cache-Control` header and if one of its values is `no-store`. #1974
* [FEATURE] Fan out parallelizable queries to backend queriers concurrently.
* `-querier.sum-shards` (bool)
* Requires a shard-compatible schema (v10+)
* The number of emitted traces increases accordingly, since each query fans out into many sharded subqueries.
* The query-frontend now requires a schema config to determine how/when to shard queries, either from a file or from flags (i.e. by the `config-yaml` CLI flag). This is the same schema config the queriers consume.
* It's also advised to increase downstream concurrency controls:
* `querier.max-outstanding-requests-per-tenant`
* `querier.max-query-parallelism`
* `querier.max-concurrent`
* `server.grpc-max-concurrent-streams` (for both query-frontends and queriers)
* [ENHANCEMENT] metric `cortex_ingester_flush_reasons` gets a new `reason` value: `Spread`, when `-ingester.spread-flushes` option is enabled.
* [CHANGE] Flags changed with transition to upstream Prometheus rules manager:
* `-ruler.client-timeout` is now `ruler.configs.client-timeout` in order to match `ruler.configs.url`.
* `-ruler.group-timeout` has been removed.
22 changes: 22 additions & 0 deletions docs/configuration/arguments.md
@@ -68,6 +68,28 @@ The ingester query API was improved over time, but defaults to the old behaviour

## Query Frontend

- `-querier.sum-shards`

If set to true, will cause the query frontend to mutate incoming queries when possible by turning `sum` operations into sharded `sum` operations. This requires a shard-compatible schema (v10+). An abridged example:
`sum by (foo) (rate(bar{baz="blip"}[1m]))` ->
```
sum by (foo) (
  sum by (foo) (rate(bar{baz="blip",__cortex_shard__="0of16"}[1m])) or
  sum by (foo) (rate(bar{baz="blip",__cortex_shard__="1of16"}[1m])) or
  ...
  sum by (foo) (rate(bar{baz="blip",__cortex_shard__="15of16"}[1m]))
)
```
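  As a rough sketch of that rewrite, the snippet below generates the expanded form shown above by string templating. It is purely illustrative: the real implementation rewrites the parsed PromQL AST via the `astmapper` package, not raw query strings.

```go
package main

import (
	"fmt"
	"strings"
)

// shardSum renders the sharded form of `sum by (grouping) (rate(metric{matchers}[1m]))`
// as a string. Each shard gets its own __cortex_shard__ matcher, and the
// legs are recombined with `or`.
func shardSum(metric, matchers, grouping string, shards int) string {
	legs := make([]string, 0, shards)
	for i := 0; i < shards; i++ {
		legs = append(legs, fmt.Sprintf(
			`sum by (%s) (rate(%s{%s,__cortex_shard__="%dof%d"}[1m]))`,
			grouping, metric, matchers, i, shards,
		))
	}
	return fmt.Sprintf("sum by (%s) (\n  %s\n)", grouping, strings.Join(legs, " or\n  "))
}

func main() {
	// Reproduces the 16-shard example from the docs above.
	fmt.Println(shardSum("bar", `baz="blip"`, "foo", 16))
}
```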
When enabled, the query-frontend requires a schema config to determine how/when to shard queries, either from a file or from flags (i.e. by the `config-yaml` CLI flag). This is the same schema config the queriers consume.
It's advised to increase downstream concurrency controls as well, to account for more queries of smaller sizes:

- `querier.max-outstanding-requests-per-tenant`
- `querier.max-query-parallelism`
- `querier.max-concurrent`
Furthermore, both querier and query-frontend components require the `querier.query-ingesters-within` parameter to know when to start sharding requests (ingester queries are not sharded). It's recommended to align this with `ingester.max-chunk-age`.
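A minimal sketch of that boundary, assuming the cutoff is simply `now - querier.query-ingesters-within` (the function and variable names here are illustrative, not the actual implementation):

```go
package main

import (
	"fmt"
	"time"
)

// shardableBefore returns the cutoff before which a query range hits only
// the chunk store and can therefore be shard-parallelised; data newer than
// the cutoff may still live in ingesters, which do not support sharding.
func shardableBefore(now time.Time, queryIngestersWithin time.Duration) time.Time {
	return now.Add(-queryIngestersWithin)
}

func main() {
	cutoff := shardableBefore(time.Now(), 12*time.Hour)
	fmt.Println("shard only the portion of the query range before", cutoff)
}
```

Aligning `querier.query-ingesters-within` with `ingester.max-chunk-age` keeps the unsharded (ingester) window no larger than it has to be.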

Instrumentation (traces) also scales with the number of sharded queries, and it's suggested to account for increased throughput there as well.

- `-querier.align-querier-with-step`

If set to true, will cause the query frontend to mutate incoming queries and align their start and end parameters to the step parameter of the query. This improves the cacheability of the query results.
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -654,6 +654,10 @@ results_cache:
# error is returned.
# CLI flag: -querier.max-retries-per-request
[max_retries: <int> | default = 5]

# Parse the ast and parallelize sums by shard.
# CLI flag: -querier.sum-shards
[sum_shards: <boolean> | default = false]
```

## `ruler_config`
1 change: 1 addition & 0 deletions go.mod
@@ -72,6 +72,7 @@ require (
google.golang.org/api v0.14.0
google.golang.org/grpc v1.25.1
gopkg.in/yaml.v2 v2.2.5
sigs.k8s.io/yaml v1.1.0
)

replace github.com/Azure/azure-sdk-for-go => github.com/Azure/azure-sdk-for-go v36.2.0+incompatible
3 changes: 1 addition & 2 deletions pkg/chunk/chunk.go
@@ -14,11 +14,10 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
errs "github.com/weaveworks/common/errors"

prom_chunk "github.com/cortexproject/cortex/pkg/chunk/encoding"
"github.com/cortexproject/cortex/pkg/prom1/storage/metric"

errs "github.com/weaveworks/common/errors"
)

// Errors that decode can return
3 changes: 3 additions & 0 deletions pkg/chunk/chunk_store.go
@@ -430,6 +430,9 @@ func (c *store) lookupChunksByMetricName(ctx context.Context, userID string, fro
}

func (c *store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery) ([]IndexEntry, error) {
log, ctx := spanlogger.New(ctx, "store.lookupEntriesByQueries")
defer log.Span.Finish()

var lock sync.Mutex
var entries []IndexEntry
err := c.index.QueryPages(ctx, queries, func(query IndexQuery, resp ReadBatch) bool {
2 changes: 2 additions & 0 deletions pkg/chunk/chunk_store_test.go
@@ -78,6 +78,8 @@ func newTestChunkStoreConfig(t require.TestingT, schemaName string, storeCfg Sto
tbmConfig TableManagerConfig
schemaCfg = DefaultSchemaConfig("", schemaName, 0)
)
err := schemaCfg.Validate()
require.NoError(t, err)
flagext.DefaultValues(&tbmConfig)
storage := NewMockStorage()
tableManager, err := NewTableManager(tbmConfig, schemaCfg, maxChunkAge, storage, nil)
17 changes: 14 additions & 3 deletions pkg/chunk/chunk_store_utils.go
@@ -10,6 +10,7 @@ import (
"github.com/prometheus/prometheus/promql"

"github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/cortexproject/cortex/pkg/querier/astmapper"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
)
@@ -146,13 +147,13 @@ func (c *Fetcher) worker() {
// FetchChunks fetches a set of chunks from cache and store. Note that the keys passed in must be
// lexicographically sorted, while the returned chunks are not in the same order as the passed in chunks.
func (c *Fetcher) FetchChunks(ctx context.Context, chunks []Chunk, keys []string) ([]Chunk, error) {
log, ctx := spanlogger.New(ctx, "ChunkStore.fetchChunks")
log, ctx := spanlogger.New(ctx, "ChunkStore.FetchChunks")
defer log.Span.Finish()

// Now fetch the actual chunk data from Memcache / S3
cacheHits, cacheBufs, _ := c.cache.Fetch(ctx, keys)

fromCache, missing, err := c.processCacheResponse(chunks, cacheHits, cacheBufs)
fromCache, missing, err := c.processCacheResponse(ctx, chunks, cacheHits, cacheBufs)
if err != nil {
level.Warn(log).Log("msg", "error fetching from cache", "err", err)
}
@@ -199,12 +200,14 @@ func (c *Fetcher) writeBackCache(ctx context.Context, chunks []Chunk) error {

// processCacheResponse decodes the chunks coming back from the cache, separating
// hits and misses.
func (c *Fetcher) processCacheResponse(chunks []Chunk, keys []string, bufs [][]byte) ([]Chunk, []Chunk, error) {
func (c *Fetcher) processCacheResponse(ctx context.Context, chunks []Chunk, keys []string, bufs [][]byte) ([]Chunk, []Chunk, error) {
var (
requests = make([]decodeRequest, 0, len(keys))
responses = make(chan decodeResponse)
missing []Chunk
)
log, _ := spanlogger.New(ctx, "Fetcher.processCacheResponse")
defer log.Span.Finish()

i, j := 0, 0
for i < len(chunks) && j < len(keys) {
@@ -229,6 +232,7 @@ func (c *Fetcher) processCacheResponse(chunks []Chunk, keys []string, bufs [][]b
for ; i < len(chunks); i++ {
missing = append(missing, chunks[i])
}
level.Debug(log).Log("chunks", len(chunks), "decodeRequests", len(requests), "missing", len(missing))

go func() {
for _, request := range requests {
@@ -252,3 +256,10 @@ func (c *Fetcher) processCacheResponse(chunks []Chunk, keys []string, bufs [][]b
}
return found, missing, err
}

func injectShardLabels(chunks []Chunk, shard astmapper.ShardAnnotation) {
for i, chunk := range chunks {
chunk.Metric = append(chunk.Metric, shard.Label())
chunks[i] = chunk
}
}
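For context, a hedged sketch of what the shard label above expands to. The `shardAnnotation` type is a simplified stand-in for `astmapper.ShardAnnotation`, whose exact fields aren't shown in this diff; the `__cortex_shard__` name and `NofM` value format come from the docs example earlier in this PR.

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/pkg/labels"
)

// shardAnnotation is a simplified stand-in for astmapper.ShardAnnotation.
type shardAnnotation struct{ Shard, Of int }

// Label renders the shard as the __cortex_shard__ label used in the
// sharded queries, e.g. __cortex_shard__="3of16".
func (s shardAnnotation) Label() labels.Label {
	return labels.Label{Name: "__cortex_shard__", Value: fmt.Sprintf("%dof%d", s.Shard, s.Of)}
}

func main() {
	// Mirrors injectShardLabels: append the shard label to a chunk's metric.
	metric := labels.FromStrings("__name__", "bar", "baz", "blip")
	metric = append(metric, shardAnnotation{Shard: 3, Of: 16}.Label())
	fmt.Println(metric)
}
```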
62 changes: 60 additions & 2 deletions pkg/chunk/schema.go
@@ -7,9 +7,15 @@ import (
"fmt"
"strings"

"strconv"

"github.com/go-kit/kit/log/level"
jsoniter "github.com/json-iterator/go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"

"github.com/cortexproject/cortex/pkg/querier/astmapper"
"github.com/cortexproject/cortex/pkg/util"
)

const (
@@ -48,6 +54,7 @@ type Schema interface {
GetReadQueriesForMetric(from, through model.Time, userID string, metricName string) ([]IndexQuery, error)
GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName string, labelName string) ([]IndexQuery, error)
GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName string, labelName string, labelValue string) ([]IndexQuery, error)
FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery

// If the query resulted in series IDs, use this method to find chunks.
GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error)
@@ -218,6 +225,10 @@ func (s schema) GetLabelNamesForSeries(from, through model.Time, userID string,
return result, nil
}

func (s schema) FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery {
return s.entries.FilterReadQueries(queries, shard)
}

type entries interface {
GetWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error)
GetLabelWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error)
@@ -228,6 +239,7 @@ type entries interface {
GetReadMetricLabelValueQueries(bucket Bucket, metricName string, labelName string, labelValue string) ([]IndexQuery, error)
GetChunksForSeries(bucket Bucket, seriesID []byte) ([]IndexQuery, error)
GetLabelNamesForSeries(bucket Bucket, seriesID []byte) ([]IndexQuery, error)
FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery
}

// original entries:
@@ -303,6 +315,10 @@ func (originalEntries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery,
return nil, ErrNotSupported
}

func (originalEntries) FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery {
return queries
}

// v3Schema went to base64 encoded label values & a version ID
// - range key: <label name>\0<base64(label value)>\0<chunk name>\0<version 1>

@@ -422,6 +438,10 @@ func (labelNameInHashKeyEntries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]I
return nil, ErrNotSupported
}

func (labelNameInHashKeyEntries) FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery {
return queries
}

// v5 schema is an extension of v4, with the chunk end time in the
// range key to improve query latency. However, it did it wrong
// so the chunk end times are ignored.
@@ -496,6 +516,10 @@ func (v5Entries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery, error
return nil, ErrNotSupported
}

func (v5Entries) FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery {
return queries
}

// v6Entries fixes issues with v5 time encoding being wrong (see #337), and
// moves label value out of range key (see #199).
type v6Entries struct{}
@@ -576,10 +600,13 @@ func (v6Entries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery, error
return nil, ErrNotSupported
}

// v9Entries adds a layer of indirection between labels -> series -> chunks.
type v9Entries struct {
func (v6Entries) FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery {
return queries
}

// v9Entries adds a layer of indirection between labels -> series -> chunks.
type v9Entries struct{}

func (v9Entries) GetWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) {
return nil, ErrNotSupported
}
@@ -675,6 +702,10 @@ func (v9Entries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery, error
return nil, ErrNotSupported
}

func (v9Entries) FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery {
return queries
}

// v10Entries builds on v9 by sharding index rows to reduce their size.
type v10Entries struct {
rowShards uint32
@@ -784,6 +815,33 @@ func (v10Entries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery, erro
return nil, ErrNotSupported
}

// FilterReadQueries will return only queries that match a certain shard
func (v10Entries) FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) (matches []IndexQuery) {
if shard == nil {
return queries
}

for _, query := range queries {
s := strings.Split(query.HashValue, ":")[0]
n, err := strconv.Atoi(s)
if err != nil {
level.Error(util.Logger).Log(
"msg",
"Unable to determine shard from IndexQuery",
"HashValue",
query.HashValue,
"schema",
"v10",
)
}

if err == nil && n == shard.Shard {
matches = append(matches, query)
}
}
return matches
}

// v11Entries builds on v10 but adds index entries for each series to store respective labels.
type v11Entries struct {
v10Entries
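To make the v10 `FilterReadQueries` rule above concrete, here is a standalone sketch. The hash values are made-up examples; they only assume that v10 prefixes its index hash keys with the row shard number and a colon, which is what the `strings.Split`/`strconv.Atoi` logic in the diff relies on.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// matchesShard reports whether an index query's hash value belongs to the
// given shard, assuming the v10 "N:..." prefix convention.
func matchesShard(hashValue string, shard int) bool {
	n, err := strconv.Atoi(strings.Split(hashValue, ":")[0])
	return err == nil && n == shard
}

func main() {
	// Only the "1:..." hash value survives filtering for shard 1.
	for _, hv := range []string{"0:d18500:foo", "1:d18500:foo", "15:d18500:foo"} {
		fmt.Println(hv, "-> shard 1:", matchesShard(hv, 1))
	}
}
```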
20 changes: 18 additions & 2 deletions pkg/chunk/schema_config.go
@@ -197,10 +197,18 @@ func (cfg *SchemaConfig) Validate() error {
return err
}
}

return nil
}

func defaultRowShards(schema string) uint32 {
switch schema {
case "v1", "v2", "v3", "v4", "v5", "v6", "v9":
return 0
default:
return 16
}
}

// ForEachAfter will call f() on every entry after t, splitting
// entries if necessary so there is an entry starting at t
func (cfg *SchemaConfig) ForEachAfter(t model.Time, f func(config *PeriodConfig)) {
@@ -219,7 +227,7 @@ func (cfg *SchemaConfig) ForEachAfter(t model.Time, f func(config *PeriodConfig)

// CreateSchema returns the schema defined by the PeriodConfig
func (cfg PeriodConfig) CreateSchema() Schema {
rowShards := uint32(16)
rowShards := defaultRowShards(cfg.Schema)
if cfg.RowShards > 0 {
rowShards = cfg.RowShards
}
@@ -309,6 +317,14 @@ func (cfg *SchemaConfig) Load() error {
return err
}

for i, periodCfg := range cfg.Configs {
// apply default row shards
if periodCfg.RowShards == 0 {
periodCfg.RowShards = defaultRowShards(periodCfg.Schema)
cfg.Configs[i] = periodCfg
}
}

return cfg.Validate()
}
