Skip to content

Commit

Permalink
Made HoodieListData eager by default;
Browse files Browse the repository at this point in the history
Tidying up
  • Loading branch information
Alexey Kudinkin committed Jul 14, 2022
1 parent 23334b3 commit ec0bf7e
Show file tree
Hide file tree
Showing 19 changed files with 54 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public List<HoodieRecord<T>> filterExists(List<HoodieRecord<T>> hoodieRecords) {
// Create a Hoodie table which encapsulated the commits and files visible
HoodieFlinkTable<T> table = getHoodieTable();
Timer.Context indexTimer = metrics.getIndexCtx();
List<HoodieRecord<T>> recordsWithLocation = getIndex().tagLocation(HoodieListData.of(hoodieRecords), context, table).collectAsList();
List<HoodieRecord<T>> recordsWithLocation = getIndex().tagLocation(HoodieListData.eager(hoodieRecords), context, table).collectAsList();
metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
return recordsWithLocation.stream().filter(v1 -> !v1.isCurrentLocationKnown()).collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,12 @@ public HoodieAccumulator newAccumulator() {

@Override
public <T> HoodieData<T> emptyHoodieData() {
return HoodieListData.of(Collections.emptyList());
return HoodieListData.eager(Collections.emptyList());
}

@Override
public <T> HoodieData<T> parallelize(List<T> data, int parallelism) {
return HoodieListData.of(data);
return HoodieListData.eager(data);
}

public RuntimeContext getRuntimeContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@ public <R> HoodieData<HoodieRecord<R>> tagLocation(
HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
HoodieTable hoodieTable) throws HoodieIndexException {
List<HoodieRecord<T>> hoodieRecords = tagLocation(records.map(record -> (HoodieRecord<T>) record).collectAsList(), context, hoodieTable);
return HoodieListData.of(hoodieRecords.stream().map(r -> (HoodieRecord<R>) r).collect(Collectors.toList()));
return HoodieListData.eager(hoodieRecords.stream().map(r -> (HoodieRecord<R>) r).collect(Collectors.toList()));
}

@Override
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public HoodieData<WriteStatus> updateLocation(
HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context,
HoodieTable hoodieTable) throws HoodieIndexException {
return HoodieListData.of(updateLocation(writeStatuses.collectAsList(), context, hoodieTable));
return HoodieListData.eager(updateLocation(writeStatuses.collectAsList(), context, hoodieTable));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public HoodieWriteMetadata<List<WriteStatus>> execute(String instantTime,
dedupedKeys.stream().map(key -> new HoodieAvroRecord<>(key, new EmptyHoodieRecordPayload())).collect(Collectors.toList());
Instant beginTag = Instant.now();
// perform index look up to get existing location of records
List<HoodieRecord<EmptyHoodieRecordPayload>> taggedRecords = table.getIndex().tagLocation(HoodieListData.of(dedupedRecords), context, table).collectAsList();
List<HoodieRecord<EmptyHoodieRecordPayload>> taggedRecords = table.getIndex().tagLocation(HoodieListData.eager(dedupedRecords), context, table).collectAsList();
Duration tagLocationDuration = Duration.between(beginTag, Instant.now());

// filter out non existent keys/records
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public HoodieWriteMetadata<List<WriteStatus>> write(String instantTime, List<Hoo

@Override
protected List<HoodieRecord<T>> tag(List<HoodieRecord<T>> dedupedRecords, HoodieEngineContext context, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table) {
return table.getIndex().tagLocation(HoodieListData.of(dedupedRecords), context, table).collectAsList();
return table.getIndex().tagLocation(HoodieListData.eager(dedupedRecords), context, table).collectAsList();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public void testRangePruning(boolean rangePruning, boolean treeFiltering, boolea
partitionRecordKeyMap.put(t.getLeft(), recordKeyList);
});

List<Pair<String, HoodieKey>> comparisonKeyList = index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, HoodieListPairData.of(partitionRecordKeyMap)).collectAsList();
List<Pair<String, HoodieKey>> comparisonKeyList = index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, HoodieListPairData.lazy(partitionRecordKeyMap)).collectAsList();

assertEquals(10, comparisonKeyList.size());
java.util.Map<String, List<String>> recordKeyToFileComps = comparisonKeyList.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ protected void initMetaClient(HoodieTableType tableType) throws IOException {

protected List<HoodieRecord> tagLocation(
HoodieIndex index, List<HoodieRecord> records, HoodieTable table) {
return ((HoodieData<HoodieRecord>) index.tagLocation(HoodieListData.of(records), context, table)).collectAsList();
return ((HoodieData<HoodieRecord>) index.tagLocation(HoodieListData.eager(records), context, table)).collectAsList();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public List<HoodieRecord<T>> filterExists(List<HoodieRecord<T>> hoodieRecords) {
// Create a Hoodie table which encapsulated the commits and files visible
HoodieJavaTable<T> table = HoodieJavaTable.create(config, (HoodieJavaEngineContext) context);
Timer.Context indexTimer = metrics.getIndexCtx();
List<HoodieRecord<T>> recordsWithLocation = getIndex().tagLocation(HoodieListData.of(hoodieRecords), context, table).collectAsList();
List<HoodieRecord<T>> recordsWithLocation = getIndex().tagLocation(HoodieListData.eager(hoodieRecords), context, table).collectAsList();
metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
return recordsWithLocation.stream().filter(v1 -> !v1.isCurrentLocationKnown()).collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(
Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false),
instantTime)));
HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
writeMetadata.setWriteStatuses(HoodieListData.of(writeStatusList));
writeMetadata.setWriteStatuses(HoodieListData.eager(writeStatusList));
return writeMetadata;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ public HoodieAccumulator newAccumulator() {

@Override
public <T> HoodieData<T> emptyHoodieData() {
return HoodieListData.of(Collections.emptyList());
return HoodieListData.eager(Collections.emptyList());
}

@Override
public <T> HoodieData<T> parallelize(List<T> data, int parallelism) {
return HoodieListData.of(data);
return HoodieListData.eager(data);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ public <R> HoodieData<HoodieRecord<R>> tagLocation(
HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
HoodieTable hoodieTable) throws HoodieIndexException {
List<HoodieRecord<T>> hoodieRecords = tagLocation(records.map(record -> (HoodieRecord<T>) record).collectAsList(), context, hoodieTable);
return HoodieListData.of(hoodieRecords.stream().map(r -> (HoodieRecord<R>) r).collect(Collectors.toList()));
return HoodieListData.eager(hoodieRecords.stream().map(r -> (HoodieRecord<R>) r).collect(Collectors.toList()));
}

@Override
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public HoodieData<WriteStatus> updateLocation(
HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context,
HoodieTable hoodieTable) throws HoodieIndexException {
return HoodieListData.of(updateLocation(writeStatuses.collectAsList(), context, hoodieTable));
return HoodieListData.eager(updateLocation(writeStatuses.collectAsList(), context, hoodieTable));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inpu
protected List<WriteStatus> updateIndex(List<WriteStatus> writeStatuses, HoodieWriteMetadata<List<WriteStatus>> result) {
Instant indexStartTime = Instant.now();
// Update the index back
List<WriteStatus> statuses = table.getIndex().updateLocation(HoodieListData.of(writeStatuses), context, table).collectAsList();
List<WriteStatus> statuses = table.getIndex().updateLocation(HoodieListData.eager(writeStatuses), context, table).collectAsList();
result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
result.setWriteStatuses(statuses);
return statuses;
Expand Down Expand Up @@ -338,7 +338,7 @@ public Partitioner getInsertPartitioner(WorkloadProfile profile) {
public void updateIndexAndCommitIfNeeded(List<WriteStatus> writeStatuses, HoodieWriteMetadata result) {
Instant indexStartTime = Instant.now();
// Update the index back
List<WriteStatus> statuses = table.getIndex().updateLocation(HoodieListData.of(writeStatuses), context, table).collectAsList();
List<WriteStatus> statuses = table.getIndex().updateLocation(HoodieListData.eager(writeStatuses), context, table).collectAsList();
result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
result.setWriteStatuses(statuses);
result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(result));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public HoodieWriteMetadata<List<WriteStatus>> execute(String instantTime,
dedupedKeys.stream().map(key -> new HoodieAvroRecord<>(key, new EmptyHoodieRecordPayload())).collect(Collectors.toList());
Instant beginTag = Instant.now();
// perform index look up to get existing location of records
List<HoodieRecord<EmptyHoodieRecordPayload>> taggedRecords = table.getIndex().tagLocation(HoodieListData.of(dedupedRecords), context, table).collectAsList();
List<HoodieRecord<EmptyHoodieRecordPayload>> taggedRecords = table.getIndex().tagLocation(HoodieListData.eager(dedupedRecords), context, table).collectAsList();
Duration tagLocationDuration = Duration.between(beginTag, Instant.now());

// filter out non existent keys/records
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public static JavaWriteHelper newInstance() {

@Override
protected List<HoodieRecord<T>> tag(List<HoodieRecord<T>> dedupedRecords, HoodieEngineContext context, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table) {
return table.getIndex().tagLocation(HoodieListData.of(dedupedRecords), context, table).collectAsList();
return table.getIndex().tagLocation(HoodieListData.eager(dedupedRecords), context, table).collectAsList();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,26 +69,25 @@ private HoodieListData(List<T> data, boolean lazy) {
}

/**
* Creates instance of {@link HoodieListData} bearing *lazy* execution semantic
* Creates instance of {@link HoodieListData} bearing *eager* execution semantic
*
* @param listData a {@link List} of objects in type T
* @param <T> type of object
* @return a new instance containing the {@link List<T>} reference
*/
// TODO rename to lazy
public static <T> HoodieListData<T> of(List<T> listData) {
return new HoodieListData<>(listData, true);
public static <T> HoodieListData<T> eager(List<T> listData) {
return new HoodieListData<>(listData, false);
}

/**
* Creates instance of {@link HoodieListData} bearing *eager* execution semantic
* Creates instance of {@link HoodieListData} bearing *lazy* execution semantic
*
* @param listData a {@link List} of objects in type T
* @param <T> type of object
* @return a new instance containing the {@link List<T>} reference
*/
public static <T> HoodieListData<T> eager(List<T> listData) {
return new HoodieListData<>(listData, false);
public static <T> HoodieListData<T> lazy(List<T> listData) {
return new HoodieListData<>(listData, true);
}

@Override
Expand All @@ -101,21 +100,6 @@ public void unpersist() {
// No OP
}

@Override
public boolean isEmpty() {
return super.isEmpty();
}

@Override
public long count() {
return super.count();
}

@Override
public List<T> collectAsList() {
return super.collectAsList();
}

@Override
public <O> HoodieData<O> map(SerializableFunction<T, O> func) {
return new HoodieListData<>(asStream().map(throwingMapWrapper(func)), lazy);
Expand Down Expand Up @@ -180,4 +164,19 @@ public HoodieData<T> repartition(int parallelism) {
// no op
return this;
}

@Override
public boolean isEmpty() {
return super.isEmpty();
}

@Override
public long count() {
return super.count();
}

@Override
public List<T> collectAsList() {
return super.collectAsList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,6 @@ public void unpersist() {
// no-op
}

@Override
public long count() {
return super.count();
}

@Override
public List<Pair<K, V>> collectAsList() {
return super.collectAsList();
}

@Override
public HoodieData<K> keys() {
return new HoodieListData<>(asStream().map(Pair::getKey), lazy);
Expand Down Expand Up @@ -178,6 +168,16 @@ public <W> HoodiePairData<K, Pair<V, Option<W>>> leftOuterJoin(HoodiePairData<K,
return new HoodieListPairData<>(leftOuterJoined, lazy);
}

@Override
public long count() {
return super.count();
}

@Override
public List<Pair<K, V>> collectAsList() {
return super.collectAsList();
}

public static <K, V> HoodieListPairData<K, V> lazy(List<Pair<K, V>> data) {
return new HoodieListPairData<>(data, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@ public HoodieAccumulator newAccumulator() {

@Override
public <T> HoodieData<T> emptyHoodieData() {
return HoodieListData.of(Collections.emptyList());
return HoodieListData.eager(Collections.emptyList());
}

@Override
public <T> HoodieData<T> parallelize(List<T> data, int parallelism) {
return HoodieListData.of(data);
return HoodieListData.eager(data);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private static Stream<Arguments> distinctWithKey() {
@ParameterizedTest
@MethodSource
void distinctWithKey(List<Pair<String, Integer>> expected, List<Pair<String, Integer>> originalList) {
List<Pair<String, Integer>> distinctList = HoodieListData.of(originalList).distinctWithKey(Pair::getLeft, 1).collectAsList();
List<Pair<String, Integer>> distinctList = HoodieListData.eager(originalList).distinctWithKey(Pair::getLeft, 1).collectAsList();
assertEquals(expected, distinctList);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ private void doCommit(String instant, Collection<CompactionCommitEvent> events)
.collect(Collectors.toList());

HoodieCommitMetadata metadata = CompactHelpers.getInstance().createCompactionMetadata(
table, instant, HoodieListData.of(statuses), writeClient.getConfig().getSchema());
table, instant, HoodieListData.eager(statuses), writeClient.getConfig().getSchema());

// commit the compaction
this.writeClient.commitCompaction(instant, metadata, Option.empty());
Expand Down

0 comments on commit ec0bf7e

Please sign in to comment.