Add ECS Service scale up/down controls #2197

Merged · 3 commits · Feb 21, 2017
27 changes: 27 additions & 0 deletions probe/awsecs/client.go
@@ -1,6 +1,7 @@
package awsecs

import (
"fmt"
"strings"
"sync"
"time"
@@ -20,6 +21,8 @@ const servicePrefix = "ecs-svc" // Task StartedBy field begins with this if it w
type EcsClient interface {
// Returns an EcsInfo struct containing data needed for a report.
GetInfo([]string) EcsInfo
// Scales a service up or down by amount
ScaleService(string, int) error
}

// actual implementation
@@ -379,3 +382,27 @@ func (c ecsClientImpl) GetInfo(taskARNs []string) EcsInfo {

return info
}

// Implements EcsClient.ScaleService
func (c ecsClientImpl) ScaleService(serviceName string, amount int) error {
// Note this is inherently racy, due to needing to get, modify, then update the DesiredCount.

// refresh service in cache
c.describeServices([]string{serviceName})
// now check the cache to see if it worked
service, ok := c.getCachedService(serviceName)
if !ok {
return fmt.Errorf("Service %s not found", serviceName)
}

newCount := service.DesiredCount + int64(amount)
if newCount < 1 {
return fmt.Errorf("Cannot reduce count below one")
}
_, err := c.client.UpdateService(&ecs.UpdateServiceInput{
Cluster: &c.cluster,
Service: &serviceName,
DesiredCount: &newCount,
})
return err
}
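
The "inherently racy" note above is the classic read-modify-write gap: nothing ties the DescribeServices read to the UpdateService write, so two concurrent scalers can overwrite each other's DesiredCount. Below is a minimal standalone sketch of the same pattern against aws-sdk-go; the scaleByDelta helper and the cluster/service names are illustrative, not from this PR:

package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/ecs"
)

// scaleByDelta has the same get-modify-update shape as ScaleService above.
// Between the read and the write, another caller can change DesiredCount,
// and the unconditional write below would silently clobber that change.
func scaleByDelta(svc *ecs.ECS, cluster, service string, delta int64) error {
	out, err := svc.DescribeServices(&ecs.DescribeServicesInput{
		Cluster:  aws.String(cluster),
		Services: []*string{aws.String(service)},
	})
	if err != nil {
		return err
	}
	if len(out.Services) == 0 {
		return fmt.Errorf("service %s not found", service)
	}
	newCount := *out.Services[0].DesiredCount + delta // read, modify...
	_, err = svc.UpdateService(&ecs.UpdateServiceInput{ // ...then write, with no compare-and-swap
		Cluster:      aws.String(cluster),
		Service:      aws.String(service),
		DesiredCount: aws.Int64(newCount),
	})
	return err
}

func main() {
	// Assumes AWS credentials and region come from the environment.
	svc := ecs.New(session.Must(session.NewSession()))
	if err := scaleByDelta(svc, "my-cluster", "my-service", 1); err != nil {
		fmt.Println(err)
	}
}

ecs.UpdateServiceInput offers no precondition field, so the window cannot be closed at the API level; for a human-driven scale button, accepting it, as the PR does, seems a reasonable trade-off.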
99 changes: 85 additions & 14 deletions probe/awsecs/reporter.go
@@ -5,6 +5,8 @@ import (
"time"

log "github.com/Sirupsen/logrus"
"github.com/weaveworks/scope/common/xfer"
"github.com/weaveworks/scope/probe/controls"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/report"
)
@@ -16,6 +18,8 @@ const (
TaskFamily = "ecs_task_family"
ServiceDesiredCount = "ecs_service_desired_count"
ServiceRunningCount = "ecs_service_running_count"
ScaleUp = "ecs_scale_up"
ScaleDown = "ecs_scale_down"
)

var (
@@ -83,15 +87,40 @@ type Reporter struct {
ClientsByCluster map[string]EcsClient // Exported for test
cacheSize int
cacheExpiry time.Duration
handlerRegistry *controls.HandlerRegistry
probeID string
}

// Make creates a new Reporter
func Make(cacheSize int, cacheExpiry time.Duration, handlerRegistry *controls.HandlerRegistry, probeID string) Reporter {
r := Reporter{
ClientsByCluster: map[string]EcsClient{},
cacheSize: cacheSize,
cacheExpiry: cacheExpiry,
handlerRegistry: handlerRegistry,
probeID: probeID,
}

handlerRegistry.Batch(nil, map[string]xfer.ControlHandlerFunc{
ScaleUp: r.controlScaleUp,
ScaleDown: r.controlScaleDown,
})

return r
}

func (r Reporter) getClient(cluster string) (EcsClient, error) {
client, ok := r.ClientsByCluster[cluster]
if !ok {
log.Debugf("Creating new ECS client")
var err error
client, err = newClient(cluster, r.cacheSize, r.cacheExpiry)
if err != nil {
return nil, err
}
r.ClientsByCluster[cluster] = client
}
return client, nil
}

// Tag needed for Tagger
@@ -103,15 +132,9 @@ func (r Reporter) Tag(rpt report.Report) (report.Report, error) {
for cluster, taskMap := range clusterMap {
log.Debugf("Fetching ECS info for cluster %v with %v tasks", cluster, len(taskMap))

client, err := r.getClient(cluster)
if err != nil {
return rpt, err
}

taskArns := make([]string, 0, len(taskMap))
@@ -126,9 +149,15 @@ for serviceName, service := range ecsInfo.Services {
for serviceName, service := range ecsInfo.Services {
serviceID := report.MakeECSServiceNodeID(cluster, serviceName)
rpt.ECSService = rpt.ECSService.AddNode(report.MakeNodeWith(serviceID, map[string]string{
Cluster: cluster,
ServiceDesiredCount: fmt.Sprintf("%d", service.DesiredCount),
ServiceRunningCount: fmt.Sprintf("%d", service.RunningCount),
report.ControlProbeID: r.probeID,
}).WithLatestControls(map[string]report.NodeControlData{
ScaleUp: {Dead: false},
// We've decided for now to disable ScaleDown when only 1 task is desired,
// since scaling down to 0 would cause the service to disappear (#2085)
ScaleDown: {Dead: service.DesiredCount <= 1},
}))
}
log.Debugf("Created %v ECS service nodes", len(ecsInfo.Services))
@@ -176,6 +205,20 @@ func (Reporter) Report() (report.Report, error) {
taskTopology := report.MakeTopology().WithMetadataTemplates(taskMetadata)
result.ECSTask = result.ECSTask.Merge(taskTopology)
serviceTopology := report.MakeTopology().WithMetadataTemplates(serviceMetadata)
serviceTopology.Controls.AddControls([]report.Control{
{
ID: ScaleDown,
Human: "Scale Down",
Icon: "fa-minus",
Rank: 0,
},
{
ID: ScaleUp,
Human: "Scale Up",
Icon: "fa-plus",
Rank: 1,
},
})
result.ECSService = result.ECSService.Merge(serviceTopology)
return result, nil
}
@@ -184,3 +227,31 @@ func (Reporter) Report() (report.Report, error) {
func (r Reporter) Name() string {
return "awsecs"
}

// Stop unregisters controls.
func (r *Reporter) Stop() {
r.handlerRegistry.Batch([]string{
ScaleUp,
ScaleDown,
}, nil)
}

func (r *Reporter) controlScaleUp(req xfer.Request) xfer.Response {
return xfer.ResponseError(r.controlScale(req, 1))
}

func (r *Reporter) controlScaleDown(req xfer.Request) xfer.Response {
return xfer.ResponseError(r.controlScale(req, -1))
}

func (r *Reporter) controlScale(req xfer.Request, amount int) error {
cluster, serviceName, ok := report.ParseECSServiceNodeID(req.NodeID)
if !ok {
return fmt.Errorf("Bad node ID")
}
client, err := r.getClient(cluster)
if err != nil {
return err
}
return client.ScaleService(serviceName, amount)
}
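
For orientation, a click on one of these controls arrives as an xfer.Request whose NodeID encodes the cluster and service, which is why controlScale begins with report.ParseECSServiceNodeID. Here is a hedged sketch of driving that path directly from inside package awsecs, e.g. in a test; treating HandlerRegistry.HandleControlRequest as the dispatch entry point, with exactly these xfer.Request fields, is an assumption about the probe's control plumbing rather than something this diff shows:

// Hypothetical helper, not in the PR. Assumes the registry dispatches via
// HandleControlRequest and that xfer.Request carries NodeID and Control.
func scaleUpService(hr *controls.HandlerRegistry, cluster, service string) error {
	resp := hr.HandleControlRequest(xfer.Request{
		NodeID:  report.MakeECSServiceNodeID(cluster, service), // same encoding controlScale parses
		Control: ScaleUp,
	})
	if resp.Error != "" {
		return fmt.Errorf("scale up failed: %s", resp.Error)
	}
	return nil
}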
11 changes: 9 additions & 2 deletions probe/awsecs/reporter_test.go
@@ -6,6 +6,7 @@ import (
"time"

"github.com/weaveworks/scope/probe/awsecs"
"github.com/weaveworks/scope/probe/controls"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/report"
)
@@ -36,7 +37,8 @@ func getTestContainerNode() report.Node {
}

func TestGetLabelInfo(t *testing.T) {
hr := controls.NewDefaultHandlerRegistry()
r := awsecs.Make(1e6, time.Hour, hr, "test-probe-id")
rpt, err := r.Report()
if err != nil {
t.Fatalf("Error making report: %v", err)
@@ -84,8 +86,13 @@ func (c mockEcsClient) GetInfo(taskARNs []string) awsecs.EcsInfo {
return c.info
}

func (c mockEcsClient) ScaleService(serviceName string, amount int) error {
return nil
}

func TestTagReport(t *testing.T) {
hr := controls.NewDefaultHandlerRegistry()
r := awsecs.Make(1e6, time.Hour, hr, "test-probe-id")

r.ClientsByCluster[testCluster] = newMockEcsClient(
t,
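
The mock above always succeeds and records nothing, which is all these tests need. If a later test wanted to assert that the controls actually reach ScaleService, one possible extension (purely illustrative, not part of this PR; it would also need "fmt" in the test imports) is a recording wrapper:

// Illustrative only: wraps the existing mock and records scale calls.
type recordingEcsClient struct {
	mockEcsClient          // reuses the mock's GetInfo
	scaled        []string // e.g. "my-service+1", "my-service-1"
}

func (c *recordingEcsClient) ScaleService(serviceName string, amount int) error {
	c.scaled = append(c.scaled, fmt.Sprintf("%s%+d", serviceName, amount))
	return nil
}

A test would then install a *recordingEcsClient in r.ClientsByCluster and inspect the scaled slice after invoking the control handlers.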
3 changes: 2 additions & 1 deletion prog/probe.go
@@ -220,7 +220,8 @@ func probeMain(flags probeFlags, targets []appclient.Target) {
}

if flags.ecsEnabled {
reporter := awsecs.Make(flags.ecsCacheSize, flags.ecsCacheExpiry, handlerRegistry, probeID)
defer reporter.Stop()
p.AddReporter(reporter)
p.AddTagger(reporter)
}