only record per state change processing times for streaming pipelines (
clmccart authored Apr 1, 2024
1 parent fecc9d9 commit db33805
Showing 2 changed files with 68 additions and 9 deletions.
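The diff below gates the per-state-change processing-time bookkeeping on whether the pipeline is streaming: DataflowExecutionStateTracker now reads isStreaming() from DataflowPipelineOptions at construction time and, for batch pipelines, skips recordActiveMessageInProcessingTimesMap() and the ActiveMessageMetadata update when a process-element state is entered or exited. As a minimal, self-contained sketch of that gating pattern (hypothetical class, method, and field names, not the actual worker classes):

import java.util.HashMap;
import java.util.IntSummaryStatistics;
import java.util.Map;

/** Hypothetical, simplified stand-in for the tracker's streaming-only recording. */
public class ProcessingTimeGateSketch {
  private final boolean isStreaming; // in the real tracker: ((DataflowPipelineOptions) options).isStreaming()
  private final Map<String, IntSummaryStatistics> processingTimesByStep = new HashMap<>();
  private String activeStep;      // stand-in for ActiveMessageMetadata.userStepName()
  private long activeStartMillis; // stand-in for the start timestamp passed to ActiveMessageMetadata.create()

  ProcessingTimeGateSketch(boolean isStreaming) {
    this.isStreaming = isStreaming;
  }

  /** Entering a process-element state: only streaming pipelines record the previous message and track a new one. */
  void enterProcessElementState(String userStepName, long nowMillis) {
    if (isStreaming) {
      recordActiveMessage(nowMillis);
      activeStep = userStepName;
      activeStartMillis = nowMillis;
    }
  }

  /** Exiting the state (the Closeable in the real tracker): streaming pipelines record here as well. */
  void exitProcessElementState(long nowMillis) {
    if (isStreaming) {
      recordActiveMessage(nowMillis);
    }
  }

  private void recordActiveMessage(long nowMillis) {
    if (activeStep == null) {
      return;
    }
    processingTimesByStep
        .computeIfAbsent(activeStep, k -> new IntSummaryStatistics())
        .accept((int) (nowMillis - activeStartMillis));
    activeStep = null;
  }

  public static void main(String[] args) {
    ProcessingTimeGateSketch streaming = new ProcessingTimeGateSketch(true);
    streaming.enterProcessElementState("step1", 0);
    streaming.enterProcessElementState("step2", 25); // records 25 ms for step1
    System.out.println(streaming.processingTimesByStep.keySet()); // [step1]

    ProcessingTimeGateSketch batch = new ProcessingTimeGateSketch(false);
    batch.enterProcessElementState("step1", 0);
    batch.enterProcessElementState("step2", 25);
    System.out.println(batch.processingTimesByStep.keySet()); // [] -- nothing recorded for batch
  }
}

The tests in the second file exercise both sides of this gate by constructing the tracker with DataflowWorkerHarnessOptions that have setStreaming(true), versus the default batch options.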
@@ -37,6 +37,8 @@
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
@@ -252,6 +254,8 @@ public static class DataflowExecutionStateTracker extends ExecutionStateTracker
private final ContextActivationObserverRegistry contextActivationObserverRegistry;
private final String workItemId;

private final boolean isStreaming;

/**
* Metadata on the message whose processing is currently being managed by this tracker. If no
* message is actively being processed, activeMessageMetadata will be null.
@@ -277,6 +281,11 @@ public DataflowExecutionStateTracker(
this.otherState = otherState;
this.workItemId = workItemId;
this.contextActivationObserverRegistry = ContextActivationObserverRegistry.createDefault();
if (options instanceof DataflowPipelineOptions) {
this.isStreaming = ((DataflowPipelineOptions) options).isStreaming();
} else {
this.isStreaming = false;
}
}

@Override
@@ -318,20 +327,24 @@ public Closeable enterState(ExecutionState newState) {
newState.isProcessElementState && newState instanceof DataflowExecutionState;
if (isDataflowProcessElementState) {
DataflowExecutionState newDFState = (DataflowExecutionState) newState;
if (newDFState.getStepName() != null && newDFState.getStepName().userName() != null) {
recordActiveMessageInProcessingTimesMap();
synchronized (this) {
this.activeMessageMetadata =
ActiveMessageMetadata.create(
newDFState.getStepName().userName(), clock.getMillis());
if (isStreaming) {
if (newDFState.getStepName() != null && newDFState.getStepName().userName() != null) {
recordActiveMessageInProcessingTimesMap();
synchronized (this) {
this.activeMessageMetadata =
ActiveMessageMetadata.create(
newDFState.getStepName().userName(), clock.getMillis());
}
}
}
elementExecutionTracker.enter(newDFState.getStepName());
}

return () -> {
if (isDataflowProcessElementState) {
recordActiveMessageInProcessingTimesMap();
if (isStreaming) {
recordActiveMessageInProcessingTimesMap();
}
elementExecutionTracker.exit();
}
baseCloseable.close();
@@ -30,6 +30,8 @@
import java.util.Map;
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.worker.BatchModeExecutionContext.BatchModeExecutionState;
import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.StreamingModeExecutionState;
import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.NoopProfileScope;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -123,9 +125,13 @@ public void testContextActivationObserverActivation() throws Exception {

@Test
public void testDataflowExecutionStateTrackerRecordsActiveMessageMetadata() throws IOException {
DataflowWorkerHarnessOptions options =
PipelineOptionsFactory.fromArgs("--experiments=enable_streaming_engine")
.as(DataflowWorkerHarnessOptions.class);
options.setStreaming(true);
DataflowExecutionContext.DataflowExecutionStateTracker tracker =
new DataflowExecutionContext.DataflowExecutionStateTracker(
ExecutionStateSampler.instance(), null, null, PipelineOptionsFactory.create(), "");
ExecutionStateSampler.instance(), null, null, options, "");
StreamingModeExecutionState state =
new StreamingModeExecutionState(
NameContextsForTests.nameContextForTest(),
@@ -151,9 +157,14 @@ public void testDataflowExecutionStateTrackerRecordsCompletedProcessingTimes() thro
@Test
public void testDataflowExecutionStateTrackerRecordsCompletedProcessingTimes()
throws IOException {
DataflowWorkerHarnessOptions options =
PipelineOptionsFactory.fromArgs("--experiments=enable_streaming_engine")
.as(DataflowWorkerHarnessOptions.class);
options.setStreaming(true);

DataflowExecutionContext.DataflowExecutionStateTracker tracker =
new DataflowExecutionContext.DataflowExecutionStateTracker(
ExecutionStateSampler.instance(), null, null, PipelineOptionsFactory.create(), "");
ExecutionStateSampler.instance(), null, null, options, "");

// Enter a processing state
StreamingModeExecutionState state =
@@ -184,4 +195,39 @@ public void testDataflowExecutionStateTrackerRecordsCompletedProcessingTimes()
Assert.assertEquals(
expectedMetadata.userStepName(), tracker.getActiveMessageMetadata().get().userStepName());
}

@Test
public void testDataflowExecutionStateTrackerDoesNotRecordCompletedProcessingTimesForBatch()
throws IOException {
DataflowExecutionContext.DataflowExecutionStateTracker tracker =
new DataflowExecutionContext.DataflowExecutionStateTracker(
ExecutionStateSampler.instance(), null, null, PipelineOptionsFactory.create(), "");

// Enter a processing state
BatchModeExecutionState state =
new BatchModeExecutionState(
NameContextsForTests.nameContextForTest(),
"testState",
null /* requestingStepName */,
null /* inputIndex */,
null /* metricsContainer */,
NoopProfileScope.NOOP);
tracker.enterState(state);
// Enter a new processing state
BatchModeExecutionState newState =
new BatchModeExecutionState(
NameContextsForTests.nameContextForTest(),
"testState2",
null /* requestingStepName */,
null /* inputIndex */,
null /* metricsContainer */,
NoopProfileScope.NOOP);
tracker.enterState(newState);

// For a batch pipeline, no processing times should be recorded and no message metadata should be active.
Map<String, IntSummaryStatistics> gotProcessingTimes = tracker.getProcessingTimesByStepCopy();
Assert.assertEquals(0, gotProcessingTimes.size());
Assert.assertEquals(0, gotProcessingTimes.keySet().size());
assertFalse(tracker.getActiveMessageMetadata().isPresent());
}
}
