[HUDI-3760] Adding capability to fetch Metadata Records by prefix #5208

Merged: 73 commits, Apr 6, 2022

Commits
7e33ba2
Added `getRecordsByKeyPrefix` to `HoodieHFileReader`
Apr 1, 2022
cdf5975
Tidying up
Apr 1, 2022
67bec3c
Added `getRecordIteratorByKeyPrefix` to `HoodieHFileReader`;
Apr 1, 2022
57b2c83
Made `HoodieDataBlock` accept keys specified as prefixes;
Apr 1, 2022
0000180
Tidying up
Apr 1, 2022
5d650dd
Modified `AbstractHoodieLogRecordReader` to be able to scan records b…
Apr 1, 2022
87bd0df
Adjusted `HoodieFileReader` interface
Apr 1, 2022
aebceba
Add `HoodieBackedTableMetadata::getRecordsByKeyPrefixes`
Apr 1, 2022
4b07e6c
Added test for `HFileReader` seq fetching records by key-prefixes
Apr 1, 2022
f048d24
Fixing compilation
Apr 2, 2022
c8a3566
Fixing read from Metadata table for col stats partition
nsivabalan Apr 2, 2022
52d2ad0
Fixing prefix lookup in HFile reader and enhancing tests for the same
nsivabalan Apr 2, 2022
184a292
Adding test to TestHoodieBackedMetadata to test prefix look up in col…
nsivabalan Apr 2, 2022
fa2b889
Fixing determining col stats partition availability in dataskipping c…
nsivabalan Apr 3, 2022
d32ff02
Fixing compilation error in test with rebase
nsivabalan Apr 3, 2022
e894a83
Fixing HFileScanner usage and HoodieData for prefix based look up wit…
nsivabalan Apr 4, 2022
2b57016
fixing arg name for useCachedReaders
nsivabalan Apr 4, 2022
260d2b3
Fixing test failures related to write schema not added to Hfile on th…
nsivabalan Apr 4, 2022
166791d
Extracted common utils
Apr 3, 2022
032f0f6
Rebased `ColumnStatsIndexSupport` to fetch CSI records by key-prefix …
Apr 3, 2022
27a900b
Tidying up
Apr 3, 2022
578c1af
Rebased `HoodieFileIndex` onto updated API
Apr 3, 2022
bb15c1f
Fixed `HoodieDataBlock` to properly handle when records are fetched b…
Apr 3, 2022
fc6788c
Fixed `HoodieBackedTableMetadata` to avoid doing full-scans
Apr 3, 2022
039d597
Fixed tests
Apr 3, 2022
8f3a919
Fixing compilation
Apr 4, 2022
558892d
Fixed iterator to properly loop
Apr 4, 2022
3676540
Fixed incorrect key-prefix access gen
Apr 4, 2022
672a23a
Tidying up
Apr 4, 2022
341e338
Removed references to HFile from `HoodieFileReader`;
Apr 4, 2022
edc99f9
Cleaned up `HoodieFileReader` APIs;
Apr 4, 2022
3884f07
Cleaned up `HoodieHFileReader` APIs;
Apr 4, 2022
b55a0d3
Fixing compilation
Apr 4, 2022
4d0a0ba
`lint`
Apr 4, 2022
d24ccf7
Removed APIs which dereference iterators from `HoodieFileReader`, ins…
Apr 4, 2022
e580150
Fixing compilation
Apr 4, 2022
63c7160
Fixed records iterators
Apr 4, 2022
b4c9ef6
Fixed tests
Apr 4, 2022
974d903
Fixing some more
Apr 4, 2022
fe1f54f
Fixing a little more
Apr 4, 2022
53a0369
Fixing task being non-serializable
Apr 4, 2022
eb6773a
Tidying up
Apr 4, 2022
51d708c
Consolidated configuration around `forceFullScan` property for `LogRe…
Apr 4, 2022
54c5ebf
Adding assertions
Apr 4, 2022
8a43c10
Test with all log-files scan modes
Apr 4, 2022
44ccc1b
Killed dead-code
Apr 4, 2022
9189246
Fixing incorrect sorting of the keys
Apr 5, 2022
917a6d4
Fixing tests
Apr 5, 2022
d3d03e3
Tidying up
Apr 5, 2022
4aedf0d
Cleaned up `ColumnStatsIndexSupport` removing duplication
Apr 5, 2022
4fb6dcc
Fixed schemas used to read from MT, to be fetched from HFile
Apr 5, 2022
c84616b
Fixed `TestHoodieFileIndex`
Apr 5, 2022
42144d0
Tidying up
Apr 5, 2022
6abff9e
Disallow full-scans for "column_stats", "bloom_filters" partitions
Apr 5, 2022
bc2de38
Killed `fsDataInputStream`
Apr 5, 2022
a10a6cc
Rebased `schema` field to become final, thread-safe
Apr 5, 2022
8f99436
Tidying up
Apr 5, 2022
7f7a75d
Tidying up
Apr 5, 2022
b035ff3
Added `sharedScanner` instance to be re-used by point-lookup queries
Apr 5, 2022
b911163
Fixed `RecordIterator`, `RecordByKeysIterator`
Apr 5, 2022
87ff41f
Fixed `RecordByKeyPrefixIterator`
Apr 5, 2022
ade64a5
Tidying up
Apr 5, 2022
a640f38
Missing license
Apr 5, 2022
885b577
Fixing tests
Apr 5, 2022
10551b6
Fixed `RecordByKeyPrefixIterator` to properly handle EOF
Apr 5, 2022
4980837
Rebased `HoodieMergeOnReadRDD` to use Metadata Config from the File I…
Apr 5, 2022
1ce0aa9
Fixed opts for MT reads
Apr 5, 2022
0370146
Force MT full-scan when reading it from Spark DS;
Apr 5, 2022
92aab50
Fixed query-type missing from config props in Spark Streaming
Apr 5, 2022
47c2631
Added fallback to default val for `QUERY_TYPE` config in Spark File I…
Apr 5, 2022
40613eb
Fixed `TestLayoutOptimization` missing required MT configs
Apr 6, 2022
f110928
Cleaned up duplicated methods
Apr 6, 2022
0dcfe21
Close reader and remove public API from metadata payload
codope Apr 6, 2022
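
Taken together, these commits add prefix-based key lookups to the HFile reader, the log-record reader and HoodieBackedTableMetadata. As a rough illustration only (modelled on the test added in this PR; the class and method names in the sketch and the way the reader and schema are obtained are assumptions, not part of the change), the new reader API can be exercised like this:

import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.io.storage.HoodieHFileReader;

class KeyPrefixLookupSketch {
  // Assumes an already-open HoodieHFileReader over a metadata-table HFile and the
  // Avro schema its records were written with; both are obtained elsewhere.
  static void printRecordsWithPrefix(HoodieHFileReader<GenericRecord> reader,
                                     Schema schema) throws Exception {
    // Fetch every record whose key starts with "key1" (e.g. key10..key19 in the test fixture).
    List<String> keyPrefixes = Collections.singletonList("key1");
    Iterator<GenericRecord> it = reader.getRecordsByKeyPrefixIterator(keyPrefixes, schema);
    while (it.hasNext()) {
      System.out.println(it.next());
    }
  }
}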
Files changed
@@ -81,7 +81,7 @@ public String showArchivedCommits(
// read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
blk.getRecordItr().forEachRemaining(readRecords::add);
blk.getRecordIterator().forEachRemaining(readRecords::add);
}
List<Comparable[]> readCommits = readRecords.stream().map(r -> (GenericRecord) r)
.filter(r -> r.get("actionType").toString().equals(HoodieTimeline.COMMIT_ACTION)
@@ -155,7 +155,7 @@ public String showCommits(
// read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordItr()) {
try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordIterator()) {
recordItr.forEachRemaining(readRecords::add);
}
}
@@ -124,7 +124,7 @@ private int copyArchivedInstants(List<FileStatus> statuses, Set<String> actionSe
// read the avro blocks
while (reader.hasNext() && copyCount < limit) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordItr()) {
try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordIterator()) {
while (recordItr.hasNext()) {
IndexedRecord ir = recordItr.next();
// Archived instants are saved as arvo encoded HoodieArchivedMetaEntry records. We need to get the
@@ -122,7 +122,7 @@ public String showLogFileCommits(
instantTime = "dummy_instant_time_" + dummyInstantTimeCount;
}
if (n instanceof HoodieDataBlock) {
try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) n).getRecordItr()) {
try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) n).getRecordIterator()) {
recordItr.forEachRemaining(r -> recordCount.incrementAndGet());
}
}
@@ -236,7 +236,7 @@ public String showLogFileRecords(
HoodieLogBlock n = reader.next();
if (n instanceof HoodieDataBlock) {
HoodieDataBlock blk = (HoodieDataBlock) n;
try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordItr()) {
try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordIterator()) {
recordItr.forEachRemaining(record -> {
if (allRecords.size() < limit) {
allRecords.add(record);
@@ -339,7 +339,7 @@ public void mergeArchiveFiles(List<FileStatus> compactCandidate) throws IOExcept
// Read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
blk.getRecordItr().forEachRemaining(records::add);
blk.getRecordIterator().forEachRemaining(records::add);
if (records.size() >= this.config.getCommitArchivalBatchSize()) {
writeToFile(wrapperSchema, records);
}
@@ -107,7 +107,7 @@ public HoodieHFileWriter(String instantTime, Path file, HoodieHFileConfig hfileC
.withFileContext(context)
.create();

writer.appendFileInfo(HoodieHFileReader.KEY_SCHEMA.getBytes(), schema.toString().getBytes());
writer.appendFileInfo(HoodieHFileReader.SCHEMA_KEY.getBytes(), schema.toString().getBytes());
}

@Override
@@ -18,17 +18,6 @@

package org.apache.hudi.io.storage;

import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -39,7 +28,17 @@
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -51,21 +50,25 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.TreeMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static org.apache.hudi.common.testutils.FileSystemTestUtils.RANDOM;
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
import static org.apache.hudi.common.util.CollectionUtils.toStream;
import static org.apache.hudi.io.storage.HoodieHFileConfig.HFILE_COMPARATOR;
import static org.apache.hudi.io.storage.HoodieHFileReader.KEY_SCHEMA;
import static org.apache.hudi.io.storage.HoodieHFileReader.SCHEMA_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -124,7 +127,7 @@ protected void verifySchema(Configuration conf, String schemaPath) throws IOExce
FileSystem fs = getFilePath().getFileSystem(conf);
HFile.Reader hfileReader = HoodieHFileUtils.createHFileReader(fs, getFilePath(), new CacheConfig(conf), conf);
assertEquals(getSchemaFromResource(TestHoodieHFileReaderWriter.class, schemaPath),
new Schema.Parser().parse(new String(hfileReader.getHFileInfo().get(KEY_SCHEMA.getBytes()))));
new Schema.Parser().parse(new String(hfileReader.getHFileInfo().get(SCHEMA_KEY.getBytes()))));
}

private static Stream<Arguments> populateMetaFieldsAndTestAvroWithMeta() {
@@ -142,7 +145,7 @@ public void testWriteReadHFileWithMetaFields(boolean populateMetaFields, boolean
Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchemaWithMetaFields.avsc");
HoodieFileWriter<GenericRecord> writer = createWriter(avroSchema, populateMetaFields);
List<String> keys = new ArrayList<>();
Map<String, GenericRecord> recordMap = new HashMap<>();
Map<String, GenericRecord> recordMap = new TreeMap<>();
for (int i = 0; i < 100; i++) {
GenericRecord record = new GenericData.Record(avroSchema);
String key = String.format("%s%04d", "key", i);
@@ -163,24 +166,30 @@ public void testWriteReadHFileWithMetaFields(boolean populateMetaFields, boolean

Configuration conf = new Configuration();
HoodieHFileReader hoodieHFileReader = (HoodieHFileReader) createReader(conf);
List<Pair<String, IndexedRecord>> records = hoodieHFileReader.readAllRecords();
records.forEach(entry -> assertEquals(entry.getSecond(), recordMap.get(entry.getFirst())));
List<IndexedRecord> records = HoodieHFileReader.readAllRecords(hoodieHFileReader);
assertEquals(new ArrayList<>(recordMap.values()), records);

hoodieHFileReader.close();

for (int i = 0; i < 2; i++) {
int randomRowstoFetch = 5 + RANDOM.nextInt(10);
Set<String> rowsToFetch = getRandomKeys(randomRowstoFetch, keys);

List<String> rowsList = new ArrayList<>(rowsToFetch);
Collections.sort(rowsList);
hoodieHFileReader = (HoodieHFileReader) createReader(conf);
List<Pair<String, GenericRecord>> result = hoodieHFileReader.readRecords(rowsList);
assertEquals(result.size(), randomRowstoFetch);

List<GenericRecord> expectedRecords = rowsList.stream().map(recordMap::get).collect(Collectors.toList());

hoodieHFileReader = (HoodieHFileReader<GenericRecord>) createReader(conf);
List<GenericRecord> result = HoodieHFileReader.readRecords(hoodieHFileReader, rowsList);

assertEquals(expectedRecords, result);

result.forEach(entry -> {
assertEquals(entry.getSecond(), recordMap.get(entry.getFirst()));
if (populateMetaFields && testAvroWithMeta) {
assertNotNull(entry.getSecond().get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
assertNotNull(entry.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
} else {
assertNull(entry.getSecond().get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
assertNull(entry.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
}
});
hoodieHFileReader.close();
@@ -202,7 +211,7 @@ public void testReadHFileFormatRecords() throws Exception {
fs.open(getFilePath()), (int) fs.getFileStatus(getFilePath()).getLen());
// Reading byte array in HFile format, without actual file path
HoodieHFileReader<GenericRecord> hfileReader =
new HoodieHFileReader<>(fs, new Path(DUMMY_BASE_PATH), content);
new HoodieHFileReader<>(fs, new Path(DUMMY_BASE_PATH), content, Option.empty());
Schema avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc");
assertEquals(NUM_RECORDS, hfileReader.getTotalRecords());
verifySimpleRecords(hfileReader.getRecordIterator(avroSchema));
@@ -217,7 +226,7 @@ public void testReaderGetRecordIterator() throws Exception {
IntStream.concat(IntStream.range(40, NUM_RECORDS * 2), IntStream.range(10, 20))
.mapToObj(i -> "key" + String.format("%02d", i)).collect(Collectors.toList());
Schema avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc");
Iterator<GenericRecord> iterator = hfileReader.getRecordIterator(keys, avroSchema);
Iterator<GenericRecord> iterator = hfileReader.getRecordsByKeysIterator(keys, avroSchema);

List<Integer> expectedIds =
IntStream.concat(IntStream.range(40, NUM_RECORDS), IntStream.range(10, 20))
@@ -233,6 +242,59 @@ public void testReaderGetRecordIterator() throws Exception {
}
}

@Test
public void testReaderGetRecordIteratorByKeyPrefixes() throws Exception {
writeFileWithSimpleSchema();
HoodieHFileReader<GenericRecord> hfileReader =
(HoodieHFileReader<GenericRecord>) createReader(new Configuration());

Schema avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc");

List<String> keyPrefixes = Collections.singletonList("key");
Iterator<GenericRecord> iterator =
hfileReader.getRecordsByKeyPrefixIterator(keyPrefixes, avroSchema);

List<GenericRecord> recordsByPrefix = toStream(iterator).collect(Collectors.toList());

List<GenericRecord> allRecords = toStream(hfileReader.getRecordIterator()).collect(Collectors.toList());

assertEquals(allRecords, recordsByPrefix);

// filter for "key1" : entries from key10 to key19 should be matched
List<GenericRecord> expectedKey1s = allRecords.stream().filter(entry -> (entry.get("_row_key").toString()).contains("key1")).collect(Collectors.toList());
iterator =
hfileReader.getRecordsByKeyPrefixIterator(Collections.singletonList("key1"), avroSchema);
recordsByPrefix =
StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
.collect(Collectors.toList());
assertEquals(expectedKey1s, recordsByPrefix);

// exact match
List<GenericRecord> expectedKey25 = allRecords.stream().filter(entry -> (entry.get("_row_key").toString()).contains("key25")).collect(Collectors.toList());
iterator =
hfileReader.getRecordsByKeyPrefixIterator(Collections.singletonList("key25"), avroSchema);
recordsByPrefix =
StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
.collect(Collectors.toList());
assertEquals(expectedKey25, recordsByPrefix);

// no match. key prefix is beyond entries in file.
iterator =
hfileReader.getRecordsByKeyPrefixIterator(Collections.singletonList("key99"), avroSchema);
recordsByPrefix =
StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
.collect(Collectors.toList());
assertEquals(Collections.emptyList(), recordsByPrefix);

// no match. but keyPrefix is in between the entries found in file.
iterator =
hfileReader.getRecordsByKeyPrefixIterator(Collections.singletonList("key1234"), avroSchema);
recordsByPrefix =
StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
.collect(Collectors.toList());
assertEquals(Collections.emptyList(), recordsByPrefix);
}

@ParameterizedTest
@ValueSource(strings = {
"/hudi_0_9_hbase_1_2_3", "/hudi_0_10_hbase_1_2_3", "/hudi_0_11_hbase_2_4_9"})
@@ -253,15 +315,15 @@ public void testHoodieHFileCompatibility(String hfilePrefix) throws IOException
HoodieHFileUtils.createHFileReader(fs, new Path(DUMMY_BASE_PATH), content),
hfilePrefix, true, HFILE_COMPARATOR.getClass(), NUM_RECORDS_FIXTURE);
HoodieHFileReader<GenericRecord> hfileReader =
new HoodieHFileReader<>(fs, new Path(DUMMY_BASE_PATH), content);
new HoodieHFileReader<>(fs, new Path(DUMMY_BASE_PATH), content, Option.empty());
Schema avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc");
assertEquals(NUM_RECORDS_FIXTURE, hfileReader.getTotalRecords());
verifySimpleRecords(hfileReader.getRecordIterator(avroSchema));

content = readHFileFromResources(complexHFile);
verifyHFileReader(HoodieHFileUtils.createHFileReader(fs, new Path(DUMMY_BASE_PATH), content),
hfilePrefix, true, HFILE_COMPARATOR.getClass(), NUM_RECORDS_FIXTURE);
hfileReader = new HoodieHFileReader<>(fs, new Path(DUMMY_BASE_PATH), content);
hfileReader = new HoodieHFileReader<>(fs, new Path(DUMMY_BASE_PATH), content, Option.empty());
avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchemaWithUDT.avsc");
assertEquals(NUM_RECORDS_FIXTURE, hfileReader.getTotalRecords());
verifySimpleRecords(hfileReader.getRecordIterator(avroSchema));
@@ -38,6 +38,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@@ -217,7 +218,7 @@ protected void verifyComplexRecords(Iterator<GenericRecord> iterator) {

private void verifyFilterRowKeys(HoodieFileReader<GenericRecord> hoodieReader) {
Set<String> candidateRowKeys = IntStream.range(40, NUM_RECORDS * 2)
.mapToObj(i -> "key" + String.format("%02d", i)).collect(Collectors.toSet());
.mapToObj(i -> "key" + String.format("%02d", i)).collect(Collectors.toCollection(TreeSet::new));
List<String> expectedKeys = IntStream.range(40, NUM_RECORDS)
.mapToObj(i -> "key" + String.format("%02d", i)).sorted().collect(Collectors.toList());
assertEquals(expectedKeys, hoodieReader.filterRowKeys(candidateRowKeys)
@@ -18,6 +18,8 @@

package org.apache.hudi

import org.apache.hudi.common.config.TypedProperties

object HoodieConversionUtils {

def toJavaOption[T](opt: Option[T]): org.apache.hudi.common.util.Option[T] =
@@ -26,4 +28,10 @@ object HoodieConversionUtils {
def toScalaOption[T](opt: org.apache.hudi.common.util.Option[T]): Option[T] =
if (opt.isPresent) Some(opt.get) else None

def toProperties(params: Map[String, String]): TypedProperties = {
val props = new TypedProperties()
params.foreach(kv => props.setProperty(kv._1, kv._2))
props
}

}