From 1f714760219fb3e614b3f9d6176e8a093034b4b4 Mon Sep 17 00:00:00 2001 From: Tony Tang Date: Wed, 22 May 2024 16:55:01 -0400 Subject: [PATCH 1/4] Add option to disable validation of cloud bigtable change stream IO Change-Id: I0ca42df2f6d8dfe9cd1eaac7208c77fa8d213c4b --- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 131 +++++++++++++----- 1 file changed, 95 insertions(+), 36 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 6dbfddd320f1..36c7820e9fe4 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -57,7 +57,6 @@ import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ActionFactory; -import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.BigtableChangeStreamAccessor; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.BigtableClientOverride; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao; @@ -2057,6 +2056,7 @@ static ReadChangeStream create() { return new AutoValue_BigtableIO_ReadChangeStream.Builder() .setBigtableConfig(config) .setMetadataTableBigtableConfig(metadataTableconfig) + .setValidateConfig(true) .build(); } @@ -2080,6 +2080,8 @@ static ReadChangeStream create() { abstract @Nullable Duration getBacklogReplicationAdjustment(); + abstract @Nullable Boolean getValidateConfig(); + abstract ReadChangeStream.Builder toBuilder(); /** @@ -2284,25 +2286,99 @@ public ReadChangeStream withBacklogReplicationAdjustment(Duration 
adjustment) { return toBuilder().setBacklogReplicationAdjustment(adjustment).build(); } + /** + * Disables validation that the table being read and the metadata table exists, and that the app + * profile used is single cluster and single row transcation enabled. Set this option if the + * caller does not have additional Bigtable permissions to validate the configurations. + */ + public ReadChangeStream withoutValidation() { + BigtableConfig config = getBigtableConfig(); + BigtableConfig metadataTableConfig = getMetadataTableBigtableConfig(); + return toBuilder() + .setBigtableConfig(config.withValidate(false)) + .setMetadataTableBigtableConfig(metadataTableConfig.withValidate(false)) + .setValidateConfig(false) + .build(); + } + + @Override + public void validate(PipelineOptions options) { + BigtableServiceFactory factory = new BigtableServiceFactory(); + if (getBigtableConfig().getValidate()) { + validateTableExists(factory, getBigtableConfig(), options); + } + if (getMetadataTableBigtableConfig().getValidate()) { + validateMetadataTableExists(factory, getMetadataTableBigtableConfig(), options); + } + } + + // Validate the change stream table exists. + private void validateTableExists( + BigtableServiceFactory factory, BigtableConfig config, PipelineOptions options) { + try { + checkArgument( + factory.checkTableExists(config, options, getTableId()), + "Change Stream table %s does not exist", + getTableId()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + // Validate the metadata table exists. + private void validateMetadataTableExists( + BigtableServiceFactory factory, BigtableConfig config, PipelineOptions options) { + try { + checkArgument( + factory.checkTableExists(config, options, getMetadataTableId()), + "Metadata table %s does not exist", + getTableId()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + // Validate the app profile is single cluster and allows single row transactions. 
+ private void validateAppProfileCorrect( + MetadataTableAdminDao metadataTableAdminDao, String appProfileId) { + checkArgument(metadataTableAdminDao != null); + checkArgument( + metadataTableAdminDao.isAppProfileSingleClusterAndTransactional(appProfileId), + "App profile id '" + + appProfileId + + "' provided to access metadata table needs to use single-cluster routing policy" + + " and allow single-row transactions."); + } + + // Update metadata table schema if allowed and required. + private void updateMetadataTableIfRequired( + MetadataTableAdminDao metadataTableAdminDao, String metadataTableId) { + boolean shouldCreateOrUpdateMetadataTable = true; + if (getCreateOrUpdateMetadataTable() != null) { + shouldCreateOrUpdateMetadataTable = getCreateOrUpdateMetadataTable(); + } + // Only try to create or update metadata table if option is set to true. Otherwise, just + // check if the table exists. + if (shouldCreateOrUpdateMetadataTable && metadataTableAdminDao.createMetadataTable()) { + LOG.info("Created metadata table: " + metadataTableId); + } + } + @Override public PCollection> expand(PBegin input) { checkArgument( getBigtableConfig() != null, "BigtableIO ReadChangeStream is missing required configurations fields."); - checkArgument( - getBigtableConfig().getProjectId() != null, "Missing required projectId field."); - checkArgument( - getBigtableConfig().getInstanceId() != null, "Missing required instanceId field."); + getBigtableConfig().validate(); checkArgument(getTableId() != null, "Missing required tableId field."); BigtableConfig bigtableConfig = getBigtableConfig(); - if (getBigtableConfig().getAppProfileId() == null - || getBigtableConfig().getAppProfileId().get().isEmpty()) { + if (bigtableConfig.getAppProfileId() == null + || bigtableConfig.getAppProfileId().get().isEmpty()) { bigtableConfig = bigtableConfig.withAppProfileId(StaticValueProvider.of("default")); } BigtableConfig metadataTableConfig = getMetadataTableBigtableConfig(); - String 
metadataTableId = getMetadataTableId(); if (metadataTableConfig.getProjectId() == null || metadataTableConfig.getProjectId().get().isEmpty()) { metadataTableConfig = metadataTableConfig.withProjectId(bigtableConfig.getProjectId()); @@ -2311,6 +2387,7 @@ public PCollection> expand(PBegin input) { || metadataTableConfig.getInstanceId().get().isEmpty()) { metadataTableConfig = metadataTableConfig.withInstanceId(bigtableConfig.getInstanceId()); } + String metadataTableId = getMetadataTableId(); if (metadataTableId == null || metadataTableId.isEmpty()) { metadataTableId = MetadataTableAdminDao.DEFAULT_METADATA_TABLE_NAME; } @@ -2333,10 +2410,6 @@ public PCollection> expand(PBegin input) { existingPipelineOptions = ExistingPipelineOptions.FAIL_IF_EXISTS; } - boolean shouldCreateOrUpdateMetadataTable = true; - if (getCreateOrUpdateMetadataTable() != null) { - shouldCreateOrUpdateMetadataTable = getCreateOrUpdateMetadataTable(); - } Duration backlogReplicationAdjustment = getBacklogReplicationAdjustment(); if (backlogReplicationAdjustment == null) { backlogReplicationAdjustment = DEFAULT_BACKLOG_REPLICATION_ADJUSTMENT; @@ -2348,31 +2421,15 @@ public PCollection> expand(PBegin input) { new DaoFactory( bigtableConfig, metadataTableConfig, getTableId(), metadataTableId, changeStreamName); + boolean validateConfig = true; + if (getValidateConfig() != null) { + validateConfig = getValidateConfig(); + } try { - MetadataTableAdminDao metadataTableAdminDao = daoFactory.getMetadataTableAdminDao(); - checkArgument(metadataTableAdminDao != null); - checkArgument( - metadataTableAdminDao.isAppProfileSingleClusterAndTransactional( - metadataTableConfig.getAppProfileId().get()), - "App profile id '" - + metadataTableConfig.getAppProfileId().get() - + "' provided to access metadata table needs to use single-cluster routing policy" - + " and allow single-row transactions."); - - // Only try to create or update metadata table if option is set to true. 
Otherwise, just - // check if the table exists. - if (shouldCreateOrUpdateMetadataTable && metadataTableAdminDao.createMetadataTable()) { - LOG.info("Created metadata table: " + metadataTableAdminDao.getTableId()); - } - checkArgument( - metadataTableAdminDao.doesMetadataTableExist(), - "Metadata table does not exist: " + metadataTableAdminDao.getTableId()); - - try (BigtableChangeStreamAccessor bigtableChangeStreamAccessor = - BigtableChangeStreamAccessor.getOrCreate(bigtableConfig)) { - checkArgument( - bigtableChangeStreamAccessor.getTableAdminClient().exists(getTableId()), - "Change Stream table does not exist"); + if (validateConfig) { + updateMetadataTableIfRequired(daoFactory.getMetadataTableAdminDao(), metadataTableId); + validateAppProfileCorrect( + daoFactory.getMetadataTableAdminDao(), metadataTableConfig.getAppProfileId().get()); } } catch (Exception e) { throw new RuntimeException(e); @@ -2429,6 +2486,8 @@ abstract ReadChangeStream.Builder setExistingPipelineOptions( abstract ReadChangeStream.Builder setBacklogReplicationAdjustment(Duration adjustment); + abstract ReadChangeStream.Builder setValidateConfig(boolean validateConfig); + abstract ReadChangeStream build(); } } From 10929b9ee7d1613eeb6c60dc20f9d965ed84587e Mon Sep 17 00:00:00 2001 From: Tony Tang Date: Thu, 23 May 2024 02:10:25 -0400 Subject: [PATCH 2/4] Fix ordering of the validations Change-Id: I4b7f060381d3402a62a1e68045e1d3b7f1f50a9b --- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 66 ++++++++----------- 1 file changed, 27 insertions(+), 39 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 36c7820e9fe4..7061f3092a0b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -2305,41 +2305,19 @@ public ReadChangeStream withoutValidation() { public void validate(PipelineOptions options) { BigtableServiceFactory factory = new BigtableServiceFactory(); if (getBigtableConfig().getValidate()) { - validateTableExists(factory, getBigtableConfig(), options); - } - if (getMetadataTableBigtableConfig().getValidate()) { - validateMetadataTableExists(factory, getMetadataTableBigtableConfig(), options); - } - } - - // Validate the change stream table exists. - private void validateTableExists( - BigtableServiceFactory factory, BigtableConfig config, PipelineOptions options) { - try { - checkArgument( - factory.checkTableExists(config, options, getTableId()), - "Change Stream table %s does not exist", - getTableId()); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - // Validate the metadata table exists. - private void validateMetadataTableExists( - BigtableServiceFactory factory, BigtableConfig config, PipelineOptions options) { - try { - checkArgument( - factory.checkTableExists(config, options, getMetadataTableId()), - "Metadata table %s does not exist", - getTableId()); - } catch (IOException e) { - throw new RuntimeException(e); + try { + checkArgument( + factory.checkTableExists(getBigtableConfig(), options, getTableId()), + "Change Stream table %s does not exist", + getTableId()); + } catch (IOException e) { + throw new RuntimeException(e); + } } } // Validate the app profile is single cluster and allows single row transactions. - private void validateAppProfileCorrect( + private void validateAppProfile( MetadataTableAdminDao metadataTableAdminDao, String appProfileId) { checkArgument(metadataTableAdminDao != null); checkArgument( @@ -2351,7 +2329,7 @@ private void validateAppProfileCorrect( } // Update metadata table schema if allowed and required. 
- private void updateMetadataTableIfRequired( + private void createOrUpdateMetadataTable( MetadataTableAdminDao metadataTableAdminDao, String metadataTableId) { boolean shouldCreateOrUpdateMetadataTable = true; if (getCreateOrUpdateMetadataTable() != null) { @@ -2421,15 +2399,25 @@ public PCollection> expand(PBegin input) { new DaoFactory( bigtableConfig, metadataTableConfig, getTableId(), metadataTableId, changeStreamName); - boolean validateConfig = true; - if (getValidateConfig() != null) { - validateConfig = getValidateConfig(); - } + // Validate the configuration is correct before creating the pipeline, if required. try { + MetadataTableAdminDao metadataTableAdminDao = daoFactory.getMetadataTableAdminDao(); + boolean validateConfig = true; + if (getValidateConfig() != null) { + validateConfig = getValidateConfig(); + } + // Validate app profile and create metadata table if validation is required. if (validateConfig) { - updateMetadataTableIfRequired(daoFactory.getMetadataTableAdminDao(), metadataTableId); - validateAppProfileCorrect( - daoFactory.getMetadataTableAdminDao(), metadataTableConfig.getAppProfileId().get()); + createOrUpdateMetadataTable(metadataTableAdminDao, metadataTableId); + validateAppProfile(metadataTableAdminDao, metadataTableConfig.getAppProfileId().get()); + } + // Validate the metadata table if validation is required. We validate it after + // createOrUpdateMetadataTable because if the metadata table doesn't exist, we have to run + // createOrUpdateMetadataTable to create the metadata table. 
+ if (metadataTableConfig.getValidate()) { + checkArgument( + metadataTableAdminDao.doesMetadataTableExist(), + "Metadata table does not exist: " + metadataTableAdminDao.getTableId()); } } catch (Exception e) { throw new RuntimeException(e); From 13a3e46f87f79143dd5a9c01117049609f28240e Mon Sep 17 00:00:00 2001 From: Tony Tang Date: Thu, 23 May 2024 15:42:12 -0400 Subject: [PATCH 3/4] Add unit test for ReadChangeStream config and validation Change-Id: I4b47693f7bdc83b116b0b3aea9cdc2cd77f15bc4 --- .../sdk/io/gcp/bigtable/BigtableIOTest.java | 93 +++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java index bffca8652089..5163e0e4ff25 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java @@ -2041,4 +2041,97 @@ synchronized ConfigId newId() { return ConfigId.create(); } } + + /////////////////////////// ReadChangeStream /////////////////////////// + + @Test + public void testReadChangeStreamBuildsCorrectly() { + Instant startTime = Instant.now(); + BigtableIO.ReadChangeStream readChangeStream = + BigtableIO.readChangeStream() + .withProjectId("project") + .withInstanceId("instance") + .withTableId("table") + .withAppProfileId("app-profile") + .withChangeStreamName("change-stream-name") + .withMetadataTableProjectId("metadata-project") + .withMetadataTableInstanceId("metadata-instance") + .withMetadataTableTableId("metadata-table") + .withMetadataTableAppProfileId("metadata-app-profile") + .withStartTime(startTime) + .withBacklogReplicationAdjustment(Duration.standardMinutes(1)) + .withCreateOrUpdateMetadataTable(false) + 
.withExistingPipelineOptions(BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS); + assertEquals("project", readChangeStream.getBigtableConfig().getProjectId().get()); + assertEquals("instance", readChangeStream.getBigtableConfig().getInstanceId().get()); + assertEquals("app-profile", readChangeStream.getBigtableConfig().getAppProfileId().get()); + assertEquals("table", readChangeStream.getTableId()); + assertEquals( + "metadata-project", readChangeStream.getMetadataTableBigtableConfig().getProjectId().get()); + assertEquals( + "metadata-instance", + readChangeStream.getMetadataTableBigtableConfig().getInstanceId().get()); + assertEquals( + "metadata-app-profile", + readChangeStream.getMetadataTableBigtableConfig().getAppProfileId().get()); + assertEquals("metadata-table", readChangeStream.getMetadataTableId()); + assertEquals("change-stream-name", readChangeStream.getChangeStreamName()); + assertEquals(startTime, readChangeStream.getStartTime()); + assertEquals(Duration.standardMinutes(1), readChangeStream.getBacklogReplicationAdjustment()); + assertEquals(false, readChangeStream.getCreateOrUpdateMetadataTable()); + assertEquals( + BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS, + readChangeStream.getExistingPipelineOptions()); + } + + @Test + public void testReadChangeStreamFailsValidation() { + BigtableIO.ReadChangeStream readChangeStream = + BigtableIO.readChangeStream() + .withProjectId("project") + .withInstanceId("instance") + .withTableId("table"); + // Validating table fails because table does not exist. 
+ thrown.expect(IllegalArgumentException.class); + readChangeStream.validate(TestPipeline.testingPipelineOptions()); + } + + @Test + public void testReadChangeStreamPassWithoutValidation() { + BigtableIO.ReadChangeStream readChangeStream = + BigtableIO.readChangeStream() + .withProjectId("project") + .withInstanceId("instance") + .withTableId("table") + .withoutValidation(); + // No error is thrown because we skip validation + readChangeStream.validate(TestPipeline.testingPipelineOptions()); + } + + @Test + public void testReadChangeStreamValidationFailsDuringApply() { + BigtableIO.ReadChangeStream readChangeStream = + BigtableIO.readChangeStream() + .withProjectId("project") + .withInstanceId("instance") + .withTableId("table"); + // Validating table fails because resources cannot be found + thrown.expect(RuntimeException.class); + + p.apply(readChangeStream); + } + + @Test + public void testReadChangeStreamPassWithoutValidationDuringApply() { + BigtableIO.ReadChangeStream readChangeStream = + BigtableIO.readChangeStream() + .withProjectId("project") + .withInstanceId("instance") + .withTableId("table") + .withoutValidation(); + // No RuntimeException as seen in the previous test with validation. The only error is that + // the pipeline is not run. 
+ thrown.expect(PipelineRunMissingException.class); + p.apply(readChangeStream); + } } From aa831b473302d5b310337541939e068c58633deb Mon Sep 17 00:00:00 2001 From: Tony Tang Date: Fri, 24 May 2024 13:38:10 -0400 Subject: [PATCH 4/4] Add additional comment clarifying the effect of withoutValidation Change-Id: I8f341d36e78f5b737741afa4dfa899884b3c9387 --- .../apache/beam/sdk/io/gcp/bigtable/BigtableIO.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 7061f3092a0b..1ed70024f592 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -2288,8 +2288,11 @@ public ReadChangeStream withBacklogReplicationAdjustment(Duration adjustment) { /** * Disables validation that the table being read and the metadata table exists, and that the app - * profile used is single cluster and single row transcation enabled. Set this option if the + * profile used is single cluster and single row transaction enabled. Set this option if the * caller does not have additional Bigtable permissions to validate the configurations. + * NOTE: this also disables creating or updating the metadata table because that also + * requires additional permissions, essentially setting {@link #withCreateOrUpdateMetadataTable} + * to false. 
*/ public ReadChangeStream withoutValidation() { BigtableConfig config = getBigtableConfig(); @@ -2344,13 +2347,13 @@ private void createOrUpdateMetadataTable( @Override public PCollection> expand(PBegin input) { + BigtableConfig bigtableConfig = getBigtableConfig(); checkArgument( - getBigtableConfig() != null, + bigtableConfig != null, "BigtableIO ReadChangeStream is missing required configurations fields."); - getBigtableConfig().validate(); + bigtableConfig.validate(); checkArgument(getTableId() != null, "Missing required tableId field."); - BigtableConfig bigtableConfig = getBigtableConfig(); if (bigtableConfig.getAppProfileId() == null || bigtableConfig.getAppProfileId().get().isEmpty()) { bigtableConfig = bigtableConfig.withAppProfileId(StaticValueProvider.of("default"));