Limit parallelism when fetching chunks from DynamoDB (grafana#644)
If someone sends a query that requires 260,000 chunks, we don't want
to hit DynamoDB from 260 goroutines in parallel.
bboreham authored Jan 10, 2018
1 parent 3c9f07a commit b54a047
Showing 2 changed files with 14 additions and 7 deletions.
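
To make the commit message's numbers concrete: with the default gang size of 10, and assuming dynamoDBMaxReadBatchSize is 100 (DynamoDB's BatchGetItem item limit), each goroutine fetches 1,000 chunks, so 260,000 chunks spawn 260 goroutines. Below is a minimal standalone sketch of the new cap; the constant value and variable names are illustrative, not the file's actual code.

package main

import "fmt"

// Assumption for this sketch: DynamoDB's BatchGetItem accepts at most 100 items per call.
const dynamoDBMaxReadBatchSize = 100

func main() {
	numChunks := 260000
	gangSize := 10 * dynamoDBMaxReadBatchSize // default ChunkGangSize of 10 => 1,000 chunks per gang
	maxParallelism := 32                      // default ChunkGetMaxParallelism

	fmt.Println(numChunks / gangSize) // 260 gangs, i.e. 260 goroutines, before this commit

	// The new cap: enlarge each gang until the gang count fits the limit.
	if numChunks/gangSize > maxParallelism {
		gangSize = numChunks/maxParallelism + 1 // 8,126 chunks per gang
	}
	fmt.Println((numChunks + gangSize - 1) / gangSize) // 32 goroutines after this commit
}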
aws_storage_client.go (12 changes: 9 additions & 3 deletions)
@@ -109,7 +109,8 @@ type DynamoDBConfig struct {
 	DynamoDB               util.URLValue
 	APILimit               float64
 	ApplicationAutoScaling util.URLValue
-	DynamoDBChunkGangSize  int
+	ChunkGangSize          int
+	ChunkGetMaxParallelism int
 }
 
 // RegisterFlags adds the flags required to config this to the given FlagSet
@@ -118,7 +119,8 @@ func (cfg *DynamoDBConfig) RegisterFlags(f *flag.FlagSet) {
 		"If only region is specified as a host, proper endpoint will be deduced. Use inmemory:///<table-name> to use a mock in-memory implementation.")
 	f.Float64Var(&cfg.APILimit, "dynamodb.api-limit", 2.0, "DynamoDB table management requests per second limit.")
 	f.Var(&cfg.ApplicationAutoScaling, "applicationautoscaling.url", "ApplicationAutoscaling endpoint URL with escaped Key and Secret encoded.")
-	f.IntVar(&cfg.DynamoDBChunkGangSize, "dynamodb.chunk.gang.size", 10, "Number of chunks to group together to parallelise fetches (zero to disable)")
+	f.IntVar(&cfg.ChunkGangSize, "dynamodb.chunk.gang.size", 10, "Number of chunks to group together to parallelise fetches (zero to disable)")
+	f.IntVar(&cfg.ChunkGetMaxParallelism, "dynamodb.chunk.get.max.parallelism", 32, "Max number of chunk-get operations to start in parallel")
 }
 
 // AWSStorageConfig specifies config for storing data on AWS.
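
For illustration, a hedged sketch of how a caller might exercise the two flags registered above (the FlagSet name and the argument values are assumptions for the example):

	fs := flag.NewFlagSet("example", flag.ExitOnError) // assumed name, not the real binary's
	var cfg DynamoDBConfig
	cfg.RegisterFlags(fs)
	// Override the defaults of 10 and 32 on the command line:
	fs.Parse([]string{
		"-dynamodb.chunk.gang.size=20",
		"-dynamodb.chunk.get.max.parallelism=64",
	})
	// Now cfg.ChunkGangSize == 20 and cfg.ChunkGetMaxParallelism == 64.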
@@ -450,9 +452,13 @@ func (a awsStorageClient) GetChunks(ctx context.Context, chunks []Chunk) ([]Chunk, error) {
 		return s3Chunks, err
 	}
 
-	gangSize := a.cfg.DynamoDBChunkGangSize * dynamoDBMaxReadBatchSize
+	gangSize := a.cfg.ChunkGangSize * dynamoDBMaxReadBatchSize
 	if gangSize == 0 { // zero means turn feature off
 		gangSize = len(dynamoDBChunks)
+	} else {
+		if len(dynamoDBChunks)/gangSize > a.cfg.ChunkGetMaxParallelism {
+			gangSize = len(dynamoDBChunks)/a.cfg.ChunkGetMaxParallelism + 1
+		}
+	}
 
 	results := make(chan chunksPlusError)
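
Design note: the cap is enforced not by queueing gangs behind a fixed worker pool but by growing gangSize, so each goroutine simply fetches more chunks. A hedged sketch of the dispatch loop that would follow the lines above; the helper name fetchGang and the chunksPlusError field order are assumptions, since that part of the file is not shown in this diff:

	for i := 0; i < len(dynamoDBChunks); i += gangSize {
		end := i + gangSize
		if end > len(dynamoDBChunks) {
			end = len(dynamoDBChunks)
		}
		go func(gang []Chunk) {
			// Hypothetical helper: fetch one gang in BatchGetItem-sized pieces.
			found, err := fetchGang(ctx, gang)
			results <- chunksPlusError{found, err}
		}(dynamoDBChunks[i:end])
	}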
aws_storage_client_test.go (9 changes: 5 additions & 4 deletions)
@@ -480,10 +480,11 @@ func TestAWSStorageClientChunks(t *testing.T) {
 		name           string
 		provisionedErr int
 		gangSize       int
+		maxParallelism int
 	}{
-		{"DynamoDB chunks", 0, 10},
-		{"DynamoDB chunks with parallel fetch disabled", 0, 0},
-		{"DynamoDB chunks retry logic", 2, 10},
+		{"DynamoDB chunks", 0, 10, 20},
+		{"DynamoDB chunks with parallel fetch disabled", 0, 0, 20},
+		{"DynamoDB chunks retry logic", 2, 10, 20},
 	}
 
 	for _, tt := range tests {
@@ -508,7 +509,7 @@ func TestAWSStorageClientChunks(t *testing.T) {
 
 			client := awsStorageClient{
 				cfg: AWSStorageConfig{
-					DynamoDBConfig: DynamoDBConfig{DynamoDBChunkGangSize: tt.gangSize},
+					DynamoDBConfig: DynamoDBConfig{ChunkGangSize: tt.gangSize, ChunkGetMaxParallelism: tt.maxParallelism},
 				},
 				DynamoDB:  dynamoDB,
 				schemaCfg: schemaConfig,
