Skip to content

Commit

Permalink
[FLINK-32529] Add startup probe for JM deployment
Browse files Browse the repository at this point in the history
  • Loading branch information
gyfora committed Jul 10, 2023
1 parent f214262 commit 5ebfc5e
Show file tree
Hide file tree
Showing 8 changed files with 341 additions and 188 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/dynamic_section.html
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@
<td>Duration</td>
<td>Time after which jobmanager pods of terminal application deployments are shut down.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.jm-deployment.startup.probe.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Enable job manager startup probe to allow detecting when the jobmanager could not submit the job.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.job.restart.failed</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@
<td>Duration</td>
<td>Time after which jobmanager pods of terminal application deployments are shut down.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.jm-deployment.startup.probe.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Enable job manager startup probe to allow detecting when the jobmanager could not submit the job.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.job.restart.failed</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
import org.apache.flink.kubernetes.operator.api.spec.Resource;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.utils.Constants;
Expand Down Expand Up @@ -190,11 +191,56 @@ protected FlinkConfigBuilder applyLogConfiguration() throws IOException {
return this;
}

protected FlinkConfigBuilder applyCommonPodTemplate() throws IOException {
if (spec.getPodTemplate() != null) {
effectiveConfig.setString(
"kubernetes.pod-template-file", createTempFile(spec.getPodTemplate()));
protected FlinkConfigBuilder applyPodTemplate() throws IOException {
Pod commonPodTemplate = spec.getPodTemplate();
boolean mergeByName =
effectiveConfig.get(KubernetesOperatorConfigOptions.POD_TEMPLATE_MERGE_BY_NAME);

Pod jmPodTemplate;
if (spec.getJobManager() != null) {
jmPodTemplate =
mergePodTemplates(
commonPodTemplate, spec.getJobManager().getPodTemplate(), mergeByName);

jmPodTemplate =
applyResourceToPodTemplate(jmPodTemplate, spec.getJobManager().getResource());
} else {
jmPodTemplate = ReconciliationUtils.clone(commonPodTemplate);
}

if (effectiveConfig.get(
KubernetesOperatorConfigOptions.OPERATOR_JM_STARTUP_PROBE_ENABLED)) {
if (jmPodTemplate == null) {
jmPodTemplate = new Pod();
}
FlinkUtils.addStartupProbe(jmPodTemplate);
}

String jmTemplateFile = null;
if (jmPodTemplate != null) {
jmTemplateFile = createTempFile(jmPodTemplate);
effectiveConfig.set(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE, jmTemplateFile);
}

Pod tmPodTemplate;
if (spec.getTaskManager() != null) {
tmPodTemplate =
mergePodTemplates(
commonPodTemplate, spec.getTaskManager().getPodTemplate(), mergeByName);
tmPodTemplate =
applyResourceToPodTemplate(tmPodTemplate, spec.getTaskManager().getResource());
} else {
tmPodTemplate = ReconciliationUtils.clone(commonPodTemplate);
}

if (tmPodTemplate != null) {
effectiveConfig.set(
KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE,
tmPodTemplate.equals(jmPodTemplate)
? jmTemplateFile
: createTempFile(tmPodTemplate));
}

return this;
}

Expand All @@ -216,15 +262,9 @@ protected FlinkConfigBuilder applyServiceAccount() {
return this;
}

protected FlinkConfigBuilder applyJobManagerSpec() throws IOException {
protected FlinkConfigBuilder applyJobManagerSpec() {
if (spec.getJobManager() != null) {
setResource(spec.getJobManager().getResource(), effectiveConfig, true);
setPodTemplate(
spec.getPodTemplate(),
spec.getJobManager().getPodTemplate(),
spec.getJobManager().getResource(),
effectiveConfig,
true);
if (spec.getJobManager().getReplicas() > 0) {
effectiveConfig.set(
KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS,
Expand All @@ -234,16 +274,9 @@ protected FlinkConfigBuilder applyJobManagerSpec() throws IOException {
return this;
}

protected FlinkConfigBuilder applyTaskManagerSpec() throws IOException {
protected FlinkConfigBuilder applyTaskManagerSpec() {
if (spec.getTaskManager() != null) {
setResource(spec.getTaskManager().getResource(), effectiveConfig, false);
setPodTemplate(
spec.getPodTemplate(),
spec.getTaskManager().getPodTemplate(),
spec.getTaskManager().getResource(),
effectiveConfig,
false);

if (spec.getTaskManager().getReplicas() != null
&& spec.getTaskManager().getReplicas() > 0) {
effectiveConfig.set(
Expand Down Expand Up @@ -369,7 +402,7 @@ public static Configuration buildFrom(
.applyImage()
.applyImagePullPolicy()
.applyServiceAccount()
.applyCommonPodTemplate()
.applyPodTemplate()
.applyIngressDomain()
.applyJobManagerSpec()
.applyTaskManagerSpec()
Expand Down Expand Up @@ -420,39 +453,6 @@ private void configureCpu(Resource resource, Configuration conf, boolean isJM) {
conf.setDouble(configKey, resource.getCpu());
}

private static void setPodTemplate(
Pod basicPod,
Pod appendPod,
Resource resource,
Configuration effectiveConfig,
boolean isJM)
throws IOException {

// Avoid to create temporary pod template files for JobManager and TaskManager if it is not
// configured explicitly via .spec.JobManagerSpec.podTemplate or
// .spec.TaskManagerSpec.podTemplate.
final ConfigOption<String> podConfigOption =
isJM
? KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE
: KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE;

Pod podTemplate;
if (basicPod != null || appendPod != null) {
Pod mergedPodTemplate =
mergePodTemplates(
basicPod,
appendPod,
effectiveConfig.get(
KubernetesOperatorConfigOptions.POD_TEMPLATE_MERGE_BY_NAME));
podTemplate = applyResourceToPodTemplate(mergedPodTemplate, resource);
} else {
podTemplate = applyResourceToPodTemplate(null, resource);
}
if (podTemplate != null) {
effectiveConfig.setString(podConfigOption, createTempFile(podTemplate));
}
}

@VisibleForTesting
protected static Pod applyResourceToPodTemplate(Pod podTemplate, Resource resource) {
if (resource == null
Expand Down Expand Up @@ -542,9 +542,6 @@ private static String createTempFile(Pod podTemplate) throws IOException {
}

protected static void cleanupTmpFiles(Configuration configuration) {
configuration
.getOptional(KubernetesConfigOptions.KUBERNETES_POD_TEMPLATE)
.ifPresent(FlinkConfigBuilder::deleteSilentlyIfGenerated);
configuration
.getOptional(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE)
.ifPresent(FlinkConfigBuilder::deleteSilentlyIfGenerated);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,14 @@ public static String operatorConfigKey(String key) {
.withDescription(
"Allowed max time between spec update and reconciliation for canary resources.");

@Documentation.Section(SECTION_DYNAMIC)
public static final ConfigOption<Boolean> OPERATOR_JM_STARTUP_PROBE_ENABLED =
operatorConfig("jm-deployment.startup.probe.enabled")
.booleanType()
.defaultValue(true)
.withDescription(
"Enable job manager startup probe to allow detecting when the jobmanager could not submit the job.");

@Documentation.Section(SECTION_DYNAMIC)
public static final ConfigOption<Boolean> POD_TEMPLATE_MERGE_BY_NAME =
operatorConfig("pod-template.merge-arrays-by-name")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.util.Preconditions;

Expand All @@ -40,15 +42,21 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapList;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.HTTPGetAction;
import io.fabric8.kubernetes.api.model.IntOrString;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodSpec;
import io.fabric8.kubernetes.api.model.Probe;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -69,9 +77,9 @@ public class FlinkUtils {

public static Pod mergePodTemplates(Pod toPod, Pod fromPod, boolean mergeArraysByName) {
if (fromPod == null) {
return toPod;
return ReconciliationUtils.clone(toPod);
} else if (toPod == null) {
return fromPod;
return ReconciliationUtils.clone(fromPod);
}
JsonNode node1 = MAPPER.valueToTree(toPod);
JsonNode node2 = MAPPER.valueToTree(fromPod);
Expand Down Expand Up @@ -150,6 +158,49 @@ private static Map<String, ObjectNode> groupByName(ArrayNode node) {
return out;
}

public static void addStartupProbe(Pod pod) {
var spec = pod.getSpec();
if (spec == null) {
spec = new PodSpec();
pod.setSpec(spec);
}

var containers = spec.getContainers();
if (containers == null) {
containers = new ArrayList<>();
spec.setContainers(containers);
}

var mainContainer =
containers.stream()
.filter(c -> Constants.MAIN_CONTAINER_NAME.equals(c.getName()))
.findAny()
.orElseGet(
() -> {
var c = new Container();
c.setName(Constants.MAIN_CONTAINER_NAME);
var containersCopy =
new ArrayList<>(pod.getSpec().getContainers());
containersCopy.add(c);
pod.getSpec().setContainers(containersCopy);
return c;
});

if (mainContainer.getStartupProbe() == null) {
var probe = new Probe();
probe.setFailureThreshold(Integer.MAX_VALUE);
probe.setPeriodSeconds(1);

var configGet = new HTTPGetAction();
configGet.setPath(
DashboardConfigurationHeaders.getInstance().getTargetRestEndpointURL());
configGet.setPort(new IntOrString(Constants.REST_PORT_NAME));
probe.setHttpGet(configGet);

mainContainer.setStartupProbe(probe);
}
}

public static void deleteZookeeperHAMetadata(Configuration conf) {
try (var curator = ZooKeeperUtils.startCuratorFramework(conf, exception -> {})) {
try {
Expand Down
Loading

0 comments on commit 5ebfc5e

Please sign in to comment.