Skip to content

Commit

Permalink
GCS repo plugin update-able secure settings (#30688)
Browse files Browse the repository at this point in the history
  • Loading branch information
albertzaharovits authored May 28, 2018
1 parent a2bc4d8 commit 000c585
Show file tree
Hide file tree
Showing 9 changed files with 258 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.FileAlreadyExistsException;
import java.util.Map;

class GoogleCloudStorageBlobContainer extends AbstractBlobContainer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,24 @@ class GoogleCloudStorageBlobStore extends AbstractComponent implements BlobStore
// https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
private static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024;

private final Storage storage;
private final String bucket;
private final String bucketName;
private final String clientName;
private final GoogleCloudStorageService storageService;

GoogleCloudStorageBlobStore(Settings settings, String bucket, Storage storage) {
GoogleCloudStorageBlobStore(Settings settings, String bucketName, String clientName, GoogleCloudStorageService storageService) {
super(settings);
this.bucket = bucket;
this.storage = storage;
if (doesBucketExist(bucket) == false) {
throw new BlobStoreException("Bucket [" + bucket + "] does not exist");
this.bucketName = bucketName;
this.clientName = clientName;
this.storageService = storageService;
if (doesBucketExist(bucketName) == false) {
throw new BlobStoreException("Bucket [" + bucketName + "] does not exist");
}
}

private Storage client() throws IOException {
return storageService.client(clientName);
}

@Override
public BlobContainer blobContainer(BlobPath path) {
return new GoogleCloudStorageBlobContainer(path, this);
Expand All @@ -92,46 +98,44 @@ public void close() {
}

/**
* Return true if the given bucket exists
* Return true iff the given bucket exists
*
* @param bucketName name of the bucket
* @return true if the bucket exists, false otherwise
* @return true iff the bucket exists
*/
boolean doesBucketExist(String bucketName) {
try {
final Bucket bucket = SocketAccess.doPrivilegedIOException(() -> storage.get(bucketName));
final Bucket bucket = SocketAccess.doPrivilegedIOException(() -> client().get(bucketName));
return bucket != null;
} catch (final Exception e) {
throw new BlobStoreException("Unable to check if bucket [" + bucketName + "] exists", e);
}
}

/**
* List blobs in the bucket under the specified path. The path root is removed.
* List blobs in the specific bucket under the specified path. The path root is removed.
*
* @param path
* base path of the blobs to list
* @param path base path of the blobs to list
* @return a map of blob names and their metadata
*/
Map<String, BlobMetaData> listBlobs(String path) throws IOException {
return listBlobsByPrefix(path, "");
}

/**
* List all blobs in the bucket which have a prefix
* List all blobs in the specific bucket with names prefixed
*
* @param path
* base path of the blobs to list. This path is removed from the
* names of the blobs returned.
* @param prefix
* prefix of the blobs to list.
* @param prefix prefix of the blobs to list.
* @return a map of blob names and their metadata.
*/
Map<String, BlobMetaData> listBlobsByPrefix(String path, String prefix) throws IOException {
final String pathPrefix = buildKey(path, prefix);
final MapBuilder<String, BlobMetaData> mapBuilder = MapBuilder.newMapBuilder();
SocketAccess.doPrivilegedVoidIOException(() -> {
storage.get(bucket).list(BlobListOption.prefix(pathPrefix)).iterateAll().forEach(blob -> {
client().get(bucketName).list(BlobListOption.prefix(pathPrefix)).iterateAll().forEach(blob -> {
assert blob.getName().startsWith(path);
final String suffixName = blob.getName().substring(path.length());
mapBuilder.put(suffixName, new PlainBlobMetaData(suffixName, blob.getSize()));
Expand All @@ -141,26 +145,26 @@ Map<String, BlobMetaData> listBlobsByPrefix(String path, String prefix) throws I
}

/**
* Returns true if the blob exists in the bucket
* Returns true if the blob exists in the specific bucket
*
* @param blobName name of the blob
* @return true if the blob exists, false otherwise
* @return true iff the blob exists
*/
boolean blobExists(String blobName) throws IOException {
final BlobId blobId = BlobId.of(bucket, blobName);
final Blob blob = SocketAccess.doPrivilegedIOException(() -> storage.get(blobId));
final BlobId blobId = BlobId.of(bucketName, blobName);
final Blob blob = SocketAccess.doPrivilegedIOException(() -> client().get(blobId));
return blob != null;
}

/**
* Returns an {@link java.io.InputStream} for a given blob
* Returns an {@link java.io.InputStream} for the given blob name
*
* @param blobName name of the blob
* @return an InputStream
* @return the InputStream used to read the blob's content
*/
InputStream readBlob(String blobName) throws IOException {
final BlobId blobId = BlobId.of(bucket, blobName);
final Blob blob = SocketAccess.doPrivilegedIOException(() -> storage.get(blobId));
final BlobId blobId = BlobId.of(bucketName, blobName);
final Blob blob = SocketAccess.doPrivilegedIOException(() -> client().get(blobId));
if (blob == null) {
throw new NoSuchFileException("Blob [" + blobName + "] does not exit");
}
Expand All @@ -185,13 +189,13 @@ public void close() throws IOException {
}

/**
* Writes a blob in the bucket.
* Writes a blob in the specific bucket
*
* @param inputStream content of the blob to be written
* @param blobSize expected size of the blob to be written
*/
void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
final BlobInfo blobInfo = BlobInfo.newBuilder(bucket, blobName).build();
final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, blobName).build();
if (blobSize > LARGE_BLOB_THRESHOLD_BYTE_SIZE) {
writeBlobResumable(blobInfo, inputStream);
} else {
Expand All @@ -209,8 +213,8 @@ void writeBlob(String blobName, InputStream inputStream, long blobSize) throws I
*/
private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream) throws IOException {
try {
final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException(
() -> storage.writer(blobInfo, Storage.BlobWriteOption.doesNotExist()));
final WriteChannel writeChannel = SocketAccess
.doPrivilegedIOException(() -> client().writer(blobInfo, Storage.BlobWriteOption.doesNotExist()));
Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
@Override
public boolean isOpen() {
Expand All @@ -228,7 +232,7 @@ public int write(ByteBuffer src) throws IOException {
return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
}
}));
} catch (StorageException se) {
} catch (final StorageException se) {
if (se.getCode() == HTTP_PRECON_FAILED) {
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
}
Expand All @@ -250,45 +254,43 @@ private void writeBlobMultipart(BlobInfo blobInfo, InputStream inputStream, long
assert blobSize <= LARGE_BLOB_THRESHOLD_BYTE_SIZE : "large blob uploads should use the resumable upload method";
final ByteArrayOutputStream baos = new ByteArrayOutputStream(Math.toIntExact(blobSize));
Streams.copy(inputStream, baos);
SocketAccess.doPrivilegedVoidIOException(
() -> {
try {
storage.create(blobInfo, baos.toByteArray(), Storage.BlobTargetOption.doesNotExist());
} catch (StorageException se) {
if (se.getCode() == HTTP_PRECON_FAILED) {
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
}
throw se;
}
});
try {
SocketAccess.doPrivilegedVoidIOException(
() -> client().create(blobInfo, baos.toByteArray(), Storage.BlobTargetOption.doesNotExist()));
} catch (final StorageException se) {
if (se.getCode() == HTTP_PRECON_FAILED) {
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
}
throw se;
}
}

/**
* Deletes a blob in the bucket
* Deletes the blob from the specific bucket
*
* @param blobName name of the blob
*/
void deleteBlob(String blobName) throws IOException {
final BlobId blobId = BlobId.of(bucket, blobName);
final boolean deleted = SocketAccess.doPrivilegedIOException(() -> storage.delete(blobId));
final BlobId blobId = BlobId.of(bucketName, blobName);
final boolean deleted = SocketAccess.doPrivilegedIOException(() -> client().delete(blobId));
if (deleted == false) {
throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
}
}

/**
* Deletes multiple blobs in the bucket that have a given prefix
* Deletes multiple blobs from the specific bucket all of which have prefixed names
*
* @param prefix prefix of the buckets to delete
* @param prefix prefix of the blobs to delete
*/
void deleteBlobsByPrefix(String prefix) throws IOException {
deleteBlobs(listBlobsByPrefix("", prefix).keySet());
}

/**
* Deletes multiple blobs in the given bucket (uses a batch request to perform this)
* Deletes multiple blobs from the specific bucket using a batch request
*
* @param blobNames names of the bucket to delete
* @param blobNames names of the blobs to delete
*/
void deleteBlobs(Collection<String> blobNames) throws IOException {
if (blobNames.isEmpty()) {
Expand All @@ -299,13 +301,13 @@ void deleteBlobs(Collection<String> blobNames) throws IOException {
deleteBlob(blobNames.iterator().next());
return;
}
final List<BlobId> blobIdsToDelete = blobNames.stream().map(blobName -> BlobId.of(bucket, blobName)).collect(Collectors.toList());
final List<Boolean> deletedStatuses = SocketAccess.doPrivilegedIOException(() -> storage.delete(blobIdsToDelete));
final List<BlobId> blobIdsToDelete = blobNames.stream().map(blob -> BlobId.of(bucketName, blob)).collect(Collectors.toList());
final List<Boolean> deletedStatuses = SocketAccess.doPrivilegedIOException(() -> client().delete(blobIdsToDelete));
assert blobIdsToDelete.size() == deletedStatuses.size();
boolean failed = false;
for (int i = 0; i < blobIdsToDelete.size(); i++) {
if (deletedStatuses.get(i) == false) {
logger.error("Failed to delete blob [{}] in bucket [{}]", blobIdsToDelete.get(i).getName(), bucket);
logger.error("Failed to delete blob [{}] in bucket [{}]", blobIdsToDelete.get(i).getName(), bucketName);
failed = true;
}
}
Expand All @@ -315,26 +317,27 @@ void deleteBlobs(Collection<String> blobNames) throws IOException {
}

/**
* Moves a blob within the same bucket
* Moves a blob within the specific bucket
*
* @param sourceBlobName name of the blob to move
* @param targetBlobName new name of the blob in the same bucket
*/
void moveBlob(String sourceBlobName, String targetBlobName) throws IOException {
final BlobId sourceBlobId = BlobId.of(bucket, sourceBlobName);
final BlobId targetBlobId = BlobId.of(bucket, targetBlobName);
final BlobId sourceBlobId = BlobId.of(bucketName, sourceBlobName);
final BlobId targetBlobId = BlobId.of(bucketName, targetBlobName);
final CopyRequest request = CopyRequest.newBuilder()
.setSource(sourceBlobId)
.setTarget(targetBlobId)
.build();
SocketAccess.doPrivilegedVoidIOException(() -> {
// There's no atomic "move" in GCS so we need to copy and delete
// There's no atomic "move" in GCS so we need to copy and delete
final Storage storage = client();
final boolean deleted = SocketAccess.doPrivilegedIOException(() -> {
storage.copy(request).getResult();
final boolean deleted = storage.delete(sourceBlobId);
if (deleted == false) {
throw new IOException("Failed to move source [" + sourceBlobName + "] to target [" + targetBlobName + "]");
}
return storage.delete(sourceBlobId);
});
if (deleted == false) {
throw new IOException("Failed to move source [" + sourceBlobName + "] to target [" + targetBlobName + "]");
}
}

private static String buildKey(String keyPath, String s) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,35 +24,34 @@
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.ReInitializablePlugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class GoogleCloudStoragePlugin extends Plugin implements RepositoryPlugin {
public class GoogleCloudStoragePlugin extends Plugin implements RepositoryPlugin, ReInitializablePlugin {

private final Map<String, GoogleCloudStorageClientSettings> clientsSettings;
// package-private for tests
final GoogleCloudStorageService storageService;

public GoogleCloudStoragePlugin(final Settings settings) {
clientsSettings = GoogleCloudStorageClientSettings.load(settings);
}

protected Map<String, GoogleCloudStorageClientSettings> getClientsSettings() {
return clientsSettings;
this.storageService = createStorageService(settings);
// eagerly load client settings so that secure settings are readable (not closed)
reinit(settings);
}

// overridable for tests
protected GoogleCloudStorageService createStorageService(Environment environment) {
return new GoogleCloudStorageService(environment, clientsSettings);
protected GoogleCloudStorageService createStorageService(Settings settings) {
return new GoogleCloudStorageService(settings);
}

@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
return Collections.singletonMap(GoogleCloudStorageRepository.TYPE,
(metadata) -> new GoogleCloudStorageRepository(metadata, env, namedXContentRegistry, createStorageService(env)));
(metadata) -> new GoogleCloudStorageRepository(metadata, env, namedXContentRegistry, this.storageService));
}

@Override
Expand All @@ -66,4 +65,16 @@ public List<Setting<?>> getSettings() {
GoogleCloudStorageClientSettings.APPLICATION_NAME_SETTING,
GoogleCloudStorageClientSettings.TOKEN_URI_SETTING);
}

@Override
public boolean reinit(Settings settings) {
// Secure settings should be readable inside this method. Duplicate client
// settings in a format (`GoogleCloudStorageClientSettings`) that does not
// require for the `SecureSettings` to be open. Pass that around (the
// `GoogleCloudStorageClientSettings` instance) instead of the `Settings`
// instance.
final Map<String, GoogleCloudStorageClientSettings> clientsSettings = GoogleCloudStorageClientSettings.load(settings);
this.storageService.updateClientsSettings(clientsSettings);
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
import static org.elasticsearch.common.settings.Setting.byteSizeSetting;
import static org.elasticsearch.common.settings.Setting.simpleString;

import com.google.cloud.storage.Storage;

class GoogleCloudStorageRepository extends BlobStoreRepository {

// package private for testing
Expand Down Expand Up @@ -86,8 +84,7 @@ class GoogleCloudStorageRepository extends BlobStoreRepository {

logger.debug("using bucket [{}], base_path [{}], chunk_size [{}], compress [{}]", bucket, basePath, chunkSize, compress);

Storage client = SocketAccess.doPrivilegedIOException(() -> storageService.createClient(clientName));
this.blobStore = new GoogleCloudStorageBlobStore(settings, bucket, client);
this.blobStore = new GoogleCloudStorageBlobStore(settings, bucket, clientName, storageService);
}


Expand Down
Loading

0 comments on commit 000c585

Please sign in to comment.