diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle
index 96ab4e70a579..deee8876af6f 100644
--- a/runners/prism/java/build.gradle
+++ b/runners/prism/java/build.gradle
@@ -37,6 +37,7 @@ dependencies {
implementation library.java.slf4j_api
implementation library.java.vendored_grpc_1_60_1
implementation library.java.vendored_guava_32_1_2_jre
+ compileOnly library.java.hamcrest
testImplementation library.java.junit
testImplementation library.java.mockito_core
diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java
index 620d5508f22a..fda5db923a7f 100644
--- a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java
+++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java
@@ -45,6 +45,9 @@
abstract class PrismExecutor {
private static final Logger LOG = LoggerFactory.getLogger(PrismExecutor.class);
+ static final String IDLE_SHUTDOWN_TIMEOUT = "-idle_shutdown_timeout=%s";
+ static final String JOB_PORT_FLAG_TEMPLATE = "-job_port=%s";
+ static final String SERVE_HTTP_FLAG_TEMPLATE = "-serve_http=%s";
protected @MonotonicNonNull Process process;
protected ExecutorService executorService = Executors.newSingleThreadExecutor();
@@ -71,7 +74,7 @@ void stop() {
}
executorService.shutdown();
try {
- boolean ignored = executorService.awaitTermination(1000L, TimeUnit.MILLISECONDS);
+ boolean ignored = executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignored) {
}
if (process == null) {
diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismLocator.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismLocator.java
index f32e4d88f42b..f69260344d12 100644
--- a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismLocator.java
+++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismLocator.java
@@ -127,7 +127,9 @@ private String resolve(Path from, Path to) throws IOException {
}
copyFn.accept(from.toUri().toURL().openStream(), to);
- ByteStreams.copy(from.toUri().toURL().openStream(), Files.newOutputStream(to));
+ try (OutputStream out = Files.newOutputStream(to)) {
+ ByteStreams.copy(from.toUri().toURL().openStream(), out);
+ }
Files.setPosixFilePermissions(to, PERMS);
return to.toString();
@@ -159,16 +161,16 @@ private static void unzip(InputStream from, Path to) {
}
private static void copy(InputStream from, Path to) {
- try {
- ByteStreams.copy(from, Files.newOutputStream(to));
+ try (OutputStream out = Files.newOutputStream(to)) {
+ ByteStreams.copy(from, out);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private static void download(URL from, Path to) {
- try {
- ByteStreams.copy(from.openStream(), Files.newOutputStream(to));
+ try (OutputStream out = Files.newOutputStream(to)) {
+ ByteStreams.copy(from.openStream(), out);
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineOptions.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineOptions.java
index ec0f8beb620a..6a6ca4e615d0 100644
--- a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineOptions.java
+++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineOptions.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.prism;
+import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PortablePipelineOptions;
@@ -25,6 +26,9 @@
* org.apache.beam.sdk.Pipeline} on the {@link PrismRunner}.
*/
public interface PrismPipelineOptions extends PortablePipelineOptions {
+
+ String JOB_PORT_FLAG_NAME = "job_port";
+
@Description(
"Path or URL to a prism binary, or zipped binary for the current "
+ "platform (Operating System and Architecture). May also be an Apache "
@@ -41,4 +45,17 @@ public interface PrismPipelineOptions extends PortablePipelineOptions {
String getPrismVersionOverride();
void setPrismVersionOverride(String prismVersionOverride);
+
+ @Description("Enable or disable Prism Web UI")
+ @Default.Boolean(true)
+ Boolean getEnableWebUI();
+
+ void setEnableWebUI(Boolean enableWebUI);
+
+ @Description(
+ "Duration, represented as a String, that prism will wait for a new job before shutting itself down. Negative durations disable auto shutdown. Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\".")
+ @Default.String("5m")
+ String getIdleShutdownTimeout();
+
+ void setIdleShutdownTimeout(String idleShutdownTimeout);
}
diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineResult.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineResult.java
index a551196c9b6f..7508e505725e 100644
--- a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineResult.java
+++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineResult.java
@@ -20,7 +20,6 @@
import java.io.IOException;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricResults;
-import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
/**
@@ -29,14 +28,8 @@
*/
class PrismPipelineResult implements PipelineResult {
- static PrismPipelineResult of(PipelineResult delegate, PrismExecutor executor) {
- return new PrismPipelineResult(delegate, executor::stop);
- }
-
private final PipelineResult delegate;
- private final Runnable cancel;
- private @Nullable MetricResults terminalMetrics;
- private @Nullable State terminalState;
+ private final Runnable cleanup;
/**
* Instantiate the {@link PipelineResult} from the {@param delegate} and a {@param cancel} to be
@@ -44,15 +37,16 @@ static PrismPipelineResult of(PipelineResult delegate, PrismExecutor executor) {
*/
PrismPipelineResult(PipelineResult delegate, Runnable cancel) {
this.delegate = delegate;
- this.cancel = cancel;
+ this.cleanup = cancel;
+ }
+
+ Runnable getCleanup() {
+ return cleanup;
}
/** Forwards the result of the delegate {@link PipelineResult#getState}. */
@Override
public State getState() {
- if (terminalState != null) {
- return terminalState;
- }
return delegate.getState();
}
@@ -64,9 +58,7 @@ public State getState() {
@Override
public State cancel() throws IOException {
State state = delegate.cancel();
- this.terminalMetrics = delegate.metrics();
- this.terminalState = state;
- this.cancel.run();
+ this.cleanup.run();
return state;
}
@@ -78,9 +70,7 @@ public State cancel() throws IOException {
@Override
public State waitUntilFinish(Duration duration) {
State state = delegate.waitUntilFinish(duration);
- this.terminalMetrics = delegate.metrics();
- this.terminalState = state;
- this.cancel.run();
+ this.cleanup.run();
return state;
}
@@ -92,18 +82,13 @@ public State waitUntilFinish(Duration duration) {
@Override
public State waitUntilFinish() {
State state = delegate.waitUntilFinish();
- this.terminalMetrics = delegate.metrics();
- this.terminalState = state;
- this.cancel.run();
+ this.cleanup.run();
return state;
}
/** Forwards the result of the delegate {@link PipelineResult#metrics}. */
@Override
public MetricResults metrics() {
- if (terminalMetrics != null) {
- return terminalMetrics;
- }
return delegate.metrics();
}
}
diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismRunner.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismRunner.java
index 1ea4367292b0..6099db4b63ee 100644
--- a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismRunner.java
+++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismRunner.java
@@ -17,6 +17,12 @@
*/
package org.apache.beam.runners.prism;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.Arrays;
import org.apache.beam.runners.portability.PortableRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
@@ -34,34 +40,38 @@
* submit to an already running Prism service, use the {@link PortableRunner} with the {@link
* PortablePipelineOptions#getJobEndpoint()} option instead. Prism is a {@link
* org.apache.beam.runners.portability.PortableRunner} maintained at sdks/go/cmd/prism.
+ * href="https://github.com/apache/beam/tree/master/sdks/go/cmd/prism">sdks/go/cmd/prism. For
+ * testing, use {@link TestPrismRunner}.
*/
-// TODO(https://github.com/apache/beam/issues/31793): add public modifier after finalizing
-// PrismRunner. Depends on: https://github.com/apache/beam/issues/31402 and
-// https://github.com/apache/beam/issues/31792.
-class PrismRunner extends PipelineRunner {
+public class PrismRunner extends PipelineRunner {
private static final Logger LOG = LoggerFactory.getLogger(PrismRunner.class);
- private static final String DEFAULT_PRISM_ENDPOINT = "localhost:8073";
-
- private final PortableRunner internal;
private final PrismPipelineOptions prismPipelineOptions;
- private PrismRunner(PortableRunner internal, PrismPipelineOptions prismPipelineOptions) {
- this.internal = internal;
+ protected PrismRunner(PrismPipelineOptions prismPipelineOptions) {
this.prismPipelineOptions = prismPipelineOptions;
}
+ PrismPipelineOptions getPrismPipelineOptions() {
+ return prismPipelineOptions;
+ }
+
/**
* Invoked from {@link Pipeline#run} where {@link PrismRunner} instantiates using {@link
* PrismPipelineOptions} configuration details.
*/
public static PrismRunner fromOptions(PipelineOptions options) {
PrismPipelineOptions prismPipelineOptions = options.as(PrismPipelineOptions.class);
+ validate(prismPipelineOptions);
assignDefaultsIfNeeded(prismPipelineOptions);
- PortableRunner internal = PortableRunner.fromOptions(options);
- return new PrismRunner(internal, prismPipelineOptions);
+ return new PrismRunner(prismPipelineOptions);
+ }
+
+ private static void validate(PrismPipelineOptions options) {
+ checkArgument(
+ Strings.isNullOrEmpty(options.getJobEndpoint()),
+ "when specifying --jobEndpoint, use --runner=PortableRunner instead");
}
@Override
@@ -72,15 +82,47 @@ public PipelineResult run(Pipeline pipeline) {
prismPipelineOptions.getDefaultEnvironmentType(),
prismPipelineOptions.getJobEndpoint());
- return internal.run(pipeline);
+ try {
+ PrismExecutor executor = startPrism();
+ PortableRunner delegate = PortableRunner.fromOptions(prismPipelineOptions);
+ return new PrismPipelineResult(delegate.run(pipeline), executor::stop);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ PrismExecutor startPrism() throws IOException {
+ PrismLocator locator = new PrismLocator(prismPipelineOptions);
+ int port = findAvailablePort();
+ String portFlag = String.format(PrismExecutor.JOB_PORT_FLAG_TEMPLATE, port);
+ String serveHttpFlag =
+ String.format(
+ PrismExecutor.SERVE_HTTP_FLAG_TEMPLATE, prismPipelineOptions.getEnableWebUI());
+ String idleShutdownTimeoutFlag =
+ String.format(
+ PrismExecutor.IDLE_SHUTDOWN_TIMEOUT, prismPipelineOptions.getIdleShutdownTimeout());
+ String endpoint = "localhost:" + port;
+ prismPipelineOptions.setJobEndpoint(endpoint);
+ String command = locator.resolve();
+ PrismExecutor executor =
+ PrismExecutor.builder()
+ .setCommand(command)
+ .setArguments(Arrays.asList(portFlag, serveHttpFlag, idleShutdownTimeoutFlag))
+ .build();
+ executor.execute();
+ checkState(executor.isAlive());
+ return executor;
}
private static void assignDefaultsIfNeeded(PrismPipelineOptions prismPipelineOptions) {
if (Strings.isNullOrEmpty(prismPipelineOptions.getDefaultEnvironmentType())) {
prismPipelineOptions.setDefaultEnvironmentType(Environments.ENVIRONMENT_LOOPBACK);
}
- if (Strings.isNullOrEmpty(prismPipelineOptions.getJobEndpoint())) {
- prismPipelineOptions.setJobEndpoint(DEFAULT_PRISM_ENDPOINT);
+ }
+
+ private static int findAvailablePort() throws IOException {
+ try (ServerSocket socket = new ServerSocket(0)) {
+ return socket.getLocalPort();
}
}
}
diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismRunnerRegistrar.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismRunnerRegistrar.java
new file mode 100644
index 000000000000..ff7b73ab6db0
--- /dev/null
+++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismRunnerRegistrar.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.prism;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+
+/**
+ * Registers {@link PrismRunner} and {@link TestPrismRunner} with {@link PipelineRunnerRegistrar}.
+ */
+@AutoService(PipelineRunnerRegistrar.class)
+public class PrismRunnerRegistrar implements PipelineRunnerRegistrar {
+
+ @Override
+ public Iterable>> getPipelineRunners() {
+ return ImmutableList.of(PrismRunner.class, TestPrismRunner.class);
+ }
+}
diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/TestPrismPipelineOptions.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/TestPrismPipelineOptions.java
new file mode 100644
index 000000000000..1c1252bee55d
--- /dev/null
+++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/TestPrismPipelineOptions.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.prism;
+
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+
+/** {@link org.apache.beam.sdk.options.PipelineOptions} for use with the {@link TestPrismRunner}. */
+public interface TestPrismPipelineOptions extends PrismPipelineOptions, TestPipelineOptions {}
diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/TestPrismRunner.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/TestPrismRunner.java
new file mode 100644
index 000000000000..fbcb9e3d9576
--- /dev/null
+++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/TestPrismRunner.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.prism;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.function.Supplier;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.hamcrest.Matchers;
+import org.joda.time.Duration;
+
+/**
+ * {@link TestPrismRunner} is the recommended {@link PipelineRunner} to use for tests that rely on
+ * sdks/go/cmd/prism. See
+ * {@link PrismRunner} for more details.
+ */
+public class TestPrismRunner extends PipelineRunner {
+
+ private final PrismRunner internal;
+ private final TestPrismPipelineOptions prismPipelineOptions;
+
+ /**
+ * Invoked from {@link Pipeline#run} where {@link TestPrismRunner} instantiates using {@link
+ * TestPrismPipelineOptions} configuration details.
+ */
+ public static TestPrismRunner fromOptions(PipelineOptions options) {
+ TestPrismPipelineOptions prismPipelineOptions = options.as(TestPrismPipelineOptions.class);
+ PrismRunner delegate = PrismRunner.fromOptions(options);
+ return new TestPrismRunner(delegate, prismPipelineOptions);
+ }
+
+ private TestPrismRunner(PrismRunner internal, TestPrismPipelineOptions options) {
+ this.internal = internal;
+ this.prismPipelineOptions = options;
+ }
+
+ TestPrismPipelineOptions getTestPrismPipelineOptions() {
+ return prismPipelineOptions;
+ }
+
+ @Override
+ public PipelineResult run(Pipeline pipeline) {
+ PrismPipelineResult result = (PrismPipelineResult) internal.run(pipeline);
+ try {
+ PipelineResult.State state = getWaitUntilFinishRunnable(result).get();
+ assertThat(
+ "Pipeline did not succeed. Check Prism logs for further details.",
+ state,
+ Matchers.is(PipelineResult.State.DONE));
+ } catch (RuntimeException e) {
+ // This is a temporary workaround to close the Prism process.
+ result.getCleanup().run();
+ throw new AssertionError(e);
+ }
+ return result;
+ }
+
+ private Supplier getWaitUntilFinishRunnable(PipelineResult result) {
+ if (prismPipelineOptions.getTestTimeoutSeconds() != null) {
+ Long testTimeoutSeconds = checkStateNotNull(prismPipelineOptions.getTestTimeoutSeconds());
+ return () -> result.waitUntilFinish(Duration.standardSeconds(testTimeoutSeconds));
+ }
+ return result::waitUntilFinish;
+ }
+}
diff --git a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismExecutorTest.java b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismExecutorTest.java
index 315e585a0c5f..eb497f0a4c43 100644
--- a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismExecutorTest.java
+++ b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismExecutorTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.prism;
import static com.google.common.truth.Truth.assertThat;
+import static org.apache.beam.runners.prism.PrismPipelineOptions.JOB_PORT_FLAG_NAME;
import static org.apache.beam.runners.prism.PrismRunnerTest.getLocalPrismBuildOrIgnoreTest;
import java.io.ByteArrayOutputStream;
@@ -77,7 +78,9 @@ public void executeWithFileOutputThenStop() throws IOException {
@Test
public void executeWithCustomArgumentsThenStop() throws IOException {
PrismExecutor executor =
- underTest().setArguments(Collections.singletonList("-job_port=5555")).build();
+ underTest()
+ .setArguments(Collections.singletonList("-" + JOB_PORT_FLAG_NAME + "=5555"))
+ .build();
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
executor.execute(outputStream);
sleep(3000L);
@@ -86,6 +89,9 @@ public void executeWithCustomArgumentsThenStop() throws IOException {
assertThat(output).contains("INFO Serving JobManagement endpoint=localhost:5555");
}
+ @Test
+ public void executeWithPortFinderThenStop() throws IOException {}
+
private PrismExecutor.Builder underTest() {
return PrismExecutor.builder().setCommand(getLocalPrismBuildOrIgnoreTest());
}
diff --git a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismPipelineResultTest.java b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismPipelineResultTest.java
deleted file mode 100644
index 2ad7e2eb3dd9..000000000000
--- a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismPipelineResultTest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.prism;
-
-import static com.google.common.truth.Truth.assertThat;
-import static org.apache.beam.runners.prism.PrismRunnerTest.getLocalPrismBuildOrIgnoreTest;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.metrics.MetricResults;
-import org.joda.time.Duration;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link PrismPipelineResult}. */
-@RunWith(JUnit4.class)
-public class PrismPipelineResultTest {
-
- final PrismExecutor exec = executor();
-
- @Before
- public void setUp() throws IOException {
- exec.execute();
- assertThat(exec.isAlive()).isTrue();
- }
-
- @After
- public void tearDown() {
- assertThat(exec.isAlive()).isFalse();
- }
-
- @Test
- public void givenTerminated_reportsState() {
- PipelineResult delegate = mock(PipelineResult.class);
- when(delegate.waitUntilFinish()).thenReturn(PipelineResult.State.FAILED);
- PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
- // Assigns terminal state.
- underTest.waitUntilFinish();
- assertThat(underTest.getState()).isEqualTo(PipelineResult.State.FAILED);
- }
-
- @Test
- public void givenNotTerminated_reportsState() {
- PipelineResult delegate = mock(PipelineResult.class);
- when(delegate.getState()).thenReturn(PipelineResult.State.RUNNING);
- PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
- assertThat(underTest.getState()).isEqualTo(PipelineResult.State.RUNNING);
- exec.stop();
- }
-
- @Test
- public void cancelStopsExecutable_reportsTerminalState() throws IOException {
- PipelineResult delegate = mock(PipelineResult.class);
- when(delegate.cancel()).thenReturn(PipelineResult.State.CANCELLED);
- PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
- assertThat(underTest.cancel()).isEqualTo(PipelineResult.State.CANCELLED);
- }
-
- @Test
- public void givenTerminated_cancelIsNoop_reportsTerminalState() throws IOException {
- PipelineResult delegate = mock(PipelineResult.class);
- when(delegate.cancel()).thenReturn(PipelineResult.State.FAILED);
- PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
- assertThat(underTest.cancel()).isEqualTo(PipelineResult.State.FAILED);
- }
-
- @Test
- public void givenPipelineRunWithDuration_waitUntilFinish_reportsTerminalState() {
- PipelineResult delegate = mock(PipelineResult.class);
- when(delegate.waitUntilFinish(Duration.millis(3000L)))
- .thenReturn(PipelineResult.State.CANCELLED);
- PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
- assertThat(underTest.waitUntilFinish(Duration.millis(3000L)))
- .isEqualTo(PipelineResult.State.CANCELLED);
- }
-
- @Test
- public void givenTerminated_waitUntilFinishIsNoop_reportsTerminalState() {
- PipelineResult delegate = mock(PipelineResult.class);
- when(delegate.waitUntilFinish()).thenReturn(PipelineResult.State.DONE);
- PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
- // Terminate Job as setup for additional call.
- underTest.waitUntilFinish();
- assertThat(underTest.waitUntilFinish()).isEqualTo(PipelineResult.State.DONE);
- }
-
- @Test
- public void givenNotTerminated_reportsMetrics() {
- PipelineResult delegate = mock(PipelineResult.class);
- when(delegate.metrics()).thenReturn(mock(MetricResults.class));
- PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
- assertThat(underTest.metrics()).isNotNull();
- exec.stop();
- }
-
- @Test
- public void givenTerminated_reportsTerminatedMetrics() {
- PipelineResult delegate = mock(PipelineResult.class);
- when(delegate.metrics()).thenReturn(mock(MetricResults.class));
- when(delegate.waitUntilFinish()).thenReturn(PipelineResult.State.DONE);
- PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
- // Terminate Job as setup for additional call.
- underTest.waitUntilFinish();
- assertThat(underTest.metrics()).isNotNull();
- }
-
- private static PrismExecutor executor() {
- return PrismExecutor.builder().setCommand(getLocalPrismBuildOrIgnoreTest()).build();
- }
-}
diff --git a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismRunnerTest.java b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismRunnerTest.java
index 2cacb671be3e..e4d239275988 100644
--- a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismRunnerTest.java
+++ b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismRunnerTest.java
@@ -18,54 +18,121 @@
package org.apache.beam.runners.prism;
import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assume.assumeTrue;
-import java.io.IOException;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.PeriodicImpulse;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.WithTimestamps;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.construction.Environments;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
-import org.junit.Ignore;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests for {@link PrismRunner}. */
-
-// TODO(https://github.com/apache/beam/issues/31793): Remove @Ignore after finalizing PrismRunner.
-// Depends on: https://github.com/apache/beam/issues/31402 and
-// https://github.com/apache/beam/issues/31792.
-@Ignore
@RunWith(JUnit4.class)
public class PrismRunnerTest {
+
+ @Rule public TestPipeline pipeline = TestPipeline.fromOptions(options());
+
// See build.gradle for test task configuration.
private static final String PRISM_BUILD_TARGET_PROPERTY_NAME = "prism.buildTarget";
@Test
- public void givenBoundedSource_runsUntilDone() {
- Pipeline pipeline = Pipeline.create(options());
- pipeline.apply(Create.of(1, 2, 3));
- PipelineResult.State state = pipeline.run().waitUntilFinish();
- assertThat(state).isEqualTo(PipelineResult.State.DONE);
+ public void givenJobEndpointSet_TestPrismRunner_validateThrows() {
+ TestPrismPipelineOptions options =
+ PipelineOptionsFactory.create().as(TestPrismPipelineOptions.class);
+ options.setRunner(TestPrismRunner.class);
+ options.setJobEndpoint("endpoint");
+ IllegalArgumentException error =
+ assertThrows(IllegalArgumentException.class, () -> TestPrismRunner.fromOptions(options));
+ assertThat(error.getMessage())
+ .isEqualTo("when specifying --jobEndpoint, use --runner=PortableRunner instead");
+ }
+
+ @Test
+ public void givenJobEndpointSet_PrismRunner_validateThrows() {
+ PrismPipelineOptions options = PipelineOptionsFactory.create().as(PrismPipelineOptions.class);
+ options.setRunner(PrismRunner.class);
+ options.setJobEndpoint("endpoint");
+ IllegalArgumentException error =
+ assertThrows(IllegalArgumentException.class, () -> TestPrismRunner.fromOptions(options));
+ assertThat(error.getMessage())
+ .isEqualTo("when specifying --jobEndpoint, use --runner=PortableRunner instead");
+ }
+
+ @Test
+ public void givenEnvironmentTypeEmpty_TestPrismRunner_defaultsToLoopback() {
+ TestPrismPipelineOptions options =
+ PipelineOptionsFactory.create().as(TestPrismPipelineOptions.class);
+ options.setRunner(TestPrismRunner.class);
+ assertThat(
+ TestPrismRunner.fromOptions(options)
+ .getTestPrismPipelineOptions()
+ .getDefaultEnvironmentType())
+ .isEqualTo(Environments.ENVIRONMENT_LOOPBACK);
}
@Test
- public void givenUnboundedSource_runsUntilCancel() throws IOException {
- Pipeline pipeline = Pipeline.create(options());
- pipeline.apply(PeriodicImpulse.create());
- PipelineResult result = pipeline.run();
- assertThat(result.getState()).isEqualTo(PipelineResult.State.RUNNING);
- PipelineResult.State state = result.cancel();
- assertThat(state).isEqualTo(PipelineResult.State.CANCELLED);
+ public void givenEnvironmentTypeEmpty_PrismRunner_defaultsToLoopback() {
+ PrismPipelineOptions options = PipelineOptionsFactory.create().as(PrismPipelineOptions.class);
+ options.setRunner(PrismRunner.class);
+ assertThat(
+ PrismRunner.fromOptions(options).getPrismPipelineOptions().getDefaultEnvironmentType())
+ .isEqualTo(Environments.ENVIRONMENT_LOOPBACK);
+ }
+
+ @Test
+ public void prismReportsPAssertFailure() {
+ PAssert.that(pipeline.apply(Create.of(1, 2, 3)))
+ // Purposely introduce a failed assertion.
+ .containsInAnyOrder(1, 2, 3, 4);
+ assertThrows(AssertionError.class, pipeline::run);
+ }
+
+ @Test
+ public void windowing() {
+ PCollection>> got =
+ pipeline
+ .apply(Create.of(1, 2, 100, 101, 102, 123))
+ .apply(WithTimestamps.of(t -> Instant.ofEpochSecond(t)))
+ .apply(WithKeys.of("k"))
+ .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
+ .apply(GroupByKey.create());
+
+ List>> want =
+ Arrays.asList(
+ KV.of("k", Arrays.asList(1, 2)),
+ KV.of("k", Arrays.asList(100, 101, 102)),
+ KV.of("k", Collections.singletonList(123)));
+
+ PAssert.that(got).containsInAnyOrder(want);
+
+ pipeline.run();
}
- private static PrismPipelineOptions options() {
- PrismPipelineOptions opts = PipelineOptionsFactory.create().as(PrismPipelineOptions.class);
+ private static TestPrismPipelineOptions options() {
+ TestPrismPipelineOptions opts =
+ PipelineOptionsFactory.create().as(TestPrismPipelineOptions.class);
- opts.setRunner(PrismRunner.class);
+ opts.setRunner(TestPrismRunner.class);
opts.setPrismLocation(getLocalPrismBuildOrIgnoreTest());
+ opts.setEnableWebUI(false);
return opts;
}