diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index dbfe6cfc0648e..7daa6b9416f49 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -18,6 +18,9 @@
 
 package org.apache.hudi.metadata;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
@@ -33,17 +36,16 @@
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SpillableMapUtils;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
@@ -51,10 +53,6 @@
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -170,11 +168,13 @@ public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(L
             return Collections.emptyIterator();
           }
 
+          boolean fullKeys = false;
+
           Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords =
-              readLogRecordsWithKeyPrefix(logRecordScanner, keyPrefixes, timings);
+              readLogRecords(logRecordScanner, keyPrefixes, fullKeys, timings);
 
           List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> mergedRecords =
-              readFromBaseAndMergeWithLogRecordsForKeyPrefixes(baseFileReader, keyPrefixes, logRecords, timings, partitionName);
+              readFromBaseAndMergeWithLogRecords(baseFileReader, keyPrefixes, fullKeys, logRecords, timings, partitionName);
 
           LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms",
               keyPrefixes.size(), timings));
@@ -235,58 +235,37 @@ private Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> readLogRecords(
                                                                                   boolean fullKey,
                                                                                   List<Long> timings) {
     HoodieTimer timer = new HoodieTimer().startTimer();
-    Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = new HashMap<>();
-    // Retrieve records from log file
     timer.startTimer();
-    if (logRecordScanner != null) {
-      String partitionName = logRecordScanner.getPartitionName().get();
-      if (isFullScanAllowedForPartition(partitionName)) {
-        checkArgument(fullKey, "If full-scan is required, only full keys could be used!");
-        // Path which does full scan of log files
-        for (String key : keys) {
-          logRecords.put(key, logRecordScanner.getRecordByKey(key).get(0).getValue());
-        }
-      } else {
-        // This path will do seeks pertaining to the keys passed in
-        List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> logRecordsList =
-            fullKey ? logRecordScanner.getRecordsByKeys(keys) : logRecordScanner.getRecordsByKeyPrefixes(keys);
-
-        for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : logRecordsList) {
-          logRecords.put(entry.getKey(), entry.getValue());
-        }
-      }
-    } else {
-      for (String key : keys) {
-        logRecords.put(key, Option.empty());
-      }
-    }
-    timings.add(timer.endTimer());
-    return logRecords;
-  }
-
-  private Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> readLogRecordsWithKeyPrefix(HoodieMetadataMergedLogRecordReader logRecordScanner,
-                                                                                               List<String> keys,
-                                                                                               List<Long> timings) {
-    HoodieTimer timer = new HoodieTimer().startTimer();
-    timer.startTimer();
     if (logRecordScanner == null) {
       timings.add(timer.endTimer());
       return Collections.emptyMap();
     }
-    // Retrieve records from log file
-    Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = new HashMap<>();
+
+    String partitionName = logRecordScanner.getPartitionName().get();
 
-    // This path will do seeks pertaining to the keys passed in
-    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> logRecordsList = logRecordScanner.getRecordsByKeyPrefixes(keys);
-    // with prefix look up, return entry count could be more than input size. Also, input keys may not match the keys after look up.
-    // after look up, keys are fully formed as it seen in the stoage. where as input is a key prefix.
-    for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : logRecordsList) {
-      logRecords.put(entry.getKey(), entry.getValue());
+    Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = new HashMap<>();
+    if (isFullScanAllowedForPartition(partitionName)) {
+      checkArgument(fullKey, "If full-scan is required, only full keys could be used!");
+      // Path which does full scan of log files
+      for (String key : keys) {
+        logRecords.put(key, logRecordScanner.getRecordByKey(key).get(0).getValue());
+      }
+    } else {
+      // This path will do seeks pertaining to the keys passed in
+      List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> logRecordsList =
+          fullKey ? logRecordScanner.getRecordsByKeys(keys)
              : logRecordScanner.getRecordsByKeyPrefixes(keys)
                  .stream()
                  .map(record -> Pair.of(record.getRecordKey(), Option.of(record)))
                  .collect(Collectors.toList());
+
+      for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : logRecordsList) {
+        logRecords.put(entry.getKey(), entry.getValue());
+      }
     }
     timings.add(timer.endTimer());
-
     return logRecords;
   }
@@ -296,103 +275,83 @@ private List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> readFrom
                                                                                    Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords,
                                                                                    List<Long> timings,
                                                                                    String partitionName) throws IOException {
-    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>();
     HoodieTimer timer = new HoodieTimer().startTimer();
     timer.startTimer();
-    HoodieRecord<HoodieMetadataPayload> hoodieRecord;
-
-    // Retrieve record from base file
-    if (baseFileReader != null) {
-      HoodieTimer readTimer = new HoodieTimer();
-      Map<String, GenericRecord> baseFileRecords =
-          fullKeys ? getRecordsByKeys(baseFileReader, keys) : getRecordsByKeyPrefixes(baseFileReader, keys);
-      for (String key : keys) {
-        readTimer.startTimer();
-        if (baseFileRecords.containsKey(key)) {
-          hoodieRecord = getRecord(Option.of(baseFileRecords.get(key)), partitionName);
-          metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer()));
-          // merge base file record w/ log record if present
-          if (logRecords.containsKey(key) && logRecords.get(key).isPresent()) {
-            HoodieRecordPayload mergedPayload = logRecords.get(key).get().getData().preCombine(hoodieRecord.getData());
-            result.add(Pair.of(key, Option.of(new HoodieAvroRecord(hoodieRecord.getKey(), mergedPayload))));
-          } else {
-            // only base record
-            result.add(Pair.of(key, Option.of(hoodieRecord)));
-          }
-        } else {
-          // only log record
-          result.add(Pair.of(key, logRecords.get(key)));
-        }
-      }
-      timings.add(timer.endTimer());
-    } else {
-      // no base file at all
+    if (baseFileReader == null) {
+      // No base file at all
       timings.add(timer.endTimer());
-      for (Map.Entry<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : logRecords.entrySet()) {
-        result.add(Pair.of(entry.getKey(), entry.getValue()));
+      if (fullKeys) {
+        // In case full-keys (not key-prefixes) were provided, it's expected that the list of
+        // records will contain an (optional) entry for each corresponding key
+        return keys.stream()
+            .map(key -> Pair.of(key, logRecords.getOrDefault(key, Option.empty())))
+            .collect(Collectors.toList());
+      } else {
+        return logRecords.entrySet().stream()
+            .map(entry -> Pair.of(entry.getKey(), entry.getValue()))
+            .collect(Collectors.toList());
       }
     }
-    return result;
-  }
 
-  private Map<String, GenericRecord> getRecordsByKeyPrefixes(HoodieFileReader baseFileReader, List<String> keyPrefixes) throws IOException {
-    return toStream(baseFileReader.getRecordsByKeyPrefixIterator(keyPrefixes))
-        .map(record -> Pair.of((String) record.get(HoodieMetadataPayload.KEY_FIELD_NAME), record))
-        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-  }
+    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>();
 
-  private Map<String, GenericRecord> getRecordsByKeys(HoodieFileReader baseFileReader, List<String> keys) throws IOException {
-    return toStream(baseFileReader.getRecordsByKeysIterator(keys))
-        .map(record -> Pair.of((String) record.get(HoodieMetadataPayload.KEY_FIELD_NAME), record))
-        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-  }
+    HoodieTimer readTimer = new HoodieTimer();
+    readTimer.startTimer();
 
-  private List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> readFromBaseAndMergeWithLogRecordsForKeyPrefixes(
-      HoodieFileReader baseFileReader,
-      List<String> keys,
-      Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords,
-      List<Long> timings,
-      String partitionName
-  ) throws IOException {
-    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>();
-    HoodieTimer timer = new HoodieTimer().startTimer();
-    timer.startTimer();
-    // Retrieve record from base file
-    if (baseFileReader != null) {
-      Map<String, GenericRecord> baseFileRecords = getRecordsByKeyPrefixes(baseFileReader, keys);
-      // keys in above map are not same as passed in keys. input keys are just prefixes.
-      // So we have to iterate over keys from the base file reader look up and ignore input keys
-      baseFileRecords.forEach((key, v) -> {
-        HoodieRecord<HoodieMetadataPayload> hoodieRecordLocal = getRecord(Option.of(baseFileRecords.get(key)), partitionName);
-        if (logRecords.containsKey(key)) { // key is present in both base file and log file
-          HoodieRecordPayload mergedPayload = logRecords.get(key).get().getData().preCombine(hoodieRecordLocal.getData());
-          result.add(Pair.of(key, Option.of(new HoodieAvroRecord(hoodieRecordLocal.getKey(), mergedPayload))));
-          // we can remove the entry from log records map
-          logRecords.remove(key);
-        } else { // present only in base file
-          result.add(Pair.of(key, Option.of(hoodieRecordLocal)));
-        }
-      });
+    Map<String, HoodieRecord<HoodieMetadataPayload>> records =
+        fetchBaseFileRecordsByKeys(baseFileReader, keys, fullKeys, partitionName);
+
+    metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer()));
+
+    // Iterate over all provided log-records, merging them into existing records
+    for (Option<HoodieRecord<HoodieMetadataPayload>> logRecordOpt : logRecords.values()) {
+      if (logRecordOpt.isPresent()) {
+        HoodieRecord<HoodieMetadataPayload> logRecord = logRecordOpt.get();
+        records.merge(
+            logRecord.getRecordKey(),
+            logRecord,
+            (oldRecord, newRecord) ->
+                new HoodieAvroRecord<>(oldRecord.getKey(), newRecord.getData().preCombine(oldRecord.getData()))
+        );
+      }
     }
-    // iterate over pending entries in log records map and add them to result. these are not present in base file.
-    // we have already removed the entries in log records map which had corresponding entry in base file. So, we can
-    // add all remaining entries to result.
-    logRecords.forEach((key, v) -> {
-      result.add(Pair.of(key, v));
-    });
     timings.add(timer.endTimer());
-    return result;
+
+    if (fullKeys) {
+      // In case full-keys (not key-prefixes) were provided, it's expected that the list of
+      // records will contain an (optional) entry for each corresponding key
+      return keys.stream()
+          .map(key -> Pair.of(key, Option.ofNullable(records.get(key))))
+          .collect(Collectors.toList());
+    } else {
+      return records.values().stream()
+          .map(record -> Pair.of(record.getRecordKey(), Option.of(record)))
+          .collect(Collectors.toList());
+    }
+  }
+
+  private Map<String, HoodieRecord<HoodieMetadataPayload>> fetchBaseFileRecordsByKeys(HoodieFileReader baseFileReader,
+                                                                                      List<String> keys,
+                                                                                      boolean fullKeys,
+                                                                                      String partitionName) throws IOException {
+    ClosableIterator<GenericRecord> records = fullKeys ? baseFileReader.getRecordsByKeysIterator(keys)
+        : baseFileReader.getRecordsByKeyPrefixIterator(keys);
+
+    return toStream(records)
+        .map(record -> Pair.of(
+            (String) record.get(HoodieMetadataPayload.KEY_FIELD_NAME),
+            composeRecord(record, partitionName)))
+        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
   }
 
-  private HoodieRecord<HoodieMetadataPayload> getRecord(Option<GenericRecord> baseRecord, String partitionName) {
-    ValidationUtils.checkState(baseRecord.isPresent());
+  private HoodieRecord<HoodieMetadataPayload> composeRecord(GenericRecord avroRecord, String partitionName) {
     if (metadataTableConfig.populateMetaFields()) {
-      return SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
+      return SpillableMapUtils.convertToHoodieRecordPayload(avroRecord,
          metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(), false);
     }
-    return SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
+    return SpillableMapUtils.convertToHoodieRecordPayload(avroRecord,
        metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(),
        Pair.of(metadataTableConfig.getRecordKeyFieldProp(), metadataTableConfig.getPartitionFieldProp()), false,
        Option.of(partitionName));
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
index 0f471183f8371..d8c631a22a7ef 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
@@ -36,9 +36,9 @@
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -99,33 +99,33 @@ public synchronized List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>
     return Collections.singletonList(Pair.of(key, Option.ofNullable((HoodieRecord) records.get(key))));
   }
 
-  public synchronized List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeyPrefixes(List<String> keyPrefixes) {
+  @SuppressWarnings("unchecked")
+  public List<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes) {
     // Following operations have to be atomic, otherwise concurrent
     // readers would race with each other and could crash when
     // processing log block records as part of scan.
-    records.clear();
-    scanInternal(Option.of(new KeySpec(keyPrefixes, false)));
-    return records.values().stream()
-        .map(record ->
-            Pair.of(record.getKey().getRecordKey(), Option.ofNullable((HoodieRecord) record)))
-        .collect(Collectors.toList());
+    synchronized (this) {
+      records.clear();
+      scanInternal(Option.of(new KeySpec(keyPrefixes, false)));
+      return records.values().stream()
+          .filter(Objects::nonNull)
+          .map(record -> (HoodieRecord<HoodieMetadataPayload>) record)
+          .collect(Collectors.toList());
+    }
   }
 
+  @SuppressWarnings("unchecked")
   public synchronized List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys) {
     // Following operations have to be atomic, otherwise concurrent
     // readers would race with each other and could crash when
     // processing log block records as part of scan.
-    records.clear();
-    scan(keys);
-    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> metadataRecords = new ArrayList<>();
-    keys.forEach(entry -> {
-      if (records.containsKey(entry)) {
-        metadataRecords.add(Pair.of(entry, Option.ofNullable((HoodieRecord) records.get(entry))));
-      } else {
-        metadataRecords.add(Pair.of(entry, Option.empty()));
-      }
-    });
-    return metadataRecords;
+    synchronized (this) {
+      records.clear();
+      scan(keys);
+      return keys.stream()
+          .map(key -> Pair.of(key, Option.ofNullable((HoodieRecord<HoodieMetadataPayload>) records.get(key))))
+          .collect(Collectors.toList());
+    }
  }
 
  @Override
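Reviewer note (not part of the patch): the heart of the new read path is the records.merge(...) call in readFromBaseAndMergeWithLogRecords -- base-file records are first loaded into a map keyed by record key, then every log record is folded in, with the payload's preCombine resolving key collisions. This is also why, for key prefixes, the result keys come from the merged map rather than from the input list: a prefix can match zero or many stored keys. Below is a minimal, self-contained Java sketch of that merge pattern; the SimplePayload class and its preCombine method are invented stand-ins for illustration, not Hudi types.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MergeByKeySketch {

  // Stand-in for a Hudi record payload: a record key plus a version used by preCombine.
  static class SimplePayload {
    final String key;
    final long version;

    SimplePayload(String key, long version) {
      this.key = key;
      this.version = version;
    }

    // Mirrors the role of HoodieRecordPayload#preCombine: the higher version wins.
    SimplePayload preCombine(SimplePayload other) {
      return this.version >= other.version ? this : other;
    }

    @Override
    public String toString() {
      return key + "@v" + version;
    }
  }

  public static void main(String[] args) {
    // Records read from the base file, keyed by record key.
    Map<String, SimplePayload> records = new HashMap<>();
    records.put("k1", new SimplePayload("k1", 1));
    records.put("k2", new SimplePayload("k2", 1));

    // Records read from the log files of the same file slice.
    List<SimplePayload> logRecords = Arrays.asList(
        new SimplePayload("k1", 2),
        new SimplePayload("k3", 1));

    // Same shape as the patched merge loop: fold each log record into the base-file map,
    // combining on key collisions (the log record preCombines against the base record)
    // and inserting keys that exist only in the logs as-is.
    for (SimplePayload logRecord : logRecords) {
      records.merge(logRecord.key, logRecord, (oldRec, newRec) -> newRec.preCombine(oldRec));
    }

    // k1 -> v2 (merged), k2 -> v1 (base only), k3 -> v1 (log only)
    System.out.println(records);
  }
}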