Skip to content

Commit

Permalink
Repository Cleanup Endpoint (#43900)
Browse files Browse the repository at this point in the history
* Snapshot cleanup functionality via transport/REST endpoint.
* Added all the infrastructure for this with the HLRC and node client
* Made use of it in tests and resolved relevant TODO
* Added new `Custom` CS element that tracks the cleanup logic.
Kept it similar to the delete and in progress classes and gave it
some (for now) redundant way of handling multiple cleanups but only allow one
* Use the exact same mechanism used by deletes to have the combination
of CS entry and increment in repository state ID provide some
concurrency safety (the initial approach of just an entry in the CS
was not enough, we must increment the repository state ID to be safe
against concurrent modifications, otherwise we run the risk of "cleaning up"
blobs that just got created without noticing)
* Isolated the logic to the transport action class as much as I could.
It's not ideal, but we don't need to keep any state and do the same
for other repository operations
(like getting the detailed snapshot shard status)
  • Loading branch information
original-brownbear authored Aug 21, 2019
1 parent 4d210dd commit df01766
Show file tree
Hide file tree
Showing 39 changed files with 1,226 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.elasticsearch.client;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryResponse;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
Expand Down Expand Up @@ -170,6 +172,35 @@ public void verifyRepositoryAsync(VerifyRepositoryRequest verifyRepositoryReques
VerifyRepositoryResponse::fromXContent, listener, emptySet());
}

/**
* Cleans up a snapshot repository.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html"> Snapshot and Restore
* API on elastic.co</a>
* @param cleanupRepositoryRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public CleanupRepositoryResponse cleanupRepository(CleanupRepositoryRequest cleanupRepositoryRequest, RequestOptions options)
throws IOException {
return restHighLevelClient.performRequestAndParseEntity(cleanupRepositoryRequest, SnapshotRequestConverters::cleanupRepository,
options, CleanupRepositoryResponse::fromXContent, emptySet());
}

/**
* Asynchronously cleans up a snapshot repository.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html"> Snapshot and Restore
* API on elastic.co</a>
* @param cleanupRepositoryRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
*/
public void cleanupRepositoryAsync(CleanupRepositoryRequest cleanupRepositoryRequest, RequestOptions options,
ActionListener<CleanupRepositoryResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(cleanupRepositoryRequest, SnapshotRequestConverters::cleanupRepository,
options, CleanupRepositoryResponse::fromXContent, listener, emptySet());
}

/**
* Creates a snapshot.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
Expand Down Expand Up @@ -94,6 +95,20 @@ static Request verifyRepository(VerifyRepositoryRequest verifyRepositoryRequest)
return request;
}

static Request cleanupRepository(CleanupRepositoryRequest cleanupRepositoryRequest) {
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_snapshot")
.addPathPart(cleanupRepositoryRequest.name())
.addPathPartAsIs("_cleanup")
.build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);

RequestConverters.Params parameters = new RequestConverters.Params();
parameters.withMasterTimeout(cleanupRepositoryRequest.masterNodeTimeout());
parameters.withTimeout(cleanupRepositoryRequest.timeout());
request.addParameters(parameters.asMap());
return request;
}

static Request createSnapshot(CreateSnapshotRequest createSnapshotRequest) throws IOException {
String endpoint = new RequestConverters.EndpointBuilder().addPathPart("_snapshot")
.addPathPart(createSnapshotRequest.repository())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.elasticsearch.client;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryResponse;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
Expand Down Expand Up @@ -133,6 +135,17 @@ public void testVerifyRepository() throws IOException {
assertThat(response.getNodes().size(), equalTo(1));
}

public void testCleanupRepository() throws IOException {
AcknowledgedResponse putRepositoryResponse = createTestRepository("test", FsRepository.TYPE, "{\"location\": \".\"}");
assertTrue(putRepositoryResponse.isAcknowledged());

CleanupRepositoryRequest request = new CleanupRepositoryRequest("test");
CleanupRepositoryResponse response = execute(request, highLevelClient().snapshot()::cleanupRepository,
highLevelClient().snapshot()::cleanupRepositoryAsync);
assertThat(response.result().bytes(), equalTo(0L));
assertThat(response.result().blobs(), equalTo(0L));
}

public void testCreateSnapshot() throws IOException {
String repository = "test_repository";
assertTrue(createTestRepository(repository, FsRepository.TYPE, "{\"location\": \".\"}").isAcknowledged());
Expand Down Expand Up @@ -317,4 +330,4 @@ private static Map<String, Object> randomUserMetadata() {
}
return metadata;
}
}
}
36 changes: 36 additions & 0 deletions docs/reference/modules/snapshots.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,42 @@ POST /_snapshot/my_unverified_backup/_verify

It returns a list of nodes where repository was successfully verified or an error message if verification process failed.

[float]
===== Repository Cleanup
Repositories can over time accumulate data that is not referenced by any existing snapshot. This is a result of the data safety guarantees
the snapshot functionality provides in failure scenarios during snapshot creation and the decentralized nature of the snapshot creation
process. This unreferenced data does in no way negatively impact the performance or safety of a snapshot repository but leads to higher
than necessary storage use. In order to clean up this unreferenced data, users can call the cleanup endpoint for a repository which will
trigger a complete accounting of the repositories contents and subsequent deletion of all unreferenced data that was found.

[source,js]
-----------------------------------
POST /_snapshot/my_repository/_cleanup
-----------------------------------
// CONSOLE
// TEST[continued]

The response to a cleanup request looks as follows:

[source,js]
--------------------------------------------------
{
"results": {
"deleted_bytes": 20,
"deleted_blobs": 5
}
}
--------------------------------------------------
// TESTRESPONSE

Depending on the concrete repository implementation the numbers shown for bytes free as well as the number of blobs removed will either
be an approximation or an exact result. Any non-zero value for the number of blobs removed implies that unreferenced blobs were found and
subsequently cleaned up.

Please note that most of the cleanup operations executed by this endpoint are automatically executed when deleting any snapshot from a
repository. If you regularly delete snapshots, you will in most cases not get any or only minor space savings from using this functionality
and should lower your frequency of invoking it accordingly.

[float]
[[snapshots-take-snapshot]]
=== Snapshot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;

import java.io.BufferedInputStream;
Expand Down Expand Up @@ -97,7 +98,7 @@ public void deleteBlob(String blobName) throws IOException {
}

@Override
public void delete() {
public DeleteResult delete() {
throw new UnsupportedOperationException("URL repository is read only");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -126,9 +127,9 @@ public void deleteBlob(String blobName) throws IOException {
}

@Override
public void delete() throws IOException {
public DeleteResult delete() throws IOException {
try {
blobStore.deleteBlobDirectory(keyPath, threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME));
return blobStore.deleteBlobDirectory(keyPath, threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME));
} catch (URISyntaxException | StorageException e) {
throw new IOException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@

import com.microsoft.azure.storage.LocationMode;
import com.microsoft.azure.storage.StorageException;

import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.repositories.azure.AzureRepository.Repository;
import org.elasticsearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -92,8 +92,9 @@ public void deleteBlob(String blob) throws URISyntaxException, StorageException
service.deleteBlob(clientName, container, blob);
}

public void deleteBlobDirectory(String path, Executor executor) throws URISyntaxException, StorageException, IOException {
service.deleteBlobDirectory(clientName, container, path, executor);
public DeleteResult deleteBlobDirectory(String path, Executor executor)
throws URISyntaxException, StorageException, IOException {
return service.deleteBlobDirectory(clientName, container, path, executor);
}

public InputStream getInputStream(String blob) throws URISyntaxException, StorageException, IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -72,7 +73,7 @@
import static java.util.Collections.emptyMap;

public class AzureStorageService {

private static final Logger logger = LogManager.getLogger(AzureStorageService.class);

public static final ByteSizeValue MIN_CHUNK_SIZE = new ByteSizeValue(1, ByteSizeUnit.BYTES);
Expand Down Expand Up @@ -192,13 +193,15 @@ public void deleteBlob(String account, String container, String blob) throws URI
});
}

void deleteBlobDirectory(String account, String container, String path, Executor executor)
DeleteResult deleteBlobDirectory(String account, String container, String path, Executor executor)
throws URISyntaxException, StorageException, IOException {
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
final Collection<Exception> exceptions = Collections.synchronizedList(new ArrayList<>());
final AtomicLong outstanding = new AtomicLong(1L);
final PlainActionFuture<Void> result = PlainActionFuture.newFuture();
final AtomicLong blobsDeleted = new AtomicLong();
final AtomicLong bytesDeleted = new AtomicLong();
SocketAccess.doPrivilegedVoidException(() -> {
for (final ListBlobItem blobItem : blobContainer.listBlobs(path, true)) {
// uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
Expand All @@ -208,7 +211,17 @@ void deleteBlobDirectory(String account, String container, String path, Executor
executor.execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
final long len;
if (blobItem instanceof CloudBlob) {
len = ((CloudBlob) blobItem).getProperties().getLength();
} else {
len = -1L;
}
deleteBlob(account, container, blobPath);
blobsDeleted.incrementAndGet();
if (len >= 0) {
bytesDeleted.addAndGet(len);
}
}

@Override
Expand All @@ -234,6 +247,7 @@ public void onAfter() {
exceptions.forEach(ex::addSuppressed);
throw ex;
}
return new DeleteResult(blobsDeleted.get(), bytesDeleted.get());
}

public InputStream getInputStream(String account, String container, String blob)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;

import java.io.IOException;
Expand Down Expand Up @@ -77,8 +78,8 @@ public void deleteBlob(String blobName) throws IOException {
}

@Override
public void delete() throws IOException {
blobStore.deleteDirectory(path().buildAsString());
public DeleteResult delete() throws IOException {
return blobStore.deleteDirectory(path().buildAsString());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.BlobStoreException;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.core.internal.io.Streams;
Expand All @@ -55,6 +56,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -300,15 +302,24 @@ void deleteBlob(String blobName) throws IOException {
*
* @param pathStr Name of path to delete
*/
void deleteDirectory(String pathStr) throws IOException {
SocketAccess.doPrivilegedVoidIOException(() -> {
DeleteResult deleteDirectory(String pathStr) throws IOException {
return SocketAccess.doPrivilegedIOException(() -> {
DeleteResult deleteResult = DeleteResult.ZERO;
Page<Blob> page = client().get(bucketName).list(BlobListOption.prefix(pathStr));
do {
final Collection<String> blobsToDelete = new ArrayList<>();
page.getValues().forEach(b -> blobsToDelete.add(b.getName()));
final AtomicLong blobsDeleted = new AtomicLong(0L);
final AtomicLong bytesDeleted = new AtomicLong(0L);
page.getValues().forEach(b -> {
blobsToDelete.add(b.getName());
blobsDeleted.incrementAndGet();
bytesDeleted.addAndGet(b.getSize());
});
deleteBlobsIgnoringIfNotExists(blobsToDelete);
deleteResult = deleteResult.add(blobsDeleted.get(), bytesDeleted.get());
page = page.getNextPage();
} while (page != null);
return deleteResult;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.fs.FsBlobContainer;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
Expand Down Expand Up @@ -69,9 +70,13 @@ public void deleteBlob(String blobName) throws IOException {
}
}

// TODO: See if we can get precise result reporting.
private static final DeleteResult DELETE_RESULT = new DeleteResult(1L, 0L);

@Override
public void delete() throws IOException {
public DeleteResult delete() throws IOException {
store.execute(fileContext -> fileContext.delete(path, true));
return DELETE_RESULT;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.repositories.hdfs;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.bootstrap.JavaVersion;
import org.elasticsearch.common.settings.MockSecureSettings;
Expand All @@ -30,6 +31,7 @@
import java.util.Collection;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

@ThreadLeakFilters(filters = HdfsClientThreadLeakFilter.class)
public class HdfsRepositoryTests extends AbstractThirdPartyRepositoryTestCase {
Expand Down Expand Up @@ -58,4 +60,14 @@ protected void createRepository(String repoName) {
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
}

// HDFS repository doesn't have precise cleanup stats so we only check whether or not any blobs were removed
@Override
protected void assertCleanupResponse(CleanupRepositoryResponse response, long bytes, long blobs) {
if (blobs > 0) {
assertThat(response.result().blobs(), greaterThan(0L));
} else {
assertThat(response.result().blobs(), equalTo(0L));
}
}
}
Loading

0 comments on commit df01766

Please sign in to comment.