diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
index baede154c99e4..a45b8a9aaa3a5 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
@@ -50,6 +50,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -316,15 +317,20 @@ public void testReaderGetRecordIteratorByKeyPrefixes() throws Exception {
     assertEquals(expectedKey50and0s, recordsByPrefix);
 
     // filter for "key1" and "key0" : entries from 'key10 to key19' and 'key00 to key09' should be matched.
-    List<GenericRecord> expectedKey1sand0s = expectedKey1s;
-    expectedKey1sand0s.addAll(allRecords.stream()
-        .filter(entry -> (entry.get("_row_key").toString()).contains("key0"))
-        .collect(Collectors.toList()));
+    List<GenericRecord> expectedKey1sand0s = allRecords.stream()
+        .filter(entry -> (entry.get("_row_key").toString()).contains("key1") || (entry.get("_row_key").toString()).contains("key0"))
+        .collect(Collectors.toList());
     iterator =
         hfileReader.getRecordsByKeyPrefixIterator(Arrays.asList("key1", "key0"), avroSchema);
     recordsByPrefix =
         StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
             .collect(Collectors.toList());
+    Collections.sort(recordsByPrefix, new Comparator<GenericRecord>() {
+      @Override
+      public int compare(GenericRecord o1, GenericRecord o2) {
+        return o1.get("_row_key").toString().compareTo(o2.get("_row_key").toString());
+      }
+    });
     assertEquals(expectedKey1sand0s, recordsByPrefix);
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
index 0bf31d2a2593e..3e5b3ff6acba0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
@@ -259,11 +259,9 @@ private static Iterator<GenericRecord> getRecordByKeyPrefixIteratorInternal(HFil
         return Collections.emptyIterator();
       }
     } else if (val == -1) {
-      // If scanner is aleady on the top of hfile. avoid trigger seekTo again.
-      Option<Cell> headerCell = Option.fromJavaOptional(scanner.getReader().getFirstKey());
-      if (headerCell.isPresent() && !headerCell.get().equals(scanner.getCell())) {
-        scanner.seekTo();
-      }
+      // Whenever val == -1 HFile reader will place the pointer right before the first record. We have to advance it to the first record
+      // of the file to validate whether it matches our search criteria
+      scanner.seekTo();
     }
 
     class KeyPrefixIterator implements Iterator<GenericRecord> {
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index e8937b39dc7f1..e96889f044a9a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -144,6 +144,10 @@ protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String key,
 
   @Override
   public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes, String partitionName) {
+    // Sort the columns so that keys are looked up in order
+    List<String> sortedkeyPrefixes = new ArrayList<>(keyPrefixes);
+    Collections.sort(sortedkeyPrefixes);
+
     // NOTE: Since we partition records to a particular file-group by full key, we will have
     //       to scan all file-groups for all key-prefixes as each of these might contain some
     //       records matching the key-prefix
@@ -171,17 +175,17 @@ public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(L
          boolean fullKeys = false;
 
          Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords =
-             readLogRecords(logRecordScanner, keyPrefixes, fullKeys, timings);
+             readLogRecords(logRecordScanner, sortedkeyPrefixes, fullKeys, timings);
 
          List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> mergedRecords =
-             readFromBaseAndMergeWithLogRecords(baseFileReader, keyPrefixes, fullKeys, logRecords, timings, partitionName);
+             readFromBaseAndMergeWithLogRecords(baseFileReader, sortedkeyPrefixes, fullKeys, logRecords, timings, partitionName);
 
          LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms",
-             keyPrefixes.size(), timings));
+             sortedkeyPrefixes.size(), timings));
 
          return mergedRecords.iterator();
        } catch (IOException ioe) {
-         throw new HoodieIOException("Error merging records from metadata table for " + keyPrefixes.size() + " key : ", ioe);
+         throw new HoodieIOException("Error merging records from metadata table for " + sortedkeyPrefixes.size() + " key : ", ioe);
        } finally {
          closeReader(readers);
        }
@@ -194,7 +198,10 @@ public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(L
 
   @Override
   public List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys, String partitionName) {
-    Map<Pair<String, FileSlice>, List<String>> partitionFileSliceToKeysMap = getPartitionFileSliceToKeysMapping(partitionName, keys);
+    // Sort the columns so that keys are looked up in order
+    List<String> sortedKeys = new ArrayList<>(keys);
+    Collections.sort(sortedKeys);
+    Map<Pair<String, FileSlice>, List<String>> partitionFileSliceToKeysMap = getPartitionFileSliceToKeysMapping(partitionName, sortedKeys);
     List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>();
     AtomicInteger fileSlicesKeysCount = new AtomicInteger();
     partitionFileSliceToKeysMap.forEach((partitionFileSlicePair, fileSliceKeys) -> {
@@ -219,7 +226,7 @@ public List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecord
             fileSliceKeys.size(), timings));
         fileSlicesKeysCount.addAndGet(fileSliceKeys.size());
       } catch (IOException ioe) {
-        throw new HoodieIOException("Error merging records from metadata table for " + keys.size() + " key : ", ioe);
+        throw new HoodieIOException("Error merging records from metadata table for " + sortedKeys.size() + " key : ", ioe);
       } finally {
         if (!reuse) {
           close(Pair.of(partitionFileSlicePair.getLeft(), partitionFileSlicePair.getRight().getFileId()));
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
index 75d3ce0b71287..b982b1851c326 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
@@ -250,7 +250,7 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
     {
       // We have to include "c1", since we sort the expected outputs by this column
-      val requestedColumns = Seq("c1", "c4")
+      val requestedColumns = Seq("c4", "c1")
 
       val partialColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, requestedColumns)
       val partialTransposedColStatsDF = transposeColumnStatsIndex(spark, partialColStatsDF, requestedColumns, sourceTableSchema)