[HUDI-1792] Fix flink-client query error when processing files larger than 128mb #2814
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Tips
What is the purpose of the pull request
fix flink-client query error when processing files larger than 128mb
Use the flink client to query the cow table and report an error. The error message is as follows:
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: org.apache.hadoop.fs.HdfsBlockLocation cannot be cast to java.lang.Comparable at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:260) at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:866) at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:257) at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:322) at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:276) at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:249) at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:133) at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:111) at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:345) at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:330) at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:95) at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:39) at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:162) at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:86) at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:478) ... 4 more Caused by: java.lang.ClassCastException: org.apache.hadoop.fs.HdfsBlockLocation cannot be cast to java.lang.Comparable at java.util.ComparableTimSort.countRunAndMakeAscending(ComparableTimSort.java:320) at java.util.ComparableTimSort.sort(ComparableTimSort.java:188) at java.util.Arrays.sort(Arrays.java:1246) at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.createInputSplits(CopyOnWriteInputFormat.java:212) at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.createInputSplits(CopyOnWriteInputFormat.java:64) at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:247) ... 18 more
The reason is that the implementation object of FileSystem is hadoop, and the BlockLocation implementation class of the getFileBlockLocations method to obtain the block does not implement the compare method. Here, the Arrays.sort method is used for sorting. When the number of blocks is greater than 1, the comparison will report an error. I think the CopyOnWriteInputFormat class imitates FileInputFormat for block fragment acquisition and sorting. The FileInputFormat class implements HadoopBlockLocation to obtain fragmentation information for sorting, and HadoopBlockLocation implements the compareTo method. Here modify the implementation of the incoming inner class to do sorting.
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.