From 916ca1faa3281fee2db0480bd6aef8f4c9523f9c Mon Sep 17 00:00:00 2001 From: shinobi6xp <445848285@qq.com> Date: Thu, 21 Sep 2023 15:16:18 +0800 Subject: [PATCH] =?UTF-8?q?fixbug:=E8=A7=A3=E5=86=B3=E5=A4=8D=E5=88=B6flow?= =?UTF-8?q?=E5=90=8E=E5=8F=91=E5=B8=83=E5=88=B0=E8=B0=83=E5=BA=A6=E5=B9=B3?= =?UTF-8?q?=E5=8F=B0=E5=A4=B1=E8=B4=A5=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../server/job/OrchestratorCopyJob.java | 75 +++++++++++++++++-- .../server/service/OrchestratorService.java | 12 +++ .../OrchestratorFrameworkServiceImpl.java | 2 +- .../service/impl/OrchestratorServiceImpl.java | 41 ++++++++++ 4 files changed, 124 insertions(+), 6 deletions(-) diff --git a/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/job/OrchestratorCopyJob.java b/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/job/OrchestratorCopyJob.java index 33828ce96d..0db545e398 100644 --- a/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/job/OrchestratorCopyJob.java +++ b/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/job/OrchestratorCopyJob.java @@ -1,17 +1,22 @@ package com.webank.wedatasphere.dss.orchestrator.server.job; import com.google.common.collect.Lists; +import com.webank.wedatasphere.dss.appconn.scheduler.structure.orchestration.OrchestrationCreationOperation; +import com.webank.wedatasphere.dss.appconn.scheduler.structure.orchestration.OrchestrationService; +import com.webank.wedatasphere.dss.appconn.scheduler.structure.orchestration.ref.DSSOrchestrationContentRequestRef; +import com.webank.wedatasphere.dss.appconn.scheduler.structure.orchestration.ref.OrchestrationResponseRef; import com.webank.wedatasphere.dss.common.exception.DSSErrorException; import com.webank.wedatasphere.dss.common.label.DSSLabel; import com.webank.wedatasphere.dss.common.utils.MapUtils; -import com.webank.wedatasphere.dss.orchestrator.common.entity.DSSOrchestratorCopyInfo; -import com.webank.wedatasphere.dss.orchestrator.common.entity.DSSOrchestratorInfo; -import com.webank.wedatasphere.dss.orchestrator.common.entity.DSSOrchestratorVersion; +import com.webank.wedatasphere.dss.orchestrator.common.entity.*; import com.webank.wedatasphere.dss.orchestrator.common.ref.OrchestratorRefConstant; import com.webank.wedatasphere.dss.orchestrator.core.DSSOrchestrator; import com.webank.wedatasphere.dss.orchestrator.core.utils.OrchestratorUtils; +import com.webank.wedatasphere.dss.orchestrator.db.dao.OrchestratorMapper; import com.webank.wedatasphere.dss.orchestrator.publish.utils.OrchestrationDevelopmentOperationUtils; import com.webank.wedatasphere.dss.orchestrator.server.entity.vo.OrchestratorCopyVo; +import com.webank.wedatasphere.dss.orchestrator.server.service.OrchestratorService; +import com.webank.wedatasphere.dss.orchestrator.server.service.impl.OrchestratorFrameworkServiceImpl; import com.webank.wedatasphere.dss.standard.app.development.operation.RefCopyOperation; import com.webank.wedatasphere.dss.standard.app.development.ref.CopyRequestRef; import com.webank.wedatasphere.dss.standard.app.development.ref.RefJobContentResponseRef; @@ -33,6 +38,12 @@ public class OrchestratorCopyJob implements Runnable { protected OrchestratorCopyEnv orchestratorCopyEnv; + private OrchestratorFrameworkServiceImpl orchestratorFrameworkServiceImpl; + + private OrchestratorService orchestratorService; + + private OrchestratorMapper orchestratorMapper; + private DSSOrchestratorCopyInfo orchestratorCopyInfo = new DSSOrchestratorCopyInfo(UUID.randomUUID().toString()); @@ -66,9 +77,10 @@ private void copyOrchestrator() { newOrchestrator.setDesc("copy from " + sourceOrchestrator.getName()); newOrchestrator.setUpdateTime(null); newOrchestrator.setUpdateUser(null); + DSSOrchestratorVersion dssOrchestratorVersion = null; try { - doOrchestratorCopy(orchestratorCopyVo.getUsername(), orchestratorCopyVo.getWorkspace(), newOrchestrator, + dssOrchestratorVersion = doOrchestratorCopy(orchestratorCopyVo.getUsername(), orchestratorCopyVo.getWorkspace(), newOrchestrator, orchestratorCopyVo.getTargetProjectName(), Lists.newArrayList(orchestratorCopyVo.getDssLabel()), appId); } catch (Exception e) { //保存错误信息 @@ -91,9 +103,37 @@ private void copyOrchestrator() { orchestratorCopyInfo.setSuccessNode(Lists.newArrayList("All")); orchestratorCopyInfo.setStatus(1); orchestratorCopyEnv.getOrchestratorCopyJobMapper().updateCopyStatus(orchestratorCopyInfo); + + List dssLabels = new ArrayList<>(); + dssLabels.add(orchestratorCopyVo.getDssLabel()); + + //2.如果调度系统要求同步创建工作流,向调度系统发送创建工作流的请求 + OrchestrationResponseRef orchestrationResponseRef = orchestratorFrameworkServiceImpl.tryOrchestrationOperation(dssLabels, true, orchestratorCopyVo.getUsername(), + orchestratorCopyVo.getTargetProjectName(), orchestratorCopyVo.getWorkspace(), newOrchestrator, + OrchestrationService::getOrchestrationCreationOperation, + (structureOperation, structureRequestRef) -> ((OrchestrationCreationOperation) structureOperation) + .createOrchestration((DSSOrchestrationContentRequestRef) structureRequestRef), "create"); + + try { + orchestratorService.copyOrchestrator(orchestratorCopyVo.getUsername(), orchestratorCopyVo.getWorkspace(), orchestratorCopyVo.getTargetProjectName(), + orchestratorCopyVo.getTargetProjectId(), newOrchestrator.getDesc(), newOrchestrator, dssLabels); + } catch (Exception e) { + throw new RuntimeException("error happened when copying orc.", e); + } + + Long orchestratorId = newOrchestrator.getId(); + Long orchestratorVersionId = dssOrchestratorVersion.getId(); + //4.将工程和orchestrator的关系存储到的数据库中 + if (orchestrationResponseRef != null) { + Long refProjectId = (Long) orchestrationResponseRef.toMap().get("refProjectId"); + orchestratorMapper.addOrchestratorRefOrchestration(new DSSOrchestratorRefOrchestration(orchestratorId, refProjectId, orchestrationResponseRef.getRefOrchestrationId())); + } else { + LOGGER.info("copy orchestration {} with orchestratorId is {}, and versionId is {}, and orchestrationResponseRef is null.", newOrchestrator.getName(), orchestratorId, orchestratorVersionId); + + } } - private void doOrchestratorCopy(String userName, + private DSSOrchestratorVersion doOrchestratorCopy(String userName, Workspace workspace, DSSOrchestratorInfo dssOrchestratorInfo, String projectName, @@ -132,6 +172,7 @@ private void doOrchestratorCopy(String userName, orchestratorCopyEnv.getOrchestratorMapper().addOrchestrator(dssOrchestratorInfo); dssOrchestratorVersion.setOrchestratorId(dssOrchestratorInfo.getId()); orchestratorCopyEnv.getOrchestratorMapper().addOrchestratorVersion(dssOrchestratorVersion); + return dssOrchestratorVersion; } @@ -158,4 +199,28 @@ public DSSOrchestratorCopyInfo getOrchestratorCopyInfo() { public void setOrchestratorCopyInfo(DSSOrchestratorCopyInfo orchestratorCopyInfo) { this.orchestratorCopyInfo = orchestratorCopyInfo; } + + public OrchestratorFrameworkServiceImpl getOrchestratorFrameworkServiceImpl() { + return orchestratorFrameworkServiceImpl; + } + + public void setOrchestratorFrameworkServiceImpl(OrchestratorFrameworkServiceImpl orchestratorFrameworkServiceImpl) { + this.orchestratorFrameworkServiceImpl = orchestratorFrameworkServiceImpl; + } + + public OrchestratorService getOrchestratorService() { + return orchestratorService; + } + + public void setOrchestratorService(OrchestratorService orchestratorService) { + this.orchestratorService = orchestratorService; + } + + public OrchestratorMapper getOrchestratorMapper() { + return orchestratorMapper; + } + + public void setOrchestratorMapper(OrchestratorMapper orchestratorMapper) { + this.orchestratorMapper = orchestratorMapper; + } } diff --git a/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/OrchestratorService.java b/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/OrchestratorService.java index 711b79f65d..0e19e5860e 100644 --- a/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/OrchestratorService.java +++ b/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/OrchestratorService.java @@ -70,6 +70,18 @@ void deleteOrchestrator(String userName, String projectName, Long orchestratorInfoId, List dssLabels) throws Exception; + /** + * 复制编排 + * + */ + OrchestratorVo copyOrchestrator(String userName, + Workspace workspace, + String projectName, + Long projectId, + String description, + DSSOrchestratorInfo dssOrchestratorInfo, + List dssLabels) throws Exception; + /** * 解锁编排对应的工作流 diff --git a/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/impl/OrchestratorFrameworkServiceImpl.java b/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/impl/OrchestratorFrameworkServiceImpl.java index 5e9cbef17b..a5ba12e0a1 100644 --- a/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/impl/OrchestratorFrameworkServiceImpl.java +++ b/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/impl/OrchestratorFrameworkServiceImpl.java @@ -177,7 +177,7 @@ public CommonOrchestratorVo createOrchestrator(String username, OrchestratorCrea return commonOrchestratorVo; } - private V tryOrchestrationOperation(List dssLabels, Boolean askProjectSender, String userName, String projectName, + public V tryOrchestrationOperation(List dssLabels, Boolean askProjectSender, String userName, String projectName, Workspace workspace, DSSOrchestratorInfo dssOrchestrator, Function getOrchestrationOperation, BiFunction responseRefConsumer, String operationName) { diff --git a/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/impl/OrchestratorServiceImpl.java b/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/impl/OrchestratorServiceImpl.java index f66c567770..4c298d4b21 100644 --- a/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/impl/OrchestratorServiceImpl.java +++ b/dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/impl/OrchestratorServiceImpl.java @@ -238,6 +238,47 @@ public void deleteOrchestrator(String userName, orchestratorMapper.deleteOrchestrator(orchestratorInfoId); } + @Override + @Transactional(rollbackFor = Exception.class) + public OrchestratorVo copyOrchestrator(String userName, + Workspace workspace, + String projectName, + Long projectId, + String description, + DSSOrchestratorInfo dssOrchestratorInfo, + List dssLabels) throws Exception { + OrchestratorVo orchestratorVo = new OrchestratorVo(); + //todo 增加校验 + String uuid = UUID.randomUUID().toString(); + + //作为Orchestrator的唯一标识,包括跨环境导入导出也不发生变化。 + dssOrchestratorInfo.setUUID(uuid); + + String version = OrchestratorUtils.generateNewVersion(); + String contextId = contextService.createContextID(workspace.getWorkspaceName(), projectName, dssOrchestratorInfo.getName(), version, userName); + LOGGER.info("Create a new ContextId: {} for new orchestrator {}.", contextId, dssOrchestratorInfo.getName()); + //1. 访问DSS工作流微模块创建工作流 + RefJobContentResponseRef appRef = tryRefOperation(dssOrchestratorInfo, userName, workspace, dssLabels, null, + developmentService -> ((RefCRUDService) developmentService).getRefCreationOperation(), + dssContextRequestRef -> dssContextRequestRef.setContextId(contextId), + projectRefRequestRef -> projectRefRequestRef.setProjectName(projectName).setRefProjectId(projectId), + (developmentOperation, developmentRequestRef) -> { + DSSOrchestrator dssOrchestrator = orchestratorManager.getOrCreateOrchestrator(userName, + workspace.getWorkspaceName(), dssOrchestratorInfo.getType(), dssLabels); + Map dssJobContent = MapUtils.newCommonMapBuilder() + .put(OrchestratorRefConstant.DSS_ORCHESTRATOR_INFO_KEY, dssOrchestratorInfo) + .put(OrchestratorRefConstant.ORCHESTRATOR_VERSION_KEY, version) + .put(OrchestratorRefConstant.ORCHESTRATION_SCHEDULER_APP_CONN, Optional.ofNullable(dssOrchestrator) + .map(DSSOrchestrator::getSchedulerAppConn).map(AppConn::getAppDesc).map(AppDesc::getAppName) + .map(Object::toString).orElse("NULL")).build(); + DSSJobContentRequestRef requestRef = (DSSJobContentRequestRef) developmentRequestRef; + requestRef.setDSSJobContent(dssJobContent); + return ((RefCreationOperation) developmentOperation).createRef(requestRef); + }, "create"); + + return orchestratorVo; + } + @Override public OrchestratorUnlockVo unlockOrchestrator(String userName, Workspace workspace,