Skip to content

Commit

Permalink
refactor(provider/kubernetes): refactor delete pods operation (#2210)
Browse files Browse the repository at this point in the history
  • Loading branch information
chlung authored and lwander committed Dec 10, 2017
1 parent 50938ea commit 15a751e
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,12 @@ class KubernetesClientApiAdapter {
new KubernetesClientOperationException("$operation", e)
}

Boolean blockUntilResourceConsistent(Object desired, Closure<Long> getGeneration, Closure getResource) {
def current = getResource()
Boolean blockUntilResourceConsistent(Closure getResource) {
Boolean isPodRunning = getResource()

def wait = RETRY_INITIAL_WAIT_MILLIS
def attempts = 0
while (getGeneration(current) < getGeneration(desired)) {
while (!isPodRunning ) {
attempts += 1
if (attempts > RETRY_COUNT) {
return false
Expand All @@ -101,7 +101,7 @@ class KubernetesClientApiAdapter {
sleep(wait)
wait = [wait * 2, RETRY_MAX_WAIT_MILLIS].min()

current = getResource()
isPodRunning = getResource()
}

return true
Expand Down Expand Up @@ -172,7 +172,7 @@ class KubernetesClientApiAdapter {
}

return list.items
} catch (ApiException ex) {
} catch (ApiException e) {
log.debug(e.message.toString())
}
}
Expand Down Expand Up @@ -215,16 +215,20 @@ class KubernetesClientApiAdapter {
}
}

boolean deleteStatefulSetNotCascade(String name, String namespace, V1DeleteOptions deleteOptions, Boolean orphanDependents, String propagationPolicy) {
void hardDestroyStatefulSet(String name, String namespace, V1DeleteOptions deleteOptions, Boolean orphanDependents, String propagationPolicy) {
exceptionWrapper("statefulSets.delete", "Delete Stateful Set $name", namespace) {
V1Status status
V1beta1StatefulSet statefulSet = getStatefulSet(name, namespace)
resizeStatefulSet(name, namespace, 0)

getPods(namespace, statefulSet.metadata.labels).items.forEach({ item ->
deletePod(item.metadata.name, namespace, null, null, null, true)
})

try {
status = apiInstance.deleteNamespacedStatefulSet(name, namespace, deleteOptions ?: new V1DeleteOptions(), API_CALL_RESULT_FORMAT, TERMINATION_GRACE_PERIOD_SECONDS, orphanDependents, propagationPolicy)
} catch(Exception e) {
apiInstance.deleteNamespacedStatefulSet(name, namespace, deleteOptions ?: new V1DeleteOptions(), API_CALL_RESULT_FORMAT, TERMINATION_GRACE_PERIOD_SECONDS, orphanDependents, propagationPolicy)
} catch (Exception e) {
log.debug(e.message)
}

return (status?.status == "Success" ? true : false )
}
}

Expand Down Expand Up @@ -268,7 +272,7 @@ class KubernetesClientApiAdapter {
String value = entry.getValue()
label = key + "=" + value
}
coreApi.listNamespacedPod(namespace, null, null, null, false, label, null, null, API_CALL_TIMEOUT_SECONDS, false)
coreApi.listNamespacedPod(namespace, null, null, null, false, label, null, null, API_CALL_TIMEOUT_SECONDS,false)
}
}

Expand Down Expand Up @@ -298,41 +302,27 @@ class KubernetesClientApiAdapter {
exceptionWrapper("DaemonSet.get", "Get Daemon Set ${name}", namespace) {
try {
return extApi.readNamespacedDaemonSet(name, namespace, API_CALL_RESULT_FORMAT, true, false)
} catch(Exception e) {
} catch (Exception e) {
log.debug(e.message)
}
return null
}
}

V1beta1DaemonSet deleteDaemonSetPod(String name, String namespace, V1beta1DaemonSet deployedControllerSet) {
exceptionWrapper("statefulSets.create", "Replace Daemon Set ${name}", namespace) {
def nodeSelector = new HashMap<String, String>()
UUID uuid = UUID.randomUUID()

//Use generated random uid for replacing nodeseletor value which can trigger daemonset pod deletion(refer to kubectl)
nodeSelector.put(uuid.toString(), uuid.toString())
deployedControllerSet.spec.template.spec.nodeSelector = nodeSelector

V1beta1DaemonSet daemonSet = extApi.replaceNamespacedDaemonSet(name, namespace, deployedControllerSet, API_CALL_RESULT_FORMAT)

return daemonSet
}
}

boolean hardDestroyDaemonSet(String name, String namespace, V1DeleteOptions deleteoptions, Boolean orphanDependents, String propagationPolicy) {
void hardDestroyDaemonSet(String name, String namespace, V1DeleteOptions deleteoptions, Boolean orphanDependents, String propagationPolicy) {
exceptionWrapper("daemonSets.delete", "Hard Destroy Daemon Set ${name}", namespace) {
def deployedControllerSet = getDaemonSet(name, namespace)
V1beta1DaemonSet daemonset = deleteDaemonSetPod(name, namespace, deployedControllerSet)

V1Status status
try {
status = extApi.deleteNamespacedDaemonSet(name, namespace, deleteoptions ?: new V1DeleteOptions(), API_CALL_RESULT_FORMAT, TERMINATION_GRACE_PERIOD_SECONDS, orphanDependents, propagationPolicy);
} catch(Exception e) {
} catch (Exception e) {
log.debug(e.message)
}

return (status?.status == "Success" ? true : false )
getPods(namespace, deployedControllerSet.metadata.labels).items.forEach({ item ->
deletePod(item.metadata.name, namespace, null, null, null, true)
})
}
}

Expand All @@ -342,5 +332,34 @@ class KubernetesClientApiAdapter {
return result.items.collect { n -> n.getMetadata().getName() }
}
}

void deletePod(String name, String namespace, V1DeleteOptions deleteOptions, Boolean orphanDependents, String propagationPolicy, Boolean force) {
exceptionWrapper("pod.delete", "Destroy Pod ${name}", namespace) {
V1Status status
try {
if (force) {
deleteOptions = new V1DeleteOptions()
deleteOptions.kind = "DeleteOptions"
deleteOptions.apiVersion "v1"
deleteOptions.gracePeriodSeconds = 0
}

status = coreApi.deleteNamespacedPod(name, namespace, deleteOptions ?: new V1DeleteOptions(), API_CALL_RESULT_FORMAT, TERMINATION_GRACE_PERIOD_SECONDS, null, null)
} catch (Exception e) {
log.debug(e.message)
}
}
}

V1Pod getPodStatus(String name, String namespace) {
exceptionWrapper("pods.status", "Ge pod status ${name}", namespace) {
V1Pod pod
try {
pod = coreApi.readNamespacedPodStatus(name, namespace, API_CALL_RESULT_FORMAT)
} catch (Exception e) {
log.debug(e.message)
}
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import com.netflix.spinnaker.clouddriver.kubernetes.v1.api.KubernetesApiConverte
import com.netflix.spinnaker.clouddriver.kubernetes.v1.api.KubernetesClientApiConverter
import com.netflix.spinnaker.clouddriver.kubernetes.v1.deploy.description.autoscaler.KubernetesAutoscalerDescription
import com.netflix.spinnaker.clouddriver.kubernetes.v1.deploy.description.servergroup.DeployKubernetesAtomicOperationDescription
import com.netflix.spinnaker.clouddriver.kubernetes.v1.deploy.exception.KubernetesClientOperationException
import com.netflix.spinnaker.clouddriver.kubernetes.v1.deploy.exception.KubernetesOperationException
import com.netflix.spinnaker.clouddriver.kubernetes.v1.deploy.exception.KubernetesResourceNotFoundException
import com.netflix.spinnaker.clouddriver.kubernetes.v1.security.KubernetesV1Credentials
Expand All @@ -34,6 +35,7 @@ import io.fabric8.kubernetes.api.model.extensions.DeploymentBuilder
import io.fabric8.kubernetes.api.model.extensions.DeploymentFluentImpl
import io.fabric8.kubernetes.api.model.extensions.DoneableDeployment
import io.fabric8.kubernetes.api.model.extensions.ReplicaSetBuilder
import io.kubernetes.client.models.V1Pod

class DeployKubernetesAtomicOperation implements AtomicOperation<DeploymentResult> {
private static final String BASE_PHASE = "DEPLOY"
Expand Down Expand Up @@ -198,6 +200,9 @@ class DeployKubernetesAtomicOperation implements AtomicOperation<DeploymentResul
if (deployedControllerSet) {
task.updateStatus BASE_PHASE, "Update stateful set ${controllerName}"
controllerSet = credentials.clientApiAdaptor.replaceStatfulSet(controllerName, namespace, controllerSet)
if (description.updateController?.updateStrategy?.type.name() == "Recreate") {
deletePods(credentials, namespace, controllerSet)
}
} else {
task.updateStatus BASE_PHASE, "Deployed stateful set ${controllerName}"
controllerSet = credentials.clientApiAdaptor.createStatfulSet(namespace, controllerSet)
Expand All @@ -213,7 +218,7 @@ class DeployKubernetesAtomicOperation implements AtomicOperation<DeploymentResul
def autoscaler = KubernetesClientApiConverter.toAutoscaler(new KubernetesAutoscalerDescription(controllerName, description), controllerName, description.kind)

if (credentials.clientApiAdaptor.getAutoscaler(namespace, controllerName)) {
credentials.clientApiAdaptor.deleteAutoscaler(namespace, controllerName,null,null,null)
credentials.clientApiAdaptor.deleteAutoscaler(namespace, controllerName, null, null, null, true)
}
credentials.clientApiAdaptor.createAutoscaler(namespace, autoscaler)
}
Expand All @@ -224,15 +229,14 @@ class DeployKubernetesAtomicOperation implements AtomicOperation<DeploymentResul
def deployDaemonSet(KubernetesV1Credentials credentials, String controllerName, String clusterName, String namespace, boolean canUpdated) {
task.updateStatus BASE_PHASE, "Building daemon set..."
def controllerSet = KubernetesClientApiConverter.toDaemonSet(description, controllerName)

if (canUpdated) {
def deployedControllerSet = credentials.clientApiAdaptor.getDaemonSet(controllerName, namespace)
if (deployedControllerSet) {
task.updateStatus BASE_PHASE, "Update daemon set ${controllerName}"
controllerSet = credentials.clientApiAdaptor.replaceDaemonSet(controllerName, namespace, controllerSet)
if (description.updateController?.updateStrategy?.type.name() == "Recreate") {
deployedControllerSet = credentials.clientApiAdaptor.deleteDaemonSetPod(controllerName, namespace, deployedControllerSet)
deletePods(credentials, namespace, controllerSet)
}
controllerSet = credentials.clientApiAdaptor.replaceDaemonSet(controllerName, namespace, controllerSet)
} else {
task.updateStatus BASE_PHASE, "Deployed daemon set ${controllerName}"
controllerSet = credentials.clientApiAdaptor.createDaemonSet(namespace, controllerSet)
Expand All @@ -244,4 +248,35 @@ class DeployKubernetesAtomicOperation implements AtomicOperation<DeploymentResul

return controllerSet
}

void deletePods(KubernetesV1Credentials credentials, String namespace, def controllerSet) {
Map<String, String> podNameList = new LinkedHashMap<String, String>()

credentials.clientApiAdaptor.getPods(namespace, controllerSet.metadata.labels).items.forEach({ item ->
podNameList.put(item.metadata.name, item.metadata.uid)
})

def getPodState = null
podNameList.toSorted(Map.Entry.comparingByKey().reversed()).forEach ({ k, v ->
credentials.clientApiAdaptor.deletePod(k, namespace, null, null, null, false)

getPodState = {
V1Pod pod = credentials.clientApiAdaptor.getPodStatus(k, namespace)
if (pod) {
if (v != pod.metadata?.uid && pod.status?.phase == "Running") {
return true
}
} else {
if (controllerSet.kind == KubernetesUtil.CONTROLLERS_DAEMONSET_KIND) {
return true
}
}
return false
}

if (!credentials.clientApiAdaptor.blockUntilResourceConsistent(getPodState)) {
throw new KubernetesClientOperationException("Failed to launch a new pod($k) for ServerGroup $controllerSet.metadata.name in $namespace.")
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class DestroyKubernetesAtomicOperation implements AtomicOperation<Void> {
throw new KubernetesOperationException("Failed to delete associated autoscaler $autoscalerName in $namespace.")
}
}
credentials.apiClientAdaptor.deleteStatefulSetNotCascade(serverGroupName, namespace, null, null, null)
credentials.apiClientAdaptor.hardDestroyStatefulSet(serverGroupName, namespace, null, null, null)
break
case KubernetesUtil.CONTROLLERS_DAEMONSET_KIND:
credentials.apiClientAdaptor.hardDestroyDaemonSet(serverGroupName, namespace, null, null, null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,7 @@ class ResizeKubernetesAtomicOperation implements AtomicOperation<Void> {
credentials.apiClientAdaptor.resizeStatefulSet(serverGroupName, namespace, size)
}
} else if (description.kind == KubernetesUtil.CONTROLLERS_DAEMONSET_KIND) {
if (size != 0) {
throw new KubernetesOperationException("Only support scale down the DaemoneSet.")
}
def deployedControllerSet = credentials.clientApiAdaptor.getDaemonSet(serverGroupName, namespace)
if (deployedControllerSet) {
credentials.apiClientAdaptor.deleteDaemonSetPod(serverGroupName, namespace, deployedControllerSet)
}
throw new KubernetesOperationException("Not support resizing DaemoneSet.")
}

task.updateStatus BASE_PHASE, "Completed resize operation."
Expand Down

0 comments on commit 15a751e

Please sign in to comment.