diff --git a/Base/src/main/java/io/deephaven/base/FileUtils.java b/Base/src/main/java/io/deephaven/base/FileUtils.java index 983c7d3a5b7..367c6896a05 100644 --- a/Base/src/main/java/io/deephaven/base/FileUtils.java +++ b/Base/src/main/java/io/deephaven/base/FileUtils.java @@ -8,6 +8,9 @@ import org.jetbrains.annotations.Nullable; import java.io.*; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Path; import java.util.ArrayList; public class FileUtils { @@ -249,4 +252,72 @@ public boolean accept(File pathname) { || (pathname.isFile() && (normalFileFilter == null || normalFileFilter.accept(pathname))); } } + + /** + * Take the file source path or URI string and convert it to a URI object. + * + * @param source The file source path or URI + * @param isDirectory Whether the source is a directory + * @return The URI object + */ + public static URI convertToURI(final String source, final boolean isDirectory) { + if (source.isEmpty()) { + throw new IllegalArgumentException("Cannot convert empty source to URI"); + } + final URI uri; + try { + uri = new URI(source); + } catch (final URISyntaxException e) { + // If the URI is invalid, assume it's a file path + return convertToURI(new File(source), isDirectory); + } + if (uri.getScheme() == null) { + // Convert to a "file" URI + return convertToURI(new File(source), isDirectory); + } + return uri; + } + + /** + * Takes a file and converts it to a URI object with {@code "file"} scheme. This method is preferred instead of + * {@link File#toURI()} because {@link File#toURI()} internally calls {@link File#isDirectory()}, which typically + * invokes the {@code stat} system call, resulting in filesystem metadata access. 
+ * + * @param file The file + * @param isDirectory Whether the source file is a directory + * @return The URI object + */ + public static URI convertToURI(final File file, final boolean isDirectory) { + String absPath = file.getAbsolutePath(); + if (File.separatorChar != '/') { + absPath = absPath.replace(File.separatorChar, '/'); + } + if (absPath.charAt(0) != '/') { + absPath = "/" + absPath; + } + if (isDirectory && absPath.charAt(absPath.length() - 1) != '/') { + absPath = absPath + "/"; + } + if (absPath.startsWith("//")) { + absPath = "//" + absPath; + } + try { + return new URI("file", null, absPath, null); + } catch (final URISyntaxException e) { + throw new IllegalStateException("Failed to convert file to URI: " + file, e); + } + } + + /** + * Takes a path and converts it to a URI object with {@code "file"} scheme. This method is preferred instead of + * {@link Path#toUri()} because {@link Path#toUri()} internally invokes the {@code stat} system call, resulting in + * filesystem metadata access. 
+ * + * @param path The path + * @param isDirectory Whether the file is a directory + * @return The URI object + */ + public static URI convertToURI(final Path path, final boolean isDirectory) { + return convertToURI(path.toFile(), isDirectory); + } } diff --git a/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProvider.java b/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProvider.java index ea25e8ee712..ba7bcf49c74 100644 --- a/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProvider.java +++ b/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProvider.java @@ -6,43 +6,22 @@ import io.deephaven.util.SafeCloseable; import org.jetbrains.annotations.NotNull; -import java.io.File; import java.io.IOException; import java.io.InputStream; import java.net.URI; -import java.net.URISyntaxException; import java.nio.channels.SeekableByteChannel; import java.nio.file.Path; import java.nio.file.Paths; -public interface SeekableChannelsProvider extends SafeCloseable { +import static io.deephaven.base.FileUtils.convertToURI; - /** - * Take the file source path or URI and convert it to a URI object. - * - * @param source The file source path or URI - * @return The URI object - */ - static URI convertToURI(final String source) { - final URI uri; - try { - uri = new URI(source); - } catch (final URISyntaxException e) { - // If the URI is invalid, assume it's a file path - return new File(source).toURI(); - } - if (uri.getScheme() == null) { - // Need to convert to a "file" URI - return new File(source).toURI(); - } - return uri; - } +public interface SeekableChannelsProvider extends SafeCloseable { /** * Wraps {@link SeekableChannelsProvider#getInputStream(SeekableByteChannel)} to ensure the channel's position is * incremented the exact amount that has been consumed from the resulting input stream. 
To remain valid, the caller * must ensure that the resulting input stream isn't re-wrapped by any downstream code in a way that would adversely - * effect the position (such as re-wrapping the resulting input stream with buffering). + * affect the position (such as re-wrapping the resulting input stream with buffering). * *

* Equivalent to {@code ChannelPositionInputStream.of(ch, provider.getInputStream(ch))}. @@ -79,7 +58,7 @@ default SeekableChannelContext makeSingleUseContext() { default SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext channelContext, @NotNull String uriStr) throws IOException { - return getReadChannel(channelContext, convertToURI(uriStr)); + return getReadChannel(channelContext, convertToURI(uriStr, false)); } SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext channelContext, @NotNull URI uri) diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java index e553331daa7..1166b3956b4 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java @@ -32,6 +32,7 @@ import java.util.NoSuchElementException; import java.util.function.Function; +import static io.deephaven.base.FileUtils.convertToURI; import static io.deephaven.parquet.base.ParquetFileReader.FILE_URI_SCHEME; import static org.apache.parquet.format.Encoding.PLAIN_DICTIONARY; import static org.apache.parquet.format.Encoding.RLE_DICTIONARY; @@ -120,7 +121,7 @@ private URI getURI() { return uri; } if (columnChunk.isSetFile_path() && FILE_URI_SCHEME.equals(rootURI.getScheme())) { - return uri = Path.of(rootURI).resolve(columnChunk.getFile_path()).toUri(); + return uri = convertToURI(Path.of(rootURI).resolve(columnChunk.getFile_path()), false); } else { // TODO(deephaven-core#5066): Add support for reading metadata files from non-file URIs return uri = rootURI; diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java index 3a2971b02fe..e97576f95a7 100644 --- 
a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java @@ -10,15 +10,15 @@ import org.apache.parquet.format.Type; import org.apache.parquet.schema.*; +import java.io.File; import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.nio.channels.SeekableByteChannel; import java.nio.charset.StandardCharsets; -import java.nio.file.Path; import java.util.*; -import static io.deephaven.util.channel.SeekableChannelsProvider.convertToURI; +import static io.deephaven.base.FileUtils.convertToURI; /** * Top level accessor for a parquet file which can read both from a file path string or a CLI style file URI, @@ -39,17 +39,29 @@ public class ParquetFileReader { private final URI rootURI; private final MessageType type; + /** + * Create a new ParquetFileReader for the provided source. + * + * @param source The source path or URI for the parquet file or the parquet metadata file + * @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file + */ public ParquetFileReader(final String source, final SeekableChannelsProvider channelsProvider) throws IOException { - this(convertToURI(source), channelsProvider); + this(convertToURI(source, false), channelsProvider); } + /** + * Create a new ParquetFileReader for the provided source. 
+ * + * @param parquetFileURI The URI for the parquet file or the parquet metadata file + * @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file + */ public ParquetFileReader(final URI parquetFileURI, final SeekableChannelsProvider channelsProvider) throws IOException { this.channelsProvider = channelsProvider; if (!parquetFileURI.getRawPath().endsWith(".parquet") && FILE_URI_SCHEME.equals(parquetFileURI.getScheme())) { // Construct a new file URI for the parent directory - rootURI = Path.of(parquetFileURI).getParent().toUri(); + rootURI = convertToURI(new File(parquetFileURI).getParentFile(), true); } else { // TODO(deephaven-core#5066): Add support for reading metadata files from non-file URIs rootURI = parquetFileURI; diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java index f2b470f9be7..66f93a7073b 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java @@ -45,7 +45,7 @@ import java.nio.file.Paths; import java.util.*; -import static io.deephaven.util.channel.SeekableChannelsProvider.convertToURI; +import static io.deephaven.base.FileUtils.convertToURI; /** * API for writing DH tables in parquet format @@ -314,7 +314,7 @@ private static ParquetFileWriter getParquetFileWriter( final Map extraMetaData = new HashMap<>(tableMeta); extraMetaData.put(METADATA_KEY, tableInfoBuilder.build().serializeToJSON()); return new ParquetFileWriter(path, - SeekableChannelsProviderLoader.getInstance().fromServiceLoader(convertToURI(path), null), + SeekableChannelsProviderLoader.getInstance().fromServiceLoader(convertToURI(path, false), null), writeInstructions.getTargetPageSize(), new HeapByteBufferAllocator(), mappedSchema.getParquetSchema(), 
writeInstructions.getCompressionCodecName(), extraMetaData); diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java index 2e5fceb7e3b..dbbdfd88ad7 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java @@ -54,8 +54,8 @@ import java.nio.file.attribute.BasicFileAttributes; import java.util.*; +import static io.deephaven.base.FileUtils.convertToURI; import static io.deephaven.parquet.base.ParquetFileReader.FILE_URI_SCHEME; -import static io.deephaven.util.channel.SeekableChannelsProvider.convertToURI; import static io.deephaven.parquet.table.ParquetTableWriter.PARQUET_FILE_EXTENSION; import static io.deephaven.util.type.TypeUtils.getUnboxedTypeIfBoxed; @@ -97,7 +97,7 @@ private ParquetTools() {} * @see ParquetFlatPartitionedLayout */ public static Table readTable(@NotNull final String source) { - return readTableInternal(convertToURI(source), ParquetInstructions.EMPTY); + return readTableInternal(convertParquetSourceToURI(source), ParquetInstructions.EMPTY); } /** @@ -128,7 +128,7 @@ public static Table readTable(@NotNull final String source) { public static Table readTable( @NotNull final String source, @NotNull final ParquetInstructions readInstructions) { - return readTableInternal(convertToURI(source), readInstructions); + return readTableInternal(convertParquetSourceToURI(source), readInstructions); } /** @@ -186,6 +186,19 @@ public static Table readTable( return readTableInternal(sourceFile, readInstructions); } + /** + * Convert a parquet source to a URI. 
+ * + * @param source The path or URI of parquet file or directory to examine + * @return The URI + */ + private static URI convertParquetSourceToURI(@NotNull final String source) { + if (source.endsWith(".parquet")) { + return convertToURI(source, false); + } + return convertToURI(source, true); + } + /** * Write a table to a file. * @@ -961,7 +974,7 @@ public static Table readFlatPartitionedTable( public static Table readSingleFileTable( @NotNull final File file, @NotNull final ParquetInstructions readInstructions) { - return readSingleFileTable(file.toURI(), readInstructions); + return readSingleFileTable(convertToURI(file, false), readInstructions); } /** @@ -980,7 +993,7 @@ public static Table readSingleFileTable( public static Table readSingleFileTable( @NotNull final String source, @NotNull final ParquetInstructions readInstructions) { - return readSingleFileTable(convertToURI(source), readInstructions); + return readSingleFileTable(convertToURI(source, false), readInstructions); } private static Table readSingleFileTable( @@ -1007,7 +1020,7 @@ public static Table readSingleFileTable( @NotNull final File file, @NotNull final ParquetInstructions readInstructions, @NotNull final TableDefinition tableDefinition) { - return readSingleFileTable(file.toURI(), readInstructions, tableDefinition); + return readSingleFileTable(convertToURI(file, false), readInstructions, tableDefinition); } /** @@ -1025,7 +1038,7 @@ public static Table readSingleFileTable( @NotNull final String source, @NotNull final ParquetInstructions readInstructions, @NotNull final TableDefinition tableDefinition) { - return readSingleFileTable(convertToURI(source), readInstructions, tableDefinition); + return readSingleFileTable(convertToURI(source, false), readInstructions, tableDefinition); } private static Table readSingleFileTable( @@ -1105,7 +1118,7 @@ private static ParquetSchemaReader.ColumnDefinitionConsumer makeSchemaReaderCons * Make a {@link ParquetFileReader} for the supplied {@link 
File}. Wraps {@link IOException} as * {@link TableDataException}. * - * @param parquetFile The {@link File} to read + * @param parquetFile The parquet file or the parquet metadata file * @param readInstructions the instructions for customizations while reading * @return The new {@link ParquetFileReader} */ @@ -1122,7 +1135,7 @@ public static ParquetFileReader getParquetFileReader(@NotNull final File parquet * Make a {@link ParquetFileReader} for the supplied {@link URI}. Wraps {@link IOException} as * {@link TableDataException}. * - * @param parquetFileURI The {@link URI} to read + * @param parquetFileURI The URI for the parquet file or the parquet metadata file * @param readInstructions the instructions for customizations while reading * @return The new {@link ParquetFileReader} */ @@ -1138,20 +1151,20 @@ public static ParquetFileReader getParquetFileReader(@NotNull final URI parquetF /** * Make a {@link ParquetFileReader} for the supplied {@link File}. * - * @param parquetFile The {@link File} to read + * @param parquetFile The parquet file or the parquet metadata file * @return The new {@link ParquetFileReader} * @throws IOException if an IO exception occurs */ public static ParquetFileReader getParquetFileReaderChecked( @NotNull final File parquetFile, @NotNull final ParquetInstructions readInstructions) throws IOException { - return getParquetFileReaderChecked(parquetFile.toURI(), readInstructions); + return getParquetFileReaderChecked(convertToURI(parquetFile, false), readInstructions); } /** * Make a {@link ParquetFileReader} for the supplied {@link URI}. 
* - * @param parquetFileURI The {@link URI} to read + * @param parquetFileURI The URI for the parquet file or the parquet metadata file * @return The new {@link ParquetFileReader} * @throws IOException if an IO exception occurs */ diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java index 04ab35607c2..b8fdcb1c5be 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java @@ -23,6 +23,8 @@ import java.util.Map; import java.util.stream.IntStream; +import static io.deephaven.base.FileUtils.convertToURI; + /** * {@link TableLocationKey} implementation for use with data stored in the parquet format. */ @@ -70,7 +72,7 @@ public ParquetTableLocationKey(@NotNull final URI parquetFileUri, final int orde } private static URI validateParquetFile(@NotNull final File file) { - return validateParquetFile(file.toURI()); + return validateParquetFile(convertToURI(file, false)); } private static URI validateParquetFile(@NotNull final URI parquetFileUri) { @@ -189,7 +191,7 @@ public synchronized int[] getRowGroupIndices() { // we're not expecting that in this code path. To support it, discovery tools should figure out // the row groups for a partition themselves and call setRowGroupReaders. 
final String filePath = rowGroups.get(rgi).getColumns().get(0).getFile_path(); - return filePath == null || new File(filePath).getAbsoluteFile().toURI().equals(uri); + return filePath == null || convertToURI(filePath, false).equals(uri); }).toArray(); } diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index de729369f55..0dfeec49305 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -88,6 +88,7 @@ import javax.annotation.Nullable; +import static io.deephaven.base.FileUtils.convertToURI; import static io.deephaven.engine.testutil.TstUtils.assertTableEquals; import static io.deephaven.engine.util.TableTools.booleanCol; import static io.deephaven.engine.util.TableTools.byteCol; @@ -874,6 +875,31 @@ private static void verifyFilesInDir(final File parentDir, final String[] expect } } + @Test + public void readFromDirTest() { + final File parentDir = new File(rootFile, "tempDir"); + parentDir.mkdir(); + final Table someTable = TableTools.emptyTable(5).update("A=(int)i"); + final File firstPartition = new File(parentDir, "X=A"); + final File firstDataFile = new File(firstPartition, "data.parquet"); + writeTable(someTable, firstDataFile); + final File secondPartition = new File(parentDir, "X=B"); + final File secondDataFile = new File(secondPartition, "data.parquet"); + writeTable(someTable, secondDataFile); + + final Table expected = readKeyValuePartitionedTable(parentDir, ParquetInstructions.EMPTY); + + String filePath = parentDir.getAbsolutePath(); + Table fromDisk = ParquetTools.readTable(filePath); + assertTableEquals(expected, fromDisk); + + // Read with a trailing slash + assertTrue(!filePath.endsWith("/")); + filePath = filePath + "/"; + fromDisk 
= ParquetTools.readTable(filePath); + assertTableEquals(expected, fromDisk); + } + /** * These are tests for writing a table to a parquet file and making sure there are no unnecessary files left in the * directory after we finish writing. @@ -927,7 +953,7 @@ public void basicWriteAndReadFromFileURITests() { final String filename = "basicWriteTests.parquet"; final File destFile = new File(rootFile, filename); final String absolutePath = destFile.getAbsolutePath(); - final URI fileURI = destFile.toURI(); + final URI fileURI = convertToURI(destFile, false); ParquetTools.writeTable(tableToSave, absolutePath); // Read from file URI @@ -1204,7 +1230,8 @@ public void partitionedParquetWithDotFilesTest() throws IOException { writeTable(someTable, firstDataFile); writeTable(someTable, secondDataFile); - Table partitionedTable = readKeyValuePartitionedTable(parentDir, EMPTY).select(); + final URI parentURI = convertToURI(parentDir, true); + final Table partitionedTable = readTable(parentURI.toString()).select(); final Set columnsSet = partitionedTable.getDefinition().getColumnNameSet(); assertTrue(columnsSet.size() == 2 && columnsSet.contains("A") && columnsSet.contains("X")); @@ -1214,14 +1241,14 @@ public void partitionedParquetWithDotFilesTest() throws IOException { final File dotDir = new File(firstPartition, ".dotDir"); assertTrue(dotDir.mkdir()); writeTable(someTable, new File(dotDir, "data.parquet")); - Table fromDisk = readKeyValuePartitionedTable(parentDir, EMPTY); + Table fromDisk = readTable(parentURI.toString()); assertTableEquals(fromDisk, partitionedTable); // Add a dot parquet file in one of the partitions directory final Table anotherTable = TableTools.emptyTable(5).update("B=(int)i"); final File pqDotFile = new File(secondPartition, ".dotFile.parquet"); writeTable(anotherTable, pqDotFile); - fromDisk = readKeyValuePartitionedTable(parentDir, EMPTY); + fromDisk = readTable(parentURI.toString()); assertTableEquals(fromDisk, partitionedTable); }