Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable write autoscaling for active DynamoDB tables #507

Merged
merged 15 commits into from
Aug 14, 2017
6 changes: 3 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
[[constraint]]
name = "github.com/aws/aws-sdk-go"
version = "v1.10.8"

[[constraint]]
name = "github.com/prometheus/alertmanager"
Expand Down
4 changes: 4 additions & 0 deletions cmd/table-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ func main() {
util.RegisterFlags(&serverConfig, &storageConfig, &schemaConfig)
flag.Parse()

// Write autoscaling is driven through the ApplicationAutoScaling API, so
// refuse to start when it is enabled but no endpoint URL was supplied.
if schemaConfig.ChunkTables.WriteScale.Enabled && storageConfig.ApplicationAutoScaling.URL == nil {
	log.Fatal("WriteScale is enabled but no ApplicationAutoScaling URL has been provided")
}

tableClient, err := storage.NewTableClient(storageConfig)
if err != nil {
log.Fatalf("Error initializing DynamoDB table client: %v", err)
Expand Down
2 changes: 2 additions & 0 deletions pkg/chunk/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ go_library(
"//vendor/github.com/aws/aws-sdk-go/aws/credentials:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/aws/request:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/aws/session:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/service/applicationautoscaling:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/service/applicationautoscaling/applicationautoscalingiface:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/service/dynamodb:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/service/s3:go_default_library",
Expand Down
18 changes: 15 additions & 3 deletions pkg/chunk/aws_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/client"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
Expand Down Expand Up @@ -103,15 +104,17 @@ func init() {

// DynamoDBConfig specifies config for a DynamoDB database.
type DynamoDBConfig struct {
DynamoDB util.URLValue
APILimit float64
DynamoDB util.URLValue
APILimit float64
ApplicationAutoScaling util.URLValue
}

// RegisterFlags registers the DynamoDB and ApplicationAutoScaling flags on the
// given FlagSet; parsed values are stored into cfg.
func (cfg *DynamoDBConfig) RegisterFlags(f *flag.FlagSet) {
	const (
		dynamoDBURLHelp = "DynamoDB endpoint URL with escaped Key and Secret encoded. " +
			"If only region is specified as a host, proper endpoint will be deduced. Use inmemory:///<table-name> to use a mock in-memory implementation."
		apiLimitHelp       = "DynamoDB table management requests per second limit."
		autoScalingURLHelp = "ApplicationAutoscaling endpoint URL with escaped Key and Secret encoded."
	)

	f.Var(&cfg.DynamoDB, "dynamodb.url", dynamoDBURLHelp)
	f.Float64Var(&cfg.APILimit, "dynamodb.api-limit", 2.0, apiLimitHelp)
	f.Var(&cfg.ApplicationAutoScaling, "applicationautoscaling.url", autoScalingURLHelp)
}

// AWSStorageConfig specifies config for storing data on AWS.
Expand Down Expand Up @@ -806,6 +809,15 @@ func recordDynamoError(tableName string, err error, operation string) {

// dynamoClientFromURL builds an AWS session from awsURL and returns a DynamoDB
// client bound to it. Any error from creating the session is returned as-is.
func dynamoClientFromURL(awsURL *url.URL) (dynamodbiface.DynamoDBAPI, error) {
	sess, sessErr := awsSessionFromURL(awsURL)
	if sessErr != nil {
		return nil, sessErr
	}

	db := dynamodb.New(sess)
	return db, nil
}

// awsSessionFromURL creates a new aws session from a URL.
func awsSessionFromURL(awsURL *url.URL) (client.ConfigProvider, error) {
if awsURL == nil {
return nil, fmt.Errorf("no URL specified for DynamoDB")
}
Expand All @@ -817,7 +829,7 @@ func dynamoClientFromURL(awsURL *url.URL) (dynamodbiface.DynamoDBAPI, error) {
if err != nil {
return nil, err
}
return dynamodb.New(session.New(config)), nil
return session.New(config), nil
}

// awsConfigFromURL returns AWS config from given URL. It expects escaped AWS Access key ID & Secret Access Key to be
Expand Down
197 changes: 193 additions & 4 deletions pkg/chunk/dynamodb_table_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,36 @@ import (

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/applicationautoscaling"
"github.com/aws/aws-sdk-go/service/applicationautoscaling/applicationautoscalingiface"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/context"
"golang.org/x/time/rate"

"github.com/weaveworks/common/instrument"
"github.com/weaveworks/cortex/pkg/util"
)

const (
// autoScalingPolicyNamePrefix is prepended to a table's name to form the
// name of that table's ApplicationAutoScaling scaling policy.
autoScalingPolicyNamePrefix = "DynamoScalingPolicy_cortex_"
)

// applicationAutoScalingRequestDuration is a histogram of the time spent doing
// ApplicationAutoScaling requests, labelled by operation and status code.
var applicationAutoScalingRequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "application_autoscaling_request_duration_seconds",
Help: "Time spent doing ApplicationAutoScaling requests.",

// AWS latency seems to range from a few ms to a few sec. So use 8 buckets
// from 128us to 2s. TODO: Confirm that this is the case for ApplicationAutoScaling.
Buckets: prometheus.ExponentialBuckets(0.000128, 4, 8),
}, []string{"operation", "status_code"})

type dynamoTableClient struct {
DynamoDB dynamodbiface.DynamoDBAPI
limiter *rate.Limiter
DynamoDB dynamodbiface.DynamoDBAPI
ApplicationAutoScaling applicationautoscalingiface.ApplicationAutoScalingAPI
limiter *rate.Limiter
}

// NewDynamoDBTableClient makes a new DynamoTableClient.
Expand All @@ -26,9 +44,20 @@ func NewDynamoDBTableClient(cfg DynamoDBConfig) (TableClient, error) {
if err != nil {
return nil, err
}

var applicationAutoScaling applicationautoscalingiface.ApplicationAutoScalingAPI
if cfg.ApplicationAutoScaling.URL != nil {
session, err := awsSessionFromURL(cfg.ApplicationAutoScaling.URL)
if err != nil {
return nil, err
}
applicationAutoScaling = applicationautoscaling.New(session)
}

return dynamoTableClient{
DynamoDB: dynamoDB,
limiter: rate.NewLimiter(rate.Limit(cfg.APILimit), 1),
DynamoDB: dynamoDB,
ApplicationAutoScaling: applicationAutoScaling,
limiter: rate.NewLimiter(rate.Limit(cfg.APILimit), 1),
}, nil
}

Expand Down Expand Up @@ -111,6 +140,13 @@ func (d dynamoTableClient) CreateTable(ctx context.Context, desc TableDesc) erro
return err
}

if desc.WriteScale.Enabled {
err := d.enableAutoScaling(ctx, desc)
if err != nil {
return err
}
}

tags := desc.Tags.AWSTags()
if len(tags) > 0 {
return d.backoffAndRetry(ctx, func(ctx context.Context) error {
Expand Down Expand Up @@ -157,10 +193,71 @@ func (d dynamoTableClient) DescribeTable(ctx context.Context, name string) (desc
return err
})
})

// Autoscaling state is only looked up when an ApplicationAutoScaling client
// has been configured; otherwise desc.WriteScale is left untouched.
if d.ApplicationAutoScaling != nil {
	// Look up the scalable target registered for this table's write capacity.
	err = d.backoffAndRetry(ctx, func(ctx context.Context) error {
		return instrument.TimeRequestHistogram(ctx, "ApplicationAutoScaling.DescribeScalableTargetsWithContext", applicationAutoScalingRequestDuration, func(ctx context.Context) error {
			out, err := d.ApplicationAutoScaling.DescribeScalableTargetsWithContext(ctx, &applicationautoscaling.DescribeScalableTargetsInput{
				ResourceIds:       []*string{aws.String("table/" + desc.Name)},
				ScalableDimension: aws.String("dynamodb:table:WriteCapacityUnits"),
				ServiceNamespace:  aws.String("dynamodb"),
			})
			// Check the error before touching the response so a failed call
			// never populates desc from partial data.
			if err != nil {
				return err
			}
			switch len(out.ScalableTargets) {
			case 0:
				return nil
			case 1:
				target := out.ScalableTargets[0]
				desc.WriteScale.Enabled = true
				// The *Value helpers return the zero value for nil pointers
				// instead of panicking.
				desc.WriteScale.RoleARN = aws.StringValue(target.RoleARN)
				desc.WriteScale.MinCapacity = aws.Int64Value(target.MinCapacity)
				desc.WriteScale.MaxCapacity = aws.Int64Value(target.MaxCapacity)
				return nil
			default:
				return fmt.Errorf("more than one scalable target found for DynamoDB table")
			}
		})
	})
	// Do not let the policy lookup below overwrite a failure from the
	// scalable-target lookup.
	if err != nil {
		return
	}

	// Look up the scaling policy attached to that scalable target.
	err = d.backoffAndRetry(ctx, func(ctx context.Context) error {
		return instrument.TimeRequestHistogram(ctx, "ApplicationAutoScaling.DescribeScalingPoliciesWithContext", applicationAutoScalingRequestDuration, func(ctx context.Context) error {
			out, err := d.ApplicationAutoScaling.DescribeScalingPoliciesWithContext(ctx, &applicationautoscaling.DescribeScalingPoliciesInput{
				PolicyNames:       []*string{aws.String(autoScalingPolicyNamePrefix + desc.Name)},
				ResourceId:        aws.String("table/" + desc.Name),
				ScalableDimension: aws.String("dynamodb:table:WriteCapacityUnits"),
				ServiceNamespace:  aws.String("dynamodb"),
			})
			if err != nil {
				return err
			}
			switch len(out.ScalingPolicies) {
			case 0:
				return nil
			case 1:
				tracking := out.ScalingPolicies[0].TargetTrackingScalingPolicyConfiguration
				if tracking == nil {
					// Not a target-tracking policy: leave the cooldowns and
					// target value at their zero values.
					return nil
				}
				desc.WriteScale.InCooldown = aws.Int64Value(tracking.ScaleInCooldown)
				desc.WriteScale.OutCooldown = aws.Int64Value(tracking.ScaleOutCooldown)
				desc.WriteScale.TargetValue = aws.Float64Value(tracking.TargetValue)
				return nil
			default:
				return fmt.Errorf("more than one scaling policy found for DynamoDB table")
			}
		})
	})
}
return
}

func (d dynamoTableClient) UpdateTable(ctx context.Context, current, expected TableDesc) error {
var err error
if !current.WriteScale.Enabled {
if expected.WriteScale.Enabled {
err = d.enableAutoScaling(ctx, expected)
}
} else {
if !expected.WriteScale.Enabled {
err = d.disableAutoScaling(ctx, expected)
} else if current.WriteScale != expected.WriteScale {
err = d.enableAutoScaling(ctx, expected)
}
}
if err != nil {
return err
}

if current.ProvisionedRead != expected.ProvisionedRead || current.ProvisionedWrite != expected.ProvisionedWrite {
if err := d.backoffAndRetry(ctx, func(ctx context.Context) error {
Expand Down Expand Up @@ -208,3 +305,95 @@ func (d dynamoTableClient) UpdateTable(ctx context.Context, current, expected Ta
}
return nil
}

// enableAutoScaling registers (or updates) the write-capacity scalable target
// for the table described by desc and then puts (or updates) its
// target-tracking scaling policy, using the settings in desc.WriteScale.
//
// It returns an error if no ApplicationAutoScaling client is configured, or
// the first error returned by either request after backoffAndRetry gives up.
func (d dynamoTableClient) enableAutoScaling(ctx context.Context, desc TableDesc) error {
	// Calling a method on a nil interface would panic; fail with an error instead.
	if d.ApplicationAutoScaling == nil {
		return fmt.Errorf("cannot enable autoscaling for table %s: no ApplicationAutoScaling client configured", desc.Name)
	}

	// Registers or updates a scalable target
	if err := d.backoffAndRetry(ctx, func(ctx context.Context) error {
		return instrument.TimeRequestHistogram(ctx, "ApplicationAutoScaling.RegisterScalableTarget", applicationAutoScalingRequestDuration, func(ctx context.Context) error {
			input := &applicationautoscaling.RegisterScalableTargetInput{
				MinCapacity:       aws.Int64(desc.WriteScale.MinCapacity),
				MaxCapacity:       aws.Int64(desc.WriteScale.MaxCapacity),
				ResourceId:        aws.String("table/" + desc.Name),
				RoleARN:           aws.String(desc.WriteScale.RoleARN),
				ScalableDimension: aws.String("dynamodb:table:WriteCapacityUnits"),
				ServiceNamespace:  aws.String("dynamodb"),
			}
			// Use the context-aware call so cancellation and deadlines on ctx
			// reach the in-flight request.
			_, err := d.ApplicationAutoScaling.RegisterScalableTargetWithContext(ctx, input)
			return err
		})
	}); err != nil {
		return err
	}

	// Puts or updates a scaling policy
	return d.backoffAndRetry(ctx, func(ctx context.Context) error {
		return instrument.TimeRequestHistogram(ctx, "ApplicationAutoScaling.PutScalingPolicy", applicationAutoScalingRequestDuration, func(ctx context.Context) error {
			input := &applicationautoscaling.PutScalingPolicyInput{
				PolicyName:        aws.String(autoScalingPolicyNamePrefix + desc.Name),
				PolicyType:        aws.String("TargetTrackingScaling"),
				ResourceId:        aws.String("table/" + desc.Name),
				ScalableDimension: aws.String("dynamodb:table:WriteCapacityUnits"),
				ServiceNamespace:  aws.String("dynamodb"),
				TargetTrackingScalingPolicyConfiguration: &applicationautoscaling.TargetTrackingScalingPolicyConfiguration{
					PredefinedMetricSpecification: &applicationautoscaling.PredefinedMetricSpecification{
						PredefinedMetricType: aws.String("DynamoDBWriteCapacityUtilization"),
					},
					ScaleInCooldown:  aws.Int64(desc.WriteScale.InCooldown),
					ScaleOutCooldown: aws.Int64(desc.WriteScale.OutCooldown),
					TargetValue:      aws.Float64(desc.WriteScale.TargetValue),
				},
			}
			_, err := d.ApplicationAutoScaling.PutScalingPolicyWithContext(ctx, input)
			return err
		})
	})
}

// disableAutoScaling removes write-capacity autoscaling from the table
// described by desc: it deletes the table's scaling policy and then
// deregisters the scalable target.
//
// It returns an error if no ApplicationAutoScaling client is configured, or
// the first error returned by either request after backoffAndRetry gives up.
func (d dynamoTableClient) disableAutoScaling(ctx context.Context, desc TableDesc) error {
	// Calling a method on a nil interface would panic; fail with an error instead.
	if d.ApplicationAutoScaling == nil {
		return fmt.Errorf("cannot disable autoscaling for table %s: no ApplicationAutoScaling client configured", desc.Name)
	}

	// Delete scaling policy. This has to happen before the scalable target is
	// deregistered: per the AWS API reference, deregistering a scalable target
	// also deletes its scaling policies, so deleting the policy afterwards
	// would fail because it no longer exists.
	if err := d.backoffAndRetry(ctx, func(ctx context.Context) error {
		return instrument.TimeRequestHistogram(ctx, "ApplicationAutoScaling.DeleteScalingPolicy", applicationAutoScalingRequestDuration, func(ctx context.Context) error {
			input := &applicationautoscaling.DeleteScalingPolicyInput{
				PolicyName:        aws.String(autoScalingPolicyNamePrefix + desc.Name),
				ResourceId:        aws.String("table/" + desc.Name),
				ScalableDimension: aws.String("dynamodb:table:WriteCapacityUnits"),
				ServiceNamespace:  aws.String("dynamodb"),
			}
			// Use the context-aware call so cancellation and deadlines on ctx
			// reach the in-flight request.
			_, err := d.ApplicationAutoScaling.DeleteScalingPolicyWithContext(ctx, input)
			return err
		})
	}); err != nil {
		return err
	}

	// Deregister scalable target
	return d.backoffAndRetry(ctx, func(ctx context.Context) error {
		return instrument.TimeRequestHistogram(ctx, "ApplicationAutoScaling.DeregisterScalableTarget", applicationAutoScalingRequestDuration, func(ctx context.Context) error {
			input := &applicationautoscaling.DeregisterScalableTargetInput{
				ResourceId:        aws.String("table/" + desc.Name),
				ScalableDimension: aws.String("dynamodb:table:WriteCapacityUnits"),
				ServiceNamespace:  aws.String("dynamodb"),
			}
			_, err := d.ApplicationAutoScaling.DeregisterScalableTargetWithContext(ctx, input)
			return err
		})
	})
}
Loading