Skip to content

Commit

Permalink
fix: sql execution repository error
Browse files Browse the repository at this point in the history
  • Loading branch information
wangqi committed Jan 29, 2025
1 parent 8b25b7e commit 6eff622
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ private ExecutionStatus determineStatus(StageExecution stage) {
} else if (afterStageStatuses.contains(ExecutionStatus.NOT_STARTED)) {
return ExecutionStatus.RUNNING; // 后置阶段已计划但尚未运行
} else {
log.error("Unhandled condition for stage {} of {}, marking as TERMINAL. " +
log.error("Unhandled condition for stage (id={}) of pipeline (id={}), marking as TERMINAL. " +
"syntheticStatuses={}, taskStatuses={}, planningStatus={}, afterStageStatuses={}",
stage.getId(),
stage.getPipelineExecution().getId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import cn.sliew.carp.framework.dag.service.DagConfigService;
import cn.sliew.carp.framework.dag.service.DagConfigStepService;
import cn.sliew.carp.framework.dag.service.dto.orca.CarpDagOrcaPipelineDTO;
import cn.sliew.carp.framework.dag.service.dto.orca.CarpDagOrcaPipelineStageDTO;
import cn.sliew.carp.framework.dag.service.param.orca.CarpDagOrcaPipelineAddParam;
import cn.sliew.carp.framework.dag.service.param.orca.CarpDagOrcaPipelineStageAddParam;
import cn.sliew.carp.framework.dag.service.param.orca.CarpDagOrcaPipelineStageUpdateParam;
Expand All @@ -39,7 +40,9 @@
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

public class SqlExecutionRepository implements ExecutionRepository {

Expand All @@ -54,11 +57,19 @@ public SqlExecutionRepository(CarpDagOrcaPipelineService carpDagOrcaPipelineServ

@Override
public PipelineExecution retrieve(ExecutionType type, Long id) throws ExecutionNotFoundException {
CarpDagOrcaPipelineDTO carpDagOrcaPipelineDTO = carpDagOrcaPipelineService.get(id);
Optional<CarpDagOrcaPipelineDTO> optional = carpDagOrcaPipelineService.get(id);
CarpDagOrcaPipelineDTO carpDagOrcaPipelineDTO = optional.orElseThrow();
PipelineExecution execution = JacksonUtil.toObject(carpDagOrcaPipelineDTO.getBody(), PipelineExecution.class);
if (CollectionUtils.isEmpty(execution.getStages()) == false) {
execution.getStages().forEach(stage -> ((StageExecutionImpl) stage).setPipelineExecution(execution));
List<StageExecution> stages = execution.getStages().stream().map(stage -> {
Optional<CarpDagOrcaPipelineStageDTO> stageOptional = carpDagOrcaPipelineService.getStage(stage.getId());
StageExecution stageExecution = stageOptional.map(item -> JacksonUtil.toObject(item.getBody(), StageExecution.class)).orElse(stage);
((StageExecutionImpl) stageExecution).setPipelineExecution(execution);
return stageExecution;
}).collect(Collectors.toList());
((PipelineExecutionImpl) execution).setStages(stages);
}

return execution;
}

Expand All @@ -70,8 +81,8 @@ public Observable<PipelineExecution> retrieve(ExecutionType type) {
@Override
public Long store(PipelineExecution execution) {
// upsert PipelineExecution -> CarpDagOrcaPipelineDTO
try {
carpDagOrcaPipelineService.get(execution.getId());
Optional<CarpDagOrcaPipelineDTO> optional = carpDagOrcaPipelineService.get(execution.getId());
if (optional.isPresent()) {
CarpDagOrcaPipelineUpdateParam updateParam = CarpDagOrcaPipelineUpdateParam.builder()
.id(execution.getId())
.uuid(execution.getUuid())
Expand All @@ -89,7 +100,7 @@ public Long store(PipelineExecution execution) {

carpDagOrcaPipelineService.update(updateParam);
return execution.getId();
} catch (NullPointerException ignored) {
} else {
CarpDagOrcaPipelineAddParam addParam = CarpDagOrcaPipelineAddParam.builder()
.id(execution.getId())
.uuid(execution.getUuid())
Expand All @@ -105,18 +116,19 @@ public Long store(PipelineExecution execution) {

@Override
public Long storeStage(StageExecution stage) {
try {
carpDagOrcaPipelineService.getStage(stage.getId());
Optional<CarpDagOrcaPipelineStageDTO> optional = carpDagOrcaPipelineService.getStage(stage.getId());
if (optional.isPresent()) {
// update
CarpDagOrcaPipelineStageUpdateParam param = new CarpDagOrcaPipelineStageUpdateParam();
param.setId(stage.getId());
param.setUuid(stage.getUuid());
param.setPipelineId(stage.getPipelineExecution().getId());
param.setStatus(stage.getStatus().name());
param.setBody(JacksonUtil.toJsonNode(stage));
CarpDagOrcaPipelineStageUpdateParam param = CarpDagOrcaPipelineStageUpdateParam.builder()
.id(stage.getId())
.uuid(stage.getUuid())
.pipelineId(stage.getPipelineExecution().getId())
.status(stage.getStatus().name())
.body(JacksonUtil.toJsonNode(stage))
.build();
carpDagOrcaPipelineService.updateStage(param);
return stage.getId();
} catch (NullPointerException ignored) {
} else {
// insert
return addStage(stage);
}
Expand All @@ -125,6 +137,7 @@ public Long storeStage(StageExecution stage) {
@Override
public Long addStage(StageExecution stage) {
CarpDagOrcaPipelineStageAddParam addParam = CarpDagOrcaPipelineStageAddParam.builder()
.id(stage.getId())
.uuid(stage.getUuid())
.pipelineId(stage.getPipelineExecution().getId())
.status(stage.getStatus().name())
Expand All @@ -136,6 +149,8 @@ public Long addStage(StageExecution stage) {
@Override
public void removeStage(PipelineExecution execution, Long stageId) {
carpDagOrcaPipelineService.deleteStage(stageId);
execution.getStages().removeIf(stage -> Objects.equals(stage.getId(), stageId));
store(execution);
}

@Override
Expand All @@ -145,8 +160,8 @@ public void updateStageContext(StageExecution stage) {

@Override
public boolean isCanceled(ExecutionType type, Long id) {
CarpDagOrcaPipelineDTO carpDagOrcaPipelineDTO = carpDagOrcaPipelineService.get(id);
return carpDagOrcaPipelineDTO.getCanceled();
Optional<CarpDagOrcaPipelineDTO> optional = carpDagOrcaPipelineService.get(id);
return optional.orElseThrow().getCanceled();
}

@Override
Expand Down

0 comments on commit 6eff622

Please sign in to comment.