Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added fix for File.toURI regression #5247

Merged
merged 6 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions Base/src/main/java/io/deephaven/base/FileUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
import org.jetbrains.annotations.Nullable;

import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.util.ArrayList;

public class FileUtils {
Expand Down Expand Up @@ -249,4 +252,72 @@ public boolean accept(File pathname) {
|| (pathname.isFile() && (normalFileFilter == null || normalFileFilter.accept(pathname)));
}
}

/**
* Take the file source path or URI string and convert it to a URI object.
*
* @param source The file source path or URI
* @param isDirectory Whether the source is a directory
* @return The URI object
*/
public static URI convertToURI(final String source, final boolean isDirectory) {
if (source.isEmpty()) {
throw new IllegalArgumentException("Cannot convert empty source to URI");
}
final URI uri;
try {
uri = new URI(source);
} catch (final URISyntaxException e) {
// If the URI is invalid, assume it's a file path
return convertToURI(new File(source), isDirectory);
}
if (uri.getScheme() == null) {
// Convert to a "file" URI
return convertToURI(new File(source), isDirectory);
}
return uri;
}

/**
* Takes a file and convert it to a URI object with {@code "file"} scheme. This method is preferred instead of
* {@link File#toURI()} because {@link File#toURI()} internally calls {@link File#isDirectory()}, which typically
* invokes the {@code stat} system call, resulting in filesystem metadata access.
*
* @param file The file
* @param isDirectory Whether the source file is a directory
* @return The URI object
*/
public static URI convertToURI(final File file, final boolean isDirectory) {
String absPath = file.getAbsolutePath();
if (File.separatorChar != '/') {
absPath = absPath.replace(File.separatorChar, '/');
}
if (absPath.charAt(0) != '/') {
absPath = "/" + absPath;
}
if (isDirectory && absPath.charAt(absPath.length() - 1) != '/') {
absPath = absPath + "/";
}
if (absPath.startsWith("//")) {
absPath = "//" + absPath;
}
try {
return new URI("file", null, absPath, null);
} catch (final URISyntaxException e) {
throw new IllegalStateException("Failed to convert file to URI: " + file, e);
}
}

/**
* Takes a path and convert it to a URI object with {@code "file"} scheme. This method is preferred instead of
* {@link Path#toUri()} because {@link Path#toUri()} internally invokes the {@code stat} system call, resulting in
* filesystem metadata access.
*
* @param path The path
* @param isDirectory Whether the file is a directory
* @return The URI object
*/
public static URI convertToURI(final Path path, final boolean isDirectory) {
return convertToURI(path.toFile(), isDirectory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,43 +6,22 @@
import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.NotNull;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
import java.nio.file.Paths;

public interface SeekableChannelsProvider extends SafeCloseable {
import static io.deephaven.base.FileUtils.convertToURI;

/**
* Take the file source path or URI and convert it to a URI object.
*
* @param source The file source path or URI
* @return The URI object
*/
static URI convertToURI(final String source) {
final URI uri;
try {
uri = new URI(source);
} catch (final URISyntaxException e) {
// If the URI is invalid, assume it's a file path
return new File(source).toURI();
}
if (uri.getScheme() == null) {
// Need to convert to a "file" URI
return new File(source).toURI();
}
return uri;
}
public interface SeekableChannelsProvider extends SafeCloseable {

rcaudy marked this conversation as resolved.
Show resolved Hide resolved
/**
* Wraps {@link SeekableChannelsProvider#getInputStream(SeekableByteChannel)} to ensure the channel's position is
* incremented the exact amount that has been consumed from the resulting input stream. To remain valid, the caller
* must ensure that the resulting input stream isn't re-wrapped by any downstream code in a way that would adversely
* effect the position (such as re-wrapping the resulting input stream with buffering).
* affect the position (such as re-wrapping the resulting input stream with buffering).
*
* <p>
* Equivalent to {@code ChannelPositionInputStream.of(ch, provider.getInputStream(ch))}.
Expand Down Expand Up @@ -79,7 +58,7 @@ default SeekableChannelContext makeSingleUseContext() {

default SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext channelContext, @NotNull String uriStr)
throws IOException {
return getReadChannel(channelContext, convertToURI(uriStr));
return getReadChannel(channelContext, convertToURI(uriStr, false));
}

SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext channelContext, @NotNull URI uri)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.NoSuchElementException;
import java.util.function.Function;

import static io.deephaven.base.FileUtils.convertToURI;
import static io.deephaven.parquet.base.ParquetFileReader.FILE_URI_SCHEME;
import static org.apache.parquet.format.Encoding.PLAIN_DICTIONARY;
import static org.apache.parquet.format.Encoding.RLE_DICTIONARY;
Expand Down Expand Up @@ -120,7 +121,7 @@ private URI getURI() {
return uri;
}
if (columnChunk.isSetFile_path() && FILE_URI_SCHEME.equals(rootURI.getScheme())) {
return uri = Path.of(rootURI).resolve(columnChunk.getFile_path()).toUri();
return uri = convertToURI(Path.of(rootURI).resolve(columnChunk.getFile_path()), false);
} else {
// TODO(deephaven-core#5066): Add support for reading metadata files from non-file URIs
return uri = rootURI;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@
import org.apache.parquet.format.Type;
import org.apache.parquet.schema.*;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.channels.SeekableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.*;

import static io.deephaven.util.channel.SeekableChannelsProvider.convertToURI;
import static io.deephaven.base.FileUtils.convertToURI;

/**
* Top level accessor for a parquet file which can read both from a file path string or a CLI style file URI,
Expand All @@ -39,17 +39,29 @@ public class ParquetFileReader {
private final URI rootURI;
private final MessageType type;

/**
* Create a new ParquetFileReader for the provided source.
*
* @param source The source path or URI for the parquet file or the parquet metadata file
* @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
*/
public ParquetFileReader(final String source, final SeekableChannelsProvider channelsProvider)
throws IOException {
this(convertToURI(source), channelsProvider);
this(convertToURI(source, false), channelsProvider);
}

/**
* Create a new ParquetFileReader for the provided source.
*
* @param parquetFileURI The URI for the parquet file or the parquet metadata file
* @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
*/
public ParquetFileReader(final URI parquetFileURI, final SeekableChannelsProvider channelsProvider)
throws IOException {
this.channelsProvider = channelsProvider;
if (!parquetFileURI.getRawPath().endsWith(".parquet") && FILE_URI_SCHEME.equals(parquetFileURI.getScheme())) {
// Construct a new file URI for the parent directory
rootURI = Path.of(parquetFileURI).getParent().toUri();
rootURI = convertToURI(new File(parquetFileURI).getParentFile(), true);
} else {
// TODO(deephaven-core#5066): Add support for reading metadata files from non-file URIs
rootURI = parquetFileURI;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import java.nio.file.Paths;
import java.util.*;

import static io.deephaven.util.channel.SeekableChannelsProvider.convertToURI;
import static io.deephaven.base.FileUtils.convertToURI;

/**
* API for writing DH tables in parquet format
Expand Down Expand Up @@ -314,7 +314,7 @@ private static ParquetFileWriter getParquetFileWriter(
final Map<String, String> extraMetaData = new HashMap<>(tableMeta);
extraMetaData.put(METADATA_KEY, tableInfoBuilder.build().serializeToJSON());
return new ParquetFileWriter(path,
SeekableChannelsProviderLoader.getInstance().fromServiceLoader(convertToURI(path), null),
SeekableChannelsProviderLoader.getInstance().fromServiceLoader(convertToURI(path, false), null),
writeInstructions.getTargetPageSize(),
new HeapByteBufferAllocator(), mappedSchema.getParquetSchema(),
writeInstructions.getCompressionCodecName(), extraMetaData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@
import java.nio.file.attribute.BasicFileAttributes;
import java.util.*;

import static io.deephaven.base.FileUtils.convertToURI;
import static io.deephaven.parquet.base.ParquetFileReader.FILE_URI_SCHEME;
import static io.deephaven.util.channel.SeekableChannelsProvider.convertToURI;
import static io.deephaven.parquet.table.ParquetTableWriter.PARQUET_FILE_EXTENSION;
import static io.deephaven.util.type.TypeUtils.getUnboxedTypeIfBoxed;

Expand Down Expand Up @@ -97,7 +97,7 @@ private ParquetTools() {}
* @see ParquetFlatPartitionedLayout
*/
public static Table readTable(@NotNull final String source) {
return readTableInternal(convertToURI(source), ParquetInstructions.EMPTY);
return readTableInternal(convertParquetSourceToURI(source), ParquetInstructions.EMPTY);
}

/**
Expand Down Expand Up @@ -128,7 +128,7 @@ public static Table readTable(@NotNull final String source) {
public static Table readTable(
@NotNull final String source,
@NotNull final ParquetInstructions readInstructions) {
return readTableInternal(convertToURI(source), readInstructions);
return readTableInternal(convertParquetSourceToURI(source), readInstructions);
}

/**
Expand Down Expand Up @@ -186,6 +186,19 @@ public static Table readTable(
return readTableInternal(sourceFile, readInstructions);
}

/**
* Convert a parquet source to a URI.
*
* @param source The path or URI of parquet file or directory to examine
* @return The URI
*/
private static URI convertParquetSourceToURI(@NotNull final String source) {
if (source.endsWith(".parquet")) {
return convertToURI(source, false);
}
return convertToURI(source, true);
}

/**
* Write a table to a file.
*
Expand Down Expand Up @@ -961,7 +974,7 @@ public static Table readFlatPartitionedTable(
public static Table readSingleFileTable(
@NotNull final File file,
@NotNull final ParquetInstructions readInstructions) {
return readSingleFileTable(file.toURI(), readInstructions);
return readSingleFileTable(convertToURI(file, false), readInstructions);
}

/**
Expand All @@ -980,7 +993,7 @@ public static Table readSingleFileTable(
public static Table readSingleFileTable(
@NotNull final String source,
@NotNull final ParquetInstructions readInstructions) {
return readSingleFileTable(convertToURI(source), readInstructions);
return readSingleFileTable(convertToURI(source, false), readInstructions);
}

private static Table readSingleFileTable(
Expand All @@ -1007,7 +1020,7 @@ public static Table readSingleFileTable(
@NotNull final File file,
@NotNull final ParquetInstructions readInstructions,
@NotNull final TableDefinition tableDefinition) {
return readSingleFileTable(file.toURI(), readInstructions, tableDefinition);
return readSingleFileTable(convertToURI(file, false), readInstructions, tableDefinition);
}

/**
Expand All @@ -1025,7 +1038,7 @@ public static Table readSingleFileTable(
@NotNull final String source,
@NotNull final ParquetInstructions readInstructions,
@NotNull final TableDefinition tableDefinition) {
return readSingleFileTable(convertToURI(source), readInstructions, tableDefinition);
return readSingleFileTable(convertToURI(source, false), readInstructions, tableDefinition);
}

private static Table readSingleFileTable(
Expand Down Expand Up @@ -1105,7 +1118,7 @@ private static ParquetSchemaReader.ColumnDefinitionConsumer makeSchemaReaderCons
* Make a {@link ParquetFileReader} for the supplied {@link File}. Wraps {@link IOException} as
* {@link TableDataException}.
*
* @param parquetFile The {@link File} to read
* @param parquetFile The parquet file or the parquet metadata file
* @param readInstructions the instructions for customizations while reading
* @return The new {@link ParquetFileReader}
*/
Expand All @@ -1122,7 +1135,7 @@ public static ParquetFileReader getParquetFileReader(@NotNull final File parquet
* Make a {@link ParquetFileReader} for the supplied {@link URI}. Wraps {@link IOException} as
* {@link TableDataException}.
*
* @param parquetFileURI The {@link URI} to read
* @param parquetFileURI The URI for the parquet file or the parquet metadata file
* @param readInstructions the instructions for customizations while reading
* @return The new {@link ParquetFileReader}
*/
Expand All @@ -1138,20 +1151,20 @@ public static ParquetFileReader getParquetFileReader(@NotNull final URI parquetF
/**
* Make a {@link ParquetFileReader} for the supplied {@link File}.
*
* @param parquetFile The {@link File} to read
* @param parquetFile The parquet file or the parquet metadata file
* @return The new {@link ParquetFileReader}
* @throws IOException if an IO exception occurs
*/
public static ParquetFileReader getParquetFileReaderChecked(
@NotNull final File parquetFile,
@NotNull final ParquetInstructions readInstructions) throws IOException {
return getParquetFileReaderChecked(parquetFile.toURI(), readInstructions);
return getParquetFileReaderChecked(convertToURI(parquetFile, false), readInstructions);
}

/**
* Make a {@link ParquetFileReader} for the supplied {@link URI}.
*
* @param parquetFileURI The {@link URI} to read
* @param parquetFileURI The URI for the parquet file or the parquet metadata file
* @return The new {@link ParquetFileReader}
* @throws IOException if an IO exception occurs
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.Map;
import java.util.stream.IntStream;

import static io.deephaven.base.FileUtils.convertToURI;

/**
* {@link TableLocationKey} implementation for use with data stored in the parquet format.
*/
Expand Down Expand Up @@ -70,7 +72,7 @@ public ParquetTableLocationKey(@NotNull final URI parquetFileUri, final int orde
}

private static URI validateParquetFile(@NotNull final File file) {
return validateParquetFile(file.toURI());
return validateParquetFile(convertToURI(file, false));
}

private static URI validateParquetFile(@NotNull final URI parquetFileUri) {
Expand Down Expand Up @@ -189,7 +191,7 @@ public synchronized int[] getRowGroupIndices() {
// we're not expecting that in this code path. To support it, discovery tools should figure out
// the row groups for a partition themselves and call setRowGroupReaders.
final String filePath = rowGroups.get(rgi).getColumns().get(0).getFile_path();
return filePath == null || new File(filePath).getAbsoluteFile().toURI().equals(uri);
return filePath == null || convertToURI(filePath, false).equals(uri);
}).toArray();
}

Expand Down
Loading
Loading