diff --git a/Gopkg.lock b/Gopkg.lock index c66d4166ac55..81a83e310037 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -171,8 +171,8 @@ revision = "3a0bb77429bd3a61596f5e8a3172445844342120" [[projects]] - branch = "optionally-enforce-metric-name" - digest = "1:2cced64caeee7972668c48e0a80e98f09489bd3a7a61c470fd0ca36cb5e24344" + branch = "master" + digest = "1:5750dfa5a8160b51a01c02d6d7841b50fe7d2d97344c200fc4cdc3c46b3953f8" name = "github.com/cortexproject/cortex" packages = [ "pkg/chunk", @@ -199,8 +199,7 @@ "pkg/util/wire", ] pruneopts = "UT" - revision = "16a08e037bce5343c5692aa1015e76b172ddf917" - source = "https://github.com/grafana/cortex.git" + revision = "ff51bd3c7267184042ea4cf347e6d1fa24934c91" [[projects]] digest = "1:ffe9824d294da03b391f44e1ae8281281b4afc1bdaa9588c9097785e3af10cec" @@ -836,16 +835,16 @@ version = "v2.15.0" [[projects]] - digest = "1:0f09db8429e19d57c8346ad76fbbc679341fa86073d3b8fb5ac919f0357d8f4c" + digest = "1:0da2810678a062e0567c3215911869b0423da0e497c56683ff8e87e7a6952597" name = "github.com/uber/jaeger-lib" packages = ["metrics"] pruneopts = "UT" - revision = "ed3a127ec5fef7ae9ea95b01b542c47fbd999ce5" - version = "v1.5.0" + revision = "5519f3beabf28707fca8d3f47f9cff87dad48d96" + version = "v1.3.0" [[projects]] - branch = "dev" - digest = "1:f13b5f09ac1d4ea4cecd6075fa7d0c46c419775eb585f42812f2b19f3de33d92" + branch = "master" + digest = "1:bab227df7effab81fbfdbf0c4d14ce78345a8d169c24c62c74c6053c3b22a252" name = "github.com/weaveworks/common" packages = [ "aws", @@ -861,8 +860,7 @@ "user", ] pruneopts = "UT" - revision = "6ebd07f752e2a8d51d3ac919014f972350d25b39" - source = "github.com/tomwilkie/weaveworks-common" + revision = "81a1a4d158e60de72dbead600ec011fb90344f8c" [[projects]] digest = "1:bb40f7ff970145324f2a2acafdff3a23ed3f05db49cb5eb519b3d6bee86a5887" diff --git a/Gopkg.toml b/Gopkg.toml index 0a5128f2a2f7..113f6920a851 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -26,13 +26,11 @@ [[constraint]] name = "github.com/cortexproject/cortex" - branch = "optionally-enforce-metric-name" - source = "https://github.com/grafana/cortex.git" + branch = "master" [[constraint]] name = "github.com/weaveworks/common" - source = "github.com/tomwilkie/weaveworks-common" - branch = "dev" + branch = "master" [[constraint]] name = "gopkg.in/fsnotify.v1" diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_storage_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_storage_client.go index 77f32106bfcf..a1dff2849329 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_storage_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_storage_client.go @@ -454,7 +454,7 @@ func (a dynamoDBRequestAdapter) HasNextPage() bool { } func (a dynamoDBRequestAdapter) Retryable() bool { - return *a.request.Retryable + return aws.BoolValue(a.request.Retryable) } type chunksPlusError struct { @@ -523,7 +523,10 @@ func (a dynamoDBStorageClient) getDynamoDBChunks(ctx context.Context, chunks []c for _, chunk := range chunks { key := chunk.ExternalKey() chunksByKey[key] = chunk - tableName := a.schemaCfg.ChunkTableFor(chunk.From) + tableName, err := a.schemaCfg.ChunkTableFor(chunk.From) + if err != nil { + return nil, err + } outstanding.Add(tableName, key, placeholder) } @@ -646,7 +649,11 @@ func (a dynamoDBStorageClient) PutChunks(ctx context.Context, chunks []chunk.Chu } key := chunks[i].ExternalKey() - table := a.schemaCfg.ChunkTableFor(chunks[i].From) + table, err := a.schemaCfg.ChunkTableFor(chunks[i].From) + if err != nil { + return err + } + dynamoDBWrites.Add(table, key, placeholder, buf) } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/fixtures.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/fixtures.go index e35027adc917..d05cd738efae 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/fixtures.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/fixtures.go @@ -7,7 +7,6 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/testutils" "github.com/cortexproject/cortex/pkg/util" - "github.com/prometheus/common/model" ) type fixture struct { @@ -32,7 +31,7 @@ var Fixtures = []testutils.Fixture{ fixture{ name: "S3 chunks", clients: func() (chunk.IndexClient, chunk.ObjectClient, chunk.TableClient, chunk.SchemaConfig, error) { - schemaConfig := chunk.SchemaConfig{} // Defaults == S3 + schemaConfig := testutils.DefaultSchemaConfig("s3") dynamoDB := newMockDynamoDB(0, 0) table := &dynamoTableClient{ DynamoDB: dynamoDB, @@ -61,16 +60,7 @@ func dynamoDBFixture(provisionedErr, gangsize, maxParallelism int) testutils.Fix provisionedErr, gangsize, maxParallelism), clients: func() (chunk.IndexClient, chunk.ObjectClient, chunk.TableClient, chunk.SchemaConfig, error) { dynamoDB := newMockDynamoDB(0, provisionedErr) - schemaCfg := chunk.SchemaConfig{ - Configs: []chunk.PeriodConfig{{ - IndexType: "aws", - From: model.Now(), - ChunkTables: chunk.PeriodicTableConfig{ - Prefix: "chunks", - Period: 10 * time.Minute, - }, - }}, - } + schemaCfg := testutils.DefaultSchemaConfig("aws") table := &dynamoTableClient{ DynamoDB: dynamoDB, } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/cassandra/fixtures.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/cassandra/fixtures.go index 20ab38ca9b40..ef13263e6c9c 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/cassandra/fixtures.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/cassandra/fixtures.go @@ -6,7 +6,6 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/testutils" - "github.com/prometheus/common/model" ) // GOCQL doesn't provide nice mocks, so we use a real Cassandra instance. @@ -49,7 +48,7 @@ func Fixtures() ([]testutils.Fixture, error) { } // Get a SchemaConfig with the defaults. - schemaConfig := chunk.DefaultSchemaConfig("cassandra", "v1", model.Now()) + schemaConfig := testutils.DefaultSchemaConfig("cassandra") storageClient, err := NewStorageClient(cfg, schemaConfig) if err != nil { diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/cassandra/storage_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/cassandra/storage_client.go index 28df0c0b2bd8..a3b4f6069108 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/cassandra/storage_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/cassandra/storage_client.go @@ -270,7 +270,10 @@ func (s *StorageClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) err return errors.WithStack(err) } key := chunks[i].ExternalKey() - tableName := s.schemaCfg.ChunkTableFor(chunks[i].From) + tableName, err := s.schemaCfg.ChunkTableFor(chunks[i].From) + if err != nil { + return err + } // Must provide a range key, even though its not useds - hence 0x00. q := s.session.Query(fmt.Sprintf("INSERT INTO %s (hash, range, value) VALUES (?, 0x00, ?)", @@ -289,12 +292,16 @@ func (s *StorageClient) GetChunks(ctx context.Context, input []chunk.Chunk) ([]c } func (s *StorageClient) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, input chunk.Chunk) (chunk.Chunk, error) { - tableName := s.schemaCfg.ChunkTableFor(input.From) + tableName, err := s.schemaCfg.ChunkTableFor(input.From) + if err != nil { + return input, err + } + var buf []byte if err := s.session.Query(fmt.Sprintf("SELECT value FROM %s WHERE hash = ?", tableName), input.ExternalKey()). WithContext(ctx).Scan(&buf); err != nil { return input, errors.WithStack(err) } - err := input.Decode(decodeContext, buf) + err = input.Decode(decodeContext, buf) return input, err } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go index 2790e55cd777..ff913c934e62 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go @@ -209,12 +209,6 @@ func (c *store) validateQuery(ctx context.Context, from model.Time, through *mod return "", nil, false, httpgrpc.Errorf(http.StatusBadRequest, "invalid query, length > limit (%s > %s)", (*through).Sub(from), maxQueryLength) } - // Fetch metric name chunks if the matcher is of type equal, - metricNameMatcher, matchers, ok := extract.MetricNameMatcherFromMatchers(matchers) - if !ok || metricNameMatcher.Type != labels.MatchEqual { - return "", nil, false, httpgrpc.Errorf(http.StatusBadRequest, "query must contain metric name") - } - now := model.Now() if from.After(now) { @@ -234,6 +228,12 @@ func (c *store) validateQuery(ctx context.Context, from model.Time, through *mod *through = now // Avoid processing future part - otherwise some schemas could fail with eg non-existent table gripes } + // Check there is a metric name matcher of type equal, + metricNameMatcher, matchers, ok := extract.MetricNameMatcherFromMatchers(matchers) + if !ok || metricNameMatcher.Type != labels.MatchEqual { + return "", nil, false, httpgrpc.Errorf(http.StatusBadRequest, "query must contain metric name") + } + return metricNameMatcher.Value, matchers, false, nil } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/bigchunk.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/bigchunk.go index 33adf364d914..145564e564cc 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/bigchunk.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/bigchunk.go @@ -14,12 +14,16 @@ const samplesPerChunk = 120 var errOutOfBounds = errors.New("out of bounds") +type smallChunk struct { + *chunkenc.XORChunk + start int64 + end int64 +} + // bigchunk is a set of prometheus/tsdb chunks. It grows over time and has no // upperbound on number of samples it can contain. type bigchunk struct { - chunks []chunkenc.Chunk - starts []int64 - ends []int64 + chunks []smallChunk appender chunkenc.Appender remainingSamples int @@ -31,6 +35,9 @@ func newBigchunk() *bigchunk { func (b *bigchunk) Add(sample model.SamplePair) ([]Chunk, error) { if b.remainingSamples == 0 { + if bigchunkSizeCapBytes > 0 && b.Size() > bigchunkSizeCapBytes { + return addToOverflowChunk(b, sample) + } if err := b.addNextChunk(sample.Timestamp); err != nil { return nil, err } @@ -38,7 +45,7 @@ func (b *bigchunk) Add(sample model.SamplePair) ([]Chunk, error) { b.appender.Append(int64(sample.Timestamp), float64(sample.Value)) b.remainingSamples-- - b.ends[len(b.ends)-1] = int64(sample.Timestamp) + b.chunks[len(b.chunks)-1].end = int64(sample.Timestamp) return []Chunk{b}, nil } @@ -47,14 +54,14 @@ func (b *bigchunk) addNextChunk(start model.Time) error { // To save memory, we "compact" the previous chunk - the array backing the slice // will be upto 2x too big, and we can save this space. if l := len(b.chunks); l > 0 { - c := b.chunks[l-1] + c := b.chunks[l-1].XORChunk buf := make([]byte, len(c.Bytes())) copy(buf, c.Bytes()) compacted, err := chunkenc.FromData(chunkenc.EncXOR, buf) if err != nil { return err } - b.chunks[l-1] = compacted + b.chunks[l-1].XORChunk = compacted.(*chunkenc.XORChunk) } chunk := chunkenc.NewXORChunk() @@ -63,9 +70,11 @@ func (b *bigchunk) addNextChunk(start model.Time) error { return err } - b.starts = append(b.starts, int64(start)) - b.ends = append(b.ends, int64(start)) - b.chunks = append(b.chunks, chunk) + b.chunks = append(b.chunks, smallChunk{ + XORChunk: chunk, + start: int64(start), + end: int64(start), + }) b.appender = appender b.remainingSamples = samplesPerChunk @@ -101,7 +110,7 @@ func (b *bigchunk) UnmarshalFromBuf(buf []byte) error { return err } - b.chunks = make([]chunkenc.Chunk, 0, numChunks) + b.chunks = make([]smallChunk, 0, numChunks) for i := uint16(0); i < numChunks; i++ { chunkLen, err := r.ReadUint16() if err != nil { @@ -123,9 +132,11 @@ func (b *bigchunk) UnmarshalFromBuf(buf []byte) error { return err } - b.chunks = append(b.chunks, chunk) - b.starts = append(b.starts, start) - b.ends = append(b.ends, end) + b.chunks = append(b.chunks, smallChunk{ + XORChunk: chunk.(*chunkenc.XORChunk), + start: int64(start), + end: int64(end), + }) } return nil } @@ -164,18 +175,16 @@ func (b *bigchunk) NewIterator() Iterator { func (b *bigchunk) Slice(start, end model.Time) Chunk { i, j := 0, len(b.chunks) for k := 0; k < len(b.chunks); k++ { - if b.ends[k] < int64(start) { + if b.chunks[k].end < int64(start) { i = k + 1 } - if b.starts[k] > int64(end) { + if b.chunks[k].start > int64(end) { j = k break } } return &bigchunk{ chunks: b.chunks[i:j], - starts: b.starts[i:j], - ends: b.ends[i:j], } } @@ -227,9 +236,9 @@ func (it *bigchunkIterator) FindAtOrAfter(target model.Time) bool { // If the seek is outside the current chunk, use the index to find the right // chunk. - if int64(target) < it.starts[it.i] || int64(target) > it.ends[it.i] { + if int64(target) < it.chunks[it.i].start || int64(target) > it.chunks[it.i].end { it.curr = nil - for it.i = 0; it.i < len(it.chunks) && int64(target) > it.ends[it.i]; it.i++ { + for it.i = 0; it.i < len(it.chunks) && int64(target) > it.chunks[it.i].end; it.i++ { } } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/delta.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/delta.go index 2b7a9c76d2c8..76ad6914c650 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/delta.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/delta.go @@ -224,7 +224,7 @@ func (c *deltaEncodedChunk) UnmarshalFromBuf(buf []byte) error { func (c *deltaEncodedChunk) setLen() error { l := binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:]) if int(l) > cap(*c) { - return fmt.Errorf("delta chunk length exceeded during unmarshaling: %d", l) + return fmt.Errorf("delta chunk length exceeded during unmarshalling: %d", l) } if int(l) < deltaHeaderBytes { return fmt.Errorf("delta chunk length less than header size: %d < %d", l, deltaHeaderBytes) diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/doubledelta.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/doubledelta.go index 8e7852eef3d3..ac5470abca35 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/doubledelta.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/doubledelta.go @@ -252,7 +252,7 @@ func (c *doubleDeltaEncodedChunk) UnmarshalFromBuf(buf []byte) error { func (c *doubleDeltaEncodedChunk) setLen() error { l := binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:]) if int(l) > cap(*c) { - return fmt.Errorf("doubledelta chunk length exceeded during unmarshaling: %d", l) + return fmt.Errorf("doubledelta chunk length exceeded during unmarshalling: %d", l) } if int(l) < doubleDeltaHeaderMinBytes { return fmt.Errorf("doubledelta chunk length less than header size: %d < %d", l, doubleDeltaHeaderMinBytes) diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/factory.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/factory.go index f9062d3adbb2..26b1308f627f 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/factory.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/factory.go @@ -1,6 +1,7 @@ package encoding import ( + "flag" "fmt" "strconv" ) @@ -8,8 +9,22 @@ import ( // Encoding defines which encoding we are using, delta, doubledelta, or varbit type Encoding byte -// DefaultEncoding can be changed via a flag. -var DefaultEncoding = DoubleDelta +// Config configures the behaviour of chunk encoding +type Config struct{} + +var ( + // DefaultEncoding exported for use in unit tests elsewhere + DefaultEncoding = DoubleDelta + alwaysMarshalFullsizeChunks = true + bigchunkSizeCapBytes = 0 +) + +// RegisterFlags registers configuration settings. +func (Config) RegisterFlags(f *flag.FlagSet) { + f.Var(&DefaultEncoding, "ingester.chunk-encoding", "Encoding version to use for chunks.") + flag.BoolVar(&alwaysMarshalFullsizeChunks, "store.fullsize-chunks", alwaysMarshalFullsizeChunks, "When saving varbit chunks, pad to 1024 bytes") + flag.IntVar(&bigchunkSizeCapBytes, "store.bigchunk-size-cap-bytes", bigchunkSizeCapBytes, "When using bigchunk encoding, start a new bigchunk if over this size (0 = unlimited)") +} // String implements flag.Value. func (e Encoding) String() string { diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/varbit.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/varbit.go index 2b602065df6f..2005053aa78a 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/varbit.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/varbit.go @@ -18,7 +18,6 @@ package encoding import ( "encoding/binary" - "flag" "fmt" "io" "math" @@ -301,7 +300,7 @@ func (c varbitChunk) Marshal(w io.Writer) error { // UnmarshalFromBuf implements chunk. func (c varbitChunk) UnmarshalFromBuf(buf []byte) error { if copied := copy(c, buf); copied != cap(c) && copied != c.marshalLen() { - return fmt.Errorf("incorrect byte count copied from buffer during unmarshaling, want %d or %d, got %d", c.marshalLen(), ChunkLen, copied) + return fmt.Errorf("incorrect byte count copied from buffer during unmarshalling, want %d or %d, got %d", c.marshalLen(), ChunkLen, copied) } return nil } @@ -315,16 +314,6 @@ func (c varbitChunk) Utilization() float64 { return math.Min(float64(c.nextSampleOffset()/8+15)/float64(cap(c)), 1) } -// MarshalConfig configures the behaviour of marshalling -type MarshalConfig struct{} - -var alwaysMarshalFullsizeChunks = true - -// RegisterFlags registers configuration settings. -func (MarshalConfig) RegisterFlags(f *flag.FlagSet) { - flag.BoolVar(&alwaysMarshalFullsizeChunks, "store.fullsize-chunks", alwaysMarshalFullsizeChunks, "When saving varbit chunks, pad to 1024 bytes") -} - // marshalLen returns the number of bytes that should be marshalled for this chunk func (c varbitChunk) marshalLen() int { bits := c.nextSampleOffset() diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/varbit_helpers.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/varbit_helpers.go index 9ca639e9421d..9fe9c09feaf1 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/varbit_helpers.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/varbit_helpers.go @@ -38,7 +38,7 @@ func isInt32(v model.SampleValue) bool { return model.SampleValue(int32(v)) == v } -// countBits returs the number of leading zero bits and the number of +// countBits returns the number of leading zero bits and the number of // significant bits after that in the given bit pattern. The maximum number of // leading zeros is 31 (so that it can be represented by a 5bit number). Leading // zeros beyond that are considered part of the significant bits. diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_index_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_index_client.go index 15975b7594e2..05417d465055 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_index_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_index_client.go @@ -3,6 +3,8 @@ package gcp import ( "bytes" "context" + "encoding/binary" + "encoding/hex" "flag" "fmt" "strings" @@ -10,7 +12,6 @@ import ( "cloud.google.com/go/bigtable" ot "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" - "google.golang.org/api/option" "github.com/cortexproject/cortex/pkg/chunk" chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" @@ -35,7 +36,8 @@ type Config struct { GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` - ColumnKey bool + ColumnKey bool + DistributeKeys bool } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -55,6 +57,8 @@ type storageClientColumnKey struct { schemaCfg chunk.SchemaConfig client *bigtable.Client keysFn keysFn + + distributeKeys bool } // storageClientV1 implements chunk.storageClient for GCP. @@ -64,9 +68,7 @@ type storageClientV1 struct { // NewStorageClientV1 returns a new v1 StorageClient. func NewStorageClientV1(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.IndexClient, error) { - opts := instrumentation() - opts = append(opts, option.WithGRPCDialOption(cfg.GRPCClientConfig.DialOption())) - + opts := toOptions(cfg.GRPCClientConfig.DialOption(bigtableInstrumentation())) client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, opts...) if err != nil { return nil, err @@ -90,7 +92,8 @@ func newStorageClientV1(cfg Config, schemaCfg chunk.SchemaConfig, client *bigtab // NewStorageClientColumnKey returns a new v2 StorageClient. func NewStorageClientColumnKey(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.IndexClient, error) { - client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, instrumentation()...) + opts := toOptions(cfg.GRPCClientConfig.DialOption(bigtableInstrumentation())) + client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, opts...) if err != nil { return nil, err } @@ -98,18 +101,35 @@ func NewStorageClientColumnKey(ctx context.Context, cfg Config, schemaCfg chunk. } func newStorageClientColumnKey(cfg Config, schemaCfg chunk.SchemaConfig, client *bigtable.Client) *storageClientColumnKey { + return &storageClientColumnKey{ cfg: cfg, schemaCfg: schemaCfg, client: client, keysFn: func(hashValue string, rangeValue []byte) (string, string) { - // We could hash the row key for better distribution but we decided against it - // because that would make migrations very, very hard. + + // We hash the row key and prepend it back to the key for better distribution. + // We preserve the existing key to make migrations and o11y easier. + if cfg.DistributeKeys { + hashValue = hashPrefix(hashValue) + "-" + hashValue + } + return hashValue, string(rangeValue) }, } } +// hashPrefix calculates a 64bit hash of the input string and hex-encodes +// the result, taking care to zero pad etc. +func hashPrefix(input string) string { + prefix := hashAdd(hashNew(), input) + var encodedUint64 [8]byte + binary.LittleEndian.PutUint64(encodedUint64[:], prefix) + var hexEncoded [16]byte + hex.Encode(hexEncoded[:], encodedUint64[:]) + return string(hexEncoded[:]) +} + func (s *storageClientColumnKey) Stop() { s.client.Close() } @@ -196,8 +216,9 @@ func (s *storageClientColumnKey) QueryPages(ctx context.Context, queries []chunk queries: map[string]chunk.IndexQuery{}, } } - tq.queries[query.HashValue] = query - tq.rows = append(tq.rows, query.HashValue) + hashKey, _ := s.keysFn(query.HashValue, nil) + tq.queries[hashKey] = query + tq.rows = append(tq.rows, hashKey) tableQueries[query.TableName] = tq } @@ -249,50 +270,6 @@ func (s *storageClientColumnKey) QueryPages(ctx context.Context, queries []chunk return lastErr } -func (s *storageClientColumnKey) query(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { - sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages", ot.Tag{Key: "tableName", Value: query.TableName}, ot.Tag{Key: "hashValue", Value: query.HashValue}) - defer sp.Finish() - - table := s.client.Open(query.TableName) - - rOpts := []bigtable.ReadOption{ - bigtable.RowFilter(bigtable.FamilyFilter(columnFamily)), - } - - if len(query.RangeValuePrefix) > 0 { - rOpts = append(rOpts, bigtable.RowFilter(bigtable.ColumnRangeFilter(columnFamily, string(query.RangeValuePrefix), string(query.RangeValuePrefix)+null))) - } else if len(query.RangeValueStart) > 0 { - rOpts = append(rOpts, bigtable.RowFilter(bigtable.ColumnRangeFilter(columnFamily, string(query.RangeValueStart), null))) - } - - r, err := table.ReadRow(ctx, query.HashValue, rOpts...) - if err != nil { - sp.LogFields(otlog.String("error", err.Error())) - return errors.WithStack(err) - } - - val, ok := r[columnFamily] - if !ok { - // There are no matching rows. - return nil - } - - if query.ValueEqual != nil { - filteredItems := make([]bigtable.ReadItem, 0, len(val)) - for _, item := range val { - if bytes.Equal(query.ValueEqual, item.Value) { - filteredItems = append(filteredItems, item) - } - } - - val = filteredItems - } - callback(&columnKeyBatch{ - items: val, - }) - return nil -} - // columnKeyBatch represents a batch of values read from Bigtable. type columnKeyBatch struct { items []bigtable.ReadItem @@ -346,7 +323,6 @@ func (s *storageClientV1) query(ctx context.Context, query chunk.IndexQuery, cal readOpts = append(readOpts, bigtable.RowFilter(bigtable.ValueFilter(string(query.ValueEqual)))) } */ - if len(query.RangeValuePrefix) > 0 { rowRange = bigtable.PrefixRange(query.HashValue + separator + string(query.RangeValuePrefix)) } else if len(query.RangeValueStart) > 0 { diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_object_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_object_client.go index 1dab5853fe70..a43980bde1ac 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_object_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_object_client.go @@ -22,7 +22,8 @@ type bigtableObjectClient struct { // NewBigtableObjectClient makes a new chunk.ObjectClient that stores chunks in // Bigtable. func NewBigtableObjectClient(ctx context.Context, cfg Config, schemaCfg chunk.SchemaConfig) (chunk.ObjectClient, error) { - client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, instrumentation()...) + opts := toOptions(cfg.GRPCClientConfig.DialOption(bigtableInstrumentation())) + client, err := bigtable.NewClient(ctx, cfg.Project, cfg.Instance, opts...) if err != nil { return nil, err } @@ -51,7 +52,10 @@ func (s *bigtableObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chu return err } key := chunks[i].ExternalKey() - tableName := s.schemaCfg.ChunkTableFor(chunks[i].From) + tableName, err := s.schemaCfg.ChunkTableFor(chunks[i].From) + if err != nil { + return err + } keys[tableName] = append(keys[tableName], key) mut := bigtable.NewMutation() @@ -82,7 +86,10 @@ func (s *bigtableObjectClient) GetChunks(ctx context.Context, input []chunk.Chun chunks := map[string]map[string]chunk.Chunk{} keys := map[string]bigtable.RowList{} for _, c := range input { - tableName := s.schemaCfg.ChunkTableFor(c.From) + tableName, err := s.schemaCfg.ChunkTableFor(c.From) + if err != nil { + return nil, err + } key := c.ExternalKey() keys[tableName] = append(keys[tableName], key) if _, ok := chunks[tableName]; !ok { diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/fixtures.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/fixtures.go index 0ebc1b0bcb78..d0eafac81d8a 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/fixtures.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/fixtures.go @@ -2,12 +2,11 @@ package gcp import ( "context" - "time" + "fmt" "cloud.google.com/go/bigtable" "cloud.google.com/go/bigtable/bttest" "github.com/fsouza/fake-gcs-server/fakestorage" - "github.com/prometheus/common/model" "google.golang.org/api/option" "google.golang.org/grpc" @@ -27,6 +26,7 @@ type fixture struct { gcsObjectClient bool columnKeyClient bool + hashPrefix bool } func (f *fixture) Name() string { @@ -56,16 +56,7 @@ func (f *fixture) Clients() ( return } - schemaConfig = chunk.SchemaConfig{ - Configs: []chunk.PeriodConfig{{ - IndexType: "gcp", - From: model.Now(), - ChunkTables: chunk.PeriodicTableConfig{ - Prefix: "chunks", - Period: 10 * time.Minute, - }, - }}, - } + schemaConfig = testutils.DefaultSchemaConfig("gcp-columnkey") tClient = &tableClient{ client: adminClient, } @@ -75,10 +66,13 @@ func (f *fixture) Clients() ( return } + cfg := Config{ + DistributeKeys: f.hashPrefix, + } if f.columnKeyClient { - iClient = newStorageClientColumnKey(Config{}, schemaConfig, client) + iClient = newStorageClientColumnKey(cfg, schemaConfig, client) } else { - iClient = newStorageClientV1(Config{}, schemaConfig, client) + iClient = newStorageClientV1(cfg, schemaConfig, client) } if f.gcsObjectClient { @@ -99,21 +93,19 @@ func (f *fixture) Teardown() error { } // Fixtures for unit testing GCP storage. -var Fixtures = []testutils.Fixture{ - &fixture{ - name: "bigtable", - }, - &fixture{ - name: "bigtable-columnkey", - columnKeyClient: true, - }, - &fixture{ - name: "bigtable-gcs", - gcsObjectClient: true, - }, - &fixture{ - name: "bigtable-columnkey-gcs", - gcsObjectClient: true, - columnKeyClient: true, - }, -} +var Fixtures = func() []testutils.Fixture { + fixtures := []testutils.Fixture{} + for _, gcsObjectClient := range []bool{true, false} { + for _, columnKeyClient := range []bool{true, false} { + for _, hashPrefix := range []bool{true, false} { + fixtures = append(fixtures, &fixture{ + name: fmt.Sprintf("bigtable-columnkey:%v-gcsObjectClient:%v-hashPrefix:%v", columnKeyClient, gcsObjectClient, hashPrefix), + columnKeyClient: columnKeyClient, + gcsObjectClient: gcsObjectClient, + hashPrefix: hashPrefix, + }) + } + } + } + return fixtures +}() diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/fnv.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/fnv.go new file mode 100644 index 000000000000..851a9d7f19f9 --- /dev/null +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/fnv.go @@ -0,0 +1,36 @@ +// Modified from github.com/prometheus/common/model/fnv.go +// Copyright 2015 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gcp + +// Inline and byte-free variant of hash/fnv's fnv64a. + +const ( + offset64 = 14695981039346656037 + prime64 = 1099511628211 +) + +// hashNew initializies a new fnv64a hash value. +func hashNew() uint64 { + return offset64 +} + +// hashAdd adds a string to a fnv64a hash value, returning the updated hash. +func hashAdd(h uint64, s string) uint64 { + for i := 0; i < len(s); i++ { + h ^= uint64(s[i]) + h *= prime64 + } + return h +} diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/gcs_object_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/gcs_object_client.go index 1f621cf6f754..0d92a5b49e5a 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/gcs_object_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/gcs_object_client.go @@ -21,17 +21,24 @@ type gcsObjectClient struct { // GCSConfig is config for the GCS Chunk Client. type GCSConfig struct { - BucketName string `yaml:"bucket_name"` + BucketName string `yaml:"bucket_name"` + ChunkBufferSize int `yaml:"chunk_buffer_size"` } // RegisterFlags registers flags. func (cfg *GCSConfig) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.BucketName, "gcs.bucketname", "", "Name of GCS bucket to put chunks in.") + f.IntVar(&cfg.ChunkBufferSize, "gcs.chunk-buffer-size", 0, "The size of the buffer that GCS client for each PUT request. 0 to disable buffering.") } // NewGCSObjectClient makes a new chunk.ObjectClient that writes chunks to GCS. func NewGCSObjectClient(ctx context.Context, cfg GCSConfig, schemaCfg chunk.SchemaConfig) (chunk.ObjectClient, error) { - client, err := storage.NewClient(ctx, instrumentation()...) + option, err := gcsInstrumentation(ctx) + if err != nil { + return nil, err + } + + client, err := storage.NewClient(ctx, option) if err != nil { return nil, err } @@ -59,6 +66,11 @@ func (s *gcsObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) e return err } writer := s.bucket.Object(chunk.ExternalKey()).NewWriter(ctx) + // Default GCSChunkSize is 8M and for each call, 8M is allocated xD + // By setting it to 0, we just upload the object in a single a request + // which should work for our chunk sizes. + writer.ChunkSize = s.cfg.ChunkBufferSize + if _, err := writer.Write(buf); err != nil { return err } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/instrumentation.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/instrumentation.go index 62b7b9e050ce..a87cd10b3dff 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/instrumentation.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/instrumentation.go @@ -1,40 +1,87 @@ package gcp import ( - "github.com/grpc-ecosystem/go-grpc-middleware" + "context" + "net/http" + "strconv" + "time" + otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" + opentracing "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "google.golang.org/api/option" + google_http "google.golang.org/api/transport/http" "google.golang.org/grpc" "github.com/cortexproject/cortex/pkg/util/middleware" ) -var bigtableRequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: "cortex", - Name: "bigtable_request_duration_seconds", - Help: "Time spent doing Bigtable requests.", - - // Bigtable latency seems to range from a few ms to a few hundred ms and is - // important. So use 6 buckets from 1ms to 1s. - Buckets: prometheus.ExponentialBuckets(0.001, 4, 6), -}, []string{"operation", "status_code"}) - -func instrumentation() []option.ClientOption { - return []option.ClientOption{ - option.WithGRPCDialOption( - grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( - otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), - middleware.PrometheusGRPCUnaryInstrumentation(bigtableRequestDuration), - )), - ), - option.WithGRPCDialOption( - grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( - otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()), - middleware.PrometheusGRPCStreamInstrumentation(bigtableRequestDuration), - )), - ), +var ( + bigtableRequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "cortex", + Name: "bigtable_request_duration_seconds", + Help: "Time spent doing Bigtable requests.", + + // Bigtable latency seems to range from a few ms to a few hundred ms and is + // important. So use 6 buckets from 1ms to 1s. + Buckets: prometheus.ExponentialBuckets(0.001, 4, 6), + }, []string{"operation", "status_code"}) + + gcsRequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "cortex", + Name: "gcs_request_duration_seconds", + Help: "Time spent doing GCS requests.", + + // Bigtable latency seems to range from a few ms to a few hundred ms and is + // important. So use 6 buckets from 1ms to 1s. + Buckets: prometheus.ExponentialBuckets(0.001, 4, 6), + }, []string{"operation", "status_code"}) +) + +func bigtableInstrumentation() ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) { + return []grpc.UnaryClientInterceptor{ + otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), + middleware.PrometheusGRPCUnaryInstrumentation(bigtableRequestDuration), + }, + []grpc.StreamClientInterceptor{ + otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()), + middleware.PrometheusGRPCStreamInstrumentation(bigtableRequestDuration), + } +} + +func gcsInstrumentation(ctx context.Context) (option.ClientOption, error) { + transport, err := google_http.NewTransport(ctx, http.DefaultTransport) + if err != nil { + return nil, err + } + client := &http.Client{ + Transport: instrumentedTransport{ + observer: gcsRequestDuration, + next: transport, + }, + } + return option.WithHTTPClient(client), nil +} + +func toOptions(opts []grpc.DialOption) []option.ClientOption { + result := make([]option.ClientOption, 0, len(opts)) + for _, opt := range opts { + result = append(result, option.WithGRPCDialOption(opt)) + } + return result +} + +type instrumentedTransport struct { + observer prometheus.ObserverVec + next http.RoundTripper +} + +func (i instrumentedTransport) RoundTrip(req *http.Request) (*http.Response, error) { + start := time.Now() + resp, err := i.next.RoundTrip(req) + if err == nil { + i.observer.WithLabelValues(req.Method, strconv.Itoa(resp.StatusCode)).Observe(time.Since(start).Seconds()) } + return resp, err } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/table_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/table_client.go index 2a62c1b04216..bb6c7e2ca70a 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/table_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/table_client.go @@ -2,12 +2,14 @@ package gcp import ( "context" - "strings" + + "google.golang.org/grpc/codes" "cloud.google.com/go/bigtable" "google.golang.org/grpc/status" "github.com/cortexproject/cortex/pkg/chunk" + "github.com/pkg/errors" ) type tableClient struct { @@ -17,7 +19,8 @@ type tableClient struct { // NewTableClient returns a new TableClient. func NewTableClient(ctx context.Context, cfg Config) (chunk.TableClient, error) { - client, err := bigtable.NewAdminClient(ctx, cfg.Project, cfg.Instance, instrumentation()...) + opts := toOptions(cfg.GRPCClientConfig.DialOption(bigtableInstrumentation())) + client, err := bigtable.NewAdminClient(ctx, cfg.Project, cfg.Instance, opts...) if err != nil { return nil, err } @@ -30,7 +33,7 @@ func NewTableClient(ctx context.Context, cfg Config) (chunk.TableClient, error) func (c *tableClient) ListTables(ctx context.Context) ([]string, error) { tables, err := c.client.Tables(ctx) if err != nil { - return nil, err + return nil, errors.Wrap(err, "client.Tables") } // Check each table has the right column family. If not, omit it. @@ -38,7 +41,7 @@ func (c *tableClient) ListTables(ctx context.Context) ([]string, error) { for _, table := range tables { info, err := c.client.TableInfo(ctx, table) if err != nil { - return nil, err + return nil, errors.Wrap(err, "client.TableInfo") } if hasColumnFamily(info.FamilyInfos) { @@ -61,22 +64,27 @@ func hasColumnFamily(infos []bigtable.FamilyInfo) bool { func (c *tableClient) CreateTable(ctx context.Context, desc chunk.TableDesc) error { if err := c.client.CreateTable(ctx, desc.Name); err != nil { if !alreadyExistsError(err) { - return err + return errors.Wrap(err, "client.CreateTable") + } + } + + if err := c.client.CreateColumnFamily(ctx, desc.Name, columnFamily); err != nil { + if !alreadyExistsError(err) { + return errors.Wrap(err, "client.CreateColumnFamily") } } - return c.client.CreateColumnFamily(ctx, desc.Name, columnFamily) + + return nil } func alreadyExistsError(err error) bool { - // This is super fragile, but I can't find a better way of doing it. - // Have filed bug upstream: https://github.com/GoogleCloudPlatform/google-cloud-go/issues/672 serr, ok := status.FromError(err) - return ok && strings.Contains(serr.Message(), "already exists") + return ok && serr.Code() == codes.AlreadyExists } func (c *tableClient) DeleteTable(ctx context.Context, name string) error { if err := c.client.DeleteTable(ctx, name); err != nil { - return err + return errors.Wrap(err, "client.DeleteTable") } return nil diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/schema_config.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/schema_config.go index 8ddb54608f14..1fa5db0cca7a 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/schema_config.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/schema_config.go @@ -422,13 +422,13 @@ func (cfg *PeriodicTableConfig) periodicTables(from, through model.Time, pCfg Pr } // ChunkTableFor calculates the chunk table shard for a given point in time. -func (cfg SchemaConfig) ChunkTableFor(t model.Time) string { +func (cfg SchemaConfig) ChunkTableFor(t model.Time) (string, error) { for i := range cfg.Configs { if t > cfg.Configs[i].From && (i+1 == len(cfg.Configs) || t < cfg.Configs[i+1].From) { - return cfg.Configs[i].ChunkTables.TableFor(t) + return cfg.Configs[i].ChunkTables.TableFor(t), nil } } - return "" + return "", fmt.Errorf("no chunk table found for time %v", t) } // TableFor calculates the table shard for a given point in time. diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/caching_index_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/caching_index_client.go index 10ab4268235b..fcb9a02204a1 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/caching_index_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/caching_index_client.go @@ -9,6 +9,7 @@ import ( "github.com/cortexproject/cortex/pkg/chunk/cache" chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/go-kit/kit/log/level" proto "github.com/golang/protobuf/proto" "github.com/prometheus/client_golang/prometheus" @@ -205,7 +206,10 @@ func (s *cachingIndexClient) cacheStore(ctx context.Context, keys []string, batc } func (s *cachingIndexClient) cacheFetch(ctx context.Context, keys []string) (batches []ReadBatch, missed []string) { - cacheGets.Inc() + log, ctx := spanlogger.New(ctx, "cachingIndexClient.cacheFetch") + defer log.Finish() + + cacheGets.Add(float64(len(keys))) // Build a map from hash -> key; NB there can be collisions here; we'll fetch // the last hash. @@ -233,15 +237,20 @@ func (s *cachingIndexClient) cacheFetch(ctx context.Context, keys []string) (bat var readBatch ReadBatch if err := proto.Unmarshal(bufs[j], &readBatch); err != nil { - level.Warn(util.Logger).Log("msg", "error unmarshalling index entry from cache", "err", err) + level.Warn(log).Log("msg", "error unmarshalling index entry from cache", "err", err) cacheCorruptErrs.Inc() continue } // Make sure the hash(key) is not a collision in the cache by looking at the // key in the value. - if key != readBatch.Key || (readBatch.Expiry != 0 && time.Now().After(time.Unix(0, readBatch.Expiry))) { - cacheCorruptErrs.Inc() + if key != readBatch.Key { + level.Debug(log).Log("msg", "dropping index cache entry due to key collision", "key", key, "readBatch.Key", readBatch.Key, "expiry") + continue + } + + if readBatch.Expiry != 0 && time.Now().After(time.Unix(0, readBatch.Expiry)) { + level.Debug(log).Log("msg", "dropping index cache entry due to expiration", "key", key, "readBatch.Key", readBatch.Key, "expiry", time.Unix(0, readBatch.Expiry)) continue } @@ -262,5 +271,6 @@ func (s *cachingIndexClient) cacheFetch(ctx context.Context, keys []string) (bat missed = append(missed, miss) } + level.Debug(log).Log("hits", len(batches), "misses", len(misses)) return batches, missed } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/factory.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/factory.go index 114b36906491..d45ed09de02e 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/factory.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/factory.go @@ -137,6 +137,9 @@ func NewIndexClient(name string, cfg Config, schemaCfg chunk.SchemaConfig) (chun return gcp.NewStorageClientV1(context.Background(), cfg.GCPStorageConfig, schemaCfg) case "gcp-columnkey", "bigtable": return gcp.NewStorageClientColumnKey(context.Background(), cfg.GCPStorageConfig, schemaCfg) + case "bigtable-hashed": + cfg.GCPStorageConfig.DistributeKeys = true + return gcp.NewStorageClientColumnKey(context.Background(), cfg.GCPStorageConfig, schemaCfg) case "cassandra": return cassandra.NewStorageClient(cfg.CassandraStorageConfig, schemaCfg) case "boltdb": @@ -189,7 +192,7 @@ func NewTableClient(name string, cfg Config) (chunk.TableClient, error) { level.Warn(util.Logger).Log("msg", "ignoring DynamoDB URL path", "path", path) } return aws.NewDynamoDBTableClient(cfg.AWSStorageConfig.DynamoDBConfig) - case "gcp", "gcp-columnkey": + case "gcp", "gcp-columnkey", "bigtable", "bigtable-hashed": return gcp.NewTableClient(context.Background(), cfg.GCPStorageConfig) case "cassandra": return cassandra.NewTableClient(context.Background(), cfg.CassandraStorageConfig) diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/table_manager.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/table_manager.go index 988ba7816e71..4c99a77fc9ab 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/table_manager.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/table_manager.go @@ -305,46 +305,49 @@ func (m *TableManager) calculateExpectedTables() []TableDesc { // partitionTables works out tables that need to be created vs tables that need to be updated func (m *TableManager) partitionTables(ctx context.Context, descriptions []TableDesc) ([]TableDesc, []TableDesc, []TableDesc, error) { - existingTables, err := m.client.ListTables(ctx) + tables, err := m.client.ListTables(ctx) if err != nil { return nil, nil, nil, err } - sort.Strings(existingTables) - tablePrefixes := map[string]struct{}{} - for _, cfg := range m.schemaCfg.Configs { - tablePrefixes[cfg.IndexTables.Prefix] = struct{}{} - tablePrefixes[cfg.ChunkTables.Prefix] = struct{}{} + existingTables := make(map[string]struct{}, len(tables)) + for _, table := range tables { + existingTables[table] = struct{}{} + } + + expectedTables := make(map[string]TableDesc, len(descriptions)) + for _, desc := range descriptions { + expectedTables[desc.Name] = desc } toCreate, toCheck, toDelete := []TableDesc{}, []TableDesc{}, []TableDesc{} - i, j := 0, 0 - for i < len(descriptions) && j < len(existingTables) { - if descriptions[i].Name < existingTables[j] { - // Table descriptions[i] doesn't exist - toCreate = append(toCreate, descriptions[i]) - i++ - } else if descriptions[i].Name > existingTables[j] { - // existingTables[j].name isn't in descriptions, and can be removed - if m.cfg.RetentionPeriod > 0 { + for _, expectedTable := range expectedTables { + if _, ok := existingTables[expectedTable.Name]; ok { + toCheck = append(toCheck, expectedTable) + } else { + toCreate = append(toCreate, expectedTable) + } + } + + if m.cfg.RetentionPeriod > 0 { + // Ensure we only delete tables which have a prefix managed by Cortex. + tablePrefixes := map[string]struct{}{} + for _, cfg := range m.schemaCfg.Configs { + tablePrefixes[cfg.IndexTables.Prefix] = struct{}{} + tablePrefixes[cfg.ChunkTables.Prefix] = struct{}{} + } + + for existingTable := range existingTables { + if _, ok := expectedTables[existingTable]; !ok { for tblPrefix := range tablePrefixes { - if strings.HasPrefix(existingTables[j], tblPrefix) { - toDelete = append(toDelete, TableDesc{Name: existingTables[j]}) + if strings.HasPrefix(existingTable, tblPrefix) { + toDelete = append(toDelete, TableDesc{Name: existingTable}) break } } } - j++ - } else { - // Table exists, need to check it has correct throughput - toCheck = append(toCheck, descriptions[i]) - i++ - j++ } } - for ; i < len(descriptions); i++ { - toCreate = append(toCreate, descriptions[i]) - } return toCreate, toCheck, toDelete, nil } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/testutils/testutils.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/testutils/testutils.go index 123c465257c3..91bc56a0e70a 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/testutils/testutils.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/testutils/testutils.go @@ -23,6 +23,12 @@ type Fixture interface { Teardown() error } +// DefaultSchemaConfig returns default schema for use in test fixtures +func DefaultSchemaConfig(kind string) chunk.SchemaConfig { + schemaConfig := chunk.DefaultSchemaConfig(kind, "v1", model.Now().Add(-time.Hour*2)) + return schemaConfig +} + // Setup a fixture with initial tables func Setup(fixture Fixture, tableName string) (chunk.IndexClient, chunk.ObjectClient, error) { var tbmConfig chunk.TableManagerConfig @@ -49,11 +55,11 @@ func Setup(fixture Fixture, tableName string) (chunk.IndexClient, chunk.ObjectCl } // CreateChunks creates some chunks for testing -func CreateChunks(startIndex, batchSize int) ([]string, []chunk.Chunk, error) { +func CreateChunks(startIndex, batchSize int, start model.Time) ([]string, []chunk.Chunk, error) { keys := []string{} chunks := []chunk.Chunk{} for j := 0; j < batchSize; j++ { - chunk := dummyChunkFor(model.Now(), model.Metric{ + chunk := dummyChunkFor(start, model.Metric{ model.MetricNameLabel: "foo", "index": model.LabelValue(strconv.Itoa(startIndex*batchSize + j)), }) diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/util/parallel_chunk_fetch.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/util/parallel_chunk_fetch.go index 5b18578385fb..25748244445a 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/util/parallel_chunk_fetch.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/util/parallel_chunk_fetch.go @@ -29,7 +29,7 @@ func GetParallelChunks(ctx context.Context, chunks []chunk.Chunk, f func(context processedChunks := make(chan chunk.Chunk) errors := make(chan error) - for i := 0; i < max(maxParallel, len(chunks)); i++ { + for i := 0; i < min(maxParallel, len(chunks)); i++ { go func() { decodeContext := chunk.NewDecodeContext() for c := range queuedChunks { @@ -63,8 +63,8 @@ func GetParallelChunks(ctx context.Context, chunks []chunk.Chunk, f func(context return result, lastErr } -func max(a, b int) int { - if a > b { +func min(a, b int) int { + if a < b { return a } return b diff --git a/vendor/github.com/cortexproject/cortex/pkg/ingester/client/client.go b/vendor/github.com/cortexproject/cortex/pkg/ingester/client/client.go index e14e7ceac881..764cf708949b 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ingester/client/client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ingester/client/client.go @@ -3,9 +3,8 @@ package client import ( "flag" - "github.com/grpc-ecosystem/go-grpc-middleware" otgrpc "github.com/opentracing-contrib/go-grpc" - "github.com/opentracing/opentracing-go" + opentracing "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "google.golang.org/grpc" @@ -36,22 +35,22 @@ type closableHealthAndIngesterClient struct { conn *grpc.ClientConn } -// MakeIngesterClient makes a new IngesterClient -func MakeIngesterClient(addr string, cfg Config) (HealthAndIngesterClient, error) { - opts := []grpc.DialOption{ - grpc.WithInsecure(), - grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( +func instrumentation() ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) { + return []grpc.UnaryClientInterceptor{ otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()), middleware.ClientUserHeaderInterceptor, cortex_middleware.PrometheusGRPCUnaryInstrumentation(ingesterClientRequestDuration), - )), - grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( + }, []grpc.StreamClientInterceptor{ otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()), middleware.StreamClientUserHeaderInterceptor, cortex_middleware.PrometheusGRPCStreamInstrumentation(ingesterClientRequestDuration), - )), - cfg.GRPCClientConfig.DialOption(), - } + } +} + +// MakeIngesterClient makes a new IngesterClient +func MakeIngesterClient(addr string, cfg Config) (HealthAndIngesterClient, error) { + opts := []grpc.DialOption{grpc.WithInsecure()} + opts = append(opts, cfg.GRPCClientConfig.DialOption(instrumentation())...) conn, err := grpc.Dial(addr, opts...) if err != nil { return nil, err diff --git a/vendor/github.com/cortexproject/cortex/pkg/ingester/client/compat.go b/vendor/github.com/cortexproject/cortex/pkg/ingester/client/compat.go index c4764ca1d23f..f177eefabf31 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ingester/client/compat.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ingester/client/compat.go @@ -248,6 +248,18 @@ func FromLabelPairsToLabels(labelPairs []LabelPair) labels.Labels { return ls } +// FromLabelsToLabelPairs converts labels.Labels to []LabelPair +func FromLabelsToLabelPairs(s labels.Labels) []LabelPair { + labelPairs := make([]LabelPair, 0, len(s)) + for _, v := range s { + labelPairs = append(labelPairs, LabelPair{ + Name: []byte(v.Name), + Value: []byte(v.Value), + }) + } + return labelPairs // note already sorted +} + // FastFingerprint runs the same algorithm as Prometheus labelSetToFastFingerprint() func FastFingerprint(labelPairs []LabelPair) model.Fingerprint { if len(labelPairs) == 0 { diff --git a/vendor/github.com/cortexproject/cortex/pkg/ingester/client/cortex.pb.go b/vendor/github.com/cortexproject/cortex/pkg/ingester/client/cortex.pb.go index 66a562173ffc..a7d5490231d1 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ingester/client/cortex.pb.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ingester/client/cortex.pb.go @@ -60,7 +60,7 @@ var MatchType_value = map[string]int32{ } func (MatchType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_cortex_4f0b8b776e298528, []int{0} + return fileDescriptor_cortex_dc30309a17c87a98, []int{0} } type WriteRequest_SourceEnum int32 @@ -80,7 +80,7 @@ var WriteRequest_SourceEnum_value = map[string]int32{ } func (WriteRequest_SourceEnum) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_cortex_4f0b8b776e298528, []int{0, 0} + return fileDescriptor_cortex_dc30309a17c87a98, []int{0, 0} } type WriteRequest struct { @@ -91,7 +91,7 @@ type WriteRequest struct { func (m *WriteRequest) Reset() { *m = WriteRequest{} } func (*WriteRequest) ProtoMessage() {} func (*WriteRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_cortex_4f0b8b776e298528, []int{0} + return fileDescriptor_cortex_dc30309a17c87a98, []int{0} } func (m *WriteRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -133,7 +133,7 @@ type WriteResponse struct { func (m *WriteResponse) Reset() { *m = WriteResponse{} } func (*WriteResponse) ProtoMessage() {} func (*WriteResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_cortex_4f0b8b776e298528, []int{1} + return fileDescriptor_cortex_dc30309a17c87a98, []int{1} } func (m *WriteResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -169,7 +169,7 @@ type ReadRequest struct { func (m *ReadRequest) Reset() { *m = ReadRequest{} } func (*ReadRequest) ProtoMessage() {} func (*ReadRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_cortex_4f0b8b776e298528, []int{2} + return fileDescriptor_cortex_dc30309a17c87a98, []int{2} } func (m *ReadRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -212,7 +212,7 @@ type ReadResponse struct { func (m *ReadResponse) Reset() { *m = ReadResponse{} } func (*ReadResponse) ProtoMessage() {} func (*ReadResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_cortex_4f0b8b776e298528, []int{3} + return fileDescriptor_cortex_dc30309a17c87a98, []int{3} } func (m *ReadResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -257,7 +257,7 @@ type QueryRequest struct { func (m *QueryRequest) Reset() { *m = QueryRequest{} } func (*QueryRequest) ProtoMessage() {} func (*QueryRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_cortex_4f0b8b776e298528, []int{4} + return fileDescriptor_cortex_dc30309a17c87a98, []int{4} } func (m *QueryRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -314,7 +314,7 @@ type QueryResponse struct { func (m *QueryResponse) Reset() { *m = QueryResponse{} } func (*QueryResponse) ProtoMessage() {} func (*QueryResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_cortex_4f0b8b776e298528, []int{5} + return fileDescriptor_cortex_dc30309a17c87a98, []int{5} } func (m *QueryResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -358,7 +358,7 @@ type QueryStreamResponse struct { func (m *QueryStreamResponse) Reset() { *m = QueryStreamResponse{} } func (*QueryStreamResponse) ProtoMessage() {} func (*QueryStreamResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_cortex_4f0b8b776e298528, []int{6} + return fileDescriptor_cortex_dc30309a17c87a98, []int{6} } func (m *QueryStreamResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -401,7 +401,7 @@ type LabelValuesRequest struct { func (m *LabelValuesRequest) Reset() { *m = LabelValuesRequest{} } func (*LabelValuesRequest) ProtoMessage() {} func (*LabelValuesRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_cortex_4f0b8b776e298528, []int{7} + return fileDescriptor_cortex_dc30309a17c87a98, []int{7} } func (m *LabelValuesRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -444,7 +444,7 @@ type LabelValuesResponse struct { func (m *LabelValuesResponse) Reset() { *m = LabelValuesResponse{} } func (*LabelValuesResponse) ProtoMessage() {} func (*LabelValuesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_cortex_4f0b8b776e298528, []int{8} + return fileDescriptor_cortex_dc30309a17c87a98, []int{8} } func (m *LabelValuesResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -480,13 +480,91 @@ func (m *LabelValuesResponse) GetLabelValues() []string { return nil } +type LabelNamesRequest struct { +} + +func (m *LabelNamesRequest) Reset() { *m = LabelNamesRequest{} } +func (*LabelNamesRequest) ProtoMessage() {} +func (*LabelNamesRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_cortex_dc30309a17c87a98, []int{9} +} +func (m *LabelNamesRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *LabelNamesRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_LabelNamesRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (dst *LabelNamesRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_LabelNamesRequest.Merge(dst, src) +} +func (m *LabelNamesRequest) XXX_Size() int { + return m.Size() +} +func (m *LabelNamesRequest) XXX_DiscardUnknown() { + xxx_messageInfo_LabelNamesRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_LabelNamesRequest proto.InternalMessageInfo + +type LabelNamesResponse struct { + LabelNames []string `protobuf:"bytes,1,rep,name=label_names,json=labelNames" json:"label_names,omitempty"` +} + +func (m *LabelNamesResponse) Reset() { *m = LabelNamesResponse{} } +func (*LabelNamesResponse) ProtoMessage() {} +func (*LabelNamesResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_cortex_dc30309a17c87a98, []int{10} +} +func (m *LabelNamesResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *LabelNamesResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_LabelNamesResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (dst *LabelNamesResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_LabelNamesResponse.Merge(dst, src) +} +func (m *LabelNamesResponse) XXX_Size() int { + return m.Size() +} +func (m *LabelNamesResponse) XXX_DiscardUnknown() { + xxx_messageInfo_LabelNamesResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_LabelNamesResponse proto.InternalMessageInfo + +func (m *LabelNamesResponse) GetLabelNames() []string { + if m != nil { + return m.LabelNames + } + return nil +} + type UserStatsRequest struct { } func (m *UserStatsRequest) Reset() { *m = UserStatsRequest{} } func (*UserStatsRequest) ProtoMessage() {} func (*UserStatsRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_cortex_4f0b8b776e298528, []int{9} + return fileDescriptor_cortex_dc30309a17c87a98, []int{11} } func (m *UserStatsRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -525,7 +603,7 @@ type UserStatsResponse struct { func (m *UserStatsResponse) Reset() { *m = UserStatsResponse{} } func (*UserStatsResponse) ProtoMessage() {} func (*UserStatsResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_cortex_4f0b8b776e298528, []int{10} + return fileDescriptor_cortex_dc30309a17c87a98, []int{12} } func (m *UserStatsResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -590,7 +668,7 @@ type UserIDStatsResponse struct { func (m *UserIDStatsResponse) Reset() { *m = UserIDStatsResponse{} } func (*UserIDStatsResponse) ProtoMessage() {} func (*UserIDStatsResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_cortex_4f0b8b776e298528, []int{11} + return fileDescriptor_cortex_dc30309a17c87a98, []int{13} } func (m *UserIDStatsResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -640,7 +718,7 @@ type UsersStatsResponse struct { func (m *UsersStatsResponse) Reset() { *m = UsersStatsResponse{} } func (*UsersStatsResponse) ProtoMessage() {} func (*UsersStatsResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_cortex_4f0b8b776e298528, []int{12} + return fileDescriptor_cortex_dc30309a17c87a98, []int{14} } func (m *UsersStatsResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -685,7 +763,7 @@ type MetricsForLabelMatchersRequest struct { func (m *MetricsForLabelMatchersRequest) Reset() { *m = MetricsForLabelMatchersRequest{} } func (*MetricsForLabelMatchersRequest) ProtoMessage() {} func (*MetricsForLabelMatchersRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_cortex_4f0b8b776e298528, []int{13} + return fileDescriptor_cortex_dc30309a17c87a98, []int{15} } func (m *MetricsForLabelMatchersRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -742,7 +820,7 @@ type MetricsForLabelMatchersResponse struct { func (m *MetricsForLabelMatchersResponse) Reset() { *m = MetricsForLabelMatchersResponse{} } func (*MetricsForLabelMatchersResponse) ProtoMessage() {} func (*MetricsForLabelMatchersResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_cortex_4f0b8b776e298528, []int{14} + return fileDescriptor_cortex_dc30309a17c87a98, []int{16} } func (m *MetricsForLabelMatchersResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -788,7 +866,7 @@ type TimeSeriesChunk struct { func (m *TimeSeriesChunk) Reset() { *m = TimeSeriesChunk{} } func (*TimeSeriesChunk) ProtoMessage() {} func (*TimeSeriesChunk) Descriptor() ([]byte, []int) { - return fileDescriptor_cortex_4f0b8b776e298528, []int{15} + return fileDescriptor_cortex_dc30309a17c87a98, []int{17} } func (m *TimeSeriesChunk) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -855,7 +933,7 @@ type Chunk struct { func (m *Chunk) Reset() { *m = Chunk{} } func (*Chunk) ProtoMessage() {} func (*Chunk) Descriptor() ([]byte, []int) { - return fileDescriptor_cortex_4f0b8b776e298528, []int{16} + return fileDescriptor_cortex_dc30309a17c87a98, []int{18} } func (m *Chunk) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -918,7 +996,7 @@ type TransferChunksResponse struct { func (m *TransferChunksResponse) Reset() { *m = TransferChunksResponse{} } func (*TransferChunksResponse) ProtoMessage() {} func (*TransferChunksResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_cortex_4f0b8b776e298528, []int{17} + return fileDescriptor_cortex_dc30309a17c87a98, []int{19} } func (m *TransferChunksResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -956,7 +1034,7 @@ type TimeSeries struct { func (m *TimeSeries) Reset() { *m = TimeSeries{} } func (*TimeSeries) ProtoMessage() {} func (*TimeSeries) Descriptor() ([]byte, []int) { - return fileDescriptor_cortex_4f0b8b776e298528, []int{18} + return fileDescriptor_cortex_dc30309a17c87a98, []int{20} } func (m *TimeSeries) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1007,7 +1085,7 @@ type LabelPair struct { func (m *LabelPair) Reset() { *m = LabelPair{} } func (*LabelPair) ProtoMessage() {} func (*LabelPair) Descriptor() ([]byte, []int) { - return fileDescriptor_cortex_4f0b8b776e298528, []int{19} + return fileDescriptor_cortex_dc30309a17c87a98, []int{21} } func (m *LabelPair) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1044,7 +1122,7 @@ type Sample struct { func (m *Sample) Reset() { *m = Sample{} } func (*Sample) ProtoMessage() {} func (*Sample) Descriptor() ([]byte, []int) { - return fileDescriptor_cortex_4f0b8b776e298528, []int{20} + return fileDescriptor_cortex_dc30309a17c87a98, []int{22} } func (m *Sample) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1094,7 +1172,7 @@ type LabelMatchers struct { func (m *LabelMatchers) Reset() { *m = LabelMatchers{} } func (*LabelMatchers) ProtoMessage() {} func (*LabelMatchers) Descriptor() ([]byte, []int) { - return fileDescriptor_cortex_4f0b8b776e298528, []int{21} + return fileDescriptor_cortex_dc30309a17c87a98, []int{23} } func (m *LabelMatchers) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1137,7 +1215,7 @@ type Metric struct { func (m *Metric) Reset() { *m = Metric{} } func (*Metric) ProtoMessage() {} func (*Metric) Descriptor() ([]byte, []int) { - return fileDescriptor_cortex_4f0b8b776e298528, []int{22} + return fileDescriptor_cortex_dc30309a17c87a98, []int{24} } func (m *Metric) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1182,7 +1260,7 @@ type LabelMatcher struct { func (m *LabelMatcher) Reset() { *m = LabelMatcher{} } func (*LabelMatcher) ProtoMessage() {} func (*LabelMatcher) Descriptor() ([]byte, []int) { - return fileDescriptor_cortex_4f0b8b776e298528, []int{23} + return fileDescriptor_cortex_dc30309a17c87a98, []int{25} } func (m *LabelMatcher) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1242,6 +1320,8 @@ func init() { proto.RegisterType((*QueryStreamResponse)(nil), "cortex.QueryStreamResponse") proto.RegisterType((*LabelValuesRequest)(nil), "cortex.LabelValuesRequest") proto.RegisterType((*LabelValuesResponse)(nil), "cortex.LabelValuesResponse") + proto.RegisterType((*LabelNamesRequest)(nil), "cortex.LabelNamesRequest") + proto.RegisterType((*LabelNamesResponse)(nil), "cortex.LabelNamesResponse") proto.RegisterType((*UserStatsRequest)(nil), "cortex.UserStatsRequest") proto.RegisterType((*UserStatsResponse)(nil), "cortex.UserStatsResponse") proto.RegisterType((*UserIDStatsResponse)(nil), "cortex.UserIDStatsResponse") @@ -1531,6 +1611,56 @@ func (this *LabelValuesResponse) Equal(that interface{}) bool { } return true } +func (this *LabelNamesRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*LabelNamesRequest) + if !ok { + that2, ok := that.(LabelNamesRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + return true +} +func (this *LabelNamesResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*LabelNamesResponse) + if !ok { + that2, ok := that.(LabelNamesResponse) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if len(this.LabelNames) != len(that1.LabelNames) { + return false + } + for i := range this.LabelNames { + if this.LabelNames[i] != that1.LabelNames[i] { + return false + } + } + return true +} func (this *UserStatsRequest) Equal(that interface{}) bool { if that == nil { return this == nil @@ -2091,6 +2221,25 @@ func (this *LabelValuesResponse) GoString() string { s = append(s, "}") return strings.Join(s, "") } +func (this *LabelNamesRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 4) + s = append(s, "&client.LabelNamesRequest{") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *LabelNamesResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 5) + s = append(s, "&client.LabelNamesResponse{") + s = append(s, "LabelNames: "+fmt.Sprintf("%#v", this.LabelNames)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} func (this *UserStatsRequest) GoString() string { if this == nil { return "nil" @@ -2321,6 +2470,7 @@ type IngesterClient interface { Query(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (*QueryResponse, error) QueryStream(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (Ingester_QueryStreamClient, error) LabelValues(ctx context.Context, in *LabelValuesRequest, opts ...grpc.CallOption) (*LabelValuesResponse, error) + LabelNames(ctx context.Context, in *LabelNamesRequest, opts ...grpc.CallOption) (*LabelNamesResponse, error) UserStats(ctx context.Context, in *UserStatsRequest, opts ...grpc.CallOption) (*UserStatsResponse, error) AllUserStats(ctx context.Context, in *UserStatsRequest, opts ...grpc.CallOption) (*UsersStatsResponse, error) MetricsForLabelMatchers(ctx context.Context, in *MetricsForLabelMatchersRequest, opts ...grpc.CallOption) (*MetricsForLabelMatchersResponse, error) @@ -2395,6 +2545,15 @@ func (c *ingesterClient) LabelValues(ctx context.Context, in *LabelValuesRequest return out, nil } +func (c *ingesterClient) LabelNames(ctx context.Context, in *LabelNamesRequest, opts ...grpc.CallOption) (*LabelNamesResponse, error) { + out := new(LabelNamesResponse) + err := c.cc.Invoke(ctx, "/cortex.Ingester/LabelNames", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *ingesterClient) UserStats(ctx context.Context, in *UserStatsRequest, opts ...grpc.CallOption) (*UserStatsResponse, error) { out := new(UserStatsResponse) err := c.cc.Invoke(ctx, "/cortex.Ingester/UserStats", in, out, opts...) @@ -2462,6 +2621,7 @@ type IngesterServer interface { Query(context.Context, *QueryRequest) (*QueryResponse, error) QueryStream(*QueryRequest, Ingester_QueryStreamServer) error LabelValues(context.Context, *LabelValuesRequest) (*LabelValuesResponse, error) + LabelNames(context.Context, *LabelNamesRequest) (*LabelNamesResponse, error) UserStats(context.Context, *UserStatsRequest) (*UserStatsResponse, error) AllUserStats(context.Context, *UserStatsRequest) (*UsersStatsResponse, error) MetricsForLabelMatchers(context.Context, *MetricsForLabelMatchersRequest) (*MetricsForLabelMatchersResponse, error) @@ -2548,6 +2708,24 @@ func _Ingester_LabelValues_Handler(srv interface{}, ctx context.Context, dec fun return interceptor(ctx, in, info, handler) } +func _Ingester_LabelNames_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(LabelNamesRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(IngesterServer).LabelNames(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/cortex.Ingester/LabelNames", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(IngesterServer).LabelNames(ctx, req.(*LabelNamesRequest)) + } + return interceptor(ctx, in, info, handler) +} + func _Ingester_UserStats_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(UserStatsRequest) if err := dec(in); err != nil { @@ -2644,6 +2822,10 @@ var _Ingester_serviceDesc = grpc.ServiceDesc{ MethodName: "LabelValues", Handler: _Ingester_LabelValues_Handler, }, + { + MethodName: "LabelNames", + Handler: _Ingester_LabelNames_Handler, + }, { MethodName: "UserStats", Handler: _Ingester_UserStats_Handler, @@ -2942,6 +3124,57 @@ func (m *LabelValuesResponse) MarshalTo(dAtA []byte) (int, error) { return i, nil } +func (m *LabelNamesRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *LabelNamesRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + return i, nil +} + +func (m *LabelNamesResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *LabelNamesResponse) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.LabelNames) > 0 { + for _, s := range m.LabelNames { + dAtA[i] = 0xa + i++ + l = len(s) + for l >= 1<<7 { + dAtA[i] = uint8(uint64(l)&0x7f | 0x80) + l >>= 7 + i++ + } + dAtA[i] = uint8(l) + i++ + i += copy(dAtA[i:], s) + } + } + return i, nil +} + func (m *UserStatsRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -3591,6 +3824,30 @@ func (m *LabelValuesResponse) Size() (n int) { return n } +func (m *LabelNamesRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *LabelNamesResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.LabelNames) > 0 { + for _, s := range m.LabelNames { + l = len(s) + n += 1 + l + sovCortex(uint64(l)) + } + } + return n +} + func (m *UserStatsRequest) Size() (n int) { if m == nil { return 0 @@ -3953,6 +4210,25 @@ func (this *LabelValuesResponse) String() string { }, "") return s } +func (this *LabelNamesRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&LabelNamesRequest{`, + `}`, + }, "") + return s +} +func (this *LabelNamesResponse) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&LabelNamesResponse{`, + `LabelNames:` + fmt.Sprintf("%v", this.LabelNames) + `,`, + `}`, + }, "") + return s +} func (this *UserStatsRequest) String() string { if this == nil { return "nil" @@ -4877,6 +5153,135 @@ func (m *LabelValuesResponse) Unmarshal(dAtA []byte) error { } return nil } +func (m *LabelNamesRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCortex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: LabelNamesRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: LabelNamesRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipCortex(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthCortex + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *LabelNamesResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCortex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: LabelNamesResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: LabelNamesResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field LabelNames", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCortex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthCortex + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.LabelNames = append(m.LabelNames, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipCortex(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthCortex + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *UserStatsRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -6477,85 +6882,87 @@ var ( ) func init() { - proto.RegisterFile("github.com/cortexproject/cortex/pkg/ingester/client/cortex.proto", fileDescriptor_cortex_4f0b8b776e298528) -} - -var fileDescriptor_cortex_4f0b8b776e298528 = []byte{ - // 1207 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x57, 0x5b, 0x6f, 0x1b, 0x45, - 0x14, 0xde, 0x8d, 0x2f, 0x89, 0x8f, 0x2f, 0x75, 0x27, 0x2d, 0x31, 0xae, 0xd8, 0x94, 0x91, 0x5a, - 0x22, 0xa0, 0x76, 0x49, 0x04, 0x04, 0x41, 0x05, 0x4e, 0x9b, 0xb6, 0x46, 0xb9, 0xae, 0x1d, 0x40, - 0x48, 0x68, 0xb5, 0xb1, 0x27, 0xce, 0xd2, 0xbd, 0xb8, 0x33, 0xb3, 0x40, 0xde, 0xf8, 0x07, 0xf0, - 0xc8, 0x4f, 0xe0, 0x0d, 0x89, 0x17, 0xf8, 0x09, 0x7d, 0x8c, 0x78, 0xaa, 0x78, 0xa8, 0x88, 0xf3, - 0xc2, 0x63, 0x7f, 0x02, 0xda, 0x99, 0xdd, 0xf5, 0xae, 0x6b, 0x8b, 0x20, 0xd4, 0x37, 0xcf, 0x39, - 0xdf, 0xf9, 0xe6, 0x5c, 0xe6, 0x9c, 0xb3, 0x86, 0x4f, 0x06, 0x16, 0x3f, 0xf6, 0x0f, 0x1b, 0x3d, - 0xcf, 0x69, 0xf6, 0x3c, 0xca, 0xc9, 0x77, 0x43, 0xea, 0x7d, 0x4d, 0x7a, 0x3c, 0x3c, 0x35, 0x87, - 0x8f, 0x06, 0x4d, 0xcb, 0x1d, 0x10, 0xc6, 0x09, 0x6d, 0xf6, 0x6c, 0x8b, 0xb8, 0x91, 0xaa, 0x31, - 0xa4, 0x1e, 0xf7, 0x50, 0x5e, 0x9e, 0xea, 0xb7, 0x12, 0x4c, 0x03, 0x6f, 0xe0, 0x35, 0x85, 0xfa, - 0xd0, 0x3f, 0x12, 0x27, 0x71, 0x10, 0xbf, 0xa4, 0x19, 0xfe, 0x5d, 0x85, 0xd2, 0xe7, 0xd4, 0xe2, - 0x44, 0x27, 0x8f, 0x7d, 0xc2, 0x38, 0xda, 0x01, 0xe0, 0x96, 0x43, 0x18, 0xa1, 0x16, 0x61, 0x35, - 0xf5, 0x7a, 0x66, 0xa5, 0xb8, 0x8a, 0x1a, 0xe1, 0x55, 0x5d, 0xcb, 0x21, 0x1d, 0xa1, 0xd9, 0xa8, - 0x3f, 0x79, 0xb6, 0xac, 0xfc, 0xf9, 0x6c, 0x19, 0xed, 0x51, 0x62, 0xda, 0xb6, 0xd7, 0xeb, 0xc6, - 0x56, 0x7a, 0x82, 0x01, 0xbd, 0x0f, 0xf9, 0x8e, 0xe7, 0xd3, 0x1e, 0xa9, 0xcd, 0x5d, 0x57, 0x57, - 0x2a, 0xab, 0xcb, 0x11, 0x57, 0xf2, 0xd6, 0x86, 0x84, 0x6c, 0xba, 0xbe, 0xa3, 0xe7, 0x99, 0xf8, - 0x8d, 0x97, 0x01, 0xc6, 0x52, 0x34, 0x0f, 0x99, 0xd6, 0x5e, 0xbb, 0xaa, 0xa0, 0x05, 0xc8, 0xea, - 0x07, 0x5b, 0x9b, 0x55, 0x15, 0x5f, 0x82, 0x72, 0xc8, 0xc1, 0x86, 0x9e, 0xcb, 0x08, 0xbe, 0x03, - 0x45, 0x9d, 0x98, 0xfd, 0x28, 0x92, 0x06, 0xcc, 0x3f, 0xf6, 0x93, 0x61, 0x5c, 0x89, 0xae, 0xde, - 0xf7, 0x09, 0x3d, 0x09, 0x61, 0x7a, 0x04, 0xc2, 0x1f, 0x43, 0x49, 0x9a, 0x4b, 0x3a, 0xd4, 0x84, - 0x79, 0x4a, 0x98, 0x6f, 0xf3, 0xc8, 0xfe, 0xea, 0x84, 0xbd, 0xc4, 0xe9, 0x11, 0x0a, 0xff, 0xa4, - 0x42, 0x29, 0x49, 0x8d, 0xde, 0x06, 0xc4, 0xb8, 0x49, 0xb9, 0x21, 0xf2, 0xc1, 0x4d, 0x67, 0x68, - 0x38, 0x01, 0x99, 0xba, 0x92, 0xd1, 0xab, 0x42, 0xd3, 0x8d, 0x14, 0xdb, 0x0c, 0xad, 0x40, 0x95, - 0xb8, 0xfd, 0x34, 0x76, 0x4e, 0x60, 0x2b, 0xc4, 0xed, 0x27, 0x91, 0xb7, 0x61, 0xc1, 0x31, 0x79, - 0xef, 0x98, 0x50, 0x56, 0xcb, 0xa4, 0x43, 0xdb, 0x32, 0x0f, 0x89, 0xbd, 0x2d, 0x95, 0x7a, 0x8c, - 0xc2, 0x6d, 0x28, 0xa7, 0x9c, 0x46, 0xeb, 0x17, 0x2c, 0x73, 0x36, 0x28, 0x73, 0xb2, 0xa0, 0xb8, - 0x0b, 0x8b, 0x82, 0xaa, 0xc3, 0x29, 0x31, 0x9d, 0x98, 0xf0, 0xce, 0x14, 0xc2, 0xa5, 0x17, 0x09, - 0xef, 0x1e, 0xfb, 0xee, 0xa3, 0x29, 0xac, 0x6b, 0x80, 0x84, 0xeb, 0x9f, 0x99, 0xb6, 0x4f, 0x58, - 0x94, 0xc0, 0xd7, 0x00, 0xec, 0x40, 0x6a, 0xb8, 0xa6, 0x43, 0x44, 0xe2, 0x0a, 0x7a, 0x41, 0x48, - 0x76, 0x4c, 0x87, 0xe0, 0x75, 0x58, 0x4c, 0x19, 0x85, 0xae, 0xbc, 0x0e, 0x25, 0x69, 0xf5, 0x8d, - 0x90, 0x0b, 0x67, 0x0a, 0x7a, 0xd1, 0x1e, 0x43, 0x31, 0x82, 0xea, 0x01, 0x23, 0xb4, 0xc3, 0x4d, - 0x1e, 0x5d, 0x86, 0x7f, 0x53, 0xe1, 0x72, 0x42, 0x18, 0x92, 0xdd, 0x80, 0x8a, 0xec, 0x3b, 0xcb, - 0x73, 0x0d, 0x6a, 0x72, 0xe9, 0x86, 0xaa, 0x97, 0x63, 0xa9, 0x6e, 0x72, 0x12, 0x78, 0xea, 0xfa, - 0x8e, 0x11, 0x86, 0x1f, 0x94, 0x2d, 0xab, 0x17, 0x5c, 0xdf, 0x91, 0x51, 0x07, 0x2f, 0xc1, 0x1c, - 0x5a, 0xc6, 0x04, 0x53, 0x46, 0x30, 0x55, 0xcd, 0xa1, 0xd5, 0x4e, 0x91, 0x35, 0x60, 0x91, 0xfa, - 0x36, 0x99, 0x84, 0x67, 0x05, 0xfc, 0x72, 0xa0, 0x4a, 0xe1, 0xf1, 0x57, 0xb0, 0x18, 0x38, 0xde, - 0xbe, 0x97, 0x76, 0x7d, 0x09, 0xe6, 0x7d, 0x46, 0xa8, 0x61, 0xf5, 0xc3, 0xd4, 0xe5, 0x83, 0x63, - 0xbb, 0x8f, 0x6e, 0x41, 0xb6, 0x6f, 0x72, 0x53, 0xb8, 0x59, 0x5c, 0x7d, 0x35, 0xaa, 0xd2, 0x0b, - 0xc1, 0xeb, 0x02, 0x86, 0x1f, 0x00, 0x0a, 0x54, 0x2c, 0xcd, 0xfe, 0x0e, 0xe4, 0x58, 0x20, 0x08, - 0x6b, 0x7d, 0x2d, 0xc9, 0x32, 0xe1, 0x89, 0x2e, 0x91, 0xf8, 0x57, 0x15, 0xb4, 0x6d, 0xc2, 0xa9, - 0xd5, 0x63, 0xf7, 0x3d, 0x9a, 0x7c, 0xaa, 0xec, 0x65, 0xb7, 0xcc, 0x3a, 0x94, 0xa2, 0x66, 0x30, - 0x18, 0xe1, 0x61, 0xdb, 0x5c, 0x9d, 0xd6, 0x36, 0x4c, 0x2f, 0x46, 0xd0, 0x0e, 0xe1, 0xb8, 0x0d, - 0xcb, 0x33, 0x7d, 0x0e, 0x53, 0x71, 0x13, 0xf2, 0x8e, 0x80, 0x84, 0xb9, 0xa8, 0x44, 0xb4, 0xd2, - 0x50, 0x0f, 0xb5, 0x41, 0xfc, 0x97, 0x26, 0x5a, 0x21, 0x08, 0xe1, 0x88, 0x7a, 0x8e, 0x11, 0x0d, - 0xf7, 0x71, 0xb5, 0x2a, 0x81, 0xbc, 0x1d, 0x8a, 0xdb, 0xfd, 0x64, 0x39, 0xe7, 0x52, 0xe5, 0x6c, - 0x42, 0x5e, 0xbc, 0xed, 0x68, 0x18, 0x5c, 0x4e, 0x45, 0xb5, 0x67, 0x5a, 0x34, 0x6c, 0xb8, 0x10, - 0x86, 0xde, 0x82, 0x7c, 0x2f, 0xb8, 0x9c, 0xd5, 0xb2, 0xc2, 0xa0, 0x1c, 0x19, 0x24, 0xbb, 0x33, - 0x84, 0xe0, 0x1f, 0x54, 0xc8, 0x49, 0x57, 0x5f, 0x56, 0x6d, 0xea, 0xb0, 0x40, 0xdc, 0x9e, 0xd7, - 0xb7, 0xdc, 0x81, 0x68, 0x89, 0x9c, 0x1e, 0x9f, 0x11, 0x0a, 0x9f, 0x6a, 0xf0, 0xf6, 0x4b, 0xe1, - 0x7b, 0xac, 0xc1, 0x2b, 0x5d, 0x6a, 0xba, 0xec, 0x88, 0x50, 0xe1, 0x58, 0x5c, 0x08, 0xec, 0x00, - 0x8c, 0xf3, 0x9b, 0xc8, 0x8b, 0x7a, 0xb1, 0xbc, 0x34, 0x60, 0x9e, 0x99, 0xce, 0xd0, 0x16, 0x1d, - 0x9c, 0x2a, 0x64, 0x47, 0x88, 0x43, 0x78, 0x04, 0xc2, 0xbf, 0xa8, 0x50, 0x88, 0xb9, 0xd0, 0x2e, - 0x64, 0xe3, 0x31, 0x55, 0xda, 0xf8, 0x30, 0xdc, 0x8f, 0x6b, 0x17, 0xd9, 0xec, 0x3e, 0xb7, 0xec, - 0xe6, 0xb7, 0x16, 0x25, 0x8d, 0x8d, 0x13, 0x4e, 0x98, 0x2e, 0x88, 0xd0, 0x3e, 0xe4, 0xc4, 0x04, - 0x13, 0x69, 0xfb, 0x9f, 0x8c, 0x92, 0x09, 0xb7, 0x20, 0x2f, 0x43, 0x41, 0x57, 0x22, 0x72, 0x39, - 0xce, 0xe4, 0x21, 0x18, 0x9d, 0x53, 0x0a, 0x56, 0xe4, 0xe3, 0x6a, 0xe1, 0x16, 0x94, 0x53, 0x5d, - 0x90, 0xda, 0x46, 0xea, 0x85, 0xb6, 0xd1, 0x07, 0x90, 0x97, 0x9d, 0xf1, 0x9f, 0x4b, 0x84, 0x0d, - 0x28, 0x25, 0x49, 0xd1, 0x0d, 0xc8, 0xf2, 0x93, 0xa1, 0x8c, 0xa2, 0x32, 0x36, 0x17, 0xea, 0xee, - 0xc9, 0x90, 0xe8, 0x42, 0x1d, 0x3c, 0x23, 0x51, 0x1b, 0xd9, 0x38, 0x32, 0xbd, 0x71, 0x06, 0x32, - 0x42, 0x28, 0x0f, 0x6f, 0x7e, 0x0a, 0x85, 0xd8, 0x18, 0x15, 0x20, 0xb7, 0xb9, 0x7f, 0xd0, 0xda, - 0xaa, 0x2a, 0xa8, 0x0c, 0x85, 0x9d, 0xdd, 0xae, 0x21, 0x8f, 0x2a, 0xba, 0x04, 0x45, 0x7d, 0xf3, - 0xc1, 0xe6, 0x17, 0xc6, 0x76, 0xab, 0x7b, 0xf7, 0x61, 0x75, 0x0e, 0x21, 0xa8, 0x48, 0xc1, 0xce, - 0x6e, 0x28, 0xcb, 0xac, 0xfe, 0x91, 0x85, 0x85, 0xa8, 0x81, 0xd1, 0xbb, 0x90, 0xdd, 0xf3, 0xd9, - 0x31, 0xba, 0x32, 0xed, 0x03, 0xa8, 0x7e, 0x75, 0x42, 0x1a, 0x3e, 0x68, 0x05, 0xbd, 0x07, 0x39, - 0xb1, 0x6e, 0xd1, 0xd4, 0xaf, 0x97, 0xfa, 0xf4, 0x6f, 0x12, 0xac, 0xa0, 0x7b, 0x50, 0x4c, 0xac, - 0xe9, 0x19, 0xd6, 0xd7, 0x52, 0xd2, 0xf4, 0x46, 0xc7, 0xca, 0x6d, 0x15, 0x3d, 0x84, 0x62, 0x62, - 0xc3, 0xa2, 0x7a, 0xaa, 0x3c, 0xa9, 0x5d, 0x3d, 0xe6, 0x9a, 0xb2, 0x92, 0xb1, 0x82, 0x36, 0xa0, - 0x10, 0xef, 0x17, 0x54, 0x9b, 0xb2, 0x72, 0x24, 0xcb, 0xec, 0x65, 0x84, 0x15, 0x74, 0x1f, 0x4a, - 0x2d, 0xdb, 0xbe, 0x08, 0x4d, 0x3d, 0xa9, 0x61, 0x93, 0x3c, 0x36, 0x2c, 0xcd, 0x18, 0xe9, 0xe8, - 0x66, 0x7a, 0x74, 0xcf, 0xda, 0x53, 0xf5, 0x37, 0xfe, 0x15, 0x17, 0xdf, 0xb6, 0x0d, 0x95, 0xf4, - 0xb8, 0x42, 0xb3, 0xbe, 0x8b, 0xea, 0x5a, 0xac, 0x98, 0x3e, 0xdf, 0x94, 0x15, 0x75, 0xe3, 0xa3, - 0xd3, 0x33, 0x4d, 0x79, 0x7a, 0xa6, 0x29, 0xcf, 0xcf, 0x34, 0xf5, 0xfb, 0x91, 0xa6, 0xfe, 0x3c, - 0xd2, 0xd4, 0x27, 0x23, 0x4d, 0x3d, 0x1d, 0x69, 0xea, 0x5f, 0x23, 0x4d, 0xfd, 0x7b, 0xa4, 0x29, - 0xcf, 0x47, 0x9a, 0xfa, 0xe3, 0xb9, 0xa6, 0x9c, 0x9e, 0x6b, 0xca, 0xd3, 0x73, 0x4d, 0xf9, 0x32, - 0x2f, 0xff, 0x33, 0x1c, 0xe6, 0xc5, 0x67, 0xff, 0xda, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x93, - 0x31, 0xd2, 0x02, 0x71, 0x0c, 0x00, 0x00, + proto.RegisterFile("github.com/cortexproject/cortex/pkg/ingester/client/cortex.proto", fileDescriptor_cortex_dc30309a17c87a98) +} + +var fileDescriptor_cortex_dc30309a17c87a98 = []byte{ + // 1247 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x57, 0x4f, 0x6f, 0x1b, 0x45, + 0x14, 0xdf, 0x8d, 0xff, 0x24, 0x7e, 0x76, 0x5c, 0x67, 0xd2, 0xd2, 0xd4, 0x15, 0xeb, 0x32, 0x52, + 0x4b, 0x04, 0xd4, 0x2e, 0xa9, 0x0a, 0x45, 0x50, 0x81, 0xd3, 0xba, 0xad, 0x51, 0x92, 0xa6, 0x6b, + 0x17, 0x10, 0x12, 0x5a, 0x6d, 0xec, 0xa9, 0xb3, 0x74, 0xff, 0xb8, 0x33, 0xb3, 0x40, 0x6e, 0x7c, + 0x03, 0x38, 0xc2, 0x37, 0xe0, 0x86, 0xc4, 0x05, 0x3e, 0x42, 0x8f, 0x3d, 0x56, 0x1c, 0x2a, 0xea, + 0x5e, 0x38, 0xf6, 0x23, 0xa0, 0x9d, 0x99, 0x5d, 0xef, 0xba, 0xb6, 0x08, 0x42, 0xbd, 0x79, 0xde, + 0xfb, 0xbd, 0xdf, 0xbe, 0xbf, 0xf3, 0xc6, 0xf0, 0xc9, 0xc8, 0xe1, 0x87, 0xe1, 0x41, 0x73, 0x10, + 0x78, 0xad, 0x41, 0x40, 0x39, 0xf9, 0x6e, 0x4c, 0x83, 0xaf, 0xc9, 0x80, 0xab, 0x53, 0x6b, 0xfc, + 0x60, 0xd4, 0x72, 0xfc, 0x11, 0x61, 0x9c, 0xd0, 0xd6, 0xc0, 0x75, 0x88, 0x1f, 0xab, 0x9a, 0x63, + 0x1a, 0xf0, 0x00, 0x15, 0xe5, 0xa9, 0x7e, 0x31, 0xc5, 0x34, 0x0a, 0x46, 0x41, 0x4b, 0xa8, 0x0f, + 0xc2, 0xfb, 0xe2, 0x24, 0x0e, 0xe2, 0x97, 0x34, 0xc3, 0x7f, 0xe8, 0x50, 0xf9, 0x9c, 0x3a, 0x9c, + 0x98, 0xe4, 0x61, 0x48, 0x18, 0x47, 0x7b, 0x00, 0xdc, 0xf1, 0x08, 0x23, 0xd4, 0x21, 0x6c, 0x43, + 0x3f, 0x97, 0xdb, 0x2c, 0x6f, 0xa1, 0xa6, 0xfa, 0x54, 0xdf, 0xf1, 0x48, 0x4f, 0x68, 0xb6, 0xeb, + 0x8f, 0x9e, 0x36, 0xb4, 0x3f, 0x9f, 0x36, 0xd0, 0x3e, 0x25, 0xb6, 0xeb, 0x06, 0x83, 0x7e, 0x62, + 0x65, 0xa6, 0x18, 0xd0, 0xfb, 0x50, 0xec, 0x05, 0x21, 0x1d, 0x90, 0x8d, 0xa5, 0x73, 0xfa, 0x66, + 0x75, 0xab, 0x11, 0x73, 0xa5, 0xbf, 0xda, 0x94, 0x90, 0x8e, 0x1f, 0x7a, 0x66, 0x91, 0x89, 0xdf, + 0xb8, 0x01, 0x30, 0x95, 0xa2, 0x65, 0xc8, 0xb5, 0xf7, 0xbb, 0x35, 0x0d, 0xad, 0x40, 0xde, 0xbc, + 0xb7, 0xd3, 0xa9, 0xe9, 0xf8, 0x04, 0xac, 0x2a, 0x0e, 0x36, 0x0e, 0x7c, 0x46, 0xf0, 0x35, 0x28, + 0x9b, 0xc4, 0x1e, 0xc6, 0x91, 0x34, 0x61, 0xf9, 0x61, 0x98, 0x0e, 0xe3, 0x64, 0xfc, 0xe9, 0xbb, + 0x21, 0xa1, 0x47, 0x0a, 0x66, 0xc6, 0x20, 0xfc, 0x31, 0x54, 0xa4, 0xb9, 0xa4, 0x43, 0x2d, 0x58, + 0xa6, 0x84, 0x85, 0x2e, 0x8f, 0xed, 0x4f, 0xcd, 0xd8, 0x4b, 0x9c, 0x19, 0xa3, 0xf0, 0x4f, 0x3a, + 0x54, 0xd2, 0xd4, 0xe8, 0x1d, 0x40, 0x8c, 0xdb, 0x94, 0x5b, 0x22, 0x1f, 0xdc, 0xf6, 0xc6, 0x96, + 0x17, 0x91, 0xe9, 0x9b, 0x39, 0xb3, 0x26, 0x34, 0xfd, 0x58, 0xb1, 0xcb, 0xd0, 0x26, 0xd4, 0x88, + 0x3f, 0xcc, 0x62, 0x97, 0x04, 0xb6, 0x4a, 0xfc, 0x61, 0x1a, 0x79, 0x09, 0x56, 0x3c, 0x9b, 0x0f, + 0x0e, 0x09, 0x65, 0x1b, 0xb9, 0x6c, 0x68, 0x3b, 0xf6, 0x01, 0x71, 0x77, 0xa5, 0xd2, 0x4c, 0x50, + 0xb8, 0x0b, 0xab, 0x19, 0xa7, 0xd1, 0xd5, 0x63, 0x96, 0x39, 0x1f, 0x95, 0x39, 0x5d, 0x50, 0xdc, + 0x87, 0x75, 0x41, 0xd5, 0xe3, 0x94, 0xd8, 0x5e, 0x42, 0x78, 0x6d, 0x0e, 0xe1, 0xe9, 0x97, 0x09, + 0xaf, 0x1f, 0x86, 0xfe, 0x83, 0x39, 0xac, 0x97, 0x01, 0x09, 0xd7, 0x3f, 0xb3, 0xdd, 0x90, 0xb0, + 0x38, 0x81, 0xaf, 0x03, 0xb8, 0x91, 0xd4, 0xf2, 0x6d, 0x8f, 0x88, 0xc4, 0x95, 0xcc, 0x92, 0x90, + 0xec, 0xd9, 0x1e, 0xc1, 0x57, 0x61, 0x3d, 0x63, 0xa4, 0x5c, 0x79, 0x03, 0x2a, 0xd2, 0xea, 0x1b, + 0x21, 0x17, 0xce, 0x94, 0xcc, 0xb2, 0x3b, 0x85, 0xe2, 0x75, 0x58, 0xdb, 0x89, 0x69, 0xe2, 0xaf, + 0xe1, 0x2b, 0xca, 0x07, 0x25, 0x54, 0x6c, 0x0d, 0x28, 0x4f, 0x7d, 0x88, 0xc9, 0x20, 0x71, 0x82, + 0x61, 0x04, 0xb5, 0x7b, 0x8c, 0xd0, 0x1e, 0xb7, 0x79, 0x42, 0xf5, 0xbb, 0x0e, 0x6b, 0x29, 0xa1, + 0xa2, 0x3a, 0x0f, 0x55, 0x39, 0xc3, 0x4e, 0xe0, 0x5b, 0xd4, 0xe6, 0x32, 0x24, 0xdd, 0x5c, 0x4d, + 0xa4, 0xa6, 0xcd, 0x49, 0x14, 0xb5, 0x1f, 0x7a, 0x96, 0x4a, 0x65, 0xd4, 0x02, 0x79, 0xb3, 0xe4, + 0x87, 0x9e, 0xcc, 0x60, 0xd4, 0x55, 0xf6, 0xd8, 0xb1, 0x66, 0x98, 0x72, 0x82, 0xa9, 0x66, 0x8f, + 0x9d, 0x6e, 0x86, 0xac, 0x09, 0xeb, 0x34, 0x74, 0xc9, 0x2c, 0x3c, 0x2f, 0xe0, 0x6b, 0x91, 0x2a, + 0x83, 0xc7, 0x5f, 0xc1, 0x7a, 0xe4, 0x78, 0xf7, 0x46, 0xd6, 0xf5, 0xd3, 0xb0, 0x1c, 0x32, 0x42, + 0x2d, 0x67, 0xa8, 0xca, 0x50, 0x8c, 0x8e, 0xdd, 0x21, 0xba, 0x08, 0xf9, 0xa1, 0xcd, 0x6d, 0xe1, + 0x66, 0x79, 0xeb, 0x4c, 0x5c, 0xf1, 0x97, 0x82, 0x37, 0x05, 0x0c, 0xdf, 0x02, 0x14, 0xa9, 0x58, + 0x96, 0xfd, 0x5d, 0x28, 0xb0, 0x48, 0xa0, 0xfa, 0xe6, 0x6c, 0x9a, 0x65, 0xc6, 0x13, 0x53, 0x22, + 0xf1, 0x6f, 0x3a, 0x18, 0xbb, 0x84, 0x53, 0x67, 0xc0, 0x6e, 0x06, 0x34, 0xdd, 0xf6, 0xec, 0x55, + 0x8f, 0xdf, 0x55, 0xa8, 0xc4, 0x83, 0x65, 0x31, 0xc2, 0xd5, 0x08, 0x9e, 0x9a, 0x37, 0x82, 0xcc, + 0x2c, 0xc7, 0xd0, 0x1e, 0xe1, 0xb8, 0x0b, 0x8d, 0x85, 0x3e, 0xab, 0x54, 0x5c, 0x80, 0xa2, 0x27, + 0x20, 0x2a, 0x17, 0xd5, 0x98, 0x56, 0x1a, 0x9a, 0x4a, 0x1b, 0xc5, 0x7f, 0x62, 0x66, 0xac, 0xa2, + 0x10, 0xee, 0xd3, 0xc0, 0xb3, 0xe2, 0x45, 0x31, 0xad, 0x56, 0x35, 0x92, 0x77, 0x95, 0xb8, 0x3b, + 0x4c, 0x97, 0x73, 0x29, 0x53, 0xce, 0x16, 0x14, 0x45, 0x6b, 0xc7, 0x17, 0xcb, 0x5a, 0x26, 0xaa, + 0x7d, 0xdb, 0xa1, 0x6a, 0x78, 0x15, 0x0c, 0xbd, 0x0d, 0xc5, 0x41, 0xf4, 0x71, 0xb6, 0x91, 0x17, + 0x06, 0xab, 0xb1, 0x41, 0x7a, 0xd2, 0x15, 0x04, 0xff, 0xa0, 0x43, 0x41, 0xba, 0xfa, 0xaa, 0x6a, + 0x53, 0x87, 0x15, 0xe2, 0x0f, 0x82, 0xa1, 0xe3, 0x8f, 0xc4, 0x48, 0x14, 0xcc, 0xe4, 0x8c, 0x90, + 0x6a, 0xd5, 0xa8, 0xf7, 0x2b, 0xaa, 0x1f, 0x37, 0xe0, 0xb5, 0x3e, 0xb5, 0x7d, 0x76, 0x9f, 0x50, + 0xe1, 0x58, 0x52, 0x08, 0xec, 0x01, 0x4c, 0xf3, 0x9b, 0xca, 0x8b, 0x7e, 0xbc, 0xbc, 0x34, 0x61, + 0x99, 0xd9, 0xde, 0xd8, 0x15, 0x13, 0x9c, 0x29, 0x64, 0x4f, 0x88, 0x15, 0x3c, 0x06, 0xe1, 0x5f, + 0x75, 0x28, 0x25, 0x5c, 0xe8, 0x0e, 0xe4, 0x93, 0x2b, 0xaf, 0xb2, 0xfd, 0xa1, 0xda, 0xb5, 0x97, + 0x8f, 0xf3, 0x4a, 0x08, 0xb9, 0xe3, 0xb6, 0xbe, 0x75, 0x28, 0x69, 0x6e, 0x1f, 0x71, 0xc2, 0x4c, + 0x41, 0x84, 0xee, 0x42, 0x41, 0xdc, 0x86, 0x22, 0x6d, 0xff, 0x93, 0x51, 0x32, 0xe1, 0x36, 0x14, + 0x65, 0x28, 0xe8, 0x64, 0x4c, 0x2e, 0xaf, 0x33, 0x79, 0x88, 0xae, 0xe1, 0x39, 0x05, 0x2b, 0xf3, + 0x69, 0xb5, 0x70, 0x1b, 0x56, 0x33, 0x53, 0x90, 0xd9, 0x6c, 0xfa, 0xb1, 0x36, 0xdb, 0x07, 0x50, + 0x94, 0x93, 0xf1, 0x9f, 0x4b, 0x84, 0x2d, 0xa8, 0xa4, 0x49, 0xd1, 0x79, 0xc8, 0xf3, 0xa3, 0xb1, + 0x8c, 0xa2, 0x3a, 0x35, 0x17, 0xea, 0xfe, 0xd1, 0x98, 0x98, 0x42, 0x1d, 0xb5, 0x91, 0xa8, 0x8d, + 0x1c, 0x1c, 0x99, 0xde, 0x24, 0x03, 0x39, 0x21, 0x94, 0x87, 0xb7, 0x3e, 0x85, 0x52, 0x62, 0x8c, + 0x4a, 0x50, 0xe8, 0xdc, 0xbd, 0xd7, 0xde, 0xa9, 0x69, 0x68, 0x15, 0x4a, 0x7b, 0x77, 0xfa, 0x96, + 0x3c, 0xea, 0xe8, 0x04, 0x94, 0xcd, 0xce, 0xad, 0xce, 0x17, 0xd6, 0x6e, 0xbb, 0x7f, 0xfd, 0x76, + 0x6d, 0x09, 0x21, 0xa8, 0x4a, 0xc1, 0xde, 0x1d, 0x25, 0xcb, 0x6d, 0xfd, 0x5c, 0x80, 0x95, 0x78, + 0x80, 0xd1, 0x15, 0xc8, 0xef, 0x87, 0xec, 0x10, 0x9d, 0x9c, 0xf7, 0x98, 0xaa, 0x9f, 0x9a, 0x91, + 0xaa, 0x86, 0xd6, 0xd0, 0x7b, 0x50, 0x10, 0xab, 0x1b, 0xcd, 0x7d, 0x09, 0xd5, 0xe7, 0xbf, 0x6f, + 0xb0, 0x86, 0x6e, 0x40, 0x39, 0xb5, 0xf2, 0x17, 0x58, 0x9f, 0xcd, 0x48, 0xb3, 0xaf, 0x03, 0xac, + 0x5d, 0xd2, 0xd1, 0x6d, 0x28, 0xa7, 0xb6, 0x35, 0xaa, 0x67, 0xca, 0x93, 0xd9, 0xfb, 0x53, 0xae, + 0x39, 0xeb, 0x1d, 0x6b, 0xa8, 0x03, 0x30, 0x5d, 0xd4, 0xe8, 0x4c, 0x06, 0x9c, 0xde, 0xe8, 0xf5, + 0xfa, 0x3c, 0x55, 0x42, 0xb3, 0x0d, 0xa5, 0x64, 0x4d, 0xa1, 0x8d, 0x39, 0x9b, 0x4b, 0x92, 0x2c, + 0xde, 0x69, 0x58, 0x43, 0x37, 0xa1, 0xd2, 0x76, 0xdd, 0xe3, 0xd0, 0xd4, 0xd3, 0x1a, 0x36, 0xcb, + 0xe3, 0xc2, 0xe9, 0x05, 0x9b, 0x01, 0x5d, 0xc8, 0x6e, 0x80, 0x45, 0xeb, 0xae, 0xfe, 0xe6, 0xbf, + 0xe2, 0x92, 0xaf, 0xed, 0x42, 0x35, 0x7b, 0xeb, 0xa1, 0x45, 0x4f, 0xb5, 0xba, 0x91, 0x28, 0xe6, + 0x5f, 0x93, 0xda, 0xa6, 0xbe, 0xfd, 0xd1, 0xe3, 0x67, 0x86, 0xf6, 0xe4, 0x99, 0xa1, 0xbd, 0x78, + 0x66, 0xe8, 0xdf, 0x4f, 0x0c, 0xfd, 0x97, 0x89, 0xa1, 0x3f, 0x9a, 0x18, 0xfa, 0xe3, 0x89, 0xa1, + 0xff, 0x35, 0x31, 0xf4, 0xbf, 0x27, 0x86, 0xf6, 0x62, 0x62, 0xe8, 0x3f, 0x3e, 0x37, 0xb4, 0xc7, + 0xcf, 0x0d, 0xed, 0xc9, 0x73, 0x43, 0xfb, 0xb2, 0x28, 0xff, 0xc6, 0x1c, 0x14, 0xc5, 0x3f, 0x91, + 0xcb, 0xff, 0x04, 0x00, 0x00, 0xff, 0xff, 0x33, 0xbf, 0x53, 0xf9, 0x04, 0x0d, 0x00, 0x00, } diff --git a/vendor/github.com/cortexproject/cortex/pkg/ingester/client/cortex.proto b/vendor/github.com/cortexproject/cortex/pkg/ingester/client/cortex.proto index 0900898ccc4e..02e17e35b8a1 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ingester/client/cortex.proto +++ b/vendor/github.com/cortexproject/cortex/pkg/ingester/client/cortex.proto @@ -15,6 +15,7 @@ service Ingester { rpc QueryStream(QueryRequest) returns (stream QueryStreamResponse) {}; rpc LabelValues(LabelValuesRequest) returns (LabelValuesResponse) {}; + rpc LabelNames(LabelNamesRequest) returns (LabelNamesResponse) {}; rpc UserStats(UserStatsRequest) returns (UserStatsResponse) {}; rpc AllUserStats(UserStatsRequest) returns (UsersStatsResponse) {}; rpc MetricsForLabelMatchers(MetricsForLabelMatchersRequest) returns (MetricsForLabelMatchersResponse) {}; @@ -65,6 +66,13 @@ message LabelValuesResponse { repeated string label_values = 1; } +message LabelNamesRequest { +} + +message LabelNamesResponse { + repeated string label_names = 1; +} + message UserStatsRequest {} message UserStatsResponse { diff --git a/vendor/github.com/cortexproject/cortex/pkg/ingester/index/index.go b/vendor/github.com/cortexproject/cortex/pkg/ingester/index/index.go index 84ab8d2f3d04..860804df0bd7 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ingester/index/index.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ingester/index/index.go @@ -78,7 +78,7 @@ func (ii *InvertedIndex) LabelValues(name model.LabelName) model.LabelValues { } // Delete a fingerprint with the given label pairs. -func (ii *InvertedIndex) Delete(labels []client.LabelPair, fp model.Fingerprint) { +func (ii *InvertedIndex) Delete(labels labels.Labels, fp model.Fingerprint) { shard := &ii.shards[util.HashFP(fp)%indexShards] shard.delete(labels, fp) } @@ -187,7 +187,7 @@ func (shard *indexShard) labelValues(name model.LabelName) model.LabelValues { return results } -func (shard *indexShard) delete(labels []client.LabelPair, fp model.Fingerprint) { +func (shard *indexShard) delete(labels labels.Labels, fp model.Fingerprint) { shard.mtx.Lock() defer shard.mtx.Unlock() diff --git a/vendor/github.com/cortexproject/cortex/pkg/ring/batch.go b/vendor/github.com/cortexproject/cortex/pkg/ring/batch.go new file mode 100644 index 000000000000..fcd447aae385 --- /dev/null +++ b/vendor/github.com/cortexproject/cortex/pkg/ring/batch.go @@ -0,0 +1,110 @@ +package ring + +import ( + "context" + "sync/atomic" +) + +type batchTracker struct { + rpcsPending int32 + rpcsFailed int32 + done chan struct{} + err chan error +} + +type ingester struct { + desc IngesterDesc + itemTrackers []*itemTracker + indexes []int +} + +type itemTracker struct { + minSuccess int + maxFailures int + succeeded int32 + failed int32 +} + +// DoBatch request against a set of keys in the ring, handling replication and +// failures. For example if we want to write N items where they may all +// hit different ingesters, and we want them all replicated R ways with +// quorum writes, we track the relationship between batch RPCs and the items +// within them. +// +// Callback is passed the ingester to target, and the indexes of the keys +// to send to that ingester. +// +// Not implemented as a method on Ring so we can test separately. +func DoBatch(ctx context.Context, r ReadRing, keys []uint32, callback func(IngesterDesc, []int) error) error { + replicationSets, err := r.BatchGet(keys, Write) + if err != nil { + return err + } + + itemTrackers := make([]itemTracker, len(keys)) + ingesters := map[string]ingester{} + for i, replicationSet := range replicationSets { + itemTrackers[i].minSuccess = len(replicationSet.Ingesters) - replicationSet.MaxErrors + itemTrackers[i].maxFailures = replicationSet.MaxErrors + + for _, desc := range replicationSet.Ingesters { + curr := ingesters[desc.Addr] + ingesters[desc.Addr] = ingester{ + desc: desc, + itemTrackers: append(curr.itemTrackers, &itemTrackers[i]), + indexes: append(curr.indexes, i), + } + } + } + + tracker := batchTracker{ + rpcsPending: int32(len(itemTrackers)), + done: make(chan struct{}), + err: make(chan error), + } + + for _, i := range ingesters { + go func(i ingester) { + err := callback(i.desc, i.indexes) + tracker.record(i.itemTrackers, err) + }(i) + } + + select { + case err := <-tracker.err: + return err + case <-tracker.done: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func (b *batchTracker) record(sampleTrackers []*itemTracker, err error) { + // If we succeed, decrement each sample's pending count by one. If we reach + // the required number of successful puts on this sample, then decrement the + // number of pending samples by one. If we successfully push all samples to + // min success ingesters, wake up the waiting rpc so it can return early. + // Similarly, track the number of errors, and if it exceeds maxFailures + // shortcut the waiting rpc. + // + // The use of atomic increments here guarantees only a single sendSamples + // goroutine will write to either channel. + for i := range sampleTrackers { + if err != nil { + if atomic.AddInt32(&sampleTrackers[i].failed, 1) <= int32(sampleTrackers[i].maxFailures) { + continue + } + if atomic.AddInt32(&b.rpcsFailed, 1) == 1 { + b.err <- err + } + } else { + if atomic.AddInt32(&sampleTrackers[i].succeeded, 1) != int32(sampleTrackers[i].minSuccess) { + continue + } + if atomic.AddInt32(&b.rpcsPending, -1) == 0 { + b.done <- struct{}{} + } + } + } +} diff --git a/vendor/github.com/cortexproject/cortex/pkg/ring/lifecycler.go b/vendor/github.com/cortexproject/cortex/pkg/ring/lifecycler.go index 70e11ec066ef..ef31aac07335 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ring/lifecycler.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ring/lifecycler.go @@ -153,30 +153,38 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer) (*Life return l, nil } -// IsReady is used to rate limit the number of ingesters that can be coming or +// CheckReady is used to rate limit the number of ingesters that can be coming or // going at any one time, by only returning true if all ingesters are active. -func (i *Lifecycler) IsReady(ctx context.Context) bool { +// The state latches: once we have gone ready we don't go un-ready +func (i *Lifecycler) CheckReady(ctx context.Context) error { i.readyLock.Lock() defer i.readyLock.Unlock() if i.ready { - return true + return nil } // Ingester always take at least minReadyDuration to become ready to work // around race conditions with ingesters exiting and updating the ring if time.Now().Sub(i.startTime) < i.cfg.MinReadyDuration { - return false + return fmt.Errorf("waiting for %v after startup", i.cfg.MinReadyDuration) } ringDesc, err := i.KVStore.Get(ctx, ConsulKey) if err != nil { level.Error(util.Logger).Log("msg", "error talking to consul", "err", err) - return false + return fmt.Errorf("error talking to consul: %s", err) } - i.ready = i.ready || ringDesc.(*Desc).Ready(i.cfg.RingConfig.HeartbeatTimeout) - return i.ready + if len(i.getTokens()) == 0 { + return fmt.Errorf("this ingester owns no tokens") + } + if err := ringDesc.(*Desc).Ready(i.cfg.RingConfig.HeartbeatTimeout); err != nil { + return err + } + + i.ready = true + return nil } // GetState returns the state of this ingester. @@ -365,7 +373,7 @@ func (i *Lifecycler) initRing(ctx context.Context) error { tokens, _ := ringDesc.TokensFor(i.ID) i.setTokens(tokens) - level.Info(util.Logger).Log("msg", "existing entry found in ring", "state", i.GetState(), "tokens", tokens) + level.Info(util.Logger).Log("msg", "existing entry found in ring", "state", i.GetState(), "tokens", len(tokens)) return ringDesc, true, nil }) } diff --git a/vendor/github.com/cortexproject/cortex/pkg/ring/model.go b/vendor/github.com/cortexproject/cortex/pkg/ring/model.go index e12968018974..5644acaf51d7 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ring/model.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ring/model.go @@ -1,6 +1,7 @@ package ring import ( + "fmt" "sort" "time" @@ -72,11 +73,17 @@ func (d *Desc) ClaimTokens(from, to string, normaliseTokens bool) []uint32 { if normaliseTokens { + // If the ingester we are claiming from is normalising, get its tokens then erase them from the ring. + if fromDesc, found := d.Ingesters[from]; found { + result = fromDesc.Tokens + fromDesc.Tokens = nil + d.Ingesters[from] = fromDesc + } + // If we are storing the tokens in a normalise form, we need to deal with // the migration from denormalised by removing the tokens from the tokens // list. - result = d.Ingesters[from].Tokens - + // When all ingesters are in normalised mode, d.Tokens is empty here for i := 0; i < len(d.Tokens); { if d.Tokens[i].Ingester == from { result = append(result, d.Tokens[i].Token) @@ -115,19 +122,22 @@ func (d *Desc) FindIngestersByState(state IngesterState) []IngesterDesc { return result } -// Ready is true when all ingesters are active and healthy. -func (d *Desc) Ready(heartbeatTimeout time.Duration) bool { +// Ready returns no error when all ingesters are active and healthy. +func (d *Desc) Ready(heartbeatTimeout time.Duration) error { numTokens := len(d.Tokens) - for _, ingester := range d.Ingesters { + for id, ingester := range d.Ingesters { if time.Now().Sub(time.Unix(ingester.Timestamp, 0)) > heartbeatTimeout { - return false + return fmt.Errorf("ingester %s past heartbeat timeout", id) } else if ingester.State != ACTIVE { - return false + return fmt.Errorf("ingester %s in state %v", id, ingester.State) } numTokens += len(ingester.Tokens) } - return numTokens > 0 + if numTokens == 0 { + return fmt.Errorf("Not ready: no tokens in ring") + } + return nil } // TokensFor partitions the tokens into those for the given ID, and those for others. diff --git a/vendor/github.com/cortexproject/cortex/pkg/util/backoff.go b/vendor/github.com/cortexproject/cortex/pkg/util/backoff.go index eeda1c1eba4e..b523b42238ce 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/util/backoff.go +++ b/vendor/github.com/cortexproject/cortex/pkg/util/backoff.go @@ -2,6 +2,7 @@ package util import ( "context" + "flag" "fmt" "math/rand" "time" @@ -14,6 +15,13 @@ type BackoffConfig struct { MaxRetries int // give up after this many; zero means infinite retries } +// RegisterFlags for BackoffConfig. +func (cfg *BackoffConfig) RegisterFlags(prefix string, f *flag.FlagSet) { + f.DurationVar(&cfg.MinBackoff, prefix+".backoff-min-period", 100*time.Millisecond, "Minimum delay when backing off.") + f.DurationVar(&cfg.MaxBackoff, prefix+".backoff-max-period", 10*time.Second, "Maximum delay when backing off.") + f.IntVar(&cfg.MaxRetries, prefix+".backoff-retries", 10, "Number of times to backoff and retry before failing.") +} + // Backoff implements exponential backoff with randomized wait times type Backoff struct { cfg BackoffConfig diff --git a/vendor/github.com/cortexproject/cortex/pkg/util/grpcclient/backoff_retry.go b/vendor/github.com/cortexproject/cortex/pkg/util/grpcclient/backoff_retry.go new file mode 100644 index 000000000000..321ffd0924b3 --- /dev/null +++ b/vendor/github.com/cortexproject/cortex/pkg/util/grpcclient/backoff_retry.go @@ -0,0 +1,30 @@ +package grpcclient + +import ( + "context" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + + "github.com/cortexproject/cortex/pkg/util" +) + +// NewBackoffRetry gRPC middleware. +func NewBackoffRetry(cfg util.BackoffConfig) grpc.UnaryClientInterceptor { + return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + backoff := util.NewBackoff(ctx, cfg) + for backoff.Ongoing() { + err := invoker(ctx, method, req, reply, cc, opts...) + if err == nil { + return nil + } + + if grpc.Code(err) != codes.ResourceExhausted { + return err + } + + backoff.Wait() + } + return backoff.Err() + } +} diff --git a/vendor/github.com/cortexproject/cortex/pkg/util/grpcclient/grpcclient.go b/vendor/github.com/cortexproject/cortex/pkg/util/grpcclient/grpcclient.go index 01a1200af5da..4db8ea94d870 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/util/grpcclient/grpcclient.go +++ b/vendor/github.com/cortexproject/cortex/pkg/util/grpcclient/grpcclient.go @@ -3,14 +3,21 @@ package grpcclient import ( "flag" + "github.com/cortexproject/cortex/pkg/util" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" "google.golang.org/grpc" ) // Config for a gRPC client. type Config struct { - MaxRecvMsgSize int `yaml:"max_recv_msg_size"` - MaxSendMsgSize int `yaml:"max_send_msg_size"` - UseGzipCompression bool `yaml:"use_gzip_compression"` + MaxRecvMsgSize int `yaml:"max_recv_msg_size"` + MaxSendMsgSize int `yaml:"max_send_msg_size"` + UseGzipCompression bool `yaml:"use_gzip_compression"` + RateLimit float64 `yaml:"rate_limit"` + RateLimitBurst int `yaml:"rate_limit_burst"` + + BackoffOnRatelimits bool `yaml:"backoff_on_ratelimits"` + BackoffConfig util.BackoffConfig `yaml:"backoff_config"` } // RegisterFlags registers flags. @@ -18,6 +25,11 @@ func (cfg *Config) RegisterFlags(prefix string, f *flag.FlagSet) { f.IntVar(&cfg.MaxRecvMsgSize, prefix+".grpc-max-recv-msg-size", 100<<20, "gRPC client max receive message size (bytes).") f.IntVar(&cfg.MaxSendMsgSize, prefix+".grpc-max-send-msg-size", 16<<20, "gRPC client max send message size (bytes).") f.BoolVar(&cfg.UseGzipCompression, prefix+".grpc-use-gzip-compression", false, "Use compression when sending messages.") + f.Float64Var(&cfg.RateLimit, prefix+".grpc-client-rate-limit", 0., "Rate limit for gRPC client; 0 means disabled.") + f.IntVar(&cfg.RateLimitBurst, prefix+".grpc-client-rate-limit-burst", 0, "Rate limit burst for gRPC client.") + f.BoolVar(&cfg.BackoffOnRatelimits, prefix+".backoff-on-ratelimits", false, "Enable backoff and retry when we hit ratelimits.") + + cfg.BackoffConfig.RegisterFlags(prefix, f) } // CallOptions returns the config in terms of CallOptions. @@ -32,6 +44,18 @@ func (cfg *Config) CallOptions() []grpc.CallOption { } // DialOption returns the config as a grpc.DialOptions. -func (cfg *Config) DialOption() grpc.DialOption { - return grpc.WithDefaultCallOptions(cfg.CallOptions()...) +func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientInterceptor, streamClientInterceptors []grpc.StreamClientInterceptor) []grpc.DialOption { + if cfg.BackoffOnRatelimits { + unaryClientInterceptors = append([]grpc.UnaryClientInterceptor{NewBackoffRetry(cfg.BackoffConfig)}, unaryClientInterceptors...) + } + + if cfg.RateLimit > 0 { + unaryClientInterceptors = append([]grpc.UnaryClientInterceptor{NewRateLimiter(cfg)}, unaryClientInterceptors...) + } + + return []grpc.DialOption{ + grpc.WithDefaultCallOptions(cfg.CallOptions()...), + grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(unaryClientInterceptors...)), + grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(streamClientInterceptors...)), + } } diff --git a/vendor/github.com/cortexproject/cortex/pkg/util/grpcclient/ratelimit.go b/vendor/github.com/cortexproject/cortex/pkg/util/grpcclient/ratelimit.go new file mode 100644 index 000000000000..d2432bf52490 --- /dev/null +++ b/vendor/github.com/cortexproject/cortex/pkg/util/grpcclient/ratelimit.go @@ -0,0 +1,21 @@ +package grpcclient + +import ( + "context" + + "golang.org/x/time/rate" + "google.golang.org/grpc" +) + +// NewRateLimiter creates a UnaryClientInterceptor for client side rate limiting. +func NewRateLimiter(cfg *Config) grpc.UnaryClientInterceptor { + burst := cfg.RateLimitBurst + if burst == 0 { + burst = int(cfg.RateLimit) + } + limiter := rate.NewLimiter(rate.Limit(cfg.RateLimit), burst) + return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + limiter.Wait(ctx) + return invoker(ctx, method, req, reply, cc, opts...) + } +} diff --git a/vendor/github.com/cortexproject/cortex/pkg/util/log.go b/vendor/github.com/cortexproject/cortex/pkg/util/log.go index 76e5ac4130e9..6460e2dd1475 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/util/log.go +++ b/vendor/github.com/cortexproject/cortex/pkg/util/log.go @@ -1,6 +1,7 @@ package util import ( + "fmt" "os" "github.com/go-kit/kit/log" @@ -122,3 +123,16 @@ func WithTraceID(traceID string, l log.Logger) log.Logger { // See note in WithContext. return log.With(l, "trace_id", traceID) } + +// CheckFatal prints an error and exits with error code 1 if err is non-nil +func CheckFatal(location string, err error) { + if err != nil { + logger := level.Error(Logger) + if location != "" { + logger = log.With(logger, "msg", "error "+location) + } + // %+v gets the stack trace from errors using github.com/pkg/errors + logger.Log("err", fmt.Sprintf("%+v", err)) + os.Exit(1) + } +} diff --git a/vendor/github.com/cortexproject/cortex/pkg/util/validation/validate.go b/vendor/github.com/cortexproject/cortex/pkg/util/validation/validate.go index 2718638cdc7c..750ceb25b910 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/util/validation/validate.go +++ b/vendor/github.com/cortexproject/cortex/pkg/util/validation/validate.go @@ -78,7 +78,7 @@ func (cfg *Overrides) ValidateLabels(userID string, ls []client.LabelPair) error numLabelNames := len(ls) if numLabelNames > cfg.MaxLabelNamesPerSeries(userID) { DiscardedSamples.WithLabelValues(maxLabelNamesPerSeries, userID).Inc() - return httpgrpc.Errorf(http.StatusBadRequest, errTooManyLabels, metricName, numLabelNames, cfg.MaxLabelNamesPerSeries(userID)) + return httpgrpc.Errorf(http.StatusBadRequest, errTooManyLabels, client.FromLabelPairs(ls).String(), numLabelNames, cfg.MaxLabelNamesPerSeries(userID)) } maxLabelNameLength := cfg.MaxLabelNameLength(userID) @@ -102,7 +102,7 @@ func (cfg *Overrides) ValidateLabels(userID string, ls []client.LabelPair) error } if errTemplate != "" { DiscardedSamples.WithLabelValues(reason, userID).Inc() - return httpgrpc.Errorf(http.StatusBadRequest, errTemplate, cause, metricName) + return httpgrpc.Errorf(http.StatusBadRequest, errTemplate, cause, client.FromLabelPairs(ls).String()) } } return nil diff --git a/vendor/github.com/uber/jaeger-lib/metrics/local.go b/vendor/github.com/uber/jaeger-lib/metrics/local.go index 217d30600a0b..8c3624849cc6 100644 --- a/vendor/github.com/uber/jaeger-lib/metrics/local.go +++ b/vendor/github.com/uber/jaeger-lib/metrics/local.go @@ -286,11 +286,6 @@ func (l *LocalFactory) newNamespace(name string) string { if l.namespace == "" { return name } - - if name == "" { - return l.namespace - } - return l.namespace + "." + name } diff --git a/vendor/github.com/weaveworks/common/instrument/instrument.go b/vendor/github.com/weaveworks/common/instrument/instrument.go index 2994544c365e..2b6168d48b15 100644 --- a/vendor/github.com/weaveworks/common/instrument/instrument.go +++ b/vendor/github.com/weaveworks/common/instrument/instrument.go @@ -152,7 +152,9 @@ func CollectedRequest(ctx context.Context, method string, col Collector, toStatu col.After(method, toStatusCode(err), start) if err != nil { - ext.Error.Set(sp, true) + if err != context.Canceled { + ext.Error.Set(sp, true) + } sp.LogFields(otlog.Error(err)) } sp.Finish() diff --git a/vendor/github.com/weaveworks/common/server/server.go b/vendor/github.com/weaveworks/common/server/server.go index 09325fa2b301..dce01f4b7cd7 100644 --- a/vendor/github.com/weaveworks/common/server/server.go +++ b/vendor/github.com/weaveworks/common/server/server.go @@ -38,15 +38,15 @@ type Config struct { HTTPServerWriteTimeout time.Duration `yaml:"http_server_write_timeout"` HTTPServerIdleTimeout time.Duration `yaml:"http_server_idle_timeout"` - GPRCServerMaxRecvMsgSize int `yaml:"grpc_server_max_recv_msg_size"` - GRPCServerMaxSendMsgSize int `yaml:"grpc_server_max_send_msg_size"` - GPRCServerMaxConcurrentStreams uint `yaml:"grpc_server_max_concurrent_streams"` - GRPCOptions []grpc.ServerOption `yaml:"-"` GRPCMiddleware []grpc.UnaryServerInterceptor `yaml:"-"` GRPCStreamMiddleware []grpc.StreamServerInterceptor `yaml:"-"` HTTPMiddleware []middleware.Interface `yaml:"-"` + GPRCServerMaxRecvMsgSize int `yaml:"grpc_server_max_recv_msg_size"` + GRPCServerMaxSendMsgSize int `yaml:"grpc_server_max_send_msg_size"` + GPRCServerMaxConcurrentStreams uint `yaml:"grpc_server_max_concurrent_streams"` + LogLevel logging.Level `yaml:"log_level"` Log logging.Interface `yaml:"-"`