Skip to content

Commit

Permalink
[Improve][Connector-V2][Hive] Improve config check logic, add the lim…
Browse files Browse the repository at this point in the history
…it of partition expression
  • Loading branch information
TyrantLucifer committed Jan 17, 2023
1 parent 7efe690 commit e178bf8
Showing 1 changed file with 5 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;

import com.google.auto.service.AutoService;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
Expand Down Expand Up @@ -85,6 +84,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
String.format("PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SINK, result.getMsg()));
}
if (pluginConfig.hasPath(BaseSinkConfig.PARTITION_DIR_EXPRESSION.key())) {
throw new HiveConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, String.format(
"Hive sink connector does not support setting %s", BaseSinkConfig.PARTITION_DIR_EXPRESSION.key()
));
}
Pair<String[], Table> tableInfo = HiveConfig.getTableInfo(pluginConfig);
dbName = tableInfo.getLeft()[0];
tableName = tableInfo.getLeft()[1];
Expand All @@ -110,14 +114,6 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
throw new HiveConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
"Hive connector only support [text parquet orc] table now");
}
String partDirExpression = pluginConfig.getString(BaseSinkConfig.PARTITION_DIR_EXPRESSION.key());
if (StringUtils.isNotBlank(partDirExpression)){
for (String part:partitionKeys){
if (!partDirExpression.contains(part)){
throw new HiveConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION, "partition_dir_expression parameter configuration must contain the partition name, otherwise the partition loading will fail");
}
}
}
pluginConfig = pluginConfig
.withValue(IS_PARTITION_FIELD_WRITE_IN_FILE.key(), ConfigValueFactory.fromAnyRef(false))
.withValue(FILE_NAME_EXPRESSION.key(), ConfigValueFactory.fromAnyRef("${transactionId}"))
Expand Down

0 comments on commit e178bf8

Please sign in to comment.