Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add options to control number of Storage API connections when using multiplexing #31721

Merged
merged 10 commits into from
Jul 12, 2024
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
## New Features / Improvements

* Multiple RunInference instances can now share the same model instance by setting the model_identifier parameter (Python) ([#31665](https://github.com/apache/beam/issues/31665)).
* Added options to control the number of Storage API multiplexing connections ([#31721](https://github.com/apache/beam/pull/31721))

## Breaking Changes

Expand All @@ -78,6 +79,7 @@

## Bugfixes

* Fixed a bug in BigQueryIO batch Storage Write API that frequently exhausted concurrent connections quota ([#31710](https://github.com/apache/beam/pull/31710))
* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## Security Fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
*/
@AutoValue
abstract class AppendClientInfo {
private final Counter activeConnections =
Metrics.counter(AppendClientInfo.class, "activeConnections");
private final Counter activeStreamAppendClients =
Metrics.counter(AppendClientInfo.class, "activeStreamAppendClients");

abstract @Nullable BigQueryServices.StreamAppendClient getStreamAppendClient();

Expand Down Expand Up @@ -123,7 +123,7 @@ public AppendClientInfo withAppendClient(
writeStreamService.getStreamAppendClient(
streamName, getDescriptor(), useConnectionPool, missingValueInterpretation);

activeConnections.inc();
activeStreamAppendClients.inc();

return toBuilder().setStreamName(streamName).setStreamAppendClient(client).build();
}
Expand All @@ -133,7 +133,7 @@ public void close() {
BigQueryServices.StreamAppendClient client = getStreamAppendClient();
if (client != null) {
getCloseAppendClient().accept(client);
activeConnections.dec();
activeStreamAppendClients.dec();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,26 @@ public interface BigQueryOptions

void setNumStorageWriteApiStreamAppendClients(Integer value);

@Description(
"When using the STORAGE_API_AT_LEAST_ONCE write method with multiplexing (ie. useStorageApiConnectionPool=true), "
+ "this option sets the minimum number of connections each pool creates. This is on a per worker, per region basis. "
ahmedabu98 marked this conversation as resolved.
Show resolved Hide resolved
+ "Note that in practice, the minimum number of connections created is the minimum of this value and "
+ "(numStorageWriteApiStreamAppendClients x num destinations).")
@Default.Integer(2)
Integer getMinConnectionPoolConnections();

void setMinConnectionPoolConnections(Integer value);
ahmedabu98 marked this conversation as resolved.
Show resolved Hide resolved

@Description(
"When using the STORAGE_API_AT_LEAST_ONCE write method with multiplexing (ie. useStorageApiConnectionPool=true), "
+ "this option sets the maximum number of connections each pool creates. This is on a per worker, per region basis. "
+ "This value should be greater than or equal to the total number of dynamic destinations, otherwise a "
+ "race condition occurs where append operations compete over streams.")
@Default.Integer(20)
ahmedabu98 marked this conversation as resolved.
Show resolved Hide resolved
Integer getMaxConnectionPoolConnections();

void setMaxConnectionPoolConnections(Integer value);

@Description("The max number of messages inflight that we expect each connection will retain.")
@Default.Long(1000)
Long getStorageWriteMaxInflightRequests();
Expand All @@ -122,6 +142,9 @@ public interface BigQueryOptions

void setStorageWriteMaxInflightBytes(Long value);

@Description(
"Enables multiplexing mode, where multiple tables can share the same connection. Only available when writing with STORAGE_API_AT_LEAST_ONCE"
+ " mode. For more information, see https://cloud.google.com/bigquery/docs/write-api-best-practices#connection_pool_management")
ahmedabu98 marked this conversation as resolved.
Show resolved Hide resolved
@Default.Boolean(false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it's time to enable this by default (though we should probably update our BOM to point to a more-recent client lib first).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to trying this, I'd probably vote to to do it separately after the release cut though to give it time to bake and so we can check on benchmarks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enabling this as a default may have negative effects on pipelines writing to >20 destinations (since maxConnectionsPerRegion is 20 by default).

Whereas currently the sink would create 1 connection per destination, enabling multiplexing would limit it to 20 connections (unless the user explicitly sets a higher maximum)

Boolean getUseStorageApiConnectionPool();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamRequest;
Expand Down Expand Up @@ -1423,6 +1424,14 @@ public StreamAppendClient getStreamAppendClient(
bqIOMetadata.getBeamJobId() == null ? "" : bqIOMetadata.getBeamJobId(),
bqIOMetadata.getBeamWorkerId() == null ? "" : bqIOMetadata.getBeamWorkerId());

ConnectionWorkerPool.setOptions(
ConnectionWorkerPool.Settings.builder()
.setMinConnectionsPerRegion(
options.as(BigQueryOptions.class).getMinConnectionPoolConnections())
.setMaxConnectionsPerRegion(
options.as(BigQueryOptions.class).getMaxConnectionPoolConnections())
.build());

StreamWriter streamWriter =
StreamWriter.newBuilder(streamName, newWriteClient)
.setExecutorProvider(
Expand Down
Loading