Skip to content

Commit

Permalink
[Feature][Seatunnel-web]Add support to configure placeholder with def…
Browse files Browse the repository at this point in the history
…ault value in the job config. (#208)
  • Loading branch information
arshadmohammad authored Sep 9, 2024
1 parent 94d8a39 commit 70e526a
Show file tree
Hide file tree
Showing 12 changed files with 276 additions and 176 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@
@AllArgsConstructor
// Job execution parameters
public class JobExecParam {
// job name -> key -> value
private Map<String, String> env;
// task name -> key -> value
private Map<String, Map<String, String>> tasks;
// job config placeholder name -> value
private Map<String, String> placeholderValues;
// task name -> new datasource id
private Map<String, String> datasource;
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public Result<Long> jobExecute(Integer userId, Long jobDefineId, JobExecParam ex
return Result.success(executeResource.getJobInstanceId());
} catch (RuntimeException e) {
Result<Long> failure =
Result.failure(SeatunnelErrorEnum.JUB_EXEC_SUBMISSION_ERROR, e.getMessage());
Result.failure(SeatunnelErrorEnum.JOB_EXEC_SUBMISSION_ERROR, e.getMessage());
// Even though job execution submission failed, we still need to return the
// jobInstanceId to the user
// as the job instance has been created in the database.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ public String generateJobConfig(
BusinessMode businessMode =
BusinessMode.valueOf(jobDefinitionDao.getJob(jobId).getJobType());
Config envConfig = filterEmptyValue(ConfigFactory.parseString(envStr));
envConfig = JobUtils.updateEnvConfig(executeParam, envConfig);
JobUtils.updateDataSource(executeParam, tasks);

Map<String, List<Config>> sourceMap = new LinkedHashMap<>();
Expand Down Expand Up @@ -230,8 +229,6 @@ public String generateJobConfig(
ParsingMode.SHARDING.name()));
}

config =
JobUtils.updateTaskConfig(executeParam, config, task.getName());
Config mergeConfig =
mergeTaskConfig(
task,
Expand All @@ -240,9 +237,6 @@ public String generateJobConfig(
businessMode,
config,
optionRule);
mergeConfig =
JobUtils.updateQueryTaskConfig(
executeParam, mergeConfig, task.getName());
sourceMap
.get(task.getConnectorType())
.add(filterEmptyValue(mergeConfig));
Expand Down Expand Up @@ -272,9 +266,6 @@ public String generateJobConfig(
}
List<TableSchemaReq> inputSchemas = findInputSchemas(tasks, lines, task);
Config transformConfig = buildTransformConfig(task, config, inputSchemas);
transformConfig =
JobUtils.updateTaskConfig(
executeParam, transformConfig, task.getName());
transformMap
.get(task.getConnectorType())
.add(filterEmptyValue(transformConfig));
Expand All @@ -289,8 +280,6 @@ public String generateJobConfig(
if (!sinkMap.containsKey(task.getConnectorType())) {
sinkMap.put(task.getConnectorType(), new ArrayList<>());
}
config =
JobUtils.updateTaskConfig(executeParam, config, task.getName());
Config mergeConfig =
mergeTaskConfig(
task,
Expand Down Expand Up @@ -341,7 +330,8 @@ public String generateJobConfig(
.setJson(false)
.setComments(false)
.setOriginComments(false));
return SeaTunnelConfigUtil.generateConfig(env, sources, transforms, sinks);
String jobConfig = SeaTunnelConfigUtil.generateConfig(env, sources, transforms, sinks);
return JobUtils.replaceJobConfigPlaceholders(jobConfig, executeParam);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,23 @@
*/
package org.apache.seatunnel.app.utils;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;

import org.apache.seatunnel.app.dal.entity.JobTask;
import org.apache.seatunnel.app.domain.request.job.JobExecParam;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
import org.apache.seatunnel.server.common.SeatunnelException;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JobUtils {

// The maximum length of the job execution error message, 4KB
private static final int ERROR_MESSAGE_MAX_LENGTH = 4096;
private static final Pattern placeholderPattern = Pattern.compile("\\$\\{(\\w+)(?::(.*?))?\\}");

public static String getJobInstanceErrorMessage(String message) {
if (message == null) {
Expand All @@ -40,46 +43,6 @@ public static String getJobInstanceErrorMessage(String message) {
: message;
}

public static Config updateEnvConfig(JobExecParam jobExecParam, Config envConfig) {
if (jobExecParam == null || jobExecParam.getEnv() == null) {
return envConfig;
}
return updateConfig(envConfig, jobExecParam.getEnv());
}

private static Config updateConfig(Config config, Map<String, String> properties) {
for (Map.Entry<String, String> entry : properties.entrySet()) {
config =
config.withValue(
entry.getKey(), ConfigValueFactory.fromAnyRef(entry.getValue()));
}
return config;
}

public static Config updateTaskConfig(
JobExecParam jobExecParam, Config taskConfig, String taskName) {
if (jobExecParam == null
|| jobExecParam.getTasks() == null
|| jobExecParam.getTasks().get(taskName) == null) {
return taskConfig;
}
return updateConfig(taskConfig, jobExecParam.getTasks().get(taskName));
}

public static Config updateQueryTaskConfig(
JobExecParam jobExecParam, Config taskConfig, String taskName) {
if (jobExecParam == null
|| jobExecParam.getTasks() == null
|| jobExecParam.getTasks().get(taskName) == null) {
return taskConfig;
}
String query = jobExecParam.getTasks().get(taskName).get("query");
if (query != null) {
return taskConfig.withValue("query", ConfigValueFactory.fromAnyRef(query));
}
return taskConfig;
}

public static void updateDataSource(JobExecParam jobExecParam, List<JobTask> tasks) {
if (jobExecParam == null || jobExecParam.getDatasource() == null) {
return;
Expand All @@ -104,4 +67,29 @@ public static boolean isJobEndStatus(JobStatus jobStatus) {
|| JobStatus.CANCELED == jobStatus
|| JobStatus.FAILED == jobStatus;
}

// Replace placeholders in job config with actual values
public static String replaceJobConfigPlaceholders(
String jobConfigString, JobExecParam jobExecParam) {
Map<String, String> placeholderValues =
(jobExecParam != null && jobExecParam.getPlaceholderValues() != null)
? jobExecParam.getPlaceholderValues()
: Collections.emptyMap();

Matcher matcher = placeholderPattern.matcher(jobConfigString);
StringBuffer result = new StringBuffer();

while (matcher.find()) {
String placeholderName = matcher.group(1);
String replacement = placeholderValues.getOrDefault(placeholderName, matcher.group(2));
if (replacement == null) {
throw new SeatunnelException(
SeatunnelErrorEnum.JOB_NO_VALUE_FOUND_FOR_PLACEHOLDER, placeholderName);
}
matcher.appendReplacement(result, replacement);
}

matcher.appendTail(result);
return result.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.app.utils;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

import org.apache.seatunnel.app.domain.request.job.JobExecParam;
import org.apache.seatunnel.server.common.SeatunnelException;

import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class JobUtilsTests {

@Test
public void testReplaceJobConfigPlaceholders_AllJobConfigPlaceholdersReplaced() {
String jobConfigContent =
"job.mode=${jobModeParam:BATCH}\ncheckpoint.interval=30\njob.name=${jobNameParam}";
Map<String, String> paramValues = new HashMap<>();
paramValues.put("jobModeParam", "STREAMING");
paramValues.put("jobNameParam", "newJob");
JobExecParam jobExecParam = getJobExecParam(paramValues);

String expected = "job.mode=STREAMING\ncheckpoint.interval=30\njob.name=newJob";
String actual = JobUtils.replaceJobConfigPlaceholders(jobConfigContent, jobExecParam);

assertEquals(expected, actual);
}

@Test
public void testReplaceJobConfigPlaceholders_JobConfig_PlaceholdersRepeat() {
String jobConfigContent =
"job.mode=${jobModeParam:BATCH}\ncheckpoint.interval=30\njob.name=${jobModeParam}";
Map<String, String> paramValues = new HashMap<>();
paramValues.put("jobModeParam", "STREAMING");
JobExecParam jobExecParam = getJobExecParam(paramValues);

String expected = "job.mode=STREAMING\ncheckpoint.interval=30\njob.name=STREAMING";
String actual = JobUtils.replaceJobConfigPlaceholders(jobConfigContent, jobExecParam);

assertEquals(expected, actual);
}

@Test
public void testReplaceJobConfigPlaceholdersUsed() {
String jobConfigContent =
"job.mode=${jobModeParam:BATCH}\ncheckpoint.interval=30\njob.name=${jobNameParam:DefaultJob}";
Map<String, String> paramValues = new HashMap<>();
paramValues.put("jobModeParam", "STREAMING");
JobExecParam jobExecParam = getJobExecParam(paramValues);

String expected = "job.mode=STREAMING\ncheckpoint.interval=30\njob.name=DefaultJob";
String actual = JobUtils.replaceJobConfigPlaceholders(jobConfigContent, jobExecParam);

assertEquals(expected, actual);
}

@Test
public void testReplaceJobConfigPlaceholders_NoDefaultValueThrowsException() {
String jobConfigContent =
"job.mode=${jobModeParam}\ncheckpoint.interval=30\njob.name=${jobNameParam}";
Map<String, String> paramValues = new HashMap<>();
paramValues.put("jobModeParam", "STREAMING");
JobExecParam jobExecParam = getJobExecParam(paramValues);

assertThrows(
SeatunnelException.class,
() -> {
JobUtils.replaceJobConfigPlaceholders(jobConfigContent, jobExecParam);
});
}

@Test
public void testReplaceJobConfigPlaceholders_NoJobConfigPlaceholders() {
String jobConfigContent = "job.mode=STREAMING\ncheckpoint.interval=30\njob.name=newJob";
Map<String, String> paramValues = new HashMap<>();
JobExecParam jobExecParam = getJobExecParam(paramValues);

String expected = "job.mode=STREAMING\ncheckpoint.interval=30\njob.name=newJob";
String actual = JobUtils.replaceJobConfigPlaceholders(jobConfigContent, jobExecParam);

assertEquals(expected, actual);
}

@Test
public void testParseConfigWithPlaceHolders() {
String transformConfig =
"{\"log.print.data\":\"true\",\"log.print.delay.ms\":\"${logPrintDelayMs:100}\"}";
Config config = ConfigFactory.parseString(transformConfig);
assertNotNull(config);
}

private static @NotNull JobExecParam getJobExecParam(Map<String, String> paramValues) {
JobExecParam jobExecParam = new JobExecParam();
jobExecParam.setPlaceholderValues(paramValues);
return jobExecParam;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,11 @@ public enum SeatunnelErrorEnum {
"load job state from engine error",
"load job statue from engine [%s] error, error msg is [%s]"),
UNSUPPORTED_ENGINE(40003, "unsupported engine", "unsupported engine [%s] version [%s]"),
JUB_EXEC_SUBMISSION_ERROR(40004, "Job execution submission error.", "%s"),
JOB_EXEC_SUBMISSION_ERROR(40004, "Job execution submission error.", "%s"),
LOAD_ENGINE_METRICS_ERROR(
40005, "load engine metrics error", "load engine metrics error. error msg is [%s]"),
JOB_NO_VALUE_FOUND_FOR_PLACEHOLDER(
40006, "No value found for placeholder", "No value found for placeholder: [%s]"),

JOB_RUN_GENERATE_UUID_ERROR(50001, "generate uuid error", "generate uuid error"),
/* datasource and virtual table */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@

import com.fasterxml.jackson.core.type.TypeReference;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class JobControllerWrapper extends SeatunnelWebTestingBase {

public Result<Long> createJob(JobCreateReq jobCreateRequest) {
Expand All @@ -48,15 +44,4 @@ public Result<JobRes> getJob(long jobVersionId) {
String response = sendRequest(urlWithParam("job/get/" + jobVersionId + "?"), null, "GET");
return JSONTestUtils.parseObject(response, new TypeReference<Result<JobRes>>() {});
}

public JobCreateReq populateJobCreateReqFromFile() {
String filePath = "src/test/resources/jobs/fake_source_console_job.json";
String jsonContent;
try {
jsonContent = new String(Files.readAllBytes(Paths.get(filePath)));
} catch (IOException e) {
throw new RuntimeException(e);
}
return JSONTestUtils.parseObject(jsonContent, JobCreateReq.class);
}
}
Loading

0 comments on commit 70e526a

Please sign in to comment.