Skip to content

Commit

Permalink
fix WeBankFinTech#193. Replace DWS with DSS
Browse files Browse the repository at this point in the history
  • Loading branch information
Adamyuanyuan committed Jul 2, 2020
1 parent 4dadcfe commit 3de5916
Show file tree
Hide file tree
Showing 107 changed files with 901 additions and 914 deletions.
4 changes: 2 additions & 2 deletions docs/zh_CN/ch4/DSS工程发布调度系统架构设计.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

(1)从数据库读取最新版本的工程、工作流信息,获取所有的保存在BML库工作流JSON文件。

(2)将上面的数据库内容,JSON文件内容分别转成DSS中的DWSProject,DWSFlow,如果存在子flow,则需要一并设置到flow中,保持原来的层级关系和依赖关系,构建好DWSProject,其中包含了工程下所有的DWSFlow
(2)将上面的数据库内容,JSON文件内容分别转成DSS中的DSSProject,DSSFlow,如果存在子flow,则需要一并设置到flow中,保持原来的层级关系和依赖关系,构建好DSSProject,其中包含了工程下所有的DSSFlow
一个工作流JSON包含了所有节点的定义,并存储了节点之间的依赖关系,以及工作流自身的属性信息。

(3)将DWSProject经过工程转换器转成SchedulerProject,转成SchedulerProject的过程中,同时完成了DWSJSONFlow到SchedulerFlow的转换,也完成了DWSNode到SchedulerNode的转换
(3)将DSSProject经过工程转换器转成SchedulerProject,转成SchedulerProject的过程中,同时完成了DSSJSONFlow到SchedulerFlow的转换,也完成了DSSNode到SchedulerNode的转换

(4)使用ProjectTuning对整个SchedulerProject工程进行tuning操作,用于完成工程发布前的整体调整操作,在Azkaban的实现中主要完成了工程的路径设置和工作流的存储路径设置。

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@
*
*/

package com.webank.wedatasphere.dss.server.dao;
package com.webank.wedatasphere.dss.application.dao;

import org.apache.ibatis.annotations.Mapper;
import com.webank.wedatasphere.dss.application.entity.DSSUser;

/**
* Created by chaogefeng on 2019/10/11.
*/
public interface DSSApplicationUserMapper {
DSSUser getUserByName(String username);

public interface DWSUserMapper {
Long getUserID(String userName);
void registerDssUser(DSSUser userDb);

String getuserName(Long userID);
void updateUserFirstLogin(Long userId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >


<mapper namespace="com.webank.wedatasphere.dss.application.dao.DSSUserMapper">
<mapper namespace="com.webank.wedatasphere.dss.application.dao.DSSApplicationUserMapper">

<sql id="dss_user">
id,`username`,`name`,`is_first_login`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package com.webank.wedatasphere.dss.application.handler;

import com.webank.wedatasphere.dss.application.entity.DSSUser;
import com.webank.wedatasphere.dss.application.service.DSSUserService;
import com.webank.wedatasphere.dss.application.service.DSSApplicationUserService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -33,7 +33,7 @@ public class UserFirstLoginHandler implements Handler {
private Logger logger = LoggerFactory.getLogger(this.getClass());

@Autowired
private DSSUserService dssUserService;
private DSSApplicationUserService dssApplicationUserService;

@Override
public int getOrder() {
Expand All @@ -44,15 +44,15 @@ public int getOrder() {
public void handle(DSSUser user) {
logger.info("UserFirstLoginHandler:");
synchronized (user.getUsername().intern()){
DSSUser userDb = dssUserService.getUserByName(user.getUsername());
DSSUser userDb = dssApplicationUserService.getUserByName(user.getUsername());
if(userDb == null){
logger.info("User first enter dss, insert table dss_user");
userDb = new DSSUser();
userDb.setUsername(user.getUsername());
userDb.setName(user.getName());
userDb.setFirstLogin(true);
userDb.setId(user.getId());
dssUserService.registerDSSUser(userDb);
dssApplicationUserService.registerDssUser(userDb);
}
// TODO: 2019/11/29 update firstLogin
user = userDb;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import com.webank.wedatasphere.dss.application.entity.DSSUserVO;
import com.webank.wedatasphere.dss.application.handler.ApplicationHandlerChain;
import com.webank.wedatasphere.dss.application.service.ApplicationService;
import com.webank.wedatasphere.dss.application.service.DSSUserService;
import com.webank.wedatasphere.dss.application.service.DSSApplicationUserService;
import com.webank.wedatasphere.dss.application.util.ApplicationUtils;
import com.webank.wedatasphere.linkis.server.Message;
import com.webank.wedatasphere.linkis.server.security.SecurityFilter;
Expand Down Expand Up @@ -51,7 +51,7 @@ public class ApplicationRestfulApi {
@Autowired
private ApplicationService applicationService;
@Autowired
private DSSUserService dataworkisUserService;
private DSSApplicationUserService dataworkisUserService;
@Autowired
private ApplicationHandlerChain applicationHandlerChain;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
/**
* Created by chaogefeng on 2019/10/11.
*/
public interface DSSUserService {
public interface DSSApplicationUserService {

DSSUser getUserByName(String username);

void registerDSSUser(DSSUser userDb);
void registerDssUser(DSSUser userDb);

void updateUserFirstLogin(Long id);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,33 @@

package com.webank.wedatasphere.dss.application.service.impl;

import com.webank.wedatasphere.dss.application.dao.DSSUserMapper;
import com.webank.wedatasphere.dss.application.dao.DSSApplicationUserMapper;
import com.webank.wedatasphere.dss.application.entity.DSSUser;
import com.webank.wedatasphere.dss.application.service.DSSUserService;
import com.webank.wedatasphere.dss.application.service.DSSApplicationUserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
* Created by chaogefeng on 2019/10/11.
*/
@Service
public class DSSUserServiceImpl implements DSSUserService {
public class DSSApplicationUserServiceImpl implements DSSApplicationUserService {

@Autowired
private DSSUserMapper dssUserMapper;
private DSSApplicationUserMapper dssApplicationUserMapper;

@Override
public DSSUser getUserByName(String username) {
return dssUserMapper.getUserByName(username);
return dssApplicationUserMapper.getUserByName(username);
}

@Override
public void registerDSSUser(DSSUser userDb) {
dssUserMapper.registerDSSUser( userDb);
public void registerDssUser(DSSUser userDb) {
dssApplicationUserMapper.registerDssUser( userDb);
}

@Override
public void updateUserFirstLogin(Long id) {
dssUserMapper.updateUserFirstLogin(id);
dssApplicationUserMapper.updateUserFirstLogin(id);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package com.webank.wedatasphere.dss.application.service.impl;

import com.webank.wedatasphere.dss.application.dao.DSSUserMapper;
import com.webank.wedatasphere.dss.application.dao.DSSApplicationUserMapper;
import com.webank.wedatasphere.dss.application.dao.LinkisUserMapper;
import com.webank.wedatasphere.dss.application.entity.DSSUser;
import com.webank.wedatasphere.dss.application.entity.LinkisUser;
Expand All @@ -33,7 +33,7 @@ public class LinkisUserServiceImpl implements LinkisUserService {
@Autowired
private LinkisUserMapper linkisUserMapper;
@Autowired
private DSSUserMapper dssUserMapper;
private DSSApplicationUserMapper dssApplicationUserMapper;

@Override
public LinkisUser getUserByName(String username) {
Expand All @@ -55,6 +55,6 @@ public void registerDSSUser(LinkisUser userDb) {
dssUser.setName(userDb.getName());
dssUser.setUsername(userDb.getUserName());
dssUser.setFirstLogin(userDb.getFirstLogin());
dssUserMapper.registerDSSUser(dssUser);
dssApplicationUserMapper.registerDssUser(dssUser);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void postPublish(SchedulerNode schedulerNode) {
}

private void writeNodeResourcesToLocal(SchedulerNode schedulerNode) throws DSSErrorException {
List<Resource> nodeResources = schedulerNode.getDWSNode().getResources();
List<Resource> nodeResources = schedulerNode.getDssNode().getResources();
if(nodeResources == null || nodeResources.isEmpty()) {return;}
FileOutputStream os = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ private void removeProjectStoreDirAndzip(AzkabanSchedulerProject publishProject)
}

private void writeProjectResourcesToLocal(AzkabanSchedulerProject publishProject)throws DSSErrorException {
List<Resource> resources = publishProject.getDWSProject().getProjectResources();
List<Resource> resources = publishProject.getDssProject().getProjectResources();
FileOutputStream os = null;
try {
String storePath = publishProject.getStorePath();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ private void convertHead(LinkisAzkabanSchedulerNode schedulerNode,LinkisJob job)
}

private void convertDependencies(LinkisAzkabanSchedulerNode schedulerNode,LinkisJob job){
List<String> dependencys = schedulerNode.getDWSNode().getDependencys();
List<String> dependencys = schedulerNode.getDssNode().getDependencys();
if(dependencys != null && !dependencys.isEmpty()) {
StringBuilder dependencies = new StringBuilder();
dependencys.forEach(d ->dependencies.append(d + ","));
Expand All @@ -88,12 +88,12 @@ private void convertDependencies(LinkisAzkabanSchedulerNode schedulerNode,Linkis
}

private void convertProxyUser(LinkisAzkabanSchedulerNode schedulerNode,LinkisJob job){
String userProxy = schedulerNode.getDWSNode().getUserProxy();
String userProxy = schedulerNode.getDssNode().getUserProxy();
if(!StringUtils.isEmpty(userProxy)) job.setProxyUser(userProxy);
}

private void convertConfiguration(LinkisAzkabanSchedulerNode schedulerNode,LinkisJob job){
Map<String, Object> params = schedulerNode.getDWSNode().getParams();
Map<String, Object> params = schedulerNode.getDssNode().getParams();
if (params != null && !params.isEmpty()) {
Object configuration = params.get("configuration");
String confprefix = "node.conf.";
Expand All @@ -103,7 +103,7 @@ private void convertConfiguration(LinkisAzkabanSchedulerNode schedulerNode,Linki
}

private void convertJobCommand(LinkisAzkabanSchedulerNode schedulerNode,LinkisJob job){
Map<String, Object> jobContent = schedulerNode.getDWSNode().getJobContent();
Map<String, Object> jobContent = schedulerNode.getDssNode().getJobContent();
if(jobContent != null) {
jobContent.remove("jobParams");
job.setCommand(new Gson().toJson(jobContent));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.webank.wedatasphere.dss.appjoint.scheduler.parser.AbstractFlowParser;
import com.webank.wedatasphere.dss.appjoint.scheduler.parser.NodeParser;
import com.webank.wedatasphere.dss.appjoint.scheduler.azkaban.entity.AzkabanSchedulerFlow;
import com.webank.wedatasphere.dss.common.entity.flow.DWSJSONFlow;
import com.webank.wedatasphere.dss.common.entity.flow.DSSJSONFlow;
import com.webank.wedatasphere.dss.appjoint.scheduler.entity.SchedulerFlow;
import java.util.ArrayList;

Expand All @@ -30,7 +30,7 @@ public void setNodeParsers(NodeParser[] nodeParsers) {
}

@Override
public Boolean ifFlowCanParse(DWSJSONFlow flow) {
public Boolean ifFlowCanParse(DSSJSONFlow flow) {
return true;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.webank.wedatasphere.dss.appjoint.scheduler.azkaban.parser;

import com.webank.wedatasphere.dss.appjoint.scheduler.azkaban.entity.LinkisAzkabanSchedulerNode;
import com.webank.wedatasphere.dss.common.entity.node.DWSNode;
import com.webank.wedatasphere.dss.common.entity.node.DSSNode;
import com.webank.wedatasphere.dss.appjoint.scheduler.entity.SchedulerNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -12,14 +12,14 @@ public class LinkisAzkabanNodeParser extends AzkabanNodeParser {


@Override
public SchedulerNode parseNode(DWSNode dwsNode) {
public SchedulerNode parseNode(DSSNode dssNode) {
LinkisAzkabanSchedulerNode schedulerNode = new LinkisAzkabanSchedulerNode();
schedulerNode.setDWSNode(dwsNode);
schedulerNode.setDssNode(dssNode);
return schedulerNode;
}

@Override
public Boolean ifNodeCanParse(DWSNode dwsNode) {
public Boolean ifNodeCanParse(DSSNode dssNode) {
//预留
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ private void assignStorePath(AzkabanSchedulerProject azkabanSchedulerProject) {
SimpleDateFormat dateFormat = new SimpleDateFormat(AzkabanSchedulerProject.DATE_FORMAT);
Date date = new Date();
String dataStr = dateFormat.format(date);
String userName = azkabanSchedulerProject.getDWSProject().getUserName();
String userName = azkabanSchedulerProject.getDssProject().getUserName();
String name = azkabanSchedulerProject.getName();
String storePath = AzkabanConf.DEFAULT_STORE_PATH.getValue() + File.separator + userName
+ File.separator + dataStr + File.separator +name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import com.webank.wedatasphere.dss.appjoint.scheduler.entity.SchedulerNode;
import com.webank.wedatasphere.dss.appjoint.scheduler.tuning.AbstractFlowTuning;
import com.webank.wedatasphere.dss.appjoint.scheduler.tuning.NodeTuning;
import com.webank.wedatasphere.dss.common.entity.node.DWSNodeDefault;
import com.webank.wedatasphere.dss.common.entity.node.DSSNodeDefault;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -58,7 +58,7 @@ public Boolean ifFlowCanTuning(SchedulerFlow schedulerFlow) {
}

private SchedulerFlow addEndNodeForFlowName(SchedulerFlow flow) {
DWSNodeDefault endNode = new DWSNodeDefault();
DSSNodeDefault endNode = new DSSNodeDefault();
List<SchedulerNode> endNodeList = getFlowEndJobList(flow);
endNode.setId(flow.getName() + "_");
endNode.setName(flow.getName() + "_");
Expand All @@ -70,7 +70,7 @@ private SchedulerFlow addEndNodeForFlowName(SchedulerFlow flow) {
endNodeList.forEach(tmpNode -> endNode.addDependency(tmpNode.getName()));
}
LinkisAzkabanSchedulerNode azkabanSchedulerNode = new LinkisAzkabanSchedulerNode();
azkabanSchedulerNode.setDWSNode(endNode);
azkabanSchedulerNode.setDssNode(endNode);
flow.getSchedulerNodes().add((azkabanSchedulerNode));
return flow;
}
Expand All @@ -80,7 +80,7 @@ private List<SchedulerNode> getFlowEndJobList(SchedulerFlow flow) {
for (SchedulerNode job : flow.getSchedulerNodes()) {
int flag = 0;
for (SchedulerEdge link : flow.getSchedulerEdges()) {
if (job.getId().equals(link.getDWSEdge().getSource())) {
if (job.getId().equals(link.getDssEdge().getSource())) {
flag = 1;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
/**
* Created by enjoyyin on 2019/5/14.
*/
public class DWSFlow implements Flow {
public class DSSFlow implements Flow {
private Long id;
private String name;
private Boolean state; //0,1代表发布过和未发布过
Expand All @@ -38,11 +38,11 @@ public class DWSFlow implements Flow {
private Boolean hasSaved;//0disable 1 enable 0表示工作流从来没存过,发布的时候忽略
private String uses;

private List<DWSFlowVersion> versions; //为了前台不做修改,还是使用versions 而不使用flowVersions的变量名
private List<DWSFlow> children;
private List<DSSFlowVersion> versions; //为了前台不做修改,还是使用versions 而不使用flowVersions的变量名
private List<DSSFlow> children;
private String flowType;

private DWSFlowVersion latestVersion;
private DSSFlowVersion latestVersion;


public Integer getRank() {
Expand Down Expand Up @@ -86,27 +86,27 @@ public void setDescription(String description) {

@Override
public void addFlowVersion(FlowVersion flowVersion) {
this.versions.add((DWSFlowVersion) flowVersion);
this.versions.add((DSSFlowVersion) flowVersion);
}

@Override
public List<? extends DWSFlow> getChildren() {
public List<? extends DSSFlow> getChildren() {
return children;
}

@Override
public void setChildren(List<? extends Flow> children) {
this.children = children.stream().map(f ->(DWSFlow)f).collect(Collectors.toList());
this.children = children.stream().map(f ->(DSSFlow)f).collect(Collectors.toList());
}

@Override
public List<DWSFlowVersion> getFlowVersions() {
public List<DSSFlowVersion> getFlowVersions() {
return this.versions;
}

@Override
public void setFlowVersions(List<? extends FlowVersion> flowVersions) {
this.versions = flowVersions.stream().map(f ->(DWSFlowVersion)f).collect(Collectors.toList());
this.versions = flowVersions.stream().map(f ->(DSSFlowVersion)f).collect(Collectors.toList());
}

@Override
Expand Down Expand Up @@ -176,11 +176,11 @@ public void setHasSaved(Boolean hasSaved) {
this.hasSaved = hasSaved;
}

public DWSFlowVersion getLatestVersion() {
public DSSFlowVersion getLatestVersion() {
return latestVersion;
}

public void setLatestVersion(DWSFlowVersion latestVersion) {
public void setLatestVersion(DSSFlowVersion latestVersion) {
this.latestVersion = latestVersion;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@
/**
* Created by enjoyyin on 2019/9/19.
*/
public class DWSFlowPublishHistory {
public class DSSFlowPublishHistory {
}
Loading

0 comments on commit 3de5916

Please sign in to comment.