From 61030d8e7a5a05e215efed672267ac163b0cbcf6 Mon Sep 17 00:00:00 2001 From: Yuwei XIAO Date: Mon, 16 May 2022 11:07:01 +0800 Subject: [PATCH] [HUDI-3123] consistent hashing index: basic write path (upsert/insert) (#4480) 1. basic write path(insert/upsert) implementation 2. adapt simple bucket index --- .../client/utils/LazyIterableIterator.java | 4 +- .../apache/hudi/config/HoodieIndexConfig.java | 46 +++- .../apache/hudi/config/HoodieWriteConfig.java | 4 + .../org/apache/hudi/index/HoodieIndex.java | 6 +- .../hudi/index/bucket/BucketIdentifier.java | 49 ++-- .../bucket/BucketIndexLocationMapper.java | 35 +++ .../bucket/ConsistentBucketIdentifier.java | 104 ++++++++ .../hudi/index/bucket/HoodieBucketIndex.java | 119 +++------ .../index/bucket/HoodieSimpleBucketIndex.java | 99 +++++++ .../apache/hudi/io/WriteHandleFactory.java | 3 +- .../commit/BaseCommitActionExecutor.java | 2 +- .../storage/HoodieConsistentBucketLayout.java | 68 +++++ .../table/storage/HoodieDefaultLayout.java | 7 +- .../table/storage/HoodieLayoutFactory.java | 9 +- ...out.java => HoodieSimpleBucketLayout.java} | 32 +-- .../table/storage/HoodieStorageLayout.java | 2 +- .../index/bucket/TestBucketIdentifier.java | 122 +++++++++ .../TestConsistentBucketIdIdentifier.java | 79 ++++++ .../hudi/client/SparkRDDWriteClient.java | 2 +- .../org/apache/hudi/data/HoodieJavaRDD.java | 5 + .../hudi/index/SparkHoodieIndexFactory.java | 16 +- .../HoodieSparkConsistentBucketIndex.java | 210 +++++++++++++++ .../functional/TestConsistentBucketIndex.java | 250 ++++++++++++++++++ .../client/functional/TestHoodieIndex.java | 3 + .../hudi/index/TestHoodieIndexConfigs.java | 14 +- ....java => TestHoodieSimpleBucketIndex.java} | 17 +- .../commit/TestCopyOnWriteActionExecutor.java | 5 +- .../apache/hudi/common/data/HoodieData.java | 9 + .../apache/hudi/common/data/HoodieList.java | 5 + .../org/apache/hudi/common/fs/FSUtils.java | 4 + .../common/model/ConsistentHashingNode.java | 78 ++++++ .../common/model/HoodieCommitMetadata.java | 23 +- .../HoodieConsistentHashingMetadata.java | 142 ++++++++++ .../model/HoodieReplaceCommitMetadata.java | 17 +- .../model/HoodieRollingStatMetadata.java | 4 +- .../common/table/HoodieTableMetaClient.java | 8 + .../apache/hudi/common/util/JsonUtils.java | 38 +++ .../apache/hudi/common/util/hash/HashID.java | 9 + .../TestHoodieConsistentHashingMetadata.java | 31 +++ .../testutils/HoodieCommonTestHarness.java | 4 + .../index/bucket/TestBucketIdentifier.java | 67 ----- 41 files changed, 1512 insertions(+), 239 deletions(-) create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIndexLocationMapper.java create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIdentifier.java create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieConsistentBucketLayout.java rename hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/{HoodieBucketLayout.java => HoodieSimpleBucketLayout.java} (71%) create mode 100644 hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java create mode 100644 hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestConsistentBucketIdIdentifier.java create mode 100644 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java create mode 100644 hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java rename hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/{TestHoodieBucketIndex.java => TestHoodieSimpleBucketIndex.java} (91%) create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/model/ConsistentHashingNode.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/model/HoodieConsistentHashingMetadata.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/util/JsonUtils.java create mode 100644 hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieConsistentHashingMetadata.java delete mode 100644 hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LazyIterableIterator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LazyIterableIterator.java index 020944e7ab9b..ad54f8c0a099 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LazyIterableIterator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LazyIterableIterator.java @@ -45,7 +45,7 @@ public LazyIterableIterator(Iterator in) { /** * Called once, before any elements are processed. */ - protected abstract void start(); + protected void start() {} /** * Block computation to be overwritten by sub classes. @@ -55,7 +55,7 @@ public LazyIterableIterator(Iterator in) { /** * Called once, after all elements are processed. */ - protected abstract void end(); + protected void end() {} ////////////////// // iterable implementation diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java index 7c1f7e00e7fb..dbd45b973828 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java @@ -216,19 +216,40 @@ public class HoodieIndexConfig extends HoodieConfig { /** * ***** Bucket Index Configs ***** * Bucket Index is targeted to locate the record fast by hash in big data scenarios. - * The current implementation is a basic version, so there are some constraints: - * 1. Unsupported operation: bulk insert, cluster and so on. - * 2. Bucket num change requires rewriting the partition. - * 3. Predict the table size and future data growth well to set a reasonable bucket num. - * 4. A bucket size is recommended less than 3GB and avoid bing too small. - * more details and progress see [HUDI-3039]. - */ - // Bucket num equals file groups num in each partition. - // Bucket num can be set according to partition size and file group size. + * A bucket size is recommended less than 3GB to avoid being too small. + * For more details and progress, see [HUDI-3039]. + */ + + /** + * Bucket Index Engine Type: implementation of bucket index + * + * SIMPLE: + * 0. Check `HoodieSimpleBucketLayout` for its supported operations. + * 1. Bucket num is fixed and requires rewriting the partition if we want to change it. + * + * CONSISTENT_HASHING: + * 0. Check `HoodieConsistentBucketLayout` for its supported operations. + * 1. 
Bucket num will auto-adjust by running clustering (still in progress) + */ + public static final ConfigProperty BUCKET_INDEX_ENGINE_TYPE = ConfigProperty + .key("hoodie.index.bucket.engine") + .defaultValue("SIMPLE") + .sinceVersion("0.11.0") + .withDocumentation("Type of bucket index engine to use. Default is SIMPLE bucket index, with a fixed number of buckets. " + + "Possible options are [SIMPLE | CONSISTENT_HASHING]. " + + "Consistent hashing supports dynamic resizing of the number of buckets, solving potential data skew and file size " + + "issues of the SIMPLE hashing engine."); + + /** + * Bucket num equals file groups num in each partition. + * Bucket num can be set according to partition size and file group size. + * + * In dynamic bucket index cases (e.g., using CONSISTENT_HASHING), this config serves as the initial number of buckets. + */ public static final ConfigProperty BUCKET_INDEX_NUM_BUCKETS = ConfigProperty .key("hoodie.bucket.index.num.buckets") .defaultValue(256) - .withDocumentation("Only applies if index type is BUCKET_INDEX. Determine the number of buckets in the hudi table, " + .withDocumentation("Only applies if index type is BUCKET. Determines the number of buckets in the hudi table, " + "and each partition is divided to N buckets."); public static final ConfigProperty BUCKET_INDEX_HASH_FIELD = ConfigProperty @@ -463,6 +484,11 @@ public Builder withIndexType(HoodieIndex.IndexType indexType) { return this; } + public Builder withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType bucketType) { + hoodieIndexConfig.setValue(BUCKET_INDEX_ENGINE_TYPE, bucketType.name()); + return this; + } + public Builder withIndexClass(String indexClass) { hoodieIndexConfig.setValue(INDEX_CLASS_NAME, indexClass); return this; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 322c2e84e7e8..3eeb99044b29 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1428,6 +1428,10 @@ public String getIndexClass() { return getString(HoodieIndexConfig.INDEX_CLASS_NAME); } + public HoodieIndex.BucketIndexEngineType getBucketIndexEngineType() { + return HoodieIndex.BucketIndexEngineType.valueOf(getString(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE)); + } + public int getBloomFilterNumEntries() { return getInt(HoodieIndexConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java index 922371c4a0f4..1182c45c7247 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java @@ -121,7 +121,7 @@ public abstract HoodieData updateLocation( public abstract boolean isImplicitWithStorage(); /** - * If the `getCustomizedPartitioner` returns a partitioner, it has to be true. 
+ * Indicates whether an operation type requires location tagging before writing. */ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) public boolean requiresTagging(WriteOperationType operationType) { @@ -143,4 +143,8 @@ public void close() { public enum IndexType { HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, SIMPLE, GLOBAL_SIMPLE, BUCKET, FLINK_STATE } + + public enum BucketIndexEngineType { + SIMPLE, CONSISTENT_HASHING + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java index 1a07c4063f35..1f233b429789 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import java.io.Serializable; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -29,8 +30,8 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; -public class BucketIdentifier { - // compatible with the spark bucket name +public class BucketIdentifier implements Serializable { + // Compatible with the spark bucket name private static final Pattern BUCKET_NAME = Pattern.compile(".*_(\\d+)(?:\\..*)?$"); public static int getBucketId(HoodieRecord record, String indexKeyFields, int numBuckets) { @@ -38,27 +39,41 @@ public static int getBucketId(HoodieRecord record, String indexKeyFields, int nu } public static int getBucketId(HoodieKey hoodieKey, String indexKeyFields, int numBuckets) { - return getBucketId(hoodieKey.getRecordKey(), indexKeyFields, numBuckets); + return (getHashKeys(hoodieKey, indexKeyFields).hashCode() & Integer.MAX_VALUE) % numBuckets; + } + + public static int getBucketId(HoodieKey hoodieKey, List indexKeyFields, int numBuckets) { + return (getHashKeys(hoodieKey.getRecordKey(), indexKeyFields).hashCode() & Integer.MAX_VALUE) % numBuckets; } public static int getBucketId(String recordKey, String indexKeyFields, int numBuckets) { - List hashKeyFields; - if (!recordKey.contains(":")) { - hashKeyFields = Collections.singletonList(recordKey); - } else { - Map recordKeyPairs = Arrays.stream(recordKey.split(",")) - .map(p -> p.split(":")) - .collect(Collectors.toMap(p -> p[0], p -> p[1])); - hashKeyFields = Arrays.stream(indexKeyFields.split(",")) - .map(f -> recordKeyPairs.get(f)) - .collect(Collectors.toList()); - } - return (hashKeyFields.hashCode() & Integer.MAX_VALUE) % numBuckets; + return getBucketId(getHashKeys(recordKey, indexKeyFields), numBuckets); } - // only for test public static int getBucketId(List hashKeyFields, int numBuckets) { - return hashKeyFields.hashCode() % numBuckets; + return (hashKeyFields.hashCode() & Integer.MAX_VALUE) % numBuckets; + } + + public static List getHashKeys(HoodieKey hoodieKey, String indexKeyFields) { + return getHashKeys(hoodieKey.getRecordKey(), indexKeyFields); + } + + protected static List getHashKeys(String recordKey, String indexKeyFields) { + return !recordKey.contains(":") ? Collections.singletonList(recordKey) : + getHashKeysUsingIndexFields(recordKey, Arrays.asList(indexKeyFields.split(","))); + } + + protected static List getHashKeys(String recordKey, List indexKeyFields) { + return !recordKey.contains(":") ? 
Collections.singletonList(recordKey) : + getHashKeysUsingIndexFields(recordKey, indexKeyFields); + } + + private static List getHashKeysUsingIndexFields(String recordKey, List indexKeyFields) { + Map recordKeyPairs = Arrays.stream(recordKey.split(",")) + .map(p -> p.split(":")) + .collect(Collectors.toMap(p -> p[0], p -> p[1])); + return indexKeyFields.stream() + .map(f -> recordKeyPairs.get(f)).collect(Collectors.toList()); } public static String partitionBucketIdStr(String partition, int bucketId) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIndexLocationMapper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIndexLocationMapper.java new file mode 100644 index 000000000000..4955087333a2 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIndexLocationMapper.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.util.Option; + +import java.io.Serializable; + +public interface BucketIndexLocationMapper extends Serializable { + + /** + * Get record location given hoodie key and partition path + */ + Option getRecordLocation(HoodieKey key, String partitionPath); + +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIdentifier.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIdentifier.java new file mode 100644 index 000000000000..c44a8a6ccfb0 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIdentifier.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.ConsistentHashingNode; +import org.apache.hudi.common.model.HoodieConsistentHashingMetadata; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.util.hash.HashID; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; + +public class ConsistentBucketIdentifier extends BucketIdentifier { + + /** + * Hashing metadata of a partition + */ + private final HoodieConsistentHashingMetadata metadata; + /** + * In-memory structure to speed up ring mapping (hashing value -> hashing node) + */ + private final TreeMap ring; + /** + * Mapping from fileId -> hashing node + */ + private final Map fileIdToBucket; + + public ConsistentBucketIdentifier(HoodieConsistentHashingMetadata metadata) { + this.metadata = metadata; + this.fileIdToBucket = new HashMap<>(); + this.ring = new TreeMap<>(); + initialize(); + } + + public Collection getNodes() { + return ring.values(); + } + + public HoodieConsistentHashingMetadata getMetadata() { + return metadata; + } + + public int getNumBuckets() { + return ring.size(); + } + + /** + * Get bucket of the given file group + * + * @param fileId the file group id. NOTE: not filePfx (i.e., uuid) + */ + public ConsistentHashingNode getBucketByFileId(String fileId) { + return fileIdToBucket.get(fileId); + } + + public ConsistentHashingNode getBucket(HoodieKey hoodieKey, List indexKeyFields) { + return getBucket(getHashKeys(hoodieKey.getRecordKey(), indexKeyFields)); + } + + protected ConsistentHashingNode getBucket(List hashKeys) { + int hashValue = HashID.getXXHash32(String.join("", hashKeys), 0); + return getBucket(hashValue & HoodieConsistentHashingMetadata.HASH_VALUE_MASK); + } + + protected ConsistentHashingNode getBucket(int hashValue) { + SortedMap tailMap = ring.tailMap(hashValue); + return tailMap.isEmpty() ? ring.firstEntry().getValue() : tailMap.get(tailMap.firstKey()); + } + + /** + * Initialize necessary data structure to facilitate bucket identifying. + * Specifically, we construct: + * - An in-memory tree (ring) to speed up range mapping searching. + * - A hash table (fileIdToBucket) to allow lookup of bucket using fileId. 
+ */ + private void initialize() { + for (ConsistentHashingNode p : metadata.getNodes()) { + ring.put(p.getValue(), p); + // One bucket has only one file group, so append 0 directly + fileIdToBucket.put(FSUtils.createNewFileId(p.getFileIdPrefix(), 0), p); + } + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java index a243eea76785..c3584d234a8e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java @@ -26,9 +26,7 @@ import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndexUtils; @@ -37,28 +35,31 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import java.util.HashMap; -import java.util.Map; +import java.util.Arrays; +import java.util.List; /** * Hash indexing mechanism. */ -public class HoodieBucketIndex extends HoodieIndex { +public abstract class HoodieBucketIndex extends HoodieIndex { - private static final Logger LOG = LogManager.getLogger(HoodieBucketIndex.class); + private static final Logger LOG = LogManager.getLogger(HoodieBucketIndex.class); - private final int numBuckets; + protected final int numBuckets; + protected final List indexKeyFields; public HoodieBucketIndex(HoodieWriteConfig config) { super(config); - numBuckets = config.getBucketIndexNumBuckets(); - LOG.info("use bucket index, numBuckets=" + numBuckets); + + this.numBuckets = config.getBucketIndexNumBuckets(); + this.indexKeyFields = Arrays.asList(config.getBucketIndexHashField().split(",")); + LOG.info("Use bucket index, numBuckets = " + numBuckets + ", indexFields: " + indexKeyFields); } @Override public HoodieData updateLocation(HoodieData writeStatuses, - HoodieEngineContext context, - HoodieTable hoodieTable) + HoodieEngineContext context, + HoodieTable hoodieTable) throws HoodieIndexException { return writeStatuses; } @@ -68,62 +69,35 @@ public HoodieData> tagLocation( HoodieData> records, HoodieEngineContext context, HoodieTable hoodieTable) throws HoodieIndexException { - HoodieData> taggedRecords = records.mapPartitions(recordIter -> { - // partitionPath -> bucketId -> fileInfo - Map>> partitionPathFileIDList = new HashMap<>(); - return new LazyIterableIterator, HoodieRecord>(recordIter) { - - @Override - protected void start() { - - } - - @Override - protected HoodieRecord computeNext() { - HoodieRecord record = recordIter.next(); - int bucketId = BucketIdentifier.getBucketId(record, config.getBucketIndexHashField(), numBuckets); - String partitionPath = record.getPartitionPath(); - if (!partitionPathFileIDList.containsKey(partitionPath)) { - partitionPathFileIDList.put(partitionPath, loadPartitionBucketIdFileIdMapping(hoodieTable, partitionPath)); - } - if (partitionPathFileIDList.get(partitionPath).containsKey(bucketId)) { - Pair fileInfo = partitionPathFileIDList.get(partitionPath).get(bucketId); - return HoodieIndexUtils.getTaggedRecord(record, Option.of( - new 
HoodieRecordLocation(fileInfo.getRight(), fileInfo.getLeft()) - )); + // Initialize necessary information before tagging. e.g., hashing metadata + List partitions = records.map(HoodieRecord::getPartitionPath).distinct().collectAsList(); + LOG.info("Initializing hashing metadata for partitions: " + partitions); + BucketIndexLocationMapper mapper = getLocationMapper(hoodieTable, partitions); + + return records.mapPartitions(iterator -> + new LazyIterableIterator, HoodieRecord>(iterator) { + @Override + protected HoodieRecord computeNext() { + // TODO maybe batch the operation to improve performance + HoodieRecord record = inputItr.next(); + Option loc = mapper.getRecordLocation(record.getKey(), record.getPartitionPath()); + return HoodieIndexUtils.getTaggedRecord(record, loc); } - return record; - } - - @Override - protected void end() { - } - }; - }, true); - return taggedRecords; + ); } - private Map> loadPartitionBucketIdFileIdMapping( - HoodieTable hoodieTable, - String partition) { - // bucketId -> fileIds - Map> fileIDList = new HashMap<>(); - HoodieIndexUtils - .getLatestBaseFilesForPartition(partition, hoodieTable) - .forEach(file -> { - String fileId = file.getFileId(); - String commitTime = file.getCommitTime(); - int bucketId = BucketIdentifier.bucketIdFromFileId(fileId); - if (!fileIDList.containsKey(bucketId)) { - fileIDList.put(bucketId, Pair.of(fileId, commitTime)); - } else { - // check if bucket data is valid - throw new HoodieIOException("Find multiple files at partition path=" - + partition + " belongs to the same bucket id = " + bucketId); - } - }); - return fileIDList; + @Override + public boolean requiresTagging(WriteOperationType operationType) { + switch (operationType) { + case INSERT: + case INSERT_OVERWRITE: + case UPSERT: + case DELETE: + return true; + default: + return false; + } } @Override @@ -138,7 +112,7 @@ public boolean isGlobal() { @Override public boolean canIndexLogFiles() { - return false; + return true; } @Override @@ -146,19 +120,12 @@ public boolean isImplicitWithStorage() { return true; } - @Override - public boolean requiresTagging(WriteOperationType operationType) { - switch (operationType) { - case INSERT: - case INSERT_OVERWRITE: - case UPSERT: - return true; - default: - return false; - } - } - public int getNumBuckets() { return numBuckets; } + + /** + * Get a location mapper for the given table & partitionPath + */ + protected abstract BucketIndexLocationMapper getLocationMapper(HoodieTable table, List partitionPath); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java new file mode 100644 index 000000000000..92ac4f69b2c4 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.index.HoodieIndexUtils; +import org.apache.hudi.table.HoodieTable; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Simple bucket index implementation, with fixed bucket number. + */ +public class HoodieSimpleBucketIndex extends HoodieBucketIndex { + + private static final Logger LOG = LogManager.getLogger(HoodieSimpleBucketIndex.class); + + public HoodieSimpleBucketIndex(HoodieWriteConfig config) { + super(config); + } + + private Map loadPartitionBucketIdFileIdMapping( + HoodieTable hoodieTable, + String partition) { + // bucketId -> fileIds + Map bucketIdToFileIdMapping = new HashMap<>(); + hoodieTable.getMetaClient().reloadActiveTimeline(); + HoodieIndexUtils + .getLatestBaseFilesForPartition(partition, hoodieTable) + .forEach(file -> { + String fileId = file.getFileId(); + String commitTime = file.getCommitTime(); + int bucketId = BucketIdentifier.bucketIdFromFileId(fileId); + if (!bucketIdToFileIdMapping.containsKey(bucketId)) { + bucketIdToFileIdMapping.put(bucketId, new HoodieRecordLocation(commitTime, fileId)); + } else { + // Check if bucket data is valid + throw new HoodieIOException("Find multiple files at partition path=" + + partition + " belongs to the same bucket id = " + bucketId); + } + }); + return bucketIdToFileIdMapping; + } + + @Override + public boolean canIndexLogFiles() { + return false; + } + + @Override + protected BucketIndexLocationMapper getLocationMapper(HoodieTable table, List partitionPath) { + return new SimpleBucketIndexLocationMapper(table, partitionPath); + } + + public class SimpleBucketIndexLocationMapper implements BucketIndexLocationMapper { + + /** + * Mapping from partitionPath -> bucketId -> fileInfo + */ + private final Map> partitionPathFileIDList; + + public SimpleBucketIndexLocationMapper(HoodieTable table, List partitions) { + partitionPathFileIDList = partitions.stream().collect(Collectors.toMap(p -> p, p -> loadPartitionBucketIdFileIdMapping(table, p))); + } + + @Override + public Option getRecordLocation(HoodieKey key, String partitionPath) { + int bucketId = BucketIdentifier.getBucketId(key, indexKeyFields, numBuckets); + Map bucketIdToFileIdMapping = partitionPathFileIDList.get(partitionPath); + return Option.ofNullable(bucketIdToFileIdMapping.getOrDefault(bucketId, null)); + } + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/WriteHandleFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/WriteHandleFactory.java index 36fae304d77f..c267b5969d80 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/WriteHandleFactory.java +++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/WriteHandleFactory.java @@ -19,6 +19,7 @@ package org.apache.hudi.io; import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; @@ -32,6 +33,6 @@ public abstract HoodieWriteHandle create(HoodieWriteConfig config, S String partitionPath, String fileIdPrefix, TaskContextSupplier taskContextSupplier); protected String getNextFileId(String idPfx) { - return String.format("%s-%d", idPfx, numFilesWritten++); + return FSUtils.createNewFileId(idPfx, numFilesWritten++); } } \ No newline at end of file diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java index fb07d35928d7..31c8bbd6d30d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java @@ -94,7 +94,7 @@ public BaseCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig c this.lastCompletedTxn = TransactionUtils.getLastCompletedTxnInstantAndMetadata(table.getMetaClient()); this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(table.getMetaClient()); this.pendingInflightAndRequestedInstants.remove(instantTime); - if (table.getStorageLayout().doesNotSupport(operationType)) { + if (!table.getStorageLayout().writeOperationSupported(operationType)) { throw new UnsupportedOperationException("Executor " + this.getClass().getSimpleName() + " is not compatible with table layout " + table.getStorageLayout().getClass().getSimpleName()); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieConsistentBucketLayout.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieConsistentBucketLayout.java new file mode 100644 index 000000000000..0ed2b9c939a7 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieConsistentBucketLayout.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.storage; + +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; + +import java.util.Set; + +/** + * Storage layout when using consistent hashing bucket index. + */ +public class HoodieConsistentBucketLayout extends HoodieStorageLayout { + public static final Set SUPPORTED_OPERATIONS = CollectionUtils.createImmutableSet( + WriteOperationType.INSERT, + WriteOperationType.INSERT_PREPPED, + WriteOperationType.UPSERT, + WriteOperationType.UPSERT_PREPPED, + WriteOperationType.INSERT_OVERWRITE, + WriteOperationType.DELETE, + WriteOperationType.COMPACT, + WriteOperationType.DELETE_PARTITION + ); + + public HoodieConsistentBucketLayout(HoodieWriteConfig config) { + super(config); + } + + /** + * Bucketing controls the number of file groups directly. + */ + @Override + public boolean determinesNumFileGroups() { + return true; + } + + /** + * Consistent hashing will tag all incoming records, so we could go ahead reusing an existing Partitioner + */ + @Override + public Option layoutPartitionerClass() { + return Option.empty(); + } + + @Override + public boolean writeOperationSupported(WriteOperationType operationType) { + return SUPPORTED_OPERATIONS.contains(operationType); + } + +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieDefaultLayout.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieDefaultLayout.java index 09d20707a4c8..28fe37c9b8fe 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieDefaultLayout.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieDefaultLayout.java @@ -31,15 +31,18 @@ public HoodieDefaultLayout(HoodieWriteConfig config) { super(config); } + @Override public boolean determinesNumFileGroups() { return false; } + @Override public Option layoutPartitionerClass() { return Option.empty(); } - public boolean doesNotSupport(WriteOperationType operationType) { - return false; + @Override + public boolean writeOperationSupported(WriteOperationType operationType) { + return true; } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieLayoutFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieLayoutFactory.java index e86d253df4bf..e78c15b3a4b2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieLayoutFactory.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieLayoutFactory.java @@ -30,7 +30,14 @@ public static HoodieStorageLayout createLayout(HoodieWriteConfig config) { case DEFAULT: return new HoodieDefaultLayout(config); case BUCKET: - return new HoodieBucketLayout(config); + switch (config.getBucketIndexEngineType()) { + case SIMPLE: + return new HoodieSimpleBucketLayout(config); + case CONSISTENT_HASHING: + return new HoodieConsistentBucketLayout(config); + default: + throw new HoodieNotSupportedException("Unknown bucket index engine type: " + config.getBucketIndexEngineType()); + } default: throw new HoodieNotSupportedException("Unknown layout type, set " + config.getLayoutType()); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieBucketLayout.java 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieSimpleBucketLayout.java similarity index 71% rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieBucketLayout.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieSimpleBucketLayout.java index deefcfe6a621..be048a23b058 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieBucketLayout.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieSimpleBucketLayout.java @@ -19,31 +19,30 @@ package org.apache.hudi.table.storage; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieLayoutConfig; import org.apache.hudi.config.HoodieWriteConfig; -import java.util.HashSet; import java.util.Set; /** * Storage layout when using bucket index. Data distribution and files organization are in a specific way. */ -public class HoodieBucketLayout extends HoodieStorageLayout { +public class HoodieSimpleBucketLayout extends HoodieStorageLayout { - public static final Set SUPPORTED_OPERATIONS = new HashSet() {{ - add(WriteOperationType.INSERT); - add(WriteOperationType.INSERT_PREPPED); - add(WriteOperationType.UPSERT); - add(WriteOperationType.UPSERT_PREPPED); - add(WriteOperationType.INSERT_OVERWRITE); - add(WriteOperationType.DELETE); - add(WriteOperationType.COMPACT); - add(WriteOperationType.DELETE_PARTITION); - } - }; + public static final Set SUPPORTED_OPERATIONS = CollectionUtils.createImmutableSet( + WriteOperationType.INSERT, + WriteOperationType.INSERT_PREPPED, + WriteOperationType.UPSERT, + WriteOperationType.UPSERT_PREPPED, + WriteOperationType.INSERT_OVERWRITE, + WriteOperationType.DELETE, + WriteOperationType.COMPACT, + WriteOperationType.DELETE_PARTITION + ); - public HoodieBucketLayout(HoodieWriteConfig config) { + public HoodieSimpleBucketLayout(HoodieWriteConfig config) { super(config); } @@ -55,6 +54,7 @@ public boolean determinesNumFileGroups() { return true; } + @Override public Option layoutPartitionerClass() { return config.contains(HoodieLayoutConfig.LAYOUT_PARTITIONER_CLASS_NAME) ? Option.of(config.getString(HoodieLayoutConfig.LAYOUT_PARTITIONER_CLASS_NAME.key())) @@ -62,7 +62,7 @@ public Option layoutPartitionerClass() { } @Override - public boolean doesNotSupport(WriteOperationType operationType) { - return !SUPPORTED_OPERATIONS.contains(operationType); + public boolean writeOperationSupported(WriteOperationType operationType) { + return SUPPORTED_OPERATIONS.contains(operationType); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieStorageLayout.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieStorageLayout.java index a0a4eab46304..36be1a8bef6a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieStorageLayout.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieStorageLayout.java @@ -48,7 +48,7 @@ public HoodieStorageLayout(HoodieWriteConfig config) { /** * Determines if the operation is supported by the layout. 
*/ - public abstract boolean doesNotSupport(WriteOperationType operationType); + public abstract boolean writeOperationSupported(WriteOperationType operationType); public enum LayoutType { DEFAULT, BUCKET diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java new file mode 100644 index 000000000000..31f33890ad31 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.model.HoodieAvroRecord; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.keygen.KeyGenUtils; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; + +public class TestBucketIdentifier { + + public static final String NESTED_COL_SCHEMA = "{\"type\":\"record\", \"name\":\"nested_col\",\"fields\": [" + + "{\"name\": \"prop1\",\"type\": \"string\"},{\"name\": \"prop2\", \"type\": \"long\"}]}"; + public static final String EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"testrec\",\"fields\": [ " + + "{\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"}," + + "{\"name\": \"ts_ms\", \"type\": \"string\"}," + + "{\"name\": \"pii_col\", \"type\": \"string\"}," + + "{\"name\": \"nested_col\",\"type\": " + + NESTED_COL_SCHEMA + "}" + + "]}"; + + public static GenericRecord getRecord() { + return getRecord(getNestedColRecord("val1", 10L)); + } + + public static GenericRecord getNestedColRecord(String prop1Value, Long prop2Value) { + GenericRecord nestedColRecord = new GenericData.Record(new Schema.Parser().parse(NESTED_COL_SCHEMA)); + nestedColRecord.put("prop1", prop1Value); + nestedColRecord.put("prop2", prop2Value); + return nestedColRecord; + } + + public static GenericRecord getRecord(GenericRecord nestedColRecord) { + GenericRecord record = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA)); + record.put("timestamp", 4357686L); + record.put("_row_key", "key1"); + record.put("ts_ms", "2020-03-21"); + record.put("pii_col", "pi"); + record.put("nested_col", nestedColRecord); + return record; + } + + @Test + public void testBucketFileId() { + int[] ids = {0, 4, 8, 16, 32, 64, 128, 256, 512, 1000, 1024, 4096, 10000, 100000}; + for (int id : ids) { + String bucketIdStr = 
BucketIdentifier.bucketIdStr(id); + String fileId = BucketIdentifier.newBucketFileIdPrefix(bucketIdStr); + assert BucketIdentifier.bucketIdFromFileId(fileId) == id; + } + } + + @Test + public void testBucketIdWithSimpleRecordKey() { + String recordKeyField = "_row_key"; + String indexKeyField = "_row_key"; + GenericRecord record = getRecord(); + HoodieRecord hoodieRecord = new HoodieAvroRecord( + new HoodieKey(KeyGenUtils.getRecordKey(record, recordKeyField, false), ""), null); + int bucketId = BucketIdentifier.getBucketId(hoodieRecord, indexKeyField, 8); + assert bucketId == BucketIdentifier.getBucketId( + Arrays.asList(record.get(indexKeyField).toString()), 8); + } + + @Test + public void testBucketIdWithComplexRecordKey() { + List recordKeyField = Arrays.asList("_row_key", "ts_ms"); + String indexKeyField = "_row_key"; + GenericRecord record = getRecord(); + HoodieRecord hoodieRecord = new HoodieAvroRecord( + new HoodieKey(KeyGenUtils.getRecordKey(record, recordKeyField, false), ""), null); + int bucketId = BucketIdentifier.getBucketId(hoodieRecord, indexKeyField, 8); + assert bucketId == BucketIdentifier.getBucketId( + Arrays.asList(record.get(indexKeyField).toString()), 8); + } + + @Test + public void testGetHashKeys() { + BucketIdentifier identifier = new BucketIdentifier(); + List keys = identifier.getHashKeys(new HoodieKey("abc", "partition"), ""); + Assertions.assertEquals(1, keys.size()); + Assertions.assertEquals("abc", keys.get(0)); + + keys = identifier.getHashKeys(new HoodieKey("f1:abc", "partition"), "f1"); + Assertions.assertEquals(1, keys.size()); + Assertions.assertEquals("abc", keys.get(0)); + + keys = identifier.getHashKeys(new HoodieKey("f1:abc,f2:bcd", "partition"), "f2"); + Assertions.assertEquals(1, keys.size()); + Assertions.assertEquals("bcd", keys.get(0)); + + keys = identifier.getHashKeys(new HoodieKey("f1:abc,f2:bcd", "partition"), "f1,f2"); + Assertions.assertEquals(2, keys.size()); + Assertions.assertEquals("abc", keys.get(0)); + Assertions.assertEquals("bcd", keys.get(1)); + } +} diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestConsistentBucketIdIdentifier.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestConsistentBucketIdIdentifier.java new file mode 100644 index 000000000000..3ffe6ded188b --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestConsistentBucketIdIdentifier.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.ConsistentHashingNode; +import org.apache.hudi.common.model.HoodieConsistentHashingMetadata; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.apache.hudi.common.model.HoodieConsistentHashingMetadata.HASH_VALUE_MASK; + +/** + * Unit test of consistent bucket identifier + */ +public class TestConsistentBucketIdIdentifier { + + @Test + public void testGetBucket() { + List nodes = Arrays.asList( + new ConsistentHashingNode(100, "0"), + new ConsistentHashingNode(0x2fffffff, "1"), + new ConsistentHashingNode(0x4fffffff, "2")); + HoodieConsistentHashingMetadata meta = new HoodieConsistentHashingMetadata((short) 0, "", "", 3, 0, nodes); + ConsistentBucketIdentifier identifier = new ConsistentBucketIdentifier(meta); + + Assertions.assertEquals(3, identifier.getNumBuckets()); + + // Get bucket by hash keys + Assertions.assertEquals(nodes.get(2), identifier.getBucket(Arrays.asList("Hudi"))); + Assertions.assertEquals(nodes.get(1), identifier.getBucket(Arrays.asList("bucket_index"))); + Assertions.assertEquals(nodes.get(1), identifier.getBucket(Arrays.asList("consistent_hashing"))); + Assertions.assertEquals(nodes.get(1), identifier.getBucket(Arrays.asList("bucket_index", "consistent_hashing"))); + int[] ref1 = {2, 2, 1, 1, 0, 1, 1, 1, 0, 1}; + int[] ref2 = {1, 0, 1, 0, 1, 1, 1, 0, 1, 2}; + for (int i = 0; i < 10; ++i) { + Assertions.assertEquals(nodes.get(ref1[i]), identifier.getBucket(Arrays.asList(Integer.toString(i)))); + Assertions.assertEquals(nodes.get(ref2[i]), identifier.getBucket(Arrays.asList(Integer.toString(i), Integer.toString(i + 1)))); + } + + // Get bucket by hash value + Assertions.assertEquals(nodes.get(0), identifier.getBucket(0)); + Assertions.assertEquals(nodes.get(0), identifier.getBucket(50)); + Assertions.assertEquals(nodes.get(0), identifier.getBucket(100)); + Assertions.assertEquals(nodes.get(1), identifier.getBucket(101)); + Assertions.assertEquals(nodes.get(1), identifier.getBucket(0x1fffffff)); + Assertions.assertEquals(nodes.get(1), identifier.getBucket(0x2fffffff)); + Assertions.assertEquals(nodes.get(2), identifier.getBucket(0x40000000)); + Assertions.assertEquals(nodes.get(2), identifier.getBucket(0x40000001)); + Assertions.assertEquals(nodes.get(2), identifier.getBucket(0x4fffffff)); + Assertions.assertEquals(nodes.get(0), identifier.getBucket(0x50000000)); + Assertions.assertEquals(nodes.get(0), identifier.getBucket(HASH_VALUE_MASK)); + + // Get bucket by file id + Assertions.assertEquals(nodes.get(0), identifier.getBucketByFileId(FSUtils.createNewFileId("0", 0))); + Assertions.assertEquals(nodes.get(1), identifier.getBucketByFileId(FSUtils.createNewFileId("1", 0))); + Assertions.assertEquals(nodes.get(2), identifier.getBucketByFileId(FSUtils.createNewFileId("2", 0))); + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 3b512f0bdc87..df82e75db92c 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -179,7 +179,7 @@ public JavaRDD insert(JavaRDD> records, String inst initTable(WriteOperationType.INSERT, 
Option.ofNullable(instantTime)); table.validateInsertSchema(); preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient()); - HoodieWriteMetadata> result = table.insert(context,instantTime, HoodieJavaRDD.of(records)); + HoodieWriteMetadata> result = table.insert(context, instantTime, HoodieJavaRDD.of(records)); HoodieWriteMetadata> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses())); return postWrite(resultRDD, instantTime, table); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java index 66edf607f84d..0843dfc3c992 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java @@ -112,6 +112,11 @@ public HoodieData mapPartitions(SerializableFunction, Iterato return HoodieJavaRDD.of(rddData.mapPartitions(func::apply, preservesPartitioning)); } + @Override + public HoodieData mapPartitions(SerializableFunction, Iterator> func) { + return HoodieJavaRDD.of(rddData.mapPartitions(func::apply)); + } + @Override public HoodieData flatMap(SerializableFunction> func) { return HoodieJavaRDD.of(rddData.flatMap(e -> func.apply(e))); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java index d1f40dca484c..4525490c8d16 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java @@ -28,7 +28,8 @@ import org.apache.hudi.index.bloom.HoodieBloomIndex; import org.apache.hudi.index.bloom.HoodieGlobalBloomIndex; import org.apache.hudi.index.bloom.SparkHoodieBloomIndexHelper; -import org.apache.hudi.index.bucket.HoodieBucketIndex; +import org.apache.hudi.index.bucket.HoodieSimpleBucketIndex; +import org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex; import org.apache.hudi.index.hbase.SparkHoodieHBaseIndex; import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex; import org.apache.hudi.index.simple.HoodieGlobalSimpleIndex; @@ -56,8 +57,6 @@ public static HoodieIndex createIndex(HoodieWriteConfig config) { return new SparkHoodieHBaseIndex(config); case INMEMORY: return new HoodieInMemoryHashIndex(config); - case BUCKET: - return new HoodieBucketIndex(config); case BLOOM: return new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance()); case GLOBAL_BLOOM: @@ -66,6 +65,15 @@ public static HoodieIndex createIndex(HoodieWriteConfig config) { return new HoodieSimpleIndex(config, getKeyGeneratorForSimpleIndex(config)); case GLOBAL_SIMPLE: return new HoodieGlobalSimpleIndex(config, getKeyGeneratorForSimpleIndex(config)); + case BUCKET: + switch (config.getBucketIndexEngineType()) { + case SIMPLE: + return new HoodieSimpleBucketIndex(config); + case CONSISTENT_HASHING: + return new HoodieSparkConsistentBucketIndex(config); + default: + throw new HoodieIndexException("Unknown bucket index engine type: " + config.getBucketIndexEngineType()); + } default: throw new HoodieIndexException("Index type unspecified, set " + config.getIndexType()); } @@ -90,6 +98,8 @@ public static boolean isGlobalIndex(HoodieWriteConfig config) { return false; case GLOBAL_SIMPLE: return true; + case BUCKET: + return false; default: 
return createIndex(config).isGlobal(); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java new file mode 100644 index 000000000000..ca6bf0fc7d99 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.fs.HoodieWrapperFileSystem; +import org.apache.hudi.common.model.ConsistentHashingNode; +import org.apache.hudi.common.model.HoodieConsistentHashingMetadata; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIndexException; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +/** + * Consistent hashing bucket index implementation, with auto-adjust bucket number. + * NOTE: bucket resizing is triggered by clustering. + */ +public class HoodieSparkConsistentBucketIndex extends HoodieBucketIndex { + + private static final Logger LOG = LogManager.getLogger(HoodieSparkConsistentBucketIndex.class); + + public HoodieSparkConsistentBucketIndex(HoodieWriteConfig config) { + super(config); + } + + @Override + public HoodieData updateLocation(HoodieData writeStatuses, + HoodieEngineContext context, + HoodieTable hoodieTable) + throws HoodieIndexException { + return writeStatuses; + } + + /** + * Do nothing. + * A failed write may create a hashing metadata for a partition. In this case, we still do nothing when rolling back + * the failed write. 
This is because the hashing metadata created by a writer always carries the 00000000000000 timestamp and can be viewed + * as the initialization of a partition rather than as part of the failed write. + */ + @Override + public boolean rollbackCommit(String instantTime) { + return true; + } + + @Override + protected BucketIndexLocationMapper getLocationMapper(HoodieTable table, List partitionPath) { + return new ConsistentBucketIndexLocationMapper(table, partitionPath); + } + + /** + * Load hashing metadata of the given partition; if it does not exist, create a new one (and persist it to storage) + * + * @param table hoodie table + * @param partition table partition + * @return Consistent hashing metadata + */ + public HoodieConsistentHashingMetadata loadOrCreateMetadata(HoodieTable table, String partition) { + HoodieConsistentHashingMetadata metadata = loadMetadata(table, partition); + if (metadata != null) { + return metadata; + } + + // There is no metadata, so try to create a new one and save it. + metadata = new HoodieConsistentHashingMetadata(partition, numBuckets); + if (saveMetadata(table, metadata, false)) { + return metadata; + } + + // The creation failed, so try loading the metadata again; a concurrent creation of the metadata should have succeeded. + // Note: the consistency problem of cloud storage is handled internally by the HoodieWrapperFileSystem, i.e., ConsistencyGuard + metadata = loadMetadata(table, partition); + ValidationUtils.checkState(metadata != null, "Failed to load or create metadata, partition: " + partition); + return metadata; + } + + /** + * Load hashing metadata of the given partition; if it does not exist, return null + * + * @param table hoodie table + * @param partition table partition + * @return Consistent hashing metadata or null if it does not exist + */ + public static HoodieConsistentHashingMetadata loadMetadata(HoodieTable table, String partition) { + Path metadataPath = FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), partition); + + try { + if (!table.getMetaClient().getFs().exists(metadataPath)) { + return null; + } + FileStatus[] metaFiles = table.getMetaClient().getFs().listStatus(metadataPath); + final HoodieTimeline completedCommits = table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(); + Predicate metaFilePredicate = fileStatus -> { + String filename = fileStatus.getPath().getName(); + if (!filename.contains(HoodieConsistentHashingMetadata.HASHING_METADATA_FILE_SUFFIX)) { + return false; + } + String timestamp = HoodieConsistentHashingMetadata.getTimestampFromFile(filename); + return completedCommits.containsInstant(timestamp) || timestamp.equals(HoodieTimeline.INIT_INSTANT_TS); + }; + + // Get a valid hashing metadata file with the largest (latest) timestamp + FileStatus metaFile = Arrays.stream(metaFiles).filter(metaFilePredicate) + .max(Comparator.comparing(a -> a.getPath().getName())).orElse(null); + + if (metaFile == null) { + return null; + } + + byte[] content = FileIOUtils.readAsByteArray(table.getMetaClient().getFs().open(metaFile.getPath())); + return HoodieConsistentHashingMetadata.fromBytes(content); + } catch (IOException e) { + LOG.error("Error when loading hashing metadata, partition: " + partition, e); + throw new HoodieIndexException("Error while loading hashing metadata", e); + } + } + + /** + * Save metadata into storage + * + * @param table hoodie table + * @param metadata hashing metadata to be saved + * @param overwrite whether to overwrite existing metadata + * @return true if the metadata
is saved successfully + */ + private static boolean saveMetadata(HoodieTable table, HoodieConsistentHashingMetadata metadata, boolean overwrite) { + HoodieWrapperFileSystem fs = table.getMetaClient().getFs(); + Path dir = FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), metadata.getPartitionPath()); + Path fullPath = new Path(dir, metadata.getFilename()); + try (FSDataOutputStream fsOut = fs.create(fullPath, overwrite)) { + byte[] bytes = metadata.toBytes(); + fsOut.write(bytes); + fsOut.close(); + return true; + } catch (IOException e) { + LOG.warn("Failed to update bucket metadata: " + metadata, e); + } + return false; + } + + public class ConsistentBucketIndexLocationMapper implements BucketIndexLocationMapper { + + /** + * Mapping from partitionPath -> bucket identifier + */ + private final Map partitionToIdentifier; + + public ConsistentBucketIndexLocationMapper(HoodieTable table, List partitions) { + // TODO maybe parallel + partitionToIdentifier = partitions.stream().collect(Collectors.toMap(p -> p, p -> { + HoodieConsistentHashingMetadata metadata = loadOrCreateMetadata(table, p); + return new ConsistentBucketIdentifier(metadata); + })); + } + + @Override + public Option getRecordLocation(HoodieKey key, String partitionPath) { + ConsistentHashingNode node = partitionToIdentifier.get(partitionPath).getBucket(key, indexKeyFields); + if (!StringUtils.isNullOrEmpty(node.getFileIdPrefix())) { + /** + * Dynamic Bucket Index doesn't need the instant time of the latest file group. + * We add suffix 0 here to the file uuid, following the naming convention, i.e., fileId = [uuid]_[numWrites] + */ + return Option.of(new HoodieRecordLocation(null, FSUtils.createNewFileId(node.getFileIdPrefix(), 0))); + } + + LOG.error("Consistent hashing node has no file group, partition: " + partitionPath + ", meta: " + + partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", record_key: " + key.toString()); + throw new HoodieIndexException("Failed to getBucket as hashing node has no file group"); + } + } +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java new file mode 100644 index 000000000000..e0bc22f70d23 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.client.functional; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.fs.ConsistencyGuardConfig; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.common.table.view.FileSystemViewStorageType; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieStorageConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.hadoop.HoodieParquetInputFormat; +import org.apache.hudi.hadoop.RealtimeFileStatus; +import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat; +import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import org.apache.hudi.table.HoodieSparkTable; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.testutils.HoodieClientTestHarness; +import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils; +import org.apache.hudi.testutils.MetadataMergeWriteStatus; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.spark.api.java.JavaRDD; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.IOException; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.Random; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Test consistent hashing index + */ +@Tag("functional") +public class TestConsistentBucketIndex extends HoodieClientTestHarness { + + private final Random random = new Random(1); + private HoodieIndex index; + private HoodieWriteConfig config; + + private static Stream configParams() { + // preserveMetaField, partitioned + Object[][] data = new Object[][] { + {true, false}, + {false, false}, + {true, true}, + {false, true}, + }; + return Stream.of(data).map(Arguments::of); + } + + private void setUp(boolean populateMetaFields, boolean partitioned) throws Exception { + initPath(); + initSparkContexts(); + if (partitioned) { + initTestDataGenerator(); + } else { + initTestDataGenerator(new String[] {""}); + } + initFileSystem(); + Properties props = populateMetaFields ? 
new Properties() : getPropertiesForKeyGen(); + props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key"); + metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ, props); + config = getConfigBuilder() + .withProperties(props) + .withIndexConfig(HoodieIndexConfig.newBuilder() + .fromProperties(props) + .withIndexType(HoodieIndex.IndexType.BUCKET) + .withIndexKeyField("_row_key") + .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING) + .build()) + .withAutoCommit(false) + .build(); + writeClient = getHoodieWriteClient(config); + index = writeClient.getIndex(); + } + + @AfterEach + public void tearDown() throws IOException { + cleanupResources(); + } + + /** + * Test bucket index tagging (always tag regardless of the write status) + * Test bucket index tagging consistency, two tagging result should be same + * + * @param populateMetaFields + * @param partitioned + * @throws Exception + */ + @ParameterizedTest + @MethodSource("configParams") + public void testTagLocation(boolean populateMetaFields, boolean partitioned) throws Exception { + setUp(populateMetaFields, partitioned); + String newCommitTime = "001"; + int totalRecords = 20 + random.nextInt(20); + List records = dataGen.generateInserts(newCommitTime, totalRecords); + JavaRDD writeRecords = jsc.parallelize(records, 2); + + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient); + + // The records should be tagged anyway, even though it is the first time doing tagging + List taggedRecord = tagLocation(index, writeRecords, hoodieTable).collect(); + Assertions.assertTrue(taggedRecord.stream().allMatch(r -> r.isCurrentLocationKnown())); + + // Tag again, the records should get the same location (hashing metadata has been persisted after the first tagging) + List taggedRecord2 = tagLocation(index, writeRecords, hoodieTable).collect(); + for (HoodieRecord ref : taggedRecord) { + for (HoodieRecord record : taggedRecord2) { + if (ref.getRecordKey().equals(record.getRecordKey())) { + Assertions.assertEquals(ref.getCurrentLocation(), record.getCurrentLocation()); + break; + } + } + } + } + + @ParameterizedTest + @MethodSource("configParams") + public void testWriteData(boolean populateMetaFields, boolean partitioned) throws Exception { + setUp(populateMetaFields, partitioned); + String newCommitTime = "001"; + int totalRecords = 20 + random.nextInt(20); + List records = dataGen.generateInserts(newCommitTime, totalRecords); + JavaRDD writeRecords = jsc.parallelize(records, 2); + + metaClient = HoodieTableMetaClient.reload(metaClient); + + // Insert totalRecords records + writeClient.startCommitWithTime(newCommitTime); + List writeStatues = writeClient.upsert(writeRecords, newCommitTime).collect(); + org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatues); + boolean success = writeClient.commitStats(newCommitTime, writeStatues.stream() + .map(WriteStatus::getStat) + .collect(Collectors.toList()), Option.empty(), metaClient.getCommitActionType()); + Assertions.assertTrue(success); + metaClient = HoodieTableMetaClient.reload(metaClient); + // The number of distinct fileId should be the same as total log file numbers + Assertions.assertEquals(writeStatues.stream().map(WriteStatus::getFileId).distinct().count(), + Arrays.stream(dataGen.getPartitionPaths()).mapToInt(p -> Objects.requireNonNull(listStatus(p, true)).length).sum()); + Assertions.assertEquals(totalRecords, 
readRecords(dataGen.getPartitionPaths(), populateMetaFields).size()); + + // Upsert the same set of records, the number of records should be same + newCommitTime = "002"; + writeClient.startCommitWithTime(newCommitTime); + writeStatues = writeClient.upsert(writeRecords, newCommitTime).collect(); + org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatues); + success = writeClient.commitStats(newCommitTime, writeStatues.stream() + .map(WriteStatus::getStat) + .collect(Collectors.toList()), Option.empty(), metaClient.getCommitActionType()); + Assertions.assertTrue(success); + // The number of log file should double after this insertion + long numberOfLogFiles = Arrays.stream(dataGen.getPartitionPaths()) + .mapToInt(p -> { + return Arrays.stream(listStatus(p, true)).mapToInt(fs -> + fs instanceof RealtimeFileStatus ? ((RealtimeFileStatus) fs).getDeltaLogFiles().size() : 1).sum(); + }).sum(); + Assertions.assertEquals(writeStatues.stream().map(WriteStatus::getFileId).distinct().count() * 2, numberOfLogFiles); + // The record number should remain same because of deduplication + Assertions.assertEquals(totalRecords, readRecords(dataGen.getPartitionPaths(), populateMetaFields).size()); + + metaClient = HoodieTableMetaClient.reload(metaClient); + + // Upsert new set of records, and validate the total number of records + newCommitTime = "003"; + records = dataGen.generateInserts(newCommitTime, totalRecords); + writeRecords = jsc.parallelize(records, 2); + writeClient.startCommitWithTime(newCommitTime); + writeStatues = writeClient.upsert(writeRecords, newCommitTime).collect(); + org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatues); + success = writeClient.commitStats(newCommitTime, writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()), + Option.empty(), metaClient.getCommitActionType()); + Assertions.assertTrue(success); + Assertions.assertEquals(totalRecords * 2, readRecords(dataGen.getPartitionPaths(), populateMetaFields).size()); + } + + private List readRecords(String[] partitions, boolean populateMetaFields) { + return HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, + Arrays.stream(partitions).map(p -> Paths.get(basePath, p).toString()).collect(Collectors.toList()), + basePath, new JobConf(hadoopConf), true, populateMetaFields); + } + + private FileStatus[] listStatus(String p, boolean realtime) { + JobConf jobConf = new JobConf(hadoopConf); + FileInputFormat.setInputPaths(jobConf, Paths.get(basePath, p).toString()); + FileInputFormat format = HoodieInputFormatUtils.getInputFormat(HoodieFileFormat.PARQUET, realtime, jobConf); + try { + if (realtime) { + return ((HoodieParquetRealtimeInputFormat) format).listStatus(jobConf); + } else { + return ((HoodieParquetInputFormat) format).listStatus(jobConf); + } + } catch (IOException e) { + e.printStackTrace(); + return null; + } + } + + private HoodieWriteConfig.Builder getConfigBuilder() { + return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2) + .withWriteStatusClass(MetadataMergeWriteStatus.class) + .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build()) + .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 
1024).parquetMaxFileSize(1024 * 1024).build()) + .forTable("test-trip-table") + .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build()); + } +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java index 024cf1ff50ac..8cbb74e6f5e0 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java @@ -131,6 +131,9 @@ private void setUp(IndexType indexType, boolean populateMetaFields, boolean roll HoodieIndexConfig.Builder indexBuilder = HoodieIndexConfig.newBuilder().withIndexType(indexType) .fromProperties(populateMetaFields ? new Properties() : getPropertiesForKeyGen()) .withIndexType(indexType); + if (indexType == IndexType.BUCKET) { + indexBuilder.withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE); + } config = getConfigBuilder() .withProperties(populateMetaFields ? new Properties() : getPropertiesForKeyGen()) .withRollbackUsingMarkers(rollbackUsingMarkers) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndexConfigs.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndexConfigs.java index 171403eb0384..b84354679947 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndexConfigs.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndexConfigs.java @@ -26,7 +26,8 @@ import org.apache.hudi.index.HoodieIndex.IndexType; import org.apache.hudi.index.bloom.HoodieBloomIndex; import org.apache.hudi.index.bloom.HoodieGlobalBloomIndex; -import org.apache.hudi.index.bucket.HoodieBucketIndex; +import org.apache.hudi.index.bucket.HoodieSimpleBucketIndex; +import org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex; import org.apache.hudi.index.hbase.SparkHoodieHBaseIndex; import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex; import org.apache.hudi.index.simple.HoodieSimpleIndex; @@ -88,8 +89,15 @@ public void testCreateIndex(IndexType indexType) { break; case BUCKET: config = clientConfigBuilder.withPath(basePath) - .withIndexConfig(indexConfigBuilder.withIndexType(IndexType.BUCKET).build()).build(); - assertTrue(SparkHoodieIndexFactory.createIndex(config) instanceof HoodieBucketIndex); + .withIndexConfig(indexConfigBuilder.withIndexType(IndexType.BUCKET) + .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE).build()).build(); + assertTrue(SparkHoodieIndexFactory.createIndex(config) instanceof HoodieSimpleBucketIndex); + + config = clientConfigBuilder.withPath(basePath) + .withIndexConfig(indexConfigBuilder.withIndexType(IndexType.BUCKET) + .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING).build()) + .build(); + assertTrue(SparkHoodieIndexFactory.createIndex(config) instanceof HoodieSparkConsistentBucketIndex); break; default: // no -op. 
just for checkstyle errors diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieBucketIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieSimpleBucketIndex.java similarity index 91% rename from hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieBucketIndex.java rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieSimpleBucketIndex.java index 2b3765948bb6..c8b877cecad1 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieBucketIndex.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieSimpleBucketIndex.java @@ -52,10 +52,10 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class TestHoodieBucketIndex extends HoodieClientTestHarness { +public class TestHoodieSimpleBucketIndex extends HoodieClientTestHarness { - private static final Logger LOG = LogManager.getLogger(TestHoodieBucketIndex.class); - private static final Schema SCHEMA = getSchemaFromResource(TestHoodieBucketIndex.class, "/exampleSchema.avsc", true); + private static final Logger LOG = LogManager.getLogger(TestHoodieSimpleBucketIndex.class); + private static final Schema SCHEMA = getSchemaFromResource(TestHoodieSimpleBucketIndex.class, "/exampleSchema.avsc", true); private static final int NUM_BUCKET = 8; @BeforeEach @@ -78,11 +78,15 @@ public void testBucketIndexValidityCheck() { props.setProperty(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key(), "_row_key"); assertThrows(HoodieIndexException.class, () -> { HoodieIndexConfig.newBuilder().fromProperties(props) - .withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("8").build(); + .withIndexType(HoodieIndex.IndexType.BUCKET) + .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE) + .withBucketNum("8").build(); }); props.setProperty(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key(), "uuid"); HoodieIndexConfig.newBuilder().fromProperties(props) - .withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("8").build(); + .withIndexType(HoodieIndex.IndexType.BUCKET) + .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE) + .withBucketNum("8").build(); } @Test @@ -110,7 +114,7 @@ public void testTagLocation() throws Exception { HoodieWriteConfig config = makeConfig(); HoodieTable table = HoodieSparkTable.create(config, context, metaClient); - HoodieBucketIndex bucketIndex = new HoodieBucketIndex(config); + HoodieSimpleBucketIndex bucketIndex = new HoodieSimpleBucketIndex(config); HoodieData> taggedRecordRDD = bucketIndex.tagLocation(HoodieJavaRDD.of(recordRDD), context, table); assertFalse(taggedRecordRDD.collectAsList().stream().anyMatch(r -> r.isCurrentLocationKnown())); @@ -133,6 +137,7 @@ private HoodieWriteConfig makeConfig() { return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(SCHEMA.toString()) .withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props) .withIndexType(HoodieIndex.IndexType.BUCKET) + .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE) .withIndexKeyField("_row_key") .withBucketNum(String.valueOf(NUM_BUCKET)).build()).build(); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java 
index 9574d35a6541..30f7ad66543d 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java @@ -148,7 +148,10 @@ private Properties makeIndexConfig(HoodieIndex.IndexType indexType) { props.putAll(indexConfig.build().getProps()); if (indexType.equals(HoodieIndex.IndexType.BUCKET)) { props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key"); - indexConfig.fromProperties(props).withIndexKeyField("_row_key").withBucketNum("1"); + indexConfig.fromProperties(props) + .withIndexKeyField("_row_key") + .withBucketNum("1") + .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE); props.putAll(indexConfig.build().getProps()); props.putAll(HoodieLayoutConfig.newBuilder().fromProperties(props) .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name()) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java index 4e8d2b7eceae..4b391ecbab75 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java @@ -77,6 +77,15 @@ public abstract class HoodieData implements Serializable { public abstract HoodieData mapPartitions( SerializableFunction, Iterator> func, boolean preservesPartitioning); + /** + * @param func serializable map function by taking a partition of objects + * and generating an iterator. + * @param output object type. + * @return {@link HoodieData} containing the result. Actual execution may be deferred. + */ + public abstract HoodieData mapPartitions( + SerializableFunction, Iterator> func); + /** * @param func serializable flatmap function. * @param output object type. diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java index c23e712cf41a..28ed2e282deb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java @@ -99,6 +99,11 @@ public HoodieData map(SerializableFunction func) { @Override public HoodieData mapPartitions(SerializableFunction, Iterator> func, boolean preservesPartitioning) { + return mapPartitions(func); + } + + @Override + public HoodieData mapPartitions(SerializableFunction, Iterator> func) { List result = new ArrayList<>(); throwingMapWrapper(func).apply(listData.iterator()).forEachRemaining(result::add); return HoodieList.of(result); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index 79badb48a589..aa0cadf5b935 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -348,6 +348,10 @@ public static String createNewFileIdPfx() { return UUID.randomUUID().toString(); } + public static String createNewFileId(String idPfx, int id) { + return String.format("%s-%d", idPfx, id); + } + /** * Get the file extension from the log file. 
*/ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/ConsistentHashingNode.java b/hudi-common/src/main/java/org/apache/hudi/common/model/ConsistentHashingNode.java new file mode 100644 index 000000000000..262bb963223b --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/ConsistentHashingNode.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import org.apache.hudi.common.util.JsonUtils; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * Used in consistent hashing index, representing nodes in the consistent hash ring. + * Record the end hash range value and its corresponding file group id. + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class ConsistentHashingNode implements Serializable { + + private final int value; + private final String fileIdPrefix; + + @JsonCreator + public ConsistentHashingNode(@JsonProperty("value") int value, @JsonProperty("fileIdPrefix") String fileIdPrefix) { + this.value = value; + this.fileIdPrefix = fileIdPrefix; + } + + public static String toJsonString(List nodes) throws IOException { + return JsonUtils.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(nodes); + } + + public static List fromJsonString(String json) throws Exception { + if (json == null || json.isEmpty()) { + return Collections.emptyList(); + } + + ConsistentHashingNode[] nodes = JsonUtils.getObjectMapper().readValue(json, ConsistentHashingNode[].class); + return Arrays.asList(nodes); + } + + public int getValue() { + return value; + } + + public String getFileIdPrefix() { + return fileIdPrefix; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("ConsistentHashingNode{"); + sb.append("value=").append(value); + sb.append(", fileIdPfx='").append(fileIdPrefix).append('\''); + sb.append('}'); + return sb.toString(); + } +} \ No newline at end of file diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java index 53ceb00409ac..f5077dea859a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java @@ -18,17 +18,15 @@ package org.apache.hudi.common.model; -import com.fasterxml.jackson.annotation.JsonAutoDetect; +import org.apache.hudi.common.fs.FSUtils; +import 
org.apache.hudi.common.util.JsonUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; + import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.PropertyAccessor; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.collection.Pair; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -227,7 +225,7 @@ public String toJsonString() throws IOException { LOG.info("partition path is null for " + partitionToWriteStats.get(null)); partitionToWriteStats.remove(null); } - return getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this); + return JsonUtils.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this); } public static T fromJsonString(String jsonStr, Class clazz) throws Exception { @@ -235,7 +233,7 @@ public static T fromJsonString(String jsonStr, Class clazz) throws Except // For empty commit file (no data or somethings bad happen). return clazz.newInstance(); } - return getObjectMapper().readValue(jsonStr, clazz); + return JsonUtils.getObjectMapper().readValue(jsonStr, clazz); } // Here the functions are named "fetch" instead of "get", to get avoid of the json conversion. @@ -457,13 +455,6 @@ public static T fromBytes(byte[] bytes, Class clazz) throws IOException { } } - protected static ObjectMapper getObjectMapper() { - ObjectMapper mapper = new ObjectMapper(); - mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); - mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); - return mapper; - } - @Override public String toString() { return "HoodieCommitMetadata{" + "partitionToWriteStats=" + partitionToWriteStats diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieConsistentHashingMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieConsistentHashingMetadata.java new file mode 100644 index 000000000000..46f115262745 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieConsistentHashingMetadata.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.model; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.JsonUtils; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * All the metadata that is used for consistent hashing bucket index + */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class HoodieConsistentHashingMetadata implements Serializable { + + private static final Logger LOG = LogManager.getLogger(HoodieConsistentHashingMetadata.class); + /** + * Upper-bound of the hash value + */ + public static final int HASH_VALUE_MASK = Integer.MAX_VALUE; + public static final String HASHING_METADATA_FILE_SUFFIX = ".hashing_meta"; + + private final short version; + private final String partitionPath; + private final String instant; + private final int numBuckets; + private final int seqNo; + private final List nodes; + + @JsonCreator + public HoodieConsistentHashingMetadata(@JsonProperty("version") short version, @JsonProperty("partitionPath") String partitionPath, + @JsonProperty("instant") String instant, @JsonProperty("numBuckets") int numBuckets, + @JsonProperty("seqNo") int seqNo, @JsonProperty("nodes") List nodes) { + this.version = version; + this.partitionPath = partitionPath; + this.instant = instant; + this.numBuckets = numBuckets; + this.seqNo = seqNo; + this.nodes = nodes; + } + + /** + * Construct default metadata with all bucket's file group uuid initialized + */ + public HoodieConsistentHashingMetadata(String partitionPath, int numBuckets) { + this((short) 0, partitionPath, HoodieTimeline.INIT_INSTANT_TS, numBuckets, 0, constructDefaultHashingNodes(numBuckets)); + } + + private static List constructDefaultHashingNodes(int numBuckets) { + long step = ((long) HASH_VALUE_MASK + numBuckets - 1) / numBuckets; + return IntStream.range(1, numBuckets + 1) + .mapToObj(i -> new ConsistentHashingNode((int) Math.min(step * i, HASH_VALUE_MASK), FSUtils.createNewFileIdPfx())).collect(Collectors.toList()); + } + + public short getVersion() { + return version; + } + + public String getPartitionPath() { + return partitionPath; + } + + public String getInstant() { + return instant; + } + + public int getNumBuckets() { + return numBuckets; + } + + public int getSeqNo() { + return seqNo; + } + + public List getNodes() { + return nodes; + } + + public String getFilename() { + return instant + HASHING_METADATA_FILE_SUFFIX; + } + + public byte[] toBytes() throws IOException { + return toJsonString().getBytes(StandardCharsets.UTF_8); + } + + public static HoodieConsistentHashingMetadata fromBytes(byte[] bytes) throws IOException { + try { + return fromJsonString(new String(bytes, StandardCharsets.UTF_8), HoodieConsistentHashingMetadata.class); + } catch (Exception e) { + throw new IOException("unable to read hashing metadata", e); + } + } + + private String toJsonString() throws IOException { + return JsonUtils.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this); + } + + protected static T fromJsonString(String jsonStr, Class clazz) throws Exception { + if (jsonStr == null || jsonStr.isEmpty()) 
{ + // For empty commit file (no data or something bad happened). + return clazz.newInstance(); + } + return JsonUtils.getObjectMapper().readValue(jsonStr, clazz); + } + + /** + * Get the instant time from the hashing metadata filename + * Pattern of the filename: <instant>.HASHING_METADATA_FILE_SUFFIX + */ + public static String getTimestampFromFile(String filename) { + return filename.split("\\.")[0]; + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieReplaceCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieReplaceCommitMetadata.java index 7cc9ee3a0c14..2dd6cda47d3d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieReplaceCommitMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieReplaceCommitMetadata.java @@ -18,11 +18,9 @@ package org.apache.hudi.common.model; -import com.fasterxml.jackson.annotation.JsonAutoDetect; +import org.apache.hudi.common.util.JsonUtils; + import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.PropertyAccessor; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -80,7 +78,7 @@ public String toJsonString() throws IOException { LOG.info("partition path is null for " + partitionToReplaceFileIds.get(null)); partitionToReplaceFileIds.remove(null); } - return getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this); + return JsonUtils.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this); } public static T fromJsonString(String jsonStr, Class clazz) throws Except @@ -88,7 +86,7 @@ public static T fromJsonString(String jsonStr, Class clazz) throws Except // For empty commit file (no data or somethings bad happen).
return clazz.newInstance(); } - return getObjectMapper().readValue(jsonStr, clazz); + return JsonUtils.getObjectMapper().readValue(jsonStr, clazz); } @Override @@ -124,13 +122,6 @@ public static T fromBytes(byte[] bytes, Class clazz) throws IOException { } } - protected static ObjectMapper getObjectMapper() { - ObjectMapper mapper = new ObjectMapper(); - mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); - mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); - return mapper; - } - @Override public String toString() { return "HoodieReplaceMetadata{" + "partitionToWriteStats=" + partitionToWriteStats diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRollingStatMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRollingStatMetadata.java index a354092675e4..0a5240ed55d8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRollingStatMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRollingStatMetadata.java @@ -18,6 +18,8 @@ package org.apache.hudi.common.model; +import org.apache.hudi.common.util.JsonUtils; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -81,7 +83,7 @@ public String toJsonString() throws IOException { LOG.info("partition path is null for " + partitionToRollingStats.get(null)); partitionToRollingStats.remove(null); } - return HoodieCommitMetadata.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this); + return JsonUtils.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this); } public HoodieRollingStatMetadata merge(HoodieRollingStatMetadata rollingStatMetadata) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 6b10a62820e3..546ddf7a3067 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -85,6 +85,7 @@ public class HoodieTableMetaClient implements Serializable { public static final String BOOTSTRAP_INDEX_ROOT_FOLDER_PATH = AUXILIARYFOLDER_NAME + Path.SEPARATOR + ".bootstrap"; public static final String HEARTBEAT_FOLDER_NAME = METAFOLDER_NAME + Path.SEPARATOR + ".heartbeat"; public static final String METADATA_TABLE_FOLDER_PATH = METAFOLDER_NAME + Path.SEPARATOR + "metadata"; + public static final String HASHING_METADATA_FOLDER_NAME = ".bucket_index" + Path.SEPARATOR + "consistent_hashing_metadata"; public static final String BOOTSTRAP_INDEX_BY_PARTITION_FOLDER_PATH = BOOTSTRAP_INDEX_ROOT_FOLDER_PATH + Path.SEPARATOR + ".partitions"; public static final String BOOTSTRAP_INDEX_BY_FILE_ID_FOLDER_PATH = BOOTSTRAP_INDEX_ROOT_FOLDER_PATH + Path.SEPARATOR @@ -211,6 +212,13 @@ public String getSchemaFolderName() { return new Path(metaPath.get(), SCHEMA_FOLDER_NAME).toString(); } + /** + * @return Hashing metadata base path + */ + public String getHashingMetadataPath() { + return new Path(metaPath.get(), HASHING_METADATA_FOLDER_NAME).toString(); + } + /** * @return Temp Folder path */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/JsonUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/JsonUtils.java new file mode 100644 index 000000000000..d820bde178e1 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/JsonUtils.java @@ -0,0 +1,38 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.util; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; + +public class JsonUtils { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + static { + MAPPER.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); + MAPPER.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); + } + + public static ObjectMapper getObjectMapper() { + return MAPPER; + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/hash/HashID.java b/hudi-common/src/main/java/org/apache/hudi/common/util/hash/HashID.java index c56d76097866..ccb29dfbb580 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/hash/HashID.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/hash/HashID.java @@ -106,6 +106,15 @@ public static byte[] hash(final byte[] messageBytes, final Size bits) { } } + public static int getXXHash32(final String message, int hashSeed) { + return getXXHash32(message.getBytes(StandardCharsets.UTF_8), hashSeed); + } + + public static int getXXHash32(final byte[] message, int hashSeed) { + XXHashFactory factory = XXHashFactory.fastestInstance(); + return factory.hash32().hash(message, 0, message.length, hashSeed); + } + private static byte[] getXXHash(final byte[] message, final Size bits) { XXHashFactory factory = XXHashFactory.fastestInstance(); switch (bits) { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieConsistentHashingMetadata.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieConsistentHashingMetadata.java new file mode 100644 index 000000000000..8aa2e65561c5 --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieConsistentHashingMetadata.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestHoodieConsistentHashingMetadata { + + @Test + public void testGetTimestamp() { + Assertions.assertTrue(HoodieConsistentHashingMetadata.getTimestampFromFile("0000.hashing_metadata").equals("0000")); + Assertions.assertTrue(HoodieConsistentHashingMetadata.getTimestampFromFile("1234.hashing_metadata").equals("1234")); + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java index 311c131d432c..dc64856d3c76 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java @@ -66,6 +66,10 @@ protected void initTestDataGenerator() { dataGen = new HoodieTestDataGenerator(); } + protected void initTestDataGenerator(String[] partitionPaths) { + dataGen = new HoodieTestDataGenerator(partitionPaths); + } + /** * Cleanups test data generator. * diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java deleted file mode 100644 index 4491a74fa62b..000000000000 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hudi.index.bucket; - -import org.apache.hudi.common.model.HoodieAvroRecord; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.keygen.KeyGenUtils; -import org.apache.hudi.testutils.KeyGeneratorTestUtilities; - -import org.apache.avro.generic.GenericRecord; -import org.junit.jupiter.api.Test; - -import java.util.Arrays; -import java.util.List; - -public class TestBucketIdentifier { - - @Test - public void testBucketFileId() { - for (int i = 0; i < 1000; i++) { - String bucketId = BucketIdentifier.bucketIdStr(i); - String fileId = BucketIdentifier.newBucketFileIdPrefix(bucketId); - assert BucketIdentifier.bucketIdFromFileId(fileId) == i; - } - } - - @Test - public void testBucketIdWithSimpleRecordKey() { - String recordKeyField = "_row_key"; - String indexKeyField = "_row_key"; - GenericRecord record = KeyGeneratorTestUtilities.getRecord(); - HoodieRecord hoodieRecord = new HoodieAvroRecord( - new HoodieKey(KeyGenUtils.getRecordKey(record, recordKeyField, false), ""), null); - int bucketId = BucketIdentifier.getBucketId(hoodieRecord, indexKeyField, 8); - assert bucketId == BucketIdentifier.getBucketId( - Arrays.asList(record.get(indexKeyField).toString()), 8); - } - - @Test - public void testBucketIdWithComplexRecordKey() { - List recordKeyField = Arrays.asList("_row_key","ts_ms"); - String indexKeyField = "_row_key"; - GenericRecord record = KeyGeneratorTestUtilities.getRecord(); - HoodieRecord hoodieRecord = new HoodieAvroRecord( - new HoodieKey(KeyGenUtils.getRecordKey(record, recordKeyField, false), ""), null); - int bucketId = BucketIdentifier.getBucketId(hoodieRecord, indexKeyField, 8); - assert bucketId == BucketIdentifier.getBucketId( - Arrays.asList(record.get(indexKeyField).toString()), 8); - } -}
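Reviewer note (not part of the patch): the tag-location path added here resolves a record to a bucket by hashing its key onto a ring whose nodes carry a range-end value and a file-id prefix, as seen in ConsistentBucketIndexLocationMapper.getRecordLocation and the default node layout of HoodieConsistentHashingMetadata. The sketch below is a minimal stand-alone illustration of that lookup, not the ConsistentBucketIdentifier from this patch; it uses String.hashCode() masked to a non-negative int in place of the XXHash32 helper in HashID, and plain strings in place of ConsistentHashingNode and HoodieKey.

import java.util.SortedMap;
import java.util.TreeMap;

/** Minimal consistent-hash ring sketch (illustrative only, not Hudi code). */
public class SimpleHashRing {
  // Upper bound of hash values, mirroring HASH_VALUE_MASK = Integer.MAX_VALUE in the patch.
  private static final int HASH_VALUE_MASK = Integer.MAX_VALUE;

  // Maps the end of each hash range to the file-id prefix owning that range.
  private final TreeMap<Integer, String> ring = new TreeMap<>();

  /** Split [0, HASH_VALUE_MASK] into numBuckets ranges, one file-id prefix per range. */
  public SimpleHashRing(int numBuckets) {
    long step = ((long) HASH_VALUE_MASK + numBuckets - 1) / numBuckets;
    for (int i = 1; i <= numBuckets; i++) {
      int rangeEnd = (int) Math.min(step * i, HASH_VALUE_MASK);
      ring.put(rangeEnd, "bucket-" + i); // the patch uses a random UUID prefix per node instead
    }
  }

  /** Locate the owning bucket: first node whose range end is >= hash, wrapping to the first node. */
  public String getBucket(String key) {
    int hash = key.hashCode() & HASH_VALUE_MASK; // placeholder for the XXHash32-based hashing in the patch
    SortedMap<Integer, String> tail = ring.tailMap(hash);
    return tail.isEmpty() ? ring.firstEntry().getValue() : tail.get(tail.firstKey());
  }

  public static void main(String[] args) {
    SimpleHashRing ring = new SimpleHashRing(4);
    System.out.println(ring.getBucket("uuid-0001"));
    System.out.println(ring.getBucket("uuid-0002"));
  }
}

The constructor also mirrors the arithmetic in constructDefaultHashingNodes: for four buckets the step is 536870912, so the range ends are 536870912, 1073741824, 1610612736 and HASH_VALUE_MASK.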
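Reviewer note (not part of the patch): loadMetadata in HoodieSparkConsistentBucketIndex only trusts a hashing-metadata file whose timestamp belongs to a completed commit, or the 00000000000000 bootstrap instant, and then takes the one with the largest timestamp. The snippet below restates that selection over plain filenames, assuming a hypothetical completedCommits set; it is a sketch of the filtering rule, not the Hadoop-FS-based code in the patch.

import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Illustrative sketch of selecting a valid hashing-metadata file (not Hudi code). */
public class HashingMetadataPicker {
  static final String SUFFIX = ".hashing_meta";          // HASHING_METADATA_FILE_SUFFIX in the patch
  static final String INIT_INSTANT = "00000000000000";   // HoodieTimeline.INIT_INSTANT_TS

  /** Timestamp is everything before the first dot, matching getTimestampFromFile in the patch. */
  static String timestampOf(String filename) {
    return filename.split("\\.")[0];
  }

  /**
   * Keep only files written by a completed commit (or the initial bootstrap file),
   * then take the one with the largest timestamp, i.e. the latest committed ring layout.
   */
  static Optional<String> pickLatest(List<String> filenames, Set<String> completedCommits) {
    return filenames.stream()
        .filter(f -> f.endsWith(SUFFIX))
        .filter(f -> completedCommits.contains(timestampOf(f)) || INIT_INSTANT.equals(timestampOf(f)))
        .max(Comparator.naturalOrder());
  }

  public static void main(String[] args) {
    List<String> files = List.of("00000000000000.hashing_meta", "20220516110701.hashing_meta");
    // Only the bootstrap file is picked because "20220516110701" never completed.
    System.out.println(pickLatest(files, Set.of("20220515000000")));
  }
}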
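Reviewer note (not part of the patch): loadOrCreateMetadata follows a load, create-if-absent, re-load pattern so that two concurrent writers initializing the same partition converge on a single metadata file. The sketch below shows the same control flow against plain java.nio files rather than HoodieWrapperFileSystem; the path layout and payload are hypothetical, and the atomic CREATE_NEW stands in for saveMetadata(table, metadata, false) failing when the file already exists.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Sketch of the "load, else create, else re-load" pattern used for per-partition hashing metadata. */
public class LoadOrCreateSketch {

  /** Returns the metadata payload for the partition, creating the default one exactly once. */
  static String loadOrCreate(Path metadataFile, String defaultPayload) throws IOException {
    if (Files.exists(metadataFile)) {
      return Files.readString(metadataFile);            // fast path: metadata already initialized
    }
    try {
      Files.createDirectories(metadataFile.getParent());
      // CREATE_NEW fails if another writer initialized the partition first,
      // analogous to saveMetadata(...) with overwrite=false returning false in the patch.
      Files.write(metadataFile, defaultPayload.getBytes(StandardCharsets.UTF_8),
          StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
      return defaultPayload;
    } catch (IOException concurrentCreation) {
      // Creation failed, so the concurrent writer's metadata should now be readable.
      return Files.readString(metadataFile);
    }
  }
}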
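Reviewer note (not part of the patch): the JSON handling for the new metadata classes is centralized in JsonUtils, configured to ignore unknown properties and to serialize fields directly. The round-trip below uses a hypothetical Node POJO shaped like ConsistentHashingNode (a range-end value plus a file-id prefix) to show the conventions; it is a usage sketch, not code from this PR.

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

/** Round-trip sketch of the JSON conventions used for the hashing metadata (illustrative POJO). */
public class JsonRoundTrip {
  // Same two settings the patch applies to the shared mapper in JsonUtils.
  private static final ObjectMapper MAPPER = new ObjectMapper()
      .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
      .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);

  /** Hypothetical stand-in for ConsistentHashingNode: hash-range end plus file-id prefix. */
  public static class Node {
    private final int value;
    private final String fileIdPrefix;

    @JsonCreator
    public Node(@JsonProperty("value") int value, @JsonProperty("fileIdPrefix") String fileIdPrefix) {
      this.value = value;
      this.fileIdPrefix = fileIdPrefix;
    }

    public int getValue() {
      return value;
    }

    public String getFileIdPrefix() {
      return fileIdPrefix;
    }
  }

  public static void main(String[] args) throws Exception {
    String json = MAPPER.writerWithDefaultPrettyPrinter()
        .writeValueAsString(new Node(1073741823, "a1b2c3d4"));
    Node back = MAPPER.readValue(json, Node.class);
    System.out.println(back.getValue() + " -> " + back.getFileIdPrefix());
  }
}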