Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remote: Limit max number of gRPC connections by --remote_max_connections. #14202

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,12 @@ public ReferenceCounted touch(Object o) {
private final AtomicReference<String> authorityRef = new AtomicReference<>();

public ReferenceCountedChannel(ChannelConnectionFactory connectionFactory) {
this(connectionFactory, /*maxConnections=*/ 0);
}

public ReferenceCountedChannel(ChannelConnectionFactory connectionFactory, int maxConnections) {
this.dynamicConnectionPool =
new DynamicConnectionPool(connectionFactory, connectionFactory.maxConcurrency());
new DynamicConnectionPool(connectionFactory, connectionFactory.maxConcurrency(), maxConnections);
}

public boolean isShutdown() {
Expand All @@ -87,12 +91,12 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
responseListener) {
@Override
public void onClose(Status status, Metadata trailers) {
super.onClose(status, trailers);

try {
connection.close();
} catch (IOException e) {
throw new AssertionError(e.getMessage(), e);
} finally {
super.onClose(status, trailers);
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,8 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
remoteOptions.remoteProxy,
authAndTlsOptions,
interceptors.build(),
maxConcurrencyPerConnection));
maxConcurrencyPerConnection),
remoteOptions.remoteMaxConnections);

// Create a separate channel if --remote_executor and --remote_cache point to different
// endpoints.
Expand All @@ -390,7 +391,8 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
remoteOptions.remoteProxy,
authAndTlsOptions,
interceptors.build(),
maxConcurrencyPerConnection));
maxConcurrencyPerConnection),
remoteOptions.remoteMaxConnections);
}

if (enableRemoteDownloader) {
Expand All @@ -411,7 +413,8 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
remoteOptions.remoteProxy,
authAndTlsOptions,
interceptors.build(),
maxConcurrencyPerConnection));
maxConcurrencyPerConnection),
remoteOptions.remoteMaxConnections);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
public class DynamicConnectionPool implements ConnectionPool {
private final ConnectionFactory connectionFactory;
private final int maxConcurrencyPerConnection;
private final int maxConnections;
private final AtomicBoolean closed = new AtomicBoolean(false);

@GuardedBy("this")
Expand All @@ -40,8 +41,14 @@ public class DynamicConnectionPool implements ConnectionPool {

public DynamicConnectionPool(
ConnectionFactory connectionFactory, int maxConcurrencyPerConnection) {
this(connectionFactory, maxConcurrencyPerConnection, /*maxConnections=*/ 0);
}

public DynamicConnectionPool(
ConnectionFactory connectionFactory, int maxConcurrencyPerConnection, int maxConnections) {
this.connectionFactory = connectionFactory;
this.maxConcurrencyPerConnection = maxConcurrencyPerConnection;
this.maxConnections = maxConnections;
this.factories = new ArrayList<>();
}

Expand All @@ -61,12 +68,19 @@ public void close() throws IOException {
}
}

@GuardedBy("this")
private SharedConnectionFactory nextFactory() {
int index = Math.abs(indexTicker % factories.size());
indexTicker += 1;
return factories.get(index);
}

/**
* Performs a simple round robin on the list of {@link SharedConnectionFactory} and return one
* having available connections at this moment.
* Performs a simple round robin on the list of {@link SharedConnectionFactory}.
*
* <p>If no factory has available connections, it will create a new {@link
* SharedConnectionFactory}.
* <p>This will try to find a factory that has available connections at this moment. If no factory
* has available connections, and the number of factories is less than {@link #maxConnections}, it
* will create a new {@link SharedConnectionFactory}.
*/
private SharedConnectionFactory nextAvailableFactory() {
if (closed.get()) {
Expand All @@ -75,19 +89,20 @@ private SharedConnectionFactory nextAvailableFactory() {

synchronized (this) {
for (int times = 0; times < factories.size(); ++times) {
int index = Math.abs(indexTicker % factories.size());
indexTicker += 1;

SharedConnectionFactory factory = factories.get(index);
SharedConnectionFactory factory = nextFactory();
if (factory.numAvailableConnections() > 0) {
return factory;
}
}

SharedConnectionFactory factory =
new SharedConnectionFactory(connectionFactory, maxConcurrencyPerConnection);
factories.add(factory);
return factory;
if (maxConnections <= 0 || factories.size() < maxConnections) {
SharedConnectionFactory factory =
new SharedConnectionFactory(connectionFactory, maxConcurrencyPerConnection);
factories.add(factory);
return factory;
} else {
return nextFactory();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ public final class RemoteOptions extends OptionsBase {

@Option(
name = "remote_max_connections",
defaultValue = "100",
defaultValue = "0",
coeuvre marked this conversation as resolved.
Show resolved Hide resolved
documentationCategory = OptionDocumentationCategory.REMOTE,
effectTags = {OptionEffectTag.HOST_MACHINE_RESOURCE_OPTIMIZATIONS},
help =
"The max. number of concurrent network connections to the remote cache/executor. By "
+ "default Bazel limits the number of TCP connections to 100. Setting this flag to "
+ "0 will make Bazel choose the number of connections automatically.")
"The max. number of concurrent network connections to the remote cache/executor. By"
+ " default the value is set to 0 which will make Bazel choose the number of"
+ " connections automatically.")
public int remoteMaxConnections;

@Option(
Expand Down