diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/SnapshotClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/SnapshotClient.java index f3a49f064596e..d3b2ea466f458 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/SnapshotClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/SnapshotClient.java @@ -20,6 +20,8 @@ package org.elasticsearch.client; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest; +import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryResponse; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest; import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest; import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesResponse; @@ -170,6 +172,35 @@ public void verifyRepositoryAsync(VerifyRepositoryRequest verifyRepositoryReques VerifyRepositoryResponse::fromXContent, listener, emptySet()); } + /** + * Cleans up a snapshot repository. + * See Snapshot and Restore + * API on elastic.co + * @param cleanupRepositoryRequest the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the response + * @throws IOException in case there is a problem sending the request or parsing back the response + */ + public CleanupRepositoryResponse cleanupRepository(CleanupRepositoryRequest cleanupRepositoryRequest, RequestOptions options) + throws IOException { + return restHighLevelClient.performRequestAndParseEntity(cleanupRepositoryRequest, SnapshotRequestConverters::cleanupRepository, + options, CleanupRepositoryResponse::fromXContent, emptySet()); + } + + /** + * Asynchronously cleans up a snapshot repository. 
+ * See Snapshot and Restore + * API on elastic.co + * @param cleanupRepositoryRequest the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @param listener the listener to be notified upon request completion + */ + public void cleanupRepositoryAsync(CleanupRepositoryRequest cleanupRepositoryRequest, RequestOptions options, + ActionListener<CleanupRepositoryResponse> listener) { + restHighLevelClient.performRequestAsyncAndParseEntity(cleanupRepositoryRequest, SnapshotRequestConverters::cleanupRepository, + options, CleanupRepositoryResponse::fromXContent, listener, emptySet()); + } + /** * Creates a snapshot. *

diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/SnapshotRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/SnapshotRequestConverters.java index 406470ea52cda..703aa0d672555 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/SnapshotRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/SnapshotRequestConverters.java @@ -23,6 +23,7 @@ import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpPut; +import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest; import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest; @@ -94,6 +95,20 @@ static Request verifyRepository(VerifyRepositoryRequest verifyRepositoryRequest) return request; } + static Request cleanupRepository(CleanupRepositoryRequest cleanupRepositoryRequest) { + String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_snapshot") + .addPathPart(cleanupRepositoryRequest.name()) + .addPathPartAsIs("_cleanup") + .build(); + Request request = new Request(HttpPost.METHOD_NAME, endpoint); + + RequestConverters.Params parameters = new RequestConverters.Params(); + parameters.withMasterTimeout(cleanupRepositoryRequest.masterNodeTimeout()); + parameters.withTimeout(cleanupRepositoryRequest.timeout()); + request.addParameters(parameters.asMap()); + return request; + } + static Request createSnapshot(CreateSnapshotRequest createSnapshotRequest) throws IOException { String endpoint = new RequestConverters.EndpointBuilder().addPathPart("_snapshot") .addPathPart(createSnapshotRequest.repository()) diff --git 
a/client/rest-high-level/src/test/java/org/elasticsearch/client/SnapshotIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/SnapshotIT.java index 8e4001442b0cc..f9679cf5eb61c 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/SnapshotIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/SnapshotIT.java @@ -20,6 +20,8 @@ package org.elasticsearch.client; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest; +import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryResponse; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest; import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest; import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesResponse; @@ -133,6 +135,17 @@ public void testVerifyRepository() throws IOException { assertThat(response.getNodes().size(), equalTo(1)); } + public void testCleanupRepository() throws IOException { + AcknowledgedResponse putRepositoryResponse = createTestRepository("test", FsRepository.TYPE, "{\"location\": \".\"}"); + assertTrue(putRepositoryResponse.isAcknowledged()); + + CleanupRepositoryRequest request = new CleanupRepositoryRequest("test"); + CleanupRepositoryResponse response = execute(request, highLevelClient().snapshot()::cleanupRepository, + highLevelClient().snapshot()::cleanupRepositoryAsync); + assertThat(response.result().bytes(), equalTo(0L)); + assertThat(response.result().blobs(), equalTo(0L)); + } + public void testCreateSnapshot() throws IOException { String repository = "test_repository"; assertTrue(createTestRepository(repository, FsRepository.TYPE, "{\"location\": \".\"}").isAcknowledged()); @@ -317,4 +330,4 @@ private static Map randomUserMetadata() { } return metadata; } -} \ No newline at end of file +} diff --git 
a/docs/reference/modules/snapshots.asciidoc b/docs/reference/modules/snapshots.asciidoc index 50d037b5ffb20..7383dd5d19295 100644 --- a/docs/reference/modules/snapshots.asciidoc +++ b/docs/reference/modules/snapshots.asciidoc @@ -332,6 +332,42 @@ POST /_snapshot/my_unverified_backup/_verify It returns a list of nodes where repository was successfully verified or an error message if verification process failed. +[float] +===== Repository Cleanup +Repositories can over time accumulate data that is not referenced by any existing snapshot. This is a result of the data safety guarantees +the snapshot functionality provides in failure scenarios during snapshot creation and the decentralized nature of the snapshot creation +process. This unreferenced data does in no way negatively impact the performance or safety of a snapshot repository but leads to higher +than necessary storage use. In order to clean up this unreferenced data, users can call the cleanup endpoint for a repository which will +trigger a complete accounting of the repositories contents and subsequent deletion of all unreferenced data that was found. + +[source,js] +----------------------------------- +POST /_snapshot/my_repository/_cleanup +----------------------------------- +// CONSOLE +// TEST[continued] + +The response to a cleanup request looks as follows: + +[source,js] +-------------------------------------------------- +{ + "results": { + "deleted_bytes": 20, + "deleted_blobs": 5 + } +} +-------------------------------------------------- +// TESTRESPONSE + +Depending on the concrete repository implementation the numbers shown for bytes free as well as the number of blobs removed will either +be an approximation or an exact result. Any non-zero value for the number of blobs removed implies that unreferenced blobs were found and +subsequently cleaned up. 
+ +Please note that most of the cleanup operations executed by this endpoint are automatically executed when deleting any snapshot from a +repository. If you regularly delete snapshots, you will in most cases not get any or only minor space savings from using this functionality +and should lower your frequency of invoking it accordingly. + [float] [[snapshots-take-snapshot]] === Snapshot diff --git a/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java b/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java index 604dc7c083efd..7a74078894c92 100644 --- a/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java +++ b/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; import java.io.BufferedInputStream; @@ -97,7 +98,7 @@ public void deleteBlob(String blobName) throws IOException { } @Override - public void delete() { + public DeleteResult delete() { throw new UnsupportedOperationException("URL repository is read only"); } diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java index 15d1b37ecf817..042f08df48262 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.blobstore.BlobContainer; import 
org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; import org.elasticsearch.threadpool.ThreadPool; @@ -126,9 +127,9 @@ public void deleteBlob(String blobName) throws IOException { } @Override - public void delete() throws IOException { + public DeleteResult delete() throws IOException { try { - blobStore.deleteBlobDirectory(keyPath, threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME)); + return blobStore.deleteBlobDirectory(keyPath, threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME)); } catch (URISyntaxException | StorageException e) { throw new IOException(e); } diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java index a7d9bb93a5125..e4a7e3acb6526 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java @@ -21,12 +21,12 @@ import com.microsoft.azure.storage.LocationMode; import com.microsoft.azure.storage.StorageException; - import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.repositories.azure.AzureRepository.Repository; import org.elasticsearch.threadpool.ThreadPool; @@ -92,8 +92,9 @@ public void deleteBlob(String blob) throws URISyntaxException, StorageException service.deleteBlob(clientName, container, blob); } - public void 
deleteBlobDirectory(String path, Executor executor) throws URISyntaxException, StorageException, IOException { - service.deleteBlobDirectory(clientName, container, path, executor); + public DeleteResult deleteBlobDirectory(String path, Executor executor) + throws URISyntaxException, StorageException, IOException { + return service.deleteBlobDirectory(clientName, container, path, executor); } public InputStream getInputStream(String blob) throws URISyntaxException, StorageException, IOException { diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java index f4ee7b9dbcad9..cc4335956b76d 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java @@ -42,6 +42,7 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; @@ -72,7 +73,7 @@ import static java.util.Collections.emptyMap; public class AzureStorageService { - + private static final Logger logger = LogManager.getLogger(AzureStorageService.class); public static final ByteSizeValue MIN_CHUNK_SIZE = new ByteSizeValue(1, ByteSizeUnit.BYTES); @@ -192,13 +193,15 @@ public void deleteBlob(String account, String container, String blob) throws URI }); } - void deleteBlobDirectory(String account, String container, String path, Executor executor) + DeleteResult deleteBlobDirectory(String account, String container, String path, Executor executor) throws URISyntaxException, 
StorageException, IOException { final Tuple> client = client(account); final CloudBlobContainer blobContainer = client.v1().getContainerReference(container); final Collection exceptions = Collections.synchronizedList(new ArrayList<>()); final AtomicLong outstanding = new AtomicLong(1L); final PlainActionFuture result = PlainActionFuture.newFuture(); + final AtomicLong blobsDeleted = new AtomicLong(); + final AtomicLong bytesDeleted = new AtomicLong(); SocketAccess.doPrivilegedVoidException(() -> { for (final ListBlobItem blobItem : blobContainer.listBlobs(path, true)) { // uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/ @@ -208,7 +211,17 @@ void deleteBlobDirectory(String account, String container, String path, Executor executor.execute(new AbstractRunnable() { @Override protected void doRun() throws Exception { + final long len; + if (blobItem instanceof CloudBlob) { + len = ((CloudBlob) blobItem).getProperties().getLength(); + } else { + len = -1L; + } deleteBlob(account, container, blobPath); + blobsDeleted.incrementAndGet(); + if (len >= 0) { + bytesDeleted.addAndGet(len); + } } @Override @@ -234,6 +247,7 @@ public void onAfter() { exceptions.forEach(ex::addSuppressed); throw ex; } + return new DeleteResult(blobsDeleted.get(), bytesDeleted.get()); } public InputStream getInputStream(String account, String container, String blob) diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java index 4657ece3c8a2f..da22750242722 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.blobstore.BlobContainer; import 
org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; import java.io.IOException; @@ -77,8 +78,8 @@ public void deleteBlob(String blobName) throws IOException { } @Override - public void delete() throws IOException { - blobStore.deleteDirectory(path().buildAsString()); + public DeleteResult delete() throws IOException { + return blobStore.deleteDirectory(path().buildAsString()); } @Override diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java index c3fd4848a0c3c..c42fe232b6e46 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java @@ -37,6 +37,7 @@ import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.blobstore.BlobStoreException; +import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.core.internal.io.Streams; @@ -55,6 +56,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -300,15 +302,24 @@ void deleteBlob(String blobName) throws IOException { * * @param pathStr Name of path to delete */ - void deleteDirectory(String pathStr) throws IOException { - SocketAccess.doPrivilegedVoidIOException(() -> { + DeleteResult deleteDirectory(String pathStr) throws 
IOException { + return SocketAccess.doPrivilegedIOException(() -> { + DeleteResult deleteResult = DeleteResult.ZERO; Page page = client().get(bucketName).list(BlobListOption.prefix(pathStr)); do { final Collection blobsToDelete = new ArrayList<>(); - page.getValues().forEach(b -> blobsToDelete.add(b.getName())); + final AtomicLong blobsDeleted = new AtomicLong(0L); + final AtomicLong bytesDeleted = new AtomicLong(0L); + page.getValues().forEach(b -> { + blobsToDelete.add(b.getName()); + blobsDeleted.incrementAndGet(); + bytesDeleted.addAndGet(b.getSize()); + }); deleteBlobsIgnoringIfNotExists(blobsToDelete); + deleteResult = deleteResult.add(blobsDeleted.get(), bytesDeleted.get()); page = page.getNextPage(); } while (page != null); + return deleteResult; }); } diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java index e4c9af4d6c70f..304906464dcad 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java @@ -28,6 +28,7 @@ import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.fs.FsBlobContainer; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; @@ -69,9 +70,13 @@ public void deleteBlob(String blobName) throws IOException { } } + // TODO: See if we can get precise result reporting. 
+ private static final DeleteResult DELETE_RESULT = new DeleteResult(1L, 0L); + @Override - public void delete() throws IOException { + public DeleteResult delete() throws IOException { store.execute(fileContext -> fileContext.delete(path, true)); + return DELETE_RESULT; } @Override diff --git a/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsRepositoryTests.java b/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsRepositoryTests.java index e34f290a8e299..d65db92f0670f 100644 --- a/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsRepositoryTests.java +++ b/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsRepositoryTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.repositories.hdfs; import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; +import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.bootstrap.JavaVersion; import org.elasticsearch.common.settings.MockSecureSettings; @@ -30,6 +31,7 @@ import java.util.Collection; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; @ThreadLeakFilters(filters = HdfsClientThreadLeakFilter.class) public class HdfsRepositoryTests extends AbstractThirdPartyRepositoryTestCase { @@ -58,4 +60,14 @@ protected void createRepository(String repoName) { ).get(); assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); } + + // HDFS repository doesn't have precise cleanup stats so we only check whether or not any blobs were removed + @Override + protected void assertCleanupResponse(CleanupRepositoryResponse response, long bytes, long blobs) { + if (blobs > 0) { + assertThat(response.result().blobs(), greaterThan(0L)); + } else { + assertThat(response.result().blobs(), equalTo(0L)); + } + } } diff --git 
a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index 9e9cef9cd0e62..46910d840cd0f 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -32,7 +32,6 @@ import com.amazonaws.services.s3.model.PartETag; import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.S3Object; -import com.amazonaws.services.s3.model.S3ObjectSummary; import com.amazonaws.services.s3.model.UploadPartRequest; import com.amazonaws.services.s3.model.UploadPartResult; import org.apache.lucene.util.SetOnce; @@ -42,6 +41,7 @@ import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; import org.elasticsearch.common.collect.Tuple; @@ -54,6 +54,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.Collectors; @@ -121,7 +122,9 @@ public void deleteBlob(String blobName) throws IOException { } @Override - public void delete() throws IOException { + public DeleteResult delete() throws IOException { + final AtomicLong deletedBlobs = new AtomicLong(); + final AtomicLong deletedBytes = new AtomicLong(); try (AmazonS3Reference clientReference = blobStore.clientReference()) { ObjectListing prevListing = null; while (true) { @@ -135,8 +138,12 @@ public void delete() throws IOException { listObjectsRequest.setPrefix(keyPath); list = SocketAccess.doPrivileged(() -> 
clientReference.client().listObjects(listObjectsRequest)); } - final List blobsToDelete = - list.getObjectSummaries().stream().map(S3ObjectSummary::getKey).collect(Collectors.toList()); + final List blobsToDelete = new ArrayList<>(); + list.getObjectSummaries().forEach(s3ObjectSummary -> { + deletedBlobs.incrementAndGet(); + deletedBytes.addAndGet(s3ObjectSummary.getSize()); + blobsToDelete.add(s3ObjectSummary.getKey()); + }); if (list.isTruncated()) { doDeleteBlobs(blobsToDelete, false); prevListing = list; @@ -150,6 +157,7 @@ public void delete() throws IOException { } catch (final AmazonClientException e) { throw new IOException("Exception when deleting blob container [" + keyPath + "]", e); } + return new DeleteResult(deletedBlobs.get(), deletedBytes.get()); } @Override diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/snapshot.cleanup_repository.json b/rest-api-spec/src/main/resources/rest-api-spec/api/snapshot.cleanup_repository.json new file mode 100644 index 0000000000000..79223dd214188 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/snapshot.cleanup_repository.json @@ -0,0 +1,28 @@ +{ + "snapshot.cleanup_repository": { + "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/master/modules-snapshots.html", + "stability": "stable", + "methods": ["POST"], + "url": { + "paths": ["/_snapshot/{repository}/_cleanup"], + "parts": { + "repository": { + "type": "string", + "required" : true, + "description": "A repository name" + } + }, + "params": { + "master_timeout": { + "type" : "time", + "description" : "Explicit operation timeout for connection to master node" + }, + "timeout": { + "type" : "time", + "description" : "Explicit operation timeout" + } + } + }, + "body": {} + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/snapshot.create/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/snapshot.create/10_basic.yml index 0a5a7260a27a8..2a33cfbda63d0 100644 --- 
a/rest-api-spec/src/main/resources/rest-api-spec/test/snapshot.create/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/snapshot.create/10_basic.yml @@ -38,6 +38,51 @@ setup: - match: { acknowledged: true } +--- +"Create a snapshot and clean up repository": + - skip: + version: " - 7.99.99" + reason: cleanup introduced in 8.0 + + - do: + snapshot.cleanup_repository: + repository: test_repo_create_1 + + - match: { results.deleted_bytes: 0 } + - match: { results.deleted_blobs: 0 } + + - do: + snapshot.create: + repository: test_repo_create_1 + snapshot: test_snapshot + wait_for_completion: true + + - match: { snapshot.snapshot: test_snapshot } + - match: { snapshot.state : SUCCESS } + - match: { snapshot.shards.successful: 1 } + - match: { snapshot.shards.failed : 0 } + + - do: + snapshot.cleanup_repository: + repository: test_repo_create_1 + + - match: { results.deleted_bytes: 0 } + - match: { results.deleted_blobs: 0 } + + - do: + snapshot.delete: + repository: test_repo_create_1 + snapshot: test_snapshot + + - match: { acknowledged: true } + + - do: + snapshot.cleanup_repository: + repository: test_repo_create_1 + + - match: { results.deleted_bytes: 0 } + - match: { results.deleted_blobs: 0 } + --- "Create a snapshot for missing index": diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 643e73963c9ba..be5be216596c7 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -47,6 +47,8 @@ import org.elasticsearch.action.admin.cluster.node.usage.TransportNodesUsageAction; import org.elasticsearch.action.admin.cluster.remote.RemoteInfoAction; import org.elasticsearch.action.admin.cluster.remote.TransportRemoteInfoAction; +import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryAction; +import 
org.elasticsearch.action.admin.cluster.repositories.cleanup.TransportCleanupRepositoryAction; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryAction; import org.elasticsearch.action.admin.cluster.repositories.delete.TransportDeleteRepositoryAction; import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesAction; @@ -229,6 +231,7 @@ import org.elasticsearch.rest.action.RestMainAction; import org.elasticsearch.rest.action.admin.cluster.RestAddVotingConfigExclusionAction; import org.elasticsearch.rest.action.admin.cluster.RestCancelTasksAction; +import org.elasticsearch.rest.action.admin.cluster.RestCleanupRepositoryAction; import org.elasticsearch.rest.action.admin.cluster.RestClearVotingConfigExclusionsAction; import org.elasticsearch.rest.action.admin.cluster.RestClusterAllocationExplainAction; import org.elasticsearch.rest.action.admin.cluster.RestClusterGetSettingsAction; @@ -451,6 +454,7 @@ public void reg actions.register(GetRepositoriesAction.INSTANCE, TransportGetRepositoriesAction.class); actions.register(DeleteRepositoryAction.INSTANCE, TransportDeleteRepositoryAction.class); actions.register(VerifyRepositoryAction.INSTANCE, TransportVerifyRepositoryAction.class); + actions.register(CleanupRepositoryAction.INSTANCE, TransportCleanupRepositoryAction.class); actions.register(GetSnapshotsAction.INSTANCE, TransportGetSnapshotsAction.class); actions.register(DeleteSnapshotAction.INSTANCE, TransportDeleteSnapshotAction.class); actions.register(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class); @@ -582,6 +586,7 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestGetRepositoriesAction(restController, settingsFilter)); registerHandler.accept(new RestDeleteRepositoryAction(restController)); registerHandler.accept(new RestVerifyRepositoryAction(restController)); + registerHandler.accept(new RestCleanupRepositoryAction(restController)); 
registerHandler.accept(new RestGetSnapshotsAction(restController)); registerHandler.accept(new RestCreateSnapshotAction(restController)); registerHandler.accept(new RestRestoreSnapshotAction(restController)); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/CleanupRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/CleanupRepositoryAction.java new file mode 100644 index 0000000000000..af57e6d4f00ff --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/CleanupRepositoryAction.java @@ -0,0 +1,31 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.action.admin.cluster.repositories.cleanup; + +import org.elasticsearch.action.ActionType; + +public final class CleanupRepositoryAction extends ActionType<CleanupRepositoryResponse> { + + public static final CleanupRepositoryAction INSTANCE = new CleanupRepositoryAction(); + public static final String NAME = "cluster:admin/repository/_cleanup"; + + private CleanupRepositoryAction() { + super(NAME, CleanupRepositoryResponse::new); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/CleanupRepositoryRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/CleanupRepositoryRequest.java new file mode 100644 index 0000000000000..168cdbb496701 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/CleanupRepositoryRequest.java @@ -0,0 +1,63 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.action.admin.cluster.repositories.cleanup; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +public class CleanupRepositoryRequest extends AcknowledgedRequest<CleanupRepositoryRequest> { + + private String repository; + + public CleanupRepositoryRequest(String repository) { + this.repository = repository; + } + + public CleanupRepositoryRequest(StreamInput in) throws IOException { + super(in); repository = in.readString(); // read parent (master-node/ack timeout) fields first, symmetric with writeTo + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); out.writeString(repository); // serialize parent fields so timeouts survive forwarding to the master node + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (repository == null) { + validationException = addValidationError("repository is null", null); + } + return validationException; + } + + public String name() { + return repository; + } + + public void name(String repository) { + this.repository = repository; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/CleanupRepositoryRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/CleanupRepositoryRequestBuilder.java new file mode 100644 index 0000000000000..2f7e6aefdcc94 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/CleanupRepositoryRequestBuilder.java @@ -0,0 +1,38 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.repositories.cleanup; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; +import org.elasticsearch.client.ElasticsearchClient; + +public class CleanupRepositoryRequestBuilder extends MasterNodeOperationRequestBuilder { + + public CleanupRepositoryRequestBuilder(ElasticsearchClient client, ActionType action, + String repository) { + super(client, action, new CleanupRepositoryRequest(repository)); + } + + public CleanupRepositoryRequestBuilder setName(String repository) { + request.name(repository); + return this; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/CleanupRepositoryResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/CleanupRepositoryResponse.java new file mode 100644 index 0000000000000..8516ece925797 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/CleanupRepositoryResponse.java @@ -0,0 +1,76 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.admin.cluster.repositories.cleanup; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.repositories.RepositoryCleanupResult; + +import java.io.IOException; + +public final class CleanupRepositoryResponse extends ActionResponse implements ToXContentObject { + + private static final ObjectParser PARSER = + new ObjectParser<>(CleanupRepositoryResponse.class.getName(), true, CleanupRepositoryResponse::new); + + static { + PARSER.declareObject((response, cleanupResult) -> response.result = cleanupResult, + RepositoryCleanupResult.PARSER, new ParseField("results")); + } + + private RepositoryCleanupResult result; + + public CleanupRepositoryResponse() { + } + + public CleanupRepositoryResponse(RepositoryCleanupResult result) { + this.result = result; + } + + public CleanupRepositoryResponse(StreamInput in) throws IOException { + result = new RepositoryCleanupResult(in); + } + + public RepositoryCleanupResult result() { + return result; + 
} + + @Override + public void writeTo(StreamOutput out) throws IOException { + result.writeTo(out); + } + + public static CleanupRepositoryResponse fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject().field("results"); + result.toXContent(builder, params); + builder.endObject(); + return builder; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java new file mode 100644 index 0000000000000..f234a9e064ae5 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java @@ -0,0 +1,249 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.action.admin.cluster.repositories.cleanup; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.RepositoryCleanupInProgress; +import org.elasticsearch.cluster.SnapshotDeletionsInProgress; +import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.RepositoryCleanupResult; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; + +/** + * Repository cleanup action for repository implementations based on {@link BlobStoreRepository}. + * + * The steps taken by the repository cleanup operation are as follows: + *

+ * <ol> + * <li>Check that there are no running repository cleanup, snapshot create, or snapshot delete actions + * and add an entry for the repository that is to be cleaned up to {@link RepositoryCleanupInProgress}</li> + * <li>Run cleanup actions on the repository. Note, these are executed exclusively on the master node. + * For the precise operations executed, see {@link BlobStoreRepository#cleanup}</li> + * <li>Remove the entry in {@link RepositoryCleanupInProgress} added in the first step.</li> + * </ol>
+ * + * On master failover during the cleanup operation it is simply removed from the cluster state. This is safe because the logic in + * {@link BlobStoreRepository#cleanup} ensures that the repository state id has not changed between creation of the cluster state entry + * and any delete/write operations. TODO: This will not work if we also want to clean up at the shard level as those will involve writes + * as well as deletes. + */ +public final class TransportCleanupRepositoryAction extends TransportMasterNodeAction { + + private static final Logger logger = LogManager.getLogger(TransportCleanupRepositoryAction.class); + + private static final Version MIN_VERSION = Version.V_8_0_0; + + private final RepositoriesService repositoriesService; + + @Override + protected String executor() { + return ThreadPool.Names.GENERIC; + } + + @Inject + public TransportCleanupRepositoryAction(TransportService transportService, ClusterService clusterService, + RepositoriesService repositoriesService, ThreadPool threadPool, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver) { + super(CleanupRepositoryAction.NAME, transportService, clusterService, threadPool, actionFilters, + CleanupRepositoryRequest::new, indexNameExpressionResolver); + this.repositoriesService = repositoriesService; + // We add a state applier that will remove any dangling repository cleanup actions on master failover. + // This is safe to do since cleanups will increment the repository state id before executing any operations to prevent concurrent + // operations from corrupting the repository. This is the same safety mechanism used by snapshot deletes. 
+ clusterService.addStateApplier(event -> { + if (event.localNodeMaster() && event.previousState().nodes().isLocalNodeElectedMaster() == false) { + final RepositoryCleanupInProgress repositoryCleanupInProgress = event.state().custom(RepositoryCleanupInProgress.TYPE); + if (repositoryCleanupInProgress == null || repositoryCleanupInProgress.cleanupInProgress()) { // cleanupInProgress() is true when no entries exist, so there is nothing dangling to remove + return; + } + clusterService.submitStateUpdateTask("clean up repository cleanup task after master failover", + new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + return removeInProgressCleanup(currentState); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + logger.debug("Removed repository cleanup task [{}] from cluster state", repositoryCleanupInProgress); + } + + @Override + public void onFailure(String source, Exception e) { + logger.warn( + "Failed to remove repository cleanup task [{}] from cluster state", repositoryCleanupInProgress); + } + }); + } + }); + } + + private static ClusterState removeInProgressCleanup(final ClusterState currentState) { + RepositoryCleanupInProgress cleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE); + if (cleanupInProgress != null) { + boolean changed = false; + if (cleanupInProgress.cleanupInProgress() == false) { + cleanupInProgress = new RepositoryCleanupInProgress(); + changed = true; + } + if (changed) { + return ClusterState.builder(currentState).putCustom( + RepositoryCleanupInProgress.TYPE, cleanupInProgress).build(); + } + } + return currentState; + } + + @Override + protected CleanupRepositoryResponse read(StreamInput in) throws IOException { + return new CleanupRepositoryResponse(in); + } + + @Override + protected void masterOperation(Task task, CleanupRepositoryRequest request, ClusterState state, + ActionListener<CleanupRepositoryResponse> listener) { + if (state.nodes().getMinNodeVersion().onOrAfter(MIN_VERSION)) { + 
cleanupRepo(request.name(), ActionListener.map(listener, CleanupRepositoryResponse::new)); + } else { + throw new IllegalArgumentException("Repository cleanup is only supported from version [" + MIN_VERSION + + "] but the oldest node version in the cluster is [" + state.nodes().getMinNodeVersion() + ']'); + } + } + + @Override + protected ClusterBlockException checkBlock(CleanupRepositoryRequest request, ClusterState state) { + // Cluster is not affected but we look up repositories in metadata + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + } + + /** + * Runs cleanup operations on the given repository. + * @param repositoryName Repository to clean up + * @param listener Listener for cleanup result + */ + private void cleanupRepo(String repositoryName, ActionListener listener) { + final Repository repository = repositoriesService.repository(repositoryName); + if (repository instanceof BlobStoreRepository == false) { + listener.onFailure(new IllegalArgumentException("Repository [" + repositoryName + "] does not support repository cleanup")); + return; + } + final BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository; + final long repositoryStateId = repository.getRepositoryData().getGenId(); + logger.info("Running cleanup operations on repository [{}][{}]", repositoryName, repositoryStateId); + clusterService.submitStateUpdateTask("cleanup repository [" + repositoryName + "][" + repositoryStateId + ']', + new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE); + if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.cleanupInProgress() == false) { + throw new IllegalStateException( + "Cannot cleanup [" + repositoryName + "] - a repository cleanup is already in-progress"); + } + SnapshotDeletionsInProgress deletionsInProgress = 
currentState.custom(SnapshotDeletionsInProgress.TYPE); + if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { + throw new IllegalStateException("Cannot cleanup [" + repositoryName + "] - a snapshot is currently being deleted"); + } + SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); + if (snapshots != null && !snapshots.entries().isEmpty()) { + throw new IllegalStateException("Cannot cleanup [" + repositoryName + "] - a snapshot is currently running"); + } + return ClusterState.builder(currentState).putCustom(RepositoryCleanupInProgress.TYPE, + new RepositoryCleanupInProgress( + RepositoryCleanupInProgress.startedEntry(repositoryName, repositoryStateId))).build(); + } + + @Override + public void onFailure(String source, Exception e) { + after(e, null); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + logger.debug("Initialized repository cleanup in cluster state for [{}][{}]", repositoryName, repositoryStateId); + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, + l -> blobStoreRepository.cleanup( + repositoryStateId, ActionListener.wrap(result -> after(null, result), e -> after(e, null))))); + } + + private void after(@Nullable Exception failure, @Nullable RepositoryCleanupResult result) { + if (failure == null) { + logger.debug("Finished repository cleanup operations on [{}][{}]", repositoryName, repositoryStateId); + } else { + logger.debug(() -> new ParameterizedMessage( + "Failed to finish repository cleanup operations on [{}][{}]", repositoryName, repositoryStateId), failure); + } + assert failure != null || result != null; + clusterService.submitStateUpdateTask( + "remove repository cleanup task [" + repositoryName + "][" + repositoryStateId + ']', + new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + return removeInProgressCleanup(currentState); 
+ } + + @Override + public void onFailure(String source, Exception e) { + if (failure != null) { + e.addSuppressed(failure); + } + logger.warn(() -> + new ParameterizedMessage("[{}] failed to remove repository cleanup task", repositoryName), e); + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (failure == null) { + logger.info("Done with repository cleanup on [{}][{}] with result [{}]", + repositoryName, repositoryStateId, result); + listener.onResponse(result); + } else { + logger.warn(() -> new ParameterizedMessage("Failed to run repository cleanup operations on [{}][{}]", + repositoryName, repositoryStateId), failure); + listener.onFailure(failure); + } + } + }); + } + }); + } +} diff --git a/server/src/main/java/org/elasticsearch/client/ClusterAdminClient.java b/server/src/main/java/org/elasticsearch/client/ClusterAdminClient.java index cd874b62a40b4..fdee39fdb1f93 100644 --- a/server/src/main/java/org/elasticsearch/client/ClusterAdminClient.java +++ b/server/src/main/java/org/elasticsearch/client/ClusterAdminClient.java @@ -49,6 +49,9 @@ import org.elasticsearch.action.admin.cluster.node.usage.NodesUsageRequest; import org.elasticsearch.action.admin.cluster.node.usage.NodesUsageRequestBuilder; import org.elasticsearch.action.admin.cluster.node.usage.NodesUsageResponse; +import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest; +import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequestBuilder; +import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryResponse; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequestBuilder; import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest; @@ -453,6 +456,21 @@ public interface 
ClusterAdminClient extends ElasticsearchClient { */ GetRepositoriesRequestBuilder prepareGetRepositories(String... name); + /** + * Cleans up repository. + */ + CleanupRepositoryRequestBuilder prepareCleanupRepository(String repository); + + /** + * Cleans up repository. + */ + ActionFuture cleanupRepository(CleanupRepositoryRequest repository); + + /** + * Cleans up repository. + */ + void cleanupRepository(CleanupRepositoryRequest repository, ActionListener listener); + /** * Verifies a repository. */ diff --git a/server/src/main/java/org/elasticsearch/client/Requests.java b/server/src/main/java/org/elasticsearch/client/Requests.java index a3eb23eebfe20..fa7bc73c8b9fc 100644 --- a/server/src/main/java/org/elasticsearch/client/Requests.java +++ b/server/src/main/java/org/elasticsearch/client/Requests.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.usage.NodesUsageRequest; +import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest; import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest; @@ -460,6 +461,16 @@ public static DeleteRepositoryRequest deleteRepositoryRequest(String name) { return new DeleteRepositoryRequest(name); } + /** + * Cleanup repository + * + * @param name repository name + * @return cleanup repository request + */ + public static CleanupRepositoryRequest cleanupRepositoryRequest(String name) { + return new CleanupRepositoryRequest(name); + } + /** * Verifies snapshot repository * diff --git a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java 
b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java index c3119256fc7cf..283b8dc0a2843 100644 --- a/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/elasticsearch/client/support/AbstractClient.java @@ -64,6 +64,10 @@ import org.elasticsearch.action.admin.cluster.node.usage.NodesUsageRequest; import org.elasticsearch.action.admin.cluster.node.usage.NodesUsageRequestBuilder; import org.elasticsearch.action.admin.cluster.node.usage.NodesUsageResponse; +import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryAction; +import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest; +import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequestBuilder; +import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryResponse; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryAction; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequestBuilder; @@ -1004,6 +1008,21 @@ public GetRepositoriesRequestBuilder prepareGetRepositories(String... 
name) { return new GetRepositoriesRequestBuilder(this, GetRepositoriesAction.INSTANCE, name); } + @Override + public CleanupRepositoryRequestBuilder prepareCleanupRepository(String repository) { + return new CleanupRepositoryRequestBuilder(this, CleanupRepositoryAction.INSTANCE, repository); + } + + @Override + public ActionFuture cleanupRepository(CleanupRepositoryRequest request) { + return execute(CleanupRepositoryAction.INSTANCE, request); + } + + @Override + public void cleanupRepository(CleanupRepositoryRequest request, ActionListener listener) { + execute(CleanupRepositoryAction.INSTANCE, request, listener); + } + @Override public ActionFuture restoreSnapshot(RestoreSnapshotRequest request) { return execute(RestoreSnapshotAction.INSTANCE, request); diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index d0448e2be22c9..e445615e0fc73 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -118,6 +118,8 @@ public static List getNamedWriteables() { registerClusterCustom(entries, RestoreInProgress.TYPE, RestoreInProgress::new, RestoreInProgress::readDiffFrom); registerClusterCustom(entries, SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress::new, SnapshotDeletionsInProgress::readDiffFrom); + registerClusterCustom(entries, RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress::new, + RepositoryCleanupInProgress::readDiffFrom); // Metadata registerMetaDataCustom(entries, RepositoriesMetaData.TYPE, RepositoriesMetaData::new, RepositoriesMetaData::readDiffFrom); registerMetaDataCustom(entries, IngestMetadata.TYPE, IngestMetadata::new, IngestMetadata::readDiffFrom); diff --git a/server/src/main/java/org/elasticsearch/cluster/RepositoryCleanupInProgress.java b/server/src/main/java/org/elasticsearch/cluster/RepositoryCleanupInProgress.java new file mode 100644 
index 0000000000000..9dfd5284fd8c7 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/RepositoryCleanupInProgress.java @@ -0,0 +1,120 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster; + +import org.elasticsearch.Version; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +public final class RepositoryCleanupInProgress extends AbstractNamedDiffable implements ClusterState.Custom { + + public static final String TYPE = "repository_cleanup"; + + private final List entries; + + public RepositoryCleanupInProgress(Entry... 
entries) { + this.entries = Arrays.asList(entries); + } + + RepositoryCleanupInProgress(StreamInput in) throws IOException { + this.entries = in.readList(Entry::new); + } + + public static NamedDiff readDiffFrom(StreamInput in) throws IOException { + return readDiffFrom(ClusterState.Custom.class, TYPE, in); + } + + public static Entry startedEntry(String repository, long repositoryStateId) { + return new Entry(repository, repositoryStateId); + } + + public boolean cleanupInProgress() { + // TODO: Should we allow parallelism across repositories here maybe? + return entries.isEmpty(); + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeList(entries); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startArray(TYPE); + for (Entry entry : entries) { + builder.startObject(); + { + builder.field("repository", entry.repository); + } + builder.endObject(); + } + builder.endArray(); + return builder; + } + + @Override + public String toString() { + return Strings.toString(this); + } + + @Override + public Version getMinimalSupportedVersion() { + return Version.V_8_0_0; + } + + public static final class Entry implements Writeable { + + private final String repository; + + private final long repositoryStateId; + + private Entry(StreamInput in) throws IOException { + repository = in.readString(); + repositoryStateId = in.readLong(); + } + + private Entry(String repository, long repositoryStateId) { + this.repository = repository; + this.repositoryStateId = repositoryStateId; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(repository); + out.writeLong(repositoryStateId); + } + + @Override + public String toString() { + return "{" + repository + '}' + '{' + repositoryStateId + '}'; + } + } +} diff --git 
a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java index 94c6ea43d384c..83de4aba8e629 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java @@ -102,9 +102,11 @@ public interface BlobContainer { /** * Deletes this container and all its contents from the repository. + * + * @return delete result * @throws IOException on failure */ - void delete() throws IOException; + DeleteResult delete() throws IOException; /** * Deletes the blobs with given names. Unlike {@link #deleteBlob(String)} this method will not throw an exception diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/DeleteResult.java b/server/src/main/java/org/elasticsearch/common/blobstore/DeleteResult.java new file mode 100644 index 0000000000000..9f74e31ad7d51 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/blobstore/DeleteResult.java @@ -0,0 +1,52 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.common.blobstore; + +/** + * The result of deleting multiple blobs from a {@link BlobStore}. + */ +public final class DeleteResult { + + public static final DeleteResult ZERO = new DeleteResult(0, 0); + + private final long blobsDeleted; + private final long bytesDeleted; + + public DeleteResult(long blobsDeleted, long bytesDeleted) { + this.blobsDeleted = blobsDeleted; + this.bytesDeleted = bytesDeleted; + } + + public long blobsDeleted() { + return blobsDeleted; + } + + public long bytesDeleted() { + return bytesDeleted; + } + + public DeleteResult add(DeleteResult other) { + return new DeleteResult(blobsDeleted + other.blobsDeleted(), bytesDeleted + other.bytesDeleted()); + } + + public DeleteResult add(long blobs, long bytes) { + return new DeleteResult(blobsDeleted + blobs, bytesDeleted + bytes); + } +} diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java index 6723a70a9abb3..d333691a9bc26 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; import org.elasticsearch.core.internal.io.IOUtils; @@ -45,6 +46,7 @@ import java.nio.file.attribute.BasicFileAttributes; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; import static java.util.Collections.unmodifiableMap; @@ -110,7 +112,7 @@ public void deleteBlob(String blobName) throws IOException { if 
(Files.isDirectory(blobPath)) { // delete directory recursively as long as it is empty (only contains empty directories), // which is the reason we aren't deleting any files, only the directories on the post-visit - Files.walkFileTree(blobPath, new SimpleFileVisitor() { + Files.walkFileTree(blobPath, new SimpleFileVisitor<>() { @Override public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { Files.delete(dir); @@ -123,8 +125,26 @@ public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOEx } @Override - public void delete() throws IOException { - IOUtils.rm(path); + public DeleteResult delete() throws IOException { + final AtomicLong filesDeleted = new AtomicLong(0L); + final AtomicLong bytesDeleted = new AtomicLong(0L); + Files.walkFileTree(path, new SimpleFileVisitor<>() { + @Override + public FileVisitResult postVisitDirectory(Path dir, IOException impossible) throws IOException { + assert impossible == null; + Files.delete(dir); + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + Files.delete(file); + filesDeleted.incrementAndGet(); + bytesDeleted.addAndGet(attrs.size()); + return FileVisitResult.CONTINUE; + } + }); + return new DeleteResult(filesDeleted.get(), bytesDeleted.get()); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryCleanupResult.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryCleanupResult.java new file mode 100644 index 0000000000000..bec61e02ee8f2 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryCleanupResult.java @@ -0,0 +1,88 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.repositories; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.blobstore.DeleteResult; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; + +public final class RepositoryCleanupResult implements Writeable, ToXContentObject { + + public static final ObjectParser PARSER = + new ObjectParser<>(RepositoryCleanupResult.class.getName(), true, RepositoryCleanupResult::new); + + private static final String DELETED_BLOBS = "deleted_blobs"; + + private static final String DELETED_BYTES = "deleted_bytes"; + + static { + PARSER.declareLong((result, bytes) -> result.bytes = bytes, new ParseField(DELETED_BYTES)); + PARSER.declareLong((result, blobs) -> result.blobs = blobs, new ParseField(DELETED_BLOBS)); + } + + private long bytes; + + private long blobs; + + private RepositoryCleanupResult() { + this(DeleteResult.ZERO); + } + + public RepositoryCleanupResult(DeleteResult result) { + this.blobs = result.blobsDeleted(); + this.bytes = result.bytesDeleted(); + } + + public 
RepositoryCleanupResult(StreamInput in) throws IOException { + bytes = in.readLong(); + blobs = in.readLong(); + } + + public long bytes() { + return bytes; + } + + public long blobs() { + return blobs; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(bytes); + out.writeLong(blobs); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder.startObject().field(DELETED_BYTES, bytes).field(DELETED_BLOBS, blobs).endObject(); + } + + @Override + public String toString() { + return Strings.toString(this); + } +} diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 95a5f86920ab3..681f5734334ad 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -44,6 +44,7 @@ import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.fs.FsBlobContainer; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -81,6 +82,7 @@ import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.RepositoryCleanupResult; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositoryVerificationException; @@ -402,7 +404,8 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Action updatedRepositoryData = 
repositoryData.removeSnapshot(snapshotId); // Cache the indices that were found before writing out the new index-N blob so that a stuck master will never // delete an index that was created by another master node after writing this index-N blob. - foundIndices = blobStore().blobContainer(basePath().add("indices")).children(); + + foundIndices = blobStore().blobContainer(indicesPath()).children(); writeIndexGen(updatedRepositoryData, repositoryStateId); } catch (Exception ex) { listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex)); @@ -425,18 +428,61 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Action .orElse(Collections.emptyList()), snapshotId, ActionListener.map(listener, v -> { - cleanupStaleIndices(foundIndices, survivingIndices); - cleanupStaleRootFiles(Sets.difference(rootBlobs, new HashSet<>(snapMetaFilesToDelete)), updatedRepositoryData); + cleanupStaleIndices(foundIndices, survivingIndices.values().stream().map(IndexId::getId).collect(Collectors.toSet())); + cleanupStaleRootFiles( + staleRootBlobs(updatedRepositoryData, Sets.difference(rootBlobs, new HashSet<>(snapMetaFilesToDelete)))); return null; }) ); } } - private void cleanupStaleRootFiles(Set rootBlobNames, RepositoryData repositoryData) { + /** + * Runs cleanup actions on the repository. Increments the repository state id by one before executing any modifications on the + * repository. + * TODO: Add shard level cleanups + *
+     * <ul>
+     *     <li>Deleting stale indices {@link #cleanupStaleIndices}</li>
+     *     <li>Deleting unreferenced root level blobs {@link #cleanupStaleRootFiles}</li>
+     * </ul>
+     * @param repositoryStateId Current repository state id
+     * @param listener Listener to complete when done
+     */
+    public void cleanup(long repositoryStateId, ActionListener<RepositoryCleanupResult> listener) {
+        ActionListener.completeWith(listener, () -> {
+            if (isReadOnly()) {
+                throw new RepositoryException(metadata.name(), "cannot run cleanup on readonly repository");
+            }
+            final RepositoryData repositoryData = getRepositoryData();
+            if (repositoryData.getGenId() != repositoryStateId) {
+                // Check that we are working on the expected repository version before gathering the data to clean up
+                throw new RepositoryException(metadata.name(), "concurrent modification of the repository before cleanup started, " +
+                    "expected current generation [" + repositoryStateId + "], actual current generation [" +
+                    repositoryData.getGenId() + "]");
+            }
+            Map<String, BlobMetaData> rootBlobs = blobContainer().listBlobs();
+            final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
+            final Set<String> survivingIndexIds =
+                repositoryData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet());
+            final List<String> staleRootBlobs = staleRootBlobs(repositoryData, rootBlobs.keySet());
+            if (survivingIndexIds.equals(foundIndices.keySet()) && staleRootBlobs.isEmpty()) {
+                // Nothing to clean up we return
+                return new RepositoryCleanupResult(DeleteResult.ZERO);
+            }
+            // write new index-N blob to ensure concurrent operations will fail
+            writeIndexGen(repositoryData, repositoryStateId);
+            final DeleteResult deleteIndicesResult = cleanupStaleIndices(foundIndices, survivingIndexIds);
+            List<String> cleaned = cleanupStaleRootFiles(staleRootBlobs);
+            return new RepositoryCleanupResult(
+                deleteIndicesResult.add(cleaned.size(), cleaned.stream().mapToLong(name -> rootBlobs.get(name).length()).sum()));
+        });
+    }
+
+    // Finds all blobs directly under the repository root path that are not referenced by the current RepositoryData
+    private List<String> staleRootBlobs(RepositoryData repositoryData, Set<String> rootBlobNames) {
+        final Set<String>
allSnapshotIds = repositoryData.getSnapshotIds().stream().map(SnapshotId::getUUID).collect(Collectors.toSet()); - final List blobsToDelete = rootBlobNames.stream().filter( + return rootBlobNames.stream().filter( blob -> { if (FsBlobContainer.isTempBlobName(blob)) { return true; @@ -457,12 +503,16 @@ private void cleanupStaleRootFiles(Set rootBlobNames, RepositoryData rep return false; } ).collect(Collectors.toList()); + } + + private List cleanupStaleRootFiles(List blobsToDelete) { if (blobsToDelete.isEmpty()) { - return; + return blobsToDelete; } try { logger.info("[{}] Found stale root level blobs {}. Cleaning them up", metadata.name(), blobsToDelete); blobContainer().deleteBlobsIgnoringIfNotExists(blobsToDelete); + return blobsToDelete; } catch (IOException e) { logger.warn(() -> new ParameterizedMessage( "[{}] The following blobs are no longer part of any snapshot [{}] but failed to remove them", @@ -474,18 +524,18 @@ private void cleanupStaleRootFiles(Set rootBlobNames, RepositoryData rep assert false : e; logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of root level blobs", metadata.name()), e); } + return Collections.emptyList(); } - private void cleanupStaleIndices(Map foundIndices, Map survivingIndices) { + private DeleteResult cleanupStaleIndices(Map foundIndices, Set survivingIndexIds) { + DeleteResult deleteResult = DeleteResult.ZERO; try { - final Set survivingIndexIds = survivingIndices.values().stream() - .map(IndexId::getId).collect(Collectors.toSet()); for (Map.Entry indexEntry : foundIndices.entrySet()) { final String indexSnId = indexEntry.getKey(); try { if (survivingIndexIds.contains(indexSnId) == false) { logger.debug("[{}] Found stale index [{}]. 
Cleaning it up", metadata.name(), indexSnId); - indexEntry.getValue().delete(); + deleteResult = deleteResult.add(indexEntry.getValue().delete()); logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId); } } catch (IOException e) { @@ -501,6 +551,7 @@ private void cleanupStaleIndices(Map foundIndices, Map indices, SnapshotId snapshotId, diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestCleanupRepositoryAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestCleanupRepositoryAction.java new file mode 100644 index 0000000000000..3eca34ff2d3d5 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestCleanupRepositoryAction.java @@ -0,0 +1,55 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.rest.action.admin.cluster; + +import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; + +import java.io.IOException; + +import static org.elasticsearch.client.Requests.cleanupRepositoryRequest; +import static org.elasticsearch.rest.RestRequest.Method.POST; + +/** + * Cleans up a repository + */ +public class RestCleanupRepositoryAction extends BaseRestHandler { + + public RestCleanupRepositoryAction(RestController controller) { + controller.registerHandler(POST, "/_snapshot/{repository}/_cleanup", this); + } + + @Override + public String getName() { + return "cleanup_repository_action"; + } + + @Override + public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { + CleanupRepositoryRequest cleanupRepositoryRequest = cleanupRepositoryRequest(request.param("repository")); + cleanupRepositoryRequest.timeout(request.paramAsTime("timeout", cleanupRepositoryRequest.timeout())); + cleanupRepositoryRequest.masterNodeTimeout(request.paramAsTime("master_timeout", cleanupRepositoryRequest.masterNodeTimeout())); + return channel -> client.admin().cluster().cleanupRepository(cleanupRepositoryRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 85497ad60c6e6..7364f1e859de2 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.ClusterState; import 
org.elasticsearch.cluster.ClusterStateApplier; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.RepositoryCleanupInProgress; import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.SnapshotDeletionsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress; @@ -264,6 +265,11 @@ public ClusterState execute(ClusterState currentState) { throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, "cannot snapshot while a snapshot deletion is in-progress"); } + final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE); + if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.cleanupInProgress() == false) { + throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, + "cannot snapshot while a repository cleanup is in-progress"); + } SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); if (snapshots == null || snapshots.entries().isEmpty()) { // Store newSnapshot here to be processed in clusterStateProcessed @@ -1134,6 +1140,11 @@ public ClusterState execute(ClusterState currentState) { throw new ConcurrentSnapshotExecutionException(snapshot, "cannot delete - another snapshot is currently being deleted"); } + final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE); + if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.cleanupInProgress() == false) { + throw new ConcurrentSnapshotExecutionException(snapshot.getRepository(), snapshot.getSnapshotId().getName(), + "cannot delete snapshot while a repository cleanup is in-progress"); + } RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE); if (restoreInProgress != null) { // don't allow snapshot deletions while a restore is taking place, diff --git 
a/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java index 9f680f11f2117..9766663d58b77 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -20,7 +20,6 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.cluster.SnapshotsInProgress; -import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -72,17 +71,13 @@ public void assertConsistentHistoryInLuceneIndex() throws Exception { @After public void assertRepoConsistency() { if (skipRepoConsistencyCheckReason == null) { - client().admin().cluster().prepareGetRepositories().get().repositories() - .stream() - .map(RepositoryMetaData::name) - .forEach(name -> { - final List snapshots = client().admin().cluster().prepareGetSnapshots(name).get().getSnapshots(name); - // Delete one random snapshot to trigger repository cleanup. 
- if (snapshots.isEmpty() == false) { - client().admin().cluster().prepareDeleteSnapshot(name, randomFrom(snapshots).snapshotId().getName()).get(); - } - BlobStoreTestUtil.assertRepoConsistency(internalCluster(), name); - }); + client().admin().cluster().prepareGetRepositories().get().repositories().forEach(repositoryMetaData -> { + final String name = repositoryMetaData.name(); + if (repositoryMetaData.settings().getAsBoolean("readonly", false) == false) { + client().admin().cluster().prepareCleanupRepository(name).get(); + } + BlobStoreTestUtil.assertRepoConsistency(internalCluster(), name); + }); } else { logger.info("--> skipped repo consistency checks because [{}]", skipRepoConsistencyCheckReason); } diff --git a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index 6538ccf40e735..6a6aecc0d8ed1 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -486,13 +486,8 @@ public void testSnapshotWithStuckNode() throws Exception { () -> client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap") .execute().actionGet().getSnapshots("test-repo")); - // TODO: Replace this by repository cleanup endpoint call once that's available logger.info("--> Go through a loop of creating and deleting a snapshot to trigger repository cleanup"); - client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-tmp") - .setWaitForCompletion(true) - .setIndices("test-idx") - .get(); - client().admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap-tmp").get(); + client().admin().cluster().prepareCleanupRepository("test-repo").get(); // Subtract four files that will remain in the repository: // (1) index-(N+1) diff --git 
a/server/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java index 15faecf46ca40..c38ddb45ab853 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.DeleteResult; import java.io.IOException; import java.io.InputStream; @@ -60,8 +61,8 @@ public void deleteBlob(String blobName) throws IOException { } @Override - public void delete() throws IOException { - delegate.delete(); + public DeleteResult delete() throws IOException { + return delegate.delete(); } @Override diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java index d21f3db81e69c..9a2e4e246ec43 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.util.Maps; @@ -48,6 +49,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import 
java.util.function.Function; import java.util.stream.Collectors; @@ -215,13 +217,20 @@ public void deleteBlob(String blobName) { } @Override - public void delete() { + public DeleteResult delete() { ensureNotClosed(); final String thisPath = path.buildAsString(); + final AtomicLong bytesDeleted = new AtomicLong(0L); + final AtomicLong blobsDeleted = new AtomicLong(0L); synchronized (context.actions) { consistentView(context.actions).stream().filter(action -> action.path.startsWith(thisPath)) - .forEach(a -> context.actions.add(new BlobStoreAction(Operation.DELETE, a.path))); + .forEach(a -> { + context.actions.add(new BlobStoreAction(Operation.DELETE, a.path)); + bytesDeleted.addAndGet(a.data.length); + blobsDeleted.incrementAndGet(); + }); } + return new DeleteResult(blobsDeleted.get(), bytesDeleted.get()); } @Override diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index a552e7ac54664..bd0a5cc772fd7 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.fs.FsBlobContainer; import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.settings.Setting; @@ -330,14 +331,20 @@ public void deleteBlobIgnoringIfNotExists(String blobName) throws IOException { } @Override - public void delete() throws IOException { + public DeleteResult delete() throws IOException { + DeleteResult deleteResult = DeleteResult.ZERO; for (BlobContainer child : children().values()) { - child.delete(); + deleteResult = deleteResult.add(child.delete()); } - for 
(String blob : listBlobs().values().stream().map(BlobMetaData::name).collect(Collectors.toList())) { + final Map blobs = listBlobs(); + long deleteBlobCount = blobs.size(); + long deleteByteCount = 0L; + for (String blob : blobs.values().stream().map(BlobMetaData::name).collect(Collectors.toList())) { deleteBlobIgnoringIfNotExists(blob); + deleteByteCount += blobs.get(blob).length(); } blobStore().blobContainer(path().parent()).deleteBlob(path().toArray()[path().toArray().length - 1]); + return deleteResult.add(deleteBlobCount, deleteByteCount); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java index 11a359c0ded4d..a78a4b9323e4c 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java @@ -19,6 +19,7 @@ package org.elasticsearch.repositories; import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryResponse; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; import org.elasticsearch.action.support.PlainActionFuture; @@ -210,30 +211,49 @@ public void testCleanup() throws Exception { .state(), equalTo(SnapshotState.SUCCESS)); - logger.info("--> creating a dangling index folder"); final BlobStoreRepository repo = (BlobStoreRepository) getInstanceFromNode(RepositoriesService.class).repository("test-repo"); - final PlainActionFuture future = PlainActionFuture.newFuture(); final Executor genericExec = repo.threadPool().executor(ThreadPool.Names.GENERIC); + + logger.info("--> creating a dangling index folder"); + + 
createDanglingIndex(repo, genericExec); + + logger.info("--> deleting a snapshot to trigger repository cleanup"); + client().admin().cluster().deleteSnapshot(new DeleteSnapshotRequest("test-repo", snapshotName)).actionGet(); + + assertConsistentRepository(repo, genericExec); + + logger.info("--> Create dangling index"); + createDanglingIndex(repo, genericExec); + + logger.info("--> Execute repository cleanup"); + final CleanupRepositoryResponse response = client().admin().cluster().prepareCleanupRepository("test-repo").get(); + assertCleanupResponse(response, 3L, 1L); + } + + protected void assertCleanupResponse(CleanupRepositoryResponse response, long bytes, long blobs) { + assertThat(response.result().blobs(), equalTo(1L + 2L)); + assertThat(response.result().bytes(), equalTo(3L + 2 * 3L)); + } + + private void createDanglingIndex(final BlobStoreRepository repo, final Executor genericExec) throws Exception { + final PlainActionFuture future = PlainActionFuture.newFuture(); genericExec.execute(new ActionRunnable<>(future) { @Override protected void doRun() throws Exception { final BlobStore blobStore = repo.blobStore(); blobStore.blobContainer(repo.basePath().add("indices").add("foo")) - .writeBlob("bar", new ByteArrayInputStream(new byte[0]), 0, false); + .writeBlob("bar", new ByteArrayInputStream(new byte[3]), 3, false); for (String prefix : Arrays.asList("snap-", "meta-")) { blobStore.blobContainer(repo.basePath()) - .writeBlob(prefix + "foo.dat", new ByteArrayInputStream(new byte[0]), 0, false); + .writeBlob(prefix + "foo.dat", new ByteArrayInputStream(new byte[3]), 3, false); } future.onResponse(null); } }); future.actionGet(); assertTrue(assertCorruptionVisible(repo, genericExec)); - logger.info("--> deleting a snapshot to trigger repository cleanup"); - client().admin().cluster().deleteSnapshot(new DeleteSnapshotRequest("test-repo", snapshotName)).actionGet(); - - assertConsistentRepository(repo, genericExec); } protected boolean 
assertCorruptionVisible(BlobStoreRepository repo, Executor executor) throws Exception {