
Added support to write metadata files in Parquet #5105

Merged 50 commits on Apr 2, 2024
Commits
f8ae4ef
Copied methods from inside parquet hadoop to write metadata files
malhotrashivam Jan 30, 2024
0ef7f79
Calling methods from inside ParquetHadoop for writing metadata files
malhotrashivam Jan 31, 2024
44d031f
Cleaned up the code and added a lot of TODOs for review
malhotrashivam Feb 13, 2024
6394bac
Merge branch 'main' into sm-pq-metadata
malhotrashivam Feb 13, 2024
6c6b075
Some more changes
malhotrashivam Feb 13, 2024
9263aa0
WIP commit
malhotrashivam Feb 14, 2024
8c887c4
Added a custom metadata file writer
malhotrashivam Feb 16, 2024
05400c5
Merge branch 'main' into sm-pq-metadata
malhotrashivam Feb 16, 2024
b696649
Minor fix
malhotrashivam Feb 16, 2024
797b3dc
Fixed failing test
malhotrashivam Feb 17, 2024
d987a06
Moved some code around
malhotrashivam Feb 20, 2024
d4da175
Minor change
malhotrashivam Feb 20, 2024
f33d712
Review comments
malhotrashivam Feb 22, 2024
7d66445
Merge branch 'main' into sm-pq-metadata
malhotrashivam Feb 26, 2024
238f5f5
Read offset index from column chunk on demand
malhotrashivam Feb 26, 2024
7a3652d
Fixed failing test
malhotrashivam Feb 26, 2024
1f41eef
Added support for partitioned parquet writing
malhotrashivam Feb 27, 2024
c8aa764
Added some more tests
malhotrashivam Feb 27, 2024
ff99a36
Added some more tests
malhotrashivam Feb 27, 2024
3e48937
Added a new API for writing a partitioned table directly
malhotrashivam Feb 28, 2024
e584343
Improved the tests
malhotrashivam Feb 29, 2024
f659f3c
Review with Ryan part 1
malhotrashivam Mar 4, 2024
3651e5f
Added more tests
malhotrashivam Mar 4, 2024
51eefe1
Iterating using chunked iterators
malhotrashivam Mar 4, 2024
60ee1f5
Removed some unnecessary includes
malhotrashivam Mar 4, 2024
5c7353f
Added support for {index} and {partition} in file basename
malhotrashivam Mar 4, 2024
c174452
Review comments
malhotrashivam Mar 5, 2024
ab73df0
Minor touchups
malhotrashivam Mar 5, 2024
1cb8b81
Added fix and tests for big decimals
malhotrashivam Mar 6, 2024
20e8204
Updated a comment
malhotrashivam Mar 6, 2024
9892d14
Review with Ryan part 1
malhotrashivam Mar 7, 2024
1d98927
Review with Ryan part 2
malhotrashivam Mar 11, 2024
5018feb
Minor touchups
malhotrashivam Mar 11, 2024
a529fc2
Fixed failing tests
malhotrashivam Mar 15, 2024
48906dd
Merge branch 'main' into sm-pq-metadata
malhotrashivam Mar 15, 2024
819a1b9
Added python APIs and improved comments
malhotrashivam Mar 16, 2024
10b7e0d
Added more fixes for python
malhotrashivam Mar 19, 2024
9f7c55e
Review with Ryan and Chip
malhotrashivam Mar 21, 2024
b62abb7
Merge branch 'main' into sm-pq-metadata
malhotrashivam Mar 21, 2024
0d7c62e
Review with Chip part 2
malhotrashivam Mar 22, 2024
5a3de8e
Review with Chip and Jianfeng Part 3
malhotrashivam Mar 22, 2024
f68551f
Review with Chip and Jianfeng continued
malhotrashivam Mar 22, 2024
b11ebd1
Added new APIs for managing indexes
malhotrashivam Mar 25, 2024
3bdc92b
Trigger CI jobs
malhotrashivam Mar 25, 2024
3dd6097
Review with Ryan
malhotrashivam Mar 26, 2024
d8bc0a5
Added python support for writing indexes
malhotrashivam Mar 27, 2024
f700883
Reordered comments
malhotrashivam Mar 27, 2024
6246289
Added more details to python comments
malhotrashivam Mar 27, 2024
fa2b0c5
Moved from list to sequence
malhotrashivam Mar 27, 2024
7909ea0
Added fixes and tests for windows
malhotrashivam Apr 1, 2024
1 change: 1 addition & 0 deletions BenchmarkSupport/BenchmarkSupport.gradle
@@ -30,6 +30,7 @@ dependencies {
implementation project(':Base')
implementation project(':engine-table')
implementation project(':extensions-parquet-table')
implementation project(':extensions-parquet-base')
implementation project(':Configuration')
implementation 'org.openjdk.jmh:jmh-core:1.20'

@@ -8,11 +8,12 @@
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.parquet.table.ParquetTools;
import io.deephaven.engine.util.TableTools;
import io.deephaven.parquet.table.ParquetTableWriter;
import io.deephaven.engine.table.impl.util.TableBuilder;
import io.deephaven.benchmarking.BenchmarkTools;
import org.openjdk.jmh.infra.BenchmarkParams;

import static io.deephaven.parquet.base.ParquetUtils.PARQUET_FILE_EXTENSION;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
@@ -44,7 +45,7 @@ public void init() {

public void logOutput() throws IOException {
final Path outputPath = BenchmarkTools.dataDir()
.resolve(BenchmarkTools.getDetailOutputPath(benchmarkName) + ParquetTableWriter.PARQUET_FILE_EXTENSION);
.resolve(BenchmarkTools.getDetailOutputPath(benchmarkName) + PARQUET_FILE_EXTENSION);

final Table output = outputBuilder.build();
ParquetTools.writeTable(output, outputPath.toFile(), RESULT_DEF);
@@ -5,8 +5,6 @@

import java.io.InputStream;
import java.nio.channels.ReadableByteChannel;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public final class Channels {

1 change: 1 addition & 0 deletions engine/table/build.gradle
@@ -66,6 +66,7 @@ dependencies {
testImplementation project(':BenchmarkSupport')
testImplementation project(':extensions-csv')
testImplementation project(':extensions-parquet-table')
testImplementation project(':extensions-parquet-base')
testImplementation project(':extensions-source-support')
testImplementation project(':Numerics')
testImplementation project(':extensions-suanshu')
@@ -0,0 +1,157 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.engine.table.impl.locations.util;

import io.deephaven.time.DateTimeUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.HashMap;
import java.util.Map;

/**
 * This class takes a partition value object and formats it as a {@link String}. Useful when generating partitioning
 * paths for a table. Complementary to {@link PartitionParser}.
*/
public enum PartitionFormatter {
ForString {
@Override
public String formatObject(@NotNull final Object value) {
return (String) value;
}
},
ForBoolean {
@Override
public String formatObject(@NotNull final Object value) {
return ((Boolean) value).toString();
}
},
ForChar {
@Override
public String formatObject(@NotNull final Object value) {
return ((Character) value).toString();
}
},
ForByte {
@Override
public String formatObject(@NotNull final Object value) {
return ((Byte) value).toString();
}
},
ForShort {
@Override
public String formatObject(@NotNull final Object value) {
return ((Short) value).toString();
}
},
ForInt {
@Override
public String formatObject(@NotNull final Object value) {
return ((Integer) value).toString();
}
},
ForLong {
@Override
public String formatObject(@NotNull final Object value) {
return ((Long) value).toString();
}
},
ForFloat {
@Override
public String formatObject(@NotNull final Object value) {
return ((Float) value).toString();
}
},
ForDouble {
@Override
public String formatObject(@NotNull final Object value) {
return ((Double) value).toString();
}
},
ForBigInteger {
@Override
public String formatObject(@NotNull final Object value) {
return ((BigInteger) value).toString();
}
},
ForBigDecimal {
@Override
public String formatObject(@NotNull final Object value) {
return ((BigDecimal) value).toString();
}
},
ForInstant {
@Override
public String formatObject(@NotNull final Object value) {
return ((Instant) value).toString();
}
},
ForLocalDate {
@Override
public String formatObject(@NotNull final Object value) {
return DateTimeUtils.formatDate((LocalDate) value);
}
},
ForLocalTime {
@Override
public String formatObject(@NotNull final Object value) {
return ((LocalTime) value).toString();
}
};

private static final Map<Class<?>, PartitionFormatter> typeMap = new HashMap<>();
static {
typeMap.put(String.class, ForString);
typeMap.put(Boolean.class, ForBoolean);
typeMap.put(boolean.class, ForBoolean);
typeMap.put(Character.class, ForChar);
typeMap.put(char.class, ForChar);
typeMap.put(Byte.class, ForByte);
typeMap.put(byte.class, ForByte);
typeMap.put(Short.class, ForShort);
typeMap.put(short.class, ForShort);
typeMap.put(Integer.class, ForInt);
typeMap.put(int.class, ForInt);
typeMap.put(Long.class, ForLong);
typeMap.put(long.class, ForLong);
typeMap.put(Float.class, ForFloat);
typeMap.put(float.class, ForFloat);
typeMap.put(Double.class, ForDouble);
typeMap.put(double.class, ForDouble);
typeMap.put(BigInteger.class, ForBigInteger);
typeMap.put(BigDecimal.class, ForBigDecimal);
typeMap.put(Instant.class, ForInstant);
typeMap.put(LocalDate.class, ForLocalDate);
typeMap.put(LocalTime.class, ForLocalTime);
}

abstract String formatObject(@NotNull final Object value);

/**
* Takes a partition value object and returns a formatted string. Returns an empty string if the object is null.
*/
public String format(@Nullable final Object value) {
if (value == null) {
return "";
}
return formatObject(value);
}

/**
* Takes a partitioning column type and returns the corresponding formatter.
*/
public static PartitionFormatter getFormatterForType(@NotNull final Class<?> clazz) {
final PartitionFormatter formatter = typeMap.get(clazz);
if (formatter != null) {
return formatter;
} else {
throw new UnsupportedOperationException("Unsupported type: " + clazz.getSimpleName());
}
}
}
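The enum above dispatches on the partition value's runtime class through a static type map, with nulls formatted as the empty string and unknown types rejected. A minimal standalone sketch of the same lookup-and-format pattern (the class and method names here are illustrative, not the Deephaven API):

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Standalone sketch of PartitionFormatter's dispatch pattern: a static map from
// partition value class to a formatting function, consulted at format time.
public class PartitionFormatterSketch {
    private static final Map<Class<?>, Function<Object, String>> TYPE_MAP = new HashMap<>();
    static {
        TYPE_MAP.put(String.class, v -> (String) v);
        TYPE_MAP.put(Integer.class, Object::toString);
        TYPE_MAP.put(BigDecimal.class, v -> ((BigDecimal) v).toString());
    }

    public static String format(final Class<?> clazz, final Object value) {
        final Function<Object, String> formatter = TYPE_MAP.get(clazz);
        if (formatter == null) {
            // Mirrors getFormatterForType's failure mode for unsupported types
            throw new UnsupportedOperationException("Unsupported type: " + clazz.getSimpleName());
        }
        // Null partition values are formatted as the empty string, as in format(...) above
        return value == null ? "" : formatter.apply(value);
    }
}
```

Keying the map by `Class` (including the primitive box pairs in the real code) keeps the per-value work to one hash lookup, which matters when formatting a partition path per written file.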
@@ -37,7 +37,6 @@
import io.deephaven.engine.util.TableTools;
import io.deephaven.engine.util.systemicmarking.SystemicObjectTracker;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.ParquetTableWriter;
import io.deephaven.parquet.table.ParquetTools;
import io.deephaven.parquet.table.layout.ParquetKeyValuePartitionedLayout;
import io.deephaven.test.types.OutOfBandTest;
@@ -75,6 +74,7 @@
import static io.deephaven.util.QueryConstants.*;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.*;
import static io.deephaven.parquet.base.ParquetUtils.PARQUET_FILE_EXTENSION;

@Category(OutOfBandTest.class)
public class QueryTableAggregationTest {
@@ -3951,7 +3951,7 @@ private Table makeDiskTable(File directory) throws IOException {
final TableDefaults result = testTable(stringCol("Symbol", syms),
intCol("Value", values));

final File outputFile = new File(directory, "disk_table" + ParquetTableWriter.PARQUET_FILE_EXTENSION);
final File outputFile = new File(directory, "disk_table" + PARQUET_FILE_EXTENSION);

ParquetTools.writeTable(result, outputFile, result.getDefinition());

@@ -31,10 +31,16 @@ public interface ColumnChunkReader {
int getMaxRl();

/**
* @return The offset index for this column chunk, or null if it not found in the metadata.
* @return Whether the column chunk has offset index information set in the metadata or not.
*/
@Nullable
OffsetIndex getOffsetIndex();
boolean hasOffsetIndex();

/**
* @param context The channel context to use for reading the offset index.
 * @return The offset index for this column chunk.
* @throws UnsupportedOperationException If the column chunk does not have an offset index.
*/
OffsetIndex getOffsetIndex(final SeekableChannelContext context);

/**
 * Used to iterate over column page readers for each page with the capability to set channel context for reading
@@ -69,9 +75,9 @@ interface ColumnPageDirectAccessor {
}

/**
* @return An accessor for individual parquet pages.
* @return An accessor for individual parquet pages which uses the provided offset index.
*/
ColumnPageDirectAccessor getPageAccessor();
ColumnPageDirectAccessor getPageAccessor(OffsetIndex offsetIndex);

/**
* @return Whether this column chunk uses a dictionary-based encoding on every page.
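The interface change above replaces the old nullable `getOffsetIndex()` with an explicit `hasOffsetIndex()` check plus a context-taking getter that throws when no index is present. A standalone sketch of that check-then-get contract (the types here are stand-ins, not Deephaven's):

```java
// Sketch of the revised ColumnChunkReader contract: callers probe availability
// first, and requesting a missing offset index fails loudly instead of
// returning null.
public class OffsetIndexContract {
    public interface ChunkReader {
        boolean hasOffsetIndex();

        // In the real API this takes a SeekableChannelContext for the read
        long[] getOffsetIndex(Object channelContext);
    }

    /** Builds a reader over a possibly-absent offset index. */
    public static ChunkReader readerFor(final long[] index) {
        return new ChunkReader() {
            @Override
            public boolean hasOffsetIndex() {
                return index != null;
            }

            @Override
            public long[] getOffsetIndex(final Object channelContext) {
                if (index == null) {
                    throw new UnsupportedOperationException(
                            "Column chunk does not have an offset index");
                }
                return index;
            }
        };
    }
}
```

Splitting the check from the read is what allows the implementation to defer the actual I/O: `hasOffsetIndex()` only inspects metadata flags, while `getOffsetIndex(context)` may touch the channel.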
@@ -10,6 +10,7 @@
import io.deephaven.parquet.compress.DeephavenCompressorAdapterFactory;
import io.deephaven.util.channel.SeekableChannelContext.ContextHolder;
import io.deephaven.util.datastructures.LazyCachingFunction;
import org.apache.commons.io.FilenameUtils;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
@@ -41,18 +42,13 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader {

private final ColumnChunk columnChunk;
private final SeekableChannelsProvider channelsProvider;
/**
* If reading a single parquet file, root URI is the URI of the file, else the parent directory for a metadata file
*/
private final URI rootURI;
private final CompressorAdapter decompressor;
private final ColumnDescriptor path;
private final OffsetIndex offsetIndex;
private OffsetIndexReader offsetIndexReader;
private final List<Type> fieldTypes;
private final Function<SeekableChannelContext, Dictionary> dictionarySupplier;
private final PageMaterializerFactory nullMaterializerFactory;

private URI uri;
private final URI columnChunkURI;
/**
* Number of rows in the row group of this column chunk.
*/
Expand All @@ -63,11 +59,9 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader {
private final String version;

ColumnChunkReaderImpl(ColumnChunk columnChunk, SeekableChannelsProvider channelsProvider, URI rootURI,
MessageType type, OffsetIndex offsetIndex, List<Type> fieldTypes, final long numRows,
final String version) {
MessageType type, List<Type> fieldTypes, final long numRows, final String version) {
this.channelsProvider = channelsProvider;
this.columnChunk = columnChunk;
this.rootURI = rootURI;
this.path = type
.getColumnDescription(columnChunk.meta_data.getPath_in_schema().toArray(new String[0]));
if (columnChunk.getMeta_data().isSetCodec()) {
@@ -76,12 +70,22 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader {
} else {
decompressor = CompressorAdapter.PASSTHRU;
}
this.offsetIndex = offsetIndex;
this.fieldTypes = fieldTypes;
this.dictionarySupplier = new LazyCachingFunction<>(this::getDictionary);
this.nullMaterializerFactory = PageMaterializer.factoryForType(path.getPrimitiveType().getPrimitiveTypeName());
this.numRows = numRows;
this.version = version;
if (columnChunk.isSetFile_path() && FILE_URI_SCHEME.equals(rootURI.getScheme())) {
final String relativePath = FilenameUtils.separatorsToSystem(columnChunk.getFile_path());
this.columnChunkURI = convertToURI(Path.of(rootURI).resolve(relativePath), false);
} else {
// TODO(deephaven-core#5066): Add support for reading metadata files from non-file URIs
this.columnChunkURI = rootURI;
}
// Construct the reader object but don't read the offset index yet
this.offsetIndexReader = (columnChunk.isSetOffset_index_offset())
? new OffsetIndexReaderImpl(channelsProvider, columnChunk, columnChunkURI)
: OffsetIndexReader.NULL;
}

@Override
@@ -99,8 +103,16 @@ public int getMaxRl() {
return path.getMaxRepetitionLevel();
}

public OffsetIndex getOffsetIndex() {
return offsetIndex;
@Override
public boolean hasOffsetIndex() {
return columnChunk.isSetOffset_index_offset();
}

@Override
public OffsetIndex getOffsetIndex(final SeekableChannelContext context) {
// Reads and caches the offset index if it hasn't been read yet. Throws an exception if the offset index cannot
// be read from this source
return offsetIndexReader.getOffsetIndex(context);
}

@Override
@@ -109,23 +121,15 @@ public ColumnPageReaderIterator getPageIterator() {
}

@Override
public final ColumnPageDirectAccessor getPageAccessor() {
public ColumnPageDirectAccessor getPageAccessor(final OffsetIndex offsetIndex) {
if (offsetIndex == null) {
throw new UnsupportedOperationException("Cannot use direct accessor without offset index");
}
return new ColumnPageDirectAccessorImpl();
return new ColumnPageDirectAccessorImpl(offsetIndex);
}

private URI getURI() {
if (uri != null) {
return uri;
}
if (columnChunk.isSetFile_path() && FILE_URI_SCHEME.equals(rootURI.getScheme())) {
return uri = convertToURI(Path.of(rootURI).resolve(columnChunk.getFile_path()), false);
} else {
// TODO(deephaven-core#5066): Add support for reading metadata files from non-file URIs
return uri = rootURI;
}
return columnChunkURI;
}

@Override
@@ -308,7 +312,11 @@ private static int getNumValues(PageHeader pageHeader) {

private final class ColumnPageDirectAccessorImpl implements ColumnPageDirectAccessor {

ColumnPageDirectAccessorImpl() {}
private final OffsetIndex offsetIndex;

ColumnPageDirectAccessorImpl(final OffsetIndex offsetIndex) {
this.offsetIndex = offsetIndex;
}

@Override
public ColumnPageReader getPageReader(final int pageNum, final SeekableChannelContext channelContext) {
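The `ColumnChunkReaderImpl` constructor above deliberately only wires up an `OffsetIndexReader` and defers the actual read ("construct the reader object but don't read the offset index yet"); the index is fetched from the channel on the first `getOffsetIndex(context)` call and cached. A standalone sketch of that read-on-demand caching (illustrative names, not the Deephaven API):

```java
import java.util.function.Supplier;

// Read-on-demand cache: the constructor is cheap, the supplied loader (standing
// in for the channel read performed by OffsetIndexReaderImpl) runs at most once,
// on the first get().
public class LazyOffsetIndex<T> {
    private final Supplier<T> loader; // performs the actual channel read
    private T cached;

    public LazyOffsetIndex(final Supplier<T> loader) {
        this.loader = loader;
    }

    public synchronized T get() {
        if (cached == null) {
            cached = loader.get();
        }
        return cached;
    }
}
```

Deferring the read this way means column chunks whose offset index is never needed (for example, when pages are consumed through the sequential iterator) pay no extra I/O at construction time.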