Merge branch 'main' into native-users-enabled
elasticmachine authored Aug 23, 2023
2 parents 6e733b6 + d734f1a commit 838cbf8
Showing 22 changed files with 274 additions and 82 deletions.
@@ -196,7 +196,7 @@ public void benchmark() {
case "double" -> {
DoubleVector values = op.getOutput().<DoubleBlock>getBlock(1).asVector();
for (int p = 0; p < values.getPositionCount(); p++) {
- sum += values.getDouble(p);
+ sum += (long) values.getDouble(p);
}
}
case "keyword" -> {
@@ -7,17 +7,43 @@
*/
package org.elasticsearch.repositories.s3;

import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
import com.amazonaws.services.s3.model.MultipartUpload;

import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.blobstore.OptionalBytesReference;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.SecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.blankOrNullString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;

public class S3RepositoryThirdPartyTests extends AbstractThirdPartyRepositoryTestCase {
@@ -67,4 +93,99 @@ protected void createRepository(String repoName) {
.get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
}

public void testCompareAndExchangeCleanup() throws IOException {
final var timeOffsetMillis = new AtomicLong();
final var threadpool = new TestThreadPool(getTestName()) {
@Override
public long absoluteTimeInMillis() {
return super.absoluteTimeInMillis() + timeOffsetMillis.get();
}
};
// construct our own repo instance so we can inject a threadpool that allows us to control the passage of time
try (
var repository = new S3Repository(
node().injector().getInstance(RepositoriesService.class).repository(TEST_REPO_NAME).getMetadata(),
xContentRegistry(),
node().injector().getInstance(PluginsService.class).filterPlugins(S3RepositoryPlugin.class).get(0).getService(),
ClusterServiceUtils.createClusterService(threadpool),
BigArrays.NON_RECYCLING_INSTANCE,
new RecoverySettings(node().settings(), node().injector().getInstance(ClusterService.class).getClusterSettings())
)
) {
repository.start();

final var blobStore = (S3BlobStore) repository.blobStore();
final var blobContainer = (S3BlobContainer) blobStore.blobContainer(repository.basePath().add(getTestName()));

try (var clientReference = blobStore.clientReference()) {
final var client = clientReference.client();
final var bucketName = S3Repository.BUCKET_SETTING.get(repository.getMetadata().settings());
final var registerBlobPath = blobContainer.buildKey("key");

class TestHarness {
boolean tryCompareAndSet(BytesReference expected, BytesReference updated) {
return PlainActionFuture.<Boolean, RuntimeException>get(
future -> blobContainer.compareAndSetRegister("key", expected, updated, future),
10,
TimeUnit.SECONDS
);
}

BytesReference readRegister() {
return PlainActionFuture.get(
future -> blobContainer.getRegister("key", future.map(OptionalBytesReference::bytesReference)),
10,
TimeUnit.SECONDS
);
}

List<MultipartUpload> listMultipartUploads() {
return client.listMultipartUploads(new ListMultipartUploadsRequest(bucketName).withPrefix(registerBlobPath))
.getMultipartUploads();
}
}

var testHarness = new TestHarness();

final var bytes1 = new BytesArray(new byte[] { (byte) 1 });
final var bytes2 = new BytesArray(new byte[] { (byte) 2 });
assertTrue(testHarness.tryCompareAndSet(BytesArray.EMPTY, bytes1));

// show we're looking at the right blob
assertEquals(bytes1, testHarness.readRegister());
assertArrayEquals(
bytes1.array(),
client.getObject(new GetObjectRequest(bucketName, registerBlobPath)).getObjectContent().readAllBytes()
);

// a fresh ongoing upload blocks other CAS attempts
client.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucketName, registerBlobPath));
assertThat(testHarness.listMultipartUploads(), hasSize(1));

assertFalse(testHarness.tryCompareAndSet(bytes1, bytes2));
final var multipartUploads = testHarness.listMultipartUploads();
assertThat(multipartUploads, hasSize(1));

// repo clock may not be exactly aligned with ours, but it should be close
final var age = blobStore.getThreadPool().absoluteTimeInMillis() - multipartUploads.get(0)
.getInitiated()
.toInstant()
.toEpochMilli();
final var ageRangeMillis = TimeValue.timeValueMinutes(1).millis();
assertThat(age, allOf(greaterThanOrEqualTo(-ageRangeMillis), lessThanOrEqualTo(ageRangeMillis)));

// if the upload exceeds the TTL then CAS attempts will abort it
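// (subtracting Math.min(0, age) compensates for a repo clock running ahead of ours: when age is negative, advancing by the TTL alone would leave the upload short of expiry)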
timeOffsetMillis.addAndGet(blobStore.getCompareAndExchangeTimeToLive().millis() - Math.min(0, age));
assertTrue(testHarness.tryCompareAndSet(bytes1, bytes2));
assertThat(testHarness.listMultipartUploads(), hasSize(0));
assertEquals(bytes2, testHarness.readRegister());
} finally {
blobContainer.delete();
}
} finally {
ThreadPool.terminate(threadpool, 10, TimeUnit.SECONDS);
}
}

}
@@ -61,8 +61,10 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -476,7 +478,8 @@ private ListObjectsRequest listObjectsRequest(String pathPrefix) {
.withRequestMetricCollector(blobStore.listMetricCollector);
}

- private String buildKey(String blobName) {
+ // exposed for tests
+ String buildKey(String blobName) {
return keyPath + blobName;
}

@@ -665,6 +668,22 @@ private int getUploadIndex(String targetUploadId, List<MultipartUpload> multipartUploads) {
for (MultipartUpload multipartUpload : multipartUploads) {
final var observedUploadId = multipartUpload.getUploadId();
if (observedUploadId.equals(targetUploadId)) {
final var currentTimeMillis = blobStore.getThreadPool().absoluteTimeInMillis();
final var ageMillis = currentTimeMillis - multipartUpload.getInitiated().toInstant().toEpochMilli();
final var expectedAgeRangeMillis = blobStore.getCompareAndExchangeTimeToLive().millis();
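// the CAS TTL doubles as the warn threshold for apparent clock skew between this node and the upload's initiation time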
if (ageMillis < -expectedAgeRangeMillis || ageMillis > expectedAgeRangeMillis) {
logger.warn(
"""
compare-and-exchange of blob [{}:{}] was initiated at [{}={}] \
which deviates from local node epoch time [{}] by more than the warn threshold of [{}ms]""",
bucket,
blobKey,
multipartUpload.getInitiated(),
multipartUpload.getInitiated().toInstant().toEpochMilli(),
currentTimeMillis,
expectedAgeRangeMillis
);
}
found = true;
} else if (observedUploadId.compareTo(targetUploadId) < 0) {
uploadIndex += 1;
@@ -674,12 +693,47 @@ private int getUploadIndex(String targetUploadId, List<MultipartUpload> multipartUploads) {
return found ? uploadIndex : -1;
}

/**
* @return {@code true} if there are already ongoing uploads, so we should not proceed with the operation
*/
private boolean hasPreexistingUploads() {
final var uploads = listMultipartUploads();
if (uploads.isEmpty()) {
return false;
}

final var expiryDate = Date.from(
Instant.ofEpochMilli(
blobStore.getThreadPool().absoluteTimeInMillis() - blobStore.getCompareAndExchangeTimeToLive().millis()
)
);
if (uploads.stream().anyMatch(upload -> upload.getInitiated().after(expiryDate))) {
return true;
}

// there are uploads, but they are all older than the TTL, so clean them up before carrying on (should be rare)
for (final var upload : uploads) {
logger.warn(
"cleaning up stale compare-and-swap upload [{}] initiated at [{}]",
upload.getUploadId(),
upload.getInitiated()
);
safeAbortMultipartUpload(upload.getUploadId());
}

return false;
}

void run(BytesReference expected, BytesReference updated, ActionListener<OptionalBytesReference> listener) throws Exception {
BlobContainerUtils.ensureValidRegisterContent(updated);

- if (listMultipartUploads().isEmpty() == false) {
- // TODO What if the previous writer crashed? We should consider the age of any ongoing uploads before bailing out like this.
+ if (hasPreexistingUploads()) {

// This is a small optimization to improve the liveness properties of this algorithm.
//
// We can safely proceed even if there are other uploads in progress, but that would add to the potential for collisions and
// delays. Thus in this case we prefer to avoid disturbing the ongoing attempts and just fail up front.

listener.onResponse(OptionalBytesReference.MISSING);
return;
}
Expand Down Expand Up @@ -711,13 +765,7 @@ void run(BytesReference expected, BytesReference updated, ActionListener<Optiona
final var isComplete = new AtomicBoolean();
final Runnable doCleanup = () -> {
if (isComplete.compareAndSet(false, true)) {
- try {
-     abortMultipartUploadIfExists(uploadId);
- } catch (Exception e) {
-     // cleanup is a best-effort thing, we can't do anything better than log and fall through here
-     logger.error("unexpected error cleaning up upload [" + uploadId + "] of [" + blobKey + "]", e);
-     assert false : e;
- }
+ safeAbortMultipartUpload(uploadId);
}
};

@@ -761,9 +809,7 @@ void run(BytesReference expected, BytesReference updated, ActionListener<OptionalBytesReference> listener) throws Exception {
final var currentUploadId = currentUpload.getUploadId();
if (uploadId.equals(currentUploadId) == false) {
threadPool.executor(ThreadPool.Names.SNAPSHOT)
- .execute(
-     ActionRunnable.run(listeners.acquire(), () -> abortMultipartUploadIfExists(currentUploadId))
- );
+ .execute(ActionRunnable.run(listeners.acquire(), () -> safeAbortMultipartUpload(currentUploadId)));
}
}
} finally {
@@ -784,6 +830,15 @@ void run(BytesReference expected, BytesReference updated, ActionListener<OptionalBytesReference> listener) throws Exception {
}
}

private void safeAbortMultipartUpload(String uploadId) {
try {
abortMultipartUploadIfExists(uploadId);
} catch (Exception e) {
// cleanup is a best-effort thing; we can't do anything better than log and fall through here
logger.error("unexpected error cleaning up upload [" + uploadId + "] of [" + blobKey + "]", e);
}
}

private void abortMultipartUploadIfExists(String uploadId) {
try {
final var request = new AbortMultipartUploadRequest(bucket, blobKey, uploadId);
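Taken together, the new pieces give the compare-and-exchange path a TTL-gated cleanup: back off while any upload for the register looks fresh, and reap everything once all of them are stale. Below is a condensed, self-contained sketch of that decision; the class name, the shouldBackOff method, and the inline abort are illustrative only (the real code routes aborts through safeAbortMultipartUpload and reports back via a listener).

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
import com.amazonaws.services.s3.model.MultipartUpload;

import java.util.List;
import java.util.function.LongSupplier;

class StaleUploadGuard {
    private final AmazonS3 client;
    private final String bucket;
    private final String blobKey;
    private final LongSupplier clockMillis; // e.g. threadPool::absoluteTimeInMillis
    private final long ttlMillis;           // e.g. getCompareAndExchangeTimeToLive().millis()

    StaleUploadGuard(AmazonS3 client, String bucket, String blobKey, LongSupplier clockMillis, long ttlMillis) {
        this.client = client;
        this.bucket = bucket;
        this.blobKey = blobKey;
        this.clockMillis = clockMillis;
        this.ttlMillis = ttlMillis;
    }

    // true if a fresh upload is in flight, i.e. the caller should fail up front
    boolean shouldBackOff() {
        List<MultipartUpload> uploads = client.listMultipartUploads(
            new ListMultipartUploadsRequest(bucket).withPrefix(blobKey)
        ).getMultipartUploads();
        long expiryMillis = clockMillis.getAsLong() - ttlMillis;
        if (uploads.stream().anyMatch(u -> u.getInitiated().getTime() > expiryMillis)) {
            return true; // another writer looks live: avoid colliding with it
        }
        // all uploads are past the TTL, so any previous writer probably crashed: reap them and proceed
        for (MultipartUpload upload : uploads) {
            client.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, blobKey, upload.getUploadId()));
        }
        return false;
    }
}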
@@ -24,6 +24,7 @@
import org.elasticsearch.common.blobstore.BlobStoreException;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
@@ -127,6 +128,10 @@ public void collectMetrics(Request<?> request) {
};
}

public TimeValue getCompareAndExchangeTimeToLive() {
return service.compareAndExchangeTimeToLive;
}

// metrics collector that ignores null responses, which we interpret as the request not reaching the S3 endpoint due to a
// network issue
private abstract static class IgnoreNoResponseMetricsCollector extends RequestMetricCollector {
@@ -106,13 +106,13 @@ public Collection<Object> createComponents(
AllocationService allocationService,
IndicesService indicesService
) {
- service.set(s3Service(environment));
+ service.set(s3Service(environment, clusterService.getSettings()));
this.service.get().refreshAndClearCache(S3ClientSettings.load(settings));
return List.of(service);
}

- S3Service s3Service(Environment environment) {
-     return new S3Service(environment);
+ S3Service s3Service(Environment environment, Settings nodeSettings) {
+     return new S3Service(environment, nodeSettings);
}

@Override
@@ -27,11 +27,14 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.coordination.stateless.StoreHeartbeatService;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.env.Environment;

import java.io.Closeable;
@@ -51,6 +54,12 @@
class S3Service implements Closeable {
private static final Logger LOGGER = LogManager.getLogger(S3Service.class);

private static final Setting<TimeValue> REPOSITORY_S3_CAS_TTL_SETTING = Setting.timeSetting(
"repository_s3.compare_and_exchange.time_to_live",
StoreHeartbeatService.HEARTBEAT_FREQUENCY,
Setting.Property.NodeScope
);

private volatile Map<S3ClientSettings, AmazonS3Reference> clientsCache = emptyMap();

/**
@@ -69,13 +78,16 @@ class S3Service implements Closeable {

final CustomWebIdentityTokenCredentialsProvider webIdentityTokenCredentialsProvider;

- S3Service(Environment environment) {
+ final TimeValue compareAndExchangeTimeToLive;
+
+ S3Service(Environment environment, Settings nodeSettings) {
webIdentityTokenCredentialsProvider = new CustomWebIdentityTokenCredentialsProvider(
environment,
System::getenv,
System::getProperty,
Clock.systemUTC()
);
compareAndExchangeTimeToLive = REPOSITORY_S3_CAS_TTL_SETTING.get(nodeSettings);
}

/**
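For reference, a node-scope time setting such as the new REPOSITORY_S3_CAS_TTL_SETTING is resolved once against the node's settings, exactly as the constructor above does. A minimal standalone sketch, assuming a made-up 15s default and a hypothetical 45s override (the real default is StoreHeartbeatService.HEARTBEAT_FREQUENCY):

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;

public class CasTtlSettingDemo {
    // mirrors the new setting; 15s is a stand-in default, not the real one
    static final Setting<TimeValue> TTL = Setting.timeSetting(
        "repository_s3.compare_and_exchange.time_to_live",
        TimeValue.timeValueSeconds(15),
        Setting.Property.NodeScope
    );

    public static void main(String[] args) {
        // unset: the default applies
        System.out.println(TTL.get(Settings.EMPTY)); // prints 15s

        // an elasticsearch.yml line `repository_s3.compare_and_exchange.time_to_live: 45s`
        // surfaces through the node settings like this (45s is an assumed override)
        Settings nodeSettings = Settings.builder()
            .put("repository_s3.compare_and_exchange.time_to_live", "45s")
            .build();
        System.out.println(TTL.get(nodeSettings)); // prints 45s
    }
}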
@@ -272,8 +272,8 @@ protected void assertSnapshotOrGenericThread() {
}

@Override
- S3Service s3Service(Environment environment) {
-     return new ProxyS3Service(environment);
+ S3Service s3Service(Environment environment, Settings nodeSettings) {
+     return new ProxyS3Service(environment, nodeSettings);
}

public static final class ClientAndCredentials extends AmazonS3Wrapper {
@@ -289,8 +289,8 @@ public static final class ProxyS3Service extends S3Service {

private static final Logger logger = LogManager.getLogger(ProxyS3Service.class);

- ProxyS3Service(Environment environment) {
-     super(environment);
+ ProxyS3Service(Environment environment, Settings nodeSettings) {
+     super(environment, nodeSettings);
}

@Override