-
Notifications
You must be signed in to change notification settings - Fork 1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1105 from WeDataSphere/master
Delete unuseful code and update sparketl appconn.
- Loading branch information
Showing
13 changed files
with
449 additions
and
141 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
29 changes: 29 additions & 0 deletions
29
.../com/webank/wedatasphere/dss/appconn/sparketl/query/SparkEtlRefQueryJumpUrlOperation.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
package com.webank.wedatasphere.dss.appconn.sparketl.query; | ||
|
||
import com.webank.wedatasphere.dss.standard.app.development.operation.AbstractDevelopmentOperation; | ||
import com.webank.wedatasphere.dss.standard.app.development.operation.RefQueryJumpUrlOperation; | ||
import com.webank.wedatasphere.dss.standard.app.development.ref.QueryJumpUrlResponseRef; | ||
import com.webank.wedatasphere.dss.standard.app.development.ref.impl.OnlyDevelopmentRequestRef; | ||
import com.webank.wedatasphere.dss.standard.app.development.utils.QueryJumpUrlConstant; | ||
import com.webank.wedatasphere.dss.standard.common.exception.operation.ExternalOperationFailedException; | ||
import org.apache.commons.lang3.StringUtils; | ||
|
||
public class SparkEtlRefQueryJumpUrlOperation extends AbstractDevelopmentOperation<OnlyDevelopmentRequestRef.QueryJumpUrlRequestRefImpl, QueryJumpUrlResponseRef> | ||
implements RefQueryJumpUrlOperation<OnlyDevelopmentRequestRef.QueryJumpUrlRequestRefImpl, QueryJumpUrlResponseRef> { | ||
|
||
@Override | ||
public QueryJumpUrlResponseRef query(OnlyDevelopmentRequestRef.QueryJumpUrlRequestRefImpl requestRef) throws ExternalOperationFailedException { | ||
String jumpUrl = mergeBaseUrl("#/sparketl?resourceId=%s&version=%s&%s=%s&%s=%s"); | ||
String resourceId = (String) requestRef.getRefJobContent().get("resourceId"); | ||
String version = (String) requestRef.getRefJobContent().get("version"); | ||
if(StringUtils.isBlank(resourceId) || StringUtils.isBlank(version)) { | ||
logger.info("resourceId or version is empty, maybe user {} want to create a new node.", requestRef.getUserName()); | ||
resourceId = ""; | ||
version = ""; | ||
} | ||
jumpUrl = String.format(jumpUrl, resourceId, version, QueryJumpUrlConstant.NODE_ID.getKey(), | ||
QueryJumpUrlConstant.NODE_ID.getValue(), QueryJumpUrlConstant.PROJECT_NAME.getKey(), QueryJumpUrlConstant.PROJECT_NAME.getValue()); | ||
return QueryJumpUrlResponseRef.newBuilder().setJumpUrl(jumpUrl).build(); | ||
} | ||
|
||
} |
12 changes: 12 additions & 0 deletions
12
...main/java/com/webank/wedatasphere/dss/appconn/sparketl/query/SparkEtlRefQueryService.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
package com.webank.wedatasphere.dss.appconn.sparketl.query; | ||
|
||
import com.webank.wedatasphere.dss.standard.app.development.operation.RefQueryJumpUrlOperation; | ||
import com.webank.wedatasphere.dss.standard.app.development.service.AbstractRefQueryService; | ||
|
||
public class SparkEtlRefQueryService extends AbstractRefQueryService { | ||
|
||
@Override | ||
protected RefQueryJumpUrlOperation createRefQueryOperation() { | ||
return new SparkEtlRefQueryJumpUrlOperation(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
68 changes: 68 additions & 0 deletions
68
...bank/wedatasphere/dss/framework/appconn/restful/AppConn2LinkisRefExecutionRestfulApi.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
package com.webank.wedatasphere.dss.framework.appconn.restful; | ||
|
||
import com.webank.wedatasphere.dss.appconn.core.ext.OnlyDevelopmentAppConn; | ||
import com.webank.wedatasphere.dss.appconn.manager.AppConnManager; | ||
import com.webank.wedatasphere.dss.common.label.DSSLabel; | ||
import com.webank.wedatasphere.dss.common.label.EnvDSSLabel; | ||
import com.webank.wedatasphere.dss.common.utils.DSSCommonUtils; | ||
import com.webank.wedatasphere.dss.standard.app.development.operation.AppConn2LinkisRefExecutionOperation; | ||
import com.webank.wedatasphere.dss.standard.app.development.ref.AppConn2LinkisResponseRef; | ||
import com.webank.wedatasphere.dss.standard.app.development.ref.RefJobContentRequestRef; | ||
import com.webank.wedatasphere.dss.standard.app.development.service.RefExecutionService; | ||
import com.webank.wedatasphere.dss.standard.app.development.utils.DevelopmentOperationUtils; | ||
import com.webank.wedatasphere.dss.standard.app.sso.Workspace; | ||
import com.webank.wedatasphere.dss.standard.common.desc.AppInstance; | ||
import org.apache.linkis.server.Message; | ||
import org.apache.linkis.server.security.SecurityFilter; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import org.springframework.beans.factory.annotation.Autowired; | ||
import org.springframework.web.bind.annotation.RequestBody; | ||
import org.springframework.web.bind.annotation.RequestMapping; | ||
import org.springframework.web.bind.annotation.RequestMethod; | ||
import org.springframework.web.bind.annotation.RestController; | ||
|
||
import javax.servlet.http.HttpServletRequest; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
@RequestMapping(path = "/dss/framework/appconn", produces = {"application/json"}) | ||
@RestController | ||
public class AppConn2LinkisRefExecutionRestfulApi { | ||
|
||
private static final Logger LOGGER = LoggerFactory.getLogger(AppConn2LinkisRefExecutionRestfulApi.class); | ||
|
||
|
||
@Autowired | ||
private AppConnManagerRestfulApi appConnManagerRestfulApi; | ||
|
||
@RequestMapping(path = "execute", method = RequestMethod.POST) | ||
public Message execute(HttpServletRequest request, @RequestBody Map<String, Object> json) { | ||
String userName = SecurityFilter.getLoginUsername(request); | ||
LOGGER.info("user {} try to transform jobContent to Linkis job with requestBody {}.", userName, json); | ||
String workspaceStr = (String) json.get("workspaceStr"); | ||
String appConnName = (String) json.get("appConnName"); | ||
String labelStr = (String) json.get("labels"); | ||
Map<String, Object> refJobContent = (Map<String, Object>) json.get("jobContent"); | ||
Workspace workspace = DSSCommonUtils.COMMON_GSON.fromJson(workspaceStr, Workspace.class); | ||
OnlyDevelopmentAppConn appConn = (OnlyDevelopmentAppConn) AppConnManager.getAppConnManager().getAppConn(appConnName); | ||
AppInstance appInstance; | ||
List<DSSLabel> labels = Arrays.asList(new EnvDSSLabel(labelStr)); | ||
if(appConn.getAppDesc().getAppInstances().size() == 1) { | ||
appInstance = appConn.getAppDesc().getAppInstances().get(0); | ||
} else { | ||
appInstance = appConn.getAppDesc().getAppInstancesByLabels(labels).get(0); | ||
} | ||
AppConn2LinkisResponseRef responseRef = DevelopmentOperationUtils.tryDevelopmentOperation(() -> appConn.getOrCreateDevelopmentStandard().getRefExecutionService(appInstance), | ||
developmentService -> ((RefExecutionService) developmentService).getRefExecutionOperation(), | ||
null, refJobContentRequestRef -> refJobContentRequestRef.setRefJobContent(refJobContent), | ||
null, null, (developmentOperation, developmentRequestRef) -> { | ||
developmentRequestRef.setWorkspace(workspace).setUserName(userName).setDSSLabels(labels); | ||
return ((AppConn2LinkisRefExecutionOperation) developmentOperation).execute((RefJobContentRequestRef) developmentRequestRef); | ||
}, null, "fetch linkis jobContent from appConn " + appConnName + " failed."); | ||
return Message.ok().data("executionCode", responseRef.getCode()).data("params", responseRef.getParams()) | ||
.data("engineType", responseRef.getEngineType()).data("runType", responseRef.getRunType()); | ||
} | ||
|
||
} |
35 changes: 35 additions & 0 deletions
35
...va/com/webank/wedatasphere/dss/linkis/node/execution/entity/AppConn2LinkisPostAction.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
package com.webank.wedatasphere.dss.linkis.node.execution.entity; | ||
|
||
import com.webank.wedatasphere.dss.linkis.node.execution.utils.LinkisJobExecutionUtils; | ||
import org.apache.linkis.httpclient.request.POSTAction; | ||
import org.apache.linkis.httpclient.request.UserAction; | ||
|
||
public class AppConn2LinkisPostAction extends POSTAction implements UserAction { | ||
|
||
private String url; | ||
private String user; | ||
|
||
@Override | ||
public String getRequestPayload() { | ||
return LinkisJobExecutionUtils.gson.toJson(getRequestPayloads()); | ||
} | ||
|
||
public void setUrl(String url) { | ||
this.url = url; | ||
} | ||
|
||
@Override | ||
public String getURL() { | ||
return url; | ||
} | ||
|
||
@Override | ||
public void setUser(String user) { | ||
this.user = user; | ||
} | ||
|
||
@Override | ||
public String getUser() { | ||
return user; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
92 changes: 92 additions & 0 deletions
92
...ava/com/webank/wedatasphere/dss/linkis/node/execution/parser/AppConn2LinkisJobParser.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
package com.webank.wedatasphere.dss.linkis.node.execution.parser; | ||
|
||
import com.google.gson.reflect.TypeToken; | ||
import com.webank.wedatasphere.dss.linkis.node.execution.conf.LinkisJobExecutionConfiguration; | ||
import com.webank.wedatasphere.dss.linkis.node.execution.entity.AppConn2LinkisPostAction; | ||
import com.webank.wedatasphere.dss.linkis.node.execution.entity.BMLResource; | ||
import com.webank.wedatasphere.dss.linkis.node.execution.exception.LinkisJobExecutionErrorException; | ||
import com.webank.wedatasphere.dss.linkis.node.execution.job.AppConnLinkisJob; | ||
import com.webank.wedatasphere.dss.linkis.node.execution.job.Job; | ||
import com.webank.wedatasphere.dss.linkis.node.execution.job.LinkisJob; | ||
import com.webank.wedatasphere.dss.linkis.node.execution.service.LinkisURLService; | ||
import com.webank.wedatasphere.dss.linkis.node.execution.utils.LinkisJobExecutionUtils; | ||
import com.webank.wedatasphere.dss.linkis.node.execution.utils.LinkisUjesClientUtils; | ||
import org.apache.linkis.httpclient.dws.DWSHttpClient; | ||
import org.apache.linkis.httpclient.dws.config.DWSClientConfig; | ||
import org.apache.linkis.httpclient.response.impl.DefaultHttpResult; | ||
|
||
import java.util.HashMap; | ||
import java.util.Map; | ||
|
||
public class AppConn2LinkisJobParser extends BML2LinkisJobParser { | ||
|
||
@Override | ||
public void parseJob(Job job) throws Exception { | ||
if(!(job instanceof AppConnLinkisJob)) { | ||
return; | ||
} | ||
AppConnLinkisJob appConnLinkisJob = (AppConnLinkisJob) job; | ||
String runType = appConnLinkisJob.getRunType(); | ||
// 只处理包含 appconn2linkis 的 AppConnLinkisJob,例如:linkis.appconn.<appconnName>.appconn2linkis | ||
if(!runType.toLowerCase().contains("appconn2linkis")) { | ||
return; | ||
} | ||
job.getLogObj().info(String.format("AppConn %s try to generate Linkis jobContent from AppConn execution.", runType)); | ||
getAndSetCode(null, appConnLinkisJob); | ||
} | ||
|
||
@Override | ||
protected void dealExecutionParams(LinkisJob linkisAppConnJob, Map<String, Object> executionParams) { | ||
String engineType = (String) executionParams.get("engineType"); | ||
String runType = (String) executionParams.get("runType"); | ||
setEngineType(linkisAppConnJob, engineType, runType); | ||
} | ||
|
||
@Override | ||
protected Map<String, Object> getExecutionParams(BMLResource bmlResource, LinkisJob job) { | ||
String user = job.getUser(); | ||
String linkisUrl = LinkisURLService.Factory.getLinkisURLService().getDefaultLinkisURL(job); | ||
String token = LinkisJobExecutionConfiguration.LINKIS_AUTHOR_USER_TOKEN.getValue(job.getJobProps()); | ||
DWSHttpClient client = null; | ||
DWSClientConfig clientConfig = LinkisUjesClientUtils.getClientConfig1_X(linkisUrl, user, token, job.getJobProps()); | ||
AppConn2LinkisPostAction appConn2LinkisPostAction = new AppConn2LinkisPostAction(); | ||
appConn2LinkisPostAction.setUrl("/api/rest_j/v1/dss/framework/appconn/execute"); | ||
appConn2LinkisPostAction.addRequestPayload("workspaceStr", job.getRuntimeParams().get("workspace")); | ||
appConn2LinkisPostAction.addRequestPayload("appConnName", job.getRunType().split("\\.")[0]); | ||
appConn2LinkisPostAction.addRequestPayload("labels", getLabels(job.getJobProps().get("labels"))); | ||
Map<String, Object> jobContent = new HashMap<>(); | ||
if(job.getCode() != null && !job.getCode().isEmpty()) { | ||
jobContent.putAll(LinkisJobExecutionUtils.gson.fromJson(job.getCode(), new TypeToken<Map<String, Object>>() {}.getType())); | ||
} | ||
jobContent.putAll(job.getParams()); | ||
appConn2LinkisPostAction.addRequestPayload("jobContent", jobContent); | ||
appConn2LinkisPostAction.setUser(user); | ||
job.getLogObj().info(String.format("try to ask AppConn %s to execute AppConn2Linkis with requestBody %s.", job.getRunType(), appConn2LinkisPostAction.getRequestPayload())); | ||
appConn2LinkisPostAction.addHeader("Referer", ""); | ||
try { | ||
client = new DWSHttpClient(clientConfig, "Workspace-Fetch-Client-"); | ||
DefaultHttpResult result = (DefaultHttpResult) client.execute(appConn2LinkisPostAction); | ||
if (result.getStatusCode() == 200 || result.getStatusCode() == 0) { | ||
Map<String, Object> responseBody = LinkisJobExecutionUtils.gson.fromJson(result.getResponseBody(), Map.class); | ||
return (Map<String, Object>) responseBody.get("data"); | ||
} else { | ||
throw new LinkisJobExecutionErrorException(50063, "Failed to get workspace str, responseBody is: " + | ||
result.getResponseBody()); | ||
} | ||
} finally { | ||
if(client != null) { | ||
client.close(); | ||
} | ||
} | ||
} | ||
|
||
private String getLabels(String labels) { | ||
if (labels.contains("route") || labels.contains("DSSEnv") ) { | ||
Map<String, Object> labelMap = LinkisJobExecutionUtils.gson.fromJson(labels, Map.class); | ||
return (String) labelMap.getOrDefault("route", labelMap.getOrDefault("DSSEnv", labels)); | ||
} else { | ||
return labels; | ||
} | ||
} | ||
|
||
} |
55 changes: 55 additions & 0 deletions
55
...in/java/com/webank/wedatasphere/dss/linkis/node/execution/parser/BML2LinkisJobParser.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
package com.webank.wedatasphere.dss.linkis.node.execution.parser; | ||
|
||
import com.google.gson.reflect.TypeToken; | ||
import com.webank.wedatasphere.dss.linkis.node.execution.entity.BMLResource; | ||
import com.webank.wedatasphere.dss.linkis.node.execution.exception.LinkisJobExecutionErrorException; | ||
import com.webank.wedatasphere.dss.linkis.node.execution.job.AppConnLinkisJob; | ||
import com.webank.wedatasphere.dss.linkis.node.execution.job.Job; | ||
import com.webank.wedatasphere.dss.linkis.node.execution.utils.LinkisJobExecutionUtils; | ||
|
||
import java.util.Map; | ||
|
||
public class BML2LinkisJobParser extends CodeParser { | ||
|
||
@Override | ||
public void parseJob(Job job) throws Exception { | ||
if(!(job instanceof AppConnLinkisJob)) { | ||
return; | ||
} | ||
AppConnLinkisJob appConnLinkisJob = (AppConnLinkisJob) job; | ||
String runType = appConnLinkisJob.getRunType(); | ||
// 只处理包含 bml2linkis 的 AppConnLinkisJob,例如:linkis.appconn.<appconnName>.bml2linkis | ||
if(!runType.toLowerCase().contains("bml2linkis")) { | ||
return; | ||
} | ||
job.getLogObj().info(String.format("AppConn %s try to generate Linkis jobContent from code %s.", runType, | ||
job.getCode())); | ||
Map<String, Object> script = LinkisJobExecutionUtils.gson.fromJson(job.getCode(), new TypeToken<Map<String, Object>>() {}.getType()); | ||
if(!script.containsKey("resourceId") || !script.containsKey("version") || !script.containsKey("fileName")) { | ||
job.getLogObj().error("the code error, resourceId, version or fileName is not exists."); | ||
throw new LinkisJobExecutionErrorException(90100, "cannot recognize fileName from jobContent."); | ||
} | ||
BMLResource bmlResource = new BMLResource(); | ||
bmlResource.setResourceId((String) script.get("resourceId")); | ||
bmlResource.setVersion((String) script.get("version")); | ||
// fileName 的格式为 ${resourceId}.${engineType}.${runType} | ||
bmlResource.setFileName((String) script.get("fileName")); | ||
getAndSetCode(bmlResource, appConnLinkisJob); | ||
String[] fileNameArray = bmlResource.getFileName().split("\\."); | ||
if(fileNameArray.length < 3) { | ||
job.getLogObj().error(String.format("cannot recognize fileName %s, the fileName format must be ${resourceId}.${engineType}.${runType}", bmlResource.getFileName())); | ||
throw new LinkisJobExecutionErrorException(90100, "cannot recognize fileName from jobContent."); | ||
} | ||
String realEngineType = fileNameArray[fileNameArray.length - 2]; | ||
String realRunType = fileNameArray[fileNameArray.length - 1]; | ||
setEngineType(job, realEngineType, realRunType); | ||
} | ||
|
||
protected void setEngineType(Job job, String realEngineType, String realRunType) { | ||
job.getLogObj().warn(String.format("switch job from engineType %s with runType %s to engineType %s with runType %s", | ||
job.getEngineType(), job.getRunType(), realEngineType, realRunType)); | ||
job.setEngineType(realEngineType); | ||
job.setRunType(realRunType); | ||
} | ||
|
||
} |
Oops, something went wrong.