diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 6dbfddd320f19..1ed70024f592c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -57,7 +57,6 @@
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ActionFactory;
-import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.BigtableChangeStreamAccessor;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.BigtableClientOverride;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao;
@@ -2057,6 +2056,7 @@ static ReadChangeStream create() {
return new AutoValue_BigtableIO_ReadChangeStream.Builder()
.setBigtableConfig(config)
.setMetadataTableBigtableConfig(metadataTableconfig)
+ .setValidateConfig(true)
.build();
}
@@ -2080,6 +2080,8 @@ static ReadChangeStream create() {
abstract @Nullable Duration getBacklogReplicationAdjustment();
+ abstract @Nullable Boolean getValidateConfig();
+
abstract ReadChangeStream.Builder toBuilder();
/**
@@ -2284,25 +2286,80 @@ public ReadChangeStream withBacklogReplicationAdjustment(Duration adjustment) {
return toBuilder().setBacklogReplicationAdjustment(adjustment).build();
}
+ /**
+ * Disables validation that the table being read and the metadata table exists, and that the app
+ * profile used is single cluster and single row transaction enabled. Set this option if the
+ * caller does not have additional Bigtable permissions to validate the configurations.
+ * NOTE this also disabled creating or updating the metadata table because that also
+ * requires additional permissions, essentially setting {@link #withCreateOrUpdateMetadataTable}
+ * to false.
+ */
+ public ReadChangeStream withoutValidation() {
+ BigtableConfig config = getBigtableConfig();
+ BigtableConfig metadataTableConfig = getMetadataTableBigtableConfig();
+ return toBuilder()
+ .setBigtableConfig(config.withValidate(false))
+ .setMetadataTableBigtableConfig(metadataTableConfig.withValidate(false))
+ .setValidateConfig(false)
+ .build();
+ }
+
+ @Override
+ public void validate(PipelineOptions options) {
+ BigtableServiceFactory factory = new BigtableServiceFactory();
+ if (getBigtableConfig().getValidate()) {
+ try {
+ checkArgument(
+ factory.checkTableExists(getBigtableConfig(), options, getTableId()),
+ "Change Stream table %s does not exist",
+ getTableId());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ // Validate the app profile is single cluster and allows single row transactions.
+ private void validateAppProfile(
+ MetadataTableAdminDao metadataTableAdminDao, String appProfileId) {
+ checkArgument(metadataTableAdminDao != null);
+ checkArgument(
+ metadataTableAdminDao.isAppProfileSingleClusterAndTransactional(appProfileId),
+ "App profile id '"
+ + appProfileId
+ + "' provided to access metadata table needs to use single-cluster routing policy"
+ + " and allow single-row transactions.");
+ }
+
+ // Update metadata table schema if allowed and required.
+ private void createOrUpdateMetadataTable(
+ MetadataTableAdminDao metadataTableAdminDao, String metadataTableId) {
+ boolean shouldCreateOrUpdateMetadataTable = true;
+ if (getCreateOrUpdateMetadataTable() != null) {
+ shouldCreateOrUpdateMetadataTable = getCreateOrUpdateMetadataTable();
+ }
+ // Only try to create or update metadata table if option is set to true. Otherwise, just
+ // check if the table exists.
+ if (shouldCreateOrUpdateMetadataTable && metadataTableAdminDao.createMetadataTable()) {
+ LOG.info("Created metadata table: " + metadataTableId);
+ }
+ }
+
@Override
public PCollection> expand(PBegin input) {
+ BigtableConfig bigtableConfig = getBigtableConfig();
checkArgument(
- getBigtableConfig() != null,
+ bigtableConfig != null,
"BigtableIO ReadChangeStream is missing required configurations fields.");
- checkArgument(
- getBigtableConfig().getProjectId() != null, "Missing required projectId field.");
- checkArgument(
- getBigtableConfig().getInstanceId() != null, "Missing required instanceId field.");
+ bigtableConfig.validate();
checkArgument(getTableId() != null, "Missing required tableId field.");
- BigtableConfig bigtableConfig = getBigtableConfig();
- if (getBigtableConfig().getAppProfileId() == null
- || getBigtableConfig().getAppProfileId().get().isEmpty()) {
+ if (bigtableConfig.getAppProfileId() == null
+ || bigtableConfig.getAppProfileId().get().isEmpty()) {
bigtableConfig = bigtableConfig.withAppProfileId(StaticValueProvider.of("default"));
}
BigtableConfig metadataTableConfig = getMetadataTableBigtableConfig();
- String metadataTableId = getMetadataTableId();
if (metadataTableConfig.getProjectId() == null
|| metadataTableConfig.getProjectId().get().isEmpty()) {
metadataTableConfig = metadataTableConfig.withProjectId(bigtableConfig.getProjectId());
@@ -2311,6 +2368,7 @@ public PCollection> expand(PBegin input) {
|| metadataTableConfig.getInstanceId().get().isEmpty()) {
metadataTableConfig = metadataTableConfig.withInstanceId(bigtableConfig.getInstanceId());
}
+ String metadataTableId = getMetadataTableId();
if (metadataTableId == null || metadataTableId.isEmpty()) {
metadataTableId = MetadataTableAdminDao.DEFAULT_METADATA_TABLE_NAME;
}
@@ -2333,10 +2391,6 @@ public PCollection> expand(PBegin input) {
existingPipelineOptions = ExistingPipelineOptions.FAIL_IF_EXISTS;
}
- boolean shouldCreateOrUpdateMetadataTable = true;
- if (getCreateOrUpdateMetadataTable() != null) {
- shouldCreateOrUpdateMetadataTable = getCreateOrUpdateMetadataTable();
- }
Duration backlogReplicationAdjustment = getBacklogReplicationAdjustment();
if (backlogReplicationAdjustment == null) {
backlogReplicationAdjustment = DEFAULT_BACKLOG_REPLICATION_ADJUSTMENT;
@@ -2348,31 +2402,25 @@ public PCollection> expand(PBegin input) {
new DaoFactory(
bigtableConfig, metadataTableConfig, getTableId(), metadataTableId, changeStreamName);
+ // Validate the configuration is correct before creating the pipeline, if required.
try {
MetadataTableAdminDao metadataTableAdminDao = daoFactory.getMetadataTableAdminDao();
- checkArgument(metadataTableAdminDao != null);
- checkArgument(
- metadataTableAdminDao.isAppProfileSingleClusterAndTransactional(
- metadataTableConfig.getAppProfileId().get()),
- "App profile id '"
- + metadataTableConfig.getAppProfileId().get()
- + "' provided to access metadata table needs to use single-cluster routing policy"
- + " and allow single-row transactions.");
-
- // Only try to create or update metadata table if option is set to true. Otherwise, just
- // check if the table exists.
- if (shouldCreateOrUpdateMetadataTable && metadataTableAdminDao.createMetadataTable()) {
- LOG.info("Created metadata table: " + metadataTableAdminDao.getTableId());
+ boolean validateConfig = true;
+ if (getValidateConfig() != null) {
+ validateConfig = getValidateConfig();
}
- checkArgument(
- metadataTableAdminDao.doesMetadataTableExist(),
- "Metadata table does not exist: " + metadataTableAdminDao.getTableId());
-
- try (BigtableChangeStreamAccessor bigtableChangeStreamAccessor =
- BigtableChangeStreamAccessor.getOrCreate(bigtableConfig)) {
+ // Validate app profile and create metadata table if validate is required.
+ if (validateConfig) {
+ createOrUpdateMetadataTable(metadataTableAdminDao, metadataTableId);
+ validateAppProfile(metadataTableAdminDao, metadataTableConfig.getAppProfileId().get());
+ }
+ // Validate metadata table if validate is required. We validate metadata table after
+ // createOrUpdateMetadataTable because if the metadata doesn't exist, we have to run
+ // createOrUpdateMetadataTable to create the metadata table.
+ if (metadataTableConfig.getValidate()) {
checkArgument(
- bigtableChangeStreamAccessor.getTableAdminClient().exists(getTableId()),
- "Change Stream table does not exist");
+ metadataTableAdminDao.doesMetadataTableExist(),
+ "Metadata table does not exist: " + metadataTableAdminDao.getTableId());
}
} catch (Exception e) {
throw new RuntimeException(e);
@@ -2429,6 +2477,8 @@ abstract ReadChangeStream.Builder setExistingPipelineOptions(
abstract ReadChangeStream.Builder setBacklogReplicationAdjustment(Duration adjustment);
+ abstract ReadChangeStream.Builder setValidateConfig(boolean validateConfig);
+
abstract ReadChangeStream build();
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index bffca8652089e..5163e0e4ff25d 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -2041,4 +2041,97 @@ synchronized ConfigId newId() {
return ConfigId.create();
}
}
+
+ /////////////////////////// ReadChangeStream ///////////////////////////
+
+ @Test
+ public void testReadChangeStreamBuildsCorrectly() {
+ Instant startTime = Instant.now();
+ BigtableIO.ReadChangeStream readChangeStream =
+ BigtableIO.readChangeStream()
+ .withProjectId("project")
+ .withInstanceId("instance")
+ .withTableId("table")
+ .withAppProfileId("app-profile")
+ .withChangeStreamName("change-stream-name")
+ .withMetadataTableProjectId("metadata-project")
+ .withMetadataTableInstanceId("metadata-instance")
+ .withMetadataTableTableId("metadata-table")
+ .withMetadataTableAppProfileId("metadata-app-profile")
+ .withStartTime(startTime)
+ .withBacklogReplicationAdjustment(Duration.standardMinutes(1))
+ .withCreateOrUpdateMetadataTable(false)
+ .withExistingPipelineOptions(BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS);
+ assertEquals("project", readChangeStream.getBigtableConfig().getProjectId().get());
+ assertEquals("instance", readChangeStream.getBigtableConfig().getInstanceId().get());
+ assertEquals("app-profile", readChangeStream.getBigtableConfig().getAppProfileId().get());
+ assertEquals("table", readChangeStream.getTableId());
+ assertEquals(
+ "metadata-project", readChangeStream.getMetadataTableBigtableConfig().getProjectId().get());
+ assertEquals(
+ "metadata-instance",
+ readChangeStream.getMetadataTableBigtableConfig().getInstanceId().get());
+ assertEquals(
+ "metadata-app-profile",
+ readChangeStream.getMetadataTableBigtableConfig().getAppProfileId().get());
+ assertEquals("metadata-table", readChangeStream.getMetadataTableId());
+ assertEquals("change-stream-name", readChangeStream.getChangeStreamName());
+ assertEquals(startTime, readChangeStream.getStartTime());
+ assertEquals(Duration.standardMinutes(1), readChangeStream.getBacklogReplicationAdjustment());
+ assertEquals(false, readChangeStream.getCreateOrUpdateMetadataTable());
+ assertEquals(
+ BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS,
+ readChangeStream.getExistingPipelineOptions());
+ }
+
+ @Test
+ public void testReadChangeStreamFailsValidation() {
+ BigtableIO.ReadChangeStream readChangeStream =
+ BigtableIO.readChangeStream()
+ .withProjectId("project")
+ .withInstanceId("instance")
+ .withTableId("table");
+ // Validating table fails because table does not exist.
+ thrown.expect(IllegalArgumentException.class);
+ readChangeStream.validate(TestPipeline.testingPipelineOptions());
+ }
+
+ @Test
+ public void testReadChangeStreamPassWithoutValidation() {
+ BigtableIO.ReadChangeStream readChangeStream =
+ BigtableIO.readChangeStream()
+ .withProjectId("project")
+ .withInstanceId("instance")
+ .withTableId("table")
+ .withoutValidation();
+ // No error is thrown because we skip validation
+ readChangeStream.validate(TestPipeline.testingPipelineOptions());
+ }
+
+ @Test
+ public void testReadChangeStreamValidationFailsDuringApply() {
+ BigtableIO.ReadChangeStream readChangeStream =
+ BigtableIO.readChangeStream()
+ .withProjectId("project")
+ .withInstanceId("instance")
+ .withTableId("table");
+ // Validating table fails because resources cannot be found
+ thrown.expect(RuntimeException.class);
+
+ p.apply(readChangeStream);
+ }
+
+ @Test
+ public void testReadChangeStreamPassWithoutValidationDuringApply() {
+ BigtableIO.ReadChangeStream readChangeStream =
+ BigtableIO.readChangeStream()
+ .withProjectId("project")
+ .withInstanceId("instance")
+ .withTableId("table")
+ .withoutValidation();
+ // No RunTime exception as seen in previous test with validation. Only error that the pipeline
+ // is not ran.
+ thrown.expect(PipelineRunMissingException.class);
+ p.apply(readChangeStream);
+ }
}