Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for reading Parquet V2 pages #5508

Merged
merged 5 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,20 @@
import org.jetbrains.annotations.Nullable;

import java.io.IOException;
import java.net.URI;
import java.util.function.Function;

public interface ColumnChunkReader {
/**
* @return The name of the column this ColumnChunk represents.
*/
String columnName();

/**
* @return The URI of the file this column chunk reader is reading from.
*/
URI getURI();

/**
* @return The number of rows in this ColumnChunk, or -1 if it's unknown.
*/
Expand Down Expand Up @@ -117,4 +128,5 @@ public int getMaxId() {
* @return The channel provider for this column chunk reader.
*/
SeekableChannelsProvider getChannelsProvider();

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@

final class ColumnChunkReaderImpl implements ColumnChunkReader {

private final String columnName;
private final ColumnChunk columnChunk;
private final SeekableChannelsProvider channelsProvider;
private final CompressorAdapter decompressor;
private final ColumnDescriptor path;
private OffsetIndexReader offsetIndexReader;
private final OffsetIndexReader offsetIndexReader;
private final List<Type> fieldTypes;
private final Function<SeekableChannelContext, Dictionary> dictionarySupplier;
private final PageMaterializerFactory nullMaterializerFactory;
Expand All @@ -58,8 +59,16 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader {
*/
private final String version;

ColumnChunkReaderImpl(ColumnChunk columnChunk, SeekableChannelsProvider channelsProvider, URI rootURI,
MessageType type, List<Type> fieldTypes, final long numRows, final String version) {
ColumnChunkReaderImpl(
final String columnName,
final ColumnChunk columnChunk,
final SeekableChannelsProvider channelsProvider,
final URI rootURI,
final MessageType type,
final List<Type> fieldTypes,
final long numRows,
final String version) {
this.columnName = columnName;
this.channelsProvider = channelsProvider;
this.columnChunk = columnChunk;
this.path = type
Expand Down Expand Up @@ -88,6 +97,11 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader {
: OffsetIndexReader.NULL;
}

@Override
public String columnName() {
return columnName;
}

@Override
public long numRows() {
return numRows;
Expand Down Expand Up @@ -128,7 +142,8 @@ public ColumnPageDirectAccessor getPageAccessor(final OffsetIndex offsetIndex) {
return new ColumnPageDirectAccessorImpl(offsetIndex);
}

private URI getURI() {
@Override
public URI getURI() {
return columnChunkURI;
}

Expand Down Expand Up @@ -244,7 +259,7 @@ public boolean hasNext() {
@Override
public ColumnPageReader next(@NotNull final SeekableChannelContext channelContext) {
if (!hasNext()) {
throw new NoSuchElementException("No next element");
throw new NoSuchElementException("No next element in column: " + columnName + ", uri: " + getURI());
}
// NB: The channels provider typically caches channels; this avoids maintaining a handle per column chunk
final long headerOffset = nextHeaderOffset;
Expand All @@ -271,11 +286,11 @@ public ColumnPageReader next(@NotNull final SeekableChannelContext channelContex
remainingValues -= numValuesInPage;
final Function<SeekableChannelContext, Dictionary> pageDictionarySupplier =
getPageDictionarySupplier(pageHeader);
return new ColumnPageReaderImpl(channelsProvider, decompressor, pageDictionarySupplier,
return new ColumnPageReaderImpl(columnName, channelsProvider, decompressor, pageDictionarySupplier,
nullMaterializerFactory, path, getURI(), fieldTypes, dataOffset, pageHeader, numValuesInPage);
} catch (IOException e) {
throw new UncheckedDeephavenException("Error reading page header at offset " + headerOffset + " for " +
"file " + getURI(), e);
"column: " + columnName + ", uri: " + getURI(), e);
}
}
}
Expand All @@ -287,15 +302,15 @@ private Function<SeekableChannelContext, Dictionary> getPageDictionarySupplier(f
: (SeekableChannelContext context) -> NULL_DICTIONARY;
}

private static org.apache.parquet.format.Encoding getEncoding(PageHeader pageHeader) {
private org.apache.parquet.format.Encoding getEncoding(final PageHeader pageHeader) {
switch (pageHeader.type) {
case DATA_PAGE:
return pageHeader.getData_page_header().getEncoding();
case DATA_PAGE_V2:
return pageHeader.getData_page_header_v2().getEncoding();
default:
throw new UncheckedDeephavenException(
"Unknown parquet data page header type " + pageHeader.type);
throw new UncheckedDeephavenException("Unknown parquet data page header type " + pageHeader.type +
" for column: " + columnName + ", uri: " + getURI());
}
}

Expand Down Expand Up @@ -323,7 +338,8 @@ private final class ColumnPageDirectAccessorImpl implements ColumnPageDirectAcce
public ColumnPageReader getPageReader(final int pageNum, final SeekableChannelContext channelContext) {
if (pageNum < 0 || pageNum >= offsetIndex.getPageCount()) {
throw new IndexOutOfBoundsException(
"pageNum=" + pageNum + ", offsetIndex.getPageCount()=" + offsetIndex.getPageCount());
"pageNum=" + pageNum + ", offsetIndex.getPageCount()=" + offsetIndex.getPageCount() +
" for column: " + columnName + ", uri: " + getURI());
}

// Read the page header to determine whether we need to use dictionary for this page
Expand All @@ -341,12 +357,12 @@ public ColumnPageReader getPageReader(final int pageNum, final SeekableChannelCo
}
final Function<SeekableChannelContext, Dictionary> pageDictionarySupplier =
getPageDictionarySupplier(pageHeader);
return new ColumnPageReaderImpl(channelsProvider, decompressor, pageDictionarySupplier,
return new ColumnPageReaderImpl(columnName, channelsProvider, decompressor, pageDictionarySupplier,
nullMaterializerFactory, path, getURI(), fieldTypes, dataOffset, pageHeader,
getNumValues(pageHeader));
} catch (final IOException e) {
throw new UncheckedDeephavenException("Error reading page header for page number " + pageNum +
" at offset " + headerOffset + " for file " + getURI(), e);
" at offset " + headerOffset + " for column: " + columnName + ", uri: " + getURI(), e);
}
}
}
Expand Down
Loading
Loading