[SUPPORT] Small file creation while writing to a Hudi Table #5047

Closed
srinikandi opened this issue Mar 15, 2022 · 8 comments
Labels: on-call-triaged, priority:major (degraded perf; unable to move forward; potential bugs), writer-core (issues relating to core transactions/write actions)

Comments


srinikandi commented Mar 15, 2022

Hi,
I have been using the Apache Hudi Connector for Glue (Hudi 0.9.0) and am running into a small-file problem when inserting data into a Hudi table. The input dataset is about 17 GB, made up of 313 parquet part files averaging around 70 MB each. When I insert the data into the Hudi table in overwrite mode, the write produces 7,000-plus parquet part files of about 6.5 MB each. I did set the small file limit and max file size parameters for the write.
Here is the config I used; the intention was to produce files of roughly 60-80 MB.

For testing purposes, I limited the read to the first 25 part files of the input dataset, which still resulted in 1,000-plus 6.5 MB files.

common_config = {
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "className": "org.apache.hudi",
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.write.recordkey.field": "C1,C2,C3,C4",
    "hoodie.datasource.write.precombine.field": "",
    "hoodie.table.name": "TEST_TABLE",
    "hoodie.datasource.hive_sync.database": "TEST_DATABASE_RAW",
    "hoodie.datasource.hive_sync.table": "TEST_TABLE",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.write.partitionpath.field": "",
    "hoodie.datasource.hive_sync.support_timestamp": "true",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
    "hoodie.parquet.small.file.limit": "62914560",
    "hoodie.parquet.max.file.limit": "83886080",
    "hoodie.parquet.block.size": "62914560",
    "hoodie.insert.shuffle.parallelism": "5",
    "hoodie.datasource.write.operation": "insert"
}
full_ref_files is a list of all the parquet part files from the input folder.

full_load_df = spark.read.parquet(*full_ref_files)
full_load_df.write.format("hudi").options(**common_config).mode("overwrite").save(raw_table_path)
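
For context, one quick way to confirm from the Spark side how many data files a write produced (a minimal sketch only; it assumes the same SparkSession and raw_table_path as above and uses PySpark's input_file_name() to list the files backing the latest snapshot):

# Sketch: count the distinct data files behind the written Hudi table.
from pyspark.sql.functions import input_file_name

written_df = spark.read.format("hudi").load(raw_table_path)
file_count = written_df.select(input_file_name().alias("file")).distinct().count()
print(f"data files in latest snapshot: {file_count}")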

Below are the Spark logs (trimmed to the last reported progress line for each stage); the last line shows that the write produced some 1,000-plus partitions while writing to the Hudi table.
Any insights are deeply appreciated.

[Stage 1 (javaToPython at NativeMethodAccessorImpl.java:0): (100 + 16) / 116]
[Stage 3 (collect at /tmp/stage-to-raw-etl-glue-job-poc-1.py:105): (83 + 33) / 116]
[Stage 6 (countByKey at BaseSparkCommitActionExecutor.java:175): (117 + 0) / 117]
[Stage 9 (mapToPair at BaseSparkCommitActionExecutor.java:209): (110 + 7) / 117]
[Stage 12 (isEmpty at HoodieSparkSqlWriter.scala:609): (1 + 3) / 4]
[Stage 14 (isEmpty at HoodieSparkSqlWriter.scala:609): (3 + 17) / 20]
[Stage 16 (isEmpty at HoodieSparkSqlWriter.scala:609): (93 + 7) / 100]
[Stage 18 (isEmpty at HoodieSparkSqlWriter.scala:609): (496 + 4) / 500]
[Stage 20 (isEmpty at HoodieSparkSqlWriter.scala:609): (473 + 0) / 473]
[Stage 22 (collect at SparkRDDWriteClient.java:123): (773 + 115) / 1098]
(Screenshot attached: Screen Shot 2022-03-15 at 8 08 54 AM)

@srinikandi srinikandi changed the title [SUPPORT] [SUPPORT] Small file creation while writing to a Hudi Table Mar 15, 2022
@xushiyan xushiyan added priority:major degraded perf; unable to move forward; potential bugs writer-core Issues relating to core transactions/write actions labels Mar 15, 2022
@srinikandi (Author)

After some research, I found that setting the parameter below to a value lower than its default of 1024 fixes the small-file problem on the first load, to an extent. Is this the recommended practice, or should the small file limit and max file size parameters work for the first-time load as well?

"hoodie.copyonwrite.record.size.estimate": "100"


tjtoll commented Mar 17, 2022

I am experiencing the exact same problem, also with AWS Glue. I tried the settings below and they increased the files to a more appropriate size, but the insert now takes 10x longer. I am also interested in guidance on this issue.

'hoodie.copyonwrite.insert.auto.split': 'false',
'hoodie.copyonwrite.insert.split.size': 100000000,

@nsivabalan (Contributor)

For the first-time load into a new Hudi table, "hoodie.copyonwrite.record.size.estimate" might be required; it defines how inserts are packed into files and hence how small files are handled. For subsequent commits, Hudi can determine the record size from previous commits.
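
To illustrate the effect, here is a rough back-of-the-envelope sketch (not Hudi's exact packing logic; the 100-byte actual record size is an assumption inferred from the ~6.5 MB files reported above):

# Sketch: why the default record-size estimate can yield small files on a first load.
max_file_size_bytes = 83886080        # target ~80 MB per file
default_estimate = 1024               # default hoodie.copyonwrite.record.size.estimate, bytes
actual_record_size = 100              # assumed real size of one record, bytes

records_per_file = max_file_size_bytes // default_estimate          # ~81,920 records
actual_file_size_mb = records_per_file * actual_record_size / (1024 * 1024)
print(round(actual_file_size_mb, 1))  # ~7.8 MB, in the ballpark of the 6.5 MB files observed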

@nsivabalan nsivabalan self-assigned this Mar 19, 2022
@nsivabalan (Contributor)

@srinikandi: did you get a chance to try out the above config?

@nsivabalan (Contributor)

We recently fixed the small-file config behavior in #5129.

Also, try setting "hoodie.parquet.block.size" in addition to "hoodie.parquet.max.file.size", which could impact your file sizes.

Please feel free to re-open if the above suggestions do not work and you can reproduce with the latest master.
Thanks!

@srinikandi (Author)

> @srinikandi: did you get a chance to try out the above config?

Yes, I did have success using the parameters hoodie.copyonwrite.record.size.estimate and hoodie.parquet.block.size. Thanks for the suggestion.
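
For anyone hitting the same issue, a minimal sketch of how these two settings could be folded into the original options (the 100-byte estimate and the 60 MB / 80 MB sizes mirror values already mentioned in this thread; treat them as starting points rather than verified recommendations):

# Sketch: sizing overrides that reportedly helped with first-load file sizes in this thread.
sizing_overrides = {
    "hoodie.copyonwrite.record.size.estimate": "100",  # approximate bytes per record (assumed/measured)
    "hoodie.parquet.block.size": "62914560",           # 60 MB parquet block size
    "hoodie.parquet.max.file.size": "83886080",        # 80 MB target max base-file size
}

write_options = {**common_config, **sizing_overrides}
full_load_df.write.format("hudi").options(**write_options).mode("overwrite").save(raw_table_path)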

@srinikandi (Author)

Is there any recommendation on how big these individual parquet files should be for optimal performance?

@nsivabalan (Contributor)

It depends on your cluster and workload characteristics, so it is tough to give a single number, but for most cases 120 MB works well unless your scale is huge.
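
As a concrete example of that guideline (a sketch only, reusing the hypothetical write_options dict from the earlier sketch; 120 MB expressed in bytes):

# Sketch: target ~120 MB base files, per the guideline above.
write_options["hoodie.parquet.max.file.size"] = str(120 * 1024 * 1024)  # "125829120"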
