Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to disable http2 for GCS. #4942

Merged
merged 10 commits into from
Jan 3, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* [4892](https://github.com/grafana/loki/pull/4892) **cristaloleg**: Loki: upgrade cristalhq/hedgedhttp from v0.6.0 to v0.7.0
* [4902](https://github.com/grafana/loki/pull/4902) **cyriltovena**: Fixes 500 when query is outside of max_query_lookback.
* [4904](https://github.com/grafana/loki/pull/4904) **bboreham**: Fixes rare race condition that could crash an ingester.
* [4942](https://github.com/grafana/loki/pull/4942) **cyriltovena**: Allow to disable HTTP/2 for GCS.
* [4876](https://github.com/grafana/loki/pull/4876) **trevorwhitney**: Docs: add simple, scalable example using docker-compose

# 2.4.1 (2021/11/07)
Expand Down
4 changes: 4 additions & 0 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,10 @@ The `gcs_storage_config` configures GCS as a general storage for different data
# The duration after which the requests to GCS should be timed out.
# CLI flag: -<prefix>.gcs.request-timeout
[request_timeout: <duration> | default = 0s]

# Enable HTTP/2 when connecting to GCS.
# CLI flag: -<prefix>.gcs.enable-http2
[enable_http2: <bool> | default = true]
```

## s3_storage_config
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/chunk/aws/s3_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,9 @@ func buildS3Client(cfg S3Config, hedgingCfg hedging.Config, hedging bool) (*s3.S
KeepAlive: 30 * time.Second,
DualStack: true,
}).DialContext,
MaxIdleConns: 100,
MaxIdleConns: 200,
IdleConnTimeout: cfg.HTTPConfig.IdleConnTimeout,
MaxIdleConnsPerHost: 100,
MaxIdleConnsPerHost: 200,
TLSHandshakeTimeout: 3 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
ResponseHeaderTimeout: cfg.HTTPConfig.ResponseHeaderTimeout,
Expand Down
6 changes: 2 additions & 4 deletions pkg/storage/chunk/azure/blob_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ var (
KeepAlive: 30 * time.Second,
DualStack: true,
}).Dial,
MaxIdleConns: 0,
MaxIdleConnsPerHost: 100,
MaxIdleConns: 200,
MaxIdleConnsPerHost: 200,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
Expand Down Expand Up @@ -292,7 +292,6 @@ func (b *BlobStorage) newPipeline(hedgingCfg hedging.Config, hedging bool) (pipe
}

return azblob.NewPipeline(*tokenCredential, opts), nil

}

func (b *BlobStorage) getOAuthToken() (*azblob.TokenCredential, error) {
Expand Down Expand Up @@ -331,7 +330,6 @@ func (b *BlobStorage) fetchMSIToken() (*adal.ServicePrincipalToken, error) {

// both can be empty, systemAssignedMSI scenario
spt, err := adal.NewServicePrincipalTokenFromMSI(msiEndpoint, "https://storage.azure.com/")

if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/chunk/cassandra/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (f *fixture) Clients() (chunk.IndexClient, chunk.Client, chunk.TableClient,
return nil, nil, nil, schemaConfig, nil, err
}

objectClient, err := NewObjectClient(cfg, schemaConfig, nil)
objectClient, err := NewObjectClient(cfg, schemaConfig, nil, 150)
if err != nil {
return nil, nil, nil, schemaConfig, nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/storage/chunk/cassandra/storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,10 +446,11 @@ type ObjectClient struct {
readSession *gocql.Session
writeSession *gocql.Session
querySemaphore *semaphore.Weighted
maxGetParallel int
}

// NewObjectClient returns a new ObjectClient.
func NewObjectClient(cfg Config, schemaCfg chunk.SchemaConfig, registerer prometheus.Registerer) (*ObjectClient, error) {
func NewObjectClient(cfg Config, schemaCfg chunk.SchemaConfig, registerer prometheus.Registerer, maxGetParallel int) (*ObjectClient, error) {
readSession, err := cfg.session("chunks-read", registerer)
if err != nil {
return nil, errors.WithStack(err)
Expand All @@ -471,6 +472,7 @@ func NewObjectClient(cfg Config, schemaCfg chunk.SchemaConfig, registerer promet
readSession: readSession,
writeSession: writeSession,
querySemaphore: querySemaphore,
maxGetParallel: maxGetParallel,
}
return client, nil
}
Expand Down Expand Up @@ -501,7 +503,7 @@ func (s *ObjectClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) erro

// GetChunks implements chunk.ObjectClient.
func (s *ObjectClient) GetChunks(ctx context.Context, input []chunk.Chunk) ([]chunk.Chunk, error) {
return util.GetParallelChunks(ctx, input, s.getChunk)
return util.GetParallelChunks(ctx, s.maxGetParallel, input, s.getChunk)
}

func (s *ObjectClient) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, input chunk.Chunk) (chunk.Chunk, error) {
Expand Down
6 changes: 4 additions & 2 deletions pkg/storage/chunk/gcp/gcs_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type GCSConfig struct {
ChunkBufferSize int `yaml:"chunk_buffer_size"`
RequestTimeout time.Duration `yaml:"request_timeout"`
EnableOpenCensus bool `yaml:"enable_opencensus"`
EnableHTTP2 bool `yaml:"enable_http2"`

Insecure bool `yaml:"-"`
}
Expand All @@ -47,7 +48,8 @@ func (cfg *GCSConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.BucketName, prefix+"gcs.bucketname", "", "Name of GCS bucket. Please refer to https://cloud.google.com/docs/authentication/production for more information about how to configure authentication.")
f.IntVar(&cfg.ChunkBufferSize, prefix+"gcs.chunk-buffer-size", 0, "The size of the buffer that GCS client for each PUT request. 0 to disable buffering.")
f.DurationVar(&cfg.RequestTimeout, prefix+"gcs.request-timeout", 0, "The duration after which the requests to GCS should be timed out.")
f.BoolVar(&cfg.EnableOpenCensus, prefix+"gcs.enable-opencensus", true, "Enabled OpenCensus (OC) instrumentation for all requests.")
f.BoolVar(&cfg.EnableOpenCensus, prefix+"gcs.enable-opencensus", true, "Enable OpenCensus (OC) instrumentation for all requests.")
f.BoolVar(&cfg.EnableHTTP2, prefix+"gcs.enable-http2", true, "Enable HTTP2 connections.")
}

func (cfg *GCSConfig) ToCortexGCSConfig() cortex_gcp.GCSConfig {
Expand Down Expand Up @@ -82,7 +84,7 @@ func newGCSObjectClient(ctx context.Context, cfg GCSConfig, hedgingCfg hedging.C

func newBucketHandle(ctx context.Context, cfg GCSConfig, hedgingCfg hedging.Config, hedging bool, clientFactory ClientFactory) (*storage.BucketHandle, error) {
var opts []option.ClientOption
httpClient, err := gcsInstrumentation(ctx, storage.ScopeReadWrite, cfg.Insecure)
httpClient, err := gcsInstrumentation(ctx, storage.ScopeReadWrite, cfg.Insecure, cfg.EnableHTTP2)
if err != nil {
return nil, err
}
Expand Down
9 changes: 8 additions & 1 deletion pkg/storage/chunk/gcp/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,16 @@ func bigtableInstrumentation() ([]grpc.UnaryClientInterceptor, []grpc.StreamClie
}
}

func gcsInstrumentation(ctx context.Context, scope string, insecure bool) (*http.Client, error) {
func gcsInstrumentation(ctx context.Context, scope string, insecure bool, http2 bool) (*http.Client, error) {
// start with default transport
customTransport := http.DefaultTransport.(*http.Transport).Clone()
customTransport.MaxIdleConnsPerHost = 200
customTransport.MaxIdleConns = 200
if !http2 {
// disable HTTP/2 by setting TLSNextProto to non-nil empty map, as per the net/http documentation.
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
// see http2 section of https://pkg.go.dev/net/http
customTransport.TLSNextProto = make(map[string]func(string, *tls.Conn) http.RoundTripper)
}
if insecure {
customTransport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
}
Expand Down
22 changes: 17 additions & 5 deletions pkg/storage/chunk/objectclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,25 @@ var Base64Encoder = func(key string) string {
return base64.StdEncoding.EncodeToString([]byte(key))
}

const defaultMaxParallel = 150

// Client is used to store chunks in object store backends
type Client struct {
store chunk.ObjectClient
keyEncoder KeyEncoder
store chunk.ObjectClient
keyEncoder KeyEncoder
getChunkMaxParallel int
}

// NewClient wraps the provided ObjectClient with a chunk.Client implementation
func NewClient(store chunk.ObjectClient, encoder KeyEncoder) *Client {
return NewClientWithMaxParallel(store, encoder, defaultMaxParallel)
}

func NewClientWithMaxParallel(store chunk.ObjectClient, encoder KeyEncoder, maxParallel int) *Client {
return &Client{
store: store,
keyEncoder: encoder,
store: store,
keyEncoder: encoder,
getChunkMaxParallel: maxParallel,
}
}

Expand Down Expand Up @@ -82,7 +90,11 @@ func (o *Client) PutChunks(ctx context.Context, chunks []chunk.Chunk) error {

// GetChunks retrieves the specified chunks from the configured backend
func (o *Client) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) {
return util.GetParallelChunks(ctx, chunks, o.getChunk)
getChunkMaxParallel := o.getChunkMaxParallel
if getChunkMaxParallel == 0 {
getChunkMaxParallel = defaultMaxParallel
}
return util.GetParallelChunks(ctx, getChunkMaxParallel, chunks, o.getChunk)
}

func (o *Client) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, c chunk.Chunk) (chunk.Chunk, error) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/chunk/openstack/swift_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import (

var defaultTransport http.RoundTripper = &http.Transport{
Proxy: http.ProxyFromEnvironment,
MaxIdleConnsPerHost: 512,
MaxIdleConnsPerHost: 200,
MaxIdleConns: 200,
ExpectContinueTimeout: 5 * time.Second,
}

Expand Down
37 changes: 24 additions & 13 deletions pkg/storage/chunk/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ type Config struct {

IndexQueriesCacheConfig cache.Config `yaml:"index_queries_cache_config"`
DisableBroadIndexQueries bool `yaml:"disable_broad_index_queries"`
MaxParallelGetChunk int `yaml:"max_parallel_get_chunk"`

GrpcConfig grpc.Config `yaml:"grpc_store"`

Expand All @@ -117,6 +118,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.IndexQueriesCacheConfig.RegisterFlagsWithPrefix("store.index-cache-read.", "Cache config for index entry reading.", f)
f.DurationVar(&cfg.IndexCacheValidity, "store.index-cache-validity", 5*time.Minute, "Cache validity for active index entries. Should be no higher than -ingester.max-chunk-idle.")
f.BoolVar(&cfg.DisableBroadIndexQueries, "store.disable-broad-index-queries", false, "Disable broad index queries which results in reduced cache usage and faster query performance at the expense of somewhat higher QPS on the index store.")
f.IntVar(&cfg.MaxParallelGetChunk, "store.max-parallel-get-chunk", 150, "Maximum number of parallel chunk reads.")
}

// Validate config and returns error on failure
Expand Down Expand Up @@ -271,7 +273,11 @@ func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, regis
case StorageTypeInMemory:
return chunk.NewMockStorage(), nil
case StorageTypeAWS, StorageTypeS3:
return newChunkClientFromStore(aws.NewS3ObjectClient(cfg.AWSStorageConfig.S3Config, cfg.Hedging))
c, err := aws.NewS3ObjectClient(cfg.AWSStorageConfig.S3Config, cfg.Hedging)
if err != nil {
return nil, err
}
return objectclient.NewClientWithMaxParallel(c, nil, cfg.MaxParallelGetChunk), nil
case StorageTypeAWSDynamo:
if cfg.AWSStorageConfig.DynamoDB.URL == nil {
return nil, fmt.Errorf("Must set -dynamodb.url in aws mode")
Expand All @@ -282,37 +288,42 @@ func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, regis
}
return aws.NewDynamoDBChunkClient(cfg.AWSStorageConfig.DynamoDBConfig, schemaCfg, registerer)
case StorageTypeAzure:
return newChunkClientFromStore(azure.NewBlobStorage(&cfg.AzureStorageConfig, cfg.Hedging))
c, err := azure.NewBlobStorage(&cfg.AzureStorageConfig, cfg.Hedging)
if err != nil {
return nil, err
}
return objectclient.NewClientWithMaxParallel(c, nil, cfg.MaxParallelGetChunk), nil
case StorageTypeGCP:
return gcp.NewBigtableObjectClient(context.Background(), cfg.GCPStorageConfig, schemaCfg)
case StorageTypeGCPColumnKey, StorageTypeBigTable, StorageTypeBigTableHashed:
return gcp.NewBigtableObjectClient(context.Background(), cfg.GCPStorageConfig, schemaCfg)
case StorageTypeGCS:
return newChunkClientFromStore(gcp.NewGCSObjectClient(context.Background(), cfg.GCSConfig, cfg.Hedging))
c, err := gcp.NewGCSObjectClient(context.Background(), cfg.GCSConfig, cfg.Hedging)
if err != nil {
return nil, err
}
return objectclient.NewClientWithMaxParallel(c, nil, cfg.MaxParallelGetChunk), nil
case StorageTypeSwift:
return newChunkClientFromStore(openstack.NewSwiftObjectClient(cfg.Swift, cfg.Hedging))
c, err := openstack.NewSwiftObjectClient(cfg.Swift, cfg.Hedging)
if err != nil {
return nil, err
}
return objectclient.NewClientWithMaxParallel(c, nil, cfg.MaxParallelGetChunk), nil
case StorageTypeCassandra:
return cassandra.NewObjectClient(cfg.CassandraStorageConfig, schemaCfg, registerer)
return cassandra.NewObjectClient(cfg.CassandraStorageConfig, schemaCfg, registerer, cfg.MaxParallelGetChunk)
case StorageTypeFileSystem:
store, err := local.NewFSObjectClient(cfg.FSConfig)
if err != nil {
return nil, err
}
return objectclient.NewClient(store, objectclient.Base64Encoder), nil
return objectclient.NewClientWithMaxParallel(store, objectclient.Base64Encoder, cfg.MaxParallelGetChunk), nil
case StorageTypeGrpc:
return grpc.NewStorageClient(cfg.GrpcConfig, schemaCfg)
default:
return nil, fmt.Errorf("Unrecognized storage client %v, choose one of: %v, %v, %v, %v, %v, %v, %v, %v", name, StorageTypeAWS, StorageTypeAzure, StorageTypeCassandra, StorageTypeInMemory, StorageTypeGCP, StorageTypeBigTable, StorageTypeBigTableHashed, StorageTypeGrpc)
}
}

func newChunkClientFromStore(store chunk.ObjectClient, err error) (chunk.Client, error) {
if err != nil {
return nil, err
}
return objectclient.NewClient(store, nil), nil
}

// NewTableClient makes a new table client based on the configuration.
func NewTableClient(name string, cfg Config, registerer prometheus.Registerer) (chunk.TableClient, error) {
if indexClientFactory, ok := customIndexStores[name]; ok {
Expand Down
4 changes: 1 addition & 3 deletions pkg/storage/chunk/util/parallel_chunk_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,14 @@ import (
"github.com/grafana/loki/pkg/storage/chunk"
)

const maxParallel = 1000

var decodeContextPool = sync.Pool{
New: func() interface{} {
return chunk.NewDecodeContext()
},
}

// GetParallelChunks fetches chunks in parallel (up to maxParallel).
func GetParallelChunks(ctx context.Context, chunks []chunk.Chunk, f func(context.Context, *chunk.DecodeContext, chunk.Chunk) (chunk.Chunk, error)) ([]chunk.Chunk, error) {
func GetParallelChunks(ctx context.Context, maxParallel int, chunks []chunk.Chunk, f func(context.Context, *chunk.DecodeContext, chunk.Chunk) (chunk.Chunk, error)) ([]chunk.Chunk, error) {
log, ctx := spanlogger.New(ctx, "GetParallelChunks")
defer log.Finish()
log.LogFields(otlog.Int("requested", len(chunks)))
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/chunk/util/parallel_chunk_fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func BenchmarkGetParallelChunks(b *testing.B) {
in := make([]chunk.Chunk, 1024)
b.ResetTimer()
for i := 0; i < b.N; i++ {
res, err := GetParallelChunks(ctx, in,
res, err := GetParallelChunks(ctx, 150, in,
func(_ context.Context, d *chunk.DecodeContext, c chunk.Chunk) (chunk.Chunk, error) {
return c, nil
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ func newTestStore(t testing.TB) *testStore {
FSConfig: local.FSConfig{
Directory: chunkDir,
},
MaxParallelGetChunk: 150,
},
BoltDBShipperConfig: shipper.Config{
ActiveIndexDirectory: indexDir,
Expand Down