Skip to content

Commit

Permalink
fix(canary): gentle canary cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
asher committed Oct 18, 2017
1 parent c29ec72 commit 9ba2ee2
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.netflix.spinnaker.orca.mine.pipeline
import com.netflix.spinnaker.orca.CancellableStage.Result
import com.netflix.spinnaker.orca.clouddriver.tasks.servergroup.DestroyServerGroupTask
import com.netflix.spinnaker.orca.clouddriver.utils.OortHelper
import com.netflix.spinnaker.orca.clouddriver.KatoService

import java.util.concurrent.TimeUnit
import com.netflix.frigga.autoscaling.AutoScalingGroupNameBuilder
Expand All @@ -35,11 +36,12 @@ import org.springframework.stereotype.Component
@Component
class CanaryStage implements StageDefinitionBuilder, CancellableStage {
public static final String PIPELINE_CONFIG_TYPE = "canary"

private static Integer SLEEP_MINUTES = 2
@Autowired DeployCanaryStage deployCanaryStage
@Autowired MonitorCanaryStage monitorCanaryStage
@Autowired DestroyServerGroupTask destroyServerGroupTask
@Autowired OortHelper oortHelper
@Autowired KatoService katoService

@Override
def <T extends Execution<T>> List<Stage<T>> aroundStages(Stage<T> stage) {
Expand All @@ -63,12 +65,13 @@ class CanaryStage implements StageDefinitionBuilder, CancellableStage {
log.info("Cancelling stage (stageId: ${stage.id}, executionId: ${stage.execution.id}, context: ${stage.context as Map})")

// it's possible the server groups haven't been created yet, allow a grace period before cleanup
Thread.sleep(TimeUnit.MINUTES.toMillis(2))
Thread.sleep(TimeUnit.MINUTES.toMillis(SLEEP_MINUTES))

return cleanupCanary(stage)
}

protected Result cleanupCanary(Stage stage) {
Collection<Map<String, Object>> disableContexts = []
Collection<Map<String, Object>> destroyContexts = []

stage.context.clusterPairs.each { Map<String, Map> clusterPair ->
Expand All @@ -79,10 +82,11 @@ class CanaryStage implements StageDefinitionBuilder, CancellableStage {
builder.stack = cluster.stack
builder.detail = cluster.freeFormDetails

Map deployedCluster = oortHelper.getCluster(cluster.application, cluster.account, builder.buildGroupName(), cluster.cloudProvider ?: 'aws').orElse([:])
def cloudProvider = cluster.cloudProvider ?: 'aws'
Map deployedCluster = oortHelper.getCluster(cluster.application, cluster.account, builder.buildGroupName(), cloudProvider).orElse([:])
Long start = stage.startTime
// add a small buffer to deal with latency between the cloud provider and Orca
Long createdTimeCutoff = stage.endTime + 5000
Long createdTimeCutoff = (stage.endTime ?: System.currentTimeMillis()) + 5000

List<Map> serverGroups = deployedCluster.serverGroups ?: []

Expand All @@ -93,16 +97,38 @@ class CanaryStage implements StageDefinitionBuilder, CancellableStage {

// really hope they're not running concurrent canaries in the same cluster
matches.each {
disableContexts << [
disableServerGroup: [
serverGroupName: it.name,
region : it.region,
credentials : cluster.account,
cloudProvider : cloudProvider,
remainingEnabledServerGroups: 0,
preferLargerOverNewer : false
]
]
destroyContexts << [
serverGroupName: it.name,
region : it.region,
credentials : cluster.account,
cloudProvider : it.cloudProvider ?: 'aws'
cloudProvider : cloudProvider
]
}
}
}

if (disableContexts) {
try {
katoService.requestOperations(
disableContexts.first().disableServerGroup.cloudProvider,
disableContexts
).toBlocking().first()
Thread.sleep(TimeUnit.MINUTES.toMillis(SLEEP_MINUTES))
} catch (Exception e) {
log.error("Error disabling canary clusters in ${stage.id} with ${disableContexts}", e)
}
}

def destroyResults = destroyContexts.collect {
def destroyStage = new Stage<>()
destroyStage.execution = stage.execution
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@ package com.netflix.spinnaker.orca.mine.pipeline

import com.netflix.spinnaker.orca.CancellableStage
import com.netflix.spinnaker.orca.clouddriver.tasks.MonitorKatoTask
import com.netflix.spinnaker.orca.clouddriver.tasks.servergroup.ServerGroupCacheForceRefreshTask
import com.netflix.spinnaker.orca.mine.MineService
import com.netflix.spinnaker.orca.mine.tasks.CleanupCanaryTask
import com.netflix.spinnaker.orca.mine.tasks.CompleteCanaryTask
import com.netflix.spinnaker.orca.mine.tasks.DisableCanaryTask
import com.netflix.spinnaker.orca.mine.tasks.MonitorCanaryTask
import com.netflix.spinnaker.orca.mine.tasks.RegisterCanaryTask
import com.netflix.spinnaker.orca.pipeline.StageDefinitionBuilder
import com.netflix.spinnaker.orca.pipeline.TaskNode
import com.netflix.spinnaker.orca.pipeline.model.Execution
import com.netflix.spinnaker.orca.pipeline.model.Stage
import com.netflix.spinnaker.orca.pipeline.tasks.WaitTask
import groovy.util.logging.Slf4j
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
Expand All @@ -43,6 +46,13 @@ class MonitorCanaryStage implements StageDefinitionBuilder, CancellableStage {
builder
.withTask("registerCanary", RegisterCanaryTask)
.withTask("monitorCanary", MonitorCanaryTask)
.withTask("disableCanaryCluster", DisableCanaryTask)
.withTask("monitorDisable", MonitorKatoTask)
.withTask("waitBeforeCleanup", WaitTask)
.withTask("disableBaselineCluster", DisableCanaryTask)
.withTask("monitorDisable", MonitorKatoTask)
.withTask("waitBeforeCleanup", WaitTask)
.withTask("forceCacheRefresh", ServerGroupCacheForceRefreshTask)
.withTask("cleanupCanary", CleanupCanaryTask)
.withTask("monitorCleanup", MonitorKatoTask)
.withTask("completeCanary", CompleteCanaryTask)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,26 @@ class DeployedClustersUtil {
}
}.flatten()
}

/**
 * Builds one Kato operation per deployed cluster pair that belongs to this canary stage.
 *
 * Pairs are taken from {@code stageContext.deployedClusterPairs} and kept only when their
 * {@code canaryStage} equals {@code stageContext.canaryStageId}.
 *
 * @param asgOperationDescription key under which each operation payload is nested
 *                                (for example {@code 'disableServerGroup'})
 * @param stageContext            stage context holding {@code deployedClusterPairs} and {@code canaryStageId}
 * @param selector                name of the pair entry to operate on
 *                                (callers in this change pass {@code 'canaryCluster'} or {@code 'baselineCluster'})
 * @return a flat list of single-entry maps, {@code [(asgOperationDescription): payload]}
 */
static List<Map> toKatoAsgOperations(String asgOperationDescription, Map stageContext, String selector) {
  def pairsForThisStage = stageContext.deployedClusterPairs.findAll { pair ->
    pair.canaryStage == stageContext.canaryStageId
  }

  pairsForThisStage.collect { pair ->
    def cluster = pair."$selector"
    def payload = [
      serverGroupName: cluster.serverGroup,
      asgName        : cluster.serverGroup,
      regions        : [cluster.region],
      region         : cluster.region,
      // clusterAccount wins over accountName when both are present
      credentials    : cluster.clusterAccount ?: cluster.accountName,
      cloudProvider  : cluster.type ?: 'aws'
    ]
    [(asgOperationDescription): payload]
  }.flatten()
}

/**
 * Groups the server group names of this stage's canary and baseline clusters by region.
 *
 * Pairs are taken from {@code stageContext.deployedClusterPairs} and kept only when their
 * {@code canaryStage} equals {@code stageContext.canaryStageId}. For each kept pair the
 * canary cluster is recorded before the baseline cluster.
 *
 * @param stageContext stage context holding {@code deployedClusterPairs} and {@code canaryStageId}
 * @return map of region to the server group names deployed in that region; empty when no pair matches
 */
static Map<String, List<String>> getDeployServerGroups(Map stageContext) {
  Map<String, List<String>> serverGroupsByRegion = [:]

  def pairsForThisStage = stageContext.deployedClusterPairs.findAll { pair ->
    pair.canaryStage == stageContext.canaryStageId
  }

  // Iterate with each(): the traversal exists only to populate the map, so there is
  // nothing to filter and no intermediate list worth building.
  pairsForThisStage.each { pair ->
    [pair.canaryCluster, pair.baselineCluster].each { cluster ->
      def existing = serverGroupsByRegion."$cluster.region"
      if (existing) {
        existing << cluster.serverGroup
      } else {
        serverGroupsByRegion."$cluster.region" = [cluster.serverGroup]
      }
    }
  }

  return serverGroupsByRegion
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.netflix.spinnaker.orca.mine.tasks

import com.netflix.spinnaker.orca.ExecutionStatus
import com.netflix.spinnaker.orca.Task
import com.netflix.spinnaker.orca.TaskResult
import com.netflix.spinnaker.orca.clouddriver.KatoService
import com.netflix.spinnaker.orca.clouddriver.tasks.AbstractCloudProviderAwareTask
import com.netflix.spinnaker.orca.mine.MineService
import com.netflix.spinnaker.orca.pipeline.model.Stage
import groovy.util.logging.Slf4j
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
import retrofit.RetrofitError

/**
 * Task that asks Kato to disable one side of a canary's deployed cluster pairs.
 *
 * The side is chosen from the stage context: if the context already has a
 * {@code disabledCluster} key the baseline cluster is targeted, otherwise the canary
 * cluster. This task writes {@code disabledCluster} into its own outputs, so a second run
 * in the same stage presumably targets the baseline (confirm against the stage's task graph).
 */
@Component
@Slf4j
class DisableCanaryTask extends AbstractCloudProviderAwareTask implements Task {

  @Autowired MineService mineService
  @Autowired KatoService katoService

  /**
   * Submits a {@code disableServerGroup} operation for the selected clusters.
   *
   * @param stage the stage whose context supplies the canary id and deployed cluster pairs
   * @return {@code TaskResult.SUCCEEDED} with no outputs when Mine reports the canary as
   *         UNHEALTHY; otherwise a SUCCEEDED result carrying the Kato task id, the deployed
   *         server groups by region, the cluster side that was disabled and a wait time
   */
  @Override
  TaskResult execute(Stage stage) {
    if (canaryReportedUnhealthy(stage)) {
      // If unhealthy, already disabled in MonitorCanaryTask
      return TaskResult.SUCCEEDED
    }

    def selector = 'canaryCluster'
    if (stage.context.containsKey('disabledCluster')) {
      selector = 'baselineCluster'
    }

    def ops = DeployedClustersUtil.toKatoAsgOperations('disableServerGroup', stage.context, selector)
    def deployServerGroups = DeployedClustersUtil.getDeployServerGroups(stage.context)

    log.info "Disabling ${selector} in ${stage.id} with ${ops}"

    // Take the provider from the first operation when there is one; only the fallback
    // path (no operations) defaults to 'aws'.
    String cloudProvider
    if (ops) {
      cloudProvider = ops.first()?.values().first()?.cloudProvider
    } else {
      cloudProvider = getCloudProvider(stage) ?: 'aws'
    }

    def katoTaskId = katoService.requestOperations(cloudProvider, ops).toBlocking().first()

    stage.context.remove('waitTaskState')

    def outputs = [
      'kato.last.task.id'   : katoTaskId,
      'deploy.server.groups': deployServerGroups,
      disabledCluster       : selector,
      waitTime              : 120 // Seconds
    ]
    return new TaskResult(ExecutionStatus.SUCCEEDED, outputs)
  }

  /**
   * Looks up the canary in Mine and reports whether its health is {@code 'UNHEALTHY'}.
   * A {@code RetrofitError} from the lookup is logged and treated as "not unhealthy",
   * so the caller carries on with the disable.
   */
  private boolean canaryReportedUnhealthy(Stage stage) {
    try {
      def canary = mineService.getCanary(stage.context.canary.id)
      return canary.health?.health == 'UNHEALTHY'
    } catch (RetrofitError e) {
      log.error("Exception occurred while getting canary status with id {} from mine, continuing with disable",
        stage.context.canary.id, e)
      return false
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,14 @@ class RegisterCanaryTask implements Task {
def outputs = [
canary : canary,
stageTimeoutMs : getMonitorTimeout(canary),
deployedClusterPairs: deployStage.context.deployedClusterPairs
deployedClusterPairs: deployStage.context.deployedClusterPairs,
application : c.application
]

if (deployStage.context.deployedClusterPairs[0]?.canaryCluster?.accountName) {
outputs.account = deployStage.context.deployedClusterPairs[0].canaryCluster.accountName
}

return new TaskResult(ExecutionStatus.SUCCEEDED, outputs)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@
package com.netflix.spinnaker.orca.mine.pipeline

import com.netflix.spinnaker.orca.CancellableStage.Result
import com.netflix.spinnaker.orca.clouddriver.model.TaskId
import com.netflix.spinnaker.orca.clouddriver.tasks.servergroup.DestroyServerGroupTask
import com.netflix.spinnaker.orca.clouddriver.KatoService
import com.netflix.spinnaker.orca.clouddriver.utils.OortHelper
import com.netflix.spinnaker.orca.pipeline.model.Stage
import rx.Observable
import spock.lang.Specification
import spock.lang.Unroll

import static com.netflix.spinnaker.orca.test.model.ExecutionBuilder.stage

class CanaryStageSpec extends Specification {
Expand All @@ -38,16 +42,39 @@ class CanaryStageSpec extends Specification {
]
]

def disableOperation = [
[disableServerGroup:
[
serverGroupName: "app-stack1-baseline-v003", region: "us-east-1", credentials: "test",
cloudProvider : "aws", remainingEnabledServerGroups: 0, preferLargerOverNewer: false
]
],
[disableServerGroup:
[
serverGroupName: "app-stack1-canary-v003", region: "us-east-1", credentials: "test",
cloudProvider : "aws", remainingEnabledServerGroups: 0, preferLargerOverNewer: false
]
]
]

TaskId taskId = new TaskId(UUID.randomUUID().toString())

Stage canceledStage = stage {
context = stageContext
startTime = 5
endTime = 10
}

OortHelper oortHelper = Mock(OortHelper)
KatoService katoService = Mock(KatoService)
DestroyServerGroupTask destroyServerGroupTask = Mock(DestroyServerGroupTask)

CanaryStage canaryStage = new CanaryStage(oortHelper: oortHelper, destroyServerGroupTask: destroyServerGroupTask)
CanaryStage canaryStage = new CanaryStage(
oortHelper: oortHelper,
katoService: katoService,
destroyServerGroupTask: destroyServerGroupTask,
SLEEP_MINUTES: 0
)

when:
Result result = canaryStage.cleanupCanary(canceledStage)
Expand All @@ -61,14 +88,16 @@ class CanaryStageSpec extends Specification {
serverGroups: [[region: "us-east-1", createdTime: createdTime, name: "app-stack1-canary-v003"]]
]

disableOps * katoService.requestOperations("aws", disableOperation) >> { Observable.from(taskId) }

where:
createdTime || destroyedServerGroups
4 || 0
5 || 0
6 || 2
10 || 2
5010 || 0
5011 || 0
createdTime | disableOps | destroyedServerGroups
4 | 0 | 0
5 | 0 | 0
6 | 1 | 2
10 | 1 | 2
5010 | 0 | 0
5011 | 0 | 0

}

Expand Down

0 comments on commit 9ba2ee2

Please sign in to comment.