Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improvement] [Seatunnel-web] Improve code in DatasourceServiceImpl,SeaTunnelEngineMetricsExtractor etc. #199

Merged
merged 1 commit into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -570,17 +570,7 @@ private List<DatasourceDetailRes> convertDatasourceDetailRes(List<Datasource> da

datasourceList.forEach(
datasource -> {
DatasourceDetailRes datasourceDetailRes = new DatasourceDetailRes();
datasourceDetailRes.setId(datasource.getId().toString());
datasourceDetailRes.setDatasourceName(datasource.getDatasourceName());
datasourceDetailRes.setPluginName(datasource.getPluginName());
datasourceDetailRes.setPluginVersion(datasource.getPluginVersion());
datasourceDetailRes.setDescription(datasource.getDescription());
datasourceDetailRes.setCreateTime(datasource.getCreateTime());
datasourceDetailRes.setUpdateTime(datasource.getUpdateTime());
Map<String, String> config = JsonUtils.toMap(datasource.getDatasourceConfig());
datasourceDetailRes.setDatasourceConfig(config);
datasourceDetailResList.add(datasourceDetailRes);
datasourceDetailResList.add(getDatasourceDetailRes(datasource));
});
return datasourceDetailResList;
}
Expand All @@ -599,6 +589,10 @@ public DatasourceDetailRes queryDatasourceDetailByDatasourceName(String datasour
if (null == datasource) {
throw new SeatunnelException(SeatunnelErrorEnum.DATASOURCE_NOT_FOUND, datasourceName);
}
return getDatasourceDetailRes(datasource);
}

private static DatasourceDetailRes getDatasourceDetailRes(Datasource datasource) {
DatasourceDetailRes datasourceDetailRes = new DatasourceDetailRes();
datasourceDetailRes.setId(datasource.getId().toString());
datasourceDetailRes.setDatasourceName(datasource.getDatasourceName());
Expand All @@ -612,7 +606,6 @@ public DatasourceDetailRes queryDatasourceDetailByDatasourceName(String datasour
JsonUtils.toMap(datasource.getDatasourceConfig(), String.class, String.class);
// convert option rule
datasourceDetailRes.setDatasourceConfig(datasourceConfig);

return datasourceDetailRes;
}

Expand All @@ -624,21 +617,7 @@ public DatasourceDetailRes queryDatasourceDetailById(String datasourceId) {
if (null == datasource) {
throw new SeatunnelException(SeatunnelErrorEnum.DATASOURCE_NOT_FOUND, datasourceId);
}
DatasourceDetailRes datasourceDetailRes = new DatasourceDetailRes();
datasourceDetailRes.setId(datasource.getId().toString());
datasourceDetailRes.setDatasourceName(datasource.getDatasourceName());
datasourceDetailRes.setPluginName(datasource.getPluginName());
datasourceDetailRes.setPluginVersion(datasource.getPluginVersion());
datasourceDetailRes.setDescription(datasource.getDescription());
datasourceDetailRes.setCreateTime(datasource.getCreateTime());
datasourceDetailRes.setUpdateTime(datasource.getUpdateTime());

Map<String, String> datasourceConfig =
JsonUtils.toMap(datasource.getDatasourceConfig(), String.class, String.class);
// convert option rule
datasourceDetailRes.setDatasourceConfig(datasourceConfig);

return datasourceDetailRes;
return getDatasourceDetailRes(datasource);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.seatunnel.app.thirdparty.engine.SeaTunnelEngineProxy;
import org.apache.seatunnel.app.thirdparty.metrics.EngineMetricsExtractorFactory;
import org.apache.seatunnel.app.thirdparty.metrics.IEngineMetricsExtractor;
import org.apache.seatunnel.app.utils.JobExecParamUtil;
import org.apache.seatunnel.app.utils.JobUtils;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
Expand Down Expand Up @@ -130,8 +130,7 @@ private void executeJobBySeaTunnel(Integer userId, String filePath, Long jobInst
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
jobInstance.setJobStatus(JobStatus.FAILED.name());
jobInstance.setEndTime(new Date());
String jobInstanceErrorMessage =
JobExecParamUtil.getJobInstanceErrorMessage(e.getMessage());
String jobInstanceErrorMessage = JobUtils.getJobInstanceErrorMessage(e.getMessage());
jobInstance.setErrorMessage(jobInstanceErrorMessage);
jobInstanceDao.update(jobInstance);
throw new RuntimeException(e.getMessage(), e);
Expand Down Expand Up @@ -178,7 +177,6 @@ public void waitJobFinish(

private SeaTunnelClient createSeaTunnelClient() {
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(clientConfig.getClusterName());
return new SeaTunnelClient(clientConfig);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.seatunnel.app.domain.request.job.TableSchemaReq;
import org.apache.seatunnel.app.domain.request.job.transform.Transform;
import org.apache.seatunnel.app.domain.request.job.transform.TransformOptions;
import org.apache.seatunnel.app.domain.response.datasource.DatasourceDetailRes;
import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes;
import org.apache.seatunnel.app.permission.constants.SeatunnelFuncPermissionKeyConstant;
Expand All @@ -59,7 +60,7 @@
import org.apache.seatunnel.app.service.IVirtualTableService;
import org.apache.seatunnel.app.thirdparty.datasource.DataSourceConfigSwitcherUtils;
import org.apache.seatunnel.app.thirdparty.transfrom.TransformConfigSwitcherUtils;
import org.apache.seatunnel.app.utils.JobExecParamUtil;
import org.apache.seatunnel.app.utils.JobUtils;
import org.apache.seatunnel.app.utils.SeaTunnelConfigUtil;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.utils.ExceptionUtils;
Expand Down Expand Up @@ -173,8 +174,8 @@ public String generateJobConfig(
BusinessMode businessMode =
BusinessMode.valueOf(jobDefinitionDao.getJob(jobId).getJobType());
Config envConfig = filterEmptyValue(ConfigFactory.parseString(envStr));
envConfig = JobExecParamUtil.updateEnvConfig(executeParam, envConfig);
JobExecParamUtil.updateDataSource(executeParam, tasks);
envConfig = JobUtils.updateEnvConfig(executeParam, envConfig);
JobUtils.updateDataSource(executeParam, tasks);

Map<String, List<Config>> sourceMap = new LinkedHashMap<>();
Map<String, List<Config>> transformMap = new LinkedHashMap<>();
Expand Down Expand Up @@ -230,8 +231,7 @@ public String generateJobConfig(
}

config =
JobExecParamUtil.updateTaskConfig(
executeParam, config, task.getName());
JobUtils.updateTaskConfig(executeParam, config, task.getName());
Config mergeConfig =
mergeTaskConfig(
task,
Expand All @@ -241,7 +241,7 @@ public String generateJobConfig(
config,
optionRule);
mergeConfig =
JobExecParamUtil.updateQueryTaskConfig(
JobUtils.updateQueryTaskConfig(
executeParam, mergeConfig, task.getName());
sourceMap
.get(task.getConnectorType())
Expand Down Expand Up @@ -273,7 +273,7 @@ public String generateJobConfig(
List<TableSchemaReq> inputSchemas = findInputSchemas(tasks, lines, task);
Config transformConfig = buildTransformConfig(task, config, inputSchemas);
transformConfig =
JobExecParamUtil.updateTaskConfig(
JobUtils.updateTaskConfig(
executeParam, transformConfig, task.getName());
transformMap
.get(task.getConnectorType())
Expand All @@ -290,8 +290,7 @@ public String generateJobConfig(
sinkMap.put(task.getConnectorType(), new ArrayList<>());
}
config =
JobExecParamUtil.updateTaskConfig(
executeParam, config, task.getName());
JobUtils.updateTaskConfig(executeParam, config, task.getName());
Config mergeConfig =
mergeTaskConfig(
task,
Expand Down Expand Up @@ -370,8 +369,7 @@ public void complete(
jobInstance.setJobStatus(jobResult.getStatus().name());
jobInstance.setJobEngineId(jobEngineId);
jobInstance.setUpdateUserId(userId);
jobInstance.setErrorMessage(
JobExecParamUtil.getJobInstanceErrorMessage(jobResult.getError()));
jobInstance.setErrorMessage(JobUtils.getJobInstanceErrorMessage(jobResult.getError()));
jobInstanceDao.update(jobInstance);
}

Expand Down Expand Up @@ -436,16 +434,14 @@ private Config mergeTaskConfig(
throws JsonProcessingException {

Long datasourceInstanceId = task.getDataSourceId();
String pluginName =
datasourceService
.queryDatasourceDetailById(datasourceInstanceId.toString())
.getPluginName();
DatasourceDetailRes datasourceDetailRes =
datasourceService.queryDatasourceDetailById(datasourceInstanceId.toString());
String pluginName = datasourceDetailRes.getPluginName();
Config datasourceConf =
parseConfigWithOptionRule(
pluginType,
connectorType,
datasourceService.queryDatasourceConfigById(
datasourceInstanceId.toString()),
datasourceDetailRes.getDatasourceConfig(),
optionRule);

DataSourceOption dataSourceOption = null;
Expand Down
Loading
Loading