
Support DataSkipping for hudi connector #18606

Open
xiarixiaoyao wants to merge 6 commits into master from sp

Conversation

@xiarixiaoyao commented Nov 2, 2022:

What's the change?

  1. Support data skipping for the Hudi connector (a sketch of the idea follows this list).
  2. Support partition pruning via the Hudi metadata table (MDT) to reduce RPC calls to Hive.
  3. Support filter pushdown for Hudi CoW tables.
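
For context: data skipping boils down to intersecting the query predicate with the per-file [min, max] column ranges that Hudi's metadata table stores in its col_stats partition, and scanning a file only when the intersection is non-empty. A minimal illustrative sketch in Java (not the PR's actual code; the class and method names are hypothetical, and a BIGINT column is assumed):

    import com.facebook.presto.common.predicate.Domain;
    import com.facebook.presto.common.predicate.Range;
    import com.facebook.presto.common.predicate.TupleDomain;
    import com.facebook.presto.common.predicate.ValueSet;

    import static com.facebook.presto.common.type.BigintType.BIGINT;

    public final class DataSkippingSketch
    {
        private DataSkippingSketch() {}

        // Returns true when the file may contain matching rows (must be scanned);
        // false means the file is safely skippable.
        public static boolean mayContainMatches(TupleDomain<String> predicate, String column, long min, long max)
        {
            if (predicate.isNone()) {
                return false;
            }
            Domain columnPredicate = predicate.getDomains().get().get(column);
            if (columnPredicate == null) {
                return true; // no predicate on this column; cannot skip
            }
            // Domain covering the [min, max] range recorded for this file in col_stats
            Domain fileDomain = Domain.create(ValueSet.ofRanges(Range.range(BIGINT, min, true, max, true)), false);
            return !columnPredicate.intersect(fileDomain).isNone();
        }
    }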

Design: [design diagram image]

Test result: SSB benchmark; data size: 1.5 TB, 12 billion rows.
Environment: 1 CN + 3 WN; 170 GB container, 136 GB JVM heap, 95 GB max query memory, 40 vcores.
[benchmark result image]

Test plan: unit tests

== NO RELEASE NOTE ==
General Changes
* Support data skipping for the Hudi connector.
* Support partition pruning via the Hudi MDT to reduce RPC calls to Hive.
* Support filter pushdown for Hudi CoW tables.

@linux-foundation-easycla bot commented Nov 2, 2022:

CLA Signed

The committers listed above are authorized under a signed CLA.

  • ✅ login: xiarixiaoyao (d0e6d469b47f3de0bc464fc2cc6228fb6a6ab7fd)

@pratyakshsharma (Contributor):

Will have a look at it over the weekend.

@pratyakshsharma (Contributor) left a comment:

Thank you for raising this detailed PR. My other PR, based on RFC-58, was trying to do data skipping using the metadata table in a more generic way: rather than introducing the changes in specific query engines like Presto and Trino, the idea was to introduce the changes in Hudi itself and simply call them from Presto/Trino.
Anyway, I can make changes accordingly later. I am still going through the changes and have left a few comments for changes/clarification. Please have a look.

{
SchemaEvolutionContext schemaEvolutionContext = hudiSplit.getSchemaEvolutionContext();
InternalSchema internalSchema = SerDeHelper.fromJson(schemaEvolutionContext.getRequiredSchema()).orElse(InternalSchema.getEmptyInternalSchema());
if (internalSchema.isEmptySchema() || !hudiSplit.getBaseFile().isPresent()) {
Contributor:

Just trying to understand why we cannot do schema evolution if only log files are present?

Author:

The MoR table is already handled in the Hudi kernel; see apache/hudi#6989.

Contributor:

Thank you for pointing me to this PR; I will take a look and ask questions, if any.


public static SchemaEvolutionContext createSchemaEvolutionContext(HoodieTableMetaClient metaClient)
{
// no need to do schema evolution for mor table, since hudi kernel will do it.
Contributor:

Can you please explain why there is no need to do schema evolution for MoR tables? Are we taking care of schema evolution with the getSplits call for MoR in the Hudi kernel?

Author:

The MoR table is already handled in the Hudi kernel; see apache/hudi#6989.


public class SchemaEvolutionContext
{
private final String requiredSchema;
Contributor:

Can we add a one-line comment for these two variables here?

Contributor:

Also, if I understand correctly, this variable corresponds to the latest internal schema; perhaps we can update the variable name too?

Author:

Yes, thank you for the good suggestion.

TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
String internalSchema = schemaUtil.getTableInternalSchemaFromCommitMetadata().map(SerDeHelper::toJson).orElse("");
HoodieTimeline hoodieTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
String validCommits = hoodieTimeline.getInstants().map(HoodieInstant::getFileName).collect(Collectors.joining(","));
Contributor:

It would be better to rename the variable to validCommitFiles, and to update this variable in the SchemaEvolutionContext class as well.

Author:

> I guess it will be good to have some test cases covering the scenarios of different types of schema evolutions. That should clear most of my doubts as well.

Agreed. As apache/hudi#6989 has been merged into the Hudi kernel, we should add test cases covering schema evolution.

Map<String, Map<String, String>> partitionMap = HudiPartitionManager
.getPartitions(partitionColumns.stream().map(f -> f.getName()).collect(Collectors.toList()), partitions);
if (partitions.size() == 1 && partitions.get(0).isEmpty()) {
// non-non-partitioned
Contributor:

nit: non-partitioned

}).map(entry -> entry.getKey()).collect(Collectors.toList());
}

private boolean evaluatePartitionPredice(TupleDomain<String> partitionPredicate, List<HudiColumnHandle> partitionColumnHandles, String partitionPathValue, String partitionName)
Contributor:

nit: rename to evaluatePartitionPredicate?

Author:

> My other PR based on RFC-58 was trying to do data skipping using the metadata table in a more generic way [...]

Yes. Once RFC-58 is completed, we will only need to convert the Presto filter into a Hudi filter and then call the interface directly, just like Iceberg. RFC-64 is also abstracting these interfaces, but that may take a long time.

Once RFC-58/RFC-64 are completed, we can remove this logic directly.

Contributor:

Exactly, I am aligned on this.

@pratyakshsharma (Contributor) left a comment:

I think it will be good to have some test cases covering the different types of schema evolution. That should clear most of my doubts as well.

String commitTime = FSUtils.getCommitTime(baseFilePath.getName());
InternalSchema fileSchema = InternalSchemaCache.getInternalSchemaByVersionId(Long.parseUnsignedLong(commitTime), tablePath, hadoopConf, schemaEvolutionContext.getValidCommits());
log.debug(String.format(Locale.ENGLISH, "File schema from hudi base file is %s", fileSchema));
//
Contributor:

nit: you probably wanted to add some comment here?

Author:

Sorry, I forgot to add the comment.

return Pair.of(oldColumnHandle, ImmutableMap.of());
}
// prune internalSchema: columns prune
InternalSchema prunedSchema = InternalSchemaUtils.pruneInternalSchema(internalSchema, oldColumnHandle.stream().map(HudiColumnHandle::getName).collect(Collectors.toList()));
Contributor:

Just thinking out loud; please correct me if I am wrong. The oldColumnHandle list comes from the metastore and holds the actual columns present there. There can be a case where the latest commit resulted in a column deletion and the user did not run hiveSync, so the latest schema was not synced to HMS. Now if you call pruneInternalSchema, prunedSchema can end up with fewer columns than oldColumnHandle.

Author:

Good question.
At present, Hudi cannot guarantee that the metadata in Hive is consistent with the metadata of the current table; users need to ensure that themselves. This is a big problem.

In this case, it would be better to throw an exception directly and tell the user that the metadata of the Hive table is inconsistent with the schema of the Hudi table.

WDYT?

Contributor:

Yeah, this seems to be a good approach; let us do this. I would also like to hear @codope's thoughts on this.

Contributor:

We should throw an error, as the metastore is behind the Hudi table and needs to be synced again.
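
A minimal sketch of that agreed-upon check, as a review suggestion (the error code and message are illustrative; prunedSchema and oldColumnHandle are as in the snippet above):

    // Fail fast when pruning dropped columns: HMS is behind the Hudi table.
    if (prunedSchema.columns().size() != oldColumnHandle.size()) {
        throw new PrestoException(HIVE_INVALID_METADATA,
                "Hive metastore schema is inconsistent with the Hudi table schema; please re-run hive sync");
    }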

ImmutableList.Builder<HudiColumnHandle> builder = ImmutableList.builder();
for (int i = 0; i < oldColumnHandle.size(); i++) {
HiveType hiveType = constructPrestoTypeFromType(mergedSchema.columns().get(i).type());
HudiColumnHandle hudiColumnHandle = oldColumnHandle.get(i);
Contributor:

Please refer to the comment on line 87 above. mergedSchema now has the same number of columns as prunedSchema, and this can be smaller than oldColumnHandle's size, which can break the above logic. WDYT? @xiarixiaoyao

return Pair.of(builder.build(), collectTypeChangedCols(prunedSchema, mergedSchema));
}

private static Map<String, HiveType> collectTypeChangedCols(InternalSchema schema, InternalSchema oldSchema)
Contributor:

nit: Maybe change the name of oldSchema to querySchema?

Author:

Agreed.

if (!columnCoercions.isEmpty() &&
columnCoercions.containsKey(column.getName()) &&
!column.getHiveType().equals(columnCoercions.get(column.getName()))) {
coercersBuilder.add(Optional.of(HiveCoercer.createCoercer(typeManager, columnCoercions.get(column.getName()), column.getHiveType())));
Contributor:

columnCoercions has the new HiveType of the columns after schema evolution. Should we reverse the last two arguments in this call? The method signature is: static HiveCoercer createCoercer(TypeManager typeManager, HiveType fromHiveType, HiveType toHiveType). That is why I think column.getHiveType() should perhaps be the second argument here. Please correct me if I am wrong.

Contributor:

Good catch! What @pratyakshsharma is suggesting seems right.

Author (@xiarixiaoyao, Nov 16, 2022):

The signature is: static HiveCoercer createCoercer(TypeManager typeManager, HiveType fromHiveType, HiveType toHiveType).

column.getHiveType() is the column type from the Hive metastore; in theory, it is the latest schema. columnCoercions.get(column.getName()) returns the old Hive type (not the new type) from before the DDL, so the argument order here is correct.
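
Restating the direction against the signature (the comments summarize the explanation above):

    // static HiveCoercer createCoercer(TypeManager typeManager, HiveType fromHiveType, HiveType toHiveType)
    // fromHiveType <- columnCoercions.get(column.getName()) : the old type written in the file, before the DDL
    // toHiveType   <- column.getHiveType()                  : the latest type from the Hive metastore
    HiveCoercer.createCoercer(typeManager, columnCoercions.get(column.getName()), column.getHiveType());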

Contributor:

I see. Thanks for clarifying that.

@xiarixiaoyao (Author):

@pratyakshsharma Thank you for the valuable comments; I will address all of them.

@codope (Contributor) left a comment:

Super useful feature for the connector. Thanks @xiarixiaoyao for taking it up. It would be great if you could also add a few tests.

stringProperty(
HOODIE_FILESYSTEM_VIEW_SPILLABLE_DIR,
"Path on local storage to use, when file system view is held in a spillable map.",
"/tmp/",
Contributor:

Why can't this session property be part of HudiConfig as well?

Author:

Agreed.

HoodieTimeline activeTimeline = metaClient.reloadActiveTimeline();
Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
// build system view.
fileSystemView = new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles);
Contributor:

This may not necessarily be a HoodieMetadataFileSystemView. Should we use one of the FileSystemViewManager APIs to build the view based on the metadata config?

log.warn(String.format("failed to do data skipping for table: %s, fallback to all files scan", metaClient.getBasePathV2().toString()), e);
candidateFileSlices = allInputFileSlices;
}
int candidateFileSize = candidateFileSlices.entrySet().stream().map(entry -> entry.getValue().size()).reduce(0, (n1, n2) -> n1 + n2);
Contributor:

Suggested change:
- int candidateFileSize = candidateFileSlices.entrySet().stream().map(entry -> entry.getValue().size()).reduce(0, (n1, n2) -> n1 + n2);
+ int candidateFileSize = candidateFileSlices.values().stream().map(List::size).reduce(0, Integer::sum);

int candidateFileSize = candidateFileSlices.entrySet().stream().map(entry -> entry.getValue().size()).reduce(0, (n1, n2) -> n1 + n2);
int totalFiles = allInputFileSlices.entrySet().stream().map(entry -> entry.getValue().size()).reduce(0, (n1, n2) -> n1 + n2);
double skippingPercent = totalFiles == 0 ? 0.0d : (totalFiles - candidateFileSize) / (totalFiles + 0.0d);
log.info(String.format("Total files: %s; candidate files after data skipping: %s; skipping percent %s",
Contributor:

It seems these variables candidateFileSize and totalFiles are only used for logging. We can avoid churning the maps if it isn't strictly necessary.

Author:

Agreed.
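
One way to avoid the map churn when the log line is not needed, as a sketch:

    if (log.isDebugEnabled()) {
        int candidateFileSize = candidateFileSlices.values().stream().mapToInt(List::size).sum();
        int totalFiles = allInputFileSlices.values().stream().mapToInt(List::size).sum();
        double skippingPercent = totalFiles == 0 ? 0.0d : (totalFiles - candidateFileSize) / (double) totalFiles;
        // airlift Logger formats with %s-style placeholders
        log.debug("Total files: %s; candidate files after data skipping: %s; skipping percent %s",
                totalFiles, candidateFileSize, skippingPercent);
    }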

{
}

public static SchemaEvolutionContext createSchemaEvolutionContext(HoodieTableMetaClient metaClient)
Contributor:

So this method gets called just once throughout the lifecycle of a query, right?
Maybe as a follow-up we can cache it by instant and make it visible to all queries, to reduce the I/O load.

Optional<HudiColumnHandle> columnHandleOpt = partitionColumnHandles.stream().filter(f -> f.getName().equals(partitionName)).findFirst();
if (columnHandleOpt.isPresent()) {
Domain domain = getDomain(columnHandleOpt.get(), partitionPathValue);
Domain columnPredicate = partitionPredicate.getDomains().get().get(partitionName);
Contributor:

partitionPredicate.getDomains() can be an empty optional.

Reply:

But that's what L202 to L204 handles, right?
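
For reference, the evaluation in this thread reduces to a domain intersection per partition column; roughly (a sketch of the logic, not a verbatim excerpt):

    Domain domain = getDomain(columnHandleOpt.get(), partitionPathValue); // domain of the concrete partition value
    Domain columnPredicate = partitionPredicate.getDomains().get().get(partitionName);
    if (columnPredicate == null) {
        return true; // no predicate on this partition column
    }
    return !columnPredicate.intersect(domain).isNone(); // false => partition can be pruned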

@xiarixiaoyao (Author):

@pratyakshsharma @codope Thank you very much for the review. Working on adding unit tests.

@xiarixiaoyao (Author) commented Nov 17, 2022:

@pratyakshsharma @codope I have added unit tests and addressed all comments.
Could you please review again? Thanks.

// should remove this function, once we bump hudi to 0.13.0.
// old hudi-presto-bundle has not include lz4 and caffeine jar which is used by schema evolution and data-skipping.
private void shouldRemoved()
{
Author:

This is only used to pass CI, as we directly introduced the lz4 and caffeine jars into the pom file.

@xiarixiaoyao force-pushed the sp branch 5 times, most recently from 21043e6 to a1dab79 on November 17, 2022.
@nsivabalan left a comment:

Good job on the patch; the results look amazing!


return true;
}
for (String regularColumn : regularColumns) {
Domain columnPredicate = regularColumnPredicates.getDomains().get().get(regularColumn);

Comment:

Is it not handled in L204?

}
List<String> regularColumns = regularColumnPredicates.getDomains().get().entrySet().stream().map(Map.Entry::getKey).collect(Collectors.toList());
// get filter columns
List<String> encodedTargetColumnNames = regularColumns.stream().map(col -> new ColumnIndexID(col).asBase64EncodedString()).collect(Collectors.toList());
@nsivabalan (Nov 20, 2022):

Maybe in a follow-up PR we should also wire in the pruned list of partitions here, so that we do the prefix lookup only on the pruned partitions rather than all partitions. For example, if there are 1000 partitions and 5 columns with predicates, and only 10 partitions match after pruning, the existing call will fetch 5 cols * 1000 partitions = 5000 entries from the col_stats partition in the MDT to do file skipping, whereas if we wire in the pruned list of partitions, we only need to do file skipping over 50 entries.
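
A sketch of what wiring in the pruned partitions could look like, assuming the MDT col_stats key layout is a column-ID prefix followed by a partition-ID (PartitionIndexID is assumed to exist alongside ColumnIndexID, and prunedPartitions is a hypothetical list from the partition-pruning step):

    List<String> encodedPrefixes = regularColumns.stream()
            .flatMap(col -> prunedPartitions.stream()
                    .map(partition -> new ColumnIndexID(col).asBase64EncodedString()
                            + new PartitionIndexID(partition).asBase64EncodedString()))
            .collect(Collectors.toList());
    // 5 columns x 10 pruned partitions = 50 lookup keys, instead of 5 x 1000 = 5000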

Follow-up:

I guess we missed this even for the Spark implementation in Hudi; will file a JIRA on this.


Domain columnPredicate = regularColumnPredicates.getDomains().get().get(regularColumn);
Optional<HoodieMetadataColumnStats> currentColumnStats = stats.stream().filter(s -> s.getColumnName().equals(regularColumn)).findFirst();
if (!currentColumnStats.isPresent()) {
// no stats for column

Comment:

This should not happen, right? Can we throw here?

Author:

I don't think so. The index may be expired; in that case we must return true directly instead of throwing an exception.
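
In other words, the conservative fallback looks roughly like:

    if (!currentColumnStats.isPresent()) {
        // Stats for this column may be missing or expired in the MDT index;
        // keep the file (return true) rather than throw, at the cost of a wasted scan.
        return true;
    }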

@xiarixiaoyao (Author):

@pratyakshsharma @nsivabalan @codope I have addressed all comments; could you please review again? Thanks.

presto-hudi/pom.xml (thread resolved)


HoodieParquetRealtimeInputFormat.class.getName(),
MapredParquetOutputFormat.class.getName());

// spark.sql(
Contributor:

Wondering if it's time to introduce the Java write client for testing purposes, instead of simulating commits this way; we already do it that way in Trino. I am OK with this change, and we can take it up as a follow-up. But what do you think?

Author:

Yes, I will try. Thanks.

@7c00 (Member) left a comment:

Could we introduce data skipping and schema evolution in two separate PRs?


long duration = System.currentTimeMillis() - startTime;

log.info(String.format("prepare query files for table %s, spent: %d ms", metaClient.getTableConfig().getTableName(), duration));
Member:

In Presto, we tend to use XXXStats classes to track method performance; for example, com.facebook.presto.hive.metastore.thrift.HiveMetastoreApiStats.
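
A minimal sketch of that pattern, assuming an airlift TimeStat exported via JMX (the class and method names below are hypothetical):

    import com.facebook.airlift.stats.TimeStat;
    import org.weakref.jmx.Managed;
    import org.weakref.jmx.Nested;

    import static java.util.concurrent.TimeUnit.MILLISECONDS;

    public class HudiFileListingStats
    {
        private final TimeStat prepareQueryFiles = new TimeStat(MILLISECONDS);

        @Managed
        @Nested
        public TimeStat getPrepareQueryFiles()
        {
            return prepareQueryFiles;
        }
    }

    // At the call site, instead of logging the duration:
    // try (TimeStat.BlockTimer ignored = stats.getPrepareQueryFiles().time()) { ... }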

Author:

Thank you for the suggestion.

@xiarixiaoyao (Author):

@7c00 Thank you for the review; I have addressed all comments.

@xiarixiaoyao force-pushed the sp branch 2 times, most recently from bd62943 to 16d7cfd on December 7, 2022.
@pratyakshsharma (Contributor) left a comment:

Thank you for patiently addressing all comments throughout. A few minor comments remain. Also, did you raise another PR for schema evolution? If so, please mention this PR there so the two PRs are linked.

return true;
}

if (columnPredicate.intersect(domain).isNone()) {
Contributor:

Can be simplified to: return !columnPredicate.intersect(domain).isNone();

import static org.testng.Assert.assertEquals;

/**
* Integration tests for reading Delta tables.
Contributor:

nit: This class is not intended for Delta tables.

.build();

// setup file metastore

Contributor:

nit: remove extra line

});
}

private HoodieTableQueryType getQueryType(String hudiInputFormat)
Contributor:

Can we reuse this method from the HudiSplitManager class?

}

// Create the test hudi tables for dataSkipping/partition prune in HMS
registerHudiTableInHMS(HoodieTableType.COPY_ON_WRITE, HUDI_SKIPPING_TABLE, testingDataDir, Streams.concat(HUDI_META_COLUMNS.stream(), DATA_COLUMNS.stream()).collect(Collectors.toList()));
Contributor:

I believe the tests only cover the CoW table type. Let us add the MoR table type as well?

Contributor:

@xiarixiaoyao I guess the test case for the MoR type is still not added.

@codope (Contributor) commented Dec 23, 2022:

@xiarixiaoyao @7c00 @pratyakshsharma This PR looks to be in pretty good shape and is near landing now (except for the last minor comments). It has also been well tested, both by @xiarixiaoyao and @nsivabalan on separate datasets. Would really appreciate it if we can land this soon.

@nsivabalan:

Hey folks, can we try to land this in 2022? :) It would be good to close it out before the end of the year.

@xiarixiaoyao (Author):

@pratyakshsharma Sorry for the late reply; I was busy at work last month. The PR for schema evolution will be raised tomorrow. Thanks.

@pratyakshsharma (Contributor):

@xiarixiaoyao Is it good for another pass now?

@xiarixiaoyao (Author):

@pratyakshsharma Yes, it is ready for review; thanks very much.
If any new problems come up, I will fix them as soon as possible.

@pratyakshsharma (Contributor):

@xiarixiaoyao I guess the MoR test case is still missing. Can you please confirm?

@vinothchandar (Collaborator) left a comment:

A few cursory comments. Happy to do a deeper pass once we rebase this again on top of the async splits PR.

import java.util.Map;
import java.util.Optional;

public class HudiPredicates
Collaborator:

Can we unit test these classes, and the other new ones?

boolean hudiMetadataTableEnabled = isHudiMetadataTableEnabled(session);
HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().enable(hudiMetadataTableEnabled).build();
Configuration conf = fs.getConf();
HoodieTableMetaClient metaClient = HoodieTableMetaClient
Collaborator:

Wouldn't we otherwise create a metaClient here anyway? Could we reuse it instead of creating a new one for data skipping alone?
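
For example (a sketch; basePath is a stand-in for however the table's base path is already available at this point):

    // Build the metaClient once per table/query and hand it to the data-skipping
    // path, instead of constructing a second one there.
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setConf(conf)
            .setBasePath(basePath)
            .build();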

@vinothchandar (Collaborator):

@codope One question I had: does the Hudi connector now leverage the metadata/Alluxio caching that's in the Hive connector? I had a deep dive with someone at Uber, and if that works, it could end up being a faster path (a local, in-memory cache, maintained in parallel at the workers).

@vinothchandar (Collaborator):

@xiarixiaoyao any updates on this?

@xiarixiaoyao (Author):

> @xiarixiaoyao any updates on this?

I'm glad you're following this PR; I will update it in the next few days. Thanks.

requireNonNull(partitions, "partitions is null");
requireNonNull(spillableDir, "spillableDir is null");
requireNonNull(engineContext, "engineContext is null");
this.queryType = requireNonNull(queryType, "queryType is null");
Contributor:

nit: Can we remove this variable, since it is not used anywhere?

@vinothchandar (Collaborator):

@xiarixiaoyao Ping again :)

@yihua (Contributor) commented Feb 1, 2024:

Hey @xiarixiaoyao Hope you're doing well. If you're busy, we can help rebase the PR on the latest master and drive it to completion.

@tdcmeehan self-assigned this on Feb 2, 2024.
@xiarixiaoyao (Author) commented Feb 5, 2024:

> Hey @xiarixiaoyao Hope you're doing well. If you're busy, we can help rebase the PR on the latest master and drive it to completion.

@yihua I'm sorry, I am quite busy at the moment. I'm glad you're interested in this PR, and I hope you can carry it forward. Thank you very much.

@steveburnett (Contributor):

Consider revising the release note entry in the Description to follow the release note guidelines:

== RELEASE NOTES ==
Hudi Connector Changes
* Add dataSkipping for Hudi connector.
* Add partition prune by Hudi MDT to reduce RPC calls for Hive.
* Add filter push down for Hudi COW table.
