Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Important problem fix for execution #185

Merged
merged 3 commits into from
Feb 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.webank.wedatasphere.exchangis.datasource.core.domain;

/**
* Meta column
*/
public class MetaColumn {

/**
* Column index
*/
private int index = -1;

/**
* Is primary key
*/
private boolean primaryKey;

/**
* Name
*/
private String name;

/**
* Type symbol
*/
private String type;

public MetaColumn(){

}

public MetaColumn(int index, String name, String type, boolean primaryKey){
this.index = index;
this.name = name;
this.type = type;
this.primaryKey = primaryKey;
}
public int getIndex() {
return index;
}

public void setIndex(int index) {
this.index = index;
}

public boolean isPrimaryKey() {
return primaryKey;
}

public void setPrimaryKey(boolean primaryKey) {
this.primaryKey = primaryKey;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getType() {
return type;
}

public void setType(String type) {
this.type = type;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.webank.wedatasphere.exchangis.datasource.core.service;

import com.webank.wedatasphere.exchangis.datasource.core.domain.MetaColumn;
import com.webank.wedatasphere.exchangis.datasource.core.exception.ExchangisDataSourceException;
import com.webank.wedatasphere.exchangis.datasource.core.service.rpc.ServiceRpcClient;

Expand Down Expand Up @@ -45,4 +46,15 @@ Map<String, String> getTableProps(ServiceRpcClient<?> rpcClient, String userName
* @throws ExchangisDataSourceException
*/
List<String> getPartitionKeys(String userName, Long dataSourceId, String database, String table) throws ExchangisDataSourceException;

/**
* Get columns
* @param userName userName
* @param dataSourceId data source id
* @param database database
* @param table table
* @return
* @throws ExchangisDataSourceException
*/
List<MetaColumn> getColumns(String userName, Long dataSourceId, String database, String table) throws ExchangisDataSourceException;
}
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
package com.webank.wedatasphere.exchangis.datasource.linkis.service;

import com.webank.wedatasphere.exchangis.datasource.core.domain.MetaColumn;
import com.webank.wedatasphere.exchangis.datasource.core.exception.ExchangisDataSourceException;
import com.webank.wedatasphere.exchangis.datasource.core.exception.ExchangisServiceRpcException;
import com.webank.wedatasphere.exchangis.datasource.core.service.MetadataInfoService;
import com.webank.wedatasphere.exchangis.datasource.core.service.rpc.ServiceRpcClient;
import com.webank.wedatasphere.exchangis.datasource.linkis.ExchangisLinkisRemoteClient;
import com.webank.wedatasphere.exchangis.datasource.linkis.partition.MetadataGetPartitionsResult;
import com.webank.wedatasphere.exchangis.datasource.linkis.request.MetadataGetPartitionPropsAction;
import com.webank.wedatasphere.exchangis.datasource.linkis.response.MetadataGetPartitionPropsResult;
import com.webank.wedatasphere.exchangis.datasource.linkis.service.rpc.LinkisDataSourceServiceOperation;
import com.webank.wedatasphere.exchangis.datasource.linkis.service.rpc.LinkisDataSourceServiceRpcDispatcher;
import org.apache.linkis.datasource.client.impl.LinkisMetaDataRemoteClient;
import org.apache.linkis.datasource.client.request.MetadataGetColumnsAction;
import org.apache.linkis.datasource.client.request.MetadataGetPartitionsAction;
import org.apache.linkis.datasource.client.request.MetadataGetTablePropsAction;
//import org.apache.linkis.datasource.client.response.MetadataGetPartitionsResult;
import org.apache.linkis.datasource.client.response.MetadataGetColumnsResult;
import org.apache.linkis.datasource.client.response.MetadataGetPartitionsResult;
import org.apache.linkis.datasource.client.response.MetadataGetTablePropsResult;
import org.apache.linkis.metadatamanager.common.domain.MetaColumnInfo;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.webank.wedatasphere.exchangis.datasource.core.exception.ExchangisDataSourceExceptionCode.*;

Expand Down Expand Up @@ -79,5 +84,18 @@ public List<String> getPartitionKeys(String userName, Long dataSourceId, String
return result.getPartitionInfo().getPartKeys();
}

@Override
public List<MetaColumn> getColumns(String userName, Long dataSourceId, String database, String table) throws ExchangisDataSourceException {
MetadataGetColumnsResult result = dispatch(getDefaultRemoteClient(), new LinkisDataSourceServiceOperation(() -> MetadataGetColumnsAction.builder()
.setSystem(LINKIS_RPC_CLIENT_SYSTEM.getValue())
.setDataSourceId(dataSourceId).setDatabase(database).setTable(table)
.setUser(userName).build()),CLIENT_METADATA_GET_PARTITION.getCode(), "getColumns");
List<MetaColumnInfo> columnInfoList = result.getAllColumns();
List<MetaColumn> columns = new ArrayList<>();
Optional.ofNullable(columnInfoList).ifPresent(infoList -> infoList.forEach(info ->
columns.add(new MetaColumn(info.getIndex(), info.getName(), info.getType(), info.isPrimaryKey()))));
return columns;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,4 @@ public Message partition(@PathVariable("elementType") String type,
return result;
}

public static void main(String[] args){
ElementUI.Type.valueOf("map".toUpperCase(Locale.ROOT));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.webank.wedatasphere.exchangis.datasource.core.ui.*;
import com.webank.wedatasphere.exchangis.datasource.core.ui.viewer.DefaultDataSourceUIViewer;
import com.webank.wedatasphere.exchangis.datasource.core.ui.viewer.ExchangisDataSourceUIViewer;
import com.webank.wedatasphere.exchangis.datasource.core.utils.Json;
import com.webank.wedatasphere.exchangis.datasource.core.vo.ExchangisJobDataSourcesContent;
import com.webank.wedatasphere.exchangis.datasource.core.vo.ExchangisJobInfoContent;
import com.webank.wedatasphere.exchangis.datasource.core.vo.ExchangisJobParamsContent;
Expand All @@ -23,7 +24,6 @@
import org.apache.linkis.datasource.client.impl.LinkisDataSourceRemoteClient;
import org.apache.linkis.datasource.client.request.GetInfoByDataSourceIdAction;
import org.apache.linkis.datasourcemanager.common.exception.JsonErrorException;
import org.apache.linkis.datasourcemanager.common.util.json.Json;
import org.apache.linkis.httpclient.response.Result;
import org.apache.linkis.server.security.SecurityFilter;
import org.slf4j.Logger;
Expand Down Expand Up @@ -99,12 +99,9 @@ private ExchangisDataSourceIdsUI buildDataSourceIdsUI(HttpServletRequest request
Result execute = dsClient.execute(action);
String responseBody = execute.getResponseBody();
GetDataSourceInfoResultDTO dsInfo = null;
try {
dsInfo = Json.fromJson(responseBody, GetDataSourceInfoResultDTO.class);
source.setDs(dsInfo.getData().getInfo().getDataSourceName());
} catch (JsonErrorException e) {
//TODO throws Exception
}
dsInfo = Json.fromJson(responseBody, GetDataSourceInfoResultDTO.class);
assert dsInfo != null;
source.setDs(dsInfo.getData().getInfo().getDataSourceName());
});
});
source.setDb(split[2]);
Expand All @@ -129,12 +126,9 @@ private ExchangisDataSourceIdsUI buildDataSourceIdsUI(HttpServletRequest request
Result execute = dsClient.execute(action);
String responseBody = execute.getResponseBody();
GetDataSourceInfoResultDTO dsInfo = null;
try {
dsInfo = Json.fromJson(responseBody, GetDataSourceInfoResultDTO.class);
sink.setDs(dsInfo.getData().getInfo().getDataSourceName());
} catch (JsonErrorException e) {
//TODO throw Exception
}
dsInfo = Json.fromJson(responseBody, GetDataSourceInfoResultDTO.class);
assert dsInfo != null;
sink.setDs(dsInfo.getData().getInfo().getDataSourceName());
});
});

Expand Down Expand Up @@ -293,7 +287,8 @@ private ElementUI<?> fillElementUIValue(ExchangisJobParamConfig config, Object v
case MAP:
Map<String, Object> mapElement = null;
try {
mapElement = Json.fromJson(String.valueOf(value), Map.class);
mapElement = Json.fromJson(Json.toJson(value, null),
Map.class, String.class, Object.class);
} catch (Exception e) {
LOG.info("Exception happened while parse json"+ "Config value: " + value + "message: " + e.getMessage(), e);
}
Expand Down Expand Up @@ -359,4 +354,5 @@ private MapElementUI fillMapElementUIValue(ExchangisJobParamConfig config, Map<S
ui.setValidateMsg(config.getValidateMsg());
return ui;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ public Message queryDataSources(HttpServletRequest request, DataSourceQueryVO vo

Message message = Message.ok();
message.data("list", dataSources);
message.data("total", result.getTotalPage() * pageSize);
message.data("total", result.getTotalPage());
return message;
//return Message.ok().data("list", dataSources);
}
Expand Down Expand Up @@ -1222,19 +1222,6 @@ public Message queryDataSourceDBTableFieldsMapping(HttpServletRequest request, F
right = sourceFields;
exchanged = !exchanged;
}
// for (DataSourceDbTableColumnDTO l : left) {
// String lname = l.getName();
// for (DataSourceDbTableColumnDTO r : right) {
// String rname = r.getName();
// if (lname.equals(rname)) {
// Map<String, Object> deduction = new HashMap<>();
// deduction.put("source", exchanged ? r : l);
// deduction.put("sink", exchanged ? l : r);
// deduction.put("deleteEnable", !containHive);
// deductions.add(deduction);
// }
// }
// }
for (int i = 0; i < left.size(); i ++){
DataSourceDbTableColumnDTO leftElement = left.get(i);
DataSourceDbTableColumnDTO rightElement = right.get(i % right.size());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.webank.wedatasphere.exchangis.job.domain;


import com.webank.wedatasphere.exchangis.job.utils.MemUtils;

import java.util.HashMap;
import java.util.Map;

Expand All @@ -20,6 +22,18 @@ public class ExchangisEngineJob extends GenericExchangisJob {
*/
private Map<String, Object> runtimeParams = new HashMap<>();

/**
* Memory used in engine job
*/
private Long memoryUsed;

private String memoryUnit = MemUtils.StoreUnit.MB.name();

/**
* Cpu used in engine job
*/
private Long cpuUsed;

public Map<String, Object> getJobContent() {
return jobContent;
}
Expand All @@ -35,4 +49,28 @@ public Map<String, Object> getRuntimeParams() {
public void setRuntimeParams(Map<String, Object> runtimeParams) {
this.runtimeParams = runtimeParams;
}

public Long getMemoryUsed() {
return memoryUsed;
}

public void setMemoryUsed(Long memoryUsed) {
this.memoryUsed = memoryUsed;
}

public Long getCpuUsed() {
return cpuUsed;
}

public void setCpuUsed(Long cpuUsed) {
this.cpuUsed = cpuUsed;
}

public String getMemoryUnit() {
return memoryUnit;
}

public void setMemoryUnit(String memoryUnit) {
this.memoryUnit = memoryUnit;
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package com.webank.wedatasphere.exchangis.job.domain.params;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -55,12 +52,13 @@ public <T>JobParam<T> load(JobParamDefine<T> jobParamDefine){

@SuppressWarnings("unchecked")
public <T>JobParam<T> load(JobParamDefine<T> jobParamDefine, Object source){
return (JobParam<T>) jobParamStore.compute(jobParamDefine.getKey(), (key, value) -> {
if (Objects.isNull(value)) {
value = prepare(jobParamDefine, source);
}
return value;
});
// Avoid the deadlock problem in nested call, we should not use compute/computeIfAbsent method
JobParam<?> jobParam = this.jobParamStore.get(jobParamDefine.getKey());
if (Objects.isNull(jobParam)){
jobParam = prepare(jobParamDefine, source);
this.jobParamStore.put(jobParamDefine.getKey(),jobParam);
}
return (JobParam<T>) jobParam;
}

public JobParamSet combine(JobParamSet paramSet){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ public class ExchangisLauncherConfiguration {

public static final String TASK_NOT_EXIST = "Not exists EngineConn";

public static final String LAUNCHER_LINKIS_RUNTIME_PARAM_NAME = "runtimeParams";

public static final String LAUNCHER_LINKIS_STARTUP_PARAM_NAME = "startUpParams";

public static final String LAUNCHER_LINKIS_REQUEST_MEMORY = "wds.linkis.engineconn.java.driver.memory";

public static final CommonVars<String> LAUNCHER_LINKIS_CREATOR = CommonVars.apply("wds.exchangis.job.task.launcher.linkis.creator", "exchangis");

public static final CommonVars<String> LAUNCHER_LINKIS_ENGINE_CONN_MODE = CommonVars.apply("wds.exchangis.job.task.launcher.linkis.engineConn.mode", "once");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@
import com.webank.wedatasphere.exchangis.job.domain.ExchangisEngineJob;
import com.webank.wedatasphere.exchangis.job.exception.ExchangisJobException;
import com.webank.wedatasphere.exchangis.job.launcher.domain.LaunchableExchangisTask;
import com.webank.wedatasphere.exchangis.job.utils.MemUtils;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import static com.webank.wedatasphere.exchangis.job.launcher.ExchangisLauncherConfiguration.*;

/**
* Launcher job builder
Expand All @@ -13,6 +20,7 @@
public class LinkisExchangisLauncherJobBuilder extends AbstractExchangisJobBuilder<ExchangisEngineJob, LaunchableExchangisTask> {

private static final String LAUNCHER_NAME = "Linkis";

@Override
public LaunchableExchangisTask buildJob(ExchangisEngineJob inputJob, LaunchableExchangisTask expectOut, ExchangisJobBuilderContext ctx) throws ExchangisJobException {
LaunchableExchangisTask launchableTask = new LaunchableExchangisTask();
Expand All @@ -21,7 +29,14 @@ public LaunchableExchangisTask buildJob(ExchangisEngineJob inputJob, LaunchableE
launchableTask.setExecuteUser(inputJob.getCreateUser());
// launcherJob.setExecuteNode(exchangisJob.getExecuteNode());
launchableTask.setLinkisContentMap(inputJob.getJobContent());
launchableTask.setLinkisParamsMap(inputJob.getRuntimeParams());
Map<String, Object> linkisParams = new HashMap<>();
Map<String, Object> startUpParams = new HashMap<>();
linkisParams.put(LAUNCHER_LINKIS_RUNTIME_PARAM_NAME, inputJob.getRuntimeParams());
linkisParams.put(LAUNCHER_LINKIS_STARTUP_PARAM_NAME, startUpParams);
long memoryUsed = Objects.nonNull(inputJob.getMemoryUsed())? MemUtils.convertToGB(inputJob.getMemoryUsed(),
inputJob.getMemoryUnit()) : 0;
startUpParams.put(LAUNCHER_LINKIS_REQUEST_MEMORY, String.valueOf(memoryUsed <= 0 ? 1 : memoryUsed));
launchableTask.setLinkisParamsMap(linkisParams);
launchableTask.setEngineType(inputJob.getEngineType());
launchableTask.setLabels(inputJob.getJobLabel());
launchableTask.setName(inputJob.getName());
Expand Down
Loading