Modify layout of grouping files #4525

Merged 9 commits on Oct 31, 2023
@@ -123,9 +123,14 @@ public void findKeys(@NotNull final Consumer<TLK> locationKeyObserver) {
         public FileVisitResult preVisitDirectory(
                 @NotNull final Path dir,
                 @NotNull final BasicFileAttributes attrs) {
+            final String dirName = dir.getFileName().toString();
+            // Skip dot directories
+            if (!dirName.isEmpty() && dirName.charAt(0) == '.') {
+                return FileVisitResult.SKIP_SUBTREE;
+            }
             if (++columnCount > 0) {
                 // We're descending and past the root
-                final String[] components = dir.getFileName().toString().split("=", 2);
+                final String[] components = dirName.split("=", 2);
                 if (components.length != 2) {
                     throw new TableDataException(
                             "Unexpected directory name format (not key=value) at " + dir);
@@ -51,7 +51,6 @@
 import java.nio.file.Path;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.util.*;
-import java.util.function.Function;
 
 import static io.deephaven.parquet.table.ParquetTableWriter.PARQUET_FILE_EXTENSION;
 import static io.deephaven.util.type.TypeUtils.getUnboxedTypeIfBoxed;
@@ -237,9 +236,34 @@ private static String minusParquetSuffix(@NotNull final String s) {
         return s;
     }
 
-    public static Function<String, String> defaultGroupingFileName(@NotNull final String path) {
-        final String prefix = minusParquetSuffix(path);
-        return columnName -> prefix + "_" + columnName + "_grouping.parquet";
+    /**
+     * Generates the index file path relative to the table destination file path.
+     *
+     * @param tableDest Destination path for the main table containing these indexing columns
+     * @param columnName Name of the indexing column
+     *
+     * @return The relative index file path. For example, for a table with destination {@code "table.parquet"} and
+     *         indexing column {@code "IndexingColName"}, the method will return
+     *         {@code ".dh_metadata/indexes/IndexingColName/index_IndexingColName_table.parquet"}
+     */
+    public static String getRelativeIndexFilePath(@NotNull final File tableDest, @NotNull final String columnName) {
+        return String.format(".dh_metadata/indexes/%s/index_%s_%s", columnName, columnName, tableDest.getName());
+    }
+
+    /**
+     * Legacy method for generating a grouping file name. We used to place grouping files right next to the original
+     * table destination.
+     *
+     * @param tableDest Destination path for the main table containing these grouping columns
+     * @param columnName Name of the grouping column
+     *
+     * @return The relative grouping file path. For example, for a table with destination {@code "table.parquet"} and
+     *         grouping column {@code "GroupingColName"}, the method will return
+     *         {@code "table_GroupingColName_grouping.parquet"}
+     */
+    public static String legacyGroupingFileName(@NotNull final File tableDest, @NotNull final String columnName) {
+        final String prefix = minusParquetSuffix(tableDest.getName());
+        return prefix + "_" + columnName + "_grouping.parquet";
     }
 
     /**
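
Hypothetical usage of the two helpers added above, reproducing the javadoc examples (this assumes `ParquetTools` is imported from `io.deephaven.parquet.table`; the demo class name and path are made up):

```java
import io.deephaven.parquet.table.ParquetTools;

import java.io.File;

public class IndexPathDemo {
    public static void main(String[] args) {
        final File tableDest = new File("/data/table.parquet");

        // New layout: .dh_metadata/indexes/<col>/index_<col>_<table name>
        System.out.println(ParquetTools.getRelativeIndexFilePath(tableDest, "IndexingColName"));
        // -> .dh_metadata/indexes/IndexingColName/index_IndexingColName_table.parquet

        // Legacy layout: sibling file next to the table destination
        System.out.println(ParquetTools.legacyGroupingFileName(tableDest, "GroupingColName"));
        // -> table_GroupingColName_grouping.parquet
    }
}
```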
@@ -285,7 +309,7 @@ private static void installShadowFile(@NotNull final File destFile, @NotNull fin
     }
 
     /**
-     * Roll back any changes made in the {@link #installShadowFile} in a best effort manner
+     * Roll back any changes made in the {@link #installShadowFile} in a best-effort manner
      */
     private static void rollbackFile(@NotNull final File destFile) {
         final File backupDestFile = getBackupFile(destFile);
@@ -297,7 +321,7 @@ private static void rollbackFile(@NotNull final File destFile) {
     /**
      * Make any missing ancestor directories of {@code destination}.
      *
-     * @param destination The destination file
+     * @param destination The destination parquet file
      * @return The first created directory, or null if no directories were made.
      */
     private static File prepareDestinationFileLocation(@NotNull File destination) {
@@ -362,12 +386,13 @@ private static Map<String, ParquetTableWriter.GroupingColumnWritingInfo> groupin
         for (int gci = 0; gci < groupingColumnNames.length; gci++) {
             final String groupingColumnName = groupingColumnNames[gci];
             final String parquetColumnName = parquetColumnNames[gci];
-            final String groupingFilePath = defaultGroupingFileName(destFile.getPath()).apply(parquetColumnName);
-            final File groupingFile = new File(groupingFilePath);
-            deleteBackupFile(groupingFile);
-            final File shadowGroupingFile = getShadowFile(groupingFile);
+            final String indexFileRelativePath = getRelativeIndexFilePath(destFile, parquetColumnName);
+            final File indexFile = new File(destFile.getParent(), indexFileRelativePath);
+            prepareDestinationFileLocation(indexFile);
+            deleteBackupFile(indexFile);
+            final File shadowIndexFile = getShadowFile(indexFile);
             gcwim.put(groupingColumnName, new ParquetTableWriter.GroupingColumnWritingInfo(parquetColumnName,
-                    groupingFile, shadowGroupingFile));
+                    indexFile, shadowIndexFile));
         }
         return gcwim;
     }
@@ -397,15 +422,12 @@ public static void writeParquetTables(@NotNull final Table[] sources,
         }
         Arrays.stream(destinations).forEach(ParquetTools::deleteBackupFile);
 
-        // Write tables at temporary shadow file paths in the same directory to prevent overwriting any existing files
+        // Write tables and index files at temporary shadow file paths in the same directory to prevent overwriting
+        // any existing files
         final File[] shadowDestFiles =
-                Arrays.stream(destinations)
-                        .map(ParquetTools::getShadowFile)
-                        .toArray(File[]::new);
+                Arrays.stream(destinations).map(ParquetTools::getShadowFile).toArray(File[]::new);
         final File[] firstCreatedDirs =
-                Arrays.stream(shadowDestFiles)
-                        .map(ParquetTools::prepareDestinationFileLocation)
-                        .toArray(File[]::new);
+                Arrays.stream(shadowDestFiles).map(ParquetTools::prepareDestinationFileLocation).toArray(File[]::new);
 
         // List of shadow files, to clean up in case of exceptions
         final List<File> shadowFiles = new ArrayList<>();
@@ -426,7 +448,7 @@ public static void writeParquetTables(@NotNull final Table[] sources,
             // Create grouping info for each table and write the table and grouping files to shadow path
             groupingColumnWritingInfoMaps = new ArrayList<>(sources.length);
 
-            // Shared parquet column names across all tables
+            // Same parquet column names across all tables
             final String[] parquetColumnNames = Arrays.stream(groupingColumns)
                     .map(writeInstructions::getParquetColumnNameFromColumnNameOrDefault)
                     .toArray(String[]::new);
@@ -455,10 +477,10 @@ public static void writeParquetTables(@NotNull final Table[] sources,
                 final Map<String, ParquetTableWriter.GroupingColumnWritingInfo> gcwim =
                         groupingColumnWritingInfoMaps.get(tableIdx);
                 for (final ParquetTableWriter.GroupingColumnWritingInfo gfwi : gcwim.values()) {
-                    final File groupingDestFile = gfwi.metadataFilePath;
-                    final File shadowGroupingFile = gfwi.destFile;
-                    destFiles.add(groupingDestFile);
-                    installShadowFile(groupingDestFile, shadowGroupingFile);
+                    final File indexDestFile = gfwi.metadataFilePath;
+                    final File shadowIndexFile = gfwi.destFile;
+                    destFiles.add(indexDestFile);
+                    installShadowFile(indexDestFile, shadowIndexFile);
                 }
             }
         }
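
The hunks above route index files through the same shadow-file protocol already used for the main tables. The sketch below is a rough, non-Deephaven illustration of that protocol, under the assumption that `getShadowFile`, `installShadowFile`, and `rollbackFile` work with shadow/backup sibling files roughly as their names suggest; the `.shadow`/`.backup` naming here is an assumption for illustration only.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public final class ShadowWriteSketch {
    // Assumed naming convention, not the actual Deephaven one
    static File shadowOf(File dest) {
        return new File(dest.getParent(), "." + dest.getName() + ".shadow");
    }

    static File backupOf(File dest) {
        return new File(dest.getParent(), "." + dest.getName() + ".backup");
    }

    static void install(File dest, byte[] contents) throws IOException {
        final File shadow = shadowOf(dest);
        final File backup = backupOf(dest);
        Files.write(shadow.toPath(), contents); // 1. write to the shadow path first
        try {
            if (dest.exists() && !dest.renameTo(backup)) { // 2. back up any existing destination
                throw new IOException("Failed to back up " + dest);
            }
            if (!shadow.renameTo(dest)) { // 3. install the shadow file as the destination
                throw new IOException("Failed to install shadow file for " + dest);
            }
            backup.delete(); // 4. success: the backup is no longer needed
        } catch (IOException e) {
            // Best-effort rollback, mirroring rollbackFile above
            if (backup.exists()) {
                backup.renameTo(dest);
            }
            shadow.delete();
            throw e;
        }
    }
}
```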
@@ -570,7 +592,12 @@ private static Table readTableInternal(
             return readPartitionedTableWithMetadata(source, instructions);
         }
         final Path firstEntryPath;
-        try (final DirectoryStream<Path> sourceStream = Files.newDirectoryStream(sourcePath)) {
+        // Ignore dot files while looking for the first entry
+        try (final DirectoryStream<Path> sourceStream =
+                Files.newDirectoryStream(sourcePath, (path) -> {
+                    final String filename = path.getFileName().toString();
+                    return !filename.isEmpty() && filename.charAt(0) != '.';
+                })) {
             final Iterator<Path> entryIterator = sourceStream.iterator();
             if (!entryIterator.hasNext()) {
                 throw new TableDataException("Source directory " + source + " is empty");
@@ -1,12 +1,15 @@
 package io.deephaven.parquet.table.layout;
 
-import java.nio.file.Path;
+import io.deephaven.parquet.table.ParquetTableWriter;
 
-import static io.deephaven.parquet.table.ParquetTableWriter.PARQUET_FILE_EXTENSION;
+import java.nio.file.Path;
 
 final class ParquetFileHelper {
 
-    public static boolean fileNameMatches(Path path) {
-        return path.getFileName().toString().endsWith(PARQUET_FILE_EXTENSION);
+    /**
+     * Used as a filter to select relevant parquet files while reading all files in a directory.
+     */
+    static boolean fileNameMatches(final Path path) {
+        final String fileName = path.getFileName().toString();
+        return fileName.endsWith(ParquetTableWriter.PARQUET_FILE_EXTENSION) && fileName.charAt(0) != '.';
     }
 }
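
The same accept rule, as a standalone `DirectoryStream` filter for anyone who wants to try it outside Deephaven (this sketch assumes `PARQUET_FILE_EXTENSION` is `".parquet"`; the demo class is hypothetical):

```java
import java.io.IOException;
import java.nio.file.*;

public class ParquetFilterDemo {
    private static final String PARQUET_FILE_EXTENSION = ".parquet"; // assumed value

    public static void main(String[] args) throws IOException {
        final Path dir = Paths.get(args.length > 0 ? args[0] : ".");
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, path -> {
            final String name = path.getFileName().toString();
            // Accept *.parquet, but never dot files such as .dh_metadata
            return !name.isEmpty()
                    && name.charAt(0) != '.'
                    && name.endsWith(PARQUET_FILE_EXTENSION);
        })) {
            for (Path p : stream) {
                System.out.println(p);
            }
        }
    }
}
```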
@@ -45,6 +45,7 @@
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.math.BigDecimal;
@@ -146,33 +147,55 @@ private ParquetTableLocation tl() {
     private static final ColumnDefinition<Long> LAST_KEY_COL_DEF =
             ColumnDefinition.ofLong("__lastKey__");
 
+    /**
+     * Helper method for logging a warning on failure in reading an index file
+     */
+    private void logWarnFailedToRead(final String indexFilePath) {
+        log.warn().append("Failed to read expected index file ").append(indexFilePath)
+                .append(" for table location ").append(tl()).append(", column ")
+                .append(getName())
+                .endl();
+    }
+
     @Override
     @Nullable
-    public <METADATA_TYPE> METADATA_TYPE getMetadata(
-            @NotNull final ColumnDefinition<?> columnDefinition) {
+    public <METADATA_TYPE> METADATA_TYPE getMetadata(@NotNull final ColumnDefinition<?> columnDefinition) {
         if (!hasGroupingTable) {
             return null;
         }
 
-        final Function<String, String> defaultGroupingFilenameByColumnName =
-                ParquetTools.defaultGroupingFileName(tl().getParquetFile().getAbsolutePath());
+        final File parquetFile = tl().getParquetFile();
         try {
-            final GroupingColumnInfo groupingColumnInfo =
-                    tl().getGroupingColumns().get(parquetColumnName);
-            final ParquetFileReader parquetFileReader;
-            final String groupingFileName = groupingColumnInfo == null
-                    ? defaultGroupingFilenameByColumnName.apply(parquetColumnName)
-                    : tl().getParquetFile().toPath().getParent()
-                            .resolve(groupingColumnInfo.groupingTablePath()).toString();
-            try {
-                parquetFileReader =
-                        new ParquetFileReader(groupingFileName, tl().getChannelProvider());
-            } catch (Exception e) {
-                log.warn().append("Failed to read expected grouping file ").append(groupingFileName)
-                        .append(" for table location ").append(tl()).append(", column ")
-                        .append(getName())
-                        .endl();
-                return null;
-            }
+            ParquetFileReader parquetFileReader;
+            final String indexFilePath;
+            final GroupingColumnInfo groupingColumnInfo = tl().getGroupingColumns().get(parquetColumnName);
+            if (groupingColumnInfo != null) {
+                final String indexFileRelativePath = groupingColumnInfo.groupingTablePath();
+                indexFilePath = parquetFile.toPath().getParent().resolve(indexFileRelativePath).toString();
+                try {
+                    parquetFileReader = new ParquetFileReader(indexFilePath, tl().getChannelProvider());
+                } catch (final RuntimeException e) {
+                    logWarnFailedToRead(indexFilePath);
+                    return null;
+                }
+            } else {
+                final String relativeIndexFilePath =
+                        ParquetTools.getRelativeIndexFilePath(parquetFile, parquetColumnName);
+                indexFilePath = parquetFile.toPath().getParent().resolve(relativeIndexFilePath).toString();
+                try {
+                    parquetFileReader = new ParquetFileReader(indexFilePath, tl().getChannelProvider());
+                } catch (final RuntimeException e1) {
+                    // Retry with legacy grouping file path
+                    final String legacyGroupingFileName =
+                            ParquetTools.legacyGroupingFileName(parquetFile, parquetColumnName);
+                    final File legacyGroupingFile = new File(parquetFile.getParent(), legacyGroupingFileName);
+                    try {
+                        parquetFileReader =
+                                new ParquetFileReader(legacyGroupingFile.getAbsolutePath(), tl().getChannelProvider());
+                    } catch (final RuntimeException e2) {
+                        logWarnFailedToRead(indexFilePath);
+                        return null;
+                    }
+                }
+            }
             final Map<String, ColumnTypeInfo> columnTypes = ParquetSchemaReader.parseMetadata(
                     new ParquetMetadataConverter().fromParquetMetadata(parquetFileReader.fileMetaData)
@@ -187,7 +210,7 @@ public <METADATA_TYPE> METADATA_TYPE getMetadata(
             final ColumnChunkReader endPosReader =
                     rowGroupReader.getColumnChunk(Collections.singletonList(END_POS));
             if (groupingKeyReader == null || beginPosReader == null || endPosReader == null) {
-                log.warn().append("Grouping file ").append(groupingFileName)
+                log.warn().append("Index file ").append(indexFilePath)
                         .append(" is missing one or more expected columns for table location ")
                         .append(tl()).append(", column ").append(getName());
                 return null;
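
Putting the `getMetadata` changes together, the lookup order is: the path declared in the table's grouping-column metadata, else the new `.dh_metadata` location, else the legacy sibling file. A hedged sketch of that sequence follows; `openReader` is a hypothetical stand-in for constructing a `ParquetFileReader`, and only the two `ParquetTools` helpers are actual API from this PR.

```java
import io.deephaven.parquet.table.ParquetTools;

import java.io.File;
import java.util.Optional;
import java.util.function.Function;

final class IndexFileResolver {
    static <R> Optional<R> resolve(
            final File parquetFile,
            final String columnName,
            final String metadataRelativePath, // null when the metadata has no entry
            final Function<String, R> openReader) {
        if (metadataRelativePath != null) {
            // 1. Path declared in the table's grouping-column metadata; no further fallback
            return tryOpen(parquetFile, metadataRelativePath, openReader);
        }
        // 2. New-layout path: .dh_metadata/indexes/<col>/index_<col>_<table>
        final String newPath = ParquetTools.getRelativeIndexFilePath(parquetFile, columnName);
        final Optional<R> viaNew = tryOpen(parquetFile, newPath, openReader);
        if (viaNew.isPresent()) {
            return viaNew;
        }
        // 3. Legacy sibling path: <table>_<col>_grouping.parquet
        final String legacy = ParquetTools.legacyGroupingFileName(parquetFile, columnName);
        return tryOpen(parquetFile, legacy, openReader);
    }

    private static <R> Optional<R> tryOpen(
            final File parquetFile, final String relative, final Function<String, R> openReader) {
        final String absolute = new File(parquetFile.getParent(), relative).getAbsolutePath();
        try {
            return Optional.ofNullable(openReader.apply(absolute));
        } catch (RuntimeException e) {
            return Optional.empty(); // mirrors the warn-and-fall-through behavior above
        }
    }
}
```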