Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to disable validation of cloud bigtable change stream IO #31376

Merged
merged 4 commits into from
May 28, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ActionFactory;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.BigtableChangeStreamAccessor;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.BigtableClientOverride;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao;
Expand Down Expand Up @@ -2057,6 +2056,7 @@ static ReadChangeStream create() {
return new AutoValue_BigtableIO_ReadChangeStream.Builder()
.setBigtableConfig(config)
.setMetadataTableBigtableConfig(metadataTableconfig)
.setValidateConfig(true)
.build();
}

Expand All @@ -2080,6 +2080,8 @@ static ReadChangeStream create() {

abstract @Nullable Duration getBacklogReplicationAdjustment();

abstract @Nullable Boolean getValidateConfig();

abstract ReadChangeStream.Builder toBuilder();

/**
Expand Down Expand Up @@ -2284,25 +2286,77 @@ public ReadChangeStream withBacklogReplicationAdjustment(Duration adjustment) {
return toBuilder().setBacklogReplicationAdjustment(adjustment).build();
}

/**
tonytanger marked this conversation as resolved.
Show resolved Hide resolved
* Disables validation that the table being read and the metadata table exists, and that the app
* profile used is single cluster and single row transcation enabled. Set this option if the
* caller does not have additional Bigtable permissions to validate the configurations.
*/
public ReadChangeStream withoutValidation() {
BigtableConfig config = getBigtableConfig();
BigtableConfig metadataTableConfig = getMetadataTableBigtableConfig();
return toBuilder()
.setBigtableConfig(config.withValidate(false))
.setMetadataTableBigtableConfig(metadataTableConfig.withValidate(false))
.setValidateConfig(false)
.build();
}

@Override
public void validate(PipelineOptions options) {
BigtableServiceFactory factory = new BigtableServiceFactory();
if (getBigtableConfig().getValidate()) {
try {
checkArgument(
factory.checkTableExists(getBigtableConfig(), options, getTableId()),
"Change Stream table %s does not exist",
getTableId());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

// Validate the app profile is single cluster and allows single row transactions.
private void validateAppProfile(
MetadataTableAdminDao metadataTableAdminDao, String appProfileId) {
checkArgument(metadataTableAdminDao != null);
checkArgument(
metadataTableAdminDao.isAppProfileSingleClusterAndTransactional(appProfileId),
"App profile id '"
+ appProfileId
+ "' provided to access metadata table needs to use single-cluster routing policy"
+ " and allow single-row transactions.");
}

// Update metadata table schema if allowed and required.
private void createOrUpdateMetadataTable(
MetadataTableAdminDao metadataTableAdminDao, String metadataTableId) {
boolean shouldCreateOrUpdateMetadataTable = true;
if (getCreateOrUpdateMetadataTable() != null) {
shouldCreateOrUpdateMetadataTable = getCreateOrUpdateMetadataTable();
}
// Only try to create or update metadata table if option is set to true. Otherwise, just
// check if the table exists.
if (shouldCreateOrUpdateMetadataTable && metadataTableAdminDao.createMetadataTable()) {
LOG.info("Created metadata table: " + metadataTableId);
}
}

@Override
public PCollection<KV<ByteString, ChangeStreamMutation>> expand(PBegin input) {
checkArgument(
getBigtableConfig() != null,
"BigtableIO ReadChangeStream is missing required configurations fields.");
checkArgument(
getBigtableConfig().getProjectId() != null, "Missing required projectId field.");
checkArgument(
getBigtableConfig().getInstanceId() != null, "Missing required instanceId field.");
getBigtableConfig().validate();
checkArgument(getTableId() != null, "Missing required tableId field.");

BigtableConfig bigtableConfig = getBigtableConfig();
tonytanger marked this conversation as resolved.
Show resolved Hide resolved
if (getBigtableConfig().getAppProfileId() == null
|| getBigtableConfig().getAppProfileId().get().isEmpty()) {
if (bigtableConfig.getAppProfileId() == null
|| bigtableConfig.getAppProfileId().get().isEmpty()) {
bigtableConfig = bigtableConfig.withAppProfileId(StaticValueProvider.of("default"));
}

BigtableConfig metadataTableConfig = getMetadataTableBigtableConfig();
String metadataTableId = getMetadataTableId();
if (metadataTableConfig.getProjectId() == null
|| metadataTableConfig.getProjectId().get().isEmpty()) {
metadataTableConfig = metadataTableConfig.withProjectId(bigtableConfig.getProjectId());
Expand All @@ -2311,6 +2365,7 @@ public PCollection<KV<ByteString, ChangeStreamMutation>> expand(PBegin input) {
|| metadataTableConfig.getInstanceId().get().isEmpty()) {
metadataTableConfig = metadataTableConfig.withInstanceId(bigtableConfig.getInstanceId());
}
String metadataTableId = getMetadataTableId();
if (metadataTableId == null || metadataTableId.isEmpty()) {
metadataTableId = MetadataTableAdminDao.DEFAULT_METADATA_TABLE_NAME;
}
Expand All @@ -2333,10 +2388,6 @@ public PCollection<KV<ByteString, ChangeStreamMutation>> expand(PBegin input) {
existingPipelineOptions = ExistingPipelineOptions.FAIL_IF_EXISTS;
}

boolean shouldCreateOrUpdateMetadataTable = true;
if (getCreateOrUpdateMetadataTable() != null) {
shouldCreateOrUpdateMetadataTable = getCreateOrUpdateMetadataTable();
}
Duration backlogReplicationAdjustment = getBacklogReplicationAdjustment();
if (backlogReplicationAdjustment == null) {
backlogReplicationAdjustment = DEFAULT_BACKLOG_REPLICATION_ADJUSTMENT;
Expand All @@ -2348,31 +2399,25 @@ public PCollection<KV<ByteString, ChangeStreamMutation>> expand(PBegin input) {
new DaoFactory(
bigtableConfig, metadataTableConfig, getTableId(), metadataTableId, changeStreamName);

// Validate the configuration is correct before creating the pipeline, if required.
try {
MetadataTableAdminDao metadataTableAdminDao = daoFactory.getMetadataTableAdminDao();
checkArgument(metadataTableAdminDao != null);
checkArgument(
metadataTableAdminDao.isAppProfileSingleClusterAndTransactional(
metadataTableConfig.getAppProfileId().get()),
"App profile id '"
+ metadataTableConfig.getAppProfileId().get()
+ "' provided to access metadata table needs to use single-cluster routing policy"
+ " and allow single-row transactions.");

// Only try to create or update metadata table if option is set to true. Otherwise, just
// check if the table exists.
if (shouldCreateOrUpdateMetadataTable && metadataTableAdminDao.createMetadataTable()) {
LOG.info("Created metadata table: " + metadataTableAdminDao.getTableId());
boolean validateConfig = true;
if (getValidateConfig() != null) {
validateConfig = getValidateConfig();
}
checkArgument(
metadataTableAdminDao.doesMetadataTableExist(),
"Metadata table does not exist: " + metadataTableAdminDao.getTableId());

try (BigtableChangeStreamAccessor bigtableChangeStreamAccessor =
BigtableChangeStreamAccessor.getOrCreate(bigtableConfig)) {
// Validate app profile and create metadata table if validate is required.
if (validateConfig) {
createOrUpdateMetadataTable(metadataTableAdminDao, metadataTableId);
validateAppProfile(metadataTableAdminDao, metadataTableConfig.getAppProfileId().get());
}
// Validate metadata table if validate is required. We validate metadata table after
// createOrUpdateMetadataTable because if the metadata doesn't exist, we have to run
// createOrUpdateMetadataTable to create the metadata table.
if (metadataTableConfig.getValidate()) {
checkArgument(
bigtableChangeStreamAccessor.getTableAdminClient().exists(getTableId()),
"Change Stream table does not exist");
metadataTableAdminDao.doesMetadataTableExist(),
"Metadata table does not exist: " + metadataTableAdminDao.getTableId());
}
} catch (Exception e) {
throw new RuntimeException(e);
Expand Down Expand Up @@ -2429,6 +2474,8 @@ abstract ReadChangeStream.Builder setExistingPipelineOptions(

abstract ReadChangeStream.Builder setBacklogReplicationAdjustment(Duration adjustment);

abstract ReadChangeStream.Builder setValidateConfig(boolean validateConfig);

abstract ReadChangeStream build();
}
}
Expand Down
Loading