Skip to content

Commit

Permalink
Make BlobStoreRepository#writeIndexGen API Async (elastic#49584)
Browse files Browse the repository at this point in the history
Preliminary to shorten the diff of elastic#49060. In elastic#49060
we execute cluster state updates during the writing of a new
index gen and thus it must be an async API.
  • Loading branch information
original-brownbear committed Nov 26, 2019
1 parent 3862400 commit 46eca1a
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, Bl
*/
private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateId, Map<String, BlobContainer> foundIndices,
Map<String, BlobMetaData> rootBlobs, RepositoryData repositoryData, boolean writeShardGens,
ActionListener<Void> listener) throws IOException {
ActionListener<Void> listener) {

if (writeShardGens) {
// First write the new shard state metadata (with the removed snapshot) and compute deletion targets
Expand All @@ -448,14 +448,14 @@ private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateI
// written if all shard paths have been successfully updated.
final StepListener<RepositoryData> writeUpdatedRepoDataStep = new StepListener<>();
writeShardMetaDataAndComputeDeletesStep.whenComplete(deleteResults -> {
final ShardGenerations.Builder builder = ShardGenerations.builder();
for (ShardSnapshotMetaDeleteResult newGen : deleteResults) {
builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration);
}
final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, builder.build());
writeIndexGen(updatedRepoData, repositoryStateId, true);
writeUpdatedRepoDataStep.onResponse(updatedRepoData);
}, listener::onFailure);
final ShardGenerations.Builder builder = ShardGenerations.builder();
for (ShardSnapshotMetaDeleteResult newGen : deleteResults) {
builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration);
}
final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, builder.build());
writeIndexGen(updatedRepoData, repositoryStateId, true,
ActionListener.wrap(v -> writeUpdatedRepoDataStep.onResponse(updatedRepoData), listener::onFailure));
}, listener::onFailure);
// Once we have updated the repository, run the clean-ups
writeUpdatedRepoDataStep.whenComplete(updatedRepoData -> {
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
Expand All @@ -467,15 +467,17 @@ private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateI
} else {
// Write the new repository data first (with the removed snapshot), using no shard generations
final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, ShardGenerations.EMPTY);
writeIndexGen(updatedRepoData, repositoryStateId, false);
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
final ActionListener<Void> afterCleanupsListener =
new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2);
asyncCleanupUnlinkedRootAndIndicesBlobs(foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener);
final StepListener<Collection<ShardSnapshotMetaDeleteResult>> writeMetaAndComputeDeletesStep = new StepListener<>();
writeUpdatedShardMetaDataAndComputeDeletes(snapshotId, repositoryData, false, writeMetaAndComputeDeletesStep);
writeMetaAndComputeDeletesStep.whenComplete(deleteResults ->
asyncCleanupUnlinkedShardLevelBlobs(snapshotId, deleteResults, afterCleanupsListener), afterCleanupsListener::onFailure);
writeIndexGen(updatedRepoData, repositoryStateId, false, ActionListener.wrap(v -> {
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
final ActionListener<Void> afterCleanupsListener =
new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2);
asyncCleanupUnlinkedRootAndIndicesBlobs(foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener);
final StepListener<Collection<ShardSnapshotMetaDeleteResult>> writeMetaAndComputeDeletesStep = new StepListener<>();
writeUpdatedShardMetaDataAndComputeDeletes(snapshotId, repositoryData, false, writeMetaAndComputeDeletesStep);
writeMetaAndComputeDeletesStep.whenComplete(deleteResults ->
asyncCleanupUnlinkedShardLevelBlobs(snapshotId, deleteResults, afterCleanupsListener),
afterCleanupsListener::onFailure);
}, listener::onFailure));
}
}

Expand Down Expand Up @@ -656,8 +658,9 @@ public void cleanup(long repositoryStateId, boolean writeShardGens, ActionListen
listener.onResponse(new RepositoryCleanupResult(DeleteResult.ZERO));
} else {
// write new index-N blob to ensure concurrent operations will fail
writeIndexGen(repositoryData, repositoryStateId, writeShardGens);
cleanupStaleBlobs(foundIndices, rootBlobs, repositoryData, ActionListener.map(listener, RepositoryCleanupResult::new));
writeIndexGen(repositoryData, repositoryStateId, writeShardGens,
ActionListener.wrap(v -> cleanupStaleBlobs(foundIndices, rootBlobs, repositoryData,
ActionListener.map(listener, RepositoryCleanupResult::new)), listener::onFailure));
}
} catch (Exception e) {
listener.onFailure(e);
Expand Down Expand Up @@ -768,11 +771,12 @@ public void finalizeSnapshot(final SnapshotId snapshotId,
getRepositoryData(ActionListener.wrap(existingRepositoryData -> {
final RepositoryData updatedRepositoryData =
existingRepositoryData.addSnapshot(snapshotId, snapshotInfo.state(), shardGenerations);
writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens);
if (writeShardGens) {
cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
}
listener.onResponse(snapshotInfo);
writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens, ActionListener.wrap(v -> {
if (writeShardGens) {
cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
}
listener.onResponse(snapshotInfo);
}, onUpdateFailure));
}, onUpdateFailure));
}, onUpdateFailure), 2 + indices.size());
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
Expand Down Expand Up @@ -1001,50 +1005,58 @@ public boolean isReadOnly() {
return readOnly;
}

protected void writeIndexGen(final RepositoryData repositoryData, final long expectedGen,
final boolean writeShardGens) throws IOException {
assert isReadOnly() == false; // can not write to a read only repository
final long currentGen = repositoryData.getGenId();
if (currentGen != expectedGen) {
// the index file was updated by a concurrent operation, so we were operating on stale
// repository data
throw new RepositoryException(metadata.name(), "concurrent modification of the index-N file, expected current generation [" +
expectedGen + "], actual current generation [" + currentGen +
"] - possibly due to simultaneous snapshot deletion requests");
}
final long newGen = currentGen + 1;
if (latestKnownRepoGen.get() >= newGen) {
throw new IllegalArgumentException(
"Tried writing generation [" + newGen + "] but repository is at least at generation [" + newGen + "] already");
}
// write the index file
final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
writeAtomic(indexBlob,
BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true);
final long latestKnownGen = latestKnownRepoGen.updateAndGet(known -> Math.max(known, newGen));
if (newGen < latestKnownGen) {
// Don't mess up the index.latest blob
throw new IllegalStateException(
"Wrote generation [" + newGen + "] but latest known repo gen concurrently changed to [" + latestKnownGen + "]");
}
// write the current generation to the index-latest file
final BytesReference genBytes;
try (BytesStreamOutput bStream = new BytesStreamOutput()) {
bStream.writeLong(newGen);
genBytes = bStream.bytes();
}
logger.debug("Repository [{}] updating index.latest with generation [{}]", metadata.name(), newGen);
writeAtomic(INDEX_LATEST_BLOB, genBytes, false);
// delete the N-2 index file if it exists, keep the previous one around as a backup
if (newGen - 2 >= 0) {
final String oldSnapshotIndexFile = INDEX_FILE_PREFIX + Long.toString(newGen - 2);
try {
blobContainer().deleteBlobIgnoringIfNotExists(oldSnapshotIndexFile);
} catch (IOException e) {
logger.warn("Failed to clean up old index blob [{}]", oldSnapshotIndexFile);
/**
* @param repositoryData RepositoryData to write
* @param expectedGen expected repository generation at the start of the operation
* @param writeShardGens whether to write {@link ShardGenerations} to the new {@link RepositoryData} blob
* @param listener completion listener
*/
protected void writeIndexGen(RepositoryData repositoryData, long expectedGen, boolean writeShardGens, ActionListener<Void> listener) {
ActionListener.completeWith(listener, () -> {
assert isReadOnly() == false; // can not write to a read only repository
final long currentGen = repositoryData.getGenId();
if (currentGen != expectedGen) {
// the index file was updated by a concurrent operation, so we were operating on stale
// repository data
throw new RepositoryException(metadata.name(),
"concurrent modification of the index-N file, expected current generation [" + expectedGen +
"], actual current generation [" + currentGen + "] - possibly due to simultaneous snapshot deletion requests");
}
}
final long newGen = currentGen + 1;
if (latestKnownRepoGen.get() >= newGen) {
throw new IllegalArgumentException(
"Tried writing generation [" + newGen + "] but repository is at least at generation [" + newGen + "] already");
}
// write the index file
final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
writeAtomic(indexBlob,
BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true);
final long latestKnownGen = latestKnownRepoGen.updateAndGet(known -> Math.max(known, newGen));
if (newGen < latestKnownGen) {
// Don't mess up the index.latest blob
throw new IllegalStateException(
"Wrote generation [" + newGen + "] but latest known repo gen concurrently changed to [" + latestKnownGen + "]");
}
// write the current generation to the index-latest file
final BytesReference genBytes;
try (BytesStreamOutput bStream = new BytesStreamOutput()) {
bStream.writeLong(newGen);
genBytes = bStream.bytes();
}
logger.debug("Repository [{}] updating index.latest with generation [{}]", metadata.name(), newGen);
writeAtomic(INDEX_LATEST_BLOB, genBytes, false);
// delete the N-2 index file if it exists, keep the previous one around as a backup
if (newGen - 2 >= 0) {
final String oldSnapshotIndexFile = INDEX_FILE_PREFIX + Long.toString(newGen - 2);
try {
blobContainer().deleteBlobIgnoringIfNotExists(oldSnapshotIndexFile);
} catch (IOException e) {
logger.warn("Failed to clean up old index blob [{}]", oldSnapshotIndexFile);
}
}
return null;
});
}

/**
Expand Down Expand Up @@ -1438,7 +1450,7 @@ public void verify(String seed, DiscoveryNode localNode) {
public String toString() {
return "BlobStoreRepository[" +
"[" + metadata.name() +
"], [" + blobStore() + ']' +
"], [" + blobStore.get() + ']' +
']';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.repositories.blobstore;

import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
Expand All @@ -43,7 +44,6 @@
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -142,7 +142,7 @@ public void testReadAndWriteSnapshotsThroughIndexFile() throws Exception {
// write to and read from a index file with no entries
assertThat(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).getSnapshotIds().size(), equalTo(0));
final RepositoryData emptyData = RepositoryData.EMPTY;
repository.writeIndexGen(emptyData, emptyData.getGenId(), true);
writeIndexGen(repository, emptyData, emptyData.getGenId());
RepositoryData repoData = ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository);
assertEquals(repoData, emptyData);
assertEquals(repoData.getIndices().size(), 0);
Expand All @@ -151,53 +151,55 @@ public void testReadAndWriteSnapshotsThroughIndexFile() throws Exception {

// write to and read from an index file with snapshots but no indices
repoData = addRandomSnapshotsToRepoData(repoData, false);
repository.writeIndexGen(repoData, repoData.getGenId(), true);
writeIndexGen(repository, repoData, repoData.getGenId());
assertEquals(repoData, ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository));

// write to and read from a index file with random repository data
repoData = addRandomSnapshotsToRepoData(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), true);
repository.writeIndexGen(repoData, repoData.getGenId(), true);
writeIndexGen(repository, repoData, repoData.getGenId());
assertEquals(repoData, ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository));
}

public void testIndexGenerationalFiles() throws Exception {
final BlobStoreRepository repository = setupRepo();
assertEquals(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), RepositoryData.EMPTY);

// write to index generational file
RepositoryData repositoryData = generateRandomRepoData();
repository.writeIndexGen(repositoryData, repositoryData.getGenId(), true);
writeIndexGen(repository, repositoryData, RepositoryData.EMPTY_REPO_GEN);
assertThat(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), equalTo(repositoryData));
assertThat(repository.latestIndexBlobId(), equalTo(0L));
assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(0L));

// adding more and writing to a new index generational file
repositoryData = addRandomSnapshotsToRepoData(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), true);
repository.writeIndexGen(repositoryData, repositoryData.getGenId(), true);
writeIndexGen(repository, repositoryData, repositoryData.getGenId());
assertEquals(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), repositoryData);
assertThat(repository.latestIndexBlobId(), equalTo(1L));
assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(1L));

// removing a snapshot and writing to a new index generational file
repositoryData = ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).removeSnapshot(
repositoryData.getSnapshotIds().iterator().next(), ShardGenerations.EMPTY);
repository.writeIndexGen(repositoryData, repositoryData.getGenId(), true);
writeIndexGen(repository, repositoryData, repositoryData.getGenId());
assertEquals(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), repositoryData);
assertThat(repository.latestIndexBlobId(), equalTo(2L));
assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(2L));
}

public void testRepositoryDataConcurrentModificationNotAllowed() throws IOException {
public void testRepositoryDataConcurrentModificationNotAllowed() {
final BlobStoreRepository repository = setupRepo();

// write to index generational file
RepositoryData repositoryData = generateRandomRepoData();
final long startingGeneration = repositoryData.getGenId();
repository.writeIndexGen(repositoryData, startingGeneration, true);
final PlainActionFuture<Void> future1 = PlainActionFuture.newFuture();
repository.writeIndexGen(repositoryData, startingGeneration, true, future1);

// write repo data again to index generational file, errors because we already wrote to the
// N+1 generation from which this repository data instance was created
expectThrows(RepositoryException.class, () -> repository.writeIndexGen(
repositoryData.withGenId(startingGeneration + 1), repositoryData.getGenId(), true));
expectThrows(RepositoryException.class,
() -> writeIndexGen(repository, repositoryData.withGenId(startingGeneration + 1), repositoryData.getGenId()));
}

public void testBadChunksize() throws Exception {
Expand Down Expand Up @@ -232,6 +234,12 @@ public void testFsRepositoryCompressDeprecated() {
" See the breaking changes documentation for the next major version.");
}

private static void writeIndexGen(BlobStoreRepository repository, RepositoryData repositoryData, long generation) {
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
repository.writeIndexGen(repositoryData, generation, true, future);
future.actionGet();
}

private BlobStoreRepository setupRepo() {
final Client client = client();
final Path location = ESIntegTestCase.randomRepoPath(node().settings());
Expand Down

0 comments on commit 46eca1a

Please sign in to comment.