diff --git a/runners/flink/build.gradle b/runners/flink/build.gradle index 7707ffca548b7..a0eb66bc9fa14 100644 --- a/runners/flink/build.gradle +++ b/runners/flink/build.gradle @@ -53,6 +53,7 @@ dependencies { shadow project(path: ":sdks:java:core", configuration: "shadow") shadow project(path: ":runners:core-java", configuration: "shadow") shadow project(path: ":runners:core-construction-java", configuration: "shadow") + shadow project(path: ":runners:java-fn-execution", configuration: "shadow") shadow library.java.jackson_annotations shadow library.java.findbugs_jsr305 shadow library.java.slf4j_api diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml index ae135a3313812..f052e32bdac7c 100644 --- a/runners/flink/pom.xml +++ b/runners/flink/pom.xml @@ -251,6 +251,17 @@ + + org.apache.beam + beam-runners-java-fn-execution + + + org.slf4j + slf4j-jdk14 + + + + org.apache.beam beam-model-pipeline diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/execution/EnvironmentSession.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/execution/EnvironmentSession.java new file mode 100644 index 0000000000000..31ffa812be7f2 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/execution/EnvironmentSession.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.flink.execution; + +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.fnexecution.artifact.ArtifactSource; +import org.apache.beam.runners.fnexecution.control.SdkHarnessClient; + +/** + * A handle to an {@link Environment} managed by a {@link SdkHarnessManager}. + * Closing the session indicates to the session's {@link SdkHarnessManager} + * that it can no longer use session resources such as its + * {@link ArtifactSource}. + */ +public interface EnvironmentSession extends AutoCloseable { + + /** + * Get the environment that the session uses. + */ + Environment getEnvironment(); + + /** + * Get the ArtifactSource that the session uses. + */ + ArtifactSource getArtifactSource(); + + /** + * Get an {@link SdkHarnessClient} that can communicate with an instance of the environment. + */ + SdkHarnessClient getClient(); +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/execution/SdkHarnessManager.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/execution/SdkHarnessManager.java new file mode 100644 index 0000000000000..f73108ac54733 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/execution/SdkHarnessManager.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.flink.execution; + +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.fnexecution.artifact.ArtifactSource; + +/** + * A long-lived class to manage SDK harness container instances on behalf of + * shorter-lived Flink operators. + */ +public interface SdkHarnessManager { + /** + * Get a new {@link EnvironmentSession} to an {@link Environment} container + * instance, creating a new instance if necessary. + * + * @param environment The environment specification for the desired session. + * @param artifactSource An artifact source that can be used during creation. + */ + EnvironmentSession getSession(String jobId, Environment environment, ArtifactSource artifactSource); +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/execution/package-info.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/execution/package-info.java new file mode 100644 index 0000000000000..f740a3cb546dd --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/execution/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Internal metrics implementation of the Beam runner for Apache Flink. + */ +package org.apache.beam.runners.flink.execution; diff --git a/runners/java-fn-execution/build.gradle b/runners/java-fn-execution/build.gradle index b1aa9e8c0ca6d..e5e1b67c199dd 100644 --- a/runners/java-fn-execution/build.gradle +++ b/runners/java-fn-execution/build.gradle @@ -33,6 +33,7 @@ dependencies { compile library.java.guava shadow project(path: ":model:pipeline", configuration: "shadow") shadow project(path: ":model:fn-execution", configuration: "shadow") + shadow project(path: ":model:job-management", configuration: "shadow") shadow project(path: ":sdks:java:core", configuration: "shadow") shadow project(path: ":sdks:java:fn-execution", configuration: "shadow") shadow library.java.grpc_core diff --git a/runners/java-fn-execution/pom.xml b/runners/java-fn-execution/pom.xml index 7958410d516e0..11001efe33529 100644 --- a/runners/java-fn-execution/pom.xml +++ b/runners/java-fn-execution/pom.xml @@ -43,6 +43,11 @@ beam-model-fn-execution + + org.apache.beam + beam-model-job-management + + org.apache.beam beam-sdks-java-fn-execution diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactSource.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactSource.java new file mode 100644 index 0000000000000..0562fd49c2696 --- /dev/null +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactSource.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.fnexecution.artifact; + +import org.apache.beam.model.jobmanagement.v1.ArtifactApi; + +import java.util.stream.Stream; + +/** + * Makes artifacts available to an ArtifactRetrievalService by + * encapsulating runner-specific resources. + */ +public interface ArtifactSource { + + /** + * Get the artifact manifest available from this source. + */ + ArtifactApi.Manifest getManifest(); + + /** + * Get an artifact by its name. + */ + Stream getArtifact(String name); +}