Async Deletion of Previous Metadata and Statistics Files #312
Conversation
tableMetadata.snapshots().stream()
    .map(Snapshot::manifestListLocation)
    .forEach(fileIO::deleteFile);
I think we've already handled the manifest files and manifest list files.
I think we've already handled the manifest files and manifest list files.
Thank you for bringing that up! I’ve just noticed that these are already handled in the ManifestFileCleanupTaskHandler. I've removed these redundant deletions in the latest commit. Appreciate your input.
tableMetadata.previousFiles().stream()
    .map(TableMetadata.MetadataLogEntry::file)
    .forEach(fileIO::deleteFile);
tableMetadata.statisticsFiles().stream()
    .map(StatisticsFile::path)
    .forEach(fileIO::deleteFile);
We should either add retries for these or submit them as separate tasks. As is, if one of these files fails to delete, we'll retry the whole task and resubmit a new task for each manifest. If the manifests are already deleted when we retry, we'll get stuck in a retry loop.
Thanks for the suggestion, Michael! I could refactor the deletion process by adding a retry strategy, similar to the tryDelete in ManifestFileCleanupTaskHandler.
Personally, I think using a retry mechanism here might be more effective than creating separate tasks. With separate tasks, there’s still a risk of issues like task creation failures, which could result in skipping the entire task (which contains multiple files). By using retries within the ManifestFileCleanupTaskHandler, we can manage failure handling at the file level, ensuring that each file is retried independently. This way, if a file deletion fails, we can retry just that file without needing to resubmit or skip the other files. This approach offers more granular control over handling failures.
I’m open to your thoughts on this! Does this seem aligned with what you were suggesting, or do you see potential advantages in the separate task approach that I might be overlooking?
Sorry, do you mean adding retry within the TableCleanupTaskHandler.java? If so, that's fine with me. I'd at least use an Executor to attempt the execution in parallel (i.e., so file2 isn't blocked during the delay between retries for file1).
TBH, I'm not very familiar with how Iceberg generates the statistic files. Does there tend to be one per snapshot? one per data file? If the latter, we could be talking about a very large number of files. If that's the case, I think submitting separate tasks with batches of files makes sense.
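To make the trade-off concrete, here is a minimal sketch of per-file retries driven by an Executor, in the spirit of the tryDelete discussed above. The class and interface names (FileDeleteRetrySketch, DeleteFn) and the attempt limit are illustrative assumptions, not the actual Polaris API, and the delay between retries is omitted for brevity:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical sketch: retry each file deletion independently on an executor,
// so one file's retries never block the others. Names and the attempt limit
// are illustrative, not the actual Polaris API.
public class FileDeleteRetrySketch {
  private static final int MAX_ATTEMPTS = 3;

  interface DeleteFn {
    void delete(String path); // throws RuntimeException on transient failure
  }

  static CompletableFuture<Void> tryDelete(
      String path, DeleteFn deleter, Executor executor, int attempt) {
    return CompletableFuture.runAsync(() -> deleter.delete(path), executor)
        .handle((ok, err) -> {
          if (err == null) {
            return CompletableFuture.<Void>completedFuture(null);
          }
          if (attempt + 1 >= MAX_ATTEMPTS) {
            return CompletableFuture.<Void>failedFuture(err); // give up
          }
          return tryDelete(path, deleter, executor, attempt + 1); // retry
        })
        .thenCompose(f -> f);
  }
}
```

Because each file gets its own future, a slow or failing file never blocks deletion of the others; a delayed executor (e.g. CompletableFuture.delayedExecutor) could add backoff between attempts.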
They are per snapshot, https://iceberg.apache.org/spec/#table-statistics.
Is there only one statistics file per snapshot? The spec is not clear:
A table can contain many statistics files associated with different table snapshots.
Unlike the partition statistics file, which is very clear:
Each table snapshot may be associated with at most one partition statistics file.
Could be multiple statistics files as the following code shows:
They are per snapshot, https://iceberg.apache.org/spec/#table-statistics.
Could be multiple statistics files as the following code shows:
Thank you for providing the context, @flyrain :)
Sorry, do you mean adding retry within the TableCleanupTaskHandler.java? If so, that's fine with me. I'd at least use an Executor to attempt the execution in parallel (i.e., so file2 isn't blocked during the delay between retries for file1). TBH, I'm not very familiar with how Iceberg generates the statistic files. Does there tend to be one per snapshot? one per data file? If the latter, we could be talking about a very large number of files. If that's the case, I think submitting separate tasks with batches of files makes sense.
@collado-mike Yes, if we go for retries, the logic will be within TableCleanupTaskHandler.java. But considering the information provided by Yufei, maybe separate tasks will be a more appropriate approach for stats file deletion?
LOGGER.error("Unable to delete data files from manifest {}", manifestFile.path(), e);
return false;
}
private boolean cleanUpManifestFile(
Sorry for all the changes here, but don’t worry: it’s mainly because I refactored the deletion logic for the manifest and all its data into a new method; no other changes were made in lines 91-135.
* files in a manifest and the manifest itself. Since data files may be present in multiple
* manifests across different snapshots, we assume a data file that doesn't exist is missing because
* it was already deleted by another task. 2. Table content files: It contains previous metadata and
* statistics files, which are grouped and deleted in batch
*/
public class ManifestFileCleanupTaskHandler implements TaskHandler {
Renaming this task will trigger lots of relevant changes. If a rename is needed, we may want to handle it in a separate PR to avoid too many changes (I'll leave a TODO here).
@@ -226,4 +233,228 @@ public void deleteFile(String location) {
assertThatPredicate((String f) -> TaskUtils.exists(f, fileIO)).rejects(dataFile2Path);
}
}
The test cases in this file primarily verify these cleanup scenarios:
- straightforward cleanup
- cleanup when files do not exist
- cleanup with retries
The test cases in this file primarily verify these scenarios:
- Cleanup of a single metadata file with a single snapshot
- Cleanup of a single metadata file across multiple snapshots
- Cleanup of multiple metadata files across multiple snapshots (New)
@@ -76,7 +79,13 @@ public void testTableCleanup() throws IOException {
TestSnapshot snapshot =
    TaskTestUtils.newSnapshot(fileIO, "manifestList.avro", 1, snapshotId, 99L, manifestFile);
String metadataFile = "v1-49494949.metadata.json";
TaskTestUtils.writeTableMetadata(fileIO, metadataFile, snapshot);
- Cleanup of a single metadata file with a single snapshot
@@ -303,7 +323,20 @@ public void testTableCleanupMultipleSnapshots() throws IOException {
manifestFile1,
manifestFile3); // exclude manifest2 from the new snapshot
String metadataFile = "v1-295495059.metadata.json";
TaskTestUtils.writeTableMetadata(fileIO, metadataFile, snapshot, snapshot2);
- Cleanup of a single metadata file across multiple snapshots
}
}
@Test
- Cleanup of multiple metadata files across multiple snapshots (New)
The changes in this file primarily attach previous metadata and statistics files when writing metadata.
Thanks for the suggestions @flyrain , I've updated the code based on your comments and added some comments to make the code easier to understand. I’d appreciate any further feedback, and thanks for your time and patience!
Sorry for the delay, @danielhumanmod. Left some comments.
fileIO.deleteFile(tableEntity.getMetadataLocation());
return true;
}
}
return false;
}
private List<List<String>> getContentFileBatch(TableMetadata tableMetadata, FileIO fileIO) {
The name contentFile in Iceberg refers to a data file or a delete file. Let's pick another name to avoid confusion, as metadata files and statistics files are not content files. How about metadataFile?
private List<List<String>> getContentFileBatch(TableMetadata tableMetadata, FileIO fileIO) {
List<List<String>> result = new ArrayList<>();
List<String> contentFiles =
rename here
Stream.concat(
        tableMetadata.previousFiles().stream().map(TableMetadata.MetadataLogEntry::file),
        tableMetadata.statisticsFiles().stream().map(StatisticsFile::path))
    .filter(file -> TaskUtils.exists(file, fileIO))
Do we need to check the file existence here? It will be checked at delete time, right?
.toList();
});
Stream<TaskEntity> contentFileCleanupTasks =
-> metadataFileCleanupTask
@@ -185,12 +236,18 @@ private CompletableFuture<Void> tryDelete(
public static final class ManifestCleanupTask {
private TableIdentifier tableId;
private String manifestFileData;
private List<String> contentFileBatch;
rename here
LOGGER
    .atWarn()
    .addKeyValue("contentFileBatch", contentFileBatch.toString())
    .addKeyValue("tableId", tableId)
    .log("Table content cleanup task scheduled, but none of the files in the batch exist");
return true;
Do we need to warn here if 1 out of 10 files doesn't exist?
LOGGER
    .atInfo()
    .addKeyValue("contentFileBatch", contentFileBatch.toString())
    .addKeyValue("tableId", tableId)
    .log("Content file batch deletion has completed");
We can either remove it or make it debug level. An info level log may not be necessary.
LOGGER
    .atWarn()
    .addKeyValue("contentFileBatch", contentFileBatch.toString())
    .addKeyValue("tableId", tableId)
    .log("Exception detected during content file batch deletion", e);
Should we throw here?
* files in a manifest and the manifest itself. Since data files may be present in multiple
* manifests across different snapshots, we assume a data file that doesn't exist is missing because
* it was already deleted by another task. 2. Table content files: It contains previous metadata and
* statistics files, which are grouped and deleted in batch
Need to handle the same for partition statistics files also (in this PR or as a follow up)
more details: apache/iceberg#9409
Need to handle the same for partition statistics files also (in this PR or as a follow up) more details: apache/iceberg#9409
Thanks for your suggestion! Would it be okay to address this in a separate PR to avoid making this one too large? (Left a comment on the code, I can create an issue to track it if necessary)
works for me.
- 1. renaming - 2. add log and exception handling - 3. remove unnecessary log
- 1. renaming - 2. extract task entities creation into methods - 3. remove unnecessary filtering
fileIO.deleteFile(tableEntity.getMetadataLocation());
return true;
}
}
return false;
}
Extracted previous manifest task creation into a new method; no new changes added for lines 152-201.
.build();
});
}
The code below this line is the new change.
Thanks for your review and suggestions @flyrain @ajantha-bhat ! I've made updates to the code based on your comments and would greatly appreciate any additional feedback. I understand this PR is a bit long; thank you again for your patience and time!
Sorry for the slow iteration cycle, but before we check in, I'd like to split the task type. Splitting the task type now means we have freedom to change the handler class design later, but if we start storing different tasks with the same task type, we won't be able to split the handlers later on.
@@ -49,6 +51,7 @@ public class TableCleanupTaskHandler implements TaskHandler {
private final TaskExecutor taskExecutor;
private final MetaStoreManagerFactory metaStoreManagerFactory;
private final Function<TaskEntity, FileIO> fileIOSupplier;
private static final int BATCH_SIZE = 10;
Let's use the PolarisConfigurationStore to control this value.
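For illustration, batching the collected file paths might look roughly like the sketch below. The class name is hypothetical, and the hard-coded size parameter stands in for a value that would instead come from the PolarisConfigurationStore:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: split file paths into fixed-size batches so each batch
// can become its own cleanup task. The batch size would be read from
// configuration rather than hard-coded.
public class BatchSketch {
  static <T> List<List<T>> partition(List<T> items, int batchSize) {
    List<List<T>> batches = new ArrayList<>();
    for (int i = 0; i < items.size(); i += batchSize) {
      // subList is a view over items; copy it so each batch stands alone
      batches.add(new ArrayList<>(items.subList(i, Math.min(i + batchSize, items.size()))));
    }
    return batches;
  }
}
```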
* {@link TaskHandler} responsible for deleting previous metadata and statistics files of a table.
*/
public class TableContentCleanupTaskHandler implements TaskHandler {
I'd rename this to something like BatchFileCleanupTaskHandler and make the javadoc more generic. We don't really need to know what kind of files these are, as we treat all of them the same (unlike the ManifestFileCleanupTaskHandler, which has to read the manifests).
FILE_CLEANUP(2);
FILE_CLEANUP(2),
TABLE_CONTENT_CLEANUP(3);
The FILE_CLEANUP type was poorly named. We should make it specific, as only the ManifestFileCleanupTaskHandler deals with it. That said, the actual serialized value is only the integer (see https://github.com/collado-mike/polaris/blob/8cb6b44bf57dc597dab612d109d3eb534aef5715/polaris-core/src/main/java/org/apache/polaris/core/entity/AsyncTaskType.java#L34-L37 ), so we can rename it and maintain backward compatibility.
With that in mind, I think we should rename FILE_CLEANUP -> MANIFEST_FILE_CLEANUP and add two enums for the new file types: METADATA_LOG_ENTRY_CLEANUP and STATISTICS_FILE_CLEANUP. Your task handler can look for instances of both types. I think that gives us flexibility in the future if we need to handle the different file types differently.
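A sketch of what that rename could look like. Since only the integer code is serialized, keeping 2 for the renamed constant preserves compatibility; the codes 3 and 4 for the new types (and the accessor name) are illustrative assumptions:

```java
// Sketch of the suggested AsyncTaskType split. FILE_CLEANUP keeps its wire
// value (2) under the more specific MANIFEST_FILE_CLEANUP name; the two new
// constants get fresh codes (values here are illustrative).
public enum AsyncTaskType {
  MANIFEST_FILE_CLEANUP(2),
  METADATA_LOG_ENTRY_CLEANUP(3),
  STATISTICS_FILE_CLEANUP(4);

  private final int typeCode;

  AsyncTaskType(int typeCode) {
    this.typeCode = typeCode;
  }

  public int typeCode() {
    return typeCode;
  }
}
```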
taskExecutor.addTaskHandler(
    new ManifestFileCleanupTaskHandler(
        fileIOSupplier, Executors.newVirtualThreadPerTaskExecutor()));
taskExecutor.addTaskHandler(
    new TableContentCleanupTaskHandler(
        fileIOSupplier, Executors.newVirtualThreadPerTaskExecutor()));
It's worth considering whether these ought to share the same executor service... I haven't put much thought into it, so... we can just leave a comment for now if there's not a strong argument either way.
/**
 * {@link TaskHandler} responsible for deleting previous metadata and statistics files of a table.
 */
public class TableContentCleanupTaskHandler implements TaskHandler {
Oof, sorry for missing this comment - I don't think we should have merged these two classes. At most, I think a common base class would be fine, but I'd prefer to avoid overloading the same class and cluttering it with if/else statements
ManifestFile manifestFile = decodeManifestData(cleanupTask.getManifestFileData());
TableIdentifier tableId = cleanupTask.getTableId();
try (FileIO authorizedFileIO = fileIOSupplier.apply(task)) {
// if the file doesn't exist, we assume that another task execution was successful, but failed
// to drop the task entity. Log a warning and return success
if (!TaskUtils.exists(manifestFile.path(), authorizedFileIO)) {
if (cleanupTask.getManifestFileData() != null) {
ManifestFile manifestFile = decodeManifestData(cleanupTask.getManifestFileData());
return cleanUpManifestFile(manifestFile, authorizedFileIO, tableId);
} else if (cleanupTask.getMetadataFiles() != null) {
I don't want to belabor this, but I don't want to overload this class with logic for handling many different file types. As I mentioned, the AsyncTaskType.FILE_CLEANUP enum is poorly named, making it sound like this class can just handle any kind of file clean up.
In order to avoid bogging down this PR too much, can we just add a second task type for the metadata files and predicate the logic here on the task type rather than testing the presence of getManifestFileData or getMetadataFiles? A future PR can refactor this code to use a common base class for two handlers to avoid duplicating logic while maintaining a clear separation of responsibilities.
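Dispatching on the declared task type instead of probing nullable fields might look roughly like this sketch; the enum constants and the handler method names it returns are assumptions for illustration:

```java
// Hypothetical sketch: route each cleanup task by its declared type rather
// than by checking which optional field happens to be non-null.
public class DispatchSketch {
  enum TaskType { MANIFEST_FILE_CLEANUP, METADATA_LOG_ENTRY_CLEANUP, STATISTICS_FILE_CLEANUP }

  static String handle(TaskType type) {
    return switch (type) {
      // manifest cleanup has to read and walk the manifest contents
      case MANIFEST_FILE_CLEANUP -> "cleanUpManifestFile";
      // metadata-log and statistics files are plain batch deletions
      case METADATA_LOG_ENTRY_CLEANUP, STATISTICS_FILE_CLEANUP -> "cleanUpFileBatch";
    };
  }
}
```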
Thanks @collado-mike for the suggestions, which are very helpful in improving the code quality. I’ve already split the task types in the cleanup task handler as the comments suggested and am very willing to refactor this in future PRs. Appreciate any additional feedback you may have!
Description
Currently, when handling DROP TABLE PURGE, Polaris only deletes data files and the current metadata in TableCleanupTaskHandler. However, other related files, such as historical metadata.json files, stat files, and others, are not deleted. Iceberg has addressed this issue (see Iceberg PR #9305), but Polaris doesn’t adopt this solution due to factors like asynchronous deletion. This PR introduces additional deletions for the following files:
Fixes #289
Included Changes
- TableCleanupTaskHandler.java to schedule previous metadata and statistics deletion in batch
- ManifestFileCleanupTaskHandler to delete metadata and statistics file batches
- TaskTestUtils.java to write metadata with previous metadata and statistics files
- TableCleanupTaskHandlerTest.java and TableContentCleanupTaskHandlerTest
Recommended Review Order:
TableCleanupTaskHandler.java -> ManifestFileCleanupTaskHandler.java -> TaskTestUtils.java -> TableCleanupTaskHandlerTest.java and ManifestFileCleanupTaskHandlerTest
Future Improvement
How Has This Been Tested?
Unit Test: