Skip to content

Commit

Permalink
[Prism] Implement PrismPipelineResult (#31937)
Browse files Browse the repository at this point in the history
* Implement PrismPipelineResult

* Add isAlive checks
  • Loading branch information
damondouglas committed Jul 19, 2024
1 parent 12ad2af commit bdd5fff
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 0 deletions.
2 changes: 2 additions & 0 deletions runners/prism/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ dependencies {
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(":runners:portability:java")

implementation library.java.joda_time
implementation library.java.slf4j_api
implementation library.java.vendored_guava_32_1_2_jre

testImplementation library.java.junit
testImplementation library.java.mockito_core
testImplementation library.java.truth
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@ void stop() {
}
}

/** Reports whether the Prism executable {@link Process#isAlive()}. */
boolean isAlive() {
if (process == null) {
return false;
}
return process.isAlive();
}

/**
* Execute the {@link ProcessBuilder} that starts the Prism service. Redirects output to STDOUT.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.prism;

import java.io.IOException;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;

/**
* The {@link PipelineResult} of executing a {@link org.apache.beam.sdk.Pipeline} using the {@link
* PrismRunner} and an internal {@link PipelineResult} delegate.
*/
class PrismPipelineResult implements PipelineResult {

static PrismPipelineResult of(PipelineResult delegate, PrismExecutor executor) {
return new PrismPipelineResult(delegate, executor::stop);
}

private final PipelineResult delegate;
private final Runnable cancel;
private @Nullable MetricResults terminalMetrics;
private @Nullable State terminalState;

/**
* Instantiate the {@link PipelineResult} from the {@param delegate} and a {@param cancel} to be
* called when stopping the underlying executable Job management service.
*/
PrismPipelineResult(PipelineResult delegate, Runnable cancel) {
this.delegate = delegate;
this.cancel = cancel;
}

/** Forwards the result of the delegate {@link PipelineResult#getState}. */
@Override
public State getState() {
if (terminalState != null) {
return terminalState;
}
return delegate.getState();
}

/**
* Forwards the result of the delegate {@link PipelineResult#cancel}. Invokes {@link
* PrismExecutor#stop()} before returning the resulting {@link
* org.apache.beam.sdk.PipelineResult.State}.
*/
@Override
public State cancel() throws IOException {
State state = delegate.cancel();
this.terminalMetrics = delegate.metrics();
this.terminalState = state;
this.cancel.run();
return state;
}

/**
* Forwards the result of the delegate {@link PipelineResult#waitUntilFinish(Duration)}. Invokes
* {@link PrismExecutor#stop()} before returning the resulting {@link
* org.apache.beam.sdk.PipelineResult.State}.
*/
@Override
public State waitUntilFinish(Duration duration) {
State state = delegate.waitUntilFinish(duration);
this.terminalMetrics = delegate.metrics();
this.terminalState = state;
this.cancel.run();
return state;
}

/**
* Forwards the result of the delegate {@link PipelineResult#waitUntilFinish}. Invokes {@link
* PrismExecutor#stop()} before returning the resulting {@link
* org.apache.beam.sdk.PipelineResult.State}.
*/
@Override
public State waitUntilFinish() {
State state = delegate.waitUntilFinish();
this.terminalMetrics = delegate.metrics();
this.terminalState = state;
this.cancel.run();
return state;
}

/** Forwards the result of the delegate {@link PipelineResult#metrics}. */
@Override
public MetricResults metrics() {
if (terminalMetrics != null) {
return terminalMetrics;
}
return delegate.metrics();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.prism;

import static com.google.common.truth.Truth.assertThat;
import static org.apache.beam.runners.prism.PrismRunnerTest.getLocalPrismBuildOrIgnoreTest;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for {@link PrismPipelineResult}. */
@RunWith(JUnit4.class)
public class PrismPipelineResultTest {

final PrismExecutor exec = executor();

@Before
public void setUp() throws IOException {
exec.execute();
assertThat(exec.isAlive()).isTrue();
}

@After
public void tearDown() {
assertThat(exec.isAlive()).isFalse();
}

@Test
public void givenTerminated_reportsState() {
PipelineResult delegate = mock(PipelineResult.class);
when(delegate.waitUntilFinish()).thenReturn(PipelineResult.State.FAILED);
PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
// Assigns terminal state.
underTest.waitUntilFinish();
assertThat(underTest.getState()).isEqualTo(PipelineResult.State.FAILED);
}

@Test
public void givenNotTerminated_reportsState() {
PipelineResult delegate = mock(PipelineResult.class);
when(delegate.getState()).thenReturn(PipelineResult.State.RUNNING);
PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
assertThat(underTest.getState()).isEqualTo(PipelineResult.State.RUNNING);
exec.stop();
}

@Test
public void cancelStopsExecutable_reportsTerminalState() throws IOException {
PipelineResult delegate = mock(PipelineResult.class);
when(delegate.cancel()).thenReturn(PipelineResult.State.CANCELLED);
PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
assertThat(underTest.cancel()).isEqualTo(PipelineResult.State.CANCELLED);
}

@Test
public void givenTerminated_cancelIsNoop_reportsTerminalState() throws IOException {
PipelineResult delegate = mock(PipelineResult.class);
when(delegate.cancel()).thenReturn(PipelineResult.State.FAILED);
PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
assertThat(underTest.cancel()).isEqualTo(PipelineResult.State.FAILED);
}

@Test
public void givenPipelineRunWithDuration_waitUntilFinish_reportsTerminalState() {
PipelineResult delegate = mock(PipelineResult.class);
when(delegate.waitUntilFinish(Duration.millis(3000L)))
.thenReturn(PipelineResult.State.CANCELLED);
PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
assertThat(underTest.waitUntilFinish(Duration.millis(3000L)))
.isEqualTo(PipelineResult.State.CANCELLED);
}

@Test
public void givenTerminated_waitUntilFinishIsNoop_reportsTerminalState() {
PipelineResult delegate = mock(PipelineResult.class);
when(delegate.waitUntilFinish()).thenReturn(PipelineResult.State.DONE);
PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
// Terminate Job as setup for additional call.
underTest.waitUntilFinish();
assertThat(underTest.waitUntilFinish()).isEqualTo(PipelineResult.State.DONE);
}

@Test
public void givenNotTerminated_reportsMetrics() {
PipelineResult delegate = mock(PipelineResult.class);
when(delegate.metrics()).thenReturn(mock(MetricResults.class));
PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
assertThat(underTest.metrics()).isNotNull();
exec.stop();
}

@Test
public void givenTerminated_reportsTerminatedMetrics() {
PipelineResult delegate = mock(PipelineResult.class);
when(delegate.metrics()).thenReturn(mock(MetricResults.class));
when(delegate.waitUntilFinish()).thenReturn(PipelineResult.State.DONE);
PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
// Terminate Job as setup for additional call.
underTest.waitUntilFinish();
assertThat(underTest.metrics()).isNotNull();
}

private static PrismExecutor executor() {
return PrismExecutor.builder().setCommand(getLocalPrismBuildOrIgnoreTest()).build();
}
}

0 comments on commit bdd5fff

Please sign in to comment.