Skip to content

Commit

Permalink
AmazonS3Reference, Javadoc & other feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
albertzaharovits committed Feb 20, 2018
1 parent 469828c commit 5ae961c
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 94 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.repositories.s3;

import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;

import org.elasticsearch.common.lease.Releasable;

/**
 * Handles the shutdown of the internal {@link AmazonS3Client} by reference
 * counting. Callers obtain a reference via the repository's client registry,
 * use {@link #client()} for the duration of their work, and {@link #close()}
 * it when done; the wrapped client is shut down once the count drops to zero.
 */
public class AmazonS3Reference extends AbstractRefCounted implements Releasable {

    private final AmazonS3 client;

    AmazonS3Reference(AmazonS3 client) {
        super("AWS_S3_CLIENT");
        // fail fast on a null client instead of deferring the NPE to the
        // first client() use or to closeInternal() on the last decRef()
        this.client = Objects.requireNonNull(client, "client");
    }

    /**
     * Call when the client is not needed anymore. Decrements the reference
     * count; the underlying client is shut down when the count reaches zero.
     */
    @Override
    public void close() {
        decRef();
    }

    /**
     * Returns the underlying {@code AmazonS3} client. All calls are permitted
     * BUT NOT shutdown. Shutdown is called on 0 reference count.
     */
    public AmazonS3 client() {
        return client;
    }

    /**
     * Invoked exactly once, when the reference count drops to zero; shuts
     * down the wrapped client.
     */
    @Override
    protected void closeInternal() {
        client.shutdown();
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,16 @@

package org.elasticsearch.repositories.s3;

import com.amazonaws.services.s3.AmazonS3;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.RefCounted;

interface AwsS3Service extends LifecycleComponent {

/**
* Creates an {@code AmazonS3} client from the given repository metadata and node settings.
*/
AmazonS3Wrapper client(String clientName);
AmazonS3Reference client(String clientName);

void updateClientSettings(Settings settings);

static interface AmazonS3Wrapper extends Releasable, RefCounted {
AmazonS3 client();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,54 +33,58 @@
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import java.util.Collections;
import java.util.Map;
import static java.util.Collections.emptyMap;


class InternalAwsS3Service extends AbstractLifecycleComponent implements AwsS3Service {

private volatile Map<String, AmazonS3Wrapper> clientsCache = Collections.unmodifiableMap(emptyMap());
private volatile Map<String, S3ClientSettings> clientsSettings = Collections.unmodifiableMap(emptyMap());
private volatile Map<String, AmazonS3Reference> clientsCache = emptyMap();
private volatile Map<String, S3ClientSettings> clientsSettings = emptyMap();

InternalAwsS3Service(Settings settings) {
super(settings);
updateClientSettings(settings);
}

/**
* Reloads the settings for the AmazonS3 client. New clients will be build using
* these. Old clients are usable until released. On release they will be
* destroyed contrary to being returned to the registry.
*/
@Override
public synchronized void updateClientSettings(Settings settings) {
// the clients will shutdown when they will not be used anymore
for (final AmazonS3Wrapper clientWrapper : clientsCache.values()) {
clientWrapper.decRef();
}
// clear previously cached clients
clientsCache = Collections.unmodifiableMap(emptyMap());
// shutdown all unused clients, others will shutdown on their respective release
doClose();
// reload secure settings
clientsSettings = Collections.unmodifiableMap(S3ClientSettings.load(settings));
assert clientsSettings.containsKey("default") : "always at least have 'default'";
// clients are built lazily by #client(String)
}

/**
* Attempts to retrieve a client by name from the registry. If the client does
* not exist it will be created.
*/
@Override
public AmazonS3Wrapper client(String clientName) {
AmazonS3Wrapper clientWrapper = clientsCache.get(clientName);
if ((clientWrapper != null) && clientWrapper.tryIncRef()) {
return clientWrapper;
public AmazonS3Reference client(String clientName) {
AmazonS3Reference clientReference = clientsCache.get(clientName);
if ((clientReference != null) && clientReference.tryIncRef()) {
return clientReference;
}
synchronized (this) {
clientWrapper = clientsCache.get(clientName);
if ((clientWrapper != null) && clientWrapper.tryIncRef()) {
return clientWrapper;
clientReference = clientsCache.get(clientName);
if ((clientReference != null) && clientReference.tryIncRef()) {
return clientReference;
}
clientWrapper = new InternalAmazonS3Wrapper(buildClient(clientName));
clientsCache = MapBuilder.newMapBuilder(clientsCache).put(clientName, clientWrapper).immutableMap();
clientWrapper.incRef();
return clientWrapper;
clientReference = new AmazonS3Reference(buildClient(clientName));
clientsCache = MapBuilder.newMapBuilder(clientsCache).put(clientName, clientReference).immutableMap();
clientReference.incRef();
return clientReference;
}
}

// does not require synchronization because it is called inside computeIfAbsent
private AmazonS3 buildClient(String clientName) {
final S3ClientSettings clientSettings = clientsSettings.get(clientName);
if (clientSettings == null) {
Expand Down Expand Up @@ -141,11 +145,15 @@ protected void doStop() throws ElasticsearchException {
}

@Override
protected void doClose() throws ElasticsearchException {
for (final AmazonS3Wrapper clientWrapper : clientsCache.values()) {
clientWrapper.decRef();
protected synchronized void doClose() throws ElasticsearchException {
// the clients will shutdown when they will not be used anymore
for (final AmazonS3Reference clientReference : clientsCache.values()) {
clientReference.decRef();
}
// Ensure that IdleConnectionReaper is shutdown
// clear previously cached clients
clientsCache = emptyMap();
// shutdown IdleConnectionReaper background thread
// it will be restarted on any new client usage
IdleConnectionReaper.shutdown();
}

Expand All @@ -167,29 +175,4 @@ public void refresh() {
}
}

private static class InternalAmazonS3Wrapper extends AbstractRefCounted implements AmazonS3Wrapper {

private final AmazonS3 client;

public InternalAmazonS3Wrapper(AmazonS3 client) {
super("AWS_S3_CLIENT");
this.client = client;
}

@Override
public void close() {
decRef();
}

@Override
public AmazonS3 client() {
return client;
}

@Override
protected void closeInternal() {
client.shutdown();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,17 @@ class S3BlobContainer extends AbstractBlobContainer {

@Override
public boolean blobExists(String blobName) {
try (AwsS3Service.AmazonS3Wrapper clientWrapper = blobStore.clientWrapper()) {
return SocketAccess.doPrivileged(() -> clientWrapper.client().doesObjectExist(blobStore.bucket(), buildKey(blobName)));
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
return SocketAccess.doPrivileged(() -> clientReference.client().doesObjectExist(blobStore.bucket(), buildKey(blobName)));
} catch (Exception e) {
throw new BlobStoreException("Failed to check if blob [" + blobName +"] exists", e);
}
}

@Override
public InputStream readBlob(String blobName) throws IOException {
try (AwsS3Service.AmazonS3Wrapper clientWrapper = blobStore.clientWrapper()) {
final S3Object s3Object = SocketAccess.doPrivileged(() -> clientWrapper.client().getObject(blobStore.bucket(),
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
final S3Object s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(blobStore.bucket(),
buildKey(blobName)));
return s3Object.getObjectContent();
} catch (AmazonClientException e) {
Expand Down Expand Up @@ -114,8 +114,8 @@ public void deleteBlob(String blobName) throws IOException {
throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
}

try (AwsS3Service.AmazonS3Wrapper clientWrapper = blobStore.clientWrapper()) {
SocketAccess.doPrivilegedVoid(() -> clientWrapper.client().deleteObject(blobStore.bucket(), buildKey(blobName)));
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
SocketAccess.doPrivilegedVoid(() -> clientReference.client().deleteObject(blobStore.bucket(), buildKey(blobName)));
} catch (AmazonClientException e) {
throw new IOException("Exception when deleting blob [" + blobName + "]", e);
}
Expand All @@ -124,19 +124,19 @@ public void deleteBlob(String blobName) throws IOException {
@Override
public Map<String, BlobMetaData> listBlobsByPrefix(@Nullable String blobNamePrefix) throws IOException {
final MapBuilder<String, BlobMetaData> blobsBuilder = MapBuilder.newMapBuilder();
try (AwsS3Service.AmazonS3Wrapper clientWrapper = blobStore.clientWrapper()) {
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
ObjectListing prevListing = null;
while (true) {
ObjectListing list;
if (prevListing != null) {
final ObjectListing finalPrevListing = prevListing;
list = SocketAccess.doPrivileged(() -> clientWrapper.client().listNextBatchOfObjects(finalPrevListing));
list = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(finalPrevListing));
} else {
if (blobNamePrefix != null) {
list = SocketAccess.doPrivileged(() -> clientWrapper.client().listObjects(blobStore.bucket(),
list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(blobStore.bucket(),
buildKey(blobNamePrefix)));
} else {
list = SocketAccess.doPrivileged(() -> clientWrapper.client().listObjects(blobStore.bucket(), keyPath));
list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(blobStore.bucket(), keyPath));
}
}
for (S3ObjectSummary summary : list.getObjectSummaries()) {
Expand Down Expand Up @@ -166,10 +166,10 @@ public void move(String sourceBlobName, String targetBlobName) throws IOExceptio
request.setNewObjectMetadata(objectMetadata);
}

try (AwsS3Service.AmazonS3Wrapper clientWrapper = blobStore.clientWrapper()) {
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
SocketAccess.doPrivilegedVoid(() -> {
clientWrapper.client().copyObject(request);
clientWrapper.client().deleteObject(blobStore.bucket(), buildKey(sourceBlobName));
clientReference.client().copyObject(request);
clientReference.client().deleteObject(blobStore.bucket(), buildKey(sourceBlobName));
});
} catch (AmazonS3Exception e) {
throw new IOException(e);
Expand Down Expand Up @@ -210,9 +210,9 @@ void executeSingleUpload(final S3BlobStore blobStore,
putRequest.setStorageClass(blobStore.getStorageClass());
putRequest.setCannedAcl(blobStore.getCannedACL());

try (AwsS3Service.AmazonS3Wrapper clientWrapper = blobStore.clientWrapper()) {
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
SocketAccess.doPrivilegedVoid(() -> {
clientWrapper.client().putObject(putRequest);
clientReference.client().putObject(putRequest);
});
} catch (AmazonClientException e) {
throw new IOException("Unable to upload object [" + blobName + "] using a single upload", e);
Expand Down Expand Up @@ -259,9 +259,9 @@ void executeMultipartUpload(final S3BlobStore blobStore,
md.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
initRequest.setObjectMetadata(md);
}
try (AwsS3Service.AmazonS3Wrapper clientWrapper = blobStore.clientWrapper()) {
try (AmazonS3Reference clientReference = blobStore.clientReference()) {

uploadId.set(SocketAccess.doPrivileged(() -> clientWrapper.client().initiateMultipartUpload(initRequest).getUploadId()));
uploadId.set(SocketAccess.doPrivileged(() -> clientReference.client().initiateMultipartUpload(initRequest).getUploadId()));
if (Strings.isEmpty(uploadId.get())) {
throw new IOException("Failed to initialize multipart upload " + blobName);
}
Expand All @@ -286,7 +286,7 @@ void executeMultipartUpload(final S3BlobStore blobStore,
}
bytesCount += uploadRequest.getPartSize();

final UploadPartResult uploadResponse = SocketAccess.doPrivileged(() -> clientWrapper.client().uploadPart(uploadRequest));
final UploadPartResult uploadResponse = SocketAccess.doPrivileged(() -> clientReference.client().uploadPart(uploadRequest));
parts.add(uploadResponse.getPartETag());
}

Expand All @@ -296,16 +296,16 @@ void executeMultipartUpload(final S3BlobStore blobStore,
}

CompleteMultipartUploadRequest complRequest = new CompleteMultipartUploadRequest(bucketName, blobName, uploadId.get(), parts);
SocketAccess.doPrivilegedVoid(() -> clientWrapper.client().completeMultipartUpload(complRequest));
SocketAccess.doPrivilegedVoid(() -> clientReference.client().completeMultipartUpload(complRequest));
success = true;

} catch (AmazonClientException e) {
throw new IOException("Unable to upload object [" + blobName + "] using multipart upload", e);
} finally {
if (success == false && Strings.hasLength(uploadId.get())) {
final AbortMultipartUploadRequest abortRequest = new AbortMultipartUploadRequest(bucketName, blobName, uploadId.get());
try (AwsS3Service.AmazonS3Wrapper clientWrapper = blobStore.clientWrapper()) {
SocketAccess.doPrivilegedVoid(() -> clientWrapper.client().abortMultipartUpload(abortRequest));
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
SocketAccess.doPrivilegedVoid(() -> clientReference.client().abortMultipartUpload(abortRequest));
}
}
}
Expand Down
Loading

0 comments on commit 5ae961c

Please sign in to comment.