diff --git a/exchangis-datasource/exchangis-datasource-service/src/main/java/com/webank/wedatasphere/exchangis/datasource/service/AbstractDataSourceService.java b/exchangis-datasource/exchangis-datasource-service/src/main/java/com/webank/wedatasphere/exchangis/datasource/service/AbstractDataSourceService.java index 34c1387d9..5e3f18bd2 100644 --- a/exchangis-datasource/exchangis-datasource-service/src/main/java/com/webank/wedatasphere/exchangis/datasource/service/AbstractDataSourceService.java +++ b/exchangis-datasource/exchangis-datasource-service/src/main/java/com/webank/wedatasphere/exchangis/datasource/service/AbstractDataSourceService.java @@ -25,6 +25,8 @@ import org.apache.linkis.datasourcemanager.common.util.json.Json; import org.apache.linkis.httpclient.response.Result; import org.apache.linkis.server.security.SecurityFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.servlet.http.HttpServletRequest; import java.util.*; @@ -36,6 +38,9 @@ public class AbstractDataSourceService { protected final ExchangisJobParamConfigMapper exchangisJobParamConfigMapper; protected final ExchangisJobInfoMapper exchangisJobInfoMapper; + private final static Logger LOG = LoggerFactory.getLogger(AbstractDataSourceService.class); + + public AbstractDataSourceService(ExchangisDataSourceContext context, ExchangisJobParamConfigMapper exchangisJobParamConfigMapper, ExchangisJobInfoMapper exchangisJobInfoMapper) { this.context = context; this.exchangisJobParamConfigMapper = exchangisJobParamConfigMapper; @@ -278,7 +283,13 @@ private ElementUI fillElementUIValue(ExchangisJobParamConfig config, Object v case ElementUI.INPUT: return fillInputElementUIValue(config, String.valueOf(value)); case ElementUI.MAP: - return fillMapElementUIValue(config, (Map) value); + Map mapElement = null; + try { + mapElement = Json.fromJson(String.valueOf(value), Map.class); + } catch (Exception e) { + LOG.info("Exception happened while parse json"+ "Config value: " + value + "message: " + e.getMessage(), e); + } + return fillMapElementUIValue(config, mapElement); default: return null; } diff --git a/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/dao/LaunchedJobDao.java b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/dao/LaunchedJobDao.java index ec2e8474d..fd8ef87fd 100644 --- a/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/dao/LaunchedJobDao.java +++ b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/dao/LaunchedJobDao.java @@ -81,5 +81,11 @@ int upgradeLaunchedJobProgress(@Param("jobExecutionId")String jobExecutionId, @P * get All launchJob * @return job entity list */ - List getAllLaunchedJob(@Param("jobId") Long jobId, @Param("jobName") String jobName, @Param("status") String status, @Param("launchStartTime") Date launchStartTime, @Param("launchEndTime") Date launchEndTime); + List getAllLaunchedJob(@Param("jobExecutionId") String jobExecutionId, @Param("jobName") String jobName, @Param("status") String status, @Param("launchStartTime") Date launchStartTime, @Param("launchEndTime") Date launchEndTime, @Param("start")int start, @Param("size") int size); + + /** + * delete launchJob + */ + void deleteJob(@Param("jobExecutionId")String jobExecutionId); + } diff --git a/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/dao/LaunchedTaskDao.java b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/dao/LaunchedTaskDao.java index 2115db73d..3128b5421 100644 --- a/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/dao/LaunchedTaskDao.java +++ b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/dao/LaunchedTaskDao.java @@ -118,4 +118,19 @@ public interface LaunchedTaskDao { * @return */ String getLaunchedTaskStatus(@Param("taskId") String taskId); + + /** + * search TaskStatusList + * @param + */ + + List getTaskStatusList(@Param("jobExecutionId") String jobExecutionId); + + /** + * delete task + * @param + */ + + void deleteTask(@Param("jobExecutionId") String jobExecutionId); + } diff --git a/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/mapper/impl/LaunchedJobMapper.xml b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/mapper/impl/LaunchedJobMapper.xml index 658114c6a..fadc98f7c 100644 --- a/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/mapper/impl/LaunchedJobMapper.xml +++ b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/mapper/impl/LaunchedJobMapper.xml @@ -153,8 +153,8 @@ `exchangis_launched_job_entity` l LEFT JOIN `exchangis_job_entity` c ON l.job_id = c.id - - and l.job_id = #{jobId} + + and l.job_execution_id = #{jobExecutionId} @@ -170,6 +170,13 @@ and l.create_time <= #{launchEndTime} + order by create_time desc + limit ${start}, ${size} + + DELETE FROM WHERE + job_execution_id = #{jobExecutionId} + + diff --git a/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/mapper/impl/LaunchedTaskMapper.xml b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/mapper/impl/LaunchedTaskMapper.xml index 0c4e41c6e..af039b978 100644 --- a/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/mapper/impl/LaunchedTaskMapper.xml +++ b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/mapper/impl/LaunchedTaskMapper.xml @@ -211,4 +211,16 @@ + + + + + DELETE FROM WHERE + job_execution_id = #{jobExecutionId} + diff --git a/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/service/JobExecuteService.java b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/service/JobExecuteService.java index 88664fdc1..7123a9e2a 100644 --- a/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/service/JobExecuteService.java +++ b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/service/JobExecuteService.java @@ -60,20 +60,20 @@ public interface JobExecuteService { * Gets Executed job list * @return the launched jobList */ - List getExecutedJobList(Long jobId, String jobName, String status, - Long launchStartTime, Long launchEndTime, Integer current, Integer size); + List getExecutedJobList(String jobExecutionId, String jobName, String status, + Long launchStartTime, Long launchEndTime, int current, int size); /** * Count int. * - * @param jobId the job id + * @param jobExecutionId the job id * @param jobName the job name * @param status the status * @param launchStartTime the launch start time * @param launchEndTime the launch end time * @return the int */ - int count(Long jobId, String jobName, String status, Long launchStartTime, Long launchEndTime); + int count(String jobExecutionId, String jobName, String status, Long launchStartTime, Long launchEndTime); /** * Execute job @@ -89,4 +89,8 @@ List getExecutedJobList(Long jobId, String jobName, */ void killJob(String jobExecutionId); + /** + * @param jobExecutionId the job ExecutionId + */ + void deleteJob(String jobExecutionId) throws ExchangisJobServerException; } diff --git a/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/service/impl/DefaultJobExecuteService.java b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/service/impl/DefaultJobExecuteService.java index 292bfd98f..99585864a 100644 --- a/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/service/impl/DefaultJobExecuteService.java +++ b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/service/impl/DefaultJobExecuteService.java @@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; import java.util.*; @@ -226,12 +227,20 @@ public ExchangisCategoryLogVo getTaskLogInfo(String taskId, String jobExecutionI } @Override - public List getExecutedJobList(Long jobId, String jobName, String status, - Long launchStartTime, Long launchEndTime, Integer current, Integer size) { + public List getExecutedJobList(String jobExecutionId, String jobName, String status, + Long launchStartTime, Long launchEndTime, int current, int size) { + if (current <= 0) { + current = 1; + } + if (size <= 0) { + size = 10; + } + int start = (current - 1) * size; List jobList = new ArrayList<>(); Date startTime = launchStartTime == null ? null : new Date(launchStartTime); Date endTime = launchEndTime == null ? null : new Date(launchEndTime); - List jobEntitylist = launchedJobDao.getAllLaunchedJob(jobId, jobName, status, startTime, endTime); + List jobEntitylist = launchedJobDao.getAllLaunchedJob(jobExecutionId, jobName, status, startTime, endTime, start, size); + LOG.info("ExecutedJobList information: " + jobExecutionId + jobName + status + launchStartTime + launchEndTime + current + size); if(jobEntitylist != null) { try { for (int i = 0; i < jobEntitylist.size(); i++) { @@ -251,7 +260,7 @@ public List getExecutedJobList(Long jobId, String jo int flows = 0; int taskNum = launchedExchangisTaskEntities.size(); for (int j = 0; j < taskNum; j++) { - if(launchedExchangisTaskEntities.get(j).getMetricsMap() == null){ + if(launchedExchangisTaskEntities.get(j).getMetricsMap() == null || launchedExchangisTaskEntities.get(j).getMetricsMap().get("traffic") == null){ flows += 0; continue; } @@ -270,7 +279,7 @@ public List getExecutedJobList(Long jobId, String jo } @Override - public int count(Long jobId, String jobName, String status, Long launchStartTime, Long launchEndTime) { + public int count(String jobExecutionId, String jobName, String status, Long launchStartTime, Long launchEndTime) { Date startTime = launchStartTime == null ? null : new Date(launchStartTime); Date endTime = launchEndTime == null ? null : new Date(launchEndTime); return 100; @@ -325,9 +334,17 @@ private ExchangisCategoryLogVo resultToCategoryLog(LogResult logResult, TaskStat return categoryLogVo; } - /* public static void main(String[] args) { - ExchangisJobParamsContent.ExchangisJobParamsItem jobParamsContent = new ExchangisJobParamsContent.ExchangisJobParamsItem(); - jobParamsContent = Json.fromJson("", jobParamsContent.getClass()); - System.out.println(jobParamsContent.getConfigValue()); - }*/ + @Transactional + @Override + public void deleteJob(String jobExecutionId) throws ExchangisJobServerException { + List taskStatusList = launchedTaskDao.getTaskStatusList(jobExecutionId); + if(taskStatusList.contains("Inited") || taskStatusList.contains("Scheduled") || taskStatusList.contains("Running") || taskStatusList.contains("WaitForRetry")){ + throw new ExchangisJobServerException(JOB_EXCEPTION_CODE.getCode(), "不能删除该作业"); + }else { + launchedTaskDao.deleteTask(jobExecutionId); + launchedJobDao.deleteJob(jobExecutionId); + } + } + + } diff --git a/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/web/ExchangisJobExecuteController.java b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/web/ExchangisJobExecuteController.java index d9a795871..540961cd8 100644 --- a/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/web/ExchangisJobExecuteController.java +++ b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/web/ExchangisJobExecuteController.java @@ -139,16 +139,16 @@ public Message ExecutedJobKill(@PathVariable(value = "jobExecutionId") String jo } @RequestMapping(value = "/listJobs", method = RequestMethod.GET) - public Message listJobs(@RequestParam(value = "jobId", required = false) Long jobId, + public Message listJobs(@RequestParam(value = "jobExecutionId", required = false) String jobExecutionId, @RequestParam(value = "jobName", required = false) String jobName, @RequestParam(value = "status", required = false) String status, @RequestParam(value = "launchStartTime", required = false) Long launchStartTime, @RequestParam(value = "launchEndTime", required = false) Long launchEndTime, - @RequestParam(value = "current", required = false) Integer current, - @RequestParam(value = "size", required = false) Integer size) { - List jobList = executeService.getExecutedJobList(jobId, jobName, status, + @RequestParam(value = "current", required = false) int current, + @RequestParam(value = "size", required = false) int size) { + List jobList = executeService.getExecutedJobList(jobExecutionId, jobName, status, launchStartTime, launchEndTime, current, size); - int total = executeService.count(jobId, jobName, status, launchStartTime, launchEndTime); + int total = executeService.count(jobExecutionId, jobName, status, launchStartTime, launchEndTime); Message message = Message.ok("Submitted succeed(提交成功)!"); message.setMethod("/api/rest_j/v1/exchangis/job/execution/listJobs"); message.data("jobList", jobList); @@ -187,4 +187,14 @@ public Message listJobs(@RequestParam(value = "dataSourceType", required = false private boolean hasAuthority(String username, ExchangisJobInfo jobInfo){ return username.equals(jobInfo.getCreateUser()); } + + @RequestMapping( value = "/{jobExecutionId}/deleteJob", method = RequestMethod.POST) + public Message ExecutedJobDelete(@PathVariable(value = "jobExecutionId") String jobExecutionId) throws ExchangisJobServerException { + //ExchangisLaunchedJobEntity jobAndTaskStatus = exchangisExecutionService.getExecutedJobAndTaskStatus(jobExecutionId); + executeService.deleteJob(jobExecutionId); + Message message = Message.ok("Kill succeed(停止成功)!"); + message.setMethod("/api/rest_j/v1/exchangis/job/" + jobExecutionId + "/deleteJob"); + message.data("jobExecutionId", jobExecutionId); + return message; + } }