From 6fff135b31df2f9a9b617a816600c15695f59853 Mon Sep 17 00:00:00 2001 From: Ed Welch Date: Thu, 23 Apr 2020 10:20:55 -0400 Subject: [PATCH 1/3] Update cortex vendoring to latest master checksum `36496a074bc4530fb1d7c74136cab1566fbd9583` Signed-off-by: Ed Welch --- go.mod | 2 +- go.sum | 4 +- .../pkg/chunk/aws/dynamodb_storage_client.go | 4 +- .../cortex/pkg/chunk/aws/s3_storage_client.go | 13 ++- .../pkg/chunk/azure/blob_storage_client.go | 16 ++- .../pkg/chunk/cassandra/storage_client.go | 4 +- .../cortex/pkg/chunk/chunk_store.go | 4 +- .../pkg/chunk/gcp/bigtable_index_client.go | 4 +- .../cortex/pkg/chunk/gcp/gcs_object_client.go | 13 ++- .../pkg/chunk/inmemory_storage_client.go | 4 +- .../pkg/chunk/local/boltdb_index_client.go | 6 +- .../pkg/chunk/local/fs_object_client.go | 19 +-- .../cortexproject/cortex/pkg/chunk/opts.go | 4 +- .../cortex/pkg/chunk/purger/purger.go | 64 +++++++++- .../pkg/chunk/purger/request_handler.go | 29 ++++- .../cortex/pkg/chunk/series_store.go | 6 + .../cortex/pkg/chunk/storage/factory.go | 7 +- .../cortex/pkg/chunk/storage/metrics.go | 110 ++++++++++++++++++ .../cortex/pkg/chunk/storage_client.go | 6 +- .../cortex/pkg/chunk/util/util.go | 21 ++-- .../pkg/distributor/distributor_ring.go | 2 +- .../pkg/distributor/ingester_client_pool.go | 2 +- .../cortex/pkg/ingester/index/index.go | 8 ++ .../cortex/pkg/querier/frontend/frontend.go | 17 ++- .../pkg/querier/queryrange/roundtrip.go | 24 +++- .../cortex/pkg/querier/series/series_set.go | 11 ++ .../cortex/pkg/ring/client/pool.go | 29 +++-- .../pkg/ring/client/ring_service_discovery.go | 20 ++++ .../cortex/pkg/ring/kv/consul/client.go | 16 +++ .../cortex/pkg/ring/lifecycler.go | 6 +- .../cortex/pkg/ring/replication_strategy.go | 6 +- .../cortexproject/cortex/pkg/ring/ring.go | 21 ++-- .../cortex/pkg/util/test/poll.go | 2 +- vendor/modules.txt | 2 +- 34 files changed, 411 insertions(+), 95 deletions(-) create mode 100644 vendor/github.com/cortexproject/cortex/pkg/chunk/storage/metrics.go create mode 100644 vendor/github.com/cortexproject/cortex/pkg/ring/client/ring_service_discovery.go diff --git a/go.mod b/go.mod index f03c7d95fc44..832c749d4230 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/containerd/containerd v1.3.2 // indirect github.com/containerd/fifo v0.0.0-20190226154929-a9fb20d87448 // indirect github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e - github.com/cortexproject/cortex v1.0.1-0.20200416152925-3fe04dcff1d8 + github.com/cortexproject/cortex v1.0.1-0.20200423101820-36496a074bc4 github.com/davecgh/go-spew v1.1.1 github.com/docker/distribution v2.7.1+incompatible // indirect github.com/docker/docker v0.7.3-0.20190817195342-4760db040282 diff --git a/go.sum b/go.sum index e19f081107ad..8aa8cf368421 100644 --- a/go.sum +++ b/go.sum @@ -170,8 +170,8 @@ github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfc github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cortexproject/cortex v0.6.1-0.20200228110116-92ab6cbe0995/go.mod h1:3Xa3DjJxtpXqxcMGdk850lcIRb81M0fyY1MQ6udY134= -github.com/cortexproject/cortex v1.0.1-0.20200416152925-3fe04dcff1d8 h1:A7nGtA5pj10j5bwbLPqf5C+WAhVzFaOt1c/uen6202o= -github.com/cortexproject/cortex v1.0.1-0.20200416152925-3fe04dcff1d8/go.mod h1:5NXU+UpV8NW6I3teskVmxn45xcq4+IbtSOINfRf+jds= +github.com/cortexproject/cortex 
v1.0.1-0.20200423101820-36496a074bc4 h1:SNBpM6lX8ZjDsSrQWbxP1FRO8KXirnRwFvtcLA8+DCc= +github.com/cortexproject/cortex v1.0.1-0.20200423101820-36496a074bc4/go.mod h1:S2BogfHdb0YCo5Zly3vOEsqzsE7YXdumHBMRJkgDZm4= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/cznic/b v0.0.0-20180115125044-35e9bbe41f07/go.mod h1:URriBxXwVq5ijiJ12C7iIZqlA69nTlI+LgI6/pwftG8= github.com/cznic/fileutil v0.0.0-20180108211300-6a051e75936f/go.mod h1:8S58EK26zhXSxzv7NQFpnliaOQsmDUxvoQO3rt154Vg= diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_storage_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_storage_client.go index d4395038c4ad..a39a0ef42205 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_storage_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_storage_client.go @@ -287,7 +287,7 @@ func (a dynamoDBStorageClient) QueryPages(ctx context.Context, queries []chunk.I return chunk_util.DoParallelQueries(ctx, a.query, queries, callback) } -func (a dynamoDBStorageClient) query(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { +func (a dynamoDBStorageClient) query(ctx context.Context, query chunk.IndexQuery, callback chunk_util.Callback) error { input := &dynamodb.QueryInput{ TableName: aws.String(query.TableName), KeyConditions: map[string]*dynamodb.Condition{ @@ -341,7 +341,7 @@ func (a dynamoDBStorageClient) query(ctx context.Context, query chunk.IndexQuery return err } - if !callback(response) { + if !callback(query, response) { if err != nil { return fmt.Errorf("QueryPages error: table=%v, err=%v", *input.TableName, page.Error()) } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/s3_storage_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/s3_storage_client.go index f9fc0c540114..9744d3bf0da3 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/s3_storage_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/aws/s3_storage_client.go @@ -168,9 +168,10 @@ func (a *S3ObjectClient) PutObject(ctx context.Context, objectKey string, object }) } -// List only objects from the store non-recursively -func (a *S3ObjectClient) List(ctx context.Context, prefix string) ([]chunk.StorageObject, error) { +// List objects and common-prefixes i.e synthetic directories from the store non-recursively +func (a *S3ObjectClient) List(ctx context.Context, prefix string) ([]chunk.StorageObject, []chunk.StorageCommonPrefix, error) { var storageObjects []chunk.StorageObject + var commonPrefixes []chunk.StorageCommonPrefix for i := range a.bucketNames { err := instrument.CollectedRequest(ctx, "S3.List", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error { @@ -193,6 +194,10 @@ func (a *S3ObjectClient) List(ctx context.Context, prefix string) ([]chunk.Stora }) } + for _, commonPrefix := range output.CommonPrefixes { + commonPrefixes = append(commonPrefixes, chunk.StorageCommonPrefix(commonPrefix.String())) + } + if !*output.IsTruncated { // No more results to fetch break @@ -205,9 +210,9 @@ func (a *S3ObjectClient) List(ctx context.Context, prefix string) ([]chunk.Stora }) if err != nil { - return nil, err + return nil, nil, err } } - return storageObjects, nil + return storageObjects, commonPrefixes, nil } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/azure/blob_storage_client.go 
b/vendor/github.com/cortexproject/cortex/pkg/chunk/azure/blob_storage_client.go index e38ca50e572b..fd856302ed47 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/azure/blob_storage_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/azure/blob_storage_client.go @@ -167,18 +167,19 @@ func (b *BlobStorage) newPipeline() (pipeline.Pipeline, error) { }), nil } -// List only objects from the store non-recursively -func (b *BlobStorage) List(ctx context.Context, prefix string) ([]chunk.StorageObject, error) { +// List objects and common-prefixes i.e synthetic directories from the store non-recursively +func (b *BlobStorage) List(ctx context.Context, prefix string) ([]chunk.StorageObject, []chunk.StorageCommonPrefix, error) { var storageObjects []chunk.StorageObject + var commonPrefixes []chunk.StorageCommonPrefix for marker := (azblob.Marker{}); marker.NotDone(); { if ctx.Err() != nil { - return nil, ctx.Err() + return nil, nil, ctx.Err() } listBlob, err := b.containerURL.ListBlobsHierarchySegment(ctx, marker, b.delimiter, azblob.ListBlobsSegmentOptions{Prefix: prefix}) if err != nil { - return nil, err + return nil, nil, err } marker = listBlob.NextMarker @@ -190,9 +191,14 @@ func (b *BlobStorage) List(ctx context.Context, prefix string) ([]chunk.StorageO ModifiedAt: blobInfo.Properties.LastModified, }) } + + // Process the BlobPrefixes so called commonPrefixes or synthetic directories in the listed synthetic directory + for _, blobPrefix := range listBlob.Segment.BlobPrefixes { + commonPrefixes = append(commonPrefixes, chunk.StorageCommonPrefix(blobPrefix.Name)) + } } - return storageObjects, nil + return storageObjects, commonPrefixes, nil } func (b *BlobStorage) DeleteObject(ctx context.Context, chunkID string) error { diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/cassandra/storage_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/cassandra/storage_client.go index ce2b662cd9fc..34480c595b6b 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/cassandra/storage_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/cassandra/storage_client.go @@ -276,7 +276,7 @@ func (s *StorageClient) QueryPages(ctx context.Context, queries []chunk.IndexQue return util.DoParallelQueries(ctx, s.query, queries, callback) } -func (s *StorageClient) query(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { +func (s *StorageClient) query(ctx context.Context, query chunk.IndexQuery, callback util.Callback) error { var q *gocql.Query switch { @@ -313,7 +313,7 @@ func (s *StorageClient) query(ctx context.Context, query chunk.IndexQuery, callb if err := scanner.Scan(&b.rangeValue, &b.value); err != nil { return errors.WithStack(err) } - if !callback(b) { + if !callback(query, b) { return nil } } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go index 02cc7da879ba..6e904f6d85da 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go @@ -441,8 +441,8 @@ func (c *baseStore) lookupIdsByMetricNameMatcher(ctx context.Context, from, thro } else if matcher.Type == labels.MatchEqual { labelName = matcher.Name queries, err = c.schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, matcher.Name, matcher.Value) - } else if matcher.Type == labels.MatchRegexp && len(findSetMatches(matcher.Value)) > 
0 { - set := findSetMatches(matcher.Value) + } else if matcher.Type == labels.MatchRegexp && len(FindSetMatches(matcher.Value)) > 0 { + set := FindSetMatches(matcher.Value) for _, v := range set { var qs []IndexQuery qs, err = c.schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, matcher.Name, v) diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_index_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_index_client.go index 9e2628825e6b..f5822bdc5a7e 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_index_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_index_client.go @@ -317,7 +317,7 @@ func (s *storageClientV1) QueryPages(ctx context.Context, queries []chunk.IndexQ return chunk_util.DoParallelQueries(ctx, s.query, queries, callback) } -func (s *storageClientV1) query(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { +func (s *storageClientV1) query(ctx context.Context, query chunk.IndexQuery, callback chunk_util.Callback) error { const null = string('\xff') sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages", ot.Tag{Key: "tableName", Value: query.TableName}, ot.Tag{Key: "hashValue", Value: query.HashValue}) @@ -346,7 +346,7 @@ func (s *storageClientV1) query(ctx context.Context, query chunk.IndexQuery, cal err := table.ReadRows(ctx, rowRange, func(r bigtable.Row) bool { if query.ValueEqual == nil || bytes.Equal(r[columnFamily][0].Value, query.ValueEqual) { - return callback(&rowBatch{ + return callback(query, &rowBatch{ row: r, }) } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/gcs_object_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/gcs_object_client.go index 49c497c7367d..2029e9e11b26 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/gcs_object_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/gcs_object_client.go @@ -107,14 +107,15 @@ func (s *GCSObjectClient) PutObject(ctx context.Context, objectKey string, objec return nil } -// List only objects from the store non-recursively -func (s *GCSObjectClient) List(ctx context.Context, prefix string) ([]chunk.StorageObject, error) { +// List objects and common-prefixes i.e synthetic directories from the store non-recursively +func (s *GCSObjectClient) List(ctx context.Context, prefix string) ([]chunk.StorageObject, []chunk.StorageCommonPrefix, error) { var storageObjects []chunk.StorageObject + var commonPrefixes []chunk.StorageCommonPrefix iter := s.bucket.Objects(ctx, &storage.Query{Prefix: prefix, Delimiter: s.delimiter}) for { if ctx.Err() != nil { - return nil, ctx.Err() + return nil, nil, ctx.Err() } attr, err := iter.Next() @@ -122,12 +123,12 @@ func (s *GCSObjectClient) List(ctx context.Context, prefix string) ([]chunk.Stor if err == iterator.Done { break } - return nil, err + return nil, nil, err } // When doing query with Delimiter, Prefix is the only field set for entries which represent synthetic "directory entries". - // We do not want to consider those entries since we are doing only non-recursive listing of objects for now. 
if attr.Name == "" { + commonPrefixes = append(commonPrefixes, chunk.StorageCommonPrefix(attr.Prefix)) continue } @@ -137,7 +138,7 @@ func (s *GCSObjectClient) List(ctx context.Context, prefix string) ([]chunk.Stor }) } - return storageObjects, nil + return storageObjects, commonPrefixes, nil } // DeleteObject deletes the specified object key from the configured GCS bucket. If the diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/inmemory_storage_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/inmemory_storage_client.go index 80c17678b928..5a3aed3b6db3 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/inmemory_storage_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/inmemory_storage_client.go @@ -374,7 +374,7 @@ func (m *MockStorage) DeleteObject(ctx context.Context, objectKey string) error return nil } -func (m *MockStorage) List(ctx context.Context, prefix string) ([]StorageObject, error) { +func (m *MockStorage) List(ctx context.Context, prefix string) ([]StorageObject, []StorageCommonPrefix, error) { m.mtx.RLock() defer m.mtx.RUnlock() @@ -384,7 +384,7 @@ func (m *MockStorage) List(ctx context.Context, prefix string) ([]StorageObject, storageObjects = append(storageObjects, StorageObject{Key: key}) } - return storageObjects, nil + return storageObjects, []StorageCommonPrefix{}, nil } type mockWriteBatch struct { diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/local/boltdb_index_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/local/boltdb_index_client.go index b9aadd338219..8dad54ee5e10 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/local/boltdb_index_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/local/boltdb_index_client.go @@ -228,7 +228,7 @@ func (b *BoltIndexClient) QueryPages(ctx context.Context, queries []chunk.IndexQ return chunk_util.DoParallelQueries(ctx, b.query, queries, callback) } -func (b *BoltIndexClient) query(ctx context.Context, query chunk.IndexQuery, callback func(chunk.ReadBatch) (shouldContinue bool)) error { +func (b *BoltIndexClient) query(ctx context.Context, query chunk.IndexQuery, callback chunk_util.Callback) error { db, err := b.GetDB(query.TableName, DBOperationRead) if err != nil { if err == ErrUnexistentBoltDB { @@ -241,7 +241,7 @@ func (b *BoltIndexClient) query(ctx context.Context, query chunk.IndexQuery, cal return b.QueryDB(ctx, db, query, callback) } -func (b *BoltIndexClient) QueryDB(ctx context.Context, db *bbolt.DB, query chunk.IndexQuery, callback func(chunk.ReadBatch) (shouldContinue bool)) error { +func (b *BoltIndexClient) QueryDB(ctx context.Context, db *bbolt.DB, query chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error { var start []byte if len(query.RangeValuePrefix) > 0 { start = []byte(query.HashValue + separator + string(query.RangeValuePrefix)) @@ -276,7 +276,7 @@ func (b *BoltIndexClient) QueryDB(ctx context.Context, db *bbolt.DB, query chunk batch.rangeValue = k[len(rowPrefix):] batch.value = v - if !callback(&batch) { + if !callback(query, &batch) { break } } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/local/fs_object_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/local/fs_object_client.go index 40b081c78052..1cb066532a45 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/local/fs_object_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/local/fs_object_client.go @@ -90,35 +90,40 @@ func (f *FSObjectClient) PutObject(ctx 
context.Context, objectKey string, object return fl.Close() } -// List only objects from the store non-recursively -func (f *FSObjectClient) List(ctx context.Context, prefix string) ([]chunk.StorageObject, error) { +// List objects and common-prefixes i.e directories from the store non-recursively +func (f *FSObjectClient) List(ctx context.Context, prefix string) ([]chunk.StorageObject, []chunk.StorageCommonPrefix, error) { var storageObjects []chunk.StorageObject + var commonPrefixes []chunk.StorageCommonPrefix + folderPath := filepath.Join(f.cfg.Directory, prefix) _, err := os.Stat(folderPath) if err != nil { if os.IsNotExist(err) { - return storageObjects, nil + return storageObjects, commonPrefixes, nil } - return nil, err + return nil, nil, err } filesInfo, err := ioutil.ReadDir(folderPath) if err != nil { - return nil, err + return nil, nil, err } for _, fileInfo := range filesInfo { + nameWithPrefix := filepath.Join(prefix, fileInfo.Name()) + if fileInfo.IsDir() { + commonPrefixes = append(commonPrefixes, chunk.StorageCommonPrefix(nameWithPrefix+chunk.DirDelim)) continue } storageObjects = append(storageObjects, chunk.StorageObject{ - Key: filepath.Join(prefix, fileInfo.Name()), + Key: nameWithPrefix, ModifiedAt: fileInfo.ModTime(), }) } - return storageObjects, nil + return storageObjects, commonPrefixes, nil } func (f *FSObjectClient) DeleteObject(ctx context.Context, objectKey string) error { diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/opts.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/opts.go index 10d07b012f75..3384ecd7a430 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/opts.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/opts.go @@ -19,9 +19,9 @@ func init() { } } +// FindSetMatches returns list of values that can be equality matched on. // copied from Prometheus querier.go, removed check for Prometheus wrapper. -// Returns list of values that can regex matches. 
-func findSetMatches(pattern string) []string { +func FindSetMatches(pattern string) []string { escaped := false sets := []*strings.Builder{{}} for i := 0; i < len(pattern); i++ { diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/purger/purger.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/purger/purger.go index 9441d42e2380..709074a9f82b 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/purger/purger.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/purger/purger.go @@ -12,6 +12,8 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/gogo/protobuf/proto" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/promql" "github.com/weaveworks/common/user" @@ -24,6 +26,34 @@ import ( const millisecondPerDay = int64(24 * time.Hour / time.Millisecond) +type purgerMetrics struct { + deleteRequestsProcessedTotal *prometheus.CounterVec + deleteRequestsChunksSelectedTotal *prometheus.CounterVec + deleteRequestsProcessingFailures *prometheus.CounterVec +} + +func newPurgerMetrics(r prometheus.Registerer) *purgerMetrics { + m := purgerMetrics{} + + m.deleteRequestsProcessedTotal = promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "purger_delete_requests_processed_total", + Help: "Number of delete requests processed per user", + }, []string{"user"}) + m.deleteRequestsChunksSelectedTotal = promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "purger_delete_requests_chunks_selected_total", + Help: "Number of chunks selected while building delete plans per user", + }, []string{"user"}) + m.deleteRequestsProcessingFailures = promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "purger_delete_requests_processing_failures_total", + Help: "Number of delete requests processing failures per user", + }, []string{"user"}) + + return &m +} + type deleteRequestWithLogger struct { DeleteRequest logger log.Logger // logger is initialized with userID and requestID to add context to every log generated using this @@ -58,6 +88,7 @@ type DataPurger struct { deleteStore *DeleteStore chunkStore chunk.Store objectClient chunk.ObjectClient + metrics *purgerMetrics executePlansChan chan deleteRequestWithLogger workerJobChan chan workerJob @@ -74,7 +105,7 @@ type DataPurger struct { } // NewDataPurger creates a new DataPurger -func NewDataPurger(cfg Config, deleteStore *DeleteStore, chunkStore chunk.Store, storageClient chunk.ObjectClient) (*DataPurger, error) { +func NewDataPurger(cfg Config, deleteStore *DeleteStore, chunkStore chunk.Store, storageClient chunk.ObjectClient, registerer prometheus.Registerer) (*DataPurger, error) { util.WarnExperimentalUse("Delete series API") dataPurger := DataPurger{ @@ -82,6 +113,7 @@ func NewDataPurger(cfg Config, deleteStore *DeleteStore, chunkStore chunk.Store, deleteStore: deleteStore, chunkStore: chunkStore, objectClient: storageClient, + metrics: newPurgerMetrics(registerer), executePlansChan: make(chan deleteRequestWithLogger, 50), workerJobChan: make(chan workerJob, 50), inProcessRequestIDs: map[string]string{}, @@ -140,6 +172,7 @@ func (dp *DataPurger) workerJobCleanup(job workerJob) { level.Error(job.logger).Log("msg", "error updating delete request status to process", "err", err) } + dp.metrics.deleteRequestsProcessedTotal.WithLabelValues(job.userID).Inc() delete(dp.pendingPlansCount, 
job.deleteRequestID) dp.pendingPlansCountMtx.Unlock() @@ -182,6 +215,7 @@ func (dp *DataPurger) worker() { for job := range dp.workerJobChan { err := dp.executePlan(job.userID, job.deleteRequestID, job.planNo, job.logger) if err != nil { + dp.metrics.deleteRequestsProcessingFailures.WithLabelValues(job.userID).Inc() level.Error(job.logger).Log("msg", "error executing delete plan", "plan_no", job.planNo, "err", err) continue @@ -267,7 +301,9 @@ func (dp *DataPurger) loadInprocessDeleteRequests() error { dp.inProcessRequestIDs[deleteRequest.UserID] = deleteRequest.RequestID err := dp.buildDeletePlan(req) if err != nil { + dp.metrics.deleteRequestsProcessingFailures.WithLabelValues(deleteRequest.UserID).Inc() level.Error(req.logger).Log("msg", "error building delete plan", "err", err) + continue } level.Info(req.logger).Log("msg", "sending delete request for execution") @@ -329,6 +365,8 @@ func (dp *DataPurger) pullDeleteRequestsToPlanDeletes() error { err := dp.buildDeletePlan(req) if err != nil { + dp.metrics.deleteRequestsProcessingFailures.WithLabelValues(deleteRequest.UserID).Inc() + // We do not want to remove this delete request from inProcessRequestIDs to make sure // we do not move multiple deleting requests in deletion process. // None of the other delete requests from the user would be considered for processing until then. @@ -355,6 +393,8 @@ func (dp *DataPurger) buildDeletePlan(req deleteRequestWithLogger) error { level.Info(req.logger).Log("msg", "building delete plan", "num_plans", len(perDayTimeRange)) plans := make([][]byte, len(perDayTimeRange)) + includedChunkIDs := map[string]struct{}{} + for i, planRange := range perDayTimeRange { chunksGroups := []ChunksGroup{} @@ -364,13 +404,17 @@ func (dp *DataPurger) buildDeletePlan(req deleteRequestWithLogger) error { return err } - // ToDo: remove duplicate chunks chunks, err := dp.chunkStore.Get(ctx, req.UserID, planRange.Start, planRange.End, matchers...) if err != nil { return err } - chunksGroups = append(chunksGroups, groupChunks(chunks, req.StartTime, req.EndTime)...) + var cg []ChunksGroup + cg, includedChunkIDs = groupChunks(chunks, req.StartTime, req.EndTime, includedChunkIDs) + + if len(cg) != 0 { + chunksGroups = append(chunksGroups, cg...) + } } plan := DeletePlan{ @@ -399,6 +443,8 @@ func (dp *DataPurger) buildDeletePlan(req deleteRequestWithLogger) error { return err } + dp.metrics.deleteRequestsChunksSelectedTotal.WithLabelValues(req.UserID).Add(float64(len(includedChunkIDs))) + level.Info(req.logger).Log("msg", "built delete plans", "num_plans", len(perDayTimeRange)) return nil @@ -482,10 +528,15 @@ func numPlans(start, end model.Time) int { // groups chunks together by unique label sets i.e all the chunks with same labels would be stored in a group // chunk details are stored in groups for each unique label set to avoid storing them repetitively for each chunk -func groupChunks(chunks []chunk.Chunk, deleteFrom, deleteThrough model.Time) []ChunksGroup { +func groupChunks(chunks []chunk.Chunk, deleteFrom, deleteThrough model.Time, includedChunkIDs map[string]struct{}) ([]ChunksGroup, map[string]struct{}) { metricToChunks := make(map[string]ChunksGroup) for _, chk := range chunks { + chunkID := chk.ExternalKey() + + if _, ok := includedChunkIDs[chunkID]; ok { + continue + } // chunk.Metric are assumed to be sorted which should give same value from String() for same series. // If they stop being sorted then in the worst case we would lose the benefit of grouping chunks to avoid storing labels repetitively. 
metricString := chk.Metric.String() @@ -494,7 +545,7 @@ func groupChunks(chunks []chunk.Chunk, deleteFrom, deleteThrough model.Time) []C group = ChunksGroup{Labels: client.FromLabelsToLabelAdapters(chk.Metric)} } - chunkDetails := ChunkDetails{ID: chk.ExternalKey()} + chunkDetails := ChunkDetails{ID: chunkID} if deleteFrom > chk.From || deleteThrough < chk.Through { partiallyDeletedInterval := Interval{StartTimestampMs: int64(chk.From), EndTimestampMs: int64(chk.Through)} @@ -510,6 +561,7 @@ func groupChunks(chunks []chunk.Chunk, deleteFrom, deleteThrough model.Time) []C } group.Chunks = append(group.Chunks, chunkDetails) + includedChunkIDs[chunkID] = struct{}{} metricToChunks[metricString] = group } @@ -519,7 +571,7 @@ func groupChunks(chunks []chunk.Chunk, deleteFrom, deleteThrough model.Time) []C chunksGroups = append(chunksGroups, group) } - return chunksGroups + return chunksGroups, includedChunkIDs } func isMissingChunkErr(err error) bool { diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/purger/request_handler.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/purger/request_handler.go index 0ec5bd35aac5..8a933a540025 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/purger/request_handler.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/purger/request_handler.go @@ -5,22 +5,42 @@ import ( "fmt" "net/http" - "github.com/cortexproject/cortex/pkg/util" - + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/promql" "github.com/weaveworks/common/user" + + "github.com/cortexproject/cortex/pkg/util" ) +type deleteRequestHandlerMetrics struct { + deleteRequestsReceivedTotal *prometheus.CounterVec +} + +func newDeleteRequestHandlerMetrics(r prometheus.Registerer) *deleteRequestHandlerMetrics { + m := deleteRequestHandlerMetrics{} + + m.deleteRequestsReceivedTotal = promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "purger_delete_requests_received_total", + Help: "Number of delete requests received per user", + }, []string{"user"}) + + return &m +} + // DeleteRequestHandler provides handlers for delete requests type DeleteRequestHandler struct { deleteStore *DeleteStore + metrics *deleteRequestHandlerMetrics } // NewDeleteRequestHandler creates a DeleteRequestHandler -func NewDeleteRequestHandler(deleteStore *DeleteStore) *DeleteRequestHandler { +func NewDeleteRequestHandler(deleteStore *DeleteStore, registerer prometheus.Registerer) *DeleteRequestHandler { deleteMgr := DeleteRequestHandler{ deleteStore: deleteStore, + metrics: newDeleteRequestHandlerMetrics(registerer), } return &deleteMgr @@ -83,7 +103,10 @@ func (dm *DeleteRequestHandler) AddDeleteRequestHandler(w http.ResponseWriter, r if err := dm.deleteStore.AddDeleteRequest(ctx, userID, model.Time(startTime), model.Time(endTime), match); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) + return } + + dm.metrics.deleteRequestsReceivedTotal.WithLabelValues(userID).Inc() } // GetAllDeleteRequestsHandler handles get all delete requests diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go index 3c94143c073c..1495d6579fb1 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go @@ -61,6 +61,11 @@ var ( // For 100k series for 7 week, could 
be 1.2m - 10*(8^(7-1)) = 2.6m. Buckets: prometheus.ExponentialBuckets(10, 8, 7), }) + dedupedChunksTotal = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "chunk_store_deduped_chunks_total", + Help: "Count of chunks which were not stored because they have already been stored by another replica.", + }) ) // seriesStore implements Store @@ -414,6 +419,7 @@ func (c *seriesStore) PutOne(ctx context.Context, from, through model.Time, chun // If this chunk is in cache it must already be in the database so we don't need to write it again found, _, _ := c.cache.Fetch(ctx, []string{chunk.ExternalKey()}) if len(found) > 0 { + dedupedChunksTotal.Inc() return nil } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/factory.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/factory.go index 436f6e92ba43..99d53c5b15d4 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/factory.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/factory.go @@ -9,6 +9,7 @@ import ( "github.com/go-kit/kit/log/level" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/aws" @@ -100,7 +101,9 @@ func (cfg *Config) Validate() error { } // NewStore makes the storage clients based on the configuration. -func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits StoreLimits) (chunk.Store, error) { +func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits StoreLimits, reg prometheus.Registerer) (chunk.Store, error) { + chunkMetrics := newChunkClientMetrics(reg) + indexReadCache, err := cache.New(cfg.IndexQueriesCacheConfig) if err != nil { return nil, err @@ -146,6 +149,8 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf return nil, errors.Wrap(err, "error creating object client") } + chunks = newMetricsChunkClient(chunks, chunkMetrics) + err = stores.AddPeriod(storeCfg, s, index, chunks, limits, chunksCache, writeDedupeCache) if err != nil { return nil, err diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/metrics.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/metrics.go new file mode 100644 index 000000000000..28feece364cc --- /dev/null +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/storage/metrics.go @@ -0,0 +1,110 @@ +package storage + +import ( + "context" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/cortexproject/cortex/pkg/chunk" +) + +// takes a chunk client and exposes metrics for its operations. 
+type metricsChunkClient struct { + client chunk.Client + + metrics chunkClientMetrics +} + +func newMetricsChunkClient(client chunk.Client, metrics chunkClientMetrics) metricsChunkClient { + return metricsChunkClient{ + client: client, + metrics: metrics, + } +} + +type chunkClientMetrics struct { + chunksPutPerUser *prometheus.CounterVec + chunksSizePutPerUser *prometheus.CounterVec + chunksFetchedPerUser *prometheus.CounterVec + chunksSizeFetchedPerUser *prometheus.CounterVec +} + +func newChunkClientMetrics(reg prometheus.Registerer) chunkClientMetrics { + return chunkClientMetrics{ + chunksPutPerUser: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "chunk_store_stored_chunks_total", + Help: "Total stored chunks per user.", + }, []string{"user"}), + chunksSizePutPerUser: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "chunk_store_stored_chunk_bytes_total", + Help: "Total bytes stored in chunks per user.", + }, []string{"user"}), + chunksFetchedPerUser: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "chunk_store_fetched_chunks_total", + Help: "Total fetched chunks per user.", + }, []string{"user"}), + chunksSizeFetchedPerUser: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "chunk_store_fetched_chunk_bytes_total", + Help: "Total bytes fetched in chunks per user.", + }, []string{"user"}), + } +} + +func (c metricsChunkClient) Stop() { + c.client.Stop() +} + +func (c metricsChunkClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) error { + if err := c.client.PutChunks(ctx, chunks); err != nil { + return err + } + + // For PutChunks, we explicitly encode the userID in the chunk and don't use context. + userSizes := map[string]int{} + userCounts := map[string]int{} + for _, c := range chunks { + userSizes[c.UserID] += c.Data.Size() + userCounts[c.UserID]++ + } + for user, size := range userSizes { + c.metrics.chunksSizePutPerUser.WithLabelValues(user).Add(float64(size)) + } + for user, num := range userCounts { + c.metrics.chunksPutPerUser.WithLabelValues(user).Add(float64(num)) + } + + return nil +} + +func (c metricsChunkClient) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) { + chks, err := c.client.GetChunks(ctx, chunks) + if err != nil { + return chks, err + } + + // For GetChunks, userID is the chunk and we don't need to use context. + // For now, we just load one user chunks at once, but the interface lets us do it for multiple users. 
+ userSizes := map[string]int{} + userCounts := map[string]int{} + for _, c := range chks { + userSizes[c.UserID] += c.Data.Size() + userCounts[c.UserID]++ + } + for user, size := range userSizes { + c.metrics.chunksSizeFetchedPerUser.WithLabelValues(user).Add(float64(size)) + } + for user, num := range userCounts { + c.metrics.chunksFetchedPerUser.WithLabelValues(user).Add(float64(num)) + } + + return chks, nil +} + +func (c metricsChunkClient) DeleteChunk(ctx context.Context, chunkID string) error { + return c.client.DeleteChunk(ctx, chunkID) +} diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/storage_client.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/storage_client.go index 85c4babddca3..6797e1493f04 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/storage_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/storage_client.go @@ -65,7 +65,7 @@ type ReadBatchIterator interface { type ObjectClient interface { PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error) - List(ctx context.Context, prefix string) ([]StorageObject, error) + List(ctx context.Context, prefix string) ([]StorageObject, []StorageCommonPrefix, error) DeleteObject(ctx context.Context, objectKey string) error Stop() } @@ -75,3 +75,7 @@ type StorageObject struct { Key string ModifiedAt time.Time } + +// StorageCommonPrefix represents a common prefix aka a synthetic directory in Object Store. +// It is guaranteed to always end with DirDelim +type StorageCommonPrefix string diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/util/util.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/util/util.go index 17ce7830951a..64ae96a969e4 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/util/util.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/util/util.go @@ -12,11 +12,11 @@ import ( "github.com/cortexproject/cortex/pkg/util" ) +// Callback from an IndexQuery. +type Callback func(chunk.IndexQuery, chunk.ReadBatch) bool + // DoSingleQuery is the interface for indexes that don't support batching yet. -type DoSingleQuery func( - ctx context.Context, query chunk.IndexQuery, - callback func(chunk.ReadBatch) bool, -) error +type DoSingleQuery func(context.Context, chunk.IndexQuery, Callback) error // QueryParallelism is the maximum number of subqueries run in // parallel per higher-level query @@ -26,8 +26,12 @@ var QueryParallelism = 100 // and indexes that don't yet support batching. func DoParallelQueries( ctx context.Context, doSingleQuery DoSingleQuery, queries []chunk.IndexQuery, - callback func(chunk.IndexQuery, chunk.ReadBatch) bool, + callback Callback, ) error { + if len(queries) == 1 { + return doSingleQuery(ctx, queries[0], callback) + } + queue := make(chan chunk.IndexQuery) incomingErrors := make(chan error) n := util.Min(len(queries), QueryParallelism) @@ -41,9 +45,7 @@ func DoParallelQueries( if !ok { return } - incomingErrors <- doSingleQuery(ctx, query, func(r chunk.ReadBatch) bool { - return callback(query, r) - }) + incomingErrors <- doSingleQuery(ctx, query, callback) } }() } @@ -67,9 +69,6 @@ func DoParallelQueries( return lastErr } -// Callback from an IndexQuery. 
-type Callback func(chunk.IndexQuery, chunk.ReadBatch) bool - type filteringBatch struct { query chunk.IndexQuery chunk.ReadBatch diff --git a/vendor/github.com/cortexproject/cortex/pkg/distributor/distributor_ring.go b/vendor/github.com/cortexproject/cortex/pkg/distributor/distributor_ring.go index 3251cdc7c45f..2f3fba14b211 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/distributor/distributor_ring.go +++ b/vendor/github.com/cortexproject/cortex/pkg/distributor/distributor_ring.go @@ -71,7 +71,7 @@ func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig { // Configure lifecycler lc.RingConfig = rc - lc.ListenPort = &cfg.ListenPort + lc.ListenPort = cfg.ListenPort lc.Addr = cfg.InstanceAddr lc.Port = cfg.InstancePort lc.ID = cfg.InstanceID diff --git a/vendor/github.com/cortexproject/cortex/pkg/distributor/ingester_client_pool.go b/vendor/github.com/cortexproject/cortex/pkg/distributor/ingester_client_pool.go index e8b6e03a4092..cc0299bc02f2 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/distributor/ingester_client_pool.go +++ b/vendor/github.com/cortexproject/cortex/pkg/distributor/ingester_client_pool.go @@ -38,5 +38,5 @@ func NewPool(cfg PoolConfig, ring ring.ReadRing, factory ring_client.PoolFactory HealthCheckTimeout: cfg.RemoteTimeout, } - return ring_client.NewPool("ingester", poolCfg, ring, factory, clients, logger) + return ring_client.NewPool("ingester", poolCfg, ring_client.NewRingServiceDiscovery(ring), factory, clients, logger) } diff --git a/vendor/github.com/cortexproject/cortex/pkg/ingester/index/index.go b/vendor/github.com/cortexproject/cortex/pkg/ingester/index/index.go index 7c1c46c5708a..001dd1a817a3 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ingester/index/index.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ingester/index/index.go @@ -8,6 +8,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" + "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/util" ) @@ -168,6 +169,13 @@ func (shard *indexShard) lookup(matchers []*labels.Matcher) []model.Fingerprint if matcher.Type == labels.MatchEqual { fps := values.fps[matcher.Value] toIntersect = append(toIntersect, fps.fps...) // deliberate copy + } else if matcher.Type == labels.MatchRegexp && len(chunk.FindSetMatches(matcher.Value)) > 0 { + // The lookup is of the form `=~"a|b|c|d"` + set := chunk.FindSetMatches(matcher.Value) + for _, value := range set { + toIntersect = append(toIntersect, values.fps[value].fps...) 
+ } + sort.Sort(toIntersect) } else { // accumulate the matching fingerprints (which are all distinct) // then sort to maintain the invariant diff --git a/vendor/github.com/cortexproject/cortex/pkg/querier/frontend/frontend.go b/vendor/github.com/cortexproject/cortex/pkg/querier/frontend/frontend.go index 1306646c5684..1c5fce638f1d 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/querier/frontend/frontend.go +++ b/vendor/github.com/cortexproject/cortex/pkg/querier/frontend/frontend.go @@ -163,7 +163,16 @@ func (f *Frontend) handle(w http.ResponseWriter, r *http.Request) { queryResponseTime := time.Since(startTime) if f.cfg.LogQueriesLongerThan > 0 && queryResponseTime > f.cfg.LogQueriesLongerThan { - level.Info(f.log).Log("msg", "slow query", "org_id", userID, "url", fmt.Sprintf("http://%s", r.Host+r.RequestURI), "time_taken", queryResponseTime.String()) + logMessage := []interface{}{"msg", "slow query", + "org_id", userID, + "url", fmt.Sprintf("http://%s", r.Host+r.RequestURI), + "time_taken", queryResponseTime.String(), + } + pf := r.PostForm.Encode() + if pf != "" { + logMessage = append(logMessage, "body", pf) + } + level.Info(f.log).Log(logMessage...) } if err != nil { @@ -176,7 +185,11 @@ func (f *Frontend) handle(w http.ResponseWriter, r *http.Request) { hs[h] = vs } w.WriteHeader(resp.StatusCode) - io.Copy(w, resp.Body) + + if _, err = io.Copy(w, resp.Body); err != nil { + server.WriteError(w, err) + return + } } func writeError(w http.ResponseWriter, err error) { diff --git a/vendor/github.com/cortexproject/cortex/pkg/querier/queryrange/roundtrip.go b/vendor/github.com/cortexproject/cortex/pkg/querier/queryrange/roundtrip.go index fb6047347431..018c71942b0b 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/querier/queryrange/roundtrip.go +++ b/vendor/github.com/cortexproject/cortex/pkg/querier/queryrange/roundtrip.go @@ -26,6 +26,7 @@ import ( "github.com/go-kit/kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/promql" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" @@ -68,6 +69,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.ResultsCacheConfig.RegisterFlags(f) } +// Validate validates the config. func (cfg *Config) Validate(log log.Logger) error { // SplitQueriesByDay is deprecated use SplitQueriesByInterval. if cfg.SplitQueriesByDay { @@ -130,6 +132,13 @@ func NewTripperware( minShardingLookback time.Duration, registerer prometheus.Registerer, ) (frontend.Tripperware, cache.Cache, error) { + // Per tenant query metrics. + queriesPerTenant := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "query_frontend_queries_total", + Help: "Total queries sent per tenant.", + }, []string{"op", "user"}) + // Metric used to keep track of each middleware execution duration. metrics := NewInstrumentMiddlewareMetrics(registerer) @@ -181,7 +190,20 @@ func NewTripperware( if len(queryRangeMiddleware) > 0 { queryrange := NewRoundTripper(next, codec, queryRangeMiddleware...) return frontend.RoundTripFunc(func(r *http.Request) (*http.Response, error) { - if !strings.HasSuffix(r.URL.Path, "/query_range") { + isQueryRange := strings.HasSuffix(r.URL.Path, "/query_range") + op := "query" + if isQueryRange { + op = "query_range" + } + + user, err := user.ExtractOrgID(r.Context()) + // This should never happen anyways because we have auth middleware before this. 
+ if err != nil { + return nil, err + } + queriesPerTenant.WithLabelValues(op, user).Inc() + + if !isQueryRange { return next.RoundTrip(r) } return queryrange.RoundTrip(r) diff --git a/vendor/github.com/cortexproject/cortex/pkg/querier/series/series_set.go b/vendor/github.com/cortexproject/cortex/pkg/querier/series/series_set.go index bdf8920192b3..33bdcaef7c5d 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/querier/series/series_set.go +++ b/vendor/github.com/cortexproject/cortex/pkg/querier/series/series_set.go @@ -346,3 +346,14 @@ func (emptySeriesIterator) Next() bool { func (emptySeriesIterator) Err() error { return nil } + +type emptySeriesSet struct{} + +func (emptySeriesSet) Next() bool { return false } +func (emptySeriesSet) At() storage.Series { return nil } +func (emptySeriesSet) Err() error { return nil } + +// NewEmptySeriesSet returns a new series set that contains no series. +func NewEmptySeriesSet() storage.SeriesSet { + return emptySeriesSet{} +} diff --git a/vendor/github.com/cortexproject/cortex/pkg/ring/client/pool.go b/vendor/github.com/cortexproject/cortex/pkg/ring/client/pool.go index bc1931a2eca6..41bc8728abb3 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ring/client/pool.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ring/client/pool.go @@ -13,7 +13,6 @@ import ( "github.com/weaveworks/common/user" "google.golang.org/grpc/health/grpc_health_v1" - "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/services" ) @@ -28,6 +27,12 @@ type PoolClient interface { // PoolFactory defines the signature for a client factory. type PoolFactory func(addr string) (PoolClient, error) +// PoolServiceDiscovery defines the signature of a function returning the list +// of known service endpoints. This function is used to remove stale clients from +// the pool (a stale client is a client connected to a service endpoint no more +// active). +type PoolServiceDiscovery func() ([]string, error) + // PoolConfig is config for creating a Pool. type PoolConfig struct { CheckInterval time.Duration @@ -40,7 +45,7 @@ type Pool struct { services.Service cfg PoolConfig - ring ring.ReadRing + discovery PoolServiceDiscovery factory PoolFactory logger log.Logger clientName string @@ -52,10 +57,10 @@ type Pool struct { } // NewPool creates a new Pool. -func NewPool(clientName string, cfg PoolConfig, ring ring.ReadRing, factory PoolFactory, clientsMetric prometheus.Gauge, logger log.Logger) *Pool { +func NewPool(clientName string, cfg PoolConfig, discovery PoolServiceDiscovery, factory PoolFactory, clientsMetric prometheus.Gauge, logger log.Logger) *Pool { p := &Pool{ cfg: cfg, - ring: ring, + discovery: discovery, factory: factory, logger: logger, clientName: clientName, @@ -127,7 +132,7 @@ func (p *Pool) RemoveClientFor(addr string) { } } -// RegisteredAddresses returns all the addresses that a client is cached for +// RegisteredAddresses returns all the service addresses for which there's an active client. func (p *Pool) RegisteredAddresses() []string { result := []string{} p.RLock() @@ -146,19 +151,19 @@ func (p *Pool) Count() int { } func (p *Pool) removeStaleClients() { - clients := map[string]struct{}{} - replicationSet, err := p.ring.GetAll() - if err != nil { - level.Error(util.Logger).Log("msg", "error removing stale clients", "err", err) + // Only if service discovery has been configured. 
+ if p.discovery == nil { return } - for _, ing := range replicationSet.Ingesters { - clients[ing.Addr] = struct{}{} + serviceAddrs, err := p.discovery() + if err != nil { + level.Error(util.Logger).Log("msg", "error removing stale clients", "err", err) + return } for _, addr := range p.RegisteredAddresses() { - if _, ok := clients[addr]; ok { + if util.StringsContain(serviceAddrs, addr) { continue } level.Info(util.Logger).Log("msg", "removing stale client", "addr", addr) diff --git a/vendor/github.com/cortexproject/cortex/pkg/ring/client/ring_service_discovery.go b/vendor/github.com/cortexproject/cortex/pkg/ring/client/ring_service_discovery.go new file mode 100644 index 000000000000..4f02233d9257 --- /dev/null +++ b/vendor/github.com/cortexproject/cortex/pkg/ring/client/ring_service_discovery.go @@ -0,0 +1,20 @@ +package client + +import ( + "github.com/cortexproject/cortex/pkg/ring" +) + +func NewRingServiceDiscovery(r ring.ReadRing) PoolServiceDiscovery { + return func() ([]string, error) { + replicationSet, err := r.GetAll() + if err != nil { + return nil, err + } + + var addrs []string + for _, instance := range replicationSet.Ingesters { + addrs = append(addrs, instance.Addr) + } + return addrs, nil + } +} diff --git a/vendor/github.com/cortexproject/cortex/pkg/ring/kv/consul/client.go b/vendor/github.com/cortexproject/cortex/pkg/ring/kv/consul/client.go index aa5cc7f260a5..31bb4e58db32 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ring/kv/consul/client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ring/kv/consul/client.go @@ -92,6 +92,22 @@ func NewClient(cfg Config, codec codec.Codec) (*Client, error) { return c, nil } +// Put is mostly here for testing. +func (c *Client) Put(ctx context.Context, key string, value interface{}) error { + bytes, err := c.codec.Encode(value) + if err != nil { + return err + } + + return instrument.CollectedRequest(ctx, "Put", consulRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + _, err := c.kv.Put(&consul.KVPair{ + Key: key, + Value: bytes, + }, nil) + return err + }) +} + // CAS atomically modifies a value in a callback. // If value doesn't exist you'll get nil as an argument to your callback. 
func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error { diff --git a/vendor/github.com/cortexproject/cortex/pkg/ring/lifecycler.go b/vendor/github.com/cortexproject/cortex/pkg/ring/lifecycler.go index 1aa0f7aadf53..4a09480bd4c6 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ring/lifecycler.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ring/lifecycler.go @@ -46,7 +46,6 @@ type LifecyclerConfig struct { RingConfig Config `yaml:"ring"` // Config for the ingester lifecycle control - ListenPort *int `yaml:"-"` NumTokens int `yaml:"num_tokens"` HeartbeatPeriod time.Duration `yaml:"heartbeat_period"` ObservePeriod time.Duration `yaml:"observe_period"` @@ -62,6 +61,9 @@ type LifecyclerConfig struct { Port int `doc:"hidden"` ID string `doc:"hidden"` SkipUnregister bool `yaml:"-"` + + // Injected internally + ListenPort int `yaml:"-"` } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -143,7 +145,7 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa if err != nil { return nil, err } - port := GetInstancePort(cfg.Port, *cfg.ListenPort) + port := GetInstancePort(cfg.Port, cfg.ListenPort) codec := GetCodec() store, err := kv.NewClient(cfg.RingConfig.KVStore, codec) if err != nil { diff --git a/vendor/github.com/cortexproject/cortex/pkg/ring/replication_strategy.go b/vendor/github.com/cortexproject/cortex/pkg/ring/replication_strategy.go index 9231b8d8fd3b..c5da5c6fd869 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ring/replication_strategy.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ring/replication_strategy.go @@ -34,7 +34,6 @@ func (s *DefaultReplicationStrategy) Filter(ingesters []IngesterDesc, op Operati } minSuccess := (replicationFactor / 2) + 1 - maxFailure := replicationFactor - minSuccess // Skip those that have not heartbeated in a while. NB these are still // included in the calculation of minSuccess, so if too many failed ingesters @@ -44,19 +43,18 @@ func (s *DefaultReplicationStrategy) Filter(ingesters []IngesterDesc, op Operati i++ } else { ingesters = append(ingesters[:i], ingesters[i+1:]...) - maxFailure-- } } // This is just a shortcut - if there are not minSuccess available ingesters, // after filtering out dead ones, don't even bother trying. - if maxFailure < 0 || len(ingesters) < minSuccess { + if len(ingesters) < minSuccess { err := fmt.Errorf("at least %d live replicas required, could only find %d", minSuccess, len(ingesters)) return nil, 0, err } - return ingesters, maxFailure, nil + return ingesters, len(ingesters) - minSuccess, nil } func (s *DefaultReplicationStrategy) ShouldExtendReplicaSet(ingester IngesterDesc, op Operation) bool { diff --git a/vendor/github.com/cortexproject/cortex/pkg/ring/ring.go b/vendor/github.com/cortexproject/cortex/pkg/ring/ring.go index 33751d1b9b4d..db873d5a0640 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ring/ring.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ring/ring.go @@ -256,24 +256,29 @@ func (r *Ring) GetAll() (ReplicationSet, error) { return ReplicationSet{}, ErrEmptyRing } - ingesters := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters)) - maxErrors := r.cfg.ReplicationFactor / 2 + // Calculate the number of required ingesters; + // ensure we always require at least RF-1 when RF=3. 
+ numRequired := len(r.ringDesc.Ingesters) + if numRequired < r.cfg.ReplicationFactor { + numRequired = r.cfg.ReplicationFactor + } + maxUnavailable := r.cfg.ReplicationFactor / 2 + numRequired -= maxUnavailable + ingesters := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters)) for _, ingester := range r.ringDesc.Ingesters { - if !r.IsHealthy(&ingester, Read) { - maxErrors-- - continue + if r.IsHealthy(&ingester, Read) { + ingesters = append(ingesters, ingester) } - ingesters = append(ingesters, ingester) } - if maxErrors < 0 { + if len(ingesters) < numRequired { return ReplicationSet{}, fmt.Errorf("too many failed ingesters") } return ReplicationSet{ Ingesters: ingesters, - MaxErrors: maxErrors, + MaxErrors: len(ingesters) - numRequired, }, nil } diff --git a/vendor/github.com/cortexproject/cortex/pkg/util/test/poll.go b/vendor/github.com/cortexproject/cortex/pkg/util/test/poll.go index 115b97f26360..168eb75a8237 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/util/test/poll.go +++ b/vendor/github.com/cortexproject/cortex/pkg/util/test/poll.go @@ -17,7 +17,7 @@ func Poll(t *testing.T, d time.Duration, want interface{}, have func() interface if reflect.DeepEqual(want, have()) { return } - time.Sleep(d / 10) + time.Sleep(d / 100) } h := have() if !reflect.DeepEqual(want, h) { diff --git a/vendor/modules.txt b/vendor/modules.txt index 398715cb9b23..8406a409c569 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -138,7 +138,7 @@ github.com/coreos/go-systemd/sdjournal # github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f github.com/coreos/pkg/capnslog github.com/coreos/pkg/dlopen -# github.com/cortexproject/cortex v1.0.1-0.20200416152925-3fe04dcff1d8 +# github.com/cortexproject/cortex v1.0.1-0.20200423101820-36496a074bc4 github.com/cortexproject/cortex/pkg/alertmanager github.com/cortexproject/cortex/pkg/alertmanager/alerts github.com/cortexproject/cortex/pkg/alertmanager/alerts/configdb From ee15752eaf0a77a7f52f1850bb38532be33bb19a Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Thu, 23 Apr 2020 21:09:14 +0530 Subject: [PATCH 2/3] handle all the breaking changes from cortes Signed-off-by: Sandeep Sukhani --- pkg/ingester/flush_test.go | 2 +- pkg/loki/modules.go | 2 +- pkg/storage/store.go | 3 ++- pkg/storage/stores/local/boltdb_index_client.go | 2 +- pkg/storage/stores/local/downloads.go | 4 ++-- pkg/storage/stores/local/downloads_test.go | 2 +- pkg/storage/stores/util/object_client.go | 8 ++++---- 7 files changed, 12 insertions(+), 11 deletions(-) diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index ff2ac662d8ce..d636fd8bc799 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -192,7 +192,7 @@ func defaultIngesterTestConfig(t *testing.T) Config { cfg.ConcurrentFlushes = 1 cfg.LifecyclerConfig.RingConfig.KVStore.Mock = kvClient cfg.LifecyclerConfig.NumTokens = 1 - cfg.LifecyclerConfig.ListenPort = func(i int) *int { return &i }(0) + cfg.LifecyclerConfig.ListenPort = 0 cfg.LifecyclerConfig.Addr = "localhost" cfg.LifecyclerConfig.ID = "localhost" cfg.LifecyclerConfig.FinalSleep = 0 diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 2f3504eed1a8..adfbaee5e2a1 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -191,7 +191,7 @@ func (t *Loki) initQuerier() (services.Service, error) { func (t *Loki) initIngester() (_ services.Service, err error) { t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) 
t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV - t.cfg.Ingester.LifecyclerConfig.ListenPort = &t.cfg.Server.GRPCListenPort + t.cfg.Ingester.LifecyclerConfig.ListenPort = t.cfg.Server.GRPCListenPort // We want ingester to also query the store when using boltdb-shipper if activeIndexType(t.cfg.SchemaConfig) == local.BoltDBShipperType { diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 5402f0144965..93a805d88c6e 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -8,6 +8,7 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" cortex_local "github.com/cortexproject/cortex/pkg/chunk/local" "github.com/cortexproject/cortex/pkg/chunk/storage" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/weaveworks/common/user" @@ -51,7 +52,7 @@ type store struct { func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits storage.StoreLimits) (Store, error) { registerCustomIndexClients(cfg, schemaCfg) - s, err := storage.NewStore(cfg.Config, storeCfg, schemaCfg, limits) + s, err := storage.NewStore(cfg.Config, storeCfg, schemaCfg, limits, prometheus.DefaultRegisterer) if err != nil { return nil, err } diff --git a/pkg/storage/stores/local/boltdb_index_client.go b/pkg/storage/stores/local/boltdb_index_client.go index 286780d79966..2d1c80b3c8e1 100644 --- a/pkg/storage/stores/local/boltdb_index_client.go +++ b/pkg/storage/stores/local/boltdb_index_client.go @@ -43,7 +43,7 @@ func (b *BoltdbIndexClientWithShipper) QueryPages(ctx context.Context, queries [ return chunk_util.DoParallelQueries(ctx, b.query, queries, callback) } -func (b *BoltdbIndexClientWithShipper) query(ctx context.Context, query chunk.IndexQuery, callback func(chunk.ReadBatch) (shouldContinue bool)) error { +func (b *BoltdbIndexClientWithShipper) query(ctx context.Context, query chunk.IndexQuery, callback chunk_util.Callback) error { db, err := b.GetDB(query.TableName, local.DBOperationRead) if err != nil && err != local.ErrUnexistentBoltDB { return err diff --git a/pkg/storage/stores/local/downloads.go b/pkg/storage/stores/local/downloads.go index b98e2df0215e..bd7112ddbaa6 100644 --- a/pkg/storage/stores/local/downloads.go +++ b/pkg/storage/stores/local/downloads.go @@ -23,7 +23,7 @@ func (s *Shipper) checkStorageForUpdates(ctx context.Context, period string, fc // listing tables from store var objects []chunk.StorageObject - objects, err = s.storageClient.List(ctx, period+"/") + objects, _, err = s.storageClient.List(ctx, period+"/") if err != nil { return } @@ -160,7 +160,7 @@ func (s *Shipper) downloadFilesForPeriod(ctx context.Context, period string, fc fc.Lock() defer fc.Unlock() - objects, err := s.storageClient.List(ctx, period+"/") + objects, _, err := s.storageClient.List(ctx, period+"/") if err != nil { return err } diff --git a/pkg/storage/stores/local/downloads_test.go b/pkg/storage/stores/local/downloads_test.go index 47710e892243..f9a233741896 100644 --- a/pkg/storage/stores/local/downloads_test.go +++ b/pkg/storage/stores/local/downloads_test.go @@ -18,7 +18,7 @@ import ( func queryTestBoltdb(t *testing.T, boltdbIndexClient *BoltdbIndexClientWithShipper, query chunk.IndexQuery) map[string]string { resp := map[string]string{} - require.NoError(t, boltdbIndexClient.query(context.Background(), query, func(batch chunk.ReadBatch) (shouldContinue bool) { + require.NoError(t, boltdbIndexClient.query(context.Background(), query, 
func(query chunk.IndexQuery, batch chunk.ReadBatch) bool {
 		itr := batch.Iterator()
 		for itr.Next() {
 			resp[string(itr.RangeValue())] = string(itr.Value())
diff --git a/pkg/storage/stores/util/object_client.go b/pkg/storage/stores/util/object_client.go
index bb95ca3ee17a..8101dc702d84 100644
--- a/pkg/storage/stores/util/object_client.go
+++ b/pkg/storage/stores/util/object_client.go
@@ -21,17 +21,17 @@ func (p PrefixedObjectClient) GetObject(ctx context.Context, objectKey string) (
 	return p.downstreamClient.GetObject(ctx, p.prefix+objectKey)
 }
 
-func (p PrefixedObjectClient) List(ctx context.Context, prefix string) ([]chunk.StorageObject, error) {
-	objects, err := p.downstreamClient.List(ctx, p.prefix+prefix)
+func (p PrefixedObjectClient) List(ctx context.Context, prefix string) ([]chunk.StorageObject, []chunk.StorageCommonPrefix, error) {
+	objects, commonPrefixes, err := p.downstreamClient.List(ctx, p.prefix+prefix)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 
 	for i := range objects {
 		objects[i].Key = strings.TrimPrefix(objects[i].Key, p.prefix)
 	}
 
-	return objects, nil
+	return objects, commonPrefixes, nil
 }
 
 func (p PrefixedObjectClient) DeleteObject(ctx context.Context, objectKey string) error {

From 3df327495c3070a8e80188e129aadb01ecb0a77a Mon Sep 17 00:00:00 2001
From: Ed Welch
Date: Thu, 23 Apr 2020 16:38:09 -0400
Subject: [PATCH 3/3] rebasing on master, re-running go mod vendor seemed to
 bring in some more changes...

Signed-off-by: Ed Welch
---
 .../cortexproject/cortex/pkg/api/api.go       |   4 +-
 .../cortex/pkg/compactor/compactor_ring.go    |   2 +-
 .../cortexproject/cortex/pkg/cortex/cortex.go |   2 +-
 .../cortex/pkg/cortex/modules.go              |  23 +-
 .../cortex/pkg/ingester/flush.go              |   4 -
 .../cortex/pkg/ingester/ingester.go           |   7 +
 .../cortex/pkg/ingester/metrics.go            |  10 -
 .../cortexproject/cortex/pkg/ingester/wal.go  |  38 +-
 .../pkg/querier/blocks_store_balanced_set.go  |  71 ++++
 .../pkg/querier/blocks_store_queryable.go     | 327 ++++++++++++++++++
 .../querier/blocks_store_replicated_set.go    | 150 ++++++++
 .../cortex/pkg/querier/querier.go             |  14 +
 .../pkg/querier/store_gateway_client.go       |  38 +-
 .../cortex/pkg/ruler/ruler_ring.go            |   2 +-
 .../ruler/rules/objectclient/rule_store.go    |   4 +-
 .../cortex/pkg/storage/tsdb/config.go         |   2 +
 .../cortex/pkg/storegateway/gateway_ring.go   |  14 +-
 17 files changed, 670 insertions(+), 42 deletions(-)
 create mode 100644 vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_balanced_set.go
 create mode 100644 vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_queryable.go
 create mode 100644 vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_replicated_set.go

diff --git a/vendor/github.com/cortexproject/cortex/pkg/api/api.go b/vendor/github.com/cortexproject/cortex/pkg/api/api.go
index ad392849d28a..7aeb6e16f338 100644
--- a/vendor/github.com/cortexproject/cortex/pkg/api/api.go
+++ b/vendor/github.com/cortexproject/cortex/pkg/api/api.go
@@ -7,6 +7,8 @@ import (
 	"regexp"
 	"strings"
 
+	"github.com/prometheus/client_golang/prometheus"
+
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
 	"github.com/prometheus/common/route"
@@ -177,7 +179,7 @@ func (a *API) RegisterIngester(i *ingester.Ingester, pushConfig distributor.Conf
 // match the Prometheus API but mirror it closely enough to justify their routing under the Prometheus
 // component/
 func (a *API) RegisterPurger(store *purger.DeleteStore) {
-	deleteRequestHandler := purger.NewDeleteRequestHandler(store)
+	deleteRequestHandler := purger.NewDeleteRequestHandler(store,
prometheus.DefaultRegisterer) a.RegisterRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/admin/tsdb/delete_series", http.HandlerFunc(deleteRequestHandler.AddDeleteRequestHandler), true, "PUT", "POST") a.RegisterRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/admin/tsdb/delete_series", http.HandlerFunc(deleteRequestHandler.GetAllDeleteRequestsHandler), true, "GET") diff --git a/vendor/github.com/cortexproject/cortex/pkg/compactor/compactor_ring.go b/vendor/github.com/cortexproject/cortex/pkg/compactor/compactor_ring.go index 6caa4bd501e7..4db5f2747883 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/compactor/compactor_ring.go +++ b/vendor/github.com/cortexproject/cortex/pkg/compactor/compactor_ring.go @@ -71,7 +71,7 @@ func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig { // Configure lifecycler lc.RingConfig = rc - lc.ListenPort = &cfg.ListenPort + lc.ListenPort = cfg.ListenPort lc.Addr = cfg.InstanceAddr lc.Port = cfg.InstancePort lc.ID = cfg.InstanceID diff --git a/vendor/github.com/cortexproject/cortex/pkg/cortex/cortex.go b/vendor/github.com/cortexproject/cortex/pkg/cortex/cortex.go index d441967e1f52..e79b102b13c1 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/cortex/cortex.go +++ b/vendor/github.com/cortexproject/cortex/pkg/cortex/cortex.go @@ -92,7 +92,7 @@ type Config struct { Encoding encoding.Config `yaml:"-"` // No yaml for this, it only works with flags. TSDB tsdb.Config `yaml:"tsdb"` Compactor compactor.Config `yaml:"compactor"` - StoreGateway storegateway.Config `yaml:"store_gateway" doc:"hidden"` // this component is not yet finished. + StoreGateway storegateway.Config `yaml:"store_gateway"` DataPurgerConfig purger.Config `yaml:"purger"` Ruler ruler.Config `yaml:"ruler"` diff --git a/vendor/github.com/cortexproject/cortex/pkg/cortex/modules.go b/vendor/github.com/cortexproject/cortex/pkg/cortex/modules.go index adaeb89b3487..3975f84756ac 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/cortex/modules.go +++ b/vendor/github.com/cortexproject/cortex/pkg/cortex/modules.go @@ -224,7 +224,7 @@ func (t *Cortex) initStoreQueryable(cfg *Config) (services.Service, error) { return nil, nil } - if cfg.Storage.Engine == storage.StorageEngineTSDB { + if cfg.Storage.Engine == storage.StorageEngineTSDB && !cfg.TSDB.StoreGatewayEnabled { storeQueryable, err := querier.NewBlockQueryable(cfg.TSDB, cfg.Server.LogLevel, prometheus.DefaultRegisterer) if err != nil { return nil, err @@ -233,13 +233,28 @@ func (t *Cortex) initStoreQueryable(cfg *Config) (services.Service, error) { return storeQueryable, nil } + if cfg.Storage.Engine == storage.StorageEngineTSDB && cfg.TSDB.StoreGatewayEnabled { + // When running in single binary, if the blocks sharding is disabled and no custom + // store-gateway address has been configured, we can set it to the running process. 
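+	// e.g. with the default -server.grpc-listen-port of 9095 the querier is
+	// pointed at a single store-gateway address of "127.0.0.1:9095".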
+ if cfg.Target == All && !cfg.StoreGateway.ShardingEnabled && cfg.Querier.StoreGatewayAddresses == "" { + cfg.Querier.StoreGatewayAddresses = fmt.Sprintf("127.0.0.1:%d", cfg.Server.GRPCListenPort) + } + + storeQueryable, err := querier.NewBlocksStoreQueryableFromConfig(cfg.Querier, cfg.StoreGateway, cfg.TSDB, util.Logger, prometheus.DefaultRegisterer) + if err != nil { + return nil, err + } + t.storeQueryable = storeQueryable + return storeQueryable, nil + } + return nil, fmt.Errorf("unknown storage engine '%s'", cfg.Storage.Engine) } func (t *Cortex) initIngester(cfg *Config) (serv services.Service, err error) { cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV - cfg.Ingester.LifecyclerConfig.ListenPort = &cfg.Server.GRPCListenPort + cfg.Ingester.LifecyclerConfig.ListenPort = cfg.Server.GRPCListenPort cfg.Ingester.TSDBEnabled = cfg.Storage.Engine == storage.StorageEngineTSDB cfg.Ingester.TSDBConfig = cfg.TSDB cfg.Ingester.ShardByAllLabels = cfg.Distributor.ShardByAllLabels @@ -278,7 +293,7 @@ func (t *Cortex) initStore(cfg *Config) (serv services.Service, err error) { return } - t.store, err = storage.NewStore(cfg.Storage, cfg.ChunkStore, cfg.Schema, t.overrides) + t.store, err = storage.NewStore(cfg.Storage, cfg.ChunkStore, cfg.Schema, t.overrides, prometheus.DefaultRegisterer) if err != nil { return } @@ -497,7 +512,7 @@ func (t *Cortex) initDataPurger(cfg *Config) (services.Service, error) { return nil, err } - t.dataPurger, err = purger.NewDataPurger(cfg.DataPurgerConfig, t.deletesStore, t.store, storageClient) + t.dataPurger, err = purger.NewDataPurger(cfg.DataPurgerConfig, t.deletesStore, t.store, storageClient, prometheus.DefaultRegisterer) if err != nil { return nil, err } diff --git a/vendor/github.com/cortexproject/cortex/pkg/ingester/flush.go b/vendor/github.com/cortexproject/cortex/pkg/ingester/flush.go index 46de627fd76d..7dee669454be 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ingester/flush.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ingester/flush.go @@ -339,8 +339,6 @@ func (i *Ingester) flushChunks(ctx context.Context, userID string, fp model.Fing return err } - sizePerUser := i.metrics.chunkSizePerUser.WithLabelValues(userID) - countPerUser := i.metrics.chunksPerUser.WithLabelValues(userID) // Record statistics only when actual put request did not return error. 
for _, chunkDesc := range chunkDescs { utilization, length, size := chunkDesc.C.Utilization(), chunkDesc.C.Len(), chunkDesc.C.Size() @@ -348,8 +346,6 @@ func (i *Ingester) flushChunks(ctx context.Context, userID string, fp model.Fing i.metrics.chunkUtilization.Observe(utilization) i.metrics.chunkLength.Observe(float64(length)) i.metrics.chunkSize.Observe(float64(size)) - sizePerUser.Add(float64(size)) - countPerUser.Inc() i.metrics.chunkAge.Observe(model.Now().Sub(chunkDesc.FirstTime).Seconds()) } diff --git a/vendor/github.com/cortexproject/cortex/pkg/ingester/ingester.go b/vendor/github.com/cortexproject/cortex/pkg/ingester/ingester.go index 6fa1e0fa7fde..43eb996c3b41 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ingester/ingester.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ingester/ingester.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "net/http" + "os" "sync" "time" @@ -157,6 +158,12 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c } } + if cfg.WALConfig.WALEnabled || cfg.WALConfig.Recover { + if err := os.MkdirAll(cfg.WALConfig.Dir, os.ModePerm); err != nil { + return nil, err + } + } + i := &Ingester{ cfg: cfg, clientConfig: clientConfig, diff --git a/vendor/github.com/cortexproject/cortex/pkg/ingester/metrics.go b/vendor/github.com/cortexproject/cortex/pkg/ingester/metrics.go index f1b9548902b6..636fd9ac209b 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ingester/metrics.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ingester/metrics.go @@ -45,8 +45,6 @@ type ingesterMetrics struct { chunkUtilization prometheus.Histogram chunkLength prometheus.Histogram chunkSize prometheus.Histogram - chunksPerUser *prometheus.CounterVec - chunkSizePerUser *prometheus.CounterVec chunkAge prometheus.Histogram memoryChunks prometheus.Gauge flushReasons *prometheus.CounterVec @@ -153,14 +151,6 @@ func newIngesterMetrics(r prometheus.Registerer, createMetricsConflictingWithTSD Help: "Distribution of stored chunk sizes (when stored).", Buckets: prometheus.ExponentialBuckets(500, 2, 5), // biggest bucket is 500*2^(5-1) = 8000 }), - chunksPerUser: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_ingester_chunks_stored_total", - Help: "Total stored chunks per user.", - }, []string{"user"}), - chunkSizePerUser: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_ingester_chunk_stored_bytes_total", - Help: "Total bytes stored in chunks per user.", - }, []string{"user"}), chunkAge: promauto.With(r).NewHistogram(prometheus.HistogramOpts{ Name: "cortex_ingester_chunk_age_seconds", Help: "Distribution of chunk ages (when stored).", diff --git a/vendor/github.com/cortexproject/cortex/pkg/ingester/wal.go b/vendor/github.com/cortexproject/cortex/pkg/ingester/wal.go index 398a42388ba6..89a722a90898 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ingester/wal.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ingester/wal.go @@ -35,6 +35,8 @@ type WALConfig struct { Recover bool `yaml:"recover_from_wal"` Dir string `yaml:"wal_dir"` CheckpointDuration time.Duration `yaml:"checkpoint_duration"` + // We always checkpoint during shutdown. This option exists for the tests. 
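+	// It is kept unexported on purpose: RegisterFlags below always resets it
+	// to true, so only tests in this package can turn it off.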
+ checkpointDuringShutdown bool } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -44,6 +46,7 @@ func (cfg *WALConfig) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.WALEnabled, "ingester.wal-enabled", false, "Enable writing of ingested data into WAL.") f.BoolVar(&cfg.CheckpointEnabled, "ingester.checkpoint-enabled", true, "Enable checkpointing of in-memory chunks. It should always be true when using normally. Set it to false iff you are doing some small tests as there is no mechanism to delete the old WAL yet if checkpoint is disabled.") f.DurationVar(&cfg.CheckpointDuration, "ingester.checkpoint-duration", 30*time.Minute, "Interval at which checkpoints should be created.") + cfg.checkpointDuringShutdown = true } // WAL interface allows us to have a no-op WAL when the WAL is disabled. @@ -69,11 +72,13 @@ type walWrapper struct { checkpointMtx sync.Mutex // Checkpoint metrics. - checkpointDeleteFail prometheus.Counter - checkpointDeleteTotal prometheus.Counter - checkpointCreationFail prometheus.Counter - checkpointCreationTotal prometheus.Counter - checkpointDuration prometheus.Summary + checkpointDeleteFail prometheus.Counter + checkpointDeleteTotal prometheus.Counter + checkpointCreationFail prometheus.Counter + checkpointCreationTotal prometheus.Counter + checkpointDuration prometheus.Summary + checkpointLoggedBytesTotal prometheus.Counter + walLoggedBytesTotal prometheus.Counter } // newWAL creates a WAL object. If the WAL is disabled, then the returned WAL is a no-op WAL. @@ -121,6 +126,14 @@ func newWAL(cfg WALConfig, userStatesFunc func() map[string]*userState, register Help: "Time taken to create a checkpoint.", Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, }) + w.checkpointLoggedBytesTotal = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "cortex_ingester_checkpoint_logged_bytes_total", + Help: "Total number of bytes written to disk for checkpointing.", + }) + w.walLoggedBytesTotal = promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "cortex_ingester_wal_logged_bytes_total", + Help: "Total number of bytes written to disk for WAL records.", + }) w.wait.Add(1) go w.run() @@ -145,6 +158,7 @@ func (w *walWrapper) Log(record *Record) error { if err != nil { return err } + w.walLoggedBytesTotal.Add(float64(len(buf))) return w.wal.Log(buf) } } @@ -172,9 +186,11 @@ func (w *walWrapper) run() { level.Info(util.Logger).Log("msg", "checkpoint done", "time", elapsed.String()) w.checkpointDuration.Observe(elapsed.Seconds()) case <-w.quit: - level.Info(util.Logger).Log("msg", "creating checkpoint before shutdown") - if err := w.performCheckpoint(true); err != nil { - level.Error(util.Logger).Log("msg", "error checkpointing series during shutdown", "err", err) + if w.cfg.checkpointDuringShutdown { + level.Info(util.Logger).Log("msg", "creating checkpoint before shutdown") + if err := w.performCheckpoint(true); err != nil { + level.Error(util.Logger).Log("msg", "error checkpointing series during shutdown", "err", err) + } } return } @@ -396,7 +412,11 @@ func (w *walWrapper) checkpointSeries(cp *wal.WAL, userID string, fp model.Finge return wireChunks, err } - return wireChunks, cp.Log(buf) + err = cp.Log(buf) + if err == nil { + w.checkpointLoggedBytesTotal.Add(float64(len(buf))) + } + return wireChunks, err } type walRecoveryParameters struct { diff --git a/vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_balanced_set.go 
b/vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_balanced_set.go new file mode 100644 index 000000000000..ff54044a1d55 --- /dev/null +++ b/vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_balanced_set.go @@ -0,0 +1,71 @@ +package querier + +import ( + "context" + "fmt" + "math/rand" + "strings" + "time" + + "github.com/go-kit/kit/log" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/discovery/dns" + "github.com/thanos-io/thanos/pkg/extprom" + + "github.com/cortexproject/cortex/pkg/ring/client" + "github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb" + "github.com/cortexproject/cortex/pkg/util/services" +) + +// BlocksStoreSet implementation used when the blocks are not sharded in the store-gateway +// and so requests are balanced across the set of store-gateway instances. +type blocksStoreBalancedSet struct { + services.Service + + serviceAddresses []string + clientsPool *client.Pool + dnsProvider *dns.Provider +} + +func newBlocksStoreBalancedSet(serviceAddresses []string, logger log.Logger, reg prometheus.Registerer) *blocksStoreBalancedSet { + const dnsResolveInterval = 10 * time.Second + + dnsProviderReg := extprom.WrapRegistererWithPrefix("cortex_storegateway_client_", reg) + + s := &blocksStoreBalancedSet{ + serviceAddresses: serviceAddresses, + dnsProvider: dns.NewProvider(logger, dnsProviderReg, dns.GolangResolverType), + clientsPool: newStoreGatewayClientPool(nil, logger, reg), + } + + s.Service = services.NewTimerService(dnsResolveInterval, s.starting, s.resolve, nil) + return s +} + +func (s *blocksStoreBalancedSet) starting(ctx context.Context) error { + // Initial DNS resolution. + return s.resolve(ctx) +} + +func (s *blocksStoreBalancedSet) resolve(ctx context.Context) error { + s.dnsProvider.Resolve(ctx, s.serviceAddresses) + return nil +} + +func (s *blocksStoreBalancedSet) GetClientsFor(_ []*metadata.Meta) ([]storegatewaypb.StoreGatewayClient, error) { + addresses := s.dnsProvider.Addresses() + if len(addresses) == 0 { + return nil, fmt.Errorf("no address resolved for the store-gateway service addresses %s", strings.Join(s.serviceAddresses, ",")) + } + + // Pick a random address and return its client from the pool. 
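+	// Blocks are not sharded in this mode, so any store-gateway instance can
+	// serve the request and a single client is enough.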
+ addr := addresses[rand.Intn(len(addresses))] + c, err := s.clientsPool.GetClientFor(addr) + if err != nil { + return nil, errors.Wrapf(err, "failed to get store-gateway client for %s", addr) + } + + return []storegatewaypb.StoreGatewayClient{c.(storegatewaypb.StoreGatewayClient)}, nil +} diff --git a/vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_queryable.go b/vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_queryable.go new file mode 100644 index 000000000000..68b2f3f87774 --- /dev/null +++ b/vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_queryable.go @@ -0,0 +1,327 @@ +package querier + +import ( + "context" + "io" + "sync" + + "github.com/go-kit/kit/log" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/storage" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/weaveworks/common/user" + "golang.org/x/sync/errgroup" + grpc_metadata "google.golang.org/grpc/metadata" + + "github.com/cortexproject/cortex/pkg/querier/series" + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv" + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/storegateway" + "github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb" + "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/services" + "github.com/cortexproject/cortex/pkg/util/spanlogger" +) + +var ( + errNoStoreGatewayAddress = errors.New("no store-gateway address configured") +) + +// BlocksStoreSet is the interface used to get the clients to query series on a set of blocks. +type BlocksStoreSet interface { + services.Service + + // GetClientsFor returns the store gateway clients that should be used to + // query the set of blocks in input. + GetClientsFor(metas []*metadata.Meta) ([]storegatewaypb.StoreGatewayClient, error) +} + +// BlocksFinder is the interface used to find blocks for a given user and time range. +type BlocksFinder interface { + services.Service + + // GetBlocks returns known blocks for userID containing samples within the range minT + // and maxT (milliseconds, both included). Returned blocks are sorted by MaxTime descending. + GetBlocks(userID string, minT, maxT int64) ([]*metadata.Meta, error) +} + +// BlocksStoreQueryable is a queryable which queries blocks storage via +// the store-gateway. +type BlocksStoreQueryable struct { + services.Service + + stores BlocksStoreSet + finder BlocksFinder + + // Subservices manager. + subservices *services.Manager + subservicesWatcher *services.FailureWatcher + + // Metrics. 
+ storesHit prometheus.Histogram +} + +func NewBlocksStoreQueryable(stores BlocksStoreSet, finder BlocksFinder, reg prometheus.Registerer) (*BlocksStoreQueryable, error) { + util.WarnExperimentalUse("Blocks storage engine") + + manager, err := services.NewManager(stores, finder) + if err != nil { + return nil, errors.Wrap(err, "register blocks storage queryable subservices") + } + + q := &BlocksStoreQueryable{ + stores: stores, + finder: finder, + subservices: manager, + subservicesWatcher: services.NewFailureWatcher(), + storesHit: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + Namespace: "cortex", + Name: "querier_storegateway_instances_hit_per_query", + Help: "Number of store-gateway instances hit for a single query.", + Buckets: []float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + }), + } + + q.Service = services.NewBasicService(q.starting, q.running, q.stopping) + + return q, nil +} + +func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegateway.Config, storageCfg cortex_tsdb.Config, logger log.Logger, reg prometheus.Registerer) (*BlocksStoreQueryable, error) { + var stores BlocksStoreSet + + bucketClient, err := cortex_tsdb.NewBucketClient(context.Background(), storageCfg, "querier", logger) + if err != nil { + return nil, errors.Wrap(err, "failed to create bucket client") + } + + scanner := NewBlocksScanner(BlocksScannerConfig{ + ScanInterval: storageCfg.BucketStore.SyncInterval, + TenantsConcurrency: storageCfg.BucketStore.TenantSyncConcurrency, + MetasConcurrency: storageCfg.BucketStore.BlockSyncConcurrency, + CacheDir: storageCfg.BucketStore.SyncDir, + ConsistencyDelay: storageCfg.BucketStore.ConsistencyDelay, + IgnoreDeletionMarksDelay: storageCfg.BucketStore.IgnoreDeletionMarksDelay, + }, bucketClient, logger, reg) + + if gatewayCfg.ShardingEnabled { + storesRingCfg := gatewayCfg.ShardingRing.ToRingConfig() + storesRingBackend, err := kv.NewClient(storesRingCfg.KVStore, ring.GetCodec()) + if err != nil { + return nil, errors.Wrap(err, "failed to create store-gateway ring backend") + } + + storesRing, err := ring.NewWithStoreClientAndStrategy(storesRingCfg, storegateway.RingNameForClient, storegateway.RingKey, storesRingBackend, &storegateway.BlocksReplicationStrategy{}) + if err != nil { + return nil, errors.Wrap(err, "failed to create store-gateway ring client") + } + + if reg != nil { + reg.MustRegister(storesRing) + } + + stores, err = newBlocksStoreReplicationSet(storesRing, logger, reg) + if err != nil { + return nil, errors.Wrap(err, "failed to create store set") + } + } else { + if len(querierCfg.GetStoreGatewayAddresses()) == 0 { + return nil, errNoStoreGatewayAddress + } + + stores = newBlocksStoreBalancedSet(querierCfg.GetStoreGatewayAddresses(), logger, reg) + } + + return NewBlocksStoreQueryable(stores, scanner, reg) +} + +func (q *BlocksStoreQueryable) starting(ctx context.Context) error { + q.subservicesWatcher.WatchManager(q.subservices) + + if err := services.StartManagerAndAwaitHealthy(ctx, q.subservices); err != nil { + return errors.Wrap(err, "unable to start blocks storage queryable subservices") + } + + return nil +} + +func (q *BlocksStoreQueryable) running(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return nil + case err := <-q.subservicesWatcher.Chan(): + return errors.Wrap(err, "block storage queryable subservice failed") + } + } +} + +func (q *BlocksStoreQueryable) stopping(_ error) error { + return services.StopManagerAndAwaitStopped(context.Background(), q.subservices) +} + +// Querier returns a 
new Querier on the storage. +func (q *BlocksStoreQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + if s := q.State(); s != services.Running { + return nil, promql.ErrStorage{Err: errors.Errorf("BlocksStoreQueryable is not running: %v", s)} + } + + userID, err := user.ExtractOrgID(ctx) + if err != nil { + return nil, promql.ErrStorage{Err: err} + } + + return &blocksStoreQuerier{ + ctx: ctx, + minT: mint, + maxT: maxt, + userID: userID, + finder: q.finder, + stores: q.stores, + storesHit: q.storesHit, + }, nil +} + +type blocksStoreQuerier struct { + ctx context.Context + minT, maxT int64 + userID string + finder BlocksFinder + stores BlocksStoreSet + storesHit prometheus.Histogram +} + +func (q *blocksStoreQuerier) Select(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + return q.SelectSorted(sp, matchers...) +} + +func (q *blocksStoreQuerier) SelectSorted(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + set, warnings, err := q.selectSorted(sp, matchers...) + + // We need to wrap the error in order to have Prometheus returning a 5xx error. + if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { + err = promql.ErrStorage{Err: err} + } + + return set, warnings, err +} + +func (q *blocksStoreQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { + // Cortex doesn't use this. It will ask ingesters for metadata. + return nil, nil, errors.New("not implemented") +} + +func (q *blocksStoreQuerier) LabelNames() ([]string, storage.Warnings, error) { + // Cortex doesn't use this. It will ask ingesters for metadata. + return nil, nil, errors.New("not implemented") +} + +func (q *blocksStoreQuerier) Close() error { + return nil +} + +func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + log, _ := spanlogger.New(q.ctx, "blocksStoreQuerier.selectSorted") + defer log.Span.Finish() + + minT, maxT := q.minT, q.maxT + if sp != nil { + minT, maxT = sp.Start, sp.End + } + + // Find the list of blocks we need to query given the time range. + metas, err := q.finder.GetBlocks(q.userID, minT, maxT) + if err != nil { + return nil, nil, err + } + + if len(metas) == 0 { + if q.storesHit != nil { + q.storesHit.Observe(0) + } + + return series.NewEmptySeriesSet(), nil, nil + } + + // Find the set of store-gateway instances having the blocks. + clients, err := q.stores.GetClientsFor(metas) + if err != nil { + return nil, nil, err + } + + req := &storepb.SeriesRequest{ + MinTime: minT, + MaxTime: maxT, + Matchers: convertMatchersToLabelMatcher(matchers), + PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + } + + var ( + reqCtx = grpc_metadata.AppendToOutgoingContext(q.ctx, cortex_tsdb.TenantIDExternalLabel, q.userID) + g, gCtx = errgroup.WithContext(reqCtx) + mtx = sync.Mutex{} + seriesSets = []storage.SeriesSet(nil) + warnings = storage.Warnings(nil) + ) + + // Concurrently fetch series from all clients. + for _, c := range clients { + // Change variable scope since it will be used in a goroutine. 
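+		// Without this shadow copy, every goroutine would capture the same loop
+		// variable and could end up querying the last client only.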
+ c := c + + g.Go(func() error { + stream, err := c.Series(gCtx, req) + if err != nil { + return errors.Wrapf(err, "failed to fetch series from %s", c) + } + + mySeries := []*storepb.Series(nil) + myWarnings := storage.Warnings(nil) + + for { + resp, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + return errors.Wrapf(err, "failed to receive series from %s", c) + } + + // Response may either contain series or warning. If it's warning, we get nil here. + if s := resp.GetSeries(); s != nil { + mySeries = append(mySeries, s) + } + + // Collect and return warnings too. + if w := resp.GetWarning(); w != "" { + myWarnings = append(myWarnings, errors.New(w)) + } + } + + // Store the result. + mtx.Lock() + seriesSets = append(seriesSets, &blockQuerierSeriesSet{series: mySeries}) + warnings = append(warnings, myWarnings...) + mtx.Unlock() + + return nil + }) + } + + // Wait until all client requests complete. + if err := g.Wait(); err != nil { + return nil, nil, err + } + + if q.storesHit != nil { + q.storesHit.Observe(float64(len(clients))) + } + + return storage.NewMergeSeriesSet(seriesSets, nil), warnings, nil +} diff --git a/vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_replicated_set.go b/vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_replicated_set.go new file mode 100644 index 000000000000..ecaa0a4350b2 --- /dev/null +++ b/vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_replicated_set.go @@ -0,0 +1,150 @@ +package querier + +import ( + "context" + + "github.com/go-kit/kit/log" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/thanos/pkg/block/metadata" + + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/client" + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb" + "github.com/cortexproject/cortex/pkg/util/services" +) + +// BlocksStoreSet implementation used when the blocks are sharded and replicated across +// a set of store-gateway instances. +type blocksStoreReplicationSet struct { + services.Service + + storesRing *ring.Ring + clientsPool *client.Pool + + // Subservices manager. 
+ subservices *services.Manager + subservicesWatcher *services.FailureWatcher +} + +func newBlocksStoreReplicationSet(storesRing *ring.Ring, logger log.Logger, reg prometheus.Registerer) (*blocksStoreReplicationSet, error) { + s := &blocksStoreReplicationSet{ + storesRing: storesRing, + clientsPool: newStoreGatewayClientPool(client.NewRingServiceDiscovery(storesRing), logger, reg), + } + + var err error + s.subservices, err = services.NewManager(s.storesRing, s.clientsPool) + if err != nil { + return nil, err + } + + s.Service = services.NewBasicService(s.starting, s.running, s.stopping) + + return s, nil +} + +func (s *blocksStoreReplicationSet) starting(ctx context.Context) error { + s.subservicesWatcher.WatchManager(s.subservices) + + if err := services.StartManagerAndAwaitHealthy(ctx, s.subservices); err != nil { + return errors.Wrap(err, "unable to start blocks store set subservices") + } + + return nil +} + +func (s *blocksStoreReplicationSet) running(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return nil + case err := <-s.subservicesWatcher.Chan(): + return errors.Wrap(err, "blocks store set subservice failed") + } + } +} + +func (s *blocksStoreReplicationSet) stopping(_ error) error { + return services.StopManagerAndAwaitStopped(context.Background(), s.subservices) +} + +func (s *blocksStoreReplicationSet) GetClientsFor(metas []*metadata.Meta) ([]storegatewaypb.StoreGatewayClient, error) { + var sets []ring.ReplicationSet + + // Find the replication set of each block we need to query. + for _, m := range metas { + // Buffer internally used by the ring (give extra room for a JOINING + LEAVING instance). + // Do not reuse the same buffer across multiple Get() calls because we do retain the + // returned replication set. + buf := make([]ring.IngesterDesc, 0, s.storesRing.ReplicationFactor()+2) + + set, err := s.storesRing.Get(cortex_tsdb.HashBlockID(m.ULID), ring.BlocksRead, buf) + if err != nil { + return nil, errors.Wrapf(err, "failed to get store-gateway replication set owning the block %s", m.ULID.String()) + } + + sets = append(sets, set) + } + + var clients []storegatewaypb.StoreGatewayClient + + // Get the client for each store-gateway. + for _, addr := range findSmallestInstanceSet(sets) { + c, err := s.clientsPool.GetClientFor(addr) + if err != nil { + return nil, errors.Wrapf(err, "failed to get store-gateway client for %s", addr) + } + + clients = append(clients, c.(storegatewaypb.StoreGatewayClient)) + } + + return clients, nil +} + +// findSmallestInstanceSet returns the minimal set of store-gateway instances including all required blocks. +// Blocks may be replicated across store-gateway instances, but we want to query the lowest number of instances +// possible, so this function tries to find the smallest set of store-gateway instances containing all blocks +// we need to query. +func findSmallestInstanceSet(sets []ring.ReplicationSet) []string { + addr := findHighestInstanceOccurrences(sets) + if addr == "" { + return nil + } + + // Remove any replication set containing the selected instance address. + for i := 0; i < len(sets); { + if sets[i].Includes(addr) { + sets = append(sets[:i], sets[i+1:]...) + } else { + i++ + } + } + + return append([]string{addr}, findSmallestInstanceSet(sets)...) 
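+	// Worked example: given replication sets {A,B}, {B,C} and {C,D}, instance B
+	// has the highest occurrence count, so both sets containing it are dropped
+	// and the recursion continues with {C,D} alone, returning two instances
+	// (e.g. [B, C]) rather than one per replication set.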
+} + +func findHighestInstanceOccurrences(sets []ring.ReplicationSet) string { + var highestAddr string + var highestCount int + + occurrences := map[string]int{} + + for _, set := range sets { + for _, i := range set.Ingesters { + if v, ok := occurrences[i.Addr]; ok { + occurrences[i.Addr] = v + 1 + } else { + occurrences[i.Addr] = 1 + } + + if occurrences[i.Addr] > highestCount { + highestAddr = i.Addr + highestCount = occurrences[i.Addr] + } + } + } + + return highestAddr +} diff --git a/vendor/github.com/cortexproject/cortex/pkg/querier/querier.go b/vendor/github.com/cortexproject/cortex/pkg/querier/querier.go index bd0cec1545d1..3e2b3373a79e 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/querier/querier.go +++ b/vendor/github.com/cortexproject/cortex/pkg/querier/querier.go @@ -4,6 +4,7 @@ import ( "context" "errors" "flag" + "strings" "time" "github.com/cortexproject/cortex/pkg/chunk/purger" @@ -48,6 +49,9 @@ type Config struct { // However, we need to use active query tracker, otherwise we cannot limit Max Concurrent queries in the PromQL // engine. ActiveQueryTrackerDir string `yaml:"active_query_tracker_dir"` + + // Blocks storage only. + StoreGatewayAddresses string `yaml:"store_gateway_addresses"` } var ( @@ -70,6 +74,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.DefaultEvaluationInterval, "querier.default-evaluation-interval", time.Minute, "The default evaluation interval or step size for subqueries.") f.DurationVar(&cfg.QueryStoreAfter, "querier.query-store-after", 0, "The time after which a metric should only be queried from storage and not just ingesters. 0 means all queries are sent to store.") f.StringVar(&cfg.ActiveQueryTrackerDir, "querier.active-query-tracker-dir", "./active-query-tracker", "Active query tracker monitors active queries, and writes them to the file in given directory. If Cortex discovers any queries in this log during startup, it will log them to the log file. Setting to empty value disables active query tracker, which also disables -querier.max-concurrent option.") + f.StringVar(&cfg.StoreGatewayAddresses, "experimental.querier.store-gateway-addresses", "", "Comma separated list of store-gateway addresses in DNS Service Discovery format. This option should be set when using the experimental blocks storage and the store-gateway sharding is disabled (when enabled, the store-gateway instances form a ring and addresses are picked from the ring).") } // Validate the config @@ -85,6 +90,14 @@ func (cfg *Config) Validate() error { return nil } +func (cfg *Config) GetStoreGatewayAddresses() []string { + if cfg.StoreGatewayAddresses == "" { + return nil + } + + return strings.Split(cfg.StoreGatewayAddresses, ",") +} + func getChunksIteratorFunction(cfg Config) chunkIteratorFunc { if cfg.BatchIterators { return batch.NewChunkMergeIterator @@ -94,6 +107,7 @@ func getChunksIteratorFunction(cfg Config) chunkIteratorFunc { return mergeChunks } +// NewChunkStoreQueryable returns the storage.Queryable implementation against the chunks store. 
func NewChunkStoreQueryable(cfg Config, chunkStore chunkstore.ChunkStore) storage.Queryable { return newChunkStoreQueryable(chunkStore, getChunksIteratorFunction(cfg)) } diff --git a/vendor/github.com/cortexproject/cortex/pkg/querier/store_gateway_client.go b/vendor/github.com/cortexproject/cortex/pkg/querier/store_gateway_client.go index dd3f936b0b52..98d0ef0648b6 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/querier/store_gateway_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/querier/store_gateway_client.go @@ -1,6 +1,9 @@ package querier import ( + "time" + + "github.com/go-kit/kit/log" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -12,11 +15,11 @@ import ( "github.com/cortexproject/cortex/pkg/util/grpcclient" ) -func NewStoreGatewayClientFactory(cfg grpcclient.Config, reg prometheus.Registerer) client.PoolFactory { +func newStoreGatewayClientFactory(cfg grpcclient.Config, reg prometheus.Registerer) client.PoolFactory { requestDuration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Namespace: "cortex", Name: "storegateway_client_request_duration_seconds", - Help: "Time spent executing requests on store-gateway.", + Help: "Time spent executing requests to the store-gateway.", Buckets: prometheus.ExponentialBuckets(0.008, 4, 7), ConstLabels: prometheus.Labels{"client": "querier"}, }, []string{"operation", "status_code"}) @@ -50,3 +53,34 @@ type storeGatewayClient struct { func (c *storeGatewayClient) Close() error { return c.conn.Close() } + +func (c *storeGatewayClient) String() string { + return c.conn.Target() +} + +func newStoreGatewayClientPool(discovery client.PoolServiceDiscovery, logger log.Logger, reg prometheus.Registerer) *client.Pool { + // We prefer sane defaults instead of exposing further config options. 
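+	// The asymmetric limits below (100<<20 bytes = 100 MiB receive, 16 MiB
+	// send) assume that streamed series responses are far larger than requests.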
+ clientCfg := grpcclient.Config{ + MaxRecvMsgSize: 100 << 20, + MaxSendMsgSize: 16 << 20, + UseGzipCompression: false, + RateLimit: 0, + RateLimitBurst: 0, + BackoffOnRatelimits: false, + } + + poolCfg := client.PoolConfig{ + CheckInterval: time.Minute, + HealthCheckEnabled: true, + HealthCheckTimeout: 10 * time.Second, + } + + clientsCount := promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Namespace: "cortex", + Name: "storegateway_clients", + Help: "The current number of store-gateway clients in the pool.", + ConstLabels: map[string]string{"client": "querier"}, + }) + + return client.NewPool("store-gateway", poolCfg, discovery, newStoreGatewayClientFactory(clientCfg, reg), clientsCount, logger) +} diff --git a/vendor/github.com/cortexproject/cortex/pkg/ruler/ruler_ring.go b/vendor/github.com/cortexproject/cortex/pkg/ruler/ruler_ring.go index fb45cb2c8b1d..566474da126a 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ruler/ruler_ring.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ruler/ruler_ring.go @@ -76,7 +76,7 @@ func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig { // Configure lifecycler lc.RingConfig = rc - lc.ListenPort = &cfg.ListenPort + lc.ListenPort = cfg.ListenPort lc.Addr = cfg.InstanceAddr lc.Port = cfg.InstancePort lc.ID = cfg.InstanceID diff --git a/vendor/github.com/cortexproject/cortex/pkg/ruler/rules/objectclient/rule_store.go b/vendor/github.com/cortexproject/cortex/pkg/ruler/rules/objectclient/rule_store.go index f03a3d5dbf73..b76074efdefb 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ruler/rules/objectclient/rule_store.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ruler/rules/objectclient/rule_store.go @@ -64,7 +64,7 @@ func (o *RuleStore) getRuleGroup(ctx context.Context, objectKey string) (*rules. 
// ListAllRuleGroups returns all the active rule groups func (o *RuleStore) ListAllRuleGroups(ctx context.Context) (map[string]rules.RuleGroupList, error) { - ruleGroupObjects, err := o.client.List(ctx, generateRuleObjectKey("", "", "")) + ruleGroupObjects, _, err := o.client.List(ctx, generateRuleObjectKey("", "", "")) if err != nil { return nil, err } @@ -93,7 +93,7 @@ func (o *RuleStore) ListAllRuleGroups(ctx context.Context) (map[string]rules.Rul // ListRuleGroups returns all the active rule groups for a user func (o *RuleStore) ListRuleGroups(ctx context.Context, userID, namespace string) (rules.RuleGroupList, error) { - ruleGroupObjects, err := o.client.List(ctx, generateRuleObjectKey(userID, namespace, "")) + ruleGroupObjects, _, err := o.client.List(ctx, generateRuleObjectKey(userID, namespace, "")) if err != nil { return nil, err } diff --git a/vendor/github.com/cortexproject/cortex/pkg/storage/tsdb/config.go b/vendor/github.com/cortexproject/cortex/pkg/storage/tsdb/config.go index 3bab9993b210..075249a97737 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/storage/tsdb/config.go +++ b/vendor/github.com/cortexproject/cortex/pkg/storage/tsdb/config.go @@ -58,6 +58,7 @@ type Config struct { HeadCompactionInterval time.Duration `yaml:"head_compaction_interval"` HeadCompactionConcurrency int `yaml:"head_compaction_concurrency"` StripeSize int `yaml:"stripe_size"` + StoreGatewayEnabled bool `yaml:"store_gateway_enabled"` // MaxTSDBOpeningConcurrencyOnStartup limits the number of concurrently opening TSDB's during startup MaxTSDBOpeningConcurrencyOnStartup int `yaml:"max_tsdb_opening_concurrency_on_startup"` @@ -128,6 +129,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.HeadCompactionInterval, "experimental.tsdb.head-compaction-interval", 1*time.Minute, "How frequently does Cortex try to compact TSDB head. Block is only created if data covers smallest block range. Must be greater than 0 and max 5 minutes.") f.IntVar(&cfg.HeadCompactionConcurrency, "experimental.tsdb.head-compaction-concurrency", 5, "Maximum number of tenants concurrently compacting TSDB head into a new block") f.IntVar(&cfg.StripeSize, "experimental.tsdb.stripe-size", 16384, "The number of shards of series to use in TSDB (must be a power of 2). Reducing this will decrease memory footprint, but can negatively impact performance.") + f.BoolVar(&cfg.StoreGatewayEnabled, "experimental.tsdb.store-gateway-enabled", false, "True if the Cortex cluster is running the store-gateway service and the querier should query the bucket store via the store-gateway.") } // Validate the config. 
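
For reference, a minimal flag combination exercising the new store-gateway read
path with sharding disabled might look as follows. The two experimental flags
are introduced in this patch; -store.engine is the pre-existing storage engine
selector, and the addresses are purely illustrative:

    -store.engine=tsdb
    -experimental.tsdb.store-gateway-enabled=true
    -experimental.querier.store-gateway-addresses=store-gateway-1:9091,store-gateway-2:9091
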
diff --git a/vendor/github.com/cortexproject/cortex/pkg/storegateway/gateway_ring.go b/vendor/github.com/cortexproject/cortex/pkg/storegateway/gateway_ring.go index 48e96d08ad53..7f34f0b203db 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/storegateway/gateway_ring.go +++ b/vendor/github.com/cortexproject/cortex/pkg/storegateway/gateway_ring.go @@ -61,18 +61,18 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) { } // Ring flags - cfg.KVStore.RegisterFlagsWithPrefix("experimental.store-gateway.ring.", "collectors/", f) - f.DurationVar(&cfg.HeartbeatPeriod, "experimental.store-gateway.ring.heartbeat-period", 15*time.Second, "Period at which to heartbeat to the ring.") - f.DurationVar(&cfg.HeartbeatTimeout, "experimental.store-gateway.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which store gateways are considered unhealthy within the ring.") + cfg.KVStore.RegisterFlagsWithPrefix("experimental.store-gateway.sharding-ring.", "collectors/", f) + f.DurationVar(&cfg.HeartbeatPeriod, "experimental.store-gateway.sharding-ring.heartbeat-period", 15*time.Second, "Period at which to heartbeat to the ring.") + f.DurationVar(&cfg.HeartbeatTimeout, "experimental.store-gateway.sharding-ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which store gateways are considered unhealthy within the ring.") f.IntVar(&cfg.ReplicationFactor, "experimental.store-gateway.replication-factor", 3, "The replication factor to use when sharding blocks.") f.StringVar(&cfg.TokensFilePath, "experimental.store-gateway.tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.") // Instance flags cfg.InstanceInterfaceNames = []string{"eth0", "en0"} - f.Var((*flagext.Strings)(&cfg.InstanceInterfaceNames), "experimental.store-gateway.ring.instance-interface", "Name of network interface to read address from.") - f.StringVar(&cfg.InstanceAddr, "experimental.store-gateway.ring.instance-addr", "", "IP address to advertise in the ring.") - f.IntVar(&cfg.InstancePort, "experimental.store-gateway.ring.instance-port", 0, "Port to advertise in the ring (defaults to server.grpc-listen-port).") - f.StringVar(&cfg.InstanceID, "experimental.store-gateway.ring.instance-id", hostname, "Instance ID to register in the ring.") + f.Var((*flagext.Strings)(&cfg.InstanceInterfaceNames), "experimental.store-gateway.sharding-ring.instance-interface", "Name of network interface to read address from.") + f.StringVar(&cfg.InstanceAddr, "experimental.store-gateway.sharding-ring.instance-addr", "", "IP address to advertise in the ring.") + f.IntVar(&cfg.InstancePort, "experimental.store-gateway.sharding-ring.instance-port", 0, "Port to advertise in the ring (defaults to server.grpc-listen-port).") + f.StringVar(&cfg.InstanceID, "experimental.store-gateway.sharding-ring.instance-id", hostname, "Instance ID to register in the ring.") // Defaults for internal settings. cfg.RingCheckPeriod = 5 * time.Second
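
Note that the hunk above renames the store-gateway ring flags from the
experimental.store-gateway.ring.* prefix to
experimental.store-gateway.sharding-ring.*, so existing invocations need to be
updated accordingly, for example:

    # before
    -experimental.store-gateway.ring.heartbeat-timeout=1m
    # after
    -experimental.store-gateway.sharding-ring.heartbeat-timeout=1m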