diff --git a/CHANGELOG.md b/CHANGELOG.md
index 13fc23103d35..2a4901169bcd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -22,6 +22,7 @@
 * [4892](https://github.com/grafana/loki/pull/4892) **cristaloleg**: Loki: upgrade cristalhq/hedgedhttp from v0.6.0 to v0.7.0
 * [4902](https://github.com/grafana/loki/pull/4902) **cyriltovena**: Fixes 500 when query is outside of max_query_lookback.
 * [4904](https://github.com/grafana/loki/pull/4904) **bboreham**: Fixes rare race condition that could crash an ingester.
+* [4942](https://github.com/grafana/loki/pull/4942) **cyriltovena**: Allow disabling HTTP/2 for GCS.
 * [4876](https://github.com/grafana/loki/pull/4876) **trevorwhitney**: Docs: add simple, scalable example using docker-compose

 # 2.4.1 (2021/11/07)
diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md
index a4dd72899368..ce6e7abb181a 100644
--- a/docs/sources/configuration/_index.md
+++ b/docs/sources/configuration/_index.md
@@ -723,6 +723,10 @@ The `gcs_storage_config` configures GCS as a general storage for different data
 # The duration after which the requests to GCS should be timed out.
 # CLI flag: -<prefix>.gcs.request-timeout
 [request_timeout: <duration> | default = 0s]
+
+# Enable HTTP/2 when connecting to GCS.
+# CLI flag: -<prefix>.gcs.enable-http2
+[enable_http2: <boolean> | default = true]
 ```

 ## s3_storage_config
diff --git a/pkg/storage/chunk/aws/s3_storage_client.go b/pkg/storage/chunk/aws/s3_storage_client.go
index 0df2baffa40c..cbc83586160b 100644
--- a/pkg/storage/chunk/aws/s3_storage_client.go
+++ b/pkg/storage/chunk/aws/s3_storage_client.go
@@ -299,9 +299,9 @@ func buildS3Client(cfg S3Config, hedgingCfg hedging.Config, hedging bool) (*s3.S
 			KeepAlive: 30 * time.Second,
 			DualStack: true,
 		}).DialContext,
-		MaxIdleConns:          100,
+		MaxIdleConns:          200,
 		IdleConnTimeout:       cfg.HTTPConfig.IdleConnTimeout,
-		MaxIdleConnsPerHost:   100,
+		MaxIdleConnsPerHost:   200,
 		TLSHandshakeTimeout:   3 * time.Second,
 		ExpectContinueTimeout: 1 * time.Second,
 		ResponseHeaderTimeout: cfg.HTTPConfig.ResponseHeaderTimeout,
diff --git a/pkg/storage/chunk/azure/blob_storage_client.go b/pkg/storage/chunk/azure/blob_storage_client.go
index 778c879ef642..dba7815d418a 100644
--- a/pkg/storage/chunk/azure/blob_storage_client.go
+++ b/pkg/storage/chunk/azure/blob_storage_client.go
@@ -68,8 +68,8 @@ var (
 			KeepAlive: 30 * time.Second,
 			DualStack: true,
 		}).Dial,
-		MaxIdleConns:          0,
-		MaxIdleConnsPerHost:   100,
+		MaxIdleConns:          200,
+		MaxIdleConnsPerHost:   200,
 		IdleConnTimeout:       90 * time.Second,
 		TLSHandshakeTimeout:   10 * time.Second,
 		ExpectContinueTimeout: 1 * time.Second,
@@ -292,7 +292,6 @@ func (b *BlobStorage) newPipeline(hedgingCfg hedging.Config, hedging bool) (pipe
 	}

 	return azblob.NewPipeline(*tokenCredential, opts), nil
-
 }

 func (b *BlobStorage) getOAuthToken() (*azblob.TokenCredential, error) {
@@ -331,7 +330,6 @@ func (b *BlobStorage) fetchMSIToken() (*adal.ServicePrincipalToken, error) {
 	// both can be empty, systemAssignedMSI scenario
 	spt, err := adal.NewServicePrincipalTokenFromMSI(msiEndpoint, "https://storage.azure.com/")
-
 	if err != nil {
 		return nil, err
 	}

diff --git a/pkg/storage/chunk/cassandra/fixtures.go b/pkg/storage/chunk/cassandra/fixtures.go
index c7d567e3557e..c146b57317d6 100644
--- a/pkg/storage/chunk/cassandra/fixtures.go
+++ b/pkg/storage/chunk/cassandra/fixtures.go
@@ -41,7 +41,7 @@ func (f *fixture) Clients() (chunk.IndexClient, chunk.Client, chunk.TableClient,
 		return nil, nil, nil, schemaConfig, nil, err
 	}

-	objectClient, err := NewObjectClient(cfg, schemaConfig, nil)
+	objectClient, err := NewObjectClient(cfg, schemaConfig, nil, 150)
 	if err != nil {
 		return nil, nil, nil, schemaConfig, nil, err
 	}
diff --git a/pkg/storage/chunk/cassandra/storage_client.go b/pkg/storage/chunk/cassandra/storage_client.go
index 848dcca0710c..30952895dc2b 100644
--- a/pkg/storage/chunk/cassandra/storage_client.go
+++ b/pkg/storage/chunk/cassandra/storage_client.go
@@ -446,10 +446,11 @@ type ObjectClient struct {
 	readSession    *gocql.Session
 	writeSession   *gocql.Session
 	querySemaphore *semaphore.Weighted
+	maxGetParallel int
 }

 // NewObjectClient returns a new ObjectClient.
-func NewObjectClient(cfg Config, schemaCfg chunk.SchemaConfig, registerer prometheus.Registerer) (*ObjectClient, error) {
+func NewObjectClient(cfg Config, schemaCfg chunk.SchemaConfig, registerer prometheus.Registerer, maxGetParallel int) (*ObjectClient, error) {
 	readSession, err := cfg.session("chunks-read", registerer)
 	if err != nil {
 		return nil, errors.WithStack(err)
 	}
@@ -471,6 +472,7 @@ func NewObjectClient(cfg Config, schemaCfg chunk.SchemaConfig, registerer promet
 		readSession:    readSession,
 		writeSession:   writeSession,
 		querySemaphore: querySemaphore,
+		maxGetParallel: maxGetParallel,
 	}
 	return client, nil
 }
@@ -501,7 +503,7 @@ func (s *ObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) erro

 // GetChunks implements chunk.ObjectClient.
 func (s *ObjectClient) GetChunks(ctx context.Context, input []chunk.Chunk) ([]chunk.Chunk, error) {
-	return util.GetParallelChunks(ctx, input, s.getChunk)
+	return util.GetParallelChunks(ctx, s.maxGetParallel, input, s.getChunk)
 }

 func (s *ObjectClient) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, input chunk.Chunk) (chunk.Chunk, error) {
diff --git a/pkg/storage/chunk/gcp/gcs_object_client.go b/pkg/storage/chunk/gcp/gcs_object_client.go
index f01fa06d722c..f21caf5f0f12 100644
--- a/pkg/storage/chunk/gcp/gcs_object_client.go
+++ b/pkg/storage/chunk/gcp/gcs_object_client.go
@@ -33,6 +33,7 @@ type GCSConfig struct {
 	ChunkBufferSize  int           `yaml:"chunk_buffer_size"`
 	RequestTimeout   time.Duration `yaml:"request_timeout"`
 	EnableOpenCensus bool          `yaml:"enable_opencensus"`
+	EnableHTTP2      bool          `yaml:"enable_http2"`

 	Insecure bool `yaml:"-"`
 }
@@ -47,7 +48,8 @@ func (cfg *GCSConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
 	f.StringVar(&cfg.BucketName, prefix+"gcs.bucketname", "", "Name of GCS bucket. Please refer to https://cloud.google.com/docs/authentication/production for more information about how to configure authentication.")
 	f.IntVar(&cfg.ChunkBufferSize, prefix+"gcs.chunk-buffer-size", 0, "The size of the buffer that GCS client for each PUT request. 0 to disable buffering.")
 	f.DurationVar(&cfg.RequestTimeout, prefix+"gcs.request-timeout", 0, "The duration after which the requests to GCS should be timed out.")
-	f.BoolVar(&cfg.EnableOpenCensus, prefix+"gcs.enable-opencensus", true, "Enabled OpenCensus (OC) instrumentation for all requests.")
+	f.BoolVar(&cfg.EnableOpenCensus, prefix+"gcs.enable-opencensus", true, "Enable OpenCensus (OC) instrumentation for all requests.")
+	f.BoolVar(&cfg.EnableHTTP2, prefix+"gcs.enable-http2", true, "Enable HTTP2 connections.")
 }

 func (cfg *GCSConfig) ToCortexGCSConfig() cortex_gcp.GCSConfig {
@@ -82,7 +84,7 @@ func newGCSObjectClient(ctx context.Context, cfg GCSConfig, hedgingCfg hedging.C

 func newBucketHandle(ctx context.Context, cfg GCSConfig, hedgingCfg hedging.Config, hedging bool, clientFactory ClientFactory) (*storage.BucketHandle, error) {
 	var opts []option.ClientOption
-	httpClient, err := gcsInstrumentation(ctx, storage.ScopeReadWrite, cfg.Insecure)
+	httpClient, err := gcsInstrumentation(ctx, storage.ScopeReadWrite, cfg.Insecure, cfg.EnableHTTP2)
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/storage/chunk/gcp/instrumentation.go b/pkg/storage/chunk/gcp/instrumentation.go
index b661d2c6c314..83f7e3cfc219 100644
--- a/pkg/storage/chunk/gcp/instrumentation.go
+++ b/pkg/storage/chunk/gcp/instrumentation.go
@@ -50,9 +50,16 @@ func bigtableInstrumentation() ([]grpc.UnaryClientInterceptor, []grpc.StreamClie
 	}
 }

-func gcsInstrumentation(ctx context.Context, scope string, insecure bool) (*http.Client, error) {
+func gcsInstrumentation(ctx context.Context, scope string, insecure bool, http2 bool) (*http.Client, error) {
 	// start with default transport
 	customTransport := http.DefaultTransport.(*http.Transport).Clone()
+	customTransport.MaxIdleConnsPerHost = 200
+	customTransport.MaxIdleConns = 200
+	if !http2 {
+		// disable HTTP/2 by setting TLSNextProto to a non-nil empty map, as per the net/http documentation.
+		// see http2 section of https://pkg.go.dev/net/http
+		customTransport.TLSNextProto = make(map[string]func(string, *tls.Conn) http.RoundTripper)
+	}
 	if insecure {
 		customTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
 	}
diff --git a/pkg/storage/chunk/objectclient/client.go b/pkg/storage/chunk/objectclient/client.go
index 395b5f41cbad..7e9ebe519d84 100644
--- a/pkg/storage/chunk/objectclient/client.go
+++ b/pkg/storage/chunk/objectclient/client.go
@@ -22,17 +22,25 @@ var Base64Encoder = func(key string) string {
 	return base64.StdEncoding.EncodeToString([]byte(key))
 }

+const defaultMaxParallel = 150
+
 // Client is used to store chunks in object store backends
 type Client struct {
-	store      chunk.ObjectClient
-	keyEncoder KeyEncoder
+	store               chunk.ObjectClient
+	keyEncoder          KeyEncoder
+	getChunkMaxParallel int
 }

 // NewClient wraps the provided ObjectClient with a chunk.Client implementation
 func NewClient(store chunk.ObjectClient, encoder KeyEncoder) *Client {
+	return NewClientWithMaxParallel(store, encoder, defaultMaxParallel)
+}
+
+func NewClientWithMaxParallel(store chunk.ObjectClient, encoder KeyEncoder, maxParallel int) *Client {
 	return &Client{
-		store:      store,
-		keyEncoder: encoder,
+		store:               store,
+		keyEncoder:          encoder,
+		getChunkMaxParallel: maxParallel,
 	}
 }

@@ -82,7 +90,11 @@ func (o *Client) PutChunks(ctx context.Context, chunks []chunk.Chunk) error {

 // GetChunks retrieves the specified chunks from the configured backend
 func (o *Client) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) {
-	return util.GetParallelChunks(ctx, chunks, o.getChunk)
+	getChunkMaxParallel := o.getChunkMaxParallel
+	if getChunkMaxParallel == 0 {
+		getChunkMaxParallel = defaultMaxParallel
+	}
+	return util.GetParallelChunks(ctx, getChunkMaxParallel, chunks, o.getChunk)
 }

 func (o *Client) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, c chunk.Chunk) (chunk.Chunk, error) {
diff --git a/pkg/storage/chunk/openstack/swift_object_client.go b/pkg/storage/chunk/openstack/swift_object_client.go
index 04d03e6ab2a5..3060a8851c9a 100644
--- a/pkg/storage/chunk/openstack/swift_object_client.go
+++ b/pkg/storage/chunk/openstack/swift_object_client.go
@@ -24,7 +24,8 @@ import (

 var defaultTransport http.RoundTripper = &http.Transport{
 	Proxy:                 http.ProxyFromEnvironment,
-	MaxIdleConnsPerHost:   512,
+	MaxIdleConnsPerHost:   200,
+	MaxIdleConns:          200,
 	ExpectContinueTimeout: 5 * time.Second,
 }

diff --git a/pkg/storage/chunk/storage/factory.go b/pkg/storage/chunk/storage/factory.go
index ec95d113d920..f80cb210f40a 100644
--- a/pkg/storage/chunk/storage/factory.go
+++ b/pkg/storage/chunk/storage/factory.go
@@ -94,6 +94,7 @@ type Config struct {
 	IndexQueriesCacheConfig  cache.Config `yaml:"index_queries_cache_config"`
 	DisableBroadIndexQueries bool         `yaml:"disable_broad_index_queries"`
+	MaxParallelGetChunk      int          `yaml:"max_parallel_get_chunk"`

 	GrpcConfig grpc.Config `yaml:"grpc_store"`

@@ -117,6 +118,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	cfg.IndexQueriesCacheConfig.RegisterFlagsWithPrefix("store.index-cache-read.", "Cache config for index entry reading.", f)
 	f.DurationVar(&cfg.IndexCacheValidity, "store.index-cache-validity", 5*time.Minute, "Cache validity for active index entries. Should be no higher than -ingester.max-chunk-idle.")
 	f.BoolVar(&cfg.DisableBroadIndexQueries, "store.disable-broad-index-queries", false, "Disable broad index queries which results in reduced cache usage and faster query performance at the expense of somewhat higher QPS on the index store.")
+	f.IntVar(&cfg.MaxParallelGetChunk, "store.max-parallel-get-chunk", 150, "Maximum number of parallel chunk reads.")
 }

 // Validate config and returns error on failure
@@ -271,7 +273,11 @@ func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, regis
 	case StorageTypeInMemory:
 		return chunk.NewMockStorage(), nil
 	case StorageTypeAWS, StorageTypeS3:
-		return newChunkClientFromStore(aws.NewS3ObjectClient(cfg.AWSStorageConfig.S3Config, cfg.Hedging))
+		c, err := aws.NewS3ObjectClient(cfg.AWSStorageConfig.S3Config, cfg.Hedging)
+		if err != nil {
+			return nil, err
+		}
+		return objectclient.NewClientWithMaxParallel(c, nil, cfg.MaxParallelGetChunk), nil
 	case StorageTypeAWSDynamo:
 		if cfg.AWSStorageConfig.DynamoDB.URL == nil {
 			return nil, fmt.Errorf("Must set -dynamodb.url in aws mode")
@@ -282,23 +288,35 @@ func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, regis
 		}
 		return aws.NewDynamoDBChunkClient(cfg.AWSStorageConfig.DynamoDBConfig, schemaCfg, registerer)
 	case StorageTypeAzure:
-		return newChunkClientFromStore(azure.NewBlobStorage(&cfg.AzureStorageConfig, cfg.Hedging))
+		c, err := azure.NewBlobStorage(&cfg.AzureStorageConfig, cfg.Hedging)
+		if err != nil {
+			return nil, err
+		}
+		return objectclient.NewClientWithMaxParallel(c, nil, cfg.MaxParallelGetChunk), nil
 	case StorageTypeGCP:
 		return gcp.NewBigtableObjectClient(context.Background(), cfg.GCPStorageConfig, schemaCfg)
 	case StorageTypeGCPColumnKey, StorageTypeBigTable, StorageTypeBigTableHashed:
 		return gcp.NewBigtableObjectClient(context.Background(), cfg.GCPStorageConfig, schemaCfg)
 	case StorageTypeGCS:
-		return newChunkClientFromStore(gcp.NewGCSObjectClient(context.Background(), cfg.GCSConfig, cfg.Hedging))
+		c, err := gcp.NewGCSObjectClient(context.Background(), cfg.GCSConfig, cfg.Hedging)
+		if err != nil {
+			return nil, err
+		}
+		return objectclient.NewClientWithMaxParallel(c, nil, cfg.MaxParallelGetChunk), nil
 	case StorageTypeSwift:
-		return newChunkClientFromStore(openstack.NewSwiftObjectClient(cfg.Swift, cfg.Hedging))
+		c, err := openstack.NewSwiftObjectClient(cfg.Swift, cfg.Hedging)
+		if err != nil {
+			return nil, err
+		}
+		return objectclient.NewClientWithMaxParallel(c, nil, cfg.MaxParallelGetChunk), nil
 	case StorageTypeCassandra:
-		return cassandra.NewObjectClient(cfg.CassandraStorageConfig, schemaCfg, registerer)
+		return cassandra.NewObjectClient(cfg.CassandraStorageConfig, schemaCfg, registerer, cfg.MaxParallelGetChunk)
 	case StorageTypeFileSystem:
 		store, err := local.NewFSObjectClient(cfg.FSConfig)
 		if err != nil {
 			return nil, err
 		}
-		return objectclient.NewClient(store, objectclient.Base64Encoder), nil
+		return objectclient.NewClientWithMaxParallel(store, objectclient.Base64Encoder, cfg.MaxParallelGetChunk), nil
 	case StorageTypeGrpc:
 		return grpc.NewStorageClient(cfg.GrpcConfig, schemaCfg)
 	default:
@@ -306,13 +324,6 @@ func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, regis
 	}
 }

-func newChunkClientFromStore(store chunk.ObjectClient, err error) (chunk.Client, error) {
-	if err != nil {
-		return nil, err
-	}
-	return objectclient.NewClient(store, nil), nil
-}
-
 // NewTableClient makes a new table client based on the configuration.
 func NewTableClient(name string, cfg Config, registerer prometheus.Registerer) (chunk.TableClient, error) {
 	if indexClientFactory, ok := customIndexStores[name]; ok {
diff --git a/pkg/storage/chunk/util/parallel_chunk_fetch.go b/pkg/storage/chunk/util/parallel_chunk_fetch.go
index 4f17b0ddeabf..27de32b7ffaa 100644
--- a/pkg/storage/chunk/util/parallel_chunk_fetch.go
+++ b/pkg/storage/chunk/util/parallel_chunk_fetch.go
@@ -11,8 +11,6 @@ import (
 	"github.com/grafana/loki/pkg/storage/chunk"
 )

-const maxParallel = 1000
-
 var decodeContextPool = sync.Pool{
 	New: func() interface{} {
 		return chunk.NewDecodeContext()
 	},
 }

 // GetParallelChunks fetches chunks in parallel (up to maxParallel).
-func GetParallelChunks(ctx context.Context, chunks []chunk.Chunk, f func(context.Context, *chunk.DecodeContext, chunk.Chunk) (chunk.Chunk, error)) ([]chunk.Chunk, error) {
+func GetParallelChunks(ctx context.Context, maxParallel int, chunks []chunk.Chunk, f func(context.Context, *chunk.DecodeContext, chunk.Chunk) (chunk.Chunk, error)) ([]chunk.Chunk, error) {
 	log, ctx := spanlogger.New(ctx, "GetParallelChunks")
 	defer log.Finish()
 	log.LogFields(otlog.Int("requested", len(chunks)))
diff --git a/pkg/storage/chunk/util/parallel_chunk_fetch_test.go b/pkg/storage/chunk/util/parallel_chunk_fetch_test.go
index 359c94bb66dd..7fc48ca5366c 100644
--- a/pkg/storage/chunk/util/parallel_chunk_fetch_test.go
+++ b/pkg/storage/chunk/util/parallel_chunk_fetch_test.go
@@ -12,7 +12,7 @@ func BenchmarkGetParallelChunks(b *testing.B) {
 	in := make([]chunk.Chunk, 1024)
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		res, err := GetParallelChunks(ctx, in,
+		res, err := GetParallelChunks(ctx, 150, in,
 			func(_ context.Context, d *chunk.DecodeContext, c chunk.Chunk) (chunk.Chunk, error) {
 				return c, nil
 			})
diff --git a/pkg/storage/stores/shipper/compactor/retention/util_test.go b/pkg/storage/stores/shipper/compactor/retention/util_test.go
index e6473f93c3e0..e54c9e46e563 100644
--- a/pkg/storage/stores/shipper/compactor/retention/util_test.go
+++ b/pkg/storage/stores/shipper/compactor/retention/util_test.go
@@ -229,6 +229,7 @@ func newTestStore(t testing.TB) *testStore {
 			FSConfig: local.FSConfig{
 				Directory: chunkDir,
 			},
+			MaxParallelGetChunk: 150,
 		},
 		BoltDBShipperConfig: shipper.Config{
 			ActiveIndexDirectory: indexDir,
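
Reviewer note: the core of the GCS change is the transport tweak in `gcsInstrumentation`. The sketch below is a self-contained illustration of that technique, not Loki code; `newGCSTransport` and `main` are made-up names, and only the `Clone`/idle-connection/`TLSNextProto` handling mirrors the diff. The net/http documentation states that setting `Transport.TLSNextProto` to a non-nil, empty map disables HTTP/2 for that transport.

```go
package main

import (
	"crypto/tls"
	"net/http"
)

// newGCSTransport is an illustrative helper (not part of Loki's API): it clones
// the default transport, raises the idle-connection limits, and optionally
// disables HTTP/2 the same way the gcsInstrumentation change above does.
func newGCSTransport(enableHTTP2 bool) *http.Transport {
	t := http.DefaultTransport.(*http.Transport).Clone()
	t.MaxIdleConns = 200
	t.MaxIdleConnsPerHost = 200
	if !enableHTTP2 {
		// Per the net/http docs, a non-nil empty TLSNextProto map turns off HTTP/2.
		t.TLSNextProto = make(map[string]func(string, *tls.Conn) http.RoundTripper)
	}
	return t
}

func main() {
	// Requests made through this client stay on HTTP/1.1.
	client := &http.Client{Transport: newGCSTransport(false)}
	_ = client
}
```

Cloning `http.DefaultTransport` keeps the stock proxy and dialer settings, so only the connection-pool sizes and the protocol negotiation differ from the default behaviour.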
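The other half of the PR replaces the hard-coded `maxParallel = 1000` in `GetParallelChunks` with a caller-supplied limit, wired through each chunk client from `-store.max-parallel-get-chunk` (default 150). The sketch below is not Loki's implementation; it only illustrates the bounded-parallelism pattern with a counting semaphore, using hypothetical names (`fetchAll`, `fetch`).

```go
package main

import (
	"fmt"
	"sync"
)

// fetchAll runs fetch for every key, with at most maxParallel calls in flight
// at once. Results are written by index, so output order matches input order.
func fetchAll(keys []string, maxParallel int, fetch func(string) (string, error)) ([]string, error) {
	sem := make(chan struct{}, maxParallel) // counting semaphore
	results := make([]string, len(keys))
	errs := make(chan error, len(keys))

	var wg sync.WaitGroup
	for i, k := range keys {
		wg.Add(1)
		go func(i int, k string) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot
			defer func() { <-sem }() // release it when done
			v, err := fetch(k)
			if err != nil {
				errs <- err
				return
			}
			results[i] = v
		}(i, k)
	}
	wg.Wait()
	close(errs)
	if err := <-errs; err != nil { // nil if no error was reported
		return nil, err
	}
	return results, nil
}

func main() {
	out, err := fetchAll([]string{"a", "b", "c"}, 2, func(k string) (string, error) {
		return "chunk-" + k, nil
	})
	fmt.Println(out, err)
}
```

A buffered channel used as a counting semaphore is the simplest way to cap concurrent fetches; lowering the cap (here via the new per-store setting) trades a little latency for fewer simultaneous connections to the object store.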