Skip to content

Commit

Permalink
[Java] Disable soft delete policy when creating a default bucket for …
Browse files Browse the repository at this point in the history
…a project. (apache#31324)

* Disable soft delete policy when creating a default bucket for a project.

Also, getBucket() and removeBucket() are added into GcsUtil.

* Add random number to prefix to avoid collision on multiple test instances.

* Remove default bucket before creating in the integration test.

* Add one line in CHANGES.md to mention this change and fix a typo.
  • Loading branch information
shunping authored May 17, 2024
1 parent 44177d1 commit a435f45
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
Template variables can be passed with the (json-formatted) `--jinja_variables` flag.
* DataFrame API now supports pandas 2.1.x and adds 12 more string functions for Series.([#31185](https://github.com/apache/beam/pull/31185)).
* Added BigQuery handler for enrichment transform (Python) ([#31295](https://github.com/apache/beam/pull/31295))
* Disable soft delete policy when creating the default bucket for a project (Java) ([#31324](https://github.com/apache/beam/pull/31324)).

## Breaking Changes

Expand Down
24 changes: 24 additions & 0 deletions sdks/java/extensions/google-cloud-platform-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,32 @@ task integrationTestKms(type: Test) {
}
}

// Note that no runner is specified here, so tests running under this task should not be running
// pipelines.
task integrationTestNoKms(type: Test) {
group = "Verification"
def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
def gcpTempRoot = project.findProperty('gcpTempRoot') ?: 'gs://temp-storage-for-end-to-end-tests-cmek'
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--project=${gcpProject}",
"--tempRoot=${gcpTempRoot}",
])

// Disable Gradle cache: these ITs interact with live service that should always be considered "out of date"
outputs.upToDateWhen { false }

include '**/*IT.class'
maxParallelForks 4
classpath = sourceSets.test.runtimeClasspath
testClassesDirs = sourceSets.test.output.classesDirs
useJUnit {
excludeCategories "org.apache.beam.sdk.testing.UsesKms"
}
}

task postCommit {
group = "Verification"
description = "Integration tests of GCP connectors using the DirectRunner."
dependsOn integrationTestKms
dependsOn integrationTestNoKms
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.api.services.cloudresourcemanager.CloudResourceManager;
import com.google.api.services.cloudresourcemanager.model.Project;
import com.google.api.services.storage.model.Bucket;
import com.google.api.services.storage.model.Bucket.SoftDeletePolicy;
import com.google.auth.Credentials;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
Expand Down Expand Up @@ -392,40 +393,67 @@ class GcpTempLocationFactory implements DefaultValueFactory<String> {
return tempLocation;
}

/**
* Creates a default bucket or verifies the existence and proper access control of an existing
* default bucket. Returns the location if successful.
*/
@VisibleForTesting
static String tryCreateDefaultBucket(PipelineOptions options, CloudResourceManager crmClient) {
static ImmutableList<String> getDefaultBucketNameStubs(
PipelineOptions options, CloudResourceManager crmClient, String bucketNamePrefix) {
GcsOptions gcsOptions = options.as(GcsOptions.class);

checkArgument(
isNullOrEmpty(gcsOptions.getDataflowKmsKey()),
"Cannot create a default bucket when --dataflowKmsKey is set.");

final String projectId = gcsOptions.getProject();
checkArgument(!isNullOrEmpty(projectId), "--project is a required option.");

// Look up the project number, to create a default bucket with a stable
// name with no special characters.
long projectNumber = 0L;
try {
projectNumber = getProjectNumber(projectId, crmClient);
} catch (IOException e) {
throw new RuntimeException("Unable to verify project with ID " + projectId, e);
}

String region = DEFAULT_REGION;
if (!isNullOrEmpty(gcsOptions.getZone())) {
region = getRegionFromZone(gcsOptions.getZone());
}
final String bucketName = "dataflow-staging-" + region + "-" + projectNumber;

return ImmutableList.of(bucketNamePrefix, region, String.valueOf(projectNumber));
}

/**
* Creates a default bucket or verifies the existence and proper access control of an existing
* default bucket. Returns the location if successful.
*/
@VisibleForTesting
static String tryCreateDefaultBucket(PipelineOptions options, CloudResourceManager crmClient) {
return tryCreateDefaultBucketWithPrefix(options, crmClient, "dataflow-staging");
}

@VisibleForTesting
static String tryCreateDefaultBucketWithPrefix(
PipelineOptions options, CloudResourceManager crmClient, String bucketNamePrefix) {
GcsOptions gcsOptions = options.as(GcsOptions.class);

checkArgument(
isNullOrEmpty(gcsOptions.getDataflowKmsKey()),
"Cannot create a default bucket when --dataflowKmsKey is set.");

final List<String> bucketNameStubs =
getDefaultBucketNameStubs(options, crmClient, bucketNamePrefix);
final String region = bucketNameStubs.get(1);
final long projectNumber = Long.parseLong(bucketNameStubs.get(2));
final String bucketName = String.join("-", bucketNameStubs);
LOG.info("No tempLocation specified, attempting to use default bucket: {}", bucketName);
Bucket bucket = new Bucket().setName(bucketName).setLocation(region);

// Disable soft delete policy for a bucket.
// Reference: https://cloud.google.com/storage/docs/soft-delete
SoftDeletePolicy softDeletePolicy = new SoftDeletePolicy().setRetentionDurationSeconds(0L);

Bucket bucket =
new Bucket()
.setName(bucketName)
.setLocation(region)
.setSoftDeletePolicy(softDeletePolicy);
// Always try to create the bucket before checking access, so that we do not
// race with other pipelines that may be attempting to do the same thing.
try {
gcsOptions.getGcsUtil().createBucket(projectId, bucket);
gcsOptions.getGcsUtil().createBucket(gcsOptions.getProject(), bucket);
} catch (FileAlreadyExistsException e) {
LOG.debug("Bucket '{}'' already exists, verifying access.", bucketName);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,17 @@ public void createBucket(String projectId, Bucket bucket) throws IOException {
createBucket(projectId, bucket, createBackOff(), Sleeper.DEFAULT);
}

/** Get the {@link Bucket} from Cloud Storage path or propagates an exception. */
@Nullable
public Bucket getBucket(GcsPath path) throws IOException {
return getBucket(path, createBackOff(), Sleeper.DEFAULT);
}

/** Remove an empty {@link Bucket} in Cloud Storage or propagates an exception. */
public void removeBucket(Bucket bucket) throws IOException {
removeBucket(bucket, createBackOff(), Sleeper.DEFAULT);
}

/**
* Returns whether the GCS bucket exists. This will return false if the bucket is inaccessible due
* to permissions.
Expand Down Expand Up @@ -753,6 +764,40 @@ public boolean shouldRetry(IOException e) {
}
}

@VisibleForTesting
void removeBucket(Bucket bucket, BackOff backoff, Sleeper sleeper) throws IOException {
Storage.Buckets.Delete getBucket = storageClient.buckets().delete(bucket.getName());

try {
ResilientOperation.retry(
getBucket::execute,
backoff,
new RetryDeterminer<IOException>() {
@Override
public boolean shouldRetry(IOException e) {
if (errorExtractor.itemNotFound(e) || errorExtractor.accessDenied(e)) {
return false;
}
return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e);
}
},
IOException.class,
sleeper);
} catch (GoogleJsonResponseException e) {
if (errorExtractor.accessDenied(e)) {
throw new AccessDeniedException(bucket.getName(), null, e.getMessage());
}
if (errorExtractor.itemNotFound(e)) {
throw new FileNotFoundException(e.getMessage());
}
throw e;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(
String.format("Error while attempting to remove bucket gs://%s", bucket.getName()), e);
}
}

private static void executeBatches(List<BatchInterface> batches) throws IOException {
ExecutorService executor =
MoreExecutors.listeningDecorator(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.gcp.options;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;

import com.google.api.services.cloudresourcemanager.CloudResourceManager;
import com.google.api.services.storage.model.Bucket;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Random;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
* Integration tests for {@link GcpOptions}. These tests are designed to run against production
* Google Cloud Storage.
*
* <p>This is a runnerless integration test, even though the Beam IT framework assumes one. Thus,
* this test should only be run against single runner (such as DirectRunner).
*/
@RunWith(JUnit4.class)
public class GcpOptionsIT {
/** Tests the creation of a default bucket in a project. */
@Test
public void testCreateDefaultBucket() throws IOException {
TestPipelineOptions options =
TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);

CloudResourceManager crmClient =
GcpOptions.GcpTempLocationFactory.newCloudResourceManagerClient(
options.as(CloudResourceManagerOptions.class))
.build();

GcsOptions gcsOptions = options.as(GcsOptions.class);
GcsUtil gcsUtil = gcsOptions.getGcsUtil();

Random rand = new Random();
// Add a random number to the prefix to avoid collision if multiple test instances
// are run at the same time. To avoid too many dangling buckets if bucket removal fails,
// we limit the max number of possible bucket names in this test to 1000.
String bucketNamePrefix = "gcp-options-it-" + rand.nextInt(1000);

String bucketName =
String.join(
"-",
GcpOptions.GcpTempLocationFactory.getDefaultBucketNameStubs(
options, crmClient, bucketNamePrefix));

// remove existing default bucket if any
try {
Bucket oldBucket = gcsUtil.getBucket(GcsPath.fromUri("gs://" + bucketName));
gcsUtil.removeBucket(oldBucket);
} catch (FileNotFoundException e) {
// the bucket to be created does not exist, which is good news
}

String tempLocation =
GcpOptions.GcpTempLocationFactory.tryCreateDefaultBucketWithPrefix(
options, crmClient, bucketNamePrefix);

GcsPath gcsPath = GcsPath.fromUri(tempLocation);
Bucket bucket = gcsUtil.getBucket(gcsPath);
assertNotNull(bucket);
// verify the soft delete policy is disabled
assertEquals(bucket.getSoftDeletePolicy().getRetentionDurationSeconds(), Long.valueOf(0L));

gcsUtil.removeBucket(bucket);
assertThrows(FileNotFoundException.class, () -> gcsUtil.getBucket(gcsPath));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.api.services.cloudresourcemanager.CloudResourceManager;
Expand Down Expand Up @@ -56,6 +59,7 @@
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

Expand Down Expand Up @@ -230,6 +234,14 @@ public void testCreateBucket() throws Exception {

String bucket = GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient);
assertEquals("gs://dataflow-staging-us-north1-1/temp/", bucket);

ArgumentCaptor<Bucket> bucketArg = ArgumentCaptor.forClass(Bucket.class);
verify(mockGcsUtil, times(1)).createBucket(anyString(), bucketArg.capture());

// verify that the soft delete policy is disabled in the default bucket
assertEquals(
bucketArg.getValue().getSoftDeletePolicy().getRetentionDurationSeconds(),
Long.valueOf(0L));
}

@Test
Expand Down

0 comments on commit a435f45

Please sign in to comment.