Make helper cleanup method static and public
losipiuk committed Mar 8, 2022
1 parent 84f0b69 commit c6fd4e4
Showing 1 changed file with 54 additions and 54 deletions.
SemiTransactionalHiveMetastore.java
@@ -1946,7 +1946,7 @@ private void cleanExtraOutputFiles(HdfsContext hdfsContext, String queryId, PartitionAndMore partitionAndMore)
         }
         verify(partitionAndMore.hasFileNames(), "fileNames expected to be set if isCleanExtraOutputFilesOnCommit is true");
 
-        cleanExtraOutputFiles(hdfsContext, queryId, partitionAndMore.getCurrentLocation(), ImmutableSet.copyOf(partitionAndMore.getFileNames()));
+        SemiTransactionalHiveMetastore.cleanExtraOutputFiles(hdfsEnvironment, hdfsContext, queryId, partitionAndMore.getCurrentLocation(), ImmutableSet.copyOf(partitionAndMore.getFileNames()));
     }
 
     private void cleanExtraOutputFiles(HdfsContext hdfsContext, String queryId, TableAndMore tableAndMore)
@@ -1956,59 +1956,7 @@ private void cleanExtraOutputFiles(HdfsContext hdfsContext, String queryId, TableAndMore tableAndMore)
         }
         Path tableLocation = tableAndMore.getCurrentLocation().orElseThrow(() -> new IllegalArgumentException("currentLocation expected to be set if isCleanExtraOutputFilesOnCommit is true"));
         List<String> files = tableAndMore.getFileNames().orElseThrow(() -> new IllegalArgumentException("fileNames expected to be set if isCleanExtraOutputFilesOnCommit is true"));
-        cleanExtraOutputFiles(hdfsContext, queryId, tableLocation, ImmutableSet.copyOf(files));
-    }
-
-    private void cleanExtraOutputFiles(HdfsContext hdfsContext, String queryId, Path path, Set<String> filesToKeep)
-    {
-        List<String> filesToDelete = new LinkedList<>();
-        try {
-            log.debug("Deleting failed attempt files from %s for query %s", path, queryId);
-            FileSystem fileSystem = hdfsEnvironment.getFileSystem(hdfsContext, path);
-            if (!fileSystem.exists(path)) {
-                // directory may not exist if no files were actually written
-                return;
-            }
-
-            // files are written flat into a single directory, so we do not need to list recursively
-            RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(path, false);
-            while (iterator.hasNext()) {
-                Path file = iterator.next().getPath();
-                if (isFileCreatedByQuery(file.getName(), queryId) && !filesToKeep.contains(file.getName())) {
-                    filesToDelete.add(file.getName());
-                }
-            }
-
-            ImmutableList.Builder<String> deletedFilesBuilder = ImmutableList.builder();
-            Iterator<String> filesToDeleteIterator = filesToDelete.iterator();
-            while (filesToDeleteIterator.hasNext()) {
-                String fileName = filesToDeleteIterator.next();
-                log.debug("Deleting failed attempt file %s/%s for query %s", path, fileName, queryId);
-                fileSystem.delete(new Path(path, fileName), false);
-                deletedFilesBuilder.add(fileName);
-                filesToDeleteIterator.remove();
-            }
-
-            List<String> deletedFiles = deletedFilesBuilder.build();
-            if (!deletedFiles.isEmpty()) {
-                log.info("Deleted failed attempt files %s from %s for query %s", deletedFiles, path, queryId);
-            }
-        }
-        catch (IOException e) {
-            // If we fail here, the query will be rolled back. The optimal outcome would be for the rollback to complete successfully and clean up everything for the query.
-            // Yet if we have a problem here, the rollback will probably fail as well.
-            //
-            // The thrown exception lists the files we could not delete, so the user can clean those up manually later.
-            // Note that this is not a bullet-proof solution.
-            // The rollback routine will still fire and try to clean up the changes the query made. It will probably clean up some and leave some behind.
-            // Even if the user then removes the failed attempt files, it is not obvious that the table will be in the expected state.
-            //
-            // Still, we cannot do much better for non-transactional Hive tables.
-            throw new TrinoException(
-                    HIVE_FILESYSTEM_ERROR,
-                    format("Error deleting failed retry attempt files from %s; remaining files %s; manual cleanup may be required", path, filesToDelete),
-                    e);
-        }
+        SemiTransactionalHiveMetastore.cleanExtraOutputFiles(hdfsEnvironment, hdfsContext, queryId, tableLocation, ImmutableSet.copyOf(files));
     }
 
     private PartitionStatistics getExistingPartitionStatistics(Partition partition, String partitionName)
@@ -3687,4 +3635,56 @@ public void commitTransaction(long transactionId)
     {
         delegate.commitTransaction(transactionId);
     }
+
+    public static void cleanExtraOutputFiles(HdfsEnvironment hdfsEnvironment, HdfsContext hdfsContext, String queryId, Path path, Set<String> filesToKeep)
+    {
+        List<String> filesToDelete = new LinkedList<>();
+        try {
+            log.debug("Deleting failed attempt files from %s for query %s", path, queryId);
+            FileSystem fileSystem = hdfsEnvironment.getFileSystem(hdfsContext, path);
+            if (!fileSystem.exists(path)) {
+                // directory may not exist if no files were actually written
+                return;
+            }
+
+            // files are written flat into a single directory, so we do not need to list recursively
+            RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(path, false);
+            while (iterator.hasNext()) {
+                Path file = iterator.next().getPath();
+                if (isFileCreatedByQuery(file.getName(), queryId) && !filesToKeep.contains(file.getName())) {
+                    filesToDelete.add(file.getName());
+                }
+            }
+
+            ImmutableList.Builder<String> deletedFilesBuilder = ImmutableList.builder();
+            Iterator<String> filesToDeleteIterator = filesToDelete.iterator();
+            while (filesToDeleteIterator.hasNext()) {
+                String fileName = filesToDeleteIterator.next();
+                log.debug("Deleting failed attempt file %s/%s for query %s", path, fileName, queryId);
+                fileSystem.delete(new Path(path, fileName), false);
+                deletedFilesBuilder.add(fileName);
+                filesToDeleteIterator.remove();
+            }
+
+            List<String> deletedFiles = deletedFilesBuilder.build();
+            if (!deletedFiles.isEmpty()) {
+                log.info("Deleted failed attempt files %s from %s for query %s", deletedFiles, path, queryId);
+            }
+        }
+        catch (IOException e) {
+            // If we fail here, the query will be rolled back. The optimal outcome would be for the rollback to complete successfully and clean up everything for the query.
+            // Yet if we have a problem here, the rollback will probably fail as well.
+            //
+            // The thrown exception lists the files we could not delete, so the user can clean those up manually later.
+            // Note that this is not a bullet-proof solution.
+            // The rollback routine will still fire and try to clean up the changes the query made. It will probably clean up some and leave some behind.
+            // Even if the user then removes the failed attempt files, it is not obvious that the table will be in the expected state.
+            //
+            // Still, we cannot do much better for non-transactional Hive tables.
+            throw new TrinoException(
+                    HIVE_FILESYSTEM_ERROR,
+                    format("Error deleting failed retry attempt files from %s; remaining files %s; manual cleanup may be required", path, filesToDelete),
+                    e);
+        }
+    }
 }
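Because the helper is now public and static, code outside SemiTransactionalHiveMetastore can reuse the cleanup logic by passing in an HdfsEnvironment explicitly; the diff shows that hdfsEnvironment was the only instance state the old private method touched, which is what makes the static form possible. Below is a minimal sketch of what a call site could look like. The ExtraOutputFilesCleaner class, its method names, and the import paths are illustrative assumptions, not part of this commit.

    import java.util.Set;

    import com.google.common.collect.ImmutableSet;
    import org.apache.hadoop.fs.Path;

    // Import paths assumed from the Trino Hive plugin around this commit;
    // adjust to the actual package layout.
    import io.trino.plugin.hive.HdfsEnvironment;
    import io.trino.plugin.hive.authentication.HdfsContext;
    import io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore;

    // Hypothetical caller that reuses the now-public helper instead of
    // duplicating the deletion logic.
    public class ExtraOutputFilesCleaner
    {
        private final HdfsEnvironment hdfsEnvironment;

        public ExtraOutputFilesCleaner(HdfsEnvironment hdfsEnvironment)
        {
            this.hdfsEnvironment = hdfsEnvironment;
        }

        public void cleanUp(HdfsContext hdfsContext, String queryId, Path writePath, Set<String> committedFileNames)
        {
            // Deletes files under writePath that were created by this query's
            // failed retry attempts, keeping only the committed files.
            SemiTransactionalHiveMetastore.cleanExtraOutputFiles(
                    hdfsEnvironment,
                    hdfsContext,
                    queryId,
                    writePath,
                    ImmutableSet.copyOf(committedFileNames));
        }
    }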
