Distributing sum queries #1878

Merged 26 commits on Feb 21, 2020

Commits
de8f330
querier.sum-shards
owen-d Jan 16, 2020
5fb7c29
addresses pr comments
owen-d Jan 24, 2020
afdd99d
instruments frontend sharding, splitby
owen-d Jan 28, 2020
6bf5f20
LabelsSeriesID unexported again
owen-d Jan 28, 2020
3e28014
removes unnecessary codec interface in astmapping
owen-d Jan 28, 2020
5aa58bc
simplifies VectorSquasher as we never use matrices
owen-d Jan 28, 2020
26e488b
combines queryrange series & value files
owen-d Jan 28, 2020
ba07d5b
removes noops struct embedding strategy in schema, provides noop impl…
owen-d Jan 28, 2020
025f87d
NewSubtreeFolder no longer can return an error as it inlines the json…
owen-d Jan 28, 2020
09ac713
account for QueryIngestersWithin renaming
owen-d Jan 28, 2020
dc629c1
fixes rebase import collision
owen-d Jan 31, 2020
b9a2b67
fixes rebase conflicts
owen-d Feb 3, 2020
5ac6309
-marks absent as non parallelizable
owen-d Feb 3, 2020
ddd47b4
upstream promql compatibility changes
owen-d Feb 5, 2020
07e894c
addresses pr comments
owen-d Feb 10, 2020
4c6f40b
import collisions
owen-d Feb 10, 2020
42851ff
linting - fixes goimports -local requirement
owen-d Feb 10, 2020
a4dcfff
Merge remote-tracking branch 'upstream/master' into feature/query-sha…
owen-d Feb 19, 2020
7347d45
fixes merge conflicts
owen-d Feb 19, 2020
10dd597
addresses pr comments
owen-d Feb 20, 2020
a309f62
stylistic changes
owen-d Feb 20, 2020
bc3f7f5
s/downstream/sharded/
owen-d Feb 20, 2020
1a159f0
s/sum_shards/parallelise_shardable_queries/
owen-d Feb 20, 2020
65718db
query-audit docs
owen-d Feb 20, 2020
7e56489
notes sharded parallelizations are only supported by chunk store
owen-d Feb 20, 2020
e6cb0b1
doc suggestions
owen-d Feb 20, 2020
10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,16 @@

## master / unreleased

* [FEATURE] Fan out parallelizable queries to backend queriers concurrently. #1878
* `querier.parallelise-shardable-queries` (bool)
* Requires a shard-compatible schema (v10+)
* This causes the number of traces to increase accordingly.
* The query-frontend now requires a schema config to determine how and when to shard queries, either from a file or from flags (i.e. by the `config-yaml` CLI flag). This is the same schema config the queriers consume. The schema config is only required when this option is enabled.
* It's also advised to increase downstream concurrency controls:
* `querier.max-outstanding-requests-per-tenant`
* `querier.max-query-parallelism`
* `querier.max-concurrent`
* `server.grpc-max-concurrent-streams` (for both query-frontends and queriers)
* [CHANGE] The frontend http server will now send 502 in case of deadline exceeded and 499 if the user requested cancellation. #2156
* [CHANGE] Config file changed to remove top level `config_store` field in favor of a nested `configdb` field. #2125
* [CHANGE] Removed unnecessary `frontend.cache-split-interval` in favor of `querier.split-queries-by-interval` both to reduce configuration complexity and guarantee alignment of these two configs. Starting from now, `-querier.cache-results` may only be enabled in conjunction with `-querier.split-queries-by-interval` (previously the cache interval default was `24h` so if you want to preserve the same behaviour you should set `-querier.split-queries-by-interval=24h`). #2040
24 changes: 24 additions & 0 deletions docs/configuration/arguments.md
@@ -68,6 +68,30 @@ The ingester query API was improved over time, but defaults to the old behaviour

## Query Frontend

- `-querier.parallelise-shardable-queries`

If set to true, will cause the query frontend to mutate incoming queries when possible by turning `sum` operations into sharded `sum` operations. This requires a shard-compatible schema (v10+). An abridged example:
`sum by (foo) (rate(bar{baz="blip"}[1m]))` ->
```
sum by (foo) (
  sum by (foo) (rate(bar{baz="blip",__cortex_shard__="0of16"}[1m])) or
  sum by (foo) (rate(bar{baz="blip",__cortex_shard__="1of16"}[1m])) or
  ...
  sum by (foo) (rate(bar{baz="blip",__cortex_shard__="15of16"}[1m]))
)
```
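
For illustration only, here is a minimal Go sketch of how such a rewrite could be produced for a configurable shard count -- this is not the Cortex astmapper implementation, and the `shardSum` helper is hypothetical:

```go
package main

import (
	"fmt"
	"strings"
)

// shardSum illustrates the rewrite above: each leg gets a synthetic
// __cortex_shard__ matcher spliced into the selector, and the legs are
// combined with `or` under an outer sum that keeps the same grouping labels.
func shardSum(inner, grouping string, shards int) string {
	legs := make([]string, 0, shards)
	for i := 0; i < shards; i++ {
		matcher := fmt.Sprintf(`__cortex_shard__="%dof%d"`, i, shards)
		// Naively splice the shard matcher into the first matcher set of the inner expression.
		leg := strings.Replace(inner, "}", ","+matcher+"}", 1)
		legs = append(legs, fmt.Sprintf("sum by (%s) (%s)", grouping, leg))
	}
	return fmt.Sprintf("sum by (%s) (\n  %s\n)", grouping, strings.Join(legs, " or\n  "))
}

func main() {
	fmt.Println(shardSum(`rate(bar{baz="blip"}[1m])`, "foo", 16))
}
```

In Cortex itself the mapping is performed on the parsed PromQL AST rather than on strings (see the astmapping-related commits in this PR), which is also how operations such as `absent` are marked non-parallelizable.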
When enabled, the query-frontend requires a schema config to determine how and when to shard queries, either from a file or from flags (i.e. by the `config-yaml` CLI flag). This is the same schema config the queriers consume.
It's also advised to increase downstream concurrency controls to account for a larger number of smaller queries:

- `querier.max-outstanding-requests-per-tenant`
- `querier.max-query-parallelism`
- `querier.max-concurrent`
- `server.grpc-max-concurrent-streams` (for both query-frontends and queriers)

Furthermore, both querier and query-frontend components require the `querier.query-ingesters-within` parameter to know when to start sharding requests (ingester queries are not sharded). It's recommended to align this with `ingester.max-chunk-age`.

Instrumentation (traces) also scales with the number of sharded queries, so it's suggested to account for the increased throughput there as well (for instance via `JAEGER_REPORTER_MAX_QUEUE_SIZE`).

- `-querier.align-querier-with-step`

If set to true, will cause the query frontend to mutate incoming queries and align their start and end parameters to the step parameter of the query. This improves the cacheability of the query results.
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -641,6 +641,11 @@ results_cache:
# error is returned.
# CLI flag: -querier.max-retries-per-request
[max_retries: <int> | default = 5]

# Perform query parallelisations based on storage sharding configuration and
# query ASTs. This feature is supported only by the chunks storage engine.
# CLI flag: -querier.parallelise-shardable-queries
[parallelise_shardable_queries: <boolean> | default = false]
```

## `ruler_config`
140 changes: 140 additions & 0 deletions docs/operations/query-auditor.md
@@ -0,0 +1,140 @@
---
title: "Query Auditor (tool)"
linkTitle: "Query Auditor (tool)"
weight: 2
slug: query-auditor
---

The query auditor is a tool bundled in the Cortex repository, but **not** included in Docker images -- it must be built from source. It's primarily useful for those _developing_ Cortex, but can also be helpful to operators in certain scenarios (backend migrations come to mind).

## How it works

The `query-audit` tool performs a set of queries against two backends that expose the Prometheus read API, generally the `query-frontend` component of two Cortex deployments. It then compares the responses to determine the average difference for each query. It does this by:

- Ensuring the resulting label sets match.
- For each label set, ensuring it contains the same number of samples as its pair from the other backend.
- For each sample, calculating its difference against its pair from the other backend/label set.
- Calculating the average diff per query from the above diffs (a simplified sketch follows).
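
A minimal sketch of that per-query comparison, assuming the control and test series have already been matched by label set and aligned by timestamp -- this is not the actual `query-audit` source, and `avgDiff` is a hypothetical helper:

```go
package main

import (
	"fmt"
	"math"
)

// avgDiff compares paired sample values from the control and test backends and
// returns the average relative difference as a percentage. Both slices are
// assumed to already be matched by label set and aligned by timestamp.
func avgDiff(control, test []float64) (float64, error) {
	if len(control) != len(test) {
		return 0, fmt.Errorf("sample count mismatch: %d vs %d", len(control), len(test))
	}
	if len(control) == 0 {
		return 0, nil
	}
	var total float64
	for i := range control {
		switch {
		case control[i] == 0 && test[i] == 0:
			// Identical zeros contribute no difference.
		case control[i] == 0:
			// Treat any nonzero deviation from zero as a 100% difference.
			total += 1
		default:
			total += math.Abs(control[i]-test[i]) / math.Abs(control[i])
		}
	}
	return total / float64(len(control)) * 100, nil
}

func main() {
	diff, err := avgDiff([]float64{1, 2, 4}, []float64{1, 2, 4})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%f%% avg diff\n", diff) // prints: 0.000000% avg diff
}
```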

### Limitations

It currently only supports queries with `Matrix` response types.

### Use cases

- Correctness testing when working on the read path.
- Comparing results from different backends.

### Example Configuration

```yaml
control:
host: http://localhost:8080/api/prom
headers:
"X-Scope-OrgID": 1234

test:
host: http://localhost:8081/api/prom
headers:
"X-Scope-OrgID": 1234

queries:
- query: 'sum(rate(container_cpu_usage_seconds_total[5m]))'
start: 2019-11-25T00:00:00Z
end: 2019-11-28T00:00:00Z
step_size: 15m
- query: 'sum(rate(container_cpu_usage_seconds_total[5m])) by (container_name)'
start: 2019-11-25T00:00:00Z
end: 2019-11-28T00:00:00Z
step_size: 15m
- query: 'sum(rate(container_cpu_usage_seconds_total[5m])) without (container_name)'
start: 2019-11-25T00:00:00Z
end: 2019-11-26T00:00:00Z
step_size: 15m
- query: 'histogram_quantile(0.9, sum(rate(cortex_cache_value_size_bytes_bucket[5m])) by (le, job))'
start: 2019-11-25T00:00:00Z
end: 2019-11-25T06:00:00Z
step_size: 15m
# two shardable legs
- query: 'sum without (instance, job) (rate(cortex_query_frontend_queue_length[5m])) or sum by (job) (rate(cortex_query_frontend_queue_length[5m]))'
start: 2019-11-25T00:00:00Z
end: 2019-11-25T06:00:00Z
step_size: 15m
# one shardable leg
- query: 'sum without (instance, job) (rate(cortex_cache_request_duration_seconds_count[5m])) or rate(cortex_cache_request_duration_seconds_count[5m])'
start: 2019-11-25T00:00:00Z
end: 2019-11-25T06:00:00Z
step_size: 15m
```

### Example Output

Under ideal circumstances, you'll see output like the following:

```
$ go run ./tools/query-audit/ -f config.yaml

0.000000% avg diff for:
query: sum(rate(container_cpu_usage_seconds_total[5m]))
series: 1
samples: 289
start: 2019-11-25 00:00:00 +0000 UTC
end: 2019-11-28 00:00:00 +0000 UTC
step: 15m0s

0.000000% avg diff for:
query: sum(rate(container_cpu_usage_seconds_total[5m])) by (container_name)
series: 95
samples: 25877
start: 2019-11-25 00:00:00 +0000 UTC
end: 2019-11-28 00:00:00 +0000 UTC
step: 15m0s

0.000000% avg diff for:
query: sum(rate(container_cpu_usage_seconds_total[5m])) without (container_name)
series: 4308
samples: 374989
start: 2019-11-25 00:00:00 +0000 UTC
end: 2019-11-26 00:00:00 +0000 UTC
step: 15m0s

0.000000% avg diff for:
query: histogram_quantile(0.9, sum(rate(cortex_cache_value_size_bytes_bucket[5m])) by (le, job))
series: 13
samples: 325
start: 2019-11-25 00:00:00 +0000 UTC
end: 2019-11-25 06:00:00 +0000 UTC
step: 15m0s

0.000000% avg diff for:
query: sum without (instance, job) (rate(cortex_query_frontend_queue_length[5m])) or sum by (job) (rate(cortex_query_frontend_queue_length[5m]))
series: 21
samples: 525
start: 2019-11-25 00:00:00 +0000 UTC
end: 2019-11-25 06:00:00 +0000 UTC
step: 15m0s

0.000000% avg diff for:
query: sum without (instance, job) (rate(cortex_cache_request_duration_seconds_count[5m])) or rate(cortex_cache_request_duration_seconds_count[5m])
series: 942
samples: 23550
start: 2019-11-25 00:00:00 +0000 UTC
end: 2019-11-25 06:00:00 +0000 UTC
step: 15m0s

0.000000% avg diff for:
query: sum by (namespace) (predict_linear(container_cpu_usage_seconds_total[5m], 10))
series: 16
samples: 400
start: 2019-11-25 00:00:00 +0000 UTC
end: 2019-11-25 06:00:00 +0000 UTC
step: 15m0s

0.000000% avg diff for:
query: sum by (namespace) (avg_over_time((rate(container_cpu_usage_seconds_total[5m]))[10m:]) > 1)
series: 4
samples: 52
start: 2019-11-25 00:00:00 +0000 UTC
end: 2019-11-25 01:00:00 +0000 UTC
step: 5m0s
```
1 change: 1 addition & 0 deletions go.mod
@@ -72,6 +72,7 @@ require (
google.golang.org/api v0.14.0
google.golang.org/grpc v1.25.1
gopkg.in/yaml.v2 v2.2.5
sigs.k8s.io/yaml v1.1.0
)

replace github.com/Azure/azure-sdk-for-go => github.com/Azure/azure-sdk-for-go v36.2.0+incompatible
3 changes: 3 additions & 0 deletions pkg/chunk/chunk_store.go
@@ -429,6 +429,9 @@ func (c *store) lookupChunksByMetricName(ctx context.Context, userID string, fro
}

func (c *store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery) ([]IndexEntry, error) {
log, ctx := spanlogger.New(ctx, "store.lookupEntriesByQueries")
defer log.Span.Finish()

var lock sync.Mutex
var entries []IndexEntry
err := c.index.QueryPages(ctx, queries, func(query IndexQuery, resp ReadBatch) bool {
2 changes: 2 additions & 0 deletions pkg/chunk/chunk_store_test.go
@@ -77,6 +77,8 @@ func newTestChunkStoreConfig(t require.TestingT, schemaName string, storeCfg Sto
tbmConfig TableManagerConfig
schemaCfg = DefaultSchemaConfig("", schemaName, 0)
)
err := schemaCfg.Validate()
require.NoError(t, err)
flagext.DefaultValues(&tbmConfig)
storage := NewMockStorage()
tableManager, err := NewTableManager(tbmConfig, schemaCfg, maxChunkAge, storage, nil)
9 changes: 6 additions & 3 deletions pkg/chunk/chunk_store_utils.go
@@ -146,13 +146,13 @@ func (c *Fetcher) worker() {
// FetchChunks fetches a set of chunks from cache and store. Note that the keys passed in must be
// lexicographically sorted, while the returned chunks are not in the same order as the passed in chunks.
func (c *Fetcher) FetchChunks(ctx context.Context, chunks []Chunk, keys []string) ([]Chunk, error) {
log, ctx := spanlogger.New(ctx, "ChunkStore.fetchChunks")
log, ctx := spanlogger.New(ctx, "ChunkStore.FetchChunks")
defer log.Span.Finish()

// Now fetch the actual chunk data from Memcache / S3
cacheHits, cacheBufs, _ := c.cache.Fetch(ctx, keys)

fromCache, missing, err := c.processCacheResponse(chunks, cacheHits, cacheBufs)
fromCache, missing, err := c.processCacheResponse(ctx, chunks, cacheHits, cacheBufs)
if err != nil {
level.Warn(log).Log("msg", "error fetching from cache", "err", err)
}
@@ -199,12 +199,14 @@ func (c *Fetcher) writeBackCache(ctx context.Context, chunks []Chunk) error {

// ProcessCacheResponse decodes the chunks coming back from the cache, separating
// hits and misses.
func (c *Fetcher) processCacheResponse(chunks []Chunk, keys []string, bufs [][]byte) ([]Chunk, []Chunk, error) {
func (c *Fetcher) processCacheResponse(ctx context.Context, chunks []Chunk, keys []string, bufs [][]byte) ([]Chunk, []Chunk, error) {
var (
requests = make([]decodeRequest, 0, len(keys))
responses = make(chan decodeResponse)
missing []Chunk
)
log, _ := spanlogger.New(ctx, "Fetcher.processCacheResponse")
defer log.Span.Finish()

i, j := 0, 0
for i < len(chunks) && j < len(keys) {
@@ -229,6 +231,7 @@ for ; i < len(chunks); i++ {
for ; i < len(chunks); i++ {
missing = append(missing, chunks[i])
}
level.Debug(log).Log("chunks", len(chunks), "decodeRequests", len(requests), "missing", len(missing))

go func() {
for _, request := range requests {