Commit bb2b777

[feature](iceberg) iceberg write support insert overwrite and optimize hive write transaction statistics (#37191) (#38097)

Backport of #37191.

Co-authored-by: kang <35803862+ghkang98@users.noreply.github.com>
Co-authored-by: lik40 <lik40@chinatelecom.cn>
3 people authored Jul 19, 2024
1 parent 4d03e28 commit bb2b777
Showing 14 changed files with 1,359 additions and 417 deletions.

Large diffs are not rendered by default; only four of the 14 changed files appear below.
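Several of the hunks below replace loose (dbName, tableName) string pairs with org.apache.doris.common.info.SimpleTableInfo, whose definition is not among the rendered files. A minimal sketch of what this PR appears to assume the class looks like, inferred purely from the constructor and the getDbName()/getTbName() calls in the hunks; the real class may carry more members:

```java
// Hypothetical reconstruction of SimpleTableInfo, inferred from its call
// sites in this diff; not the actual Doris source.
public class SimpleTableInfo {
    private final String dbName;
    private final String tbName;

    public SimpleTableInfo(String dbName, String tbName) {
        this.dbName = dbName;
        this.tbName = tbName;
    }

    public String getDbName() {
        return dbName;
    }

    public String getTbName() {
        return tbName;
    }
}
```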

@@ -32,6 +32,7 @@
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.info.SimpleTableInfo;
 import org.apache.doris.datasource.ExternalDatabase;
 import org.apache.doris.datasource.jdbc.client.JdbcClient;
 import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
@@ -292,25 +293,23 @@ public List<String> listDatabaseNames() {
     }

     public void updateTableStatistics(
-            String dbName,
-            String tableName,
+            SimpleTableInfo tableInfo,
             Function<HivePartitionStatistics, HivePartitionStatistics> update) {
-        client.updateTableStatistics(dbName, tableName, update);
+        client.updateTableStatistics(tableInfo.getDbName(), tableInfo.getTbName(), update);
     }

     void updatePartitionStatistics(
-            String dbName,
-            String tableName,
+            SimpleTableInfo tableInfo,
             String partitionName,
             Function<HivePartitionStatistics, HivePartitionStatistics> update) {
-        client.updatePartitionStatistics(dbName, tableName, partitionName, update);
+        client.updatePartitionStatistics(tableInfo.getDbName(), tableInfo.getTbName(), partitionName, update);
     }

-    public void addPartitions(String dbName, String tableName, List<HivePartitionWithStatistics> partitions) {
-        client.addPartitions(dbName, tableName, partitions);
+    public void addPartitions(SimpleTableInfo tableInfo, List<HivePartitionWithStatistics> partitions) {
+        client.addPartitions(tableInfo.getDbName(), tableInfo.getTbName(), partitions);
     }

-    public void dropPartition(String dbName, String tableName, List<String> partitionValues, boolean deleteData) {
-        client.dropPartition(dbName, tableName, partitionValues, deleteData);
+    public void dropPartition(SimpleTableInfo tableInfo, List<String> partitionValues, boolean deleteData) {
+        client.dropPartition(tableInfo.getDbName(), tableInfo.getTbName(), partitionValues, deleteData);
     }
 }
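A hedged sketch of how a call site migrates to the new signatures; `HiveOps` is a stand-in name, since the enclosing class is not visible in this hunk:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical caller of the methods patched above; `HiveOps` is a
// placeholder for the real enclosing class.
class DropPartitionExample {
    void dropOnePartition(HiveOps ops) {
        // Before this change: ops.dropPartition("tpch", "nation", values, false);
        SimpleTableInfo tableInfo = new SimpleTableInfo("tpch", "nation");
        List<String> partitionValues = Arrays.asList("cn", "beijing");
        ops.dropPartition(tableInfo, partitionValues, /* deleteData */ false);
    }
}
```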
@@ -18,6 +18,7 @@
 package org.apache.doris.datasource.hive;

 import org.apache.doris.catalog.Column;
+import org.apache.doris.common.info.SimpleTableInfo;

 import com.google.common.base.Preconditions;
 import lombok.Data;
@@ -31,8 +32,7 @@ public class HivePartition {
     public static final String LAST_MODIFY_TIME_KEY = "transient_lastDdlTime";
     public static final String FILE_NUM_KEY = "numFiles";

-    private String dbName;
-    private String tblName;
+    private SimpleTableInfo tableInfo;
     private String inputFormat;
     private String path;
     private List<String> partitionValues;
@@ -43,10 +43,9 @@ public class HivePartition {
     private List<FieldSchema> columns;

     // If you want to read the data under a partition, you can use this constructor
-    public HivePartition(String dbName, String tblName, boolean isDummyPartition,
+    public HivePartition(SimpleTableInfo tableInfo, boolean isDummyPartition,
             String inputFormat, String path, List<String> partitionValues, Map<String, String> parameters) {
-        this.dbName = dbName;
-        this.tblName = tblName;
+        this.tableInfo = tableInfo;
         this.isDummyPartition = isDummyPartition;
         // eg: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
         this.inputFormat = inputFormat;
@@ -57,17 +56,31 @@ public HivePartition(String dbName, String tblName, boolean isDummyPartition,
         this.parameters = parameters;
     }

+    public HivePartition(String database, String tableName, boolean isDummyPartition,
+            String inputFormat, String path, List<String> partitionValues, Map<String, String> parameters) {
+        this(new SimpleTableInfo(database, tableName), isDummyPartition, inputFormat, path, partitionValues,
+                parameters);
+    }
+
     // If you want to update hms with partition, then you can use this constructor,
     // as updating hms requires some additional information, such as outputFormat and so on
-    public HivePartition(String dbName, String tblName, boolean isDummyPartition,
-            String inputFormat, String path, List<String> partitionValues, Map<String, String> parameters,
-            String outputFormat, String serde, List<FieldSchema> columns) {
-        this(dbName, tblName, isDummyPartition, inputFormat, path, partitionValues, parameters);
+    public HivePartition(SimpleTableInfo tableInfo, boolean isDummyPartition,
+            String inputFormat, String path, List<String> partitionValues, Map<String, String> parameters,
+            String outputFormat, String serde, List<FieldSchema> columns) {
+        this(tableInfo, isDummyPartition, inputFormat, path, partitionValues, parameters);
         this.outputFormat = outputFormat;
         this.serde = serde;
         this.columns = columns;
     }

+    public String getDbName() {
+        return tableInfo.getDbName();
+    }
+
+    public String getTblName() {
+        return tableInfo.getTbName();
+    }
+
     // return partition name like: nation=cn/city=beijing
     public String getPartitionName(List<Column> partColumns) {
         Preconditions.checkState(partColumns.size() == partitionValues.size());
@@ -94,6 +107,7 @@ public long getLastModifiedTime() {

     /**
      * If there are no files, it proves that there is no data under the partition, we return 0
+     *
      * @return
      */
     public long getLastModifiedTimeIgnoreInit() {
@@ -112,12 +126,17 @@ public long getFileNum() {

     @Override
     public String toString() {
-        return "HivePartition{"
-                + "dbName='" + dbName + '\''
-                + ", tblName='" + tblName + '\''
-                + ", isDummyPartition='" + isDummyPartition + '\''
-                + ", inputFormat='" + inputFormat + '\''
-                + ", path='" + path + '\''
-                + ", partitionValues=" + partitionValues + '}';
+        final StringBuilder sb = new StringBuilder("HivePartition{");
+        sb.append("tableInfo=").append(tableInfo);
+        sb.append(", inputFormat='").append(inputFormat).append('\'');
+        sb.append(", path='").append(path).append('\'');
+        sb.append(", partitionValues=").append(partitionValues);
+        sb.append(", isDummyPartition=").append(isDummyPartition);
+        sb.append(", parameters=").append(parameters);
+        sb.append(", outputFormat='").append(outputFormat).append('\'');
+        sb.append(", serde='").append(serde).append('\'');
+        sb.append(", columns=").append(columns);
+        sb.append('}');
+        return sb.toString();
     }
 }
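Both read-path and write-path callers keep working: the new (String, String) overload simply wraps the two names in a SimpleTableInfo and delegates to the primary constructor. A small usage sketch under that assumption, with illustrative values:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;

// Hedged usage sketch; the path and partition values are made up.
class HivePartitionExample {
    HivePartition readPathPartition() {
        Map<String, String> parameters = Collections.emptyMap();
        // The String overload delegates to the SimpleTableInfo constructor,
        // so pre-existing call sites compile unchanged.
        return new HivePartition("tpch", "nation", /* isDummyPartition */ false,
                "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
                "hdfs://namenode:8020/warehouse/tpch.db/nation/nation=cn",
                Arrays.asList("cn"), parameters);
    }
}
```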
@@ -20,25 +20,28 @@

 package org.apache.doris.datasource.hive;

+import org.apache.doris.datasource.statistics.CommonStatistics;
+import org.apache.doris.datasource.statistics.CommonStatistics.ReduceOperator;
+
 import com.google.common.collect.ImmutableMap;

 import java.util.Map;

 public class HivePartitionStatistics {
     public static final HivePartitionStatistics EMPTY =
-            new HivePartitionStatistics(HiveCommonStatistics.EMPTY, ImmutableMap.of());
+            new HivePartitionStatistics(CommonStatistics.EMPTY, ImmutableMap.of());

-    private final HiveCommonStatistics commonStatistics;
+    private final CommonStatistics commonStatistics;
     private final Map<String, HiveColumnStatistics> columnStatisticsMap;

     public HivePartitionStatistics(
-            HiveCommonStatistics commonStatistics,
+            CommonStatistics commonStatistics,
             Map<String, HiveColumnStatistics> columnStatisticsMap) {
         this.commonStatistics = commonStatistics;
         this.columnStatisticsMap = columnStatisticsMap;
     }

-    public HiveCommonStatistics getCommonStatistics() {
+    public CommonStatistics getCommonStatistics() {
         return commonStatistics;
     }

@@ -48,7 +51,7 @@ public Map<String, HiveColumnStatistics> getColumnStatisticsMap() {

     public static HivePartitionStatistics fromCommonStatistics(long rowCount, long fileCount, long totalFileBytes) {
         return new HivePartitionStatistics(
-                new HiveCommonStatistics(rowCount, fileCount, totalFileBytes),
+                new CommonStatistics(rowCount, fileCount, totalFileBytes),
                 ImmutableMap.of()
         );
     }
@@ -62,56 +65,32 @@ public static HivePartitionStatistics merge(HivePartitionStatistics current, Hiv
         }

         return new HivePartitionStatistics(
-                reduce(current.getCommonStatistics(), update.getCommonStatistics(), ReduceOperator.ADD),
-                // TODO merge columnStatisticsMap
-                current.getColumnStatisticsMap());
+                CommonStatistics
+                        .reduce(current.getCommonStatistics(), update.getCommonStatistics(), ReduceOperator.ADD),
+                // TODO merge columnStatisticsMap
+                current.getColumnStatisticsMap());
     }

     public static HivePartitionStatistics reduce(
             HivePartitionStatistics first,
             HivePartitionStatistics second,
             ReduceOperator operator) {
-        HiveCommonStatistics left = first.getCommonStatistics();
-        HiveCommonStatistics right = second.getCommonStatistics();
+        CommonStatistics left = first.getCommonStatistics();
+        CommonStatistics right = second.getCommonStatistics();
         return HivePartitionStatistics.fromCommonStatistics(
-                reduce(left.getRowCount(), right.getRowCount(), operator),
-                reduce(left.getFileCount(), right.getFileCount(), operator),
-                reduce(left.getTotalFileBytes(), right.getTotalFileBytes(), operator));
+                CommonStatistics.reduce(left.getRowCount(), right.getRowCount(), operator),
+                CommonStatistics.reduce(left.getFileCount(), right.getFileCount(), operator),
+                CommonStatistics.reduce(left.getTotalFileBytes(), right.getTotalFileBytes(), operator));
     }

-    public static HiveCommonStatistics reduce(
-            HiveCommonStatistics current,
-            HiveCommonStatistics update,
+    public static CommonStatistics reduce(
+            CommonStatistics current,
+            CommonStatistics update,
             ReduceOperator operator) {
-        return new HiveCommonStatistics(
-                reduce(current.getRowCount(), update.getRowCount(), operator),
-                reduce(current.getFileCount(), update.getFileCount(), operator),
-                reduce(current.getTotalFileBytes(), update.getTotalFileBytes(), operator));
-    }
-
-    public static long reduce(long current, long update, ReduceOperator operator) {
-        if (current >= 0 && update >= 0) {
-            switch (operator) {
-                case ADD:
-                    return current + update;
-                case SUBTRACT:
-                    return current - update;
-                case MAX:
-                    return Math.max(current, update);
-                case MIN:
-                    return Math.min(current, update);
-                default:
-                    throw new IllegalArgumentException("Unexpected operator: " + operator);
-            }
-        }
-
-        return 0;
+        return new CommonStatistics(
+                CommonStatistics.reduce(current.getRowCount(), update.getRowCount(), operator),
+                CommonStatistics.reduce(current.getFileCount(), update.getFileCount(), operator),
+                CommonStatistics.reduce(current.getTotalFileBytes(), update.getTotalFileBytes(), operator));
     }

-    public enum ReduceOperator {
-        ADD,
-        SUBTRACT,
-        MIN,
-        MAX,
-    }
 }
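The per-field reduce helper and the ReduceOperator enum move out of this class into the new shared CommonStatistics, so the Hive and Iceberg write paths can reuse them. Assuming the relocated helper keeps the semantics of the method deleted above, merging freshly written partition statistics into existing ones works like this sketch:

```java
// Hedged sketch of the merge semantics after this refactor; assumes
// CommonStatistics.reduce keeps the behavior of the deleted method above
// (pairwise ADD, with any negative "unknown" operand collapsing to 0).
public class MergeStatisticsExample {
    public static void main(String[] args) {
        HivePartitionStatistics current =
                HivePartitionStatistics.fromCommonStatistics(900L, 8L, 60000L);
        HivePartitionStatistics update =
                HivePartitionStatistics.fromCommonStatistics(100L, 2L, 4096L);
        HivePartitionStatistics merged = HivePartitionStatistics.merge(current, update);
        // ADD combines each counter independently.
        System.out.println(merged.getCommonStatistics().getRowCount());       // 1000
        System.out.println(merged.getCommonStatistics().getFileCount());      // 10
        System.out.println(merged.getCommonStatistics().getTotalFileBytes()); // 64096
    }
}
```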
@@ -21,6 +21,7 @@
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.statistics.CommonStatistics;
 import org.apache.doris.fs.remote.BrokerFileSystem;
 import org.apache.doris.fs.remote.RemoteFileSystem;
 import org.apache.doris.nereids.exceptions.AnalysisException;
@@ -75,14 +76,14 @@ private HiveUtil() {
     /**
      * get input format class from inputFormatName.
      *
-     * @param jobConf jobConf used when getInputFormatClass
+     * @param jobConf         jobConf used when getInputFormatClass
      * @param inputFormatName inputFormat class name
-     * @param symlinkTarget use target inputFormat class when inputFormat is SymlinkTextInputFormat
+     * @param symlinkTarget   use target inputFormat class when inputFormat is SymlinkTextInputFormat
      * @return a class of inputFormat.
      * @throws UserException when class not found.
      */
     public static InputFormat<?, ?> getInputFormat(JobConf jobConf,
-            String inputFormatName, boolean symlinkTarget) throws UserException {
+                                                   String inputFormatName, boolean symlinkTarget) throws UserException {
         try {
             Class<? extends InputFormat<?, ?>> inputFormatClass = getInputFormatClass(jobConf, inputFormatName);
             if (symlinkTarget && (inputFormatClass == SymlinkTextInputFormat.class)) {
@@ -167,12 +168,12 @@ public static Map<String, Partition> convertToNamePartitionMap(

         Map<String, List<String>> partitionNameToPartitionValues =
                 partitionNames
-                .stream()
-                .collect(Collectors.toMap(partitionName -> partitionName, HiveUtil::toPartitionValues));
+                        .stream()
+                        .collect(Collectors.toMap(partitionName -> partitionName, HiveUtil::toPartitionValues));

         Map<List<String>, Partition> partitionValuesToPartition =
                 partitions.stream()
-                .collect(Collectors.toMap(Partition::getValues, partition -> partition));
+                        .collect(Collectors.toMap(Partition::getValues, partition -> partition));

         ImmutableMap.Builder<String, Partition> resultBuilder = ImmutableMap.builder();
         for (Map.Entry<String, List<String>> entry : partitionNameToPartitionValues.entrySet()) {
@@ -312,7 +313,7 @@ public static Database toHiveDatabase(HiveDatabaseMetadata hiveDb) {

     public static Map<String, String> updateStatisticsParameters(
             Map<String, String> parameters,
-            HiveCommonStatistics statistics) {
+            CommonStatistics statistics) {
         HashMap<String, String> result = new HashMap<>(parameters);

         result.put(StatsSetupConst.NUM_FILES, String.valueOf(statistics.getFileCount()));
@@ -345,8 +346,8 @@ public static Partition toMetastoreApiPartition(HivePartitionWithStatistics part

     public static Partition toMetastoreApiPartition(HivePartition hivePartition) {
         Partition result = new Partition();
-        result.setDbName(hivePartition.getDbName());
-        result.setTableName(hivePartition.getTblName());
+        result.setDbName(hivePartition.getTableInfo().getDbName());
+        result.setTableName(hivePartition.getTableInfo().getTbName());
         result.setValues(hivePartition.getPartitionValues());
         result.setSd(makeStorageDescriptorFromHivePartition(hivePartition));
         result.setParameters(hivePartition.getParameters());
@@ -355,7 +356,7 @@ public static Partition toMetastoreApiPartition(HivePartition hivePartition) {

     public static StorageDescriptor makeStorageDescriptorFromHivePartition(HivePartition partition) {
         SerDeInfo serdeInfo = new SerDeInfo();
-        serdeInfo.setName(partition.getTblName());
+        serdeInfo.setName(partition.getTableInfo().getTbName());
         serdeInfo.setSerializationLib(partition.getSerde());

         StorageDescriptor sd = new StorageDescriptor();
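updateStatisticsParameters is where the collected CommonStatistics finally reach the metastore: the counters are folded into the partition's parameter map before the Thrift Partition is sent to HMS. A hedged sketch of that step; only the numFiles key is visible in the rendered hunk, and the constructor argument order (rowCount, fileCount, totalFileBytes) is taken from the hunks above:

```java
import java.util.HashMap;
import java.util.Map;

// Hedged sketch: fold write-side statistics into partition parameters
// before committing to HMS.
class StatisticsToHmsExample {
    Map<String, String> withStatistics(Map<String, String> existing) {
        CommonStatistics stats = new CommonStatistics(1000L, 10L, 64096L);
        // After this call, the map contains "numFiles" -> "10"
        // (StatsSetupConst.NUM_FILES); other keys are assumptions.
        return HiveUtil.updateStatisticsParameters(existing, stats);
    }

    Map<String, String> example() {
        return withStatistics(new HashMap<>());
    }
}
```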