Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancement in Sqoop Engine Job Builder #188

Merged
merged 2 commits into from
Feb 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1184,6 +1184,14 @@ private void checkDataXDSSupportDegree(String sourceDsType, String sinkDsType) t

}

/**
* TODO: the mapping function is defined by the rule of Hive directly, we should abstract to support all the types
* @param request
* @param vo
* @return
* @throws Exception
*/
@SuppressWarnings("unchecked")
public Message queryDataSourceDBTableFieldsMapping(HttpServletRequest request, FieldMappingVO vo) throws Exception {

this.checkDSSupportDegree(vo.getEngine(), vo.getSourceTypeId(), vo.getSinkTypeId());
Expand All @@ -1209,26 +1217,23 @@ public Message queryDataSourceDBTableFieldsMapping(HttpServletRequest request, F
field.setFieldEditable(!"HIVE".equals(vo.getSinkTypeId()));
}
message.data("sinkFields", sinkFields);


// field mapping deduction
List<Map<String, Object>> deductions = new ArrayList<>();
// boolean[] matchedIndex = new boolean[sinkFields.size()];
List<DataSourceDbTableColumnDTO> left = sourceFields;
List<DataSourceDbTableColumnDTO> right = sinkFields;
boolean exchanged = false;
if (containHive && "HIVE".equals(vo.getSinkTypeId())) {
left = sinkFields;
right = sourceFields;
exchanged = !exchanged;
exchanged = true;
}
for (int i = 0; i < left.size(); i ++){
DataSourceDbTableColumnDTO leftElement = left.get(i);
DataSourceDbTableColumnDTO rightElement = right.get(i % right.size());
Map<String, Object> deduction = new HashMap<>();
deduction.put("source", exchanged ? rightElement : leftElement);
deduction.put("sink", exchanged ? leftElement : rightElement);
deduction.put("deleteEnable", !containHive);
deduction.put("deleteEnable", true);
deductions.add(deduction);
}
message.data("deductions", deductions);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package com.webank.wedatasphere.exchangis.job.builder;


import com.webank.wedatasphere.exchangis.datasource.core.service.MetadataInfoService;
import com.webank.wedatasphere.exchangis.job.builder.api.ExchangisJobBuilder;
import com.webank.wedatasphere.exchangis.job.domain.ExchangisJobInfo;
import com.webank.wedatasphere.exchangis.job.listener.JobLogListener;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -16,35 +15,25 @@ public class ExchangisJobBuilderContext {
/**
* Origin job
*/
private ExchangisJobInfo originalJob;
protected ExchangisJobInfo originalJob;

/**
* Listen the log event
* Current builder
*/
private JobLogListener jobLogListener;
protected ExchangisJobBuilder<?, ?> currentBuilder;

private Map<String, Object> env = new HashMap<>();

private Map<String, Map<String, Object>> datasourceParams = new HashMap<>();

private MetadataInfoService metadataInfoService;

public ExchangisJobBuilderContext() {

}

public ExchangisJobBuilderContext(ExchangisJobInfo originalJob, JobLogListener jobLogListener){
public ExchangisJobBuilderContext(ExchangisJobInfo originalJob){
this.originalJob = originalJob;
this.jobLogListener = jobLogListener;
}

public ExchangisJobBuilderContext(ExchangisJobInfo originalJob) {
this(originalJob, null);
}

public JobLogListener getJobLogListener() {
return jobLogListener;
}

public ExchangisJobInfo getOriginalJob() {
return originalJob;
Expand Down Expand Up @@ -73,11 +62,11 @@ public boolean containsEnv(String name) {
return this.env.containsKey(name);
}

public MetadataInfoService getMetadataInfoService() {
return metadataInfoService;
public ExchangisJobBuilder<?, ?> getCurrentBuilder() {
return currentBuilder;
}

public void setMetadataInfoService(MetadataInfoService metadataInfoService) {
this.metadataInfoService = metadataInfoService;
public void setCurrentBuilder(ExchangisJobBuilder<?, ?> currentBuilder) {
this.currentBuilder = currentBuilder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,16 @@ public boolean canBuild(T inputJob) {

@Override
public E build(T inputJob, E expectOut, ExchangisJobBuilderContext ctx) throws ExchangisJobException {
ExchangisJobBuilder<?, ?> beforeJoBuilder = ctx.getCurrentBuilder();
JobParamDefine.defaultParam.set(new JobParamSet());
contextThreadLocal.set(ctx);
ctx.setCurrentBuilder(this);
try {
return buildJob(inputJob, expectOut, ctx);
} finally{
JobParamDefine.defaultParam.remove();
ctx.setCurrentBuilder(beforeJoBuilder);
contextThreadLocal.remove();
JobParamDefine.defaultParam.remove();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.webank.wedatasphere.exchangis.job.server.builder;

import com.webank.wedatasphere.exchangis.datasource.core.service.MetadataInfoService;
import com.webank.wedatasphere.exchangis.job.builder.ExchangisJobBuilderContext;
import com.webank.wedatasphere.exchangis.job.builder.api.ExchangisJobBuilder;
import com.webank.wedatasphere.exchangis.job.domain.ExchangisJobInfo;
import com.webank.wedatasphere.exchangis.job.listener.JobLogListener;
import com.webank.wedatasphere.exchangis.job.listener.events.JobLogEvent;
import com.webank.wedatasphere.exchangis.job.server.log.JobServerLogging;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;

/**
* Service in job builder context
*/
public class ServiceInExchangisJobBuilderContext extends ExchangisJobBuilderContext {

/**
* Meta info service
*/
private MetadataInfoService metadataInfoService;

/**
* Job execution id
*/
private String jobExecutionId;

/**
* Logging
*/
private JobServerLogging<ExchangisJobBuilder<?, ?>> logging;

public ServiceInExchangisJobBuilderContext(ExchangisJobInfo originalJob,
JobLogListener jobLogListener) {
super(originalJob);
this.logging = new JobServerLogging<ExchangisJobBuilder<?, ?>>() {
@Override
public Logger getLogger() {
return Objects.nonNull(currentBuilder)?
LoggerFactory.getLogger(currentBuilder.getClass()) : null;
}

@Override
public JobLogListener getJobLogListener() {
return jobLogListener;
}

@Override
public JobLogEvent getJobLogEvent(JobLogEvent.Level level, ExchangisJobBuilder<?, ?> builder, String message, Object... args) {
return new JobLogEvent(level, originalJob.getExecuteUser(), jobExecutionId, message, args);
}
};
}

public String getJobExecutionId() {
return jobExecutionId;
}

public void setJobExecutionId(String jobExecutionId) {
this.jobExecutionId = jobExecutionId;
}

public MetadataInfoService getMetadataInfoService() {
return metadataInfoService;
}

public void setMetadataInfoService(MetadataInfoService metadataInfoService) {
this.metadataInfoService = metadataInfoService;
}

public JobServerLogging<ExchangisJobBuilder<?, ?>> getLogging() {
return logging;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.webank.wedatasphere.exchangis.job.server.builder.engine;

import com.webank.wedatasphere.exchangis.job.builder.ExchangisJobBuilderContext;
import com.webank.wedatasphere.exchangis.job.builder.api.AbstractExchangisJobBuilder;
import com.webank.wedatasphere.exchangis.job.domain.ExchangisBase;
import com.webank.wedatasphere.exchangis.job.domain.ExchangisJob;
import com.webank.wedatasphere.exchangis.job.exception.ExchangisJobException;
import com.webank.wedatasphere.exchangis.job.server.builder.ServiceInExchangisJobBuilderContext;

/**
* Abstract implement for engine job builder
*/
public abstract class AbstractLoggingExchangisJobBuilder<T extends ExchangisJob, E extends ExchangisBase> extends
AbstractExchangisJobBuilder<T, E> {

/**
* Get builder context
* @return context
* @throws ExchangisJobException.Runtime exception
*/
protected static ServiceInExchangisJobBuilderContext getServiceInBuilderContext() throws ExchangisJobException.Runtime{
ExchangisJobBuilderContext context = getCurrentBuilderContext();
if (!(context instanceof ServiceInExchangisJobBuilderContext)) {
throw new ExchangisJobException.Runtime(-1, "The job builder context cannot not be casted to " + ServiceInExchangisJobBuilderContext.class.getCanonicalName(), null);
}
return (ServiceInExchangisJobBuilderContext)context;
}

/**
* Warn message
* @param message message
*/
public static void warn(String message, Object... args){
getServiceInBuilderContext().getLogging().warn(null, message, args);
}

public static void warn(String message, Throwable t){
getServiceInBuilderContext().getLogging().warn(null, message, t);
}

/**
* Info message
* @param message message
*/
public static void info(String message, Object... args){
getServiceInBuilderContext().getLogging().info(null, message, args);
}

public static void info(String message, Throwable t){
getServiceInBuilderContext().getLogging().info(null, message, t);
}
}
Loading