Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify publish operation and restful. #269

Merged
merged 1 commit into from
Apr 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ public void setData(InsertIdDTO data) {
}

public static class InsertIdDTO {
@JsonProperty(value = "insert_id")
private Long id;
@JsonProperty(value = "insertId")
private Long insertId;

public Long getId() {
return id;
return insertId;
}

public void setId(Long id) {
this.id = id;
public void setId(Long insertId) {
this.insertId = insertId;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ public GetDataSourceInfoResultDTO getDataSourceByIdAndVersionId(String userName,
LinkisDataSourceRemoteClient linkisDataSourceRemoteClient = ExchangisLinkisRemoteClient.getLinkisDataSourceRemoteClient();
try {
Result execute = linkisDataSourceRemoteClient.execute(
GetDataSourceInfoByIdAndVersionIdAction.builder().setSystem("system").setUser(userName).setDataSourceId(id).setVersionId(Long.valueOf(versionId)).build()
GetDataSourceInfoByIdAndVersionIdAction.builder().setSystem("system").setUser(userName).setDataSourceId(id).setVersionId(versionId).build()
);
String responseBody = execute.getResponseBody();
GetDataSourceInfoResultDTO result = Json.fromJson(responseBody, GetDataSourceInfoResultDTO.class);
Expand Down Expand Up @@ -1069,7 +1069,7 @@ public Message publishDataSource(HttpServletRequest request, Long id, Long versi
PublishDataSourceVersionResult result;
try {
result = linkisDataSourceRemoteClient.publishDataSourceVersion(
new PublishDataSourceVersionAction.Builder().setUser(userName).setDataSourceId(Long.parseLong(id + "")).setVersion(version).build()
new PublishDataSourceVersionAction.Builder().setUser(userName).setDataSourceId(Long.parseLong(id + "")).setVersion(Long.parseLong(version + "")).build()
);
} catch (Exception e) {
if (e instanceof ErrorException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import org.apache.linkis.httpclient.request.GetAction
class GetDataSourceInfoByIdAndVersionIdAction extends GetAction with DataSourceAction {
private var user: String = _
private var dataSourceId: Long = _
private var versionId: Long = _
private var versionId: String = _

override def setUser(user: String): Unit = this.user = user

Expand All @@ -24,7 +24,7 @@ object GetDataSourceInfoByIdAndVersionIdAction {

class Builder private[GetDataSourceInfoByIdAndVersionIdAction]() {
private var dataSourceId: Long = _
private var versionId: Long = _
private var versionId: String = _
private var system:String = _
private var user: String = _

Expand All @@ -43,7 +43,7 @@ object GetDataSourceInfoByIdAndVersionIdAction {
this
}

def setVersionId(versionId: Long): Builder = {
def setVersionId(versionId: String): Builder = {
this.versionId = versionId
this
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class ExchangisJobVo {
/**
* Execute user
*/
@JsonProperty("proxyUser")
//@JsonProperty("proxyUser")
private String executeUser;

/**
Expand Down Expand Up @@ -111,6 +111,8 @@ public class ExchangisJobVo {

private Map<String, Object> labels;

private String proxyUser;

public ExchangisJobVo(){

}
Expand All @@ -128,8 +130,18 @@ public ExchangisJobVo(ExchangisJobInfo jobInfo){
this.modifyTime = jobInfo.getLastUpdateTime();
this.jobParams = jobInfo.getJobParams();
this.executeUser = jobInfo.getExecuteUser();
this.proxyUser = jobInfo.getExecuteUser();
}
}

public String getProxyUser() {
return proxyUser;
}

public void setProxyUser(String proxyUser) {
this.proxyUser = proxyUser;
}

public Long getId() { return id; }

public void setId(Long id) { this.id = id; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.webank.wedatasphere.exchangis.common.validator.groups.InsertGroup;
import com.webank.wedatasphere.exchangis.job.domain.ExchangisJobInfo;
import com.webank.wedatasphere.exchangis.job.launcher.domain.task.TaskStatus;
import com.webank.wedatasphere.exchangis.job.server.exception.ExchangisJobServerException;
import com.webank.wedatasphere.exchangis.job.server.service.IProjectImportService;
import com.webank.wedatasphere.exchangis.job.server.service.JobInfoService;
Expand Down Expand Up @@ -29,7 +30,7 @@
* Define to support the app conn, in order to distinguish from the inner api
*/
@RestController
@RequestMapping(value = "/exchangis/dss/job", produces = {"application/json;charset=utf-8"})
@RequestMapping(value = "/dss/exchangis/main/appJob", produces = {"application/json;charset=utf-8"})
public class ExchangisJobDssAppConnRestfulApi {

private static final Logger LOG = LoggerFactory.getLogger(ExchangisJobDssAppConnRestfulApi.class);
Expand Down Expand Up @@ -83,7 +84,7 @@ public Message createJob(
* @param request http request
* @return message
*/
@RequestMapping( value = "/{id:\\d+}", method = RequestMethod.DELETE)
@RequestMapping( value = "/{id:\\d+}", method = RequestMethod.POST)
public Message deleteJob(@PathVariable("id") Long id, HttpServletRequest request) {
String userName = SecurityFilter.getLoginUsername(request);
Message response = Message.ok("dss job deleted");
Expand Down Expand Up @@ -155,6 +156,19 @@ public Message executeJob(@PathVariable("id") Long id, HttpServletRequest reques
String jobExecutionId = executeService.executeJob(jobInfo, StringUtils.isNotBlank(jobInfo.getExecuteUser()) ?
jobInfo.getExecuteUser() : loginUser);
result.data("jobExecutionId", jobExecutionId);

while (true) {
TaskStatus jobStatus = executeService.getJobStatus(jobExecutionId).getStatus();
if ("Success".equals(jobStatus) ) {
result.data("jobStatus", jobStatus);
LOG.info("Execute task success");
break;
} else if ("Cancelled".equals(jobStatus) || "Failed".equals(jobStatus) || "Undefined".equals(jobStatus) || "Timeout".equals(jobStatus)) {
result.data("jobStatus", jobStatus);
LOG.info("Execute task faild");
throw new Exception();
}
}
} catch (Exception e) {
String message;
if (Objects.nonNull(jobInfo)) {
Expand All @@ -170,11 +184,11 @@ public Message executeJob(@PathVariable("id") Long id, HttpServletRequest reques
}

@RequestMapping( value = "/import", method = RequestMethod.POST)
public Message importJob(@Context HttpServletRequest request, @RequestBody Map<String, String> params) throws ServerException, ExchangisJobServerException{
public Message importJob(@Context HttpServletRequest request, @RequestBody Map<String, Object> params) throws ServerException, ExchangisJobServerException{

Message response = null;
try {
LOG.info("param6666: {}", params);
LOG.info("param: {}", params);
response = projectImportServer.importProject(request, params);
LOG.info("import job success");
} catch (Exception e){
Expand All @@ -187,7 +201,7 @@ public Message importJob(@Context HttpServletRequest request, @RequestBody Map<S
}

@RequestMapping( value = "/export", method = RequestMethod.POST)
public Message exportJob(@Context HttpServletRequest request, @RequestBody Map<String, String> params) throws ServerException, ExchangisJobServerException {
public Message exportJob(@Context HttpServletRequest request, @RequestBody Map<String, Object> params) throws ServerException, ExchangisJobServerException {
String userName = SecurityFilter.getLoginUsername(request);

LOG.info("export function params: {}", params);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* @Date 2022/3/15 10:01
*/
public interface IProjectImportService {
Message importProject(HttpServletRequest req, Map<String, String> params) throws ExchangisJobServerException, ServerException;
Message importProject(HttpServletRequest req, Map<String, Object> params) throws ExchangisJobServerException, ServerException;

IdCatalog importOpt(String projectJson, Long projectId, String versionSuffix) throws ExchangisJobServerException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,10 @@ public interface JobInfoService {
* @param username params
* @return
*/
Message exportProject(Map<String, String> params, String username, HttpServletRequest request) throws ExchangisJobServerException, ServerException;
Message exportProject(Map<String, Object> params, String username, HttpServletRequest request) throws ExchangisJobServerException, ServerException;

ExportedProject export(Long projectId, Map<String, Set<Long>> moduleIdsMap, boolean partial, HttpServletRequest request) throws ExchangisJobServerException;

Map<String, Set<Long>> getModuleIdsMap(Map<String, String> params);
Map<String, Set<Long>> getModuleIdsMap(Map<String, Object> params);

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.webank.wedatasphere.exchangis.job.vo.ExchangisJobVo;
import com.webank.wedatasphere.exchangis.project.server.service.ProjectService;
import com.webank.wedatasphere.exchangis.project.server.vo.ExchangisProjectInfo;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.linkis.bml.client.BmlClient;
import org.apache.linkis.bml.client.BmlClientFactory;
Expand Down Expand Up @@ -77,7 +78,7 @@ public class DefaultJobInfoService implements JobInfoService {
@Transactional(rollbackFor = Exception.class)
public ExchangisJobVo createJob(ExchangisJobVo jobVo) {
// Define the entity
ExchangisJobEntity jobEntity = new ExchangisJobEntity();
/*ExchangisJobEntity jobEntity = new ExchangisJobEntity();
jobEntity.setProjectId(jobVo.getProjectId());
jobEntity.setJobType(jobVo.getJobType());
jobEntity.setEngineType(jobVo.getEngineType());
Expand All @@ -92,6 +93,31 @@ public ExchangisJobVo createJob(ExchangisJobVo jobVo) {
jobEntityDao.addJobEntity(jobEntity);
jobVo.setId(jobEntity.getId());
jobVo.setCreateTime(jobEntity.getCreateTime());
return jobVo;*/

ExchangisJobEntity jobEntity = new ExchangisJobEntity();
jobEntity.setProjectId(jobVo.getProjectId());
jobEntity.setJobType(jobVo.getJobType());
jobEntity.setEngineType(jobVo.getEngineType());
jobEntity.setJobLabels(jobVo.getJobLabels());
jobEntity.setName(jobVo.getJobName());
jobEntity.setJobDesc(jobVo.getJobDesc());
jobEntity.setExecuteUser(jobVo.getProxyUser());
jobEntity.setJobParams(jobVo.getJobParams());
jobEntity.setCreateUser(jobVo.getCreateUser());
jobEntity.setCreateTime(Calendar.getInstance().getTime());
jobEntity.setSource(Json.toJson(jobVo.getSource(), null));
//jobEntity.setJobContent(jobVo.getContent());
jobEntity.setModifyUser(jobVo.getModifyUser());
//Map<String, Object> contentVo = BDPJettyServerHelper.gson().fromJson(jobVo.getContent(), Map.class);
LOG.info("888888Sqoop job content is: {}, Modify user is: {}, jobType is: {}", jobVo.getContent(), jobEntity.getExecuteUser(), jobEntity.getJobType());
if(jobVo.getContent() != null) {
jobEntity.setJobContent(jobVo.getContent());
LOG.info("55555Sqoop job content is: {}, executor: {}", jobEntity.getJobContent(), jobEntity.getExecuteUser());
}
jobEntityDao.addJobEntity(jobEntity);
jobVo.setId(jobEntity.getId());
jobVo.setCreateTime(jobEntity.getCreateTime());
return jobVo;
}

Expand Down Expand Up @@ -223,7 +249,7 @@ public ExchangisJobVo updateJobConfig(ExchangisJobVo jobVo) {
ExchangisJobEntity jobEntity = this.jobEntityDao.getBasicInfo(jobVo.getId());
Map<String, Object> sourceMap = StringUtils.isNotBlank(jobEntity.getSource())?
Json.fromJson(jobEntity.getSource(), Map.class, String.class, Object.class) : null;
jobEntity.setExecuteUser(jobVo.getExecuteUser());
jobEntity.setExecuteUser(jobVo.getProxyUser());
jobEntity.setJobParams(jobVo.getJobParams());
if (Objects.isNull(sourceMap)){
sourceMap = new HashMap<>();
Expand Down Expand Up @@ -272,10 +298,10 @@ public ExchangisJobVo updateJobContent(ExchangisJobVo jobVo) throws ExchangisJob
}

@Override
public Message exportProject(Map<String, String> params, String userName, HttpServletRequest request) throws ExchangisJobServerException, ServerException {
public Message exportProject(Map<String, Object> params, String userName, HttpServletRequest request) throws ExchangisJobServerException, ServerException {
ExportedProject exportedProject = null;
Long projectId = Long.parseLong(params.get("projectId"));
Boolean partial = Boolean.parseBoolean(params.get("partial"));
Long projectId = (Long) params.get("projectId");
Boolean partial = (Boolean) params.get("partial");
Map<String, Set<Long>> moduleIdsMap = getModuleIdsMap(params);

LOG.info("export project, user: {}, project: {}, partial:{}", userName, projectId, partial);
Expand Down Expand Up @@ -328,7 +354,7 @@ private void setSqoop(Long projectId, Map<String, Set<Long>> moduleIdsMap, boole
if (longs.size() > 0) {
for (Long id : longs) {
LOG.info("id: {}", id);
ExchangisJobVo job = jobInfoService.getDecoratedJob(request, id);
ExchangisJobVo job = jobInfoService.getJob(id, false);

String sqoopStr = null;
try {
Expand All @@ -340,6 +366,7 @@ private void setSqoop(Long projectId, Map<String, Set<Long>> moduleIdsMap, boole
LOG.info("sqoopStr99999:{}", sqoopStr);
LOG.info("ExchangisJobVo sqoop: {}", job.getContent());
LOG.info("getCreateTimep: {}", job.getId());
LOG.info("executorUser999: {}", job.getExecuteUser());
sqoops.add(job);
}
exportedProject.setSqoops(sqoops);
Expand Down Expand Up @@ -382,11 +409,17 @@ private void setDatax(Long projectId, Map<String, Set<Long>> moduleIdsMap, boole
* @return
*/
@Override
public Map<String, Set<Long>> getModuleIdsMap(Map<String, String> params) {
public Map<String, Set<Long>> getModuleIdsMap(Map<String, Object> params) {

Map<String, Set<Long>> map = Maps.newHashMap();
String sqoopIdsStr = params.get("sqoopIds");
String dataxIdsStr = params.get("dataxIds");
String sqoopIdsStr = null;
if(params.get("sqoopIds") != null) {
sqoopIdsStr = params.get("sqoopIds").toString();
}
String dataxIdsStr = null;
if(params.get("dataxIds") != null) {
dataxIdsStr = params.get("dataxIds").toString();
}

Set<Long> sqoopIds = Sets.newHashSet();
Set<Long> dataxIds = Sets.newHashSet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,16 @@ public class ProjectImportServerImpl implements IProjectImportService {
private ProjectMapper projectMapper;

@Override
public Message importProject(HttpServletRequest req, Map<String, String> params) throws ExchangisJobServerException, ServerException {
public Message importProject(HttpServletRequest req, Map<String, Object> params) throws ExchangisJobServerException, ServerException {
String userName = SecurityFilter.getLoginUsername(req);
String resourceId = "99763d27-a35e-43f2-829b-100830bca538";
//String resourceId = params.get("resourceId");
String version = params.get("flowVersion");
Long projectId = Long.parseLong("1497870871035973669");
//Long projectId = Long.parseLong(params.get("projectId"));
String projectVersion = params.get("projectVersion");
String flowVersion = params.get("flowVersion");
String versionSuffix = projectVersion + "_" + flowVersion;
//String resourceId = "99763d27-a35e-43f2-829b-100830bca538";
String resourceId = (String) params.get("resourceId");
String version = (String) params.get("flowVersion");
//Long projectId = Long.parseLong("1497870871035973669");
Long projectId = Long.parseLong("111111");
String projectVersion = (String) params.get("projectVersion");
String flowVersion = (String) params.get("flowVersion");
String versionSuffix = projectVersion;
LOG.info("resourceId: {}, projectId: {}, versionSuffix: {}, version: {}, userName: {}", resourceId, projectId, versionSuffix, version, userName);
BmlClient bmlClient = BmlClientFactory.createBmlClient(userName);
BmlDownloadResponse bmlDownloadResponse = bmlClient.downloadShareResource(userName, resourceId, version);
Expand Down Expand Up @@ -132,6 +132,7 @@ private void importSqoop(Long projectId, String versionSuffix, ExportedProject e
throw new ExchangisJobServerException(31101, "Already exits duplicated job name(存在重复任务名称) jobName is:" + "[" + sqoop.getJobName() + "]");
} else {
//sqoop.setJobName("hahaha");
LOG.info("Sqoop job content is: {}, Modify user is: {}, jobType is: {}", sqoop.getContent(), sqoop.getExecuteUser(), sqoop.getJobType());
jobInfoService.createJob(sqoop);
idCatalog.getSqoop().put(oldId, sqoop.getId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class Constraints {
// AppConn name
public final static String EXCHANGIS_APPCONN_NAME = CommonVars.apply("wds.dss.appconn.exchangis.name", "Exchangis").getValue();

public final static String API_REQUEST_PREFIX = CommonVars.apply("wds.dss.appconn.exchangis.api.request-prefix", "/api/rest_j/v1/exchangis/dss").getValue();
public final static String API_REQUEST_PREFIX = CommonVars.apply("wds.dss.appconn.exchangis.api.request-prefix", "api/rest_j/v1/dss/exchangis/main").getValue();

public final static String DOMAIN_NAME = CommonVars.apply("wds.dss.appconn.exchangis.domain.name", "DSS").getValue();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class ExchangisProjectCreationOperation extends AbstractExchangisProjectO
private StructureService structureService;

public ExchangisProjectCreationOperation(StructureService structureService) {
super(new String[]{"project"});
super(new String[]{"appProject"});
setStructureService(structureService);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;

/**
* Ref export operation
*/
Expand All @@ -37,11 +39,14 @@ public ExchangisExportOperation(DevelopmentService developmentService){

@Override
public ResponseRef exportRef(ExportRequestRef exportRequestRef) throws ExternalOperationFailedException {
String url = developmentService.getAppInstance().getBaseUrl() + "api/rest_j/" + ServerConfiguration.BDP_SERVER_VERSION() + "/exchangis/dss/job" + "/export";
String url = developmentService.getAppInstance().getBaseUrl() + "api/rest_j/" + ServerConfiguration.BDP_SERVER_VERSION() + "/dss/exchangis/main/appJob" + "/export";
ExchangisEntityPostAction exchangisEntityPostAction = new ExchangisEntityPostAction();
exchangisEntityPostAction.setUser(exportRequestRef.getParameter("user").toString());
exchangisEntityPostAction.addRequestPayload("projectId", exportRequestRef.getParameter("projectId"));
exchangisEntityPostAction.addRequestPayload("partial", true);
HashMap<String, String> labels = new HashMap<>();
labels.put("route", "dev");
exchangisEntityPostAction.addRequestPayload("labels", labels);
String nodeType = exportRequestRef.getParameter("nodeType").toString();
String externalContent = null;

Expand Down
Loading