From 8547899a39168c8399d32ee7ee22f35dfe3f7c84 Mon Sep 17 00:00:00 2001 From: komao Date: Thu, 30 Jun 2022 20:48:50 +0800 Subject: [PATCH 1/3] =?UTF-8?q?[HUDI-4285]=20add=20ByteBuffer#rewind=20aft?= =?UTF-8?q?er=20ByteBuffer#get=20in=20AvroDeseria=E2=80=A6=20(#5907)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [HUDI-4285] add ByteBuffer#rewind after ByteBuffer#get in AvroDeserializer * add ut Co-authored-by: wangzixuan.wzxuan --- .../apache/hudi/TestAvroConversionUtils.scala | 57 ++++++++++++++++++- .../spark/sql/avro/AvroDeserializer.scala | 2 + .../spark/sql/avro/AvroDeserializer.scala | 2 + .../spark/sql/avro/AvroDeserializer.scala | 2 + 4 files changed, 62 insertions(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala index bacd44753df3..16df1f869c6b 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala @@ -18,8 +18,13 @@ package org.apache.hudi +import java.nio.ByteBuffer +import java.util.Objects import org.apache.avro.Schema -import org.apache.spark.sql.types.{DataTypes, StructType, StringType, ArrayType} +import org.apache.avro.generic.GenericData +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} +import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, DataTypes, MapType, StringType, StructField, StructType} import org.scalatest.{FunSuite, Matchers} class TestAvroConversionUtils extends FunSuite with Matchers { @@ -377,4 +382,54 @@ class TestAvroConversionUtils extends FunSuite with Matchers { assert(avroSchema.equals(expectedAvroSchema)) } + + test("test converter with binary") { + val avroSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"h0_record\",\"namespace\":\"hoodie.h0\",\"fields\"" + + ":[{\"name\":\"col9\",\"type\":[\"null\",\"bytes\"],\"default\":null}]}") + val sparkSchema = StructType(List(StructField("col9", BinaryType, nullable = true))) + // create a test record with avroSchema + val avroRecord = new GenericData.Record(avroSchema) + val bb = ByteBuffer.wrap(Array[Byte](97, 48, 53)) + avroRecord.put("col9", bb) + val row1 = AvroConversionUtils.createAvroToInternalRowConverter(avroSchema, sparkSchema).apply(avroRecord).get + val row2 = AvroConversionUtils.createAvroToInternalRowConverter(avroSchema, sparkSchema).apply(avroRecord).get + internalRowCompare(row1, row2, sparkSchema) + } + + private def internalRowCompare(expected: Any, actual: Any, schema: DataType): Unit = { + schema match { + case StructType(fields) => + val expectedRow = expected.asInstanceOf[InternalRow] + val actualRow = actual.asInstanceOf[InternalRow] + fields.zipWithIndex.foreach { case (field, i) => internalRowCompare(expectedRow.get(i, field.dataType), actualRow.get(i, field.dataType), field.dataType) } + case ArrayType(elementType, _) => + val expectedArray = expected.asInstanceOf[ArrayData].toSeq[Any](elementType) + val actualArray = actual.asInstanceOf[ArrayData].toSeq[Any](elementType) + if (expectedArray.size != actualArray.size) { + throw new AssertionError() + } else { + expectedArray.zip(actualArray).foreach { case (e1, e2) => internalRowCompare(e1, e2, elementType) } + } + case MapType(keyType, valueType, _) => + val 
expectedKeyArray = expected.asInstanceOf[MapData].keyArray()
+        val expectedValueArray = expected.asInstanceOf[MapData].valueArray()
+        val actualKeyArray = actual.asInstanceOf[MapData].keyArray()
+        val actualValueArray = actual.asInstanceOf[MapData].valueArray()
+        internalRowCompare(expectedKeyArray, actualKeyArray, ArrayType(keyType))
+        internalRowCompare(expectedValueArray, actualValueArray, ArrayType(valueType))
+      case StringType => if (checkNull(expected, actual) || !expected.toString.equals(actual.toString)) {
+        throw new AssertionError(String.format("%s is not equal to %s", expected.toString, actual.toString))
+      }
+      case BinaryType => if (checkNull(expected, actual) || !expected.asInstanceOf[Array[Byte]].sameElements(actual.asInstanceOf[Array[Byte]])) {
+        throw new AssertionError(String.format("%s is not equal to %s", expected.toString, actual.toString))
+      }
+      case _ => if (!Objects.equals(expected, actual)) {
+        throw new AssertionError(String.format("%s is not equal to %s", expected.toString, actual.toString))
+      }
+    }
+  }
+
+  private def checkNull(left: Any, right: Any): Boolean = {
+    (left == null && right != null) || (left != null && right == null)
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 2e0946f1eb98..385577dd30b8 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -146,6 +146,8 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
       case b: ByteBuffer =>
         val bytes = new Array[Byte](b.remaining)
         b.get(bytes)
+        // ByteBuffer#get advances the position; rewind so the buffer can be read again
+        b.rewind()
         bytes
       case b: Array[Byte] => b
       case other => throw new RuntimeException(s"$other is not a valid avro binary.")
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 717df0f4076e..5fb6d907bdc8 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -167,6 +167,8 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
       case b: ByteBuffer =>
         val bytes = new Array[Byte](b.remaining)
         b.get(bytes)
+        // ByteBuffer#get advances the position; rewind so the buffer can be read again
+        b.rewind()
         bytes
       case b: Array[Byte] => b
       case other => throw new RuntimeException(s"$other is not a valid avro binary.")
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index ef9b5909207c..0b609330756e 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -181,6 +181,8 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
       case b: ByteBuffer =>
         val bytes = new Array[Byte](b.remaining)
         b.get(bytes)
+        // ByteBuffer#get advances the position; rewind so the buffer can be read again
+        b.rewind()
         bytes
       case b: Array[Byte] => b
       case other =>
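Note on the fix above: ByteBuffer#get is a relative read that advances the
buffer's position, so a second consumer of the same Avro record would see zero
remaining bytes and deserialize an empty value. A minimal standalone sketch of
the failure mode and the fix, using plain java.nio only (the class name is
illustrative, not part of the patch):

    import java.nio.ByteBuffer;

    public class RewindDemo {
      public static void main(String[] args) {
        ByteBuffer b = ByteBuffer.wrap(new byte[] {97, 48, 53});

        // First read: get(byte[]) consumes all remaining bytes and leaves
        // position == limit.
        byte[] first = new byte[b.remaining()];
        b.get(first);
        System.out.println(b.remaining()); // 0 -- a second read would return nothing

        // The fix: rewind resets position to 0 so the buffer can be read
        // again, which is what the deserializer now does after copying the bytes.
        b.rewind();
        byte[] second = new byte[b.remaining()];
        b.get(second);
        System.out.println(second.length); // 3
      }
    }

This is why applying the converter twice to the same record, as the new test
does, previously produced two different rows.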
From 397fd3014225c2c2ac3b563541425f2ce51eba54 Mon Sep 17 00:00:00 2001
From: miomiocat <284487410@qq.com>
Date: Fri, 1 Jul 2022 01:00:13 +0800
Subject: [PATCH 2/3] [HUDI-3984] Remove mandatory check of partition path for
 cli command (#5458)

---
 .../hudi/cli/commands/CompactionCommand.java  |   2 +-
 .../cli/commands/FileSystemViewCommand.java   |   2 +-
 .../commands/HDFSParquetImportCommand.java    |   2 +-
 .../hudi/cli/commands/MetadataCommand.java    |  10 +-
 .../commands/TestFileSystemViewCommand.java   | 121 ++++++++++++++----
 5 files changed, 106 insertions(+), 31 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
index 097c68a542c4..d3845137c8e2 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
@@ -558,7 +558,7 @@ public String unscheduleCompaction(
   @CliCommand(value = "compaction unscheduleFileId", help = "UnSchedule Compaction for a fileId")
   public String unscheduleCompactFile(
       @CliOption(key = "fileId", mandatory = true, help = "File Id") final String fileId,
-      @CliOption(key = "partitionPath", mandatory = true, help = "partition path") final String partitionPath,
+      @CliOption(key = "partitionPath", unspecifiedDefaultValue = "", help = "partition path") final String partitionPath,
       @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "local", help = "Spark Master") String master,
       @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help = "executor memory") String sparkMemory,
       @CliOption(key = {"skipValidation"}, help = "skip validation", unspecifiedDefaultValue = "false") boolean skipV,
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
index a506c8030a55..792128c0b8ae 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
@@ -119,7 +119,7 @@ public String showAllFileSlices(
   @CliCommand(value = "show fsview latest", help = "Show latest file-system view")
   public String showLatestFileSlices(
-      @CliOption(key = {"partitionPath"}, help = "A valid partition path", mandatory = true) String partition,
+      @CliOption(key = {"partitionPath"}, help = "A valid partition path", unspecifiedDefaultValue = "") String partition,
       @CliOption(key = {"baseFileOnly"}, help = "Only display base file view", unspecifiedDefaultValue = "false")
           boolean baseFileOnly,
       @CliOption(key = {"maxInstant"}, help = "File-Slices upto this instant are displayed",
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java
index 93866cafcd32..5c6407cea144 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java
@@ -53,7 +53,7 @@ public String convert(
       @CliOption(key = "tableName", mandatory = true, help = "Table name") final String tableName,
       @CliOption(key = "tableType", mandatory = true, help = "Table type") final String tableType,
       @CliOption(key = "rowKeyField", mandatory = true, help = "Row key field name") final String rowKeyField,
-      @CliOption(key = "partitionPathField", mandatory = true,
+      @CliOption(key = "partitionPathField", unspecifiedDefaultValue = "",
          help = "Partition path field name") final String partitionPathField,
"Partition path field name") final String partitionPathField, @CliOption(key = {"parallelism"}, mandatory = true, help = "Parallelism for hoodie insert") final String parallelism, diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java index 637f1393f51a..e3d25e06b886 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.metadata.HoodieBackedTableMetadata; @@ -225,7 +226,7 @@ public String listPartitions( @CliCommand(value = "metadata list-files", help = "Print a list of all files in a partition from the metadata") public String listFiles( - @CliOption(key = {"partition"}, help = "Name of the partition to list files", mandatory = true) final String partition) throws IOException { + @CliOption(key = {"partition"}, help = "Name of the partition to list files", unspecifiedDefaultValue = "") final String partition) throws IOException { HoodieCLI.getTableMetaClient(); HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build(); HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata( @@ -235,8 +236,13 @@ public String listFiles( return "[ERROR] Metadata Table not enabled/initialized\n\n"; } + Path partitionPath = new Path(HoodieCLI.basePath); + if (!StringUtils.isNullOrEmpty(partition)) { + partitionPath = new Path(HoodieCLI.basePath, partition); + } + HoodieTimer timer = new HoodieTimer().startTimer(); - FileStatus[] statuses = metaReader.getAllFilesInPartition(new Path(HoodieCLI.basePath, partition)); + FileStatus[] statuses = metaReader.getAllFilesInPartition(partitionPath); LOG.debug("Took " + timer.endTimer() + " ms"); final List rows = new ArrayList<>(); diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestFileSystemViewCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestFileSystemViewCommand.java index d5c535ebfe00..b6813a2146f8 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestFileSystemViewCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestFileSystemViewCommand.java @@ -59,24 +59,73 @@ @Tag("functional") public class TestFileSystemViewCommand extends CLIFunctionalTestHarness { + private String nonpartitionedTablePath; + private String partitionedTablePath; private String partitionPath; - private SyncableFileSystemView fsView; + private SyncableFileSystemView nonpartitionedFsView; + private SyncableFileSystemView partitionedFsView; @BeforeEach public void init() throws IOException { + createNonpartitionedTable(); + createPartitionedTable(); + } + + private void createNonpartitionedTable() throws IOException { HoodieCLI.conf = hadoopConf(); // Create table and connect - String tableName = tableName(); - String tablePath = tablePath(tableName); + String nonpartitionedTableName = "nonpartitioned_" + tableName(); + nonpartitionedTablePath = tablePath(nonpartitionedTableName); new TableCommand().createTable( - tablePath, tableName, - "COPY_ON_WRITE", "", 1, "org.apache.hudi.common.model.HoodieAvroPayload"); + 
+        nonpartitionedTablePath, nonpartitionedTableName,
+        "COPY_ON_WRITE", "", 1, "org.apache.hudi.common.model.HoodieAvroPayload");
+
+    HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
+
+    Files.createDirectories(Paths.get(nonpartitionedTablePath));
+
+    // Generate 2 commits
+    String commitTime1 = "3";
+    String commitTime2 = "4";
+
+    String fileId1 = UUID.randomUUID().toString();
+
+    // Write data files and log file
+    String testWriteToken = "2-0-2";
+    Files.createFile(Paths.get(nonpartitionedTablePath, FSUtils
+        .makeBaseFileName(commitTime1, testWriteToken, fileId1)));
+    Files.createFile(Paths.get(nonpartitionedTablePath, FSUtils
+        .makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime1, 0, testWriteToken)));
+    Files.createFile(Paths.get(nonpartitionedTablePath, FSUtils
+        .makeBaseFileName(commitTime2, testWriteToken, fileId1)));
+    Files.createFile(Paths.get(nonpartitionedTablePath, FSUtils
+        .makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime2, 0, testWriteToken)));
+
+    // Write commit files
+    Files.createFile(Paths.get(nonpartitionedTablePath, ".hoodie", commitTime1 + ".commit"));
+    Files.createFile(Paths.get(nonpartitionedTablePath, ".hoodie", commitTime2 + ".commit"));
+
+    // Reload meta client and create fsView
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    nonpartitionedFsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline(), true);
+  }
+
+  private void createPartitionedTable() throws IOException {
+    HoodieCLI.conf = hadoopConf();
+
+    // Create table and connect
+    String partitionedTableName = "partitioned_" + tableName();
+    partitionedTablePath = tablePath(partitionedTableName);
+    new TableCommand().createTable(
+        partitionedTablePath, partitionedTableName,
+        "COPY_ON_WRITE", "", 1, "org.apache.hudi.common.model.HoodieAvroPayload");
 
     HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
 
     partitionPath = HoodieTestCommitMetadataGenerator.DEFAULT_FIRST_PARTITION_PATH;
-    String fullPartitionPath = Paths.get(tablePath, partitionPath).toString();
+    String fullPartitionPath = Paths.get(partitionedTablePath, partitionPath).toString();
     Files.createDirectories(Paths.get(fullPartitionPath));
 
     // Generate 2 commits
@@ -97,13 +146,13 @@ public void init() throws IOException {
         .makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime2, 0, testWriteToken)));
 
     // Write commit files
-    Files.createFile(Paths.get(tablePath, ".hoodie", commitTime1 + ".commit"));
-    Files.createFile(Paths.get(tablePath, ".hoodie", commitTime2 + ".commit"));
+    Files.createFile(Paths.get(partitionedTablePath, ".hoodie", commitTime1 + ".commit"));
+    Files.createFile(Paths.get(partitionedTablePath, ".hoodie", commitTime2 + ".commit"));
 
     // Reload meta client and create fsView
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
-    fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline(), true);
+    partitionedFsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline(), true);
   }
 
   /**
@@ -116,7 +165,7 @@ public void testShowCommits() {
     assertTrue(cr.isSuccess());
 
     // Get all file groups
-    Stream<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath);
+    Stream<HoodieFileGroup> fileGroups = partitionedFsView.getAllFileGroups(partitionPath);
 
     List<Comparable[]> rows = new ArrayList<>();
     fileGroups.forEach(fg -> fg.getAllFileSlices().forEach(fs -> {
@@ -164,7 +213,7 @@ public void testShowCommitsWithSpecifiedValues() {
     assertTrue(cr.isSuccess());
 
     List<Comparable[]> rows = new ArrayList<>();
-    Stream<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath);
+    Stream<HoodieFileGroup> fileGroups = partitionedFsView.getAllFileGroups(partitionPath);
 
     // Only get instant 1, since maxInstant was specified 2
     fileGroups.forEach(fg -> fg.getAllFileSlices().filter(fs -> fs.getBaseInstantTime().equals("1")).forEach(fs -> {
@@ -197,17 +246,7 @@ public void testShowCommitsWithSpecifiedValues() {
     assertEquals(expected, got);
   }
 
-  /**
-   * Test case for command 'show fsview latest'.
-   */
-  @Test
-  public void testShowLatestFileSlices() {
-    // Test show with partition path '2016/03/15'
-    CommandResult cr = shell().executeCommand("show fsview latest --partitionPath " + partitionPath);
-    assertTrue(cr.isSuccess());
-
-    Stream<FileSlice> fileSlice = fsView.getLatestFileSlices(partitionPath);
-
+  private List<Comparable[]> fileSlicesToCRList(Stream<FileSlice> fileSlice, String partitionPath) {
     List<Comparable[]> rows = new ArrayList<>();
     fileSlice.forEach(fs -> {
       int idx = 0;
@@ -245,7 +284,14 @@ public void testShowLatestFileSlices() {
         .collect(Collectors.toList()).toString();
       rows.add(row);
     });
+    return rows;
+  }
 
+  /**
+   * Test case for command 'show fsview latest'.
+   */
+  @Test
+  public void testShowLatestFileSlices() throws IOException {
     Function<Object, String> converterFunction =
         entry -> NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString())));
     Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
@@ -267,9 +313,32 @@ public void testShowLatestFileSlices() {
         .addTableHeaderField(HoodieTableHeaderFields.HEADER_DELTA_BASE_UNSCHEDULED)
         .addTableHeaderField(HoodieTableHeaderFields.HEADER_DELTA_FILES_SCHEDULED)
         .addTableHeaderField(HoodieTableHeaderFields.HEADER_DELTA_FILES_UNSCHEDULED);
-    String expected = HoodiePrintHelper.print(header, fieldNameToConverterMap, "", false, -1, false, rows);
-    expected = removeNonWordAndStripSpace(expected);
-    String got = removeNonWordAndStripSpace(cr.getResult().toString());
-    assertEquals(expected, got);
+
+    // Test show with partition path '2016/03/15'
+    new TableCommand().connect(partitionedTablePath, null, false, 0, 0, 0);
+    CommandResult partitionedTableCR = shell().executeCommand("show fsview latest --partitionPath " + partitionPath);
+    assertTrue(partitionedTableCR.isSuccess());
+
+    Stream<FileSlice> partitionedFileSlice = partitionedFsView.getLatestFileSlices(partitionPath);
+
+    List<Comparable[]> partitionedRows = fileSlicesToCRList(partitionedFileSlice, partitionPath);
+    String partitionedExpected = HoodiePrintHelper.print(header, fieldNameToConverterMap, "", false, -1, false, partitionedRows);
+    partitionedExpected = removeNonWordAndStripSpace(partitionedExpected);
+    String partitionedResults = removeNonWordAndStripSpace(partitionedTableCR.getResult().toString());
+    assertEquals(partitionedExpected, partitionedResults);
+
+    // Test show for non-partitioned table
+    new TableCommand().connect(nonpartitionedTablePath, null, false, 0, 0, 0);
+    CommandResult nonpartitionedTableCR = shell().executeCommand("show fsview latest");
+    assertTrue(nonpartitionedTableCR.isSuccess());
+
+    Stream<FileSlice> nonpartitionedFileSlice = nonpartitionedFsView.getLatestFileSlices("");
+
+    List<Comparable[]> nonpartitionedRows = fileSlicesToCRList(nonpartitionedFileSlice, "");
+
+    String nonpartitionedExpected = HoodiePrintHelper.print(header, fieldNameToConverterMap, "", false, -1, false, nonpartitionedRows);
+    nonpartitionedExpected = removeNonWordAndStripSpace(nonpartitionedExpected);
+    String nonpartitionedResults = removeNonWordAndStripSpace(nonpartitionedTableCR.getResult().toString());
+    assertEquals(nonpartitionedExpected, nonpartitionedResults);
   }
 }
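Note on the pattern in this patch: each command now treats the partition path
as optional and falls back to the table base path when it is not supplied,
which is what makes these commands usable on non-partitioned tables. A minimal
sketch of the fallback under that assumption (illustrative method name, not
part of the patch; mirrors the MetadataCommand hunk above):

    import org.apache.hadoop.fs.Path;

    // Resolve the directory to list: the table base path for a
    // non-partitioned table, or basePath/partition when --partition is given.
    static Path resolveListingPath(String basePath, String partition) {
      if (partition == null || partition.isEmpty()) {
        return new Path(basePath);
      }
      return new Path(basePath, partition);
    }

With this change, `show fsview latest` and `metadata list-files` can be run
without --partitionPath/--partition on a non-partitioned table, which the new
test at the end of this patch exercises.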
From 62a0c962aceae5d1c803d48c943c2155ec3ef5f1 Mon Sep 17 00:00:00 2001
From: RexAn
Date: Fri, 1 Jul 2022 02:07:40 +0800
Subject: [PATCH 3/3] [HUDI-3634] Could read empty or partial
 HoodieCommitMetaData in downstream if using HDFS (#5048)

On HDFS, create immutable files by first writing the content to a temporary
<name>.tmp file and then renaming it to the target path, so downstream readers
never observe an empty or partially written commit file.
---
 .../common/fs/HoodieWrapperFileSystem.java    | 64 +++++++++++++++++++
 .../table/timeline/HoodieActiveTimeline.java  | 31 +--------
 .../timeline/TestHoodieActiveTimeline.java    | 44 +++++++++++++
 3 files changed, 110 insertions(+), 29 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
index a79d1571afe7..bceea8e3671c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
@@ -21,6 +21,7 @@
 import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 
@@ -60,6 +61,8 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.hudi.common.fs.StorageSchemes.HDFS;
+
 /**
  * HoodieWrapperFileSystem wraps the default file system. It holds state about the open streams in the file system to
  * support getting the written size to each of the open streams.
@@ -68,6 +71,8 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   public static final String HOODIE_SCHEME_PREFIX = "hoodie-";
 
+  private static final String TMP_PATH_POSTFIX = ".tmp";
+
   protected enum MetricName {
     create, rename, delete, listStatus, mkdirs, getFileStatus, globStatus, listFiles, read, write
   }
@@ -986,6 +991,65 @@ public long getBytesWritten(Path file) {
         file.toString() + " does not have a open stream. Cannot get the bytes written on the stream");
   }
 
+  protected boolean needCreateTempFile() {
+    return HDFS.getScheme().equals(fileSystem.getScheme());
+  }
+
+  /**
+   * Creates a new file with overwrite set to false, so files are created
+   * only once and never rewritten. If there is content to write and
+   * {@link #needCreateTempFile()} returns true, the content is first written
+   * to a temporary file, which is then renamed to the target path once fully
+   * written, so readers never observe a partial file.
+   *
+   * @param fullPath File Path
+   * @param content Content to be stored
+   */
+  public void createImmutableFileInPath(Path fullPath, Option<byte[]> content)
+      throws HoodieIOException {
+    FSDataOutputStream fsout = null;
+    Path tmpPath = null;
+
+    boolean needTempFile = needCreateTempFile();
+
+    try {
+      if (!content.isPresent()) {
+        fsout = fileSystem.create(fullPath, false);
+      }
+
+      if (content.isPresent() && needTempFile) {
+        Path parent = fullPath.getParent();
+        tmpPath = new Path(parent, fullPath.getName() + TMP_PATH_POSTFIX);
+        fsout = fileSystem.create(tmpPath, false);
+        fsout.write(content.get());
+      }
+
+      if (content.isPresent() && !needTempFile) {
+        fsout = fileSystem.create(fullPath, false);
+        fsout.write(content.get());
+      }
+    } catch (IOException e) {
+      String errorMsg = "Failed to create file " + (tmpPath != null ? tmpPath : fullPath);
+      throw new HoodieIOException(errorMsg, e);
+    } finally {
+      try {
+        if (null != fsout) {
+          fsout.close();
+        }
+      } catch (IOException e) {
+        String errorMsg = "Failed to close file " + (needTempFile ? tmpPath : fullPath);
+        throw new HoodieIOException(errorMsg, e);
+      }
+
+      try {
+        if (null != tmpPath) {
+          fileSystem.rename(tmpPath, fullPath);
+        }
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to rename " + tmpPath + " to the target " + fullPath, e);
+      }
+    }
+  }
+
   public FileSystem getFileSystem() {
     return fileSystem;
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index 6e7f6a2430ed..b3dbe422b10a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -30,7 +30,6 @@
 import org.apache.hudi.exception.HoodieIOException;
 
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
@@ -539,7 +538,7 @@ protected void transitionState(HoodieInstant fromInstant, HoodieInstant toInstan
       if (allowRedundantTransitions) {
         FileIOUtils.createFileInPath(metaClient.getFs(), getInstantFileNamePath(toInstant.getFileName()), data);
       } else {
-        createImmutableFileInPath(getInstantFileNamePath(toInstant.getFileName()), data);
+        metaClient.getFs().createImmutableFileInPath(getInstantFileNamePath(toInstant.getFileName()), data);
       }
       LOG.info("Create new file for toInstant ?" + getInstantFileNamePath(toInstant.getFileName()));
     }
@@ -706,33 +705,7 @@ protected void createFileInMetaPath(String filename, Option<byte[]> content, boo
     if (allowOverwrite || metaClient.getTimelineLayoutVersion().isNullVersion()) {
       FileIOUtils.createFileInPath(metaClient.getFs(), fullPath, content);
     } else {
-      createImmutableFileInPath(fullPath, content);
-    }
-  }
-
-  /**
-   * Creates a new file in timeline with overwrite set to false. This ensures
-   * files are created only once and never rewritten
-   * @param fullPath File Path
-   * @param content Content to be stored
-   */
-  private void createImmutableFileInPath(Path fullPath, Option<byte[]> content) {
-    FSDataOutputStream fsout = null;
-    try {
-      fsout = metaClient.getFs().create(fullPath, false);
-      if (content.isPresent()) {
-        fsout.write(content.get());
-      }
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to create file " + fullPath, e);
-    } finally {
-      try {
-        if (null != fsout) {
-          fsout.close();
-        }
-      } catch (IOException e) {
-        throw new HoodieIOException("Failed to close file " + fullPath, e);
-      }
+      metaClient.getFs().createImmutableFileInPath(fullPath, content);
     }
   }
 
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
index 767a16f0c035..5692337471a9 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.common.table.timeline;
 
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.fs.NoOpConsistencyGuard;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
@@ -32,6 +34,7 @@
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -45,6 +48,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -199,6 +203,25 @@ public void testTimelineOperations() {
     assertTrue(activeCommitTimeline.isBeforeTimelineStarts("00"));
   }
 
+  @Test
+  public void testAllowTempCommit() {
+    shouldAllowTempCommit(true, hoodieMetaClient -> {
+      timeline = new HoodieActiveTimeline(hoodieMetaClient);
+
+      HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, "1");
+      timeline.createNewInstant(instant1);
+
+      byte[] data = "commit".getBytes(StandardCharsets.UTF_8);
+      timeline.saveAsComplete(new HoodieInstant(true, instant1.getAction(),
+          instant1.getTimestamp()), Option.of(data));
+
+      timeline = timeline.reload();
+
+      assertTrue(timeline.getContiguousCompletedWriteTimeline().lastInstant().isPresent());
+      assertEquals(instant1.getTimestamp(), timeline.getContiguousCompletedWriteTimeline().lastInstant().get().getTimestamp());
+    });
+  }
+
   @Test
   public void testGetContiguousCompletedWriteTimeline() {
     // a mock timeline with holes
@@ -594,4 +617,25 @@ private List<HoodieInstant> getAllInstants() {
     }
     return allInstants;
   }
+
+  private void shouldAllowTempCommit(boolean allowTempCommit, Consumer<HoodieTableMetaClient> fun) {
+    if (allowTempCommit) {
+      HoodieWrapperFileSystem fs = metaClient.getFs();
+      HoodieWrapperFileSystem newFs = new HoodieWrapperFileSystem(fs.getFileSystem(), new NoOpConsistencyGuard()) {
+        @Override
+        protected boolean needCreateTempFile() {
+          return true;
+        }
+      };
+      metaClient.setFs(newFs);
+      try {
+        fun.accept(metaClient);
+      } finally {
+        metaClient.setFs(fs);
+      }
+      return;
+    }
+    fun.accept(metaClient);
+  }
 }
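Note on the pattern in this patch: on HDFS, create() makes the file visible
immediately, so a reader polling the timeline can observe an empty or
half-written commit file before the writer finishes and closes the stream. A
rename within the same file system is atomic in HDFS, so writing to <name>.tmp
and renaming it publishes the fully written content in one step. A minimal
standalone sketch of the same write-then-rename idea (illustrative names; it
uses only the stock Hadoop FileSystem API, not the wrapper class above):

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    final class AtomicPublish {
      // Write content to target.tmp, then rename to target. Readers either
      // see no file or the complete file, never a partially written one.
      static void publish(FileSystem fs, Path target, byte[] content) throws IOException {
        Path tmp = new Path(target.getParent(), target.getName() + ".tmp");
        try (FSDataOutputStream out = fs.create(tmp, false)) {
          out.write(content);
        }
        if (!fs.rename(tmp, target)) {
          throw new IOException("Failed to rename " + tmp + " to " + target);
        }
      }
    }

The production code keeps the original single-create path for non-HDFS
storage, where a rename is typically a copy rather than a cheap metadata
operation, and only takes the temp-file route when the scheme is hdfs.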