fix(canary): gentle canary cleanup #1711

Merged 1 commit on Oct 19, 2017
@@ -17,8 +17,10 @@
package com.netflix.spinnaker.orca.mine.pipeline

import com.netflix.spinnaker.orca.CancellableStage.Result
import com.netflix.spinnaker.orca.RetrySupport
import com.netflix.spinnaker.orca.clouddriver.tasks.servergroup.DestroyServerGroupTask
import com.netflix.spinnaker.orca.clouddriver.utils.OortHelper
import com.netflix.spinnaker.orca.clouddriver.KatoService

import java.util.concurrent.TimeUnit
import com.netflix.frigga.autoscaling.AutoScalingGroupNameBuilder
@@ -35,11 +37,13 @@ import org.springframework.stereotype.Component
@Component
class CanaryStage implements StageDefinitionBuilder, CancellableStage {
public static final String PIPELINE_CONFIG_TYPE = "canary"

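// default wait, in seconds, after disabling the canary/baseline clusters before they are destroyed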
public static final Integer DEFAULT_CLUSTER_DISABLE_WAIT_TIME = 180
@Autowired DeployCanaryStage deployCanaryStage
@Autowired MonitorCanaryStage monitorCanaryStage
@Autowired DestroyServerGroupTask destroyServerGroupTask
@Autowired OortHelper oortHelper
@Autowired KatoService katoService
@Autowired RetrySupport retrySupport

@Override
def <T extends Execution<T>> List<Stage<T>> aroundStages(Stage<T> stage) {
@@ -60,17 +64,11 @@ class CanaryStage implements StageDefinitionBuilder, CancellableStage {

@Override
Result cancel(Stage stage) {
log.info("Cancelling stage (stageId: ${stage.id}, executionId: ${stage.execution.id}, context: ${stage.context as Map})")

// it's possible the server groups haven't been created yet, allow a grace period before cleanup
Thread.sleep(TimeUnit.MINUTES.toMillis(2))

return cleanupCanary(stage)
}

protected Result cleanupCanary(Stage stage) {
Collection<Map<String, Object>> disableContexts = []
Collection<Map<String, Object>> destroyContexts = []

log.info("Cancelling stage (stageId: ${stage.id}, executionId: ${stage.execution.id}, context: ${stage.context as Map})")

stage.context.clusterPairs.each { Map<String, Map> clusterPair ->
[clusterPair.baseline, clusterPair.canary].each { Map<String, String> cluster ->

@@ -79,10 +77,18 @@ class CanaryStage implements StageDefinitionBuilder, CancellableStage {
builder.stack = cluster.stack
builder.detail = cluster.freeFormDetails

Map deployedCluster = oortHelper.getCluster(cluster.application, cluster.account, builder.buildGroupName(), cluster.cloudProvider ?: 'aws').orElse([:])
def cloudProvider = cluster.cloudProvider ?: 'aws'
// it's possible the server groups haven't been created yet, retry with backoff before cleanup
Map<String, Object> deployedCluster = [:]
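// assuming RetrySupport.retry(fn, maxRetries, retryBackoffMillis, exponential) semantics: up to 8 attempts, ~15s apart, no exponential backoff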
retrySupport.retry({
deployedCluster = oortHelper.getCluster(cluster.application, cluster.account, builder.buildGroupName(), cloudProvider).orElse([:])
if (deployedCluster.serverGroups == null || deployedCluster.serverGroups?.size() == 0) {
throw new IllegalStateException("Expected serverGroup matching cluster {$cluster}")
}
}, 8, TimeUnit.SECONDS.toMillis(15), false)
Long start = stage.startTime
// add a small buffer to deal with latency between the cloud provider and Orca
Long createdTimeCutoff = stage.endTime + 5000
Long createdTimeCutoff = (stage.endTime ?: System.currentTimeMillis()) + 5000

List<Map> serverGroups = deployedCluster.serverGroups ?: []

@@ -93,16 +99,40 @@ class CanaryStage implements StageDefinitionBuilder, CancellableStage {

// really hope they're not running concurrent canaries in the same cluster
matches.each {
disableContexts << [
disableServerGroup: [
serverGroupName: it.name,
region : it.region,
credentials : cluster.account,
cloudProvider : cloudProvider,
remainingEnabledServerGroups: 0,
preferLargerOverNewer : false
]
]
destroyContexts << [
serverGroupName: it.name,
region : it.region,
credentials : cluster.account,
cloudProvider : it.cloudProvider ?: 'aws'
cloudProvider : cloudProvider
]
}
}
}

if (disableContexts) {
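// disable all matched server groups in a single clouddriver (kato) request, then wait (clusterDisableWaitTime, default 180s) before the destroy operations below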
try {
katoService.requestOperations(
disableContexts.first().disableServerGroup.cloudProvider,
disableContexts
).toBlocking().first()
Thread.sleep(TimeUnit.SECONDS.toMillis(
stage.context.clusterDisableWaitTime != null ? stage.context.clusterDisableWaitTime : DEFAULT_CLUSTER_DISABLE_WAIT_TIME)
)
} catch (Exception e) {
log.error("Error disabling canary clusters in ${stage.id} with ${disableContexts}", e)
}
}

def destroyResults = destroyContexts.collect {
def destroyStage = new Stage<>()
destroyStage.execution = stage.execution
@@ -18,15 +18,18 @@ package com.netflix.spinnaker.orca.mine.pipeline

import com.netflix.spinnaker.orca.CancellableStage
import com.netflix.spinnaker.orca.clouddriver.tasks.MonitorKatoTask
import com.netflix.spinnaker.orca.clouddriver.tasks.servergroup.ServerGroupCacheForceRefreshTask
import com.netflix.spinnaker.orca.mine.MineService
import com.netflix.spinnaker.orca.mine.tasks.CleanupCanaryTask
import com.netflix.spinnaker.orca.mine.tasks.CompleteCanaryTask
import com.netflix.spinnaker.orca.mine.tasks.DisableCanaryTask
import com.netflix.spinnaker.orca.mine.tasks.MonitorCanaryTask
import com.netflix.spinnaker.orca.mine.tasks.RegisterCanaryTask
import com.netflix.spinnaker.orca.pipeline.StageDefinitionBuilder
import com.netflix.spinnaker.orca.pipeline.TaskNode
import com.netflix.spinnaker.orca.pipeline.model.Execution
import com.netflix.spinnaker.orca.pipeline.model.Stage
import com.netflix.spinnaker.orca.pipeline.tasks.WaitTask
import groovy.util.logging.Slf4j
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
@@ -43,6 +46,13 @@ class MonitorCanaryStage implements StageDefinitionBuilder, CancellableStage {
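// gentle cleanup: disable the canary cluster, then the baseline cluster, waiting and force-refreshing the cache before the existing cleanup/destroy tasks run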
builder
.withTask("registerCanary", RegisterCanaryTask)
.withTask("monitorCanary", MonitorCanaryTask)
.withTask("disableCanaryCluster", DisableCanaryTask)
.withTask("monitorDisable", MonitorKatoTask)
.withTask("waitBeforeCleanup", WaitTask)
.withTask("disableBaselineCluster", DisableCanaryTask)
.withTask("monitorDisable", MonitorKatoTask)
.withTask("waitBeforeCleanup", WaitTask)
.withTask("forceCacheRefresh", ServerGroupCacheForceRefreshTask)
.withTask("cleanupCanary", CleanupCanaryTask)
.withTask("monitorCleanup", MonitorKatoTask)
.withTask("completeCanary", CompleteCanaryTask)
@@ -28,4 +28,26 @@ class DeployedClustersUtil {
}
}.flatten()
}

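// builds kato operation descriptions (e.g. disableServerGroup) for one side ('canaryCluster' or 'baselineCluster') of each deployed pair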
static List<Map> toKatoAsgOperations(String asgOperationDescription, Map stageContext, String selector) {
stageContext.deployedClusterPairs.findAll { it.canaryStage == stageContext.canaryStageId }.collect {
[it."$selector"].collect {
[
(asgOperationDescription): [
serverGroupName: it.serverGroup, asgName: it.serverGroup, regions: [it.region], region: it.region, credentials: it.clusterAccount ?: it.accountName, cloudProvider: it.type ?: 'aws'
]
]
}
}.flatten()
}

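// maps region -> deployed server group names across both the canary and baseline clusters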
static Map<String, List<String>> getDeployServerGroups(Map stageContext) {
def dSG = [:]
stageContext.deployedClusterPairs.findAll { it.canaryStage == stageContext.canaryStageId }.findAll {
[it.canaryCluster, it.baselineCluster].findAll {
dSG."$it.region" = dSG."$it.region" ? dSG."$it.region" << it.serverGroup : [it.serverGroup]
}
}
return dSG
}
}
@@ -0,0 +1,55 @@
package com.netflix.spinnaker.orca.mine.tasks

import com.netflix.spinnaker.orca.ExecutionStatus
import com.netflix.spinnaker.orca.Task
import com.netflix.spinnaker.orca.TaskResult
import com.netflix.spinnaker.orca.clouddriver.KatoService
import com.netflix.spinnaker.orca.clouddriver.tasks.AbstractCloudProviderAwareTask
import com.netflix.spinnaker.orca.mine.MineService
import com.netflix.spinnaker.orca.pipeline.model.Stage
import groovy.util.logging.Slf4j
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
import retrofit.RetrofitError
import static com.netflix.spinnaker.orca.mine.pipeline.CanaryStage.DEFAULT_CLUSTER_DISABLE_WAIT_TIME

@Component
@Slf4j
class DisableCanaryTask extends AbstractCloudProviderAwareTask implements Task {

@Autowired MineService mineService
@Autowired KatoService katoService

@Override
TaskResult execute(Stage stage) {

Integer waitTime = stage.context.clusterDisableWaitTime != null ? stage.context.clusterDisableWaitTime : DEFAULT_CLUSTER_DISABLE_WAIT_TIME

try {
def canary = mineService.getCanary(stage.context.canary.id)
if (canary.health?.health == 'UNHEALTHY') {
// If unhealthy, already disabled in MonitorCanaryTask
return TaskResult.SUCCEEDED
}
} catch (RetrofitError e) {
log.error("Exception occurred while getting canary status with id {} from mine, continuing with disable",
stage.context.canary.id, e)
}

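// first invocation disables the canary cluster; once 'disabledCluster' is present in the context, the second invocation disables the baseline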
def selector = stage.context.containsKey('disabledCluster') ? 'baselineCluster' : 'canaryCluster'
def ops = DeployedClustersUtil.toKatoAsgOperations('disableServerGroup', stage.context, selector)
def dSG = DeployedClustersUtil.getDeployServerGroups(stage.context)

log.info "Disabling ${selector} in ${stage.id} with ${ops}"
String cloudProvider = ops && !ops.empty ? ops.first()?.values().first()?.cloudProvider : getCloudProvider(stage) ?: 'aws'
def taskId = katoService.requestOperations(cloudProvider, ops).toBlocking().first()

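// clear any previous wait state so the downstream waitBeforeCleanup task waits the full waitTime again (assumes WaitTask keys off 'waitTaskState')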
stage.context.remove('waitTaskState')
return new TaskResult(ExecutionStatus.SUCCEEDED, [
'kato.last.task.id' : taskId,
'deploy.server.groups' : dSG,
disabledCluster : selector,
waitTime : waitTime
])
}
}
@@ -58,8 +58,14 @@ class RegisterCanaryTask implements Task {
def outputs = [
canary : canary,
stageTimeoutMs : getMonitorTimeout(canary),
deployedClusterPairs: deployStage.context.deployedClusterPairs
deployedClusterPairs: deployStage.context.deployedClusterPairs,
application : c.application
]

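// also surface the account of the deployed canary cluster, when available, for downstream stages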
if (deployStage.context.deployedClusterPairs[0]?.canaryCluster?.accountName) {
outputs.account = deployStage.context.deployedClusterPairs[0].canaryCluster.accountName
}

return new TaskResult(ExecutionStatus.SUCCEEDED, outputs)
}

@@ -17,11 +17,16 @@
package com.netflix.spinnaker.orca.mine.pipeline

import com.netflix.spinnaker.orca.CancellableStage.Result
import com.netflix.spinnaker.orca.RetrySupport
import com.netflix.spinnaker.orca.clouddriver.model.TaskId
import com.netflix.spinnaker.orca.clouddriver.tasks.servergroup.DestroyServerGroupTask
import com.netflix.spinnaker.orca.clouddriver.KatoService
import com.netflix.spinnaker.orca.clouddriver.utils.OortHelper
import com.netflix.spinnaker.orca.pipeline.model.Stage
import rx.Observable
import spock.lang.Specification
import spock.lang.Unroll

import static com.netflix.spinnaker.orca.test.model.ExecutionBuilder.stage

class CanaryStageSpec extends Specification {
@@ -30,6 +35,7 @@
void "cancel destroys canary/baseline if found and were deployed during canary stage"() {
given:
Map stageContext = [
clusterDisableWaitTime: 0,
clusterPairs: [
[
baseline: [application: "app", stack: "stack1", freeFormDetails: "baseline", region: "us-east-1", account: "test"],
@@ -38,19 +44,44 @@
]
]

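// expected disable operations for the baseline and canary server groups deployed during the canary stage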
def disableOperation = [
[disableServerGroup:
[
serverGroupName: "app-stack1-baseline-v003", region: "us-east-1", credentials: "test",
cloudProvider : "aws", remainingEnabledServerGroups: 0, preferLargerOverNewer: false
]
],
[disableServerGroup:
[
serverGroupName: "app-stack1-canary-v003", region: "us-east-1", credentials: "test",
cloudProvider : "aws", remainingEnabledServerGroups: 0, preferLargerOverNewer: false
]
]
]

TaskId taskId = new TaskId(UUID.randomUUID().toString())

Stage canceledStage = stage {
context = stageContext
startTime = 5
endTime = 10
}

OortHelper oortHelper = Mock(OortHelper)
KatoService katoService = Mock(KatoService)
DestroyServerGroupTask destroyServerGroupTask = Mock(DestroyServerGroupTask)

CanaryStage canaryStage = new CanaryStage(oortHelper: oortHelper, destroyServerGroupTask: destroyServerGroupTask)
CanaryStage canaryStage = new CanaryStage(
oortHelper: oortHelper,
katoService: katoService,
destroyServerGroupTask: destroyServerGroupTask,
retrySupport: Spy(RetrySupport) {
_ * sleep(_) >> { /* do nothing */ }
}
)

when:
Result result = canaryStage.cleanupCanary(canceledStage)
Result result = canaryStage.cancel(canceledStage)

then:
result.details.destroyContexts.size() == destroyedServerGroups
@@ -61,14 +92,16 @@
serverGroups: [[region: "us-east-1", createdTime: createdTime, name: "app-stack1-canary-v003"]]
]

disableOps * katoService.requestOperations("aws", disableOperation) >> { Observable.from(taskId) }

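// server groups created strictly between startTime (5) and endTime + 5000ms (5010) are disabled (one kato request) and destroyed (two contexts); anything outside that window is left alone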
where:
createdTime || destroyedServerGroups
4 || 0
5 || 0
6 || 2
10 || 2
5010 || 0
5011 || 0
createdTime | disableOps | destroyedServerGroups
4 | 0 | 0
5 | 0 | 0
6 | 1 | 2
10 | 1 | 2
5010 | 0 | 0
5011 | 0 | 0

}
