BigQuery: Decouple clustering from time partitioning when writing #30094

Merged · 7 commits · Jan 30, 2024
CHANGES.md (2 additions, 1 deletion)
@@ -65,7 +65,8 @@

## New Features / Improvements

-* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* [Enrichment Transform](https://s.apache.org/enrichment-transform) along with GCP BigTable handler added to Python SDK ([#30001](https://github.com/apache/beam/pull/30001)).
+* Allow writing clustered and not time partitioned BigQuery tables (Java) ([#30094](https://github.com/apache/beam/pull/30094)).

## Breaking Changes

BigQueryIO.java
@@ -96,7 +96,7 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient;
import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase.ExtractResult;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.ConstantSchemaDestinations;
-import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.ConstantTimePartitioningDestinations;
+import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.ConstantTimePartitioningClusteringDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.SchemaFromViewDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.TableFunctionDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup.CleanupOperation;
@@ -2746,8 +2746,7 @@ public Write<T> withJsonTimePartitioning(ValueProvider<String> partitioning) {
}

/**
- * Specifies the clustering fields to use when writing to a single output table. Can only be
- * used when {@link #withTimePartitioning(TimePartitioning)} is set. If {@link
+ * Specifies the clustering fields to use when writing to a single output table. If {@link
* #to(SerializableFunction)} or {@link #to(DynamicDestinations)} is used to write to dynamic
* tables, the fields here will be ignored; call {@link #withClustering()} instead.
*/
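The relaxed javadoc contract above is the user-facing core of this PR: a single-table write can now request clustering on its own. Below is a minimal sketch of the newly allowed configuration, assuming the Beam Java GCP IO module is on the classpath; the project, dataset, table, and field names are hypothetical, and actually running it requires a runner and BigQuery access:

```java
import java.util.Arrays;
import com.google.api.services.bigquery.model.Clustering;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.transforms.Create;

public class ClusteredOnlyWriteExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    TableSchema schema =
        new TableSchema()
            .setFields(
                Arrays.asList(
                    new TableFieldSchema().setName("userId").setType("STRING"),
                    new TableFieldSchema().setName("score").setType("INTEGER")));

    pipeline
        .apply(
            Create.of(new TableRow().set("userId", "a").set("score", 1))
                .withCoder(TableRowJsonCoder.of()))
        .apply(
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table") // hypothetical table spec
                .withSchema(schema)
                // Before this change, the javadoc required withTimePartitioning
                // to be set for withClustering to apply to single-table writes.
                .withClustering(new Clustering().setFields(Arrays.asList("userId")))
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    pipeline.run().waitUntilFinish();
  }
}
```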
@@ -3359,9 +3358,10 @@ && getStorageApiTriggeringFrequency(bqOptions) != null) {
}

// Wrap with a DynamicDestinations class that will provide the proper TimePartitioning.
-    if (getJsonTimePartitioning() != null) {
+    if (getJsonTimePartitioning() != null
+        || Optional.ofNullable(getClustering()).map(Clustering::getFields).isPresent()) {
dynamicDestinations =
-          new ConstantTimePartitioningDestinations<>(
+          new ConstantTimePartitioningClusteringDestinations<>(
(DynamicDestinations<T, TableDestination>) dynamicDestinations,
getJsonTimePartitioning(),
StaticValueProvider.of(BigQueryHelpers.toJsonString(getClustering())));
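The Optional chain in the new condition is a null-safe way of asking whether clustering fields were actually configured, so the wrapping destination is installed for clustering-only writes too. A sketch of the equivalent plain check (the helper name is hypothetical, not part of the PR):

```java
import com.google.api.services.bigquery.model.Clustering;

final class ClusteringChecks {
  // Equivalent to Optional.ofNullable(clustering).map(Clustering::getFields).isPresent():
  // true only when a Clustering object exists and its field list is non-null.
  static boolean hasClusteringFields(Clustering clustering) {
    return clustering != null && clustering.getFields() != null;
  }
}
```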
@@ -179,11 +179,13 @@ private static void tryCreateTable(
TimePartitioning timePartitioning = tableDestination.getTimePartitioning();
if (timePartitioning != null) {
table.setTimePartitioning(timePartitioning);
-      Clustering clustering = tableDestination.getClustering();
-      if (clustering != null) {
-        table.setClustering(clustering);
-      }
}

+    Clustering clustering = tableDestination.getClustering();
+    if (clustering != null) {
+      table.setClustering(clustering);
+    }

if (kmsKey != null) {
table.setEncryptionConfiguration(new EncryptionConfiguration().setKmsKeyName(kmsKey));
}
DynamicDestinationsHelpers.java
@@ -272,37 +272,41 @@ public String toString() {
}
}

-  static class ConstantTimePartitioningDestinations<T>
+  static class ConstantTimePartitioningClusteringDestinations<T>
extends DelegatingDynamicDestinations<T, TableDestination> {

-    private final ValueProvider<String> jsonTimePartitioning;
+    private final @Nullable ValueProvider<String> jsonTimePartitioning;
private final @Nullable ValueProvider<String> jsonClustering;

-    ConstantTimePartitioningDestinations(
+    ConstantTimePartitioningClusteringDestinations(
DynamicDestinations<T, TableDestination> inner,
ValueProvider<String> jsonTimePartitioning,
ValueProvider<String> jsonClustering) {
super(inner);
-      Preconditions.checkArgumentNotNull(
-          jsonTimePartitioning, "jsonTimePartitioning provider can not be null");
-      if (jsonTimePartitioning.isAccessible()) {
-        Preconditions.checkArgumentNotNull(
-            jsonTimePartitioning.get(), "jsonTimePartitioning can not be null");
-      }

+      checkArgument(
+          (jsonTimePartitioning != null
+                  && jsonTimePartitioning.isAccessible()
+                  && jsonTimePartitioning.get() != null)
+              || (jsonClustering != null
+                  && jsonClustering.isAccessible()
+                  && jsonClustering.get() != null),
+          "at least one of jsonTimePartitioning or jsonClustering must be non-null, accessible "
+              + "and present");

this.jsonTimePartitioning = jsonTimePartitioning;
this.jsonClustering = jsonClustering;
}

@Override
public TableDestination getDestination(@Nullable ValueInSingleWindow<T> element) {
TableDestination destination = super.getDestination(element);
-      String partitioning = this.jsonTimePartitioning.get();
-      checkArgument(partitioning != null, "jsonTimePartitioning can not be null");
+      String partitioning =
+          Optional.ofNullable(jsonTimePartitioning).map(ValueProvider::get).orElse(null);
+      String clustering = Optional.ofNullable(jsonClustering).map(ValueProvider::get).orElse(null);

return new TableDestination(
-          destination.getTableSpec(),
-          destination.getTableDescription(),
-          partitioning,
-          Optional.ofNullable(jsonClustering).map(ValueProvider::get).orElse(null));
+          destination.getTableSpec(), destination.getTableDescription(), partitioning, clustering);
}

@Override
@@ -316,10 +320,10 @@ public Coder<TableDestination> getDestinationCoder() {

@Override
public String toString() {
-      MoreObjects.ToStringHelper helper =
-          MoreObjects.toStringHelper(this)
-              .add("inner", inner)
-              .add("jsonTimePartitioning", jsonTimePartitioning);
+      MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this).add("inner", inner);
+      if (jsonTimePartitioning != null) {
+        helper.add("jsonTimePartitioning", jsonTimePartitioning);
+      }
if (jsonClustering != null) {
helper.add("jsonClustering", jsonClustering);
}
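With the constructor precondition and getDestination changes above, a destination can now carry clustering JSON while its time-partitioning JSON stays null. A small illustration using the public four-argument TableDestination constructor (the same one the code above calls); the table spec and clustering fields are hypothetical:

```java
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;

public class ClusteredDestinationSketch {
  public static void main(String[] args) {
    TableDestination destination =
        new TableDestination(
            "my-project:my_dataset.my_table", // hypothetical table spec
            null, // no table description
            null, // jsonTimePartitioning: previously required, now may be null
            "{\"fields\": [\"userId\"]}"); // jsonClustering
    System.out.println(destination.getTableSpec());
  }
}
```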
@@ -288,11 +288,12 @@ private BigQueryHelpers.PendingJob startZeroLoadJob(
}
if (timePartitioning != null) {
loadConfig.setTimePartitioning(timePartitioning);
-      // only set clustering if timePartitioning is set
-      if (clustering != null) {
-        loadConfig.setClustering(clustering);
-      }
}

+    if (clustering != null) {
+      loadConfig.setClustering(clustering);
+    }

if (kmsKey != null) {
loadConfig.setDestinationEncryptionConfiguration(
new EncryptionConfiguration().setKmsKeyName(kmsKey));
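BigQuery itself accepts clustered tables that are not time partitioned, so the load-job configuration no longer nests clustering under the partitioning branch. A sketch of a load configuration carrying clustering alone, using the BigQuery API model classes (the field name is hypothetical):

```java
import java.util.Arrays;
import com.google.api.services.bigquery.model.Clustering;
import com.google.api.services.bigquery.model.JobConfigurationLoad;

public class ClusteredLoadConfigSketch {
  public static void main(String[] args) {
    JobConfigurationLoad loadConfig = new JobConfigurationLoad();
    // No setTimePartitioning call here: after this change, clustering is
    // forwarded to the load job even when no time partitioning is configured.
    loadConfig.setClustering(new Clustering().setFields(Arrays.asList("userId")));
    System.out.println(loadConfig);
  }
}
```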
BigQueryIOWriteTest.java
@@ -500,7 +500,7 @@ private void verifySideInputs() {
}
}

-  void testTimePartitioningClustering(
+  void testTimePartitioningAndClustering(
BigQueryIO.Write.Method insertMethod, boolean enablePartitioning, boolean enableClustering)
throws Exception {
TableRow row1 = new TableRow().set("date", "2018-01-01").set("number", "1");
@@ -545,16 +545,8 @@ void testTimePartitioningClustering(
}
}

-  void testTimePartitioning(BigQueryIO.Write.Method insertMethod) throws Exception {
-    testTimePartitioningClustering(insertMethod, true, false);
-  }
-
-  void testClustering(BigQueryIO.Write.Method insertMethod) throws Exception {
-    testTimePartitioningClustering(insertMethod, true, true);
-  }
-
-  @Test
-  public void testTimePartitioning() throws Exception {
+  void testTimePartitioningAndClusteringWithAllMethods(
+      Boolean enablePartitioning, Boolean enableClustering) throws Exception {
BigQueryIO.Write.Method method;
if (useStorageApi) {
method =
@@ -564,15 +556,27 @@ public void testTimePartitioning() throws Exception {
} else {
method = Method.FILE_LOADS;
}
-    testTimePartitioning(method);
+    testTimePartitioningAndClustering(method, enablePartitioning, enableClustering);
}

@Test
-  public void testClusteringStorageApi() throws Exception {
-    if (useStorageApi) {
-      testClustering(
-          useStorageApiApproximate ? Method.STORAGE_API_AT_LEAST_ONCE : Method.STORAGE_WRITE_API);
-    }
+  public void testTimePartitioningWithoutClustering() throws Exception {
+    testTimePartitioningAndClusteringWithAllMethods(true, false);
}

+  @Test
+  public void testTimePartitioningWithClustering() throws Exception {
+    testTimePartitioningAndClusteringWithAllMethods(true, true);
+  }
+
+  @Test
+  public void testClusteringWithoutPartitioning() throws Exception {
+    testTimePartitioningAndClusteringWithAllMethods(false, true);
+  }
+
+  @Test
+  public void testNoClusteringNoPartitioning() throws Exception {
+    testTimePartitioningAndClusteringWithAllMethods(false, false);
+  }

@Test