Skip to content

Commit

Permalink
Merge pull request WeBankFinTech#160 from FinalTarget/dev-1.0.0
Browse files Browse the repository at this point in the history
Modify the synchronization history function
  • Loading branch information
Davidhua1996 authored Feb 11, 2022
2 parents 92554c5 + 03834e8 commit 02c7c7d
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.linkis.datasourcemanager.common.util.json.Json;
import org.apache.linkis.httpclient.response.Result;
import org.apache.linkis.server.security.SecurityFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.servlet.http.HttpServletRequest;
import java.util.*;
Expand All @@ -36,6 +38,9 @@ public class AbstractDataSourceService {
protected final ExchangisJobParamConfigMapper exchangisJobParamConfigMapper;
protected final ExchangisJobInfoMapper exchangisJobInfoMapper;

private final static Logger LOG = LoggerFactory.getLogger(AbstractDataSourceService.class);


public AbstractDataSourceService(ExchangisDataSourceContext context, ExchangisJobParamConfigMapper exchangisJobParamConfigMapper, ExchangisJobInfoMapper exchangisJobInfoMapper) {
this.context = context;
this.exchangisJobParamConfigMapper = exchangisJobParamConfigMapper;
Expand Down Expand Up @@ -278,7 +283,13 @@ private ElementUI<?> fillElementUIValue(ExchangisJobParamConfig config, Object v
case ElementUI.INPUT:
return fillInputElementUIValue(config, String.valueOf(value));
case ElementUI.MAP:
return fillMapElementUIValue(config, (Map<String, Object>) value);
Map<String, Object> mapElement = null;
try {
mapElement = Json.fromJson(String.valueOf(value), Map.class);
} catch (Exception e) {
LOG.info("Exception happened while parse json"+ "Config value: " + value + "message: " + e.getMessage(), e);
}
return fillMapElementUIValue(config, mapElement);
default:
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,11 @@ int upgradeLaunchedJobProgress(@Param("jobExecutionId")String jobExecutionId, @P
* get All launchJob
* @return job entity list
*/
List<LaunchedExchangisJobEntity> getAllLaunchedJob(@Param("jobId") Long jobId, @Param("jobName") String jobName, @Param("status") String status, @Param("launchStartTime") Date launchStartTime, @Param("launchEndTime") Date launchEndTime);
List<LaunchedExchangisJobEntity> getAllLaunchedJob(@Param("jobExecutionId") String jobExecutionId, @Param("jobName") String jobName, @Param("status") String status, @Param("launchStartTime") Date launchStartTime, @Param("launchEndTime") Date launchEndTime, @Param("start")int start, @Param("size") int size);

/**
* delete launchJob
*/
void deleteJob(@Param("jobExecutionId")String jobExecutionId);

}
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,19 @@ public interface LaunchedTaskDao {
* @return
*/
String getLaunchedTaskStatus(@Param("taskId") String taskId);

/**
* search TaskStatusList
* @param
*/

List<String> getTaskStatusList(@Param("jobExecutionId") String jobExecutionId);

/**
* delete task
* @param
*/

void deleteTask(@Param("jobExecutionId") String jobExecutionId);

}
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@
`exchangis_launched_job_entity` l
LEFT JOIN `exchangis_job_entity` c ON l.job_id = c.id
<where>
<if test="jobId != null">
and l.job_id = #{jobId}
<if test="jobExecutionId.trim() != '' and jobExecutionId != null">
and l.job_execution_id = #{jobExecutionId}
</if>
<!--如果name有值,则模糊匹配name-->
<if test="jobName.trim() != '' and jobName != null">
Expand All @@ -170,6 +170,13 @@
and l.create_time &lt;= #{launchEndTime}
</if>
</where>
order by create_time desc
limit ${start}, ${size}
</select>

<delete id="deleteJob">
DELETE FROM <include refid="TableName"/> WHERE
job_execution_id = #{jobExecutionId}
</delete>

</mapper>
Original file line number Diff line number Diff line change
Expand Up @@ -211,4 +211,16 @@
<include refid="TableName"/>
<![CDATA[ WHERE job_execution_id = #{jobExecutionId};]]>
</select>

<select id="getTaskStatusList" parameterType="java.lang.String" resultType="java.lang.String">
select status
from
<include refid="TableName" />
where job_execution_id = #{jobExecutionId}
</select>

<delete id="deleteTask">
DELETE FROM <include refid="TableName"/> WHERE
job_execution_id = #{jobExecutionId}
</delete>
</mapper>
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,20 @@ public interface JobExecuteService {
* Gets Executed job list
* @return the launched jobList
*/
List<ExchangisLaunchedJobListVO> getExecutedJobList(Long jobId, String jobName, String status,
Long launchStartTime, Long launchEndTime, Integer current, Integer size);
List<ExchangisLaunchedJobListVO> getExecutedJobList(String jobExecutionId, String jobName, String status,
Long launchStartTime, Long launchEndTime, int current, int size);

/**
* Count int.
*
* @param jobId the job id
* @param jobExecutionId the job id
* @param jobName the job name
* @param status the status
* @param launchStartTime the launch start time
* @param launchEndTime the launch end time
* @return the int
*/
int count(Long jobId, String jobName, String status, Long launchStartTime, Long launchEndTime);
int count(String jobExecutionId, String jobName, String status, Long launchStartTime, Long launchEndTime);

/**
* Execute job
Expand All @@ -89,4 +89,8 @@ List<ExchangisLaunchedJobListVO> getExecutedJobList(Long jobId, String jobName,
*/
void killJob(String jobExecutionId);

/**
* @param jobExecutionId the job ExecutionId
*/
void deleteJob(String jobExecutionId) throws ExchangisJobServerException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.util.*;
Expand Down Expand Up @@ -226,12 +227,20 @@ public ExchangisCategoryLogVo getTaskLogInfo(String taskId, String jobExecutionI
}

@Override
public List<ExchangisLaunchedJobListVO> getExecutedJobList(Long jobId, String jobName, String status,
Long launchStartTime, Long launchEndTime, Integer current, Integer size) {
public List<ExchangisLaunchedJobListVO> getExecutedJobList(String jobExecutionId, String jobName, String status,
Long launchStartTime, Long launchEndTime, int current, int size) {
if (current <= 0) {
current = 1;
}
if (size <= 0) {
size = 10;
}
int start = (current - 1) * size;
List<ExchangisLaunchedJobListVO> jobList = new ArrayList<>();
Date startTime = launchStartTime == null ? null : new Date(launchStartTime);
Date endTime = launchEndTime == null ? null : new Date(launchEndTime);
List<LaunchedExchangisJobEntity> jobEntitylist = launchedJobDao.getAllLaunchedJob(jobId, jobName, status, startTime, endTime);
List<LaunchedExchangisJobEntity> jobEntitylist = launchedJobDao.getAllLaunchedJob(jobExecutionId, jobName, status, startTime, endTime, start, size);
LOG.info("ExecutedJobList information: " + jobExecutionId + jobName + status + launchStartTime + launchEndTime + current + size);
if(jobEntitylist != null) {
try {
for (int i = 0; i < jobEntitylist.size(); i++) {
Expand All @@ -251,7 +260,7 @@ public List<ExchangisLaunchedJobListVO> getExecutedJobList(Long jobId, String jo
int flows = 0;
int taskNum = launchedExchangisTaskEntities.size();
for (int j = 0; j < taskNum; j++) {
if(launchedExchangisTaskEntities.get(j).getMetricsMap() == null){
if(launchedExchangisTaskEntities.get(j).getMetricsMap() == null || launchedExchangisTaskEntities.get(j).getMetricsMap().get("traffic") == null){
flows += 0;
continue;
}
Expand All @@ -270,7 +279,7 @@ public List<ExchangisLaunchedJobListVO> getExecutedJobList(Long jobId, String jo
}

@Override
public int count(Long jobId, String jobName, String status, Long launchStartTime, Long launchEndTime) {
public int count(String jobExecutionId, String jobName, String status, Long launchStartTime, Long launchEndTime) {
Date startTime = launchStartTime == null ? null : new Date(launchStartTime);
Date endTime = launchEndTime == null ? null : new Date(launchEndTime);
return 100;
Expand Down Expand Up @@ -325,9 +334,17 @@ private ExchangisCategoryLogVo resultToCategoryLog(LogResult logResult, TaskStat
return categoryLogVo;
}

/* public static void main(String[] args) {
ExchangisJobParamsContent.ExchangisJobParamsItem jobParamsContent = new ExchangisJobParamsContent.ExchangisJobParamsItem();
jobParamsContent = Json.fromJson("", jobParamsContent.getClass());
System.out.println(jobParamsContent.getConfigValue());
}*/
@Transactional
@Override
public void deleteJob(String jobExecutionId) throws ExchangisJobServerException {
List<String> taskStatusList = launchedTaskDao.getTaskStatusList(jobExecutionId);
if(taskStatusList.contains("Inited") || taskStatusList.contains("Scheduled") || taskStatusList.contains("Running") || taskStatusList.contains("WaitForRetry")){
throw new ExchangisJobServerException(JOB_EXCEPTION_CODE.getCode(), "不能删除该作业");
}else {
launchedTaskDao.deleteTask(jobExecutionId);
launchedJobDao.deleteJob(jobExecutionId);
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -139,16 +139,16 @@ public Message ExecutedJobKill(@PathVariable(value = "jobExecutionId") String jo
}

@RequestMapping(value = "/listJobs", method = RequestMethod.GET)
public Message listJobs(@RequestParam(value = "jobId", required = false) Long jobId,
public Message listJobs(@RequestParam(value = "jobExecutionId", required = false) String jobExecutionId,
@RequestParam(value = "jobName", required = false) String jobName,
@RequestParam(value = "status", required = false) String status,
@RequestParam(value = "launchStartTime", required = false) Long launchStartTime,
@RequestParam(value = "launchEndTime", required = false) Long launchEndTime,
@RequestParam(value = "current", required = false) Integer current,
@RequestParam(value = "size", required = false) Integer size) {
List<ExchangisLaunchedJobListVO> jobList = executeService.getExecutedJobList(jobId, jobName, status,
@RequestParam(value = "current", required = false) int current,
@RequestParam(value = "size", required = false) int size) {
List<ExchangisLaunchedJobListVO> jobList = executeService.getExecutedJobList(jobExecutionId, jobName, status,
launchStartTime, launchEndTime, current, size);
int total = executeService.count(jobId, jobName, status, launchStartTime, launchEndTime);
int total = executeService.count(jobExecutionId, jobName, status, launchStartTime, launchEndTime);
Message message = Message.ok("Submitted succeed(提交成功)!");
message.setMethod("/api/rest_j/v1/exchangis/job/execution/listJobs");
message.data("jobList", jobList);
Expand Down Expand Up @@ -187,4 +187,14 @@ public Message listJobs(@RequestParam(value = "dataSourceType", required = false
private boolean hasAuthority(String username, ExchangisJobInfo jobInfo){
return username.equals(jobInfo.getCreateUser());
}

@RequestMapping( value = "/{jobExecutionId}/deleteJob", method = RequestMethod.POST)
public Message ExecutedJobDelete(@PathVariable(value = "jobExecutionId") String jobExecutionId) throws ExchangisJobServerException {
//ExchangisLaunchedJobEntity jobAndTaskStatus = exchangisExecutionService.getExecutedJobAndTaskStatus(jobExecutionId);
executeService.deleteJob(jobExecutionId);
Message message = Message.ok("Kill succeed(停止成功)!");
message.setMethod("/api/rest_j/v1/exchangis/job/" + jobExecutionId + "/deleteJob");
message.data("jobExecutionId", jobExecutionId);
return message;
}
}

0 comments on commit 02c7c7d

Please sign in to comment.