Skip to content

Commit

Permalink
Review with Ryan part 2
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed May 7, 2024
1 parent 6806c1f commit dffaa26
Show file tree
Hide file tree
Showing 21 changed files with 147 additions and 310 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* A service loader class for loading {@link SeekableChannelsProviderPlugin} implementations at runtime and provide
* {@link SeekableChannelsProvider} implementations for different URI schemes, e.g., S3.
*/
public final class SeekableChannelsProviderLoader implements SeekableChannelsProviderFactory {
public final class SeekableChannelsProviderLoader {

private static volatile SeekableChannelsProviderLoader instance;

Expand All @@ -37,16 +37,16 @@ private SeekableChannelsProviderLoader() {
}

/**
* Create a new {@link SeekableChannelsProvider} based on given URI and object using the plugins loaded by the
* {@link ServiceLoader}. For example, for a "S3" URI, we will create a {@link SeekableChannelsProvider} which can
* read files from S3.
* Create a new {@link SeekableChannelsProvider} compatible for reading from and writing to the given URI, using the
* plugins loaded by the {@link ServiceLoader}. For example, for a "S3" URI, we will create a
* {@link SeekableChannelsProvider} which can read files from S3.
*
* @param uri The URI
* @param specialInstructions An optional object to pass special instructions to the provider.
* @return A {@link SeekableChannelsProvider} for the given URI.
*/
@Override
public SeekableChannelsProvider createProvider(@NotNull final URI uri, @Nullable final Object specialInstructions) {
public SeekableChannelsProvider fromServiceLoader(@NotNull final URI uri,
@Nullable final Object specialInstructions) {
for (final SeekableChannelsProviderPlugin plugin : providers) {
if (plugin.isCompatible(uri, specialInstructions)) {
return plugin.createProvider(uri, specialInstructions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,6 @@ public FileKeyValuePartitionLayout(
@NotNull final BiFunction<Path, Map<String, Comparable<?>>, TLK> keyFactory,
final int maxPartitioningLevels) {
super(keyFactory);
// TODO This can be an issue because table location keys are being generated for us here. So they would
// internally
// make a different provider for each location key. I know that enterprise uses this constructor.
// Should I just break the constructor of ParquetTableLocationKey and force add a channels provider?
this.tableRootDirectory = tableRootDirectory;
this.pathFilter = pathFilter;
this.locationTableBuilderFactory = locationTableBuilderFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3890,7 +3890,7 @@ public void testMultiPartitionSymbolTableBy() throws IOException {
t4.updateView("Date=`2021-07-21`", "Num=400")).moveColumnsUp("Date", "Num");

final Table loaded = ParquetTools.readPartitionedTableInferSchema(
new ParquetKeyValuePartitionedLayout(testRootFile, 2, ParquetInstructions.EMPTY),
new ParquetKeyValuePartitionedLayout(testRootFile.toURI(), 2, ParquetInstructions.EMPTY),
ParquetInstructions.EMPTY);

// verify the sources are identical
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,19 @@
//
package io.deephaven.parquet.base;

import io.deephaven.UncheckedDeephavenException;
import io.deephaven.util.channel.CachedChannelProvider;
import io.deephaven.util.channel.SeekableChannelContext;
import io.deephaven.util.channel.SeekableChannelsProvider;
import io.deephaven.util.channel.SeekableChannelsProviderLoader;
import org.apache.parquet.format.*;
import org.apache.parquet.format.ColumnOrder;
import org.apache.parquet.format.Type;
import org.apache.parquet.schema.*;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.channels.SeekableByteChannel;
import java.util.*;
Expand All @@ -44,94 +42,50 @@ public class ParquetFileReader {

/**
* Make a {@link ParquetFileReader} for the supplied {@link File}. Wraps {@link IOException} as
* {@link UncheckedDeephavenException}.
* {@link UncheckedIOException}.
*
* @param parquetFile The parquet file or the parquet metadata file
* @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
* @return The new {@link ParquetFileReader}
*/
public static ParquetFileReader create(
@NotNull final File parquetFile,
final SeekableChannelsProvider channelsProvider) {
@NotNull final SeekableChannelsProvider channelsProvider) {
try {
return createChecked(parquetFile, channelsProvider);
} catch (IOException e) {
throw new UncheckedDeephavenException("Failed to create Parquet file reader: " + parquetFile, e);
return new ParquetFileReader(convertToURI(parquetFile, false), channelsProvider);
} catch (final IOException e) {
throw new UncheckedIOException("Failed to create Parquet file reader: " + parquetFile, e);
}
}

/**
* Make a {@link ParquetFileReader} for the supplied {@link URI}. Wraps {@link IOException} as
* {@link UncheckedDeephavenException}.
* {@link UncheckedIOException}.
*
* @param parquetFileURI The URI for the parquet file or the parquet metadata file
* @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
* @return The new {@link ParquetFileReader}
*/
public static ParquetFileReader create(
@NotNull final URI parquetFileURI,
@Nullable final SeekableChannelsProvider channelsProvider) {
@NotNull final SeekableChannelsProvider channelsProvider) {
try {
return createChecked(parquetFileURI, channelsProvider);
} catch (IOException e) {
throw new UncheckedDeephavenException("Failed to create Parquet file reader: " + parquetFileURI, e);
return new ParquetFileReader(parquetFileURI, channelsProvider);
} catch (final IOException e) {
throw new UncheckedIOException("Failed to create Parquet file reader: " + parquetFileURI, e);
}
}

/**
* Make a {@link ParquetFileReader} for the supplied {@link File}.
*
* @param parquetFile The parquet file or the parquet metadata file
* @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
* @return The new {@link ParquetFileReader}
* @throws IOException if an IO exception occurs
*/
public static ParquetFileReader createChecked(
@NotNull final File parquetFile,
final SeekableChannelsProvider channelsProvider) throws IOException {
return createChecked(convertToURI(parquetFile, false), channelsProvider);
}

/**
* Make a {@link ParquetFileReader} for the supplied {@link URI}.
*
* @param parquetFileURI The URI for the parquet file or the parquet metadata file
* @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
* @return The new {@link ParquetFileReader}
* @throws IOException if an IO exception occurs
*/
public static ParquetFileReader createChecked(
@NotNull final URI parquetFileURI,
final SeekableChannelsProvider channelsProvider) throws IOException {
return new ParquetFileReader(parquetFileURI, new CachedChannelProvider(channelsProvider, 1 << 7));
}

/**
* Create a new ParquetFileReader for the provided source.
*
* @param source The source path or URI for the parquet file or the parquet metadata file
* @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
*
* @deprecated: Use {@link #createChecked(URI, SeekableChannelsProvider)} instead
*/
@Deprecated
public ParquetFileReader(final String source, final SeekableChannelsProvider channelsProvider)
throws IOException {
this(convertToURI(source, false), channelsProvider);
}

/**
* Create a new ParquetFileReader for the provided source.
*
* @param parquetFileURI The URI for the parquet file or the parquet metadata file
* @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
*
* @deprecated: Use {@link #createChecked(URI, SeekableChannelsProvider)} instead
* @param provider The {@link SeekableChannelsProvider} to use for reading the file
*/
@Deprecated
public ParquetFileReader(final URI parquetFileURI, final SeekableChannelsProvider channelsProvider)
throws IOException {
this.channelsProvider = channelsProvider;
private ParquetFileReader(
@NotNull final URI parquetFileURI,
@NotNull final SeekableChannelsProvider provider) throws IOException {
this.channelsProvider = new CachedChannelProvider(provider, 1 << 7);
if (!parquetFileURI.getRawPath().endsWith(".parquet") && FILE_URI_SCHEME.equals(parquetFileURI.getScheme())) {
// Construct a new file URI for the parent directory
rootURI = convertToURI(new File(parquetFileURI).getParentFile(), true);
Expand Down Expand Up @@ -270,7 +224,7 @@ private Set<String> calculateColumnsWithDictionaryUsedOnEveryDataPage() {

/**
* Create a {@link RowGroupReader} object for provided row group number
*
*
* @param version The "version" string from deephaven specific parquet metadata, or null if it's not present.
*/
public RowGroupReader getRowGroup(final int groupNumber, final String version) {
Expand Down Expand Up @@ -506,7 +460,7 @@ private static LogicalTypeAnnotation getLogicalTypeAnnotation(final ConvertedTyp

/**
* Helper method to determine if a logical type is adjusted to UTC.
*
*
* @param logicalType the logical type to check
* @return true if the logical type is a timestamp adjusted to UTC, false otherwise
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
import io.deephaven.hash.KeyedObjectKey;
import io.deephaven.parquet.base.ParquetUtils;
import io.deephaven.util.annotations.VisibleForTesting;
import io.deephaven.util.channel.SeekableChannelsProvider;
import io.deephaven.util.channel.SeekableChannelsProviderFactory;
import io.deephaven.util.channel.SeekableChannelsProviderLoader;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -103,9 +100,6 @@ public static int getDefaltMaximumDictionarySize() {

private static final boolean DEFAULT_IS_REFRESHING = false;

private static final SeekableChannelsProviderFactory DEFAULT_CHANNELS_PROVIDER_FACTORY =
SeekableChannelsProviderLoader.getInstance();

/**
* Set the default target page size (in bytes) used to section rows of data into pages during column writing. This
* number should be no smaller than {@link #MIN_TARGET_PAGE_SIZE}.
Expand Down Expand Up @@ -225,15 +219,6 @@ public final String getColumnNameFromParquetColumnNameOrDefault(final String par

public abstract Optional<Collection<List<String>>> getIndexColumns();

/**
* @return a {@link SeekableChannelsProvider} compatible for reading and writing to the given parquet file URI.
*
* @param parquetFileURI the URI of the parquet file
* @param specialInstructions An optional object to pass special instructions to the provider
*/
public abstract SeekableChannelsProvider getChannelsProvider(URI parquetFileURI,
@Nullable Object specialInstructions);

/**
* Creates a new {@link ParquetInstructions} object with the same properties as the current object but definition
* set as the provided {@link TableDefinition}.
Expand Down Expand Up @@ -365,16 +350,6 @@ public Optional<Collection<List<String>>> getIndexColumns() {
return Optional.empty();
}

@Override
public SeekableChannelsProvider getChannelsProvider(final URI parquetFileURI,
@Nullable final Object specialInstructions) {
return DEFAULT_CHANNELS_PROVIDER_FACTORY.createProvider(parquetFileURI, specialInstructions);
}

private SeekableChannelsProviderFactory getChannelsProviderFactory() {
return DEFAULT_CHANNELS_PROVIDER_FACTORY;
}

@Override
public ParquetInstructions withTableDefinition(@Nullable final TableDefinition tableDefinition) {
return withTableDefinitionAndLayout(tableDefinition, null);
Expand All @@ -387,15 +362,15 @@ public ParquetInstructions withTableDefinitionAndLayout(
return new ReadOnly(null, null, getCompressionCodecName(), getMaximumDictionaryKeys(),
getMaximumDictionarySize(), isLegacyParquet(), getTargetPageSize(), isRefreshing(),
getSpecialInstructions(), generateMetadataFiles(), baseNameForPartitionedParquetData(),
fileLayout, tableDefinition, null, getChannelsProviderFactory());
fileLayout, tableDefinition, null);
}

@Override
ParquetInstructions withIndexColumns(final Collection<List<String>> indexColumns) {
return new ReadOnly(null, null, getCompressionCodecName(), getMaximumDictionaryKeys(),
getMaximumDictionarySize(), isLegacyParquet(), getTargetPageSize(), isRefreshing(),
getSpecialInstructions(), generateMetadataFiles(), baseNameForPartitionedParquetData(),
null, null, indexColumns, getChannelsProviderFactory());
null, null, indexColumns);
}
};

Expand Down Expand Up @@ -470,7 +445,6 @@ private static final class ReadOnly extends ParquetInstructions {
private final ParquetFileLayout fileLayout;
private final TableDefinition tableDefinition;
private final Collection<List<String>> indexColumns;
private final SeekableChannelsProviderFactory channelsProviderFactory;

private ReadOnly(
final KeyedObjectHashMap<String, ColumnInstructions> columnNameToInstructions,
Expand All @@ -486,8 +460,7 @@ private ReadOnly(
final String baseNameForPartitionedParquetData,
final ParquetFileLayout fileLayout,
final TableDefinition tableDefinition,
final Collection<List<String>> indexColumns,
final SeekableChannelsProviderFactory channelsProviderFactory) {
final Collection<List<String>> indexColumns) {
this.columnNameToInstructions = columnNameToInstructions;
this.parquetColumnNameToInstructions = parquetColumnNameToColumnName;
this.compressionCodecName = compressionCodecName;
Expand All @@ -505,7 +478,6 @@ private ReadOnly(
: indexColumns.stream()
.map(List::copyOf)
.collect(Collectors.toUnmodifiableList());
this.channelsProviderFactory = Require.neqNull(channelsProviderFactory, "channelsProviderFactory");
}

private String getOrDefault(final String columnName, final String defaultValue,
Expand Down Expand Up @@ -625,16 +597,6 @@ public Optional<Collection<List<String>>> getIndexColumns() {
return Optional.ofNullable(indexColumns);
}

@Override
public SeekableChannelsProvider getChannelsProvider(final URI parquetFileURI,
@Nullable final Object specialInstructions) {
return channelsProviderFactory.createProvider(parquetFileURI, specialInstructions);
}

private SeekableChannelsProviderFactory getChannelsProviderFactory() {
return channelsProviderFactory;
}

@Override
public ParquetInstructions withTableDefinition(@Nullable final TableDefinition useDefinition) {
return withTableDefinitionAndLayout(useDefinition, getFileLayout().orElse(null));
Expand All @@ -648,7 +610,7 @@ public ParquetInstructions withTableDefinitionAndLayout(
getCompressionCodecName(), getMaximumDictionaryKeys(), getMaximumDictionarySize(),
isLegacyParquet(), getTargetPageSize(), isRefreshing(), getSpecialInstructions(),
generateMetadataFiles(), baseNameForPartitionedParquetData(), useLayout, useDefinition,
indexColumns, channelsProviderFactory);
indexColumns);
}

@Override
Expand All @@ -657,7 +619,7 @@ ParquetInstructions withIndexColumns(final Collection<List<String>> useIndexColu
getCompressionCodecName(), getMaximumDictionaryKeys(), getMaximumDictionarySize(),
isLegacyParquet(), getTargetPageSize(), isRefreshing(), getSpecialInstructions(),
generateMetadataFiles(), baseNameForPartitionedParquetData(), fileLayout,
tableDefinition, useIndexColumns, channelsProviderFactory);
tableDefinition, useIndexColumns);
}

KeyedObjectHashMap<String, ColumnInstructions> copyColumnNameToInstructions() {
Expand Down Expand Up @@ -716,7 +678,6 @@ public static class Builder {
private ParquetFileLayout fileLayout;
private TableDefinition tableDefinition;
private Collection<List<String>> indexColumns;
private SeekableChannelsProviderFactory channelsProviderFactory = DEFAULT_CHANNELS_PROVIDER_FACTORY;

/**
* For each additional field added, make sure to update the copy constructor builder
Expand Down Expand Up @@ -744,7 +705,6 @@ public Builder(final ParquetInstructions parquetInstructions) {
fileLayout = readOnlyParquetInstructions.getFileLayout().orElse(null);
tableDefinition = readOnlyParquetInstructions.getTableDefinition().orElse(null);
indexColumns = readOnlyParquetInstructions.getIndexColumns().orElse(null);
channelsProviderFactory = readOnlyParquetInstructions.getChannelsProviderFactory();
}

private void newColumnNameToInstructionsMap() {
Expand Down Expand Up @@ -1012,12 +972,6 @@ public Builder addAllIndexColumns(final Iterable<List<String>> indexColumns) {
return this;
}

public Builder setChannelsProviderFactory(
@NotNull final SeekableChannelsProviderFactory channelsProviderFactory) {
this.channelsProviderFactory = channelsProviderFactory;
return this;
}

public ParquetInstructions build() {
final KeyedObjectHashMap<String, ColumnInstructions> columnNameToInstructionsOut = columnNameToInstructions;
columnNameToInstructions = null;
Expand All @@ -1027,7 +981,7 @@ public ParquetInstructions build() {
return new ReadOnly(columnNameToInstructionsOut, parquetColumnNameToColumnNameOut, compressionCodecName,
maximumDictionaryKeys, maximumDictionarySize, isLegacyParquet, targetPageSize, isRefreshing,
specialInstructions, generateMetadataFiles, baseNameForPartitionedParquetData, fileLayout,
tableDefinition, indexColumns, channelsProviderFactory);
tableDefinition, indexColumns);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private static class ParquetFileMetadata {
}
}
this.parquetFileMetadataList = new ArrayList<>(destinations.length);
this.channelsProvider = SeekableChannelsProviderLoader.getInstance().createProvider(
this.channelsProvider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader(
convertToURI(metadataRootDirAbsPathString, true), null);
this.partitioningColumnsSchema = partitioningColumnsSchema;

Expand Down
Loading

0 comments on commit dffaa26

Please sign in to comment.