diff --git a/probe/awsecs/client.go b/probe/awsecs/client.go index 1ee8a9d553..d1ad708902 100644 --- a/probe/awsecs/client.go +++ b/probe/awsecs/client.go @@ -1,6 +1,7 @@ package awsecs import ( + "fmt" "strings" "sync" "time" @@ -20,6 +21,8 @@ const servicePrefix = "ecs-svc" // Task StartedBy field begins with this if it w type EcsClient interface { // Returns a EcsInfo struct containing data needed for a report. GetInfo([]string) EcsInfo + // Scales a service up or down by amount + ScaleService(string, int) error } // actual implementation @@ -379,3 +382,27 @@ func (c ecsClientImpl) GetInfo(taskARNs []string) EcsInfo { return info } + +// Implements EcsClient.ScaleService +func (c ecsClientImpl) ScaleService(serviceName string, amount int) error { + // Note this is inherently racey, due to needing to get, modify, then update the DesiredCount. + + // refresh service in cache + c.describeServices([]string{serviceName}) + // now check the cache to see if it worked + service, ok := c.getCachedService(serviceName) + if !ok { + return fmt.Errorf("Service %s not found", serviceName) + } + + newCount := service.DesiredCount + int64(amount) + if newCount < 1 { + return fmt.Errorf("Cannot reduce count below one") + } + _, err := c.client.UpdateService(&ecs.UpdateServiceInput{ + Cluster: &c.cluster, + Service: &serviceName, + DesiredCount: &newCount, + }) + return err +} diff --git a/probe/awsecs/reporter.go b/probe/awsecs/reporter.go index 23cbc4e113..5e4849ce0a 100644 --- a/probe/awsecs/reporter.go +++ b/probe/awsecs/reporter.go @@ -5,6 +5,8 @@ import ( "time" log "github.com/Sirupsen/logrus" + "github.com/weaveworks/scope/common/xfer" + "github.com/weaveworks/scope/probe/controls" "github.com/weaveworks/scope/probe/docker" "github.com/weaveworks/scope/report" ) @@ -16,6 +18,8 @@ const ( TaskFamily = "ecs_task_family" ServiceDesiredCount = "ecs_service_desired_count" ServiceRunningCount = "ecs_service_running_count" + ScaleUp = "ecs_scale_up" + ScaleDown = "ecs_scale_down" ) var ( @@ -83,15 +87,40 @@ type Reporter struct { ClientsByCluster map[string]EcsClient // Exported for test cacheSize int cacheExpiry time.Duration + handlerRegistry *controls.HandlerRegistry + probeID string } // Make creates a new Reporter -func Make(cacheSize int, cacheExpiry time.Duration) Reporter { - return Reporter{ +func Make(cacheSize int, cacheExpiry time.Duration, handlerRegistry *controls.HandlerRegistry, probeID string) Reporter { + r := Reporter{ ClientsByCluster: map[string]EcsClient{}, cacheSize: cacheSize, cacheExpiry: cacheExpiry, + handlerRegistry: handlerRegistry, + probeID: probeID, } + + handlerRegistry.Batch(nil, map[string]xfer.ControlHandlerFunc{ + ScaleUp: r.controlScaleUp, + ScaleDown: r.controlScaleDown, + }) + + return r +} + +func (r Reporter) getClient(cluster string) (EcsClient, error) { + client, ok := r.ClientsByCluster[cluster] + if !ok { + log.Debugf("Creating new ECS client") + var err error + client, err = newClient(cluster, r.cacheSize, r.cacheExpiry) + if err != nil { + return nil, err + } + r.ClientsByCluster[cluster] = client + } + return client, nil } // Tag needed for Tagger @@ -103,15 +132,9 @@ func (r Reporter) Tag(rpt report.Report) (report.Report, error) { for cluster, taskMap := range clusterMap { log.Debugf("Fetching ECS info for cluster %v with %v tasks", cluster, len(taskMap)) - client, ok := r.ClientsByCluster[cluster] - if !ok { - log.Debugf("Creating new ECS client") - var err error - client, err = newClient(cluster, r.cacheSize, r.cacheExpiry) - if err != nil { - return rpt, err - } - r.ClientsByCluster[cluster] = client + client, err := r.getClient(cluster) + if err != nil { + return rpt, nil } taskArns := make([]string, 0, len(taskMap)) @@ -126,9 +149,15 @@ func (r Reporter) Tag(rpt report.Report) (report.Report, error) { for serviceName, service := range ecsInfo.Services { serviceID := report.MakeECSServiceNodeID(cluster, serviceName) rpt.ECSService = rpt.ECSService.AddNode(report.MakeNodeWith(serviceID, map[string]string{ - Cluster: cluster, - ServiceDesiredCount: fmt.Sprintf("%d", service.DesiredCount), - ServiceRunningCount: fmt.Sprintf("%d", service.RunningCount), + Cluster: cluster, + ServiceDesiredCount: fmt.Sprintf("%d", service.DesiredCount), + ServiceRunningCount: fmt.Sprintf("%d", service.RunningCount), + report.ControlProbeID: r.probeID, + }).WithLatestControls(map[string]report.NodeControlData{ + ScaleUp: {Dead: false}, + // We've decided for now to disable ScaleDown when only 1 task is desired, + // since scaling down to 0 would cause the service to disappear (#2085) + ScaleDown: {Dead: service.DesiredCount <= 1}, })) } log.Debugf("Created %v ECS service nodes", len(ecsInfo.Services)) @@ -176,6 +205,20 @@ func (Reporter) Report() (report.Report, error) { taskTopology := report.MakeTopology().WithMetadataTemplates(taskMetadata) result.ECSTask = result.ECSTask.Merge(taskTopology) serviceTopology := report.MakeTopology().WithMetadataTemplates(serviceMetadata) + serviceTopology.Controls.AddControls([]report.Control{ + { + ID: ScaleDown, + Human: "Scale Down", + Icon: "fa-minus", + Rank: 0, + }, + { + ID: ScaleUp, + Human: "Scale Up", + Icon: "fa-plus", + Rank: 1, + }, + }) result.ECSService = result.ECSService.Merge(serviceTopology) return result, nil } @@ -184,3 +227,31 @@ func (Reporter) Report() (report.Report, error) { func (r Reporter) Name() string { return "awsecs" } + +// Stop unregisters controls. +func (r *Reporter) Stop() { + r.handlerRegistry.Batch([]string{ + ScaleUp, + ScaleDown, + }, nil) +} + +func (r *Reporter) controlScaleUp(req xfer.Request) xfer.Response { + return xfer.ResponseError(r.controlScale(req, 1)) +} + +func (r *Reporter) controlScaleDown(req xfer.Request) xfer.Response { + return xfer.ResponseError(r.controlScale(req, -1)) +} + +func (r *Reporter) controlScale(req xfer.Request, amount int) error { + cluster, serviceName, ok := report.ParseECSServiceNodeID(req.NodeID) + if !ok { + return fmt.Errorf("Bad node ID") + } + client, err := r.getClient(cluster) + if err != nil { + return err + } + return client.ScaleService(serviceName, amount) +} diff --git a/probe/awsecs/reporter_test.go b/probe/awsecs/reporter_test.go index 6f00f0c6ef..91c86d5c90 100644 --- a/probe/awsecs/reporter_test.go +++ b/probe/awsecs/reporter_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/weaveworks/scope/probe/awsecs" + "github.com/weaveworks/scope/probe/controls" "github.com/weaveworks/scope/probe/docker" "github.com/weaveworks/scope/report" ) @@ -36,7 +37,8 @@ func getTestContainerNode() report.Node { } func TestGetLabelInfo(t *testing.T) { - r := awsecs.Make(1e6, time.Hour) + hr := controls.NewDefaultHandlerRegistry() + r := awsecs.Make(1e6, time.Hour, hr, "test-probe-id") rpt, err := r.Report() if err != nil { t.Fatalf("Error making report: %v", err) @@ -84,8 +86,13 @@ func (c mockEcsClient) GetInfo(taskARNs []string) awsecs.EcsInfo { return c.info } +func (c mockEcsClient) ScaleService(serviceName string, amount int) error { + return nil +} + func TestTagReport(t *testing.T) { - r := awsecs.Make(1e6, time.Hour) + hr := controls.NewDefaultHandlerRegistry() + r := awsecs.Make(1e6, time.Hour, hr, "test-probe-id") r.ClientsByCluster[testCluster] = newMockEcsClient( t, diff --git a/prog/probe.go b/prog/probe.go index 72ce3f38c9..cfd074c464 100644 --- a/prog/probe.go +++ b/prog/probe.go @@ -220,7 +220,8 @@ func probeMain(flags probeFlags, targets []appclient.Target) { } if flags.ecsEnabled { - reporter := awsecs.Make(flags.ecsCacheSize, flags.ecsCacheExpiry) + reporter := awsecs.Make(flags.ecsCacheSize, flags.ecsCacheExpiry, handlerRegistry, probeID) + defer reporter.Stop() p.AddReporter(reporter) p.AddTagger(reporter) }