Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable write autoscaling for active DynamoDB tables #507

Merged
merged 15 commits into from
Aug 14, 2017
6 changes: 3 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
[[constraint]]
name = "github.com/aws/aws-sdk-go"
version = "v1.10.8"

[[constraint]]
name = "github.com/prometheus/alertmanager"
Expand Down
4 changes: 4 additions & 0 deletions cmd/table-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ func main() {
util.RegisterFlags(&serverConfig, &storageConfig, &schemaConfig)
flag.Parse()

if schemaConfig.ChunkTables.WriteScaleEnabled && storageConfig.ApplicationAutoScaling.URL != nil {
log.Fatal("WriteScale is enabled but no ApplicationAutoScaling URL has been provided")
}

tableClient, err := storage.NewTableClient(storageConfig)
if err != nil {
log.Fatalf("Error initializing DynamoDB table client: %v", err)
Expand Down
2 changes: 2 additions & 0 deletions pkg/chunk/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ go_library(
"//vendor/github.com/aws/aws-sdk-go/aws/credentials:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/aws/request:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/aws/session:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/service/applicationautoscaling:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/service/applicationautoscaling/applicationautoscalingiface:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/service/dynamodb:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/service/s3:go_default_library",
Expand Down
18 changes: 15 additions & 3 deletions pkg/chunk/aws_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/client"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
Expand Down Expand Up @@ -103,15 +104,17 @@ func init() {

// DynamoDBConfig specifies config for a DynamoDB database.
type DynamoDBConfig struct {
DynamoDB util.URLValue
APILimit float64
DynamoDB util.URLValue
APILimit float64
ApplicationAutoScaling util.URLValue
}

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *DynamoDBConfig) RegisterFlags(f *flag.FlagSet) {
f.Var(&cfg.DynamoDB, "dynamodb.url", "DynamoDB endpoint URL with escaped Key and Secret encoded. "+
"If only region is specified as a host, proper endpoint will be deduced. Use inmemory:///<table-name> to use a mock in-memory implementation.")
f.Float64Var(&cfg.APILimit, "dynamodb.api-limit", 2.0, "DynamoDB table management requests per second limit.")
f.Var(&cfg.ApplicationAutoScaling, "applicationautoscaling.url", "ApplicationAutoscaling endpoint URL with escaped Key and Secret encoded.")
}

// AWSStorageConfig specifies config for storing data on AWS.
Expand Down Expand Up @@ -806,6 +809,15 @@ func recordDynamoError(tableName string, err error, operation string) {

// dynamoClientFromURL creates a new DynamoDB client from a URL.
func dynamoClientFromURL(awsURL *url.URL) (dynamodbiface.DynamoDBAPI, error) {
dynamoDBSession, err := awsSessionFromURL(awsURL)
if err != nil {
return nil, err
}
return dynamodb.New(dynamoDBSession), nil
}

// awsSessionFromURL creates a new aws session from a URL.
func awsSessionFromURL(awsURL *url.URL) (client.ConfigProvider, error) {
if awsURL == nil {
return nil, fmt.Errorf("no URL specified for DynamoDB")
}
Expand All @@ -817,7 +829,7 @@ func dynamoClientFromURL(awsURL *url.URL) (dynamodbiface.DynamoDBAPI, error) {
if err != nil {
return nil, err
}
return dynamodb.New(session.New(config)), nil
return session.New(config), nil
}

// awsConfigFromURL returns AWS config from given URL. It expects escaped AWS Access key ID & Secret Access Key to be
Expand Down
196 changes: 192 additions & 4 deletions pkg/chunk/dynamodb_table_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,36 @@ import (

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/applicationautoscaling"
"github.com/aws/aws-sdk-go/service/applicationautoscaling/applicationautoscalingiface"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/context"
"golang.org/x/time/rate"

"github.com/weaveworks/common/instrument"
"github.com/weaveworks/cortex/pkg/util"
)

const (
autoScalingPolicyNamePrefix = "DynamoScalingPolicy_cortex_"
)

var applicationAutoScalingRequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "application_autoscaling_request_duration_seconds",
Help: "Time spent doing ApplicationAutoScaling requests.",

// ApplicationAutoScaling latency seems to range from a few ms to a few sec and is
// important. So use 8 buckets from 128us to 2s.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually true, or copy-pasted?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is copied and pasted from another AWS request metric. We will not know for sure what the latency will be until in dev/prod. I believe the requests are non-blocking as they are just setting configs which a watcher is then using later to scale the tables. I would expect the latency to be a standard AWS latency.

I will add a TODO here to check this later.

Buckets: prometheus.ExponentialBuckets(0.000128, 4, 8),
}, []string{"operation", "status_code"})

type dynamoTableClient struct {
DynamoDB dynamodbiface.DynamoDBAPI
limiter *rate.Limiter
DynamoDB dynamodbiface.DynamoDBAPI
ApplicationAutoScaling applicationautoscalingiface.ApplicationAutoScalingAPI
limiter *rate.Limiter
}

// NewDynamoDBTableClient makes a new DynamoTableClient.
Expand All @@ -26,9 +44,20 @@ func NewDynamoDBTableClient(cfg DynamoDBConfig) (TableClient, error) {
if err != nil {
return nil, err
}

var applicationAutoScaling applicationautoscalingiface.ApplicationAutoScalingAPI
if cfg.ApplicationAutoScaling.URL != nil {
session, err := awsSessionFromURL(cfg.ApplicationAutoScaling.URL)
if err != nil {
return nil, err
}
applicationAutoScaling = applicationautoscaling.New(session)
}

return dynamoTableClient{
DynamoDB: dynamoDB,
limiter: rate.NewLimiter(rate.Limit(cfg.APILimit), 1),
DynamoDB: dynamoDB,
ApplicationAutoScaling: applicationAutoScaling,
limiter: rate.NewLimiter(rate.Limit(cfg.APILimit), 1),
}, nil
}

Expand Down Expand Up @@ -111,6 +140,13 @@ func (d dynamoTableClient) CreateTable(ctx context.Context, desc TableDesc) erro
return err
}

if desc.WriteScaleEnabled {
err := d.enableAutoScaling(ctx, desc)
if err != nil {
return err
}
}

tags := desc.Tags.AWSTags()
if len(tags) > 0 {
return d.backoffAndRetry(ctx, func(ctx context.Context) error {
Expand Down Expand Up @@ -157,10 +193,70 @@ func (d dynamoTableClient) DescribeTable(ctx context.Context, name string) (desc
return err
})
})

if d.ApplicationAutoScaling != nil {
err = d.backoffAndRetry(ctx, func(ctx context.Context) error {
return instrument.TimeRequestHistogram(ctx, "ApplicationAutoScaling.DescribeScalableTargetsWithContext", applicationAutoScalingRequestDuration, func(ctx context.Context) error {
out, err := d.ApplicationAutoScaling.DescribeScalableTargetsWithContext(ctx, &applicationautoscaling.DescribeScalableTargetsInput{
ResourceIds: []*string{aws.String("table/" + desc.Name)},
ScalableDimension: aws.String("dynamodb:table:WriteCapacityUnits"),
ServiceNamespace: aws.String("dynamodb"),
})
switch l := len(out.ScalableTargets); l {
case 0:
return err
case 1:
desc.WriteScaleEnabled = true
desc.WriteScaleRoleARN = *out.ScalableTargets[0].RoleARN
desc.WriteScaleMinCapacity = *out.ScalableTargets[0].MinCapacity
desc.WriteScaleMaxCapacity = *out.ScalableTargets[0].MaxCapacity
return err
default:
return fmt.Errorf("more than one scalable target found for DynamoDB table")
}
})
})

err = d.backoffAndRetry(ctx, func(ctx context.Context) error {
return instrument.TimeRequestHistogram(ctx, "ApplicationAutoScaling.DescribeScalingPoliciesWithContext", applicationAutoScalingRequestDuration, func(ctx context.Context) error {
out, err := d.ApplicationAutoScaling.DescribeScalingPoliciesWithContext(ctx, &applicationautoscaling.DescribeScalingPoliciesInput{
PolicyNames: []*string{aws.String(autoScalingPolicyNamePrefix + desc.Name)},
ResourceId: aws.String("table/" + desc.Name),
ScalableDimension: aws.String("dynamodb:table:WriteCapacityUnits"),
ServiceNamespace: aws.String("dynamodb"),
})
switch l := len(out.ScalingPolicies); l {
case 0:
return err
case 1:
desc.WriteScaleInCooldown = *out.ScalingPolicies[0].TargetTrackingScalingPolicyConfiguration.ScaleInCooldown
desc.WriteScaleOutCooldown = *out.ScalingPolicies[0].TargetTrackingScalingPolicyConfiguration.ScaleOutCooldown
desc.WriteScaleTargetValue = *out.ScalingPolicies[0].TargetTrackingScalingPolicyConfiguration.TargetValue
return err
default:
return fmt.Errorf("more than one scaling policy found for DynamoDB table")
}
})
})
}
return
}

func (d dynamoTableClient) UpdateTable(ctx context.Context, current, expected TableDesc) error {
disableAutoScaling := current.WriteScaleEnabled && !expected.WriteScaleEnabled
enableAutoScaling := !current.WriteScaleEnabled && expected.WriteScaleEnabled
updateAutoScaling := current.WriteScaleEnabled && expected.WriteScaleEnabled && !current.AutoScalingEquals(expected)
if disableAutoScaling {
err := d.disableAutoScaling(ctx, expected)
if err != nil {
return err
}
} else if enableAutoScaling || updateAutoScaling {
err := d.enableAutoScaling(ctx, expected)
if err != nil {
return err
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure, but I think I'd write it like this:

var err error
if !current.WriteScaleEnabled {
	if expected.WriteScaleEnabled {
		err = d.enableAutoScaling(ctx, expected)
	}
} else {
	if !expected.WriteScaleEnabled {
		err = d.disableAutoScaling(ctx, expected)
	} else if !current.AutoScalingEquals(expected) {
		err = d.enableAutoScaling(ctx, expected)
	}
}
if err != nil {
	return err
}

But I can't articulate why, and am not sure it's better. WDYT?

Copy link
Contributor Author

@aaron7 aaron7 Aug 11, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. I originally extracted out the expressions into variables to make it clear what each path was for, as it may take some time to get your head around why the combination of flags mean that we should either enable or disable. However, it's not that complex and it's clear which path is update and which one is disable, so I will update.


if current.ProvisionedRead != expected.ProvisionedRead || current.ProvisionedWrite != expected.ProvisionedWrite {
if err := d.backoffAndRetry(ctx, func(ctx context.Context) error {
Expand Down Expand Up @@ -208,3 +304,95 @@ func (d dynamoTableClient) UpdateTable(ctx context.Context, current, expected Ta
}
return nil
}

func (d dynamoTableClient) enableAutoScaling(ctx context.Context, desc TableDesc) error {
// Registers or updates a scallable target
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scalable

if err := d.backoffAndRetry(ctx, func(ctx context.Context) error {
return instrument.TimeRequestHistogram(ctx, "ApplicationAutoScaling.RegisterScalableTarget", applicationAutoScalingRequestDuration, func(ctx context.Context) error {
input := &applicationautoscaling.RegisterScalableTargetInput{
MinCapacity: aws.Int64(desc.WriteScaleMinCapacity),
MaxCapacity: aws.Int64(desc.WriteScaleMaxCapacity),
ResourceId: aws.String("table/" + desc.Name),
RoleARN: aws.String(desc.WriteScaleRoleARN),
ScalableDimension: aws.String("dynamodb:table:WriteCapacityUnits"),
ServiceNamespace: aws.String("dynamodb"),
}
_, err := d.ApplicationAutoScaling.RegisterScalableTarget(input)
if err != nil {
return err
}
return nil
})
}); err != nil {
return err
}

// Puts or updates a scaling policy
if err := d.backoffAndRetry(ctx, func(ctx context.Context) error {
return instrument.TimeRequestHistogram(ctx, "ApplicationAutoScaling.PutScalingPolicy", applicationAutoScalingRequestDuration, func(ctx context.Context) error {
input := &applicationautoscaling.PutScalingPolicyInput{
PolicyName: aws.String(autoScalingPolicyNamePrefix + desc.Name),
PolicyType: aws.String("TargetTrackingScaling"),
ResourceId: aws.String("table/" + desc.Name),
ScalableDimension: aws.String("dynamodb:table:WriteCapacityUnits"),
ServiceNamespace: aws.String("dynamodb"),
TargetTrackingScalingPolicyConfiguration: &applicationautoscaling.TargetTrackingScalingPolicyConfiguration{
PredefinedMetricSpecification: &applicationautoscaling.PredefinedMetricSpecification{
PredefinedMetricType: aws.String("DynamoDBWriteCapacityUtilization"),
},
ScaleInCooldown: aws.Int64(desc.WriteScaleInCooldown),
ScaleOutCooldown: aws.Int64(desc.WriteScaleOutCooldown),
TargetValue: aws.Float64(desc.WriteScaleTargetValue),
},
}
_, err := d.ApplicationAutoScaling.PutScalingPolicy(input)
if err != nil {
return err
}
return nil
})
}); err != nil {
return err
}
return nil
}

func (d dynamoTableClient) disableAutoScaling(ctx context.Context, desc TableDesc) error {
// Deregister scallable target
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scalable

if err := d.backoffAndRetry(ctx, func(ctx context.Context) error {
return instrument.TimeRequestHistogram(ctx, "ApplicationAutoScaling.DeregisterScalableTarget", applicationAutoScalingRequestDuration, func(ctx context.Context) error {
input := &applicationautoscaling.DeregisterScalableTargetInput{
ResourceId: aws.String("table/" + desc.Name),
ScalableDimension: aws.String("dynamodb:table:WriteCapacityUnits"),
ServiceNamespace: aws.String("dynamodb"),
}
_, err := d.ApplicationAutoScaling.DeregisterScalableTarget(input)
if err != nil {
return err
}
return nil
})
}); err != nil {
return err
}

// Delete scaling policy
if err := d.backoffAndRetry(ctx, func(ctx context.Context) error {
return instrument.TimeRequestHistogram(ctx, "ApplicationAutoScaling.DeleteScalingPolicy", applicationAutoScalingRequestDuration, func(ctx context.Context) error {
input := &applicationautoscaling.DeleteScalingPolicyInput{
PolicyName: aws.String(autoScalingPolicyNamePrefix + desc.Name),
ResourceId: aws.String("table/" + desc.Name),
ScalableDimension: aws.String("dynamodb:table:WriteCapacityUnits"),
ServiceNamespace: aws.String("dynamodb"),
}
_, err := d.ApplicationAutoScaling.DeleteScalingPolicy(input)
if err != nil {
return err
}
return nil
})
}); err != nil {
return err
}
return nil
}
Loading