Remove worker events (created, evicted, destroyed) and calculate the worker_pool_stats from the final list of WorkerProcessMetrics.

This will log WorkerPoolStats for all worker pools (even if nothing happened to them) and let us infer what happened from the counts. Also add unknown_destroyed_count and alive_count to the WorkerPoolStats proto (the latter describes the state of the pool at the end of the build).

Some additional points:
1. Worker events are only used to generate the `worker_pool_stats` in the BEP.
2. Since we already keep the metrics of killed workers (`WorkerProcessMetrics`) around during the build, we don't need the event mechanism in (1) to tell us what happened to each worker; we can infer it directly from the worker metrics.
3. This keeps the counts in `worker_pool_stats` consistent with the `worker_metrics` in the BEP. See (4) and (5).
4. Counting worker events can be inaccurate: a worker process can be forcefully killed (`WorkerLifecycleManager#killLargeWorkers`) in one build but only counted in the next build, when the spawn runner realizes that the worker is invalid because the process has already terminated and only then posts a `WorkerDestroyedEvent`.
5. If no workers are created or killed during a build, the `worker_pool_stats` is empty, which is likely confusing. It is clearer to log all worker pools (even when nothing happened) and let the counts in the proto explicitly tell us what did happen. A minimal sketch of this aggregation follows below.
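To make the new flow concrete, here is a minimal, hypothetical sketch of the aggregation (the names `ProcessMetrics`, `PoolStats`, and `aggregate` are simplified stand-ins, not the real types; the actual implementation is `MetricsCollector#createWorkerPoolMetrics` in the diff below). The key point is that every pool with a recorded worker process gets an entry, and all counts are derived from the final metrics rather than from events:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class WorkerPoolStatsSketch {
  // Simplified stand-in for WorkerProcessMetrics: one entry per worker
  // process, possibly covering several workers (multiplex workers).
  record ProcessMetrics(int poolHash, int workerCount, boolean newlyCreated, boolean killed) {}

  // Simplified stand-in for the per-pool counters in WorkerPoolStats.
  static final class PoolStats {
    int created;
    int destroyed;
    int alive;
  }

  static Map<Integer, PoolStats> aggregate(List<ProcessMetrics> allProcesses) {
    Map<Integer, PoolStats> byPool = new HashMap<>();
    for (ProcessMetrics m : allProcesses) {
      // Every pool is reported, even if none of its counts changed this build.
      PoolStats stats = byPool.computeIfAbsent(m.poolHash(), h -> new PoolStats());
      if (m.newlyCreated()) {
        stats.created += m.workerCount();
      }
      if (m.killed()) {
        // A killed process counts against all workers it hosted.
        stats.destroyed += m.workerCount();
      } else {
        // Anything not killed by the end of the build is alive.
        stats.alive += m.workerCount();
      }
    }
    return byPool;
  }
}
```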

RELNOTES: Log WorkerPoolStats for all worker pools (even if no workers are created or destroyed). Also add unknown_destroyed_count and alive_count to the WorkerPoolStats proto.
PiperOrigin-RevId: 588787331
Change-Id: I70eec4a7accbeb2dc129af4e3290ae714ff63655
zhengwei143 authored and copybara-github committed Dec 7, 2023
1 parent d4781f3 commit 564a399
Showing 22 changed files with 311 additions and 479 deletions.
@@ -1220,7 +1220,9 @@ message BuildMetrics {
string mnemonic = 2;
// Number of workers created during a build.
int64 created_count = 3;
// Number of workers destroyed during a build.
// Number of workers destroyed during a build (sum of all workers
// destroyed by eviction, UserExecException, IoException,
// InterruptedException and unknown reasons below).
int64 destroyed_count = 4;
// Number of workers evicted during a build.
int64 evicted_count = 5;
@@ -1230,6 +1232,10 @@
int64 io_exception_destroyed_count = 7;
// Number of workers destroyed due to InterruptedExceptions.
int64 interrupted_exception_destroyed_count = 8;
// Number of workers destroyed due to an unknown reason.
int64 unknown_destroyed_count = 9;
// Number of workers alive at the end of the build.
int64 alive_count = 10;
}
}

1 change: 0 additions & 1 deletion src/main/java/com/google/devtools/build/lib/metrics/BUILD
@@ -50,7 +50,6 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/profiler:network_metrics_collector",
"//src/main/java/com/google/devtools/build/lib/skyframe:execution_finished_event",
"//src/main/java/com/google/devtools/build/lib/skyframe:top_level_status_events",
"//src/main/java/com/google/devtools/build/lib/worker:worker_events",
"//src/main/java/com/google/devtools/build/lib/worker:worker_process_metrics",
"//src/main/java/com/google/devtools/build/lib/worker:worker_process_status",
"//src/main/java/com/google/devtools/build/skyframe:skyframe_graph_stats_event",
@@ -13,6 +13,9 @@
// limitations under the License.
package com.google.devtools.build.lib.metrics;

import static com.google.common.collect.ImmutableList.toImmutableList;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
@@ -38,6 +41,7 @@
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildMetrics.PackageMetrics;
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildMetrics.TargetMetrics;
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildMetrics.TimingMetrics;
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildMetrics.WorkerMetrics;
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildMetrics.WorkerPoolMetrics;
import com.google.devtools.build.lib.buildtool.BuildPrecompleteEvent;
import com.google.devtools.build.lib.buildtool.buildevent.ExecutionPhaseCompleteEvent;
@@ -57,12 +61,9 @@
import com.google.devtools.build.lib.runtime.SpawnStats;
import com.google.devtools.build.lib.skyframe.ExecutionFinishedEvent;
import com.google.devtools.build.lib.skyframe.TopLevelStatusEvents.TopLevelTargetPendingExecutionEvent;
import com.google.devtools.build.lib.worker.WorkerCreatedEvent;
import com.google.devtools.build.lib.worker.WorkerDestroyedEvent;
import com.google.devtools.build.lib.worker.WorkerEvictedEvent;
import com.google.devtools.build.lib.worker.WorkerProcessMetrics;
import com.google.devtools.build.lib.worker.WorkerProcessMetricsCollector;
import com.google.devtools.build.lib.worker.WorkerProcessStatus;
import com.google.devtools.build.lib.worker.WorkerProcessStatus.Status;
import com.google.devtools.build.skyframe.SkyframeGraphStatsEvent;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import com.google.protobuf.util.Durations;
@@ -83,8 +84,6 @@ class MetricsCollector {
private final boolean recordMetricsForAllMnemonics;
// For ActionSummary.
private final ConcurrentHashMap<String, ActionStats> actionStatsMap = new ConcurrentHashMap<>();
// Mapping from worker pool hash, to statistics which we collect during a build.
private final HashMap<Integer, WorkerPoolStats> workerPoolStats = new HashMap<>();

// For CumulativeMetrics.
private final AtomicInteger numAnalyses;
@@ -195,44 +194,6 @@ public void handleExecutionPhaseComplete(ExecutionPhaseCompleteEvent event) {
timingMetrics.setExecutionPhaseTimeInMs(event.getTimeInMs());
}

@Subscribe
public void onWorkerDestroyed(WorkerDestroyedEvent event) {
synchronized (this) {
WorkerPoolStats stats =
getWorkerPoolStatsOrInsert(event.getWorkerPoolHash(), event.getMnemonic());

stats.incrementDestroyedCount(event.getStatus());
}
}

@Subscribe
public void onWorkerCreated(WorkerCreatedEvent event) {
synchronized (this) {
WorkerPoolStats stats =
getWorkerPoolStatsOrInsert(event.getWorkerPoolHash(), event.getMnemonic());

stats.incrementCreatedCount();
}
}

@Subscribe
public void onWorkerEvicted(WorkerEvictedEvent event) {
synchronized (this) {
WorkerPoolStats stats =
getWorkerPoolStatsOrInsert(event.getWorkerPoolHash(), event.getMnemonic());

stats.incrementEvictedCount();
}
}

private WorkerPoolStats getWorkerPoolStatsOrInsert(int workerPoolHash, String mnemonic) {
WorkerPoolStats stats =
workerPoolStats.computeIfAbsent(
workerPoolHash, (Integer k) -> new WorkerPoolStats(mnemonic));

return stats;
}

@SuppressWarnings("unused")
@Subscribe
@AllowConcurrentEvents
@@ -300,6 +261,17 @@ private void logActionCacheStatistics(ActionCacheStatistics stats) {
}

private BuildMetrics createBuildMetrics() {
ImmutableList<WorkerProcessMetrics> workerProcessMetrics =
WorkerProcessMetricsCollector.instance().collectMetrics();
// Restrict the number of WorkerMetrics that we report based on a predefined prioritization so
// that we don't spam the BEP in the event that something like a kill-create cycle happens.
ImmutableList<WorkerMetrics> workerMetrics =
WorkerProcessMetricsCollector.limitWorkerMetricsToPublish(
workerProcessMetrics.stream()
.map(WorkerProcessMetrics::toProto)
.collect(toImmutableList()),
WorkerProcessMetricsCollector.MAX_PUBLISHED_WORKER_METRICS);

BuildMetrics.Builder buildMetrics =
BuildMetrics.newBuilder()
.setActionSummary(finishActionSummary())
@@ -310,9 +282,8 @@ private BuildMetrics createBuildMetrics() {
.setCumulativeMetrics(createCumulativeMetrics())
.setArtifactMetrics(artifactMetrics.build())
.setBuildGraphMetrics(buildGraphMetrics.build())
.addAllWorkerMetrics(
WorkerProcessMetricsCollector.instance().getPublishedWorkerMetrics())
.setWorkerPoolMetrics(createWorkerPoolMetrics());
.addAllWorkerMetrics(workerMetrics)
.setWorkerPoolMetrics(createWorkerPoolMetrics(workerProcessMetrics));

NetworkMetrics networkMetrics = NetworkMetricsCollector.instance().collectMetrics();
if (networkMetrics != null) {
@@ -424,15 +395,22 @@ private TimingMetrics finishTimingMetrics() {
return timingMetrics.build();
}

private WorkerPoolMetrics createWorkerPoolMetrics() {
WorkerPoolMetrics.Builder metricsBuilder = WorkerPoolMetrics.newBuilder();

workerPoolStats.forEach(
(workerPoolHash, poolStats) ->
metricsBuilder.addWorkerPoolStats(
poolStats.toProtoBuilder().setHash(workerPoolHash).build()));

return metricsBuilder.build();
/** Creates the WorkerPoolMetrics by aggregating the collected WorkerProcessMetrics. */
static WorkerPoolMetrics createWorkerPoolMetrics(
ImmutableList<WorkerProcessMetrics> collectedWorkerProcessMetrics) {
HashMap<Integer, WorkerPoolStats> aggregatedPoolStats = new HashMap<>();
for (WorkerProcessMetrics wpm : collectedWorkerProcessMetrics) {
WorkerPoolStats poolStats =
aggregatedPoolStats.computeIfAbsent(
wpm.getWorkerKeyHash(), (hash) -> new WorkerPoolStats(wpm.getMnemonic(), hash));
poolStats.update(wpm);
}
return WorkerPoolMetrics.newBuilder()
.addAllWorkerPoolStats(
aggregatedPoolStats.values().stream()
.map(WorkerPoolStats::build)
.collect(toImmutableList()))
.build();
}

private static class WorkerPoolStats {
@@ -442,43 +420,64 @@ private static class WorkerPoolStats {
private int userExecExceptionDestroyedCount;
private int ioExceptionDestroyedCount;
private int interruptedExceptionDestroyedCount;
private int unknownDestroyedCount;
private int aliveCount;
private final String mnemonic;

WorkerPoolStats(String mnemonic) {
this.mnemonic = mnemonic;
}
private final int hash;

void incrementCreatedCount() {
createdCount++;
WorkerPoolStats(String mnemonic, int hash) {
this.mnemonic = mnemonic;
this.hash = hash;
}

void incrementDestroyedCount(WorkerProcessStatus status) {
// TODO(b/310640400): We ignore KILLED_DUE_TO_MEMORY_PRESSURE for now since that's already
// accounted for by WorkerEvictedEvent. An evicted worker is destroyed, so it doesn't make
// sense that we treat it differently from WorkerDestroyedEvent.
if (status.get() == Status.KILLED_DUE_TO_USER_EXEC_EXCEPTION) {
userExecExceptionDestroyedCount++;
} else if (status.get() == Status.KILLED_DUE_TO_IO_EXCEPTION) {
ioExceptionDestroyedCount++;
} else if (status.get() == Status.KILLED_DUE_TO_INTERRUPTED_EXCEPTION) {
interruptedExceptionDestroyedCount++;
void update(WorkerProcessMetrics wpm) {
int numWorkers = wpm.getWorkerIds().size();
if (wpm.isNewlyCreated()) {
createdCount += numWorkers;
}
WorkerProcessStatus status = wpm.getStatus();
if (status.isKilled()) {
switch (status.get()) {
// If the process is killed due to a specific reason, we attribute the cause to all
// workers of that process (plural in the case of multiplex workers).
case KILLED_UNKNOWN:
unknownDestroyedCount += numWorkers;
break;
case KILLED_DUE_TO_INTERRUPTED_EXCEPTION:
interruptedExceptionDestroyedCount += numWorkers;
break;
case KILLED_DUE_TO_IO_EXCEPTION:
ioExceptionDestroyedCount += numWorkers;
break;
case KILLED_DUE_TO_MEMORY_PRESSURE:
evictedCount += numWorkers;
break;
case KILLED_DUE_TO_USER_EXEC_EXCEPTION:
userExecExceptionDestroyedCount += numWorkers;
break;
default:
break;
}
destroyedCount += numWorkers;
} else {
aliveCount += numWorkers;
}
destroyedCount++;
}

void incrementEvictedCount() {
evictedCount++;
}

public WorkerPoolMetrics.WorkerPoolStats.Builder toProtoBuilder() {
public WorkerPoolMetrics.WorkerPoolStats build() {
return WorkerPoolMetrics.WorkerPoolStats.newBuilder()
.setMnemonic(mnemonic)
.setHash(hash)
.setCreatedCount(createdCount)
.setDestroyedCount(destroyedCount)
.setEvictedCount(evictedCount)
.setUserExecExceptionDestroyedCount(userExecExceptionDestroyedCount)
.setIoExceptionDestroyedCount(ioExceptionDestroyedCount)
.setInterruptedExceptionDestroyedCount(interruptedExceptionDestroyedCount);
.setInterruptedExceptionDestroyedCount(interruptedExceptionDestroyedCount)
.setUnknownDestroyedCount(unknownDestroyedCount)
.setAliveCount(aliveCount)
.build();
}
}

18 changes: 0 additions & 18 deletions src/main/java/com/google/devtools/build/lib/worker/BUILD
@@ -49,7 +49,6 @@ java_library(
deps = [
":error_message",
":worker",
":worker_events",
":worker_files_hash",
":worker_key",
":worker_options",
@@ -183,7 +182,6 @@ java_library(
":multiplex_worker",
":singleplex_worker",
":worker",
":worker_events",
":worker_key",
":worker_process_status",
"//src/main/java/com/google/devtools/build/lib/events",
@@ -197,20 +195,6 @@ java_library(
],
)

java_library(
name = "worker_events",
srcs = [
"WorkerCreatedEvent.java",
"WorkerDestroyedEvent.java",
"WorkerEvictedEvent.java",
],
deps = [
":worker_process_status",
"//src/main/java/com/google/devtools/build/lib/events",
"//third_party:guava",
],
)

java_library(
name = "worker_pool",
srcs = [
@@ -220,7 +204,6 @@ java_library(
":worker",
":worker_key",
"//third_party:apache_commons_pool2",
"//third_party:guava",
],
)

@@ -232,7 +215,6 @@ java_library(
],
deps = [
":worker",
":worker_events",
":worker_factory",
":worker_key",
":worker_pool",
@@ -14,7 +14,6 @@
package com.google.devtools.build.lib.worker;

import com.google.common.base.Throwables;
import com.google.common.eventbus.EventBus;
import com.google.common.flogger.GoogleLogger;
import java.io.IOException;
import java.util.HashMap;
@@ -43,8 +42,6 @@ final class SimpleWorkerPool extends GenericKeyedObjectPool<WorkerKey, Worker> {
*/
private Map<WorkerKey, Integer> shrunkBy = new HashMap<>();

private EventBus eventBus;

public SimpleWorkerPool(WorkerFactory factory, int max) {
super(factory, makeConfig(max));
}
@@ -82,10 +79,6 @@ static SimpleWorkerPoolConfig makeConfig(int max) {
return config;
}

void setEventBus(EventBus eventBus) {
this.eventBus = eventBus;
}

@Override
public Worker borrowObject(WorkerKey key) throws IOException, InterruptedException {
try {
@@ -102,10 +95,6 @@ public void invalidateObject(WorkerKey key, Worker obj) throws InterruptedExcept
boolean wasPendingEviction = obj.getStatus().isPendingEviction();
super.invalidateObject(key, obj);
if (wasPendingEviction && obj.getStatus().isKilled()) {
if (eventBus != null) {
eventBus.post(
new WorkerEvictedEvent(obj.getWorkerId(), key.hashCode(), key.getMnemonic()));
}
updateShrunkBy(key, obj.getWorkerId());
}
} catch (Throwable t) {
@@ -119,9 +108,6 @@ public void returnObject(WorkerKey key, Worker obj) {
boolean wasPendingEviction = obj.getStatus().isPendingEviction();
super.returnObject(key, obj);
if (wasPendingEviction && obj.getStatus().isKilled()) {
if (eventBus != null) {
eventBus.post(new WorkerEvictedEvent(obj.getWorkerId(), key.hashCode(), key.getMnemonic()));
}
updateShrunkBy(key, obj.getWorkerId());
}
}
@@ -128,7 +128,6 @@ public void finishExecution(Path execRoot, SandboxOutputs outputs) throws IOExce
status.maybeUpdateStatus(WorkerProcessStatus.Status.ALIVE);
WorkerProcessMetricsCollector.instance().onWorkerFinishExecution(getProcessId());
}
;

/**
* Destroys this worker. Once this has been called, we assume it's safe to clean up related