diff --git a/exchangis-datasource/exchangis-datasource-service/src/main/java/com/webank/wedatasphere/exchangis/datasource/service/ExchangisDataSourceService.java b/exchangis-datasource/exchangis-datasource-service/src/main/java/com/webank/wedatasphere/exchangis/datasource/service/ExchangisDataSourceService.java
index 38d9c2510..cadd91a9c 100644
--- a/exchangis-datasource/exchangis-datasource-service/src/main/java/com/webank/wedatasphere/exchangis/datasource/service/ExchangisDataSourceService.java
+++ b/exchangis-datasource/exchangis-datasource-service/src/main/java/com/webank/wedatasphere/exchangis/datasource/service/ExchangisDataSourceService.java
@@ -1184,6 +1184,14 @@ private void checkDataXDSSupportDegree(String sourceDsType, String sinkDsType) t
     }
 
+    /**
+     * TODO: the mapping function is currently defined by Hive-specific rules; abstract it to support all data source types
+     * @param request
+     * @param vo
+     * @return
+     * @throws Exception
+     */
+    @SuppressWarnings("unchecked")
     public Message queryDataSourceDBTableFieldsMapping(HttpServletRequest request, FieldMappingVO vo) throws Exception {
 
         this.checkDSSupportDegree(vo.getEngine(), vo.getSourceTypeId(), vo.getSinkTypeId());
 
@@ -1209,18 +1217,15 @@ public Message queryDataSourceDBTableFieldsMapping(HttpServletRequest request, F
             field.setFieldEditable(!"HIVE".equals(vo.getSinkTypeId()));
         }
         message.data("sinkFields", sinkFields);
-
-        // field mapping deduction
         List<Map<String, Object>> deductions = new ArrayList<>();
-//        boolean[] matchedIndex = new boolean[sinkFields.size()];
         List<DataSourceDbTableColumnDTO> left = sourceFields;
         List<DataSourceDbTableColumnDTO> right = sinkFields;
         boolean exchanged = false;
         if (containHive && "HIVE".equals(vo.getSinkTypeId())) {
             left = sinkFields;
             right = sourceFields;
-            exchanged = !exchanged;
+            exchanged = true;
         }
 
         for (int i = 0; i < left.size(); i ++){
             DataSourceDbTableColumnDTO leftElement = left.get(i);
@@ -1228,7 +1233,7 @@ public Message queryDataSourceDBTableFieldsMapping(HttpServletRequest request, F
             Map<String, Object> deduction = new HashMap<>();
             deduction.put("source", exchanged ? rightElement : leftElement);
             deduction.put("sink", exchanged ? leftElement : rightElement);
-            deduction.put("deleteEnable", !containHive);
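+            // The deduced field mapping rows are now always deletable on the front end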
+            deduction.put("deleteEnable", true);
             deductions.add(deduction);
         }
         message.data("deductions", deductions);
diff --git a/exchangis-job/exchangis-job-builder/src/main/java/com/webank/wedatasphere/exchangis/job/builder/ExchangisJobBuilderContext.java b/exchangis-job/exchangis-job-builder/src/main/java/com/webank/wedatasphere/exchangis/job/builder/ExchangisJobBuilderContext.java
index af221df16..e6aee9968 100644
--- a/exchangis-job/exchangis-job-builder/src/main/java/com/webank/wedatasphere/exchangis/job/builder/ExchangisJobBuilderContext.java
+++ b/exchangis-job/exchangis-job-builder/src/main/java/com/webank/wedatasphere/exchangis/job/builder/ExchangisJobBuilderContext.java
@@ -1,9 +1,8 @@
 package com.webank.wedatasphere.exchangis.job.builder;
 
-import com.webank.wedatasphere.exchangis.datasource.core.service.MetadataInfoService;
+import com.webank.wedatasphere.exchangis.job.builder.api.ExchangisJobBuilder;
 import com.webank.wedatasphere.exchangis.job.domain.ExchangisJobInfo;
-import com.webank.wedatasphere.exchangis.job.listener.JobLogListener;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -16,35 +15,25 @@ public class ExchangisJobBuilderContext {
     /**
      * Origin job
      */
-    private ExchangisJobInfo originalJob;
+    protected ExchangisJobInfo originalJob;
 
     /**
-     * Listen the log event
+     * Current builder
      */
-    private JobLogListener jobLogListener;
+    protected ExchangisJobBuilder<?, ?> currentBuilder;
 
     private Map<String, Object> env = new HashMap<>();
 
     private Map<String, Map<String, Object>> datasourceParams = new HashMap<>();
 
-    private MetadataInfoService metadataInfoService;
-
     public ExchangisJobBuilderContext() {
     }
 
-    public ExchangisJobBuilderContext(ExchangisJobInfo originalJob, JobLogListener jobLogListener){
+    public ExchangisJobBuilderContext(ExchangisJobInfo originalJob){
         this.originalJob = originalJob;
-        this.jobLogListener = jobLogListener;
-    }
-
-    public ExchangisJobBuilderContext(ExchangisJobInfo originalJob) {
-        this(originalJob, null);
     }
 
-    public JobLogListener getJobLogListener() {
-        return jobLogListener;
-    }
 
     public ExchangisJobInfo getOriginalJob() {
         return originalJob;
@@ -73,11 +62,11 @@ public boolean containsEnv(String name) {
         return this.env.containsKey(name);
     }
 
-    public MetadataInfoService getMetadataInfoService() {
-        return metadataInfoService;
+    public ExchangisJobBuilder<?, ?> getCurrentBuilder() {
+        return currentBuilder;
     }
 
-    public void setMetadataInfoService(MetadataInfoService metadataInfoService) {
-        this.metadataInfoService = metadataInfoService;
+    public void setCurrentBuilder(ExchangisJobBuilder<?, ?> currentBuilder) {
+        this.currentBuilder = currentBuilder;
     }
 }
diff --git a/exchangis-job/exchangis-job-builder/src/main/java/com/webank/wedatasphere/exchangis/job/builder/api/AbstractExchangisJobBuilder.java b/exchangis-job/exchangis-job-builder/src/main/java/com/webank/wedatasphere/exchangis/job/builder/api/AbstractExchangisJobBuilder.java
index 37e833011..9f8fa61d8 100644
--- a/exchangis-job/exchangis-job-builder/src/main/java/com/webank/wedatasphere/exchangis/job/builder/api/AbstractExchangisJobBuilder.java
+++ b/exchangis-job/exchangis-job-builder/src/main/java/com/webank/wedatasphere/exchangis/job/builder/api/AbstractExchangisJobBuilder.java
@@ -42,13 +42,16 @@ public boolean canBuild(T inputJob) {
 
     @Override
     public E build(T inputJob, E expectOut, ExchangisJobBuilderContext ctx) throws ExchangisJobException {
+        ExchangisJobBuilder<?, ?> beforeJobBuilder = ctx.getCurrentBuilder();
         JobParamDefine.defaultParam.set(new JobParamSet());
         contextThreadLocal.set(ctx);
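+        // Register this builder as the current one; the previous builder is restored in the finally block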
+        ctx.setCurrentBuilder(this);
         try {
             return buildJob(inputJob, expectOut, ctx);
         } finally{
-            JobParamDefine.defaultParam.remove();
+            ctx.setCurrentBuilder(beforeJobBuilder);
             contextThreadLocal.remove();
+            JobParamDefine.defaultParam.remove();
         }
     }
diff --git a/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/builder/ServiceInExchangisJobBuilderContext.java b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/builder/ServiceInExchangisJobBuilderContext.java
new file mode 100644
index 000000000..2f73f7ca0
--- /dev/null
+++ b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/builder/ServiceInExchangisJobBuilderContext.java
@@ -0,0 +1,76 @@
+package com.webank.wedatasphere.exchangis.job.server.builder;
+
+import com.webank.wedatasphere.exchangis.datasource.core.service.MetadataInfoService;
+import com.webank.wedatasphere.exchangis.job.builder.ExchangisJobBuilderContext;
+import com.webank.wedatasphere.exchangis.job.builder.api.ExchangisJobBuilder;
+import com.webank.wedatasphere.exchangis.job.domain.ExchangisJobInfo;
+import com.webank.wedatasphere.exchangis.job.listener.JobLogListener;
+import com.webank.wedatasphere.exchangis.job.listener.events.JobLogEvent;
+import com.webank.wedatasphere.exchangis.job.server.log.JobServerLogging;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+
+/**
+ * Service in job builder context
+ */
+public class ServiceInExchangisJobBuilderContext extends ExchangisJobBuilderContext {
+
+    /**
+     * Meta info service
+     */
+    private MetadataInfoService metadataInfoService;
+
+    /**
+     * Job execution id
+     */
+    private String jobExecutionId;
+
+    /**
+     * Logging
+     */
+    private JobServerLogging<ExchangisJobBuilder<?, ?>> logging;
+
+    public ServiceInExchangisJobBuilderContext(ExchangisJobInfo originalJob,
+                                               JobLogListener jobLogListener) {
+        super(originalJob);
+        this.logging = new JobServerLogging<ExchangisJobBuilder<?, ?>>() {
+            @Override
+            public Logger getLogger() {
+                return Objects.nonNull(currentBuilder)?
+                        LoggerFactory.getLogger(currentBuilder.getClass()) : null;
+            }
+
+            @Override
+            public JobLogListener getJobLogListener() {
+                return jobLogListener;
+            }
+
+            @Override
+            public JobLogEvent getJobLogEvent(JobLogEvent.Level level, ExchangisJobBuilder<?, ?> builder, String message, Object... args) {
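+                // Stamp the event with the job's execute user and the current execution id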
+                return new JobLogEvent(level, originalJob.getExecuteUser(), jobExecutionId, message, args);
+            }
+        };
+    }
+
+    public String getJobExecutionId() {
+        return jobExecutionId;
+    }
+
+    public void setJobExecutionId(String jobExecutionId) {
+        this.jobExecutionId = jobExecutionId;
+    }
+
+    public MetadataInfoService getMetadataInfoService() {
+        return metadataInfoService;
+    }
+
+    public void setMetadataInfoService(MetadataInfoService metadataInfoService) {
+        this.metadataInfoService = metadataInfoService;
+    }
+
+    public JobServerLogging<ExchangisJobBuilder<?, ?>> getLogging() {
+        return logging;
+    }
+}
diff --git a/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/builder/engine/AbstractLoggingExchangisJobBuilder.java b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/builder/engine/AbstractLoggingExchangisJobBuilder.java
new file mode 100644
index 000000000..26b1bc672
--- /dev/null
+++ b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/builder/engine/AbstractLoggingExchangisJobBuilder.java
@@ -0,0 +1,52 @@
+package com.webank.wedatasphere.exchangis.job.server.builder.engine;
+
+import com.webank.wedatasphere.exchangis.job.builder.ExchangisJobBuilderContext;
+import com.webank.wedatasphere.exchangis.job.builder.api.AbstractExchangisJobBuilder;
+import com.webank.wedatasphere.exchangis.job.domain.ExchangisBase;
+import com.webank.wedatasphere.exchangis.job.domain.ExchangisJob;
+import com.webank.wedatasphere.exchangis.job.exception.ExchangisJobException;
+import com.webank.wedatasphere.exchangis.job.server.builder.ServiceInExchangisJobBuilderContext;
+
+/**
+ * Abstract implementation for engine job builders
+ */
+public abstract class AbstractLoggingExchangisJobBuilder<T extends ExchangisBase, E extends ExchangisJob> extends
+        AbstractExchangisJobBuilder<T, E> {
+
+    /**
+     * Get builder context
+     * @return context
+     * @throws ExchangisJobException.Runtime exception
+     */
+    protected static ServiceInExchangisJobBuilderContext getServiceInBuilderContext() throws ExchangisJobException.Runtime{
+        ExchangisJobBuilderContext context = getCurrentBuilderContext();
+        if (!(context instanceof ServiceInExchangisJobBuilderContext)) {
+            throw new ExchangisJobException.Runtime(-1, "The job builder context cannot be cast to " + ServiceInExchangisJobBuilderContext.class.getCanonicalName(), null);
+        }
+        return (ServiceInExchangisJobBuilderContext)context;
+    }
+
+    /**
+     * Warn message
+     * @param message message
+     */
+    public static void warn(String message, Object... args){
+        getServiceInBuilderContext().getLogging().warn(null, message, args);
+    }
+
+    public static void warn(String message, Throwable t){
+        getServiceInBuilderContext().getLogging().warn(null, message, t);
+    }
+
+    /**
+     * Info message
+     * @param message message
+     */
+    public static void info(String message, Object... args){
+        getServiceInBuilderContext().getLogging().info(null, message, args);
+    }
+
+    public static void info(String message, Throwable t){
+        getServiceInBuilderContext().getLogging().info(null, message, t);
+    }
+}
diff --git a/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/builder/engine/SqoopExchangisEngineJobBuilder.java b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/builder/engine/SqoopExchangisEngineJobBuilder.java
index 93a62543e..7ab5e2fd7 100644
--- a/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/builder/engine/SqoopExchangisEngineJobBuilder.java
+++ b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/builder/engine/SqoopExchangisEngineJobBuilder.java
@@ -4,7 +4,6 @@
 import com.webank.wedatasphere.exchangis.datasource.core.exception.ExchangisDataSourceException;
 import com.webank.wedatasphere.exchangis.datasource.core.utils.Json;
 import com.webank.wedatasphere.exchangis.job.builder.ExchangisJobBuilderContext;
-import com.webank.wedatasphere.exchangis.job.builder.api.AbstractExchangisJobBuilder;
 import com.webank.wedatasphere.exchangis.job.domain.ExchangisEngineJob;
 import com.webank.wedatasphere.exchangis.job.domain.ExchangisJobInfo;
 import com.webank.wedatasphere.exchangis.job.domain.SubExchangisJob;
@@ -15,6 +14,7 @@
 import com.webank.wedatasphere.exchangis.job.exception.ExchangisJobException;
 import com.webank.wedatasphere.exchangis.job.exception.ExchangisJobExceptionCode;
 import com.webank.wedatasphere.exchangis.job.server.builder.JobParamConstraints;
+import com.webank.wedatasphere.exchangis.job.server.builder.ServiceInExchangisJobBuilderContext;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,7 +32,7 @@
 import static com.webank.wedatasphere.exchangis.job.server.builder.engine.SqoopExchangisEngineJobBuilder.MODE_TYPE.EXPORT;
 import static com.webank.wedatasphere.exchangis.job.server.builder.engine.SqoopExchangisEngineJobBuilder.MODE_TYPE.IMPORT;
 
-public class SqoopExchangisEngineJobBuilder extends AbstractExchangisJobBuilder<SubExchangisJob, ExchangisEngineJob> {
+public class SqoopExchangisEngineJobBuilder extends AbstractLoggingExchangisJobBuilder<SubExchangisJob, ExchangisEngineJob> {
 
     private static final Logger LOG = LoggerFactory.getLogger(SqoopExchangisEngineJobBuilder.class);
 
@@ -111,11 +111,36 @@ public enum MODE_TYPE { IMPORT, EXPORT}
         return null;
     });
 
+    /**
+     * Meta columns
+     */
+    private static final JobParamDefine<List<MetaColumn>> META_COLUMNS = JobParams.define("sqoop.meta.table.columns", (BiFunction<String, JobParamSet, List<MetaColumn>>) (k, paramSet) -> {
+        ServiceInExchangisJobBuilderContext context = getServiceInBuilderContext();
+        JobParam<String> dataSourceId = paramSet.get(JobParamConstraints.DATA_SOURCE_ID);
+        JobParam<String> database = paramSet.get(JobParamConstraints.DATABASE, String.class);
+        JobParam<String> table = paramSet.get(JobParamConstraints.TABLE, String.class);
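+        // Fetch the column metadata through the metadata service, acting as the job's creator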
+        try {
+            return context.getMetadataInfoService().getColumns(context.getOriginalJob().getCreateUser(),
+                    Long.valueOf(dataSourceId.getValue()), database.getValue(), table.getValue());
+        } catch (ExchangisDataSourceException e) {
+            throw new ExchangisJobException.Runtime(e.getErrCode(), e.getMessage(), e.getCause());
+        }
+    });
+
+    /**
+     * Meta hadoop columns
+     */
+    private static final JobParamDefine<List<MetaColumn>> META_HADOOP_COLUMNS = JobParams.define("sqoop.meta.hadoop.table.columns", (BiFunction<String, SubExchangisJob, List<MetaColumn>>) (k, job) -> META_COLUMNS.newValue(MODE_HADOOP_PARAMS.getValue(job)));
+
+    /**
+     * Meta rdbms columns
+     */
+    private static final JobParamDefine<List<MetaColumn>> META_RDBMS_COLUMNS = JobParams.define("sqoop.meta.rdbms.table.columns", (BiFunction<String, SubExchangisJob, List<MetaColumn>>) (k, job) -> META_COLUMNS.newValue(MODE_RDBMS_PARAMS.getValue(job)));
 
     /**
      * Meta table/partition props
      */
-    private static final JobParamDefine<Map<String, String>> META_TABLE_PROPS = JobParams.define("sqoop.meta.table.props", (BiFunction<String, SubExchangisJob, Map<String, String>>) (k, job) ->{
-        ExchangisJobBuilderContext context = getCurrentBuilderContext();
+    private static final JobParamDefine<Map<String, String>> META_HADOOP_TABLE_PROPS = JobParams.define("sqoop.meta.hadoop.table.props", (BiFunction<String, SubExchangisJob, Map<String, String>>) (k, job) ->{
+        ServiceInExchangisJobBuilderContext context = getServiceInBuilderContext();
         ExchangisJobInfo jobInfo = context.getOriginalJob();
         // Use the creator as userName
         String userName = jobInfo.getCreateUser();
@@ -142,7 +167,7 @@ public enum MODE_TYPE { IMPORT, EXPORT}
     });
 
     private static final JobParamDefine<Boolean> IS_TEXT_FILE_TYPE = JobParams.define("sqoop.file.is.text", (BiFunction<String, SubExchangisJob, Boolean>)(k, job) -> {
-        Map<String, String> tableProps = META_TABLE_PROPS.getValue(job);
+        Map<String, String> tableProps = META_HADOOP_TABLE_PROPS.getValue(job);
         return HADOOP_TEXT_INPUT_FORMAT.contains(tableProps.getOrDefault(META_INPUT_FORMAT, ""))
                 || HADOOP_TEXT_OUTPUT_FORMAT.contains(tableProps.getOrDefault(META_OUTPUT_FORMAT, ""));
     });
@@ -345,7 +370,7 @@ public enum MODE_TYPE { IMPORT, EXPORT}
      * Import: Hive-delete-target-dir
      */
     private static final JobParamDefine<String> HIVE_DELETE_TARGET = JobParams.define("sqoop.args.delete.target.dir", (BiFunction<String, SubExchangisJob, String>) (k, job) -> {
-        if (Objects.nonNull(HIVE_IMPORT.getValue(job)) && Objects.isNull(HIVE_APPEND.getValue(job))){
+        if (Objects.nonNull(HIVE_IMPORT.getValue(job))){
             return "";
         }
         return null;
@@ -356,7 +381,7 @@ public enum MODE_TYPE { IMPORT, EXPORT}
      */
     private static final JobParamDefine<String> HIVE_FIELDS_TERMINATED_BY = JobParams.define("sqoop.args.fields.terminated.by", (BiFunction<String, SubExchangisJob, String>) (k, job) -> {
         if (MODE_ENUM.getValue(job) == IMPORT && "hive".equalsIgnoreCase(job.getSinkType())){
-            return META_TABLE_PROPS.getValue(job).getOrDefault(META_FIELD_DELIMITER, "\u0001");
+            return META_HADOOP_TABLE_PROPS.getValue(job).getOrDefault(META_FIELD_DELIMITER, "\u0001");
         }
         return null;
     });
@@ -401,19 +426,8 @@ public enum MODE_TYPE { IMPORT, EXPORT}
         if (MODE_ENUM.getValue(job) == EXPORT ){
             JobParam<String> writeMode = MODE_RDBMS_PARAMS.getValue(job).get(JobParamConstraints.WRITE_MODE, String.class);
             if (Objects.nonNull(writeMode) && StringUtils.isNotBlank(writeMode.getValue()) && !"insert".equalsIgnoreCase(writeMode.getValue())){
-                ExchangisJobBuilderContext context = getCurrentBuilderContext();
-                JobParamSet paramSet = MODE_RDBMS_PARAMS.getValue(job);
-                JobParam<String> dataSourceId = paramSet.get(JobParamConstraints.DATA_SOURCE_ID);
-                JobParam<String> database = paramSet.get(JobParamConstraints.DATABASE, String.class);
-                JobParam<String> table = paramSet.get(JobParamConstraints.TABLE, String.class);
-                try {
-                    return context.getMetadataInfoService().getColumns(context.getOriginalJob().getCreateUser(),
-                            Long.valueOf(dataSourceId.getValue()), database.getValue(), table.getValue()).stream()
-                            .filter(MetaColumn::isPrimaryKey)
-                            .map(MetaColumn::getName).collect(Collectors.joining(","));
-                } catch (ExchangisDataSourceException e) {
-                    throw new ExchangisJobException.Runtime(e.getErrCode(), e.getMessage(), e.getCause());
-                }
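+                // Reuse the shared META_RDBMS_COLUMNS definition instead of fetching the metadata inline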
+                return META_RDBMS_COLUMNS.getValue(job).stream().filter(MetaColumn::isPrimaryKey)
+                        .map(MetaColumn::getName).collect(Collectors.joining(","));
             }
         }
         return null;
@@ -477,7 +491,7 @@
      */
     private static final JobParamDefine<String> HIVE_INPUT_FIELDS_TERMINATED_KEY = JobParams.define("sqoop.args.input.fields.terminated.by", (BiFunction<String, SubExchangisJob, String>) (k, job) -> {
         if (MODE_ENUM.getValue(job) == EXPORT && "hive".equalsIgnoreCase(job.getSourceType())){
-            return META_TABLE_PROPS.getValue(job).getOrDefault(META_FIELD_DELIMITER, "\u0001");
+            return META_HADOOP_TABLE_PROPS.getValue(job).getOrDefault(META_FIELD_DELIMITER, "\u0001");
         }
         return null;
     });
@@ -534,6 +548,25 @@ public enum MODE_TYPE { IMPORT, EXPORT}
         return StringUtils.join(columnSerial, ",");
     });
 
+    /**
+     * Inspection of the definitions above
+     */
+    private static final JobParamDefine<String> DEFINE_INSPECTION = JobParams.define("", (BiFunction<String, SubExchangisJob, String>) (key, job) -> {
+        List<String> rdbmsColumns = new ArrayList<>(Arrays.asList(COLUMN_SERIAL.getValue(job).split(",")));
+        List<String> hadoopColumns = META_HADOOP_COLUMNS.getValue(job).stream().map(MetaColumn::getName)
+                .collect(Collectors.toList());
+        if (IS_USE_HCATALOG.getValue(job)){
+            rdbmsColumns.removeAll(hadoopColumns);
+            if (!rdbmsColumns.isEmpty()){
+                warn("NOTE: task:[name: {}, id: {}] in HCatalog mode, the RDBMS columns [" + StringUtils.join(rdbmsColumns, ",") + "] have no matching columns in the hive/hbase table",
+                        job.getName(), job.getId());
+            }
+        } else {
+            warn("NOTE: task:[name: {}, id: {}] will transfer data in non-HCatalog (native) mode and match RDBMS columns to hive/hbase columns by position; otherwise please change the write mode to APPEND",
+                        job.getName(), job.getId());
+        }
+        return null;
+    });
+
     @Override
     public int priority() {
         return 1;
@@ -579,7 +612,9 @@ private JobParamDefine<?>[] getParamDefinitions(){
                 EXPORT_DIR, UPDATE_KEY, UPDATE_MODE, HCATALOG_DATABASE, HCATALOG_TABLE,
                 HCATALOG_PARTITION_KEY, HCATALOG_PARTITION_VALUE, HIVE_INPUT_FIELDS_TERMINATED_KEY,
                 HIVE_INPUT_NULL_STRING, HIVE_INPUT_NULL_NON_STRING,
-                COLUMN_SERIAL
+                COLUMN_SERIAL, DEFINE_INSPECTION
         };
     }
+
+
 }
diff --git a/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/execution/generator/DefaultTaskGenerator.java b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/execution/generator/DefaultTaskGenerator.java
index 7dcfc6e54..8248bae5a 100644
--- a/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/execution/generator/DefaultTaskGenerator.java
+++ b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/execution/generator/DefaultTaskGenerator.java
@@ -9,6 +9,7 @@
 import com.webank.wedatasphere.exchangis.job.exception.ExchangisJobException;
 import com.webank.wedatasphere.exchangis.job.launcher.domain.LaunchableExchangisJob;
 import com.webank.wedatasphere.exchangis.job.launcher.domain.LaunchableExchangisTask;
+import com.webank.wedatasphere.exchangis.job.server.builder.ServiceInExchangisJobBuilderContext;
 import com.webank.wedatasphere.exchangis.job.server.builder.transform.TransformExchangisJob;
 import com.webank.wedatasphere.exchangis.job.server.exception.ExchangisTaskGenerateException;
 import com.webank.wedatasphere.exchangis.job.server.execution.generator.events.TaskGenerateErrorEvent;
@@ -74,9 +75,10 @@ protected void execute(LaunchableExchangisJob launchableExchangisJob,
             throw throwable;
         }
         ExchangisJobBuilderManager jobBuilderManager = getExchangisJobBuilderManager();
-        ExchangisJobBuilderContext ctx = new ExchangisJobBuilderContext(jobInfo, generatorContext.getJobLogListener());
+        ServiceInExchangisJobBuilderContext ctx = new ServiceInExchangisJobBuilderContext(jobInfo, generatorContext.getJobLogListener());
         // Set the metadata service
         ctx.setMetadataInfoService(generatorContext.getMetadataInfoService());
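+        // Bind the execution id so that builder logs can be routed to this job execution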
+        ctx.setJobExecutionId(launchableExchangisJob.getJobExecutionId());
         ctx.putEnv("USER_NAME", tenancy);
         // ExchangisJobInfo -> TransformExchangisJob(SubExchangisJob)
         try {
diff --git a/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/log/JobServerLogging.java b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/log/JobServerLogging.java
index 5002891fd..2b4e11dba 100644
--- a/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/log/JobServerLogging.java
+++ b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/log/JobServerLogging.java
@@ -5,6 +5,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Objects;
 import java.util.Optional;
 
 public interface JobServerLogging<T> {
@@ -14,7 +15,7 @@ default Logger getLogger(){
 
     default void trace(T entity, String message, Object... args) {
         Logger logger = getLogger();
-        if (logger.isTraceEnabled()){
+        if (Objects.nonNull(logger) && logger.isTraceEnabled()){
             logger.trace(message, args);
         }
         Optional.ofNullable(getJobLogListener()).ifPresent(listener ->
@@ -23,14 +24,14 @@ default void trace(T entity, String message, Object... args) {
 
     default void debug(T entity, String message){
         Logger logger = getLogger();
-        if (logger.isDebugEnabled()){
+        if (Objects.nonNull(logger) && logger.isDebugEnabled()){
             logger.debug(message);
         }
     }
 
     default void info(T entity, String message, Object... args){
         Logger logger = getLogger();
-        if (logger.isInfoEnabled()){
+        if (Objects.nonNull(logger) && logger.isInfoEnabled()){
             logger.info(message, args);
         }
         Optional.ofNullable(getJobLogListener()).ifPresent(listener ->
@@ -39,25 +40,25 @@ default void info(T entity, String message, Object... args){
 
     default void info(T entity, String message, Throwable t){
         Logger logger = getLogger();
-        if (logger.isInfoEnabled()){
+        if (Objects.nonNull(logger) && logger.isInfoEnabled()){
             logger.info(message, t);
         }
         Optional.ofNullable(getJobLogListener()).ifPresent(listener ->
                 listener.onAsyncEvent(getJobLogEvent(JobLogEvent.Level.INFO, entity, message, t)));
     }
 
-    default void warn(T entity, String message){
+    default void warn(T entity, String message, Object... args){
         Logger logger = getLogger();
-        if (logger.isWarnEnabled()){
-            logger.warn(message);
+        if (Objects.nonNull(logger) && logger.isWarnEnabled()){
+            logger.warn(message, args);
         }
         Optional.ofNullable(getJobLogListener()).ifPresent(listener ->
-                listener.onAsyncEvent(getJobLogEvent(JobLogEvent.Level.WARN, entity, message)));
+                listener.onAsyncEvent(getJobLogEvent(JobLogEvent.Level.WARN, entity, message, args)));
     }
 
     default void warn(T entity, String message, Throwable t){
         Logger logger = getLogger();
-        if (logger.isWarnEnabled()){
+        if (Objects.nonNull(logger) && logger.isWarnEnabled()){
             logger.warn(message, t);
         }
         Optional.ofNullable(getJobLogListener()).ifPresent(listener ->
@@ -65,7 +66,7 @@ default void warn(T entity, String message, Throwable t){
     }
 
     default void error(T entity, String message, Object... args){
-        getLogger().error(message, args);
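+        // The logger may be null when no builder is currently active, so guard before logging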
+        Optional.ofNullable(getLogger()).ifPresent(logger -> logger.error(message, args));
         Optional.ofNullable(getJobLogListener()).ifPresent(listener ->
                 listener.onAsyncEvent(getJobLogEvent(JobLogEvent.Level.ERROR, entity, message, args)));
     }
diff --git a/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/restful/execute/ExchangisJobExecuteRestfulApi.java b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/restful/execute/ExchangisJobExecuteRestfulApi.java
index 954c51e82..690ab3f8f 100644
--- a/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/restful/execute/ExchangisJobExecuteRestfulApi.java
+++ b/exchangis-job/exchangis-job-server/src/main/java/com/webank/wedatasphere/exchangis/job/server/restful/execute/ExchangisJobExecuteRestfulApi.java
@@ -52,7 +52,7 @@ public Message executeJob(@RequestBody(required = false) Map<String, Object> pe
         if (Objects.isNull(jobVo)){
             return Message.error("Job related the id: [" + id + "] is Empty(the related job does not exist)");
         }
-        // Convert to the job info
+        // Convert to the job info. TODO: cannot find the execute user here
         ExchangisJobInfo jobInfo = new ExchangisJobInfo(jobVo);
         String loginUser = SecurityFilter.getLoginUsername(request);
         if (!hasAuthority(loginUser, jobInfo)){
diff --git a/exchangis-job/exchangis-job-server/src/main/scala/com/webank/wedatasphere/exchangis/job/server/log/DefaultRpcJobLogger.scala b/exchangis-job/exchangis-job-server/src/main/scala/com/webank/wedatasphere/exchangis/job/server/log/DefaultRpcJobLogger.scala
index 2fb087e7c..62cc242d3 100644
--- a/exchangis-job/exchangis-job-server/src/main/scala/com/webank/wedatasphere/exchangis/job/server/log/DefaultRpcJobLogger.scala
+++ b/exchangis-job/exchangis-job-server/src/main/scala/com/webank/wedatasphere/exchangis/job/server/log/DefaultRpcJobLogger.scala
@@ -22,6 +22,7 @@ class DefaultRpcJobLogger extends JobLogListener{
       event.getLevel match {
         case Level.INFO => getLogger.info(message, event.getArgs: _*)
         case Level.ERROR => getLogger.error(message, event.getArgs: _*)
+        case Level.WARN => getLogger.warn(message, event.getArgs: _*)
         case _ => getLogger.trace(message, event.getArgs)
       }
     }