Skip to content

Commit

Permalink
Merge pull request #188 from Davidhua1996/dev-1.0.0
Browse files Browse the repository at this point in the history
LGTM
  • Loading branch information
FinalTarget authored Feb 27, 2022
2 parents 37f390b + f7994e3 commit de367ad
Show file tree
Hide file tree
Showing 10 changed files with 224 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1184,6 +1184,14 @@ private void checkDataXDSSupportDegree(String sourceDsType, String sinkDsType) t

}

/**
* TODO: the mapping function is defined by the rule of Hive directly, we should abstract to support all the types
* @param request
* @param vo
* @return
* @throws Exception
*/
@SuppressWarnings("unchecked")
public Message queryDataSourceDBTableFieldsMapping(HttpServletRequest request, FieldMappingVO vo) throws Exception {

this.checkDSSupportDegree(vo.getEngine(), vo.getSourceTypeId(), vo.getSinkTypeId());
Expand All @@ -1209,26 +1217,23 @@ public Message queryDataSourceDBTableFieldsMapping(HttpServletRequest request, F
field.setFieldEditable(!"HIVE".equals(vo.getSinkTypeId()));
}
message.data("sinkFields", sinkFields);


// field mapping deduction
List<Map<String, Object>> deductions = new ArrayList<>();
// boolean[] matchedIndex = new boolean[sinkFields.size()];
List<DataSourceDbTableColumnDTO> left = sourceFields;
List<DataSourceDbTableColumnDTO> right = sinkFields;
boolean exchanged = false;
if (containHive && "HIVE".equals(vo.getSinkTypeId())) {
left = sinkFields;
right = sourceFields;
exchanged = !exchanged;
exchanged = true;
}
for (int i = 0; i < left.size(); i ++){
DataSourceDbTableColumnDTO leftElement = left.get(i);
DataSourceDbTableColumnDTO rightElement = right.get(i % right.size());
Map<String, Object> deduction = new HashMap<>();
deduction.put("source", exchanged ? rightElement : leftElement);
deduction.put("sink", exchanged ? leftElement : rightElement);
deduction.put("deleteEnable", !containHive);
deduction.put("deleteEnable", true);
deductions.add(deduction);
}
message.data("deductions", deductions);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package com.webank.wedatasphere.exchangis.job.builder;


import com.webank.wedatasphere.exchangis.datasource.core.service.MetadataInfoService;
import com.webank.wedatasphere.exchangis.job.builder.api.ExchangisJobBuilder;
import com.webank.wedatasphere.exchangis.job.domain.ExchangisJobInfo;
import com.webank.wedatasphere.exchangis.job.listener.JobLogListener;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -16,35 +15,25 @@ public class ExchangisJobBuilderContext {
/**
* Origin job
*/
private ExchangisJobInfo originalJob;
protected ExchangisJobInfo originalJob;

/**
* Listen the log event
* Current builder
*/
private JobLogListener jobLogListener;
protected ExchangisJobBuilder<?, ?> currentBuilder;

private Map<String, Object> env = new HashMap<>();

private Map<String, Map<String, Object>> datasourceParams = new HashMap<>();

private MetadataInfoService metadataInfoService;

public ExchangisJobBuilderContext() {

}

public ExchangisJobBuilderContext(ExchangisJobInfo originalJob, JobLogListener jobLogListener){
public ExchangisJobBuilderContext(ExchangisJobInfo originalJob){
this.originalJob = originalJob;
this.jobLogListener = jobLogListener;
}

public ExchangisJobBuilderContext(ExchangisJobInfo originalJob) {
this(originalJob, null);
}

public JobLogListener getJobLogListener() {
return jobLogListener;
}

public ExchangisJobInfo getOriginalJob() {
return originalJob;
Expand Down Expand Up @@ -73,11 +62,11 @@ public boolean containsEnv(String name) {
return this.env.containsKey(name);
}

public MetadataInfoService getMetadataInfoService() {
return metadataInfoService;
public ExchangisJobBuilder<?, ?> getCurrentBuilder() {
return currentBuilder;
}

public void setMetadataInfoService(MetadataInfoService metadataInfoService) {
this.metadataInfoService = metadataInfoService;
public void setCurrentBuilder(ExchangisJobBuilder<?, ?> currentBuilder) {
this.currentBuilder = currentBuilder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,16 @@ public boolean canBuild(T inputJob) {

@Override
public E build(T inputJob, E expectOut, ExchangisJobBuilderContext ctx) throws ExchangisJobException {
ExchangisJobBuilder<?, ?> beforeJoBuilder = ctx.getCurrentBuilder();
JobParamDefine.defaultParam.set(new JobParamSet());
contextThreadLocal.set(ctx);
ctx.setCurrentBuilder(this);
try {
return buildJob(inputJob, expectOut, ctx);
} finally{
JobParamDefine.defaultParam.remove();
ctx.setCurrentBuilder(beforeJoBuilder);
contextThreadLocal.remove();
JobParamDefine.defaultParam.remove();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.webank.wedatasphere.exchangis.job.server.builder;

import com.webank.wedatasphere.exchangis.datasource.core.service.MetadataInfoService;
import com.webank.wedatasphere.exchangis.job.builder.ExchangisJobBuilderContext;
import com.webank.wedatasphere.exchangis.job.builder.api.ExchangisJobBuilder;
import com.webank.wedatasphere.exchangis.job.domain.ExchangisJobInfo;
import com.webank.wedatasphere.exchangis.job.listener.JobLogListener;
import com.webank.wedatasphere.exchangis.job.listener.events.JobLogEvent;
import com.webank.wedatasphere.exchangis.job.server.log.JobServerLogging;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;

/**
* Service in job builder context
*/
public class ServiceInExchangisJobBuilderContext extends ExchangisJobBuilderContext {

/**
* Meta info service
*/
private MetadataInfoService metadataInfoService;

/**
* Job execution id
*/
private String jobExecutionId;

/**
* Logging
*/
private JobServerLogging<ExchangisJobBuilder<?, ?>> logging;

public ServiceInExchangisJobBuilderContext(ExchangisJobInfo originalJob,
JobLogListener jobLogListener) {
super(originalJob);
this.logging = new JobServerLogging<ExchangisJobBuilder<?, ?>>() {
@Override
public Logger getLogger() {
return Objects.nonNull(currentBuilder)?
LoggerFactory.getLogger(currentBuilder.getClass()) : null;
}

@Override
public JobLogListener getJobLogListener() {
return jobLogListener;
}

@Override
public JobLogEvent getJobLogEvent(JobLogEvent.Level level, ExchangisJobBuilder<?, ?> builder, String message, Object... args) {
return new JobLogEvent(level, originalJob.getExecuteUser(), jobExecutionId, message, args);
}
};
}

public String getJobExecutionId() {
return jobExecutionId;
}

public void setJobExecutionId(String jobExecutionId) {
this.jobExecutionId = jobExecutionId;
}

public MetadataInfoService getMetadataInfoService() {
return metadataInfoService;
}

public void setMetadataInfoService(MetadataInfoService metadataInfoService) {
this.metadataInfoService = metadataInfoService;
}

public JobServerLogging<ExchangisJobBuilder<?, ?>> getLogging() {
return logging;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.webank.wedatasphere.exchangis.job.server.builder.engine;

import com.webank.wedatasphere.exchangis.job.builder.ExchangisJobBuilderContext;
import com.webank.wedatasphere.exchangis.job.builder.api.AbstractExchangisJobBuilder;
import com.webank.wedatasphere.exchangis.job.domain.ExchangisBase;
import com.webank.wedatasphere.exchangis.job.domain.ExchangisJob;
import com.webank.wedatasphere.exchangis.job.exception.ExchangisJobException;
import com.webank.wedatasphere.exchangis.job.server.builder.ServiceInExchangisJobBuilderContext;

/**
* Abstract implement for engine job builder
*/
public abstract class AbstractLoggingExchangisJobBuilder<T extends ExchangisJob, E extends ExchangisBase> extends
AbstractExchangisJobBuilder<T, E> {

/**
* Get builder context
* @return context
* @throws ExchangisJobException.Runtime exception
*/
protected static ServiceInExchangisJobBuilderContext getServiceInBuilderContext() throws ExchangisJobException.Runtime{
ExchangisJobBuilderContext context = getCurrentBuilderContext();
if (!(context instanceof ServiceInExchangisJobBuilderContext)) {
throw new ExchangisJobException.Runtime(-1, "The job builder context cannot not be casted to " + ServiceInExchangisJobBuilderContext.class.getCanonicalName(), null);
}
return (ServiceInExchangisJobBuilderContext)context;
}

/**
* Warn message
* @param message message
*/
public static void warn(String message, Object... args){
getServiceInBuilderContext().getLogging().warn(null, message, args);
}

public static void warn(String message, Throwable t){
getServiceInBuilderContext().getLogging().warn(null, message, t);
}

/**
* Info message
* @param message message
*/
public static void info(String message, Object... args){
getServiceInBuilderContext().getLogging().info(null, message, args);
}

public static void info(String message, Throwable t){
getServiceInBuilderContext().getLogging().info(null, message, t);
}
}
Loading

0 comments on commit de367ad

Please sign in to comment.