Skip to content

Commit

Permalink
Merge pull request #185 from Davidhua1996/dev-1.0.0
Browse files Browse the repository at this point in the history
LGTM
  • Loading branch information
FinalTarget authored Feb 26, 2022
2 parents 127a172 + e86bccf commit e65e0a1
Show file tree
Hide file tree
Showing 20 changed files with 378 additions and 136 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.webank.wedatasphere.exchangis.datasource.core.domain;

/**
* Meta column
*/
public class MetaColumn {

/**
* Column index
*/
private int index = -1;

/**
* Is primary key
*/
private boolean primaryKey;

/**
* Name
*/
private String name;

/**
* Type symbol
*/
private String type;

public MetaColumn(){

}

public MetaColumn(int index, String name, String type, boolean primaryKey){
this.index = index;
this.name = name;
this.type = type;
this.primaryKey = primaryKey;
}
public int getIndex() {
return index;
}

public void setIndex(int index) {
this.index = index;
}

public boolean isPrimaryKey() {
return primaryKey;
}

public void setPrimaryKey(boolean primaryKey) {
this.primaryKey = primaryKey;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getType() {
return type;
}

public void setType(String type) {
this.type = type;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.webank.wedatasphere.exchangis.datasource.core.service;

import com.webank.wedatasphere.exchangis.datasource.core.domain.MetaColumn;
import com.webank.wedatasphere.exchangis.datasource.core.exception.ExchangisDataSourceException;
import com.webank.wedatasphere.exchangis.datasource.core.service.rpc.ServiceRpcClient;

Expand Down Expand Up @@ -45,4 +46,15 @@ Map<String, String> getTableProps(ServiceRpcClient<?> rpcClient, String userName
* @throws ExchangisDataSourceException
*/
List<String> getPartitionKeys(String userName, Long dataSourceId, String database, String table) throws ExchangisDataSourceException;

/**
* Get columns
* @param userName userName
* @param dataSourceId data source id
* @param database database
* @param table table
* @return
* @throws ExchangisDataSourceException
*/
List<MetaColumn> getColumns(String userName, Long dataSourceId, String database, String table) throws ExchangisDataSourceException;
}
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
package com.webank.wedatasphere.exchangis.datasource.linkis.service;

import com.webank.wedatasphere.exchangis.datasource.core.domain.MetaColumn;
import com.webank.wedatasphere.exchangis.datasource.core.exception.ExchangisDataSourceException;
import com.webank.wedatasphere.exchangis.datasource.core.exception.ExchangisServiceRpcException;
import com.webank.wedatasphere.exchangis.datasource.core.service.MetadataInfoService;
import com.webank.wedatasphere.exchangis.datasource.core.service.rpc.ServiceRpcClient;
import com.webank.wedatasphere.exchangis.datasource.linkis.ExchangisLinkisRemoteClient;
import com.webank.wedatasphere.exchangis.datasource.linkis.partition.MetadataGetPartitionsResult;
import com.webank.wedatasphere.exchangis.datasource.linkis.request.MetadataGetPartitionPropsAction;
import com.webank.wedatasphere.exchangis.datasource.linkis.response.MetadataGetPartitionPropsResult;
import com.webank.wedatasphere.exchangis.datasource.linkis.service.rpc.LinkisDataSourceServiceOperation;
import com.webank.wedatasphere.exchangis.datasource.linkis.service.rpc.LinkisDataSourceServiceRpcDispatcher;
import org.apache.linkis.datasource.client.impl.LinkisMetaDataRemoteClient;
import org.apache.linkis.datasource.client.request.MetadataGetColumnsAction;
import org.apache.linkis.datasource.client.request.MetadataGetPartitionsAction;
import org.apache.linkis.datasource.client.request.MetadataGetTablePropsAction;
//import org.apache.linkis.datasource.client.response.MetadataGetPartitionsResult;
import org.apache.linkis.datasource.client.response.MetadataGetColumnsResult;
import org.apache.linkis.datasource.client.response.MetadataGetPartitionsResult;
import org.apache.linkis.datasource.client.response.MetadataGetTablePropsResult;
import org.apache.linkis.metadatamanager.common.domain.MetaColumnInfo;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.webank.wedatasphere.exchangis.datasource.core.exception.ExchangisDataSourceExceptionCode.*;

Expand Down Expand Up @@ -79,5 +84,18 @@ public List<String> getPartitionKeys(String userName, Long dataSourceId, String
return result.getPartitionInfo().getPartKeys();
}

@Override
public List<MetaColumn> getColumns(String userName, Long dataSourceId, String database, String table) throws ExchangisDataSourceException {
MetadataGetColumnsResult result = dispatch(getDefaultRemoteClient(), new LinkisDataSourceServiceOperation(() -> MetadataGetColumnsAction.builder()
.setSystem(LINKIS_RPC_CLIENT_SYSTEM.getValue())
.setDataSourceId(dataSourceId).setDatabase(database).setTable(table)
.setUser(userName).build()),CLIENT_METADATA_GET_PARTITION.getCode(), "getColumns");
List<MetaColumnInfo> columnInfoList = result.getAllColumns();
List<MetaColumn> columns = new ArrayList<>();
Optional.ofNullable(columnInfoList).ifPresent(infoList -> infoList.forEach(info ->
columns.add(new MetaColumn(info.getIndex(), info.getName(), info.getType(), info.isPrimaryKey()))));
return columns;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,4 @@ public Message partition(@PathVariable("elementType") String type,
return result;
}

public static void main(String[] args){
ElementUI.Type.valueOf("map".toUpperCase(Locale.ROOT));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.webank.wedatasphere.exchangis.datasource.core.ui.*;
import com.webank.wedatasphere.exchangis.datasource.core.ui.viewer.DefaultDataSourceUIViewer;
import com.webank.wedatasphere.exchangis.datasource.core.ui.viewer.ExchangisDataSourceUIViewer;
import com.webank.wedatasphere.exchangis.datasource.core.utils.Json;
import com.webank.wedatasphere.exchangis.datasource.core.vo.ExchangisJobDataSourcesContent;
import com.webank.wedatasphere.exchangis.datasource.core.vo.ExchangisJobInfoContent;
import com.webank.wedatasphere.exchangis.datasource.core.vo.ExchangisJobParamsContent;
Expand All @@ -23,7 +24,6 @@
import org.apache.linkis.datasource.client.impl.LinkisDataSourceRemoteClient;
import org.apache.linkis.datasource.client.request.GetInfoByDataSourceIdAction;
import org.apache.linkis.datasourcemanager.common.exception.JsonErrorException;
import org.apache.linkis.datasourcemanager.common.util.json.Json;
import org.apache.linkis.httpclient.response.Result;
import org.apache.linkis.server.security.SecurityFilter;
import org.slf4j.Logger;
Expand Down Expand Up @@ -99,12 +99,9 @@ private ExchangisDataSourceIdsUI buildDataSourceIdsUI(HttpServletRequest request
Result execute = dsClient.execute(action);
String responseBody = execute.getResponseBody();
GetDataSourceInfoResultDTO dsInfo = null;
try {
dsInfo = Json.fromJson(responseBody, GetDataSourceInfoResultDTO.class);
source.setDs(dsInfo.getData().getInfo().getDataSourceName());
} catch (JsonErrorException e) {
//TODO throws Exception
}
dsInfo = Json.fromJson(responseBody, GetDataSourceInfoResultDTO.class);
assert dsInfo != null;
source.setDs(dsInfo.getData().getInfo().getDataSourceName());
});
});
source.setDb(split[2]);
Expand All @@ -129,12 +126,9 @@ private ExchangisDataSourceIdsUI buildDataSourceIdsUI(HttpServletRequest request
Result execute = dsClient.execute(action);
String responseBody = execute.getResponseBody();
GetDataSourceInfoResultDTO dsInfo = null;
try {
dsInfo = Json.fromJson(responseBody, GetDataSourceInfoResultDTO.class);
sink.setDs(dsInfo.getData().getInfo().getDataSourceName());
} catch (JsonErrorException e) {
//TODO throw Exception
}
dsInfo = Json.fromJson(responseBody, GetDataSourceInfoResultDTO.class);
assert dsInfo != null;
sink.setDs(dsInfo.getData().getInfo().getDataSourceName());
});
});

Expand Down Expand Up @@ -293,7 +287,8 @@ private ElementUI<?> fillElementUIValue(ExchangisJobParamConfig config, Object v
case MAP:
Map<String, Object> mapElement = null;
try {
mapElement = Json.fromJson(String.valueOf(value), Map.class);
mapElement = Json.fromJson(Json.toJson(value, null),
Map.class, String.class, Object.class);
} catch (Exception e) {
LOG.info("Exception happened while parse json"+ "Config value: " + value + "message: " + e.getMessage(), e);
}
Expand Down Expand Up @@ -359,4 +354,5 @@ private MapElementUI fillMapElementUIValue(ExchangisJobParamConfig config, Map<S
ui.setValidateMsg(config.getValidateMsg());
return ui;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ public Message queryDataSources(HttpServletRequest request, DataSourceQueryVO vo

Message message = Message.ok();
message.data("list", dataSources);
message.data("total", result.getTotalPage() * pageSize);
message.data("total", result.getTotalPage());
return message;
//return Message.ok().data("list", dataSources);
}
Expand Down Expand Up @@ -1222,19 +1222,6 @@ public Message queryDataSourceDBTableFieldsMapping(HttpServletRequest request, F
right = sourceFields;
exchanged = !exchanged;
}
// for (DataSourceDbTableColumnDTO l : left) {
// String lname = l.getName();
// for (DataSourceDbTableColumnDTO r : right) {
// String rname = r.getName();
// if (lname.equals(rname)) {
// Map<String, Object> deduction = new HashMap<>();
// deduction.put("source", exchanged ? r : l);
// deduction.put("sink", exchanged ? l : r);
// deduction.put("deleteEnable", !containHive);
// deductions.add(deduction);
// }
// }
// }
for (int i = 0; i < left.size(); i ++){
DataSourceDbTableColumnDTO leftElement = left.get(i);
DataSourceDbTableColumnDTO rightElement = right.get(i % right.size());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.webank.wedatasphere.exchangis.job.domain;


import com.webank.wedatasphere.exchangis.job.utils.MemUtils;

import java.util.HashMap;
import java.util.Map;

Expand All @@ -20,6 +22,18 @@ public class ExchangisEngineJob extends GenericExchangisJob {
*/
private Map<String, Object> runtimeParams = new HashMap<>();

/**
* Memory used in engine job
*/
private Long memoryUsed;

private String memoryUnit = MemUtils.StoreUnit.MB.name();

/**
* Cpu used in engine job
*/
private Long cpuUsed;

public Map<String, Object> getJobContent() {
return jobContent;
}
Expand All @@ -35,4 +49,28 @@ public Map<String, Object> getRuntimeParams() {
public void setRuntimeParams(Map<String, Object> runtimeParams) {
this.runtimeParams = runtimeParams;
}

public Long getMemoryUsed() {
return memoryUsed;
}

public void setMemoryUsed(Long memoryUsed) {
this.memoryUsed = memoryUsed;
}

public Long getCpuUsed() {
return cpuUsed;
}

public void setCpuUsed(Long cpuUsed) {
this.cpuUsed = cpuUsed;
}

public String getMemoryUnit() {
return memoryUnit;
}

public void setMemoryUnit(String memoryUnit) {
this.memoryUnit = memoryUnit;
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package com.webank.wedatasphere.exchangis.job.domain.params;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -55,12 +52,13 @@ public <T>JobParam<T> load(JobParamDefine<T> jobParamDefine){

@SuppressWarnings("unchecked")
public <T>JobParam<T> load(JobParamDefine<T> jobParamDefine, Object source){
return (JobParam<T>) jobParamStore.compute(jobParamDefine.getKey(), (key, value) -> {
if (Objects.isNull(value)) {
value = prepare(jobParamDefine, source);
}
return value;
});
// Avoid the deadlock problem in nested call, we should not use compute/computeIfAbsent method
JobParam<?> jobParam = this.jobParamStore.get(jobParamDefine.getKey());
if (Objects.isNull(jobParam)){
jobParam = prepare(jobParamDefine, source);
this.jobParamStore.put(jobParamDefine.getKey(),jobParam);
}
return (JobParam<T>) jobParam;
}

public JobParamSet combine(JobParamSet paramSet){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ public class ExchangisLauncherConfiguration {

public static final String TASK_NOT_EXIST = "Not exists EngineConn";

public static final String LAUNCHER_LINKIS_RUNTIME_PARAM_NAME = "runtimeParams";

public static final String LAUNCHER_LINKIS_STARTUP_PARAM_NAME = "startUpParams";

public static final String LAUNCHER_LINKIS_REQUEST_MEMORY = "wds.linkis.engineconn.java.driver.memory";

public static final CommonVars<String> LAUNCHER_LINKIS_CREATOR = CommonVars.apply("wds.exchangis.job.task.launcher.linkis.creator", "exchangis");

public static final CommonVars<String> LAUNCHER_LINKIS_ENGINE_CONN_MODE = CommonVars.apply("wds.exchangis.job.task.launcher.linkis.engineConn.mode", "once");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@
import com.webank.wedatasphere.exchangis.job.domain.ExchangisEngineJob;
import com.webank.wedatasphere.exchangis.job.exception.ExchangisJobException;
import com.webank.wedatasphere.exchangis.job.launcher.domain.LaunchableExchangisTask;
import com.webank.wedatasphere.exchangis.job.utils.MemUtils;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import static com.webank.wedatasphere.exchangis.job.launcher.ExchangisLauncherConfiguration.*;

/**
* Launcher job builder
Expand All @@ -13,6 +20,7 @@
public class LinkisExchangisLauncherJobBuilder extends AbstractExchangisJobBuilder<ExchangisEngineJob, LaunchableExchangisTask> {

private static final String LAUNCHER_NAME = "Linkis";

@Override
public LaunchableExchangisTask buildJob(ExchangisEngineJob inputJob, LaunchableExchangisTask expectOut, ExchangisJobBuilderContext ctx) throws ExchangisJobException {
LaunchableExchangisTask launchableTask = new LaunchableExchangisTask();
Expand All @@ -21,7 +29,14 @@ public LaunchableExchangisTask buildJob(ExchangisEngineJob inputJob, LaunchableE
launchableTask.setExecuteUser(inputJob.getCreateUser());
// launcherJob.setExecuteNode(exchangisJob.getExecuteNode());
launchableTask.setLinkisContentMap(inputJob.getJobContent());
launchableTask.setLinkisParamsMap(inputJob.getRuntimeParams());
Map<String, Object> linkisParams = new HashMap<>();
Map<String, Object> startUpParams = new HashMap<>();
linkisParams.put(LAUNCHER_LINKIS_RUNTIME_PARAM_NAME, inputJob.getRuntimeParams());
linkisParams.put(LAUNCHER_LINKIS_STARTUP_PARAM_NAME, startUpParams);
long memoryUsed = Objects.nonNull(inputJob.getMemoryUsed())? MemUtils.convertToGB(inputJob.getMemoryUsed(),
inputJob.getMemoryUnit()) : 0;
startUpParams.put(LAUNCHER_LINKIS_REQUEST_MEMORY, String.valueOf(memoryUsed <= 0 ? 1 : memoryUsed));
launchableTask.setLinkisParamsMap(linkisParams);
launchableTask.setEngineType(inputJob.getEngineType());
launchableTask.setLabels(inputJob.getJobLabel());
launchableTask.setName(inputJob.getName());
Expand Down
Loading

0 comments on commit e65e0a1

Please sign in to comment.