Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Modify the synchronization history function #160

Merged
merged 3 commits into from
Feb 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.linkis.datasourcemanager.common.util.json.Json;
import org.apache.linkis.httpclient.response.Result;
import org.apache.linkis.server.security.SecurityFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.servlet.http.HttpServletRequest;
import java.util.*;
Expand All @@ -36,6 +38,9 @@ public class AbstractDataSourceService {
protected final ExchangisJobParamConfigMapper exchangisJobParamConfigMapper;
protected final ExchangisJobInfoMapper exchangisJobInfoMapper;

private final static Logger LOG = LoggerFactory.getLogger(AbstractDataSourceService.class);


public AbstractDataSourceService(ExchangisDataSourceContext context, ExchangisJobParamConfigMapper exchangisJobParamConfigMapper, ExchangisJobInfoMapper exchangisJobInfoMapper) {
this.context = context;
this.exchangisJobParamConfigMapper = exchangisJobParamConfigMapper;
Expand Down Expand Up @@ -278,7 +283,13 @@ private ElementUI<?> fillElementUIValue(ExchangisJobParamConfig config, Object v
case ElementUI.INPUT:
return fillInputElementUIValue(config, String.valueOf(value));
case ElementUI.MAP:
return fillMapElementUIValue(config, (Map<String, Object>) value);
Map<String, Object> mapElement = null;
try {
mapElement = Json.fromJson(String.valueOf(value), Map.class);
} catch (Exception e) {
LOG.info("Exception happened while parse json"+ "Config value: " + value + "message: " + e.getMessage(), e);
}
return fillMapElementUIValue(config, mapElement);
default:
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,11 @@ int upgradeLaunchedJobProgress(@Param("jobExecutionId")String jobExecutionId, @P
* get All launchJob
* @return job entity list
*/
List<LaunchedExchangisJobEntity> getAllLaunchedJob(@Param("jobId") Long jobId, @Param("jobName") String jobName, @Param("status") String status, @Param("launchStartTime") Date launchStartTime, @Param("launchEndTime") Date launchEndTime);
List<LaunchedExchangisJobEntity> getAllLaunchedJob(@Param("jobExecutionId") String jobExecutionId, @Param("jobName") String jobName, @Param("status") String status, @Param("launchStartTime") Date launchStartTime, @Param("launchEndTime") Date launchEndTime, @Param("start")int start, @Param("size") int size);

/**
* delete launchJob
*/
void deleteJob(@Param("jobExecutionId")String jobExecutionId);

}
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,19 @@ public interface LaunchedTaskDao {
* @return
*/
String getLaunchedTaskStatus(@Param("taskId") String taskId);

/**
* search TaskStatusList
* @param
*/

List<String> getTaskStatusList(@Param("jobExecutionId") String jobExecutionId);

/**
* delete task
* @param
*/

void deleteTask(@Param("jobExecutionId") String jobExecutionId);

}
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@
`exchangis_launched_job_entity` l
LEFT JOIN `exchangis_job_entity` c ON l.job_id = c.id
<where>
<if test="jobId != null">
and l.job_id = #{jobId}
<if test="jobExecutionId.trim() != '' and jobExecutionId != null">
and l.job_execution_id = #{jobExecutionId}
</if>
<!--如果name有值,则模糊匹配name-->
<if test="jobName.trim() != '' and jobName != null">
Expand All @@ -170,6 +170,13 @@
and l.create_time &lt;= #{launchEndTime}
</if>
</where>
order by create_time desc
limit ${start}, ${size}
</select>

<delete id="deleteJob">
DELETE FROM <include refid="TableName"/> WHERE
job_execution_id = #{jobExecutionId}
</delete>

</mapper>
Original file line number Diff line number Diff line change
Expand Up @@ -211,4 +211,16 @@
<include refid="TableName"/>
<![CDATA[ WHERE job_execution_id = #{jobExecutionId};]]>
</select>

<select id="getTaskStatusList" parameterType="java.lang.String" resultType="java.lang.String">
select status
from
<include refid="TableName" />
where job_execution_id = #{jobExecutionId}
</select>

<delete id="deleteTask">
DELETE FROM <include refid="TableName"/> WHERE
job_execution_id = #{jobExecutionId}
</delete>
</mapper>
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,20 @@ public interface JobExecuteService {
* Gets Executed job list
* @return the launched jobList
*/
List<ExchangisLaunchedJobListVO> getExecutedJobList(Long jobId, String jobName, String status,
Long launchStartTime, Long launchEndTime, Integer current, Integer size);
List<ExchangisLaunchedJobListVO> getExecutedJobList(String jobExecutionId, String jobName, String status,
Long launchStartTime, Long launchEndTime, int current, int size);

/**
* Count int.
*
* @param jobId the job id
* @param jobExecutionId the job id
* @param jobName the job name
* @param status the status
* @param launchStartTime the launch start time
* @param launchEndTime the launch end time
* @return the int
*/
int count(Long jobId, String jobName, String status, Long launchStartTime, Long launchEndTime);
int count(String jobExecutionId, String jobName, String status, Long launchStartTime, Long launchEndTime);

/**
* Execute job
Expand All @@ -89,4 +89,8 @@ List<ExchangisLaunchedJobListVO> getExecutedJobList(Long jobId, String jobName,
*/
void killJob(String jobExecutionId);

/**
* @param jobExecutionId the job ExecutionId
*/
void deleteJob(String jobExecutionId) throws ExchangisJobServerException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.util.*;
Expand Down Expand Up @@ -226,12 +227,20 @@ public ExchangisCategoryLogVo getTaskLogInfo(String taskId, String jobExecutionI
}

@Override
public List<ExchangisLaunchedJobListVO> getExecutedJobList(Long jobId, String jobName, String status,
Long launchStartTime, Long launchEndTime, Integer current, Integer size) {
public List<ExchangisLaunchedJobListVO> getExecutedJobList(String jobExecutionId, String jobName, String status,
Long launchStartTime, Long launchEndTime, int current, int size) {
if (current <= 0) {
current = 1;
}
if (size <= 0) {
size = 10;
}
int start = (current - 1) * size;
List<ExchangisLaunchedJobListVO> jobList = new ArrayList<>();
Date startTime = launchStartTime == null ? null : new Date(launchStartTime);
Date endTime = launchEndTime == null ? null : new Date(launchEndTime);
List<LaunchedExchangisJobEntity> jobEntitylist = launchedJobDao.getAllLaunchedJob(jobId, jobName, status, startTime, endTime);
List<LaunchedExchangisJobEntity> jobEntitylist = launchedJobDao.getAllLaunchedJob(jobExecutionId, jobName, status, startTime, endTime, start, size);
LOG.info("ExecutedJobList information: " + jobExecutionId + jobName + status + launchStartTime + launchEndTime + current + size);
if(jobEntitylist != null) {
try {
for (int i = 0; i < jobEntitylist.size(); i++) {
Expand All @@ -251,7 +260,7 @@ public List<ExchangisLaunchedJobListVO> getExecutedJobList(Long jobId, String jo
int flows = 0;
int taskNum = launchedExchangisTaskEntities.size();
for (int j = 0; j < taskNum; j++) {
if(launchedExchangisTaskEntities.get(j).getMetricsMap() == null){
if(launchedExchangisTaskEntities.get(j).getMetricsMap() == null || launchedExchangisTaskEntities.get(j).getMetricsMap().get("traffic") == null){
flows += 0;
continue;
}
Expand All @@ -270,7 +279,7 @@ public List<ExchangisLaunchedJobListVO> getExecutedJobList(Long jobId, String jo
}

@Override
public int count(Long jobId, String jobName, String status, Long launchStartTime, Long launchEndTime) {
public int count(String jobExecutionId, String jobName, String status, Long launchStartTime, Long launchEndTime) {
Date startTime = launchStartTime == null ? null : new Date(launchStartTime);
Date endTime = launchEndTime == null ? null : new Date(launchEndTime);
return 100;
Expand Down Expand Up @@ -325,9 +334,17 @@ private ExchangisCategoryLogVo resultToCategoryLog(LogResult logResult, TaskStat
return categoryLogVo;
}

/* public static void main(String[] args) {
ExchangisJobParamsContent.ExchangisJobParamsItem jobParamsContent = new ExchangisJobParamsContent.ExchangisJobParamsItem();
jobParamsContent = Json.fromJson("", jobParamsContent.getClass());
System.out.println(jobParamsContent.getConfigValue());
}*/
@Transactional
@Override
public void deleteJob(String jobExecutionId) throws ExchangisJobServerException {
List<String> taskStatusList = launchedTaskDao.getTaskStatusList(jobExecutionId);
if(taskStatusList.contains("Inited") || taskStatusList.contains("Scheduled") || taskStatusList.contains("Running") || taskStatusList.contains("WaitForRetry")){
throw new ExchangisJobServerException(JOB_EXCEPTION_CODE.getCode(), "不能删除该作业");
}else {
launchedTaskDao.deleteTask(jobExecutionId);
launchedJobDao.deleteJob(jobExecutionId);
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -139,16 +139,16 @@ public Message ExecutedJobKill(@PathVariable(value = "jobExecutionId") String jo
}

@RequestMapping(value = "/listJobs", method = RequestMethod.GET)
public Message listJobs(@RequestParam(value = "jobId", required = false) Long jobId,
public Message listJobs(@RequestParam(value = "jobExecutionId", required = false) String jobExecutionId,
@RequestParam(value = "jobName", required = false) String jobName,
@RequestParam(value = "status", required = false) String status,
@RequestParam(value = "launchStartTime", required = false) Long launchStartTime,
@RequestParam(value = "launchEndTime", required = false) Long launchEndTime,
@RequestParam(value = "current", required = false) Integer current,
@RequestParam(value = "size", required = false) Integer size) {
List<ExchangisLaunchedJobListVO> jobList = executeService.getExecutedJobList(jobId, jobName, status,
@RequestParam(value = "current", required = false) int current,
@RequestParam(value = "size", required = false) int size) {
List<ExchangisLaunchedJobListVO> jobList = executeService.getExecutedJobList(jobExecutionId, jobName, status,
launchStartTime, launchEndTime, current, size);
int total = executeService.count(jobId, jobName, status, launchStartTime, launchEndTime);
int total = executeService.count(jobExecutionId, jobName, status, launchStartTime, launchEndTime);
Message message = Message.ok("Submitted succeed(提交成功)!");
message.setMethod("/api/rest_j/v1/exchangis/job/execution/listJobs");
message.data("jobList", jobList);
Expand Down Expand Up @@ -187,4 +187,14 @@ public Message listJobs(@RequestParam(value = "dataSourceType", required = false
private boolean hasAuthority(String username, ExchangisJobInfo jobInfo){
return username.equals(jobInfo.getCreateUser());
}

@RequestMapping( value = "/{jobExecutionId}/deleteJob", method = RequestMethod.POST)
public Message ExecutedJobDelete(@PathVariable(value = "jobExecutionId") String jobExecutionId) throws ExchangisJobServerException {
//ExchangisLaunchedJobEntity jobAndTaskStatus = exchangisExecutionService.getExecutedJobAndTaskStatus(jobExecutionId);
executeService.deleteJob(jobExecutionId);
Message message = Message.ok("Kill succeed(停止成功)!");
message.setMethod("/api/rest_j/v1/exchangis/job/" + jobExecutionId + "/deleteJob");
message.data("jobExecutionId", jobExecutionId);
return message;
}
}