From b1c8040b7c9815e2d47fd097fb2c23077f2456c8 Mon Sep 17 00:00:00 2001 From: Tony Tang Date: Fri, 14 Jun 2024 17:49:55 -0700 Subject: [PATCH] Use the original way of validating Bigtable Change Stream table existance to improve compatbility with testing (#31597) Change-Id: Iba1ee679fed4276a271f2fc83c7ad657a0da8174 --- .../org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 4feccd397d7e..d25ad7d4871d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -57,6 +57,7 @@ import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ActionFactory; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.BigtableChangeStreamAccessor; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.BigtableClientOverride; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory; import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao; @@ -2307,11 +2308,11 @@ public ReadChangeStream withoutValidation() { @Override public void validate(PipelineOptions options) { - BigtableServiceFactory factory = new BigtableServiceFactory(); if (getBigtableConfig().getValidate()) { - try { + try (BigtableChangeStreamAccessor bigtableChangeStreamAccessor = + BigtableChangeStreamAccessor.getOrCreate(getBigtableConfig())) { checkArgument( - factory.checkTableExists(getBigtableConfig(), options, getTableId()), + bigtableChangeStreamAccessor.getTableAdminClient().exists(getTableId()), "Change Stream table %s does not exist", getTableId()); } catch (IOException e) {