Skip to content

Commit

Permalink
Optimize clear cache operation
Browse files Browse the repository at this point in the history
  • Loading branch information
jieguangzhou committed Dec 16, 2022
1 parent 1269505 commit e5f80d2
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import org.apache.dolphinscheduler.api.utils.Result;

import java.util.List;

import lombok.Data;

/**
Expand All @@ -29,7 +27,6 @@
@Data
public class TaskInstanceRemoveCacheResponse extends Result {

private List<Integer> deleteTaskInstanceCacheIds;
private String cacheKey;

public TaskInstanceRemoveCacheResponse(Result result) {
Expand All @@ -38,11 +35,10 @@ public TaskInstanceRemoveCacheResponse(Result result) {
this.setMsg(result.getMsg());
}

public TaskInstanceRemoveCacheResponse(Result result, List<Integer> deleteTaskInstanceCacheIds, String cacheKey) {
public TaskInstanceRemoveCacheResponse(Result result, String cacheKey) {
super();
this.setCode(result.getCode());
this.setMsg(result.getMsg());
this.deleteTaskInstanceCacheIds = deleteTaskInstanceCacheIds;
this.cacheKey = cacheKey;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.process.ProcessService;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -348,20 +348,11 @@ public TaskInstanceRemoveCacheResponse removeTaskInstanceCache(User loginUser, l
String tagCacheKey = taskInstance.getCacheKey();
Pair<Integer, String> taskIdAndCacheKey = TaskCacheUtils.revertCacheKey(tagCacheKey);
String cacheKey = taskIdAndCacheKey.getRight();
List<Integer> cacheTaskInstanceIds = new ArrayList<>();
while (true) {
TaskInstance cacheTaskInstance = taskInstanceDao.findTaskInstanceByCacheKey(cacheKey);
if (cacheTaskInstance == null) {
break;
}
cacheTaskInstance.setCacheKey(null);
boolean updateResult = taskInstanceDao.updateTaskInstance(cacheTaskInstance);
logger.info("remove task instance cache, taskInstanceId:{}, cacheKey:{}, result:{}",
cacheTaskInstance.getId(), cacheKey, updateResult);
cacheTaskInstanceIds.add(cacheTaskInstance.getId());
if (StringUtils.isNotEmpty(cacheKey)) {
taskInstanceDao.clearCacheByCacheKey(cacheKey);
}
putMsg(result, Status.SUCCESS);
return new TaskInstanceRemoveCacheResponse(result, cacheTaskInstanceIds, cacheKey);
return new TaskInstanceRemoveCacheResponse(result, cacheKey);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ TaskInstance queryByInstanceIdAndCode(@Param("processInstanceId") int processIns

TaskInstance queryByCacheKey(@Param("cacheKey") String cacheKey);

Boolean clearCacheByCacheKey(@Param("cacheKey") String cacheKey);

List<TaskInstance> queryByProcessInstanceIdsAndTaskCodes(@Param("processInstanceIds") List<Integer> processInstanceIds,
@Param("taskCodes") List<Long> taskCodes);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ public interface TaskInstanceDao {
*/
TaskInstance findTaskInstanceByCacheKey(String cacheKey);

/**
* clear task instance cache by cache_key
* @param cacheKey cache key
* @return task instance
*/
Boolean clearCacheByCacheKey(String cacheKey);

/**
* find task instance list by id list
* @param idList task id list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,17 @@ public TaskInstance findTaskInstanceByCacheKey(String cacheKey) {
return taskInstanceMapper.queryByCacheKey(cacheKey);
}

@Override
public Boolean clearCacheByCacheKey(String cacheKey) {
try {
taskInstanceMapper.clearCacheByCacheKey(cacheKey);
return true;
} catch (Exception e) {
logger.error("clear cache by cacheKey failed", e);
return false;
}
}

@Override
public List<TaskInstance> findTaskInstanceByIdList(List<Integer> idList) {
if (CollectionUtils.isEmpty(idList)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,11 @@
where cache_key = #{cacheKey}
limit 1
</select>
<update id="clearCacheByCacheKey">
update t_ds_task_instance
set cache_key = null
where cache_key = #{cacheKey}
</update>
<select id="queryByProcessInstanceIdsAndTaskCodes" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSql"/>
Expand Down

0 comments on commit e5f80d2

Please sign in to comment.