diff --git a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
index 3802bb46a0f5..fbfc1d8ec902 100644
--- a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
+++ b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala
@@ -28,7 +28,7 @@ import org.apache.hudi.common.bloom.{BloomFilter, BloomFilterFactory}
 import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
 import org.apache.hudi.common.util.BaseFileUtils
 import org.apache.hudi.config.{HoodieIndexConfig, HoodieStorageConfig}
-import org.apache.hudi.io.storage.{HoodieAvroParquetConfig, HoodieParquetWriter}
+import org.apache.hudi.io.storage.{HoodieAvroParquetConfig, HoodieAvroParquetWriter}
 import org.apache.parquet.avro.AvroSchemaConverter
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.spark.sql.{DataFrame, SQLContext}
@@ -50,8 +50,7 @@ object SparkHelpers {
     // Add current classLoad for config, if not will throw classNotFound of 'HoodieWrapperFileSystem'.
     parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader)
-    val writer = new HoodieParquetWriter[HoodieJsonPayload, IndexedRecord](instantTime, destinationFile, parquetConfig, schema, new SparkTaskContextSupplier(),
-      true)
+    val writer = new HoodieAvroParquetWriter[IndexedRecord](destinationFile, parquetConfig, instantTime, new SparkTaskContextSupplier(), true)
     for (rec <- sourceRecords) {
       val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
       if (!keysToSkip.contains(key)) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java
similarity index 51%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java
index 095cacc144a9..6f7940d04d0f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java
@@ -18,22 +18,15 @@
 package org.apache.hudi.io.storage;

-import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.common.engine.TaskContextSupplier;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
 import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.parquet.hadoop.ParquetFileWriter;
-import org.apache.parquet.hadoop.ParquetWriter;

 import javax.annotation.concurrent.NotThreadSafe;

 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicLong;

 /**
  * HoodieParquetWriter extends the ParquetWriter to help limit the size of underlying file. Provides a way to check if
@@ -42,45 +35,24 @@
  * ATTENTION: HoodieParquetWriter is not thread safe and developer should take care of the order of write and close
  */
 @NotThreadSafe
-public class HoodieParquetWriter
-    extends ParquetWriter implements HoodieFileWriter {
+public class HoodieAvroParquetWriter
+    extends HoodieBaseParquetWriter
+    implements HoodieFileWriter {

-  private static AtomicLong recordIndex = new AtomicLong(1);
-
-  private final Path file;
-  private final HoodieWrapperFileSystem fs;
-  private final long maxFileSize;
-  private final HoodieAvroWriteSupport writeSupport;
+  private final String fileName;

   private final String instantTime;
   private final TaskContextSupplier taskContextSupplier;
   private final boolean populateMetaFields;
+  private final HoodieAvroWriteSupport writeSupport;

-  public HoodieParquetWriter(String instantTime,
-                             Path file,
-                             HoodieAvroParquetConfig parquetConfig,
-                             Schema schema,
-                             TaskContextSupplier taskContextSupplier,
-                             boolean populateMetaFields) throws IOException {
-    super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()),
-        ParquetFileWriter.Mode.CREATE,
-        parquetConfig.getWriteSupport(),
-        parquetConfig.getCompressionCodecName(),
-        parquetConfig.getBlockSize(),
-        parquetConfig.getPageSize(),
-        parquetConfig.getPageSize(),
-        parquetConfig.dictionaryEnabled(),
-        DEFAULT_IS_VALIDATING_ENABLED,
-        DEFAULT_WRITER_VERSION,
-        FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
-    this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf());
-    this.fs =
-        (HoodieWrapperFileSystem) this.file.getFileSystem(FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
-    // We cannot accurately measure the snappy compressed output file size. We are choosing a
-    // conservative 10%
-    // TODO - compute this compression ratio dynamically by looking at the bytes written to the
-    // stream and the actual file size reported by HDFS
-    this.maxFileSize = parquetConfig.getMaxFileSize()
-        + Math.round(parquetConfig.getMaxFileSize() * parquetConfig.getCompressionRatio());
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public HoodieAvroParquetWriter(Path file,
+                                 HoodieAvroParquetConfig parquetConfig,
+                                 String instantTime,
+                                 TaskContextSupplier taskContextSupplier,
+                                 boolean populateMetaFields) throws IOException {
+    super(file, (HoodieBaseParquetConfig) parquetConfig);
+    this.fileName = file.getName();
     this.writeSupport = parquetConfig.getWriteSupport();
     this.instantTime = instantTime;
     this.taskContextSupplier = taskContextSupplier;
@@ -91,7 +63,7 @@ public HoodieParquetWriter(String instantTime,
   public void writeAvroWithMetadata(HoodieKey key, R avroRecord) throws IOException {
     if (populateMetaFields) {
       prepRecordWithMetadata(key, avroRecord, instantTime,
-          taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
+          taskContextSupplier.getPartitionIdSupplier().get(), getWrittenRecordCount(), fileName);
       super.write(avroRecord);
       writeSupport.add(key.getRecordKey());
     } else {
@@ -99,11 +71,6 @@ public void writeAvroWithMetadata(HoodieKey key, R avroRecord) throws IOExceptio
     }
   }

-  @Override
-  public boolean canWrite() {
-    return getDataSize() < maxFileSize;
-  }
-
   @Override
   public void writeAvro(String key, IndexedRecord object) throws IOException {
     super.write(object);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java
new file mode 100644
index 000000000000..b4aa6de1bd57
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Base class of Hudi's custom {@link ParquetWriter} implementations
+ *
+ * @param target type of the object being written into Parquet files (for ex,
+ *            {@code IndexedRecord}, {@code InternalRow})
+ */
+public abstract class HoodieBaseParquetWriter extends ParquetWriter {
+
+  private static final int WRITTEN_RECORDS_THRESHOLD_FOR_FILE_SIZE_CHECK = 1000;
+
+  private final AtomicLong writtenRecordCount = new AtomicLong(0);
+  private final long maxFileSize;
+  private long lastCachedDataSize = -1;
+
+  public HoodieBaseParquetWriter(Path file,
+                                 HoodieBaseParquetConfig> parquetConfig) throws IOException {
+    super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()),
+        ParquetFileWriter.Mode.CREATE,
+        parquetConfig.getWriteSupport(),
+        parquetConfig.getCompressionCodecName(),
+        parquetConfig.getBlockSize(),
+        parquetConfig.getPageSize(),
+        parquetConfig.getPageSize(),
+        parquetConfig.dictionaryEnabled(),
+        DEFAULT_IS_VALIDATING_ENABLED,
+        DEFAULT_WRITER_VERSION,
+        FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
+
+    // We cannot accurately measure the snappy compressed output file size. We are choosing a
+    // conservative 10%
+    // TODO - compute this compression ratio dynamically by looking at the bytes written to the
+    // stream and the actual file size reported by HDFS
+    this.maxFileSize = parquetConfig.getMaxFileSize()
+        + Math.round(parquetConfig.getMaxFileSize() * parquetConfig.getCompressionRatio());
+  }
+
+  public boolean canWrite() {
+    // TODO we can actually do evaluation more accurately:
+    //      if we cache last data size check, since we account for how many records
+    //      were written we can accurately project avg record size, and therefore
+    //      estimate how many more records we can write before cut off
+    if (lastCachedDataSize == -1 || getWrittenRecordCount() % WRITTEN_RECORDS_THRESHOLD_FOR_FILE_SIZE_CHECK == 0) {
+      lastCachedDataSize = getDataSize();
+    }
+    return lastCachedDataSize < maxFileSize;
+  }
+
+  @Override
+  public void write(R object) throws IOException {
+    super.write(object);
+    writtenRecordCount.incrementAndGet();
+  }
+
+  protected long getWrittenRecordCount() {
+    return writtenRecordCount.get();
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
index 1d1dd5c9bae6..cce59d3b6624 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java
@@ -18,14 +18,12 @@
 package org.apache.hudi.io.storage;

-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;

-import org.apache.avro.generic.IndexedRecord;
-
 import java.io.IOException;

 public interface HoodieFileWriter {
@@ -38,8 +36,8 @@ public interface HoodieFileWriter {

   void writeAvro(String key, R oldRecord) throws IOException;

-  default void prepRecordWithMetadata(HoodieKey key, R avroRecord, String instantTime, Integer partitionId, AtomicLong recordIndex, String fileName) {
-    String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex.getAndIncrement());
+  default void prepRecordWithMetadata(HoodieKey key, R avroRecord, String instantTime, Integer partitionId, long recordIndex, String fileName) {
+    String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex);
     HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, key.getRecordKey(), key.getPartitionPath(), fileName);
     HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId);
     return;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
index 7d0c307dbfe5..ffdff25738ed 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
@@ -81,7 +81,7 @@ private static HoodieFi
         config.getParquetBlockSize(), config.getParquetPageSize(), config.getParquetMaxFileSize(),
         conf, config.getParquetCompressionRatio(), config.parquetDictionaryEnabled());

-    return new HoodieParquetWriter<>(instantTime, path, parquetConfig, schema, taskContextSupplier, populateMetaFields);
+    return new HoodieAvroParquetWriter<>(path, parquetConfig, instantTime, taskContextSupplier, populateMetaFields);
   }

   static HoodieFileWriter newHFileFileWriter(
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
index 91f79cefa23d..f065608b29bd 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
@@ -113,7 +113,7 @@ public HoodieHFileWriter(String instantTime, Path file, HoodieHFileConfig hfileC
   public void writeAvroWithMetadata(HoodieKey key, R avroRecord) throws IOException {
     if (populateMetaFields) {
       prepRecordWithMetadata(key, avroRecord, instantTime,
-          taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
+          taskContextSupplier.getPartitionIdSupplier().get(), recordIndex.getAndIncrement(), file.getName());
       writeAvro(key.getRecordKey(), avroRecord);
     } else {
       writeAvro(key.getRecordKey(), avroRecord);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java
index 17d5ead3efb7..a532ac66c987 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java
@@ -97,7 +97,7 @@ public HoodieOrcWriter(String instantTime, Path file, HoodieOrcConfig config, Sc
   @Override
   public void writeAvroWithMetadata(HoodieKey key, R avroRecord) throws IOException {
     prepRecordWithMetadata(key, avroRecord, instantTime,
-        taskContextSupplier.getPartitionIdSupplier().get(), RECORD_INDEX, file.getName());
+        taskContextSupplier.getPartitionIdSupplier().get(), RECORD_INDEX.getAndIncrement(), file.getName());
     writeAvro(key.getRecordKey(), avroRecord);
   }

diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
index 007ad290aadd..6b847d4960fb 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
@@ -41,7 +41,7 @@
 import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
 import org.apache.hudi.io.storage.HoodieOrcConfig;
 import org.apache.hudi.io.storage.HoodieOrcWriter;
-import org.apache.hudi.io.storage.HoodieParquetWriter;
+import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;

 import org.apache.avro.Schema;
@@ -113,10 +113,9 @@ public Path withInserts(String partition, String fileId, List reco
       HoodieAvroParquetConfig config = new HoodieAvroParquetConfig(writeSupport, CompressionCodecName.GZIP,
           ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024,
           new Configuration(), Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue()));
-      try (HoodieParquetWriter writer = new HoodieParquetWriter(
-          currentInstantTime,
-          new Path(Paths.get(basePath, partition, fileName).toString()),
-          config, schema, contextSupplier, populateMetaFields)) {
+      try (HoodieAvroParquetWriter writer = new HoodieAvroParquetWriter<>(
+          new Path(Paths.get(basePath, partition, fileName).toString()), config, currentInstantTime,
+          contextSupplier, populateMetaFields)) {
         int seqId = 1;
         for (HoodieRecord record : records) {
           GenericRecord avroRecord = (GenericRecord) ((HoodieRecordPayload) record.getData()).getInsertValue(schema).get();
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
index 518414d614e8..7b0c4dbdf2a9 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
@@ -379,7 +379,7 @@ public void testFileSizeUpsertRecords() throws Exception {
     List records = new ArrayList<>();

     // Approx 1150 records are written for block size of 64KB
-    for (int i = 0; i < 2000; i++) {
+    for (int i = 0; i < 2050; i++) {
       String recordStr = "{\"_row_key\":\"" + UUID.randomUUID().toString()
           + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":" + i + "}";
       RawTripTestPayload rowChange = new RawTripTestPayload(recordStr);
@@ -402,7 +402,8 @@ public void testFileSizeUpsertRecords() throws Exception {
         counts++;
       }
     }
-    assertEquals(5, counts, "If the number of records are more than 1150, then there should be a new file");
+    // we check canWrite only once every 1000 records. and so 2 files with 1000 records and 3rd file with 50 records.
+    assertEquals(3, counts, "If the number of records are more than 1150, then there should be a new file");
   }

   @Test
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java
index 7e64d83879f0..5a0a60ea0750 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java
@@ -19,11 +19,7 @@
 package org.apache.hudi.io.storage.row;

 import org.apache.hadoop.fs.Path;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
-
-import org.apache.parquet.hadoop.ParquetFileWriter;
-import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.hudi.io.storage.HoodieBaseParquetWriter;
 import org.apache.spark.sql.catalyst.InternalRow;

 import java.io.IOException;
@@ -31,32 +27,16 @@
 /**
  * Parquet's impl of {@link HoodieInternalRowFileWriter} to write {@link InternalRow}s.
  */
-public class HoodieInternalRowParquetWriter extends ParquetWriter
+public class HoodieInternalRowParquetWriter extends HoodieBaseParquetWriter
     implements HoodieInternalRowFileWriter {

-  private final Path file;
-  private final HoodieWrapperFileSystem fs;
-  private final long maxFileSize;
   private final HoodieRowParquetWriteSupport writeSupport;

   public HoodieInternalRowParquetWriter(Path file, HoodieRowParquetConfig parquetConfig)
       throws IOException {
-    super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()),
-        ParquetFileWriter.Mode.CREATE, parquetConfig.getWriteSupport(), parquetConfig.getCompressionCodecName(),
-        parquetConfig.getBlockSize(), parquetConfig.getPageSize(), parquetConfig.getPageSize(),
-        DEFAULT_IS_DICTIONARY_ENABLED, DEFAULT_IS_VALIDATING_ENABLED,
-        DEFAULT_WRITER_VERSION, FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
-    this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf());
-    this.fs = (HoodieWrapperFileSystem) this.file.getFileSystem(FSUtils.registerFileSystem(file,
-        parquetConfig.getHadoopConf()));
-    this.maxFileSize = parquetConfig.getMaxFileSize()
-        + Math.round(parquetConfig.getMaxFileSize() * parquetConfig.getCompressionRatio());
-    this.writeSupport = parquetConfig.getWriteSupport();
-  }
+    super(file, parquetConfig);

-  @Override
-  public boolean canWrite() {
-    return getDataSize() < maxFileSize;
+    this.writeSupport = parquetConfig.getWriteSupport();
   }

   @Override
@@ -69,9 +49,4 @@ public void writeRow(String key, InternalRow row) throws IOException {
   public void writeRow(InternalRow row) throws IOException {
     super.write(row);
   }
-
-  @Override
-  public void close() throws IOException {
-    super.close();
-  }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java
index b7f34ab2b24d..66016305d7ad 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java
@@ -49,7 +49,7 @@ public void testGetFileWriter() throws IOException {
     SparkTaskContextSupplier supplier = new SparkTaskContextSupplier();
     HoodieFileWriter parquetWriter = HoodieFileWriterFactory.getFileWriter(instantTime,
         parquetPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier);
-    assertTrue(parquetWriter instanceof HoodieParquetWriter);
+    assertTrue(parquetWriter instanceof HoodieAvroParquetWriter);

     // hfile format.
     final Path hfilePath = new Path(basePath + "/partition/path/f1_1-0-1_000.hfile");
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
index 8114daa30f76..9574d35a6541 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
@@ -419,7 +419,7 @@ public void testFileSizeUpsertRecords() throws Exception {
     List records = new ArrayList<>();

     // Approx 1150 records are written for block size of 64KB
-    for (int i = 0; i < 2000; i++) {
+    for (int i = 0; i < 2050; i++) {
      String recordStr = "{\"_row_key\":\"" + UUID.randomUUID().toString()
          + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":" + i + "}";
      RawTripTestPayload rowChange = new RawTripTestPayload(recordStr);
@@ -441,7 +441,8 @@ public void testFileSizeUpsertRecords() throws Exception {
         counts++;
       }
     }
-    assertEquals(5, counts, "If the number of records are more than 1150, then there should be a new file");
+    // we check canWrite only once every 1000 records. and so 2 files with 1000 records and 3rd file with 50 records.
+    assertEquals(3, counts, "If the number of records are more than 1150, then there should be a new file");
   }

   @Test
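Note on the behavioral change above: HoodieBaseParquetWriter#canWrite() no longer calls getDataSize() for every record; it caches the last measured size and refreshes it only every 1000 written records (WRITTEN_RECORDS_THRESHOLD_FOR_FILE_SIZE_CHECK), which is why the file-sizing tests now expect 3 files for 2050 records instead of 5 for 2000. The following is a minimal, self-contained sketch of that amortized size check, separate from the patch itself; SizeThrottledWriter and its LongSupplier-based size probe are illustrative stand-ins for ParquetWriter#getDataSize(), not Hudi APIs.

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/**
 * Sketch of the amortized size check used by HoodieBaseParquetWriter#canWrite():
 * instead of asking the underlying writer for its buffered data size on every
 * record, the measured size is cached and refreshed once every N written records.
 */
public class SizeThrottledWriter {

  private static final int RECORDS_PER_SIZE_CHECK = 1000;

  private final AtomicLong writtenRecordCount = new AtomicLong(0);
  private final long maxFileSize;
  private final LongSupplier dataSizeSupplier; // stands in for ParquetWriter#getDataSize()
  private long lastCachedDataSize = -1;

  public SizeThrottledWriter(long maxFileSize, LongSupplier dataSizeSupplier) {
    this.maxFileSize = maxFileSize;
    this.dataSizeSupplier = dataSizeSupplier;
  }

  /** Refresh the cached size only on the first call and every 1000 records thereafter. */
  public boolean canWrite() {
    if (lastCachedDataSize == -1 || writtenRecordCount.get() % RECORDS_PER_SIZE_CHECK == 0) {
      lastCachedDataSize = dataSizeSupplier.getAsLong();
    }
    return lastCachedDataSize < maxFileSize;
  }

  /** Counting writes here is also what lets the Avro writer reuse the counter as a sequence id. */
  public void write(Object record) {
    // a real writer would hand the record to ParquetWriter#write here
    writtenRecordCount.incrementAndGet();
  }

  public long getWrittenRecordCount() {
    return writtenRecordCount.get();
  }

  public static void main(String[] args) {
    // Pretend every record adds ~100 bytes and cap the "file" at 120 KB.
    long[] bytesWritten = {0};
    SizeThrottledWriter writer = new SizeThrottledWriter(120 * 1024, () -> bytesWritten[0]);
    long accepted = 0;
    while (writer.canWrite()) {
      writer.write("record-" + accepted);
      bytesWritten[0] += 100;
      accepted++;
    }
    // Because the size is only re-checked every 1000 records, the writer overshoots
    // the cap by up to one batch, mirroring the relaxed assertions in the tests above.
    System.out.println("accepted " + accepted + " records");
  }
}

The trade-off is fewer expensive size probes per record, at the cost of overshooting the target file size by at most one batch of records.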