Commit

Cleaned up duplicated methods
Alexey Kudinkin committed Apr 6, 2022
1 parent 40613eb commit 2f5ca86
Showing 2 changed files with 107 additions and 148 deletions.
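This commit collapses two pairs of near-duplicate read paths: readLogRecords / readLogRecordsWithKeyPrefix and readFromBaseAndMergeWithLogRecords / readFromBaseAndMergeWithLogRecordsForKeyPrefixes each become a single method driven by a fullKeys flag. For orientation, here is a minimal sketch of that consolidation pattern; it is illustrative only, and the class and method names below are invented rather than taken from the repository.

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class LookupExample {
  // Before: readByKeys(keys) and readByKeyPrefixes(prefixes) duplicated the lookup logic.
  // After: a single entry point; callers pass fullKeys = false for prefix lookups.
  Map<String, String> read(List<String> keys, boolean fullKeys, Map<String, String> store) {
    return store.entrySet().stream()
        .filter(e -> fullKeys
            ? keys.contains(e.getKey())
            : keys.stream().anyMatch(prefix -> e.getKey().startsWith(prefix)))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }
}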
@@ -18,6 +18,9 @@

package org.apache.hudi.metadata;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
@@ -33,28 +36,23 @@
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -170,11 +168,13 @@ public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(L
return Collections.emptyIterator();
}

boolean fullKeys = false;

Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords =
readLogRecordsWithKeyPrefix(logRecordScanner, keyPrefixes, timings);
readLogRecords(logRecordScanner, keyPrefixes, fullKeys, timings);

List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> mergedRecords =
readFromBaseAndMergeWithLogRecordsForKeyPrefixes(baseFileReader, keyPrefixes, logRecords, timings, partitionName);
readFromBaseAndMergeWithLogRecords(baseFileReader, keyPrefixes, fullKeys, logRecords, timings, partitionName);

LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms",
keyPrefixes.size(), timings));
@@ -235,58 +235,37 @@ private Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> readLogRecords(
boolean fullKey,
List<Long> timings) {
HoodieTimer timer = new HoodieTimer().startTimer();
Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = new HashMap<>();
// Retrieve records from log file
timer.startTimer();
if (logRecordScanner != null) {
String partitionName = logRecordScanner.getPartitionName().get();
if (isFullScanAllowedForPartition(partitionName)) {
checkArgument(fullKey, "If full-scan is required, only full keys could be used!");
// Path which does full scan of log files
for (String key : keys) {
logRecords.put(key, logRecordScanner.getRecordByKey(key).get(0).getValue());
}
} else {
// This path will do seeks pertaining to the keys passed in
List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> logRecordsList =
fullKey ? logRecordScanner.getRecordsByKeys(keys) : logRecordScanner.getRecordsByKeyPrefixes(keys);

for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : logRecordsList) {
logRecords.put(entry.getKey(), entry.getValue());
}
}
} else {
for (String key : keys) {
logRecords.put(key, Option.empty());
}
}
timings.add(timer.endTimer());
return logRecords;
}

private Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> readLogRecordsWithKeyPrefix(HoodieMetadataMergedLogRecordReader logRecordScanner,
List<String> keys,
List<Long> timings) {
HoodieTimer timer = new HoodieTimer().startTimer();
timer.startTimer();
if (logRecordScanner == null) {
timings.add(timer.endTimer());
return Collections.emptyMap();
}

// Retrieve records from log file
Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = new HashMap<>();
String partitionName = logRecordScanner.getPartitionName().get();

// This path will do seeks pertaining to the keys passed in
List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> logRecordsList = logRecordScanner.getRecordsByKeyPrefixes(keys);
// With a prefix lookup, the returned entry count can exceed the input size. Also, the input keys may not match the keys after the lookup:
// after the lookup, keys are fully formed as they are seen in storage, whereas each input is only a key prefix.
for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : logRecordsList) {
logRecords.put(entry.getKey(), entry.getValue());
Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = new HashMap<>();
if (isFullScanAllowedForPartition(partitionName)) {
checkArgument(fullKey, "If full-scan is required, only full keys could be used!");
// Path which does full scan of log files
for (String key : keys) {
logRecords.put(key, logRecordScanner.getRecordByKey(key).get(0).getValue());
}
} else {
// This path will do seeks pertaining to the keys passed in
List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> logRecordsList =
fullKey ? logRecordScanner.getRecordsByKeys(keys)
: logRecordScanner.getRecordsByKeyPrefixes(keys)
.stream()
.map(record -> Pair.of(record.getRecordKey(), Option.of(record)))
.collect(Collectors.toList());

for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : logRecordsList) {
logRecords.put(entry.getKey(), entry.getValue());
}
}

timings.add(timer.endTimer());

return logRecords;
}
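A recurring point in the diff above is the contract difference the unified readLogRecords has to respect: a full-key lookup yields at most one entry per input key, whereas a prefix lookup returns fully formed storage keys and can return more entries than there were inputs. A small self-contained sketch of that difference, using hypothetical key names rather than Hudi APIs:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class PrefixLookupContract {
  public static void main(String[] args) {
    Map<String, Integer> store = new TreeMap<>(Map.of(
        "col_stats.file1.parquet", 1,
        "col_stats.file2.parquet", 2,
        "bloom_filter.file1.parquet", 3));

    List<String> prefixes = Arrays.asList("col_stats.");
    // One input prefix, two fully formed keys returned; the prefix itself never appears.
    List<String> matched = store.keySet().stream()
        .filter(key -> prefixes.stream().anyMatch(key::startsWith))
        .collect(Collectors.toList());
    System.out.println(matched); // [col_stats.file1.parquet, col_stats.file2.parquet]
  }
}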

@@ -296,103 +275,83 @@ private List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> readFrom
Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords,
List<Long> timings,
String partitionName) throws IOException {
List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>();
HoodieTimer timer = new HoodieTimer().startTimer();
timer.startTimer();

HoodieRecord<HoodieMetadataPayload> hoodieRecord;

// Retrieve record from base file
if (baseFileReader != null) {
HoodieTimer readTimer = new HoodieTimer();
Map<String, GenericRecord> baseFileRecords =
fullKeys ? getRecordsByKeys(baseFileReader, keys) : getRecordsByKeyPrefixes(baseFileReader, keys);
for (String key : keys) {
readTimer.startTimer();
if (baseFileRecords.containsKey(key)) {
hoodieRecord = getRecord(Option.of(baseFileRecords.get(key)), partitionName);
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer()));
// merge base file record w/ log record if present
if (logRecords.containsKey(key) && logRecords.get(key).isPresent()) {
HoodieRecordPayload mergedPayload = logRecords.get(key).get().getData().preCombine(hoodieRecord.getData());
result.add(Pair.of(key, Option.of(new HoodieAvroRecord(hoodieRecord.getKey(), mergedPayload))));
} else {
// only base record
result.add(Pair.of(key, Option.of(hoodieRecord)));
}
} else {
// only log record
result.add(Pair.of(key, logRecords.get(key)));
}
}
timings.add(timer.endTimer());
} else {
// no base file at all
if (baseFileReader == null) {
// No base file at all
timings.add(timer.endTimer());
for (Map.Entry<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : logRecords.entrySet()) {
result.add(Pair.of(entry.getKey(), entry.getValue()));
if (fullKeys) {
// In case full-keys (not key-prefixes) were provided, it's expected that the list of
// records will contain an (optional) entry for each corresponding key
return keys.stream()
.map(key -> Pair.of(key, logRecords.getOrDefault(key, Option.empty())))
.collect(Collectors.toList());
} else {
return logRecords.entrySet().stream()
.map(entry -> Pair.of(entry.getKey(), entry.getValue()))
.collect(Collectors.toList());
}
}
return result;
}

private Map<String, GenericRecord> getRecordsByKeyPrefixes(HoodieFileReader<GenericRecord> baseFileReader, List<String> keyPrefixes) throws IOException {
return toStream(baseFileReader.getRecordsByKeyPrefixIterator(keyPrefixes))
.map(record -> Pair.of((String) record.get(HoodieMetadataPayload.KEY_FIELD_NAME), record))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}
List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>();

private Map<String, GenericRecord> getRecordsByKeys(HoodieFileReader<GenericRecord> baseFileReader, List<String> keys) throws IOException {
return toStream(baseFileReader.getRecordsByKeysIterator(keys))
.map(record -> Pair.of((String) record.get(HoodieMetadataPayload.KEY_FIELD_NAME), record))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}
HoodieTimer readTimer = new HoodieTimer();
readTimer.startTimer();

private List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> readFromBaseAndMergeWithLogRecordsForKeyPrefixes(
HoodieFileReader baseFileReader,
List<String> keys,
Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords,
List<Long> timings,
String partitionName
) throws IOException {
List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = new ArrayList<>();
HoodieTimer timer = new HoodieTimer().startTimer();
timer.startTimer();
// Retrieve record from base file
if (baseFileReader != null) {
Map<String, GenericRecord> baseFileRecords = getRecordsByKeyPrefixes(baseFileReader, keys);
// Keys in the above map are not the same as the passed-in keys: the input keys are just prefixes.
// So we have to iterate over the keys returned by the base file reader lookup and ignore the input keys
baseFileRecords.forEach((key, v) -> {
HoodieRecord<HoodieMetadataPayload> hoodieRecordLocal = getRecord(Option.of(baseFileRecords.get(key)), partitionName);
if (logRecords.containsKey(key)) { // key is present in both base file and log file
HoodieRecordPayload mergedPayload = logRecords.get(key).get().getData().preCombine(hoodieRecordLocal.getData());
result.add(Pair.of(key, Option.of(new HoodieAvroRecord(hoodieRecordLocal.getKey(), mergedPayload))));
// we can remove the entry from log records map
logRecords.remove(key);
} else { // present only in base file
result.add(Pair.of(key, Option.of(hoodieRecordLocal)));
}
});
Map<String, HoodieRecord<HoodieMetadataPayload>> records =
fetchBaseFileRecordsByKeys(baseFileReader, keys, fullKeys, partitionName);

metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer()));

// Iterate over all provided log-records, merging them into existing records
for (Option<HoodieRecord<HoodieMetadataPayload>> logRecordOpt : logRecords.values()) {
if (logRecordOpt.isPresent()) {
HoodieRecord<HoodieMetadataPayload> logRecord = logRecordOpt.get();
records.merge(
logRecord.getRecordKey(),
logRecord,
(oldRecord, newRecord) ->
new HoodieAvroRecord<>(oldRecord.getKey(), newRecord.getData().preCombine(oldRecord.getData()))
);
}
}
// Iterate over the remaining entries in the log records map and add them to the result; these are not present in the base file.
// Entries that had a corresponding record in the base file have already been removed from the log records map, so all
// remaining entries can be added to the result.
logRecords.forEach((key, v) -> {
result.add(Pair.of(key, v));
});

timings.add(timer.endTimer());
return result;

if (fullKeys) {
// In case full-keys (not key-prefixes) were provided, it's expected that the list of
// records will contain an (optional) entry for each corresponding key
return keys.stream()
.map(key -> Pair.of(key, Option.ofNullable(records.get(key))))
.collect(Collectors.toList());
} else {
return records.values().stream()
.map(record -> Pair.of(record.getRecordKey(), Option.of(record)))
.collect(Collectors.toList());
}
}
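The merge loop above leans on Map.merge: keys absent from the base-file map are inserted as-is, and keys present in both maps are resolved by the remapping function, which in the real code calls preCombine on the payloads. A reduced sketch of those semantics, with plain strings standing in for HoodieRecord payloads and "incoming value wins" standing in for preCombine:

import java.util.HashMap;
import java.util.Map;

class MergeExample {
  public static void main(String[] args) {
    Map<String, String> records = new HashMap<>();
    records.put("k1", "base-v1");
    records.put("k2", "base-v2");

    Map<String, String> logRecords = Map.of("k2", "log-v2", "k3", "log-v3");

    // Same shape as records.merge(key, logRecord, (oldRecord, newRecord) -> combined) above.
    logRecords.forEach((key, logValue) ->
        records.merge(key, logValue, (baseValue, incoming) -> incoming));

    System.out.println(records); // e.g. {k1=base-v1, k2=log-v2, k3=log-v3}
  }
}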

private Map<String, HoodieRecord<HoodieMetadataPayload>> fetchBaseFileRecordsByKeys(HoodieFileReader<GenericRecord> baseFileReader,
List<String> keys,
boolean fullKeys,
String partitionName) throws IOException {
ClosableIterator<GenericRecord> records = fullKeys ? baseFileReader.getRecordsByKeysIterator(keys)
: baseFileReader.getRecordsByKeyPrefixIterator(keys);

return toStream(records)
.map(record -> Pair.of(
(String) record.get(HoodieMetadataPayload.KEY_FIELD_NAME),
composeRecord(record, partitionName)))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}

private HoodieRecord<HoodieMetadataPayload> getRecord(Option<GenericRecord> baseRecord, String partitionName) {
ValidationUtils.checkState(baseRecord.isPresent());
private HoodieRecord<HoodieMetadataPayload> composeRecord(GenericRecord avroRecord, String partitionName) {
if (metadataTableConfig.populateMetaFields()) {
return SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
return SpillableMapUtils.convertToHoodieRecordPayload(avroRecord,
metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(), false);
}
return SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
return SpillableMapUtils.convertToHoodieRecordPayload(avroRecord,
metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(),
Pair.of(metadataTableConfig.getRecordKeyFieldProp(), metadataTableConfig.getPartitionFieldProp()),
false, Option.of(partitionName));
Second changed file:

@@ -36,9 +36,9 @@
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

@@ -99,33 +99,33 @@ public synchronized List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>
return Collections.singletonList(Pair.of(key, Option.ofNullable((HoodieRecord) records.get(key))));
}

public synchronized List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeyPrefixes(List<String> keyPrefixes) {
@SuppressWarnings("unchecked")
public List<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes) {
// The following operations have to be atomic, otherwise concurrent
// readers would race with each other and could crash when
// processing log block records as part of a scan.
records.clear();
scanInternal(Option.of(new KeySpec(keyPrefixes, false)));
return records.values().stream()
.map(record ->
Pair.of(record.getKey().getRecordKey(), Option.ofNullable((HoodieRecord<HoodieMetadataPayload>) record)))
.collect(Collectors.toList());
synchronized (this) {
records.clear();
scanInternal(Option.of(new KeySpec(keyPrefixes, false)));
return records.values().stream()
.filter(Objects::nonNull)
.map(record -> (HoodieRecord<HoodieMetadataPayload>) record)
.collect(Collectors.toList());
}
}

@SuppressWarnings("unchecked")
public synchronized List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys) {
// The following operations have to be atomic, otherwise concurrent
// readers would race with each other and could crash when
// processing log block records as part of a scan.
records.clear();
scan(keys);
List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> metadataRecords = new ArrayList<>();
keys.forEach(entry -> {
if (records.containsKey(entry)) {
metadataRecords.add(Pair.of(entry, Option.ofNullable((HoodieRecord) records.get(entry))));
} else {
metadataRecords.add(Pair.of(entry, Option.empty()));
}
});
return metadataRecords;
synchronized (this) {
records.clear();
scan(keys);
return keys.stream()
.map(key -> Pair.of(key, Option.ofNullable((HoodieRecord<HoodieMetadataPayload>) records.get(key))))
.collect(Collectors.toList());
}
}
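Both lookup methods above now wrap the clear(), the scan, and the copy-out of results in one synchronized block, so a concurrent caller can never observe the shared records buffer while it is only partially repopulated. A stripped-down sketch of that pattern follows; the names are hypothetical and a plain map stands in for the log record buffer.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class AtomicScanExample {
  private final Map<String, String> buffer = new HashMap<>();

  public List<String> lookup(List<String> keys) {
    // clear + scan + snapshot must run as one atomic step; otherwise another caller's
    // clear() could race with this caller's read of the buffer.
    synchronized (this) {
      buffer.clear();
      keys.forEach(key -> buffer.put(key, "value-of-" + key)); // stands in for scan(keys)
      return new ArrayList<>(buffer.values());                 // snapshot before releasing the lock
    }
  }
}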

@Override
