Skip to content

Commit

Permalink
Add logging when worker pool is shrunk.
Browse files Browse the repository at this point in the history
RELNOTES: None.
PiperOrigin-RevId: 573777234
Change-Id: I33bb1e44c959caa6e035a9be3ce218ffcaf05363
  • Loading branch information
zhengwei143 authored and copybara-github committed Oct 16, 2023
1 parent ff4b1d5 commit 22ff6ae
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 12 deletions.
1 change: 1 addition & 0 deletions src/main/java/com/google/devtools/build/lib/worker/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ java_library(
":worker_key",
":worker_pool",
"//third_party:apache_commons_pool2",
"//third_party:flogger",
"//third_party:guava",
"//third_party:jsr305",
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.base.Throwables;
import com.google.common.eventbus.EventBus;
import com.google.common.flogger.GoogleLogger;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -32,6 +33,8 @@
@ThreadSafe
final class SimpleWorkerPool extends GenericKeyedObjectPool<WorkerKey, Worker> {

private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

/**
* The subtrahend for maximal toal number of object per key. Unfortunately
* GenericKeyedObjectPoolConfig doesn't support different number of objects per key, so we need to
Expand Down Expand Up @@ -101,7 +104,7 @@ public void invalidateObject(WorkerKey key, Worker obj) throws InterruptedExcept
if (eventBus != null) {
eventBus.post(new WorkerEvictedEvent(key.hashCode(), key.getMnemonic()));
}
updateShrunkBy(key);
updateShrunkBy(key, obj.getWorkerId());
}
} catch (Throwable t) {
Throwables.propagateIfPossible(t, InterruptedException.class);
Expand All @@ -116,18 +119,21 @@ public void returnObject(WorkerKey key, Worker obj) {
if (eventBus != null) {
eventBus.post(new WorkerEvictedEvent(key.hashCode(), key.getMnemonic()));
}
updateShrunkBy(key);
updateShrunkBy(key, obj.getWorkerId());
}
}

public int getMaxTotalPerKey(WorkerKey key) {
return getMaxTotalPerKey() - shrunkBy.getOrDefault(key, 0);
}

private synchronized void updateShrunkBy(WorkerKey workerKey) {
private synchronized void updateShrunkBy(WorkerKey workerKey, int workerId) {
int currentValue = shrunkBy.getOrDefault(workerKey, 0);
if (getMaxTotalPerKey() - currentValue > 1) {
shrunkBy.put(workerKey, currentValue + 1);
int newValue = currentValue + 1;
logger.atInfo().log(
"shrinking %s (worker-%d) by %d.", workerKey.getMnemonic(), workerId, newValue);
shrunkBy.put(workerKey, newValue);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.eventbus.EventBus;
import com.google.common.flogger.GoogleLogger;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.List;
Expand All @@ -32,14 +33,20 @@
/** Implementation of WorkerPool. */
@ThreadSafe
public class WorkerPoolImpl implements WorkerPool {

private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

/** Unless otherwise specified, the max number of workers per WorkerKey. */
private static final int DEFAULT_MAX_WORKERS = 4;

/** Unless otherwise specified, the max number of multiplex workers per WorkerKey. */
private static final int DEFAULT_MAX_MULTIPLEX_WORKERS = 8;

private final WorkerPoolConfig workerPoolConfig;

/** Map of singleplex worker pools, one per mnemonic. */
private final ImmutableMap<String, SimpleWorkerPool> workerPools;

/** Map of multiplex worker pools, one per mnemonic. */
private final ImmutableMap<String, SimpleWorkerPool> multiplexPools;

Expand Down Expand Up @@ -130,10 +137,10 @@ public void evictWithPolicy(EvictionPolicy<Worker> evictionPolicy) throws Interr
private void evictWithPolicy(EvictionPolicy<Worker> evictionPolicy, SimpleWorkerPool pool)
throws InterruptedException {
try {
pool.setEvictionPolicy(evictionPolicy);
pool.evict();
} catch (Throwable t) {
Throwables.propagateIfPossible(t, InterruptedException.class);
pool.setEvictionPolicy(evictionPolicy);
pool.evict();
} catch (Throwable t) {
Throwables.propagateIfPossible(t, InterruptedException.class);
throw new VerifyException("unexpected", t);
}
}
Expand Down Expand Up @@ -186,11 +193,17 @@ public synchronized void setDoomedWorkers(ImmutableSet<Integer> workerIds) {
@Override
public synchronized void clearDoomedWorkers() {
this.doomedWorkers = ImmutableSet.of();
for (SimpleWorkerPool pool : workerPools.values()) {
pool.clearShrunkBy();
for (Entry<String, SimpleWorkerPool> entry : workerPools.entrySet()) {
logger.atInfo().log(
"clearing shrunk by values for %s worker pool",
entry.getKey().isEmpty() ? "shared" : entry.getKey());
entry.getValue().clearShrunkBy();
}
for (SimpleWorkerPool pool : multiplexPools.values()) {
pool.clearShrunkBy();
for (Entry<String, SimpleWorkerPool> entry : multiplexPools.entrySet()) {
logger.atInfo().log(
"clearing shrunk by values for %s multiplex worker pool",
entry.getKey().isEmpty() ? "shared" : entry.getKey());
entry.getValue().clearShrunkBy();
}
}

Expand Down

0 comments on commit 22ff6ae

Please sign in to comment.