Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds a utility for starting up the Transform Service from Java #26834

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdks/java/extensions/python/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencies {
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(path: ":runners:core-construction-java")
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":sdks:java:transform-service:launcher", configuration: "shadow")
Abacn marked this conversation as resolved.
Show resolved Hide resolved
testImplementation library.java.junit
testImplementation library.java.hamcrest
testImplementation project(":runners:core-construction-java").sourceSets.test.output
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
Expand All @@ -45,8 +46,10 @@
import org.apache.beam.sdk.schemas.utils.StaticSchemaInference;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transformservice.launcher.TransformServiceLauncher;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.PythonCallableSource;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
Expand Down Expand Up @@ -447,30 +450,63 @@ public OutputT expand(InputT input) {
15000);
return apply(input, expansionService, payload);
} else {
OutputT output = null;
int port = PythonService.findAvailablePort();
ImmutableList.Builder<String> args = ImmutableList.builder();
args.add("--port=" + port, "--fully_qualified_name_glob=*", "--pickle_library=cloudpickle");
if (!extraPackages.isEmpty()) {
File requirementsFile = File.createTempFile("requirements", ".txt");
requirementsFile.deleteOnExit();
try (Writer fout =
new OutputStreamWriter(
new FileOutputStream(requirementsFile.getAbsolutePath()), Charsets.UTF_8)) {
for (String pkg : extraPackages) {
fout.write(pkg);
fout.write('\n');
PipelineOptionsFactory.register(PythonExternalTransformOptions.class);
if (input
.getPipeline()
.getOptions()
.as(PythonExternalTransformOptions.class)
.getUseTransformService()) {
// A unique project name ensures that this expansion gets a dedicated instance of the
// transform service.
String projectName = UUID.randomUUID().toString();
TransformServiceLauncher service = TransformServiceLauncher.forProject(projectName, port);
service.setBeamVersion(ReleaseInfo.getReleaseInfo().getSdkVersion());
// TODO: add support for installing extra packages.
Abacn marked this conversation as resolved.
Show resolved Hide resolved
if (!extraPackages.isEmpty()) {
throw new RuntimeException(
"Transform Service does not support installing extra packages yet");
}
try {
// Starting the transform service.
service.start();
// Waiting the service to be ready.
service.waitTillUp(15000);
// Expanding the transform.
output = apply(input, String.format("localhost:%s", port), payload);
} finally {
// Shutting down the transform service.
service.shutdown();
}
return output;
} else {

ImmutableList.Builder<String> args = ImmutableList.builder();
args.add(
"--port=" + port, "--fully_qualified_name_glob=*", "--pickle_library=cloudpickle");
if (!extraPackages.isEmpty()) {
File requirementsFile = File.createTempFile("requirements", ".txt");
requirementsFile.deleteOnExit();
try (Writer fout =
new OutputStreamWriter(
new FileOutputStream(requirementsFile.getAbsolutePath()), Charsets.UTF_8)) {
for (String pkg : extraPackages) {
fout.write(pkg);
fout.write('\n');
}
}
args.add("--requirements_file=" + requirementsFile.getAbsolutePath());
}
PythonService service =
new PythonService(
"apache_beam.runners.portability.expansion_service_main", args.build())
.withExtraPackages(extraPackages);
try (AutoCloseable p = service.start()) {
// allow more time waiting for the port ready for transient expansion service setup.
PythonService.waitForPort("localhost", port, 60000);
return apply(input, String.format("localhost:%s", port), payload);
}
args.add("--requirements_file=" + requirementsFile.getAbsolutePath());
}
PythonService service =
new PythonService(
"apache_beam.runners.portability.expansion_service_main", args.build())
.withExtraPackages(extraPackages);
try (AutoCloseable p = service.start()) {
// allow more time waiting for the port ready for transient expansion service setup.
PythonService.waitForPort("localhost", port, 60000);
return apply(input, String.format("localhost:%s", port), payload);
}
}
} catch (RuntimeException exn) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.python;

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

/** Pipeline options for {@link PythonExternalTransform}. */
public interface PythonExternalTransformOptions extends PipelineOptions {

@Description("Use Docker Compose based Beam Transform Service to expand transforms.")
@Default.Boolean(false)
boolean getUseTransformService();

void setUseTransformService(boolean useTransformService);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.python;

import com.google.auto.service.AutoService;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;

/** A registrar for {@link PythonExternalTransformOptions}. */
@AutoService(PipelineOptionsRegistrar.class)
@Internal
public class PythonExternalTransformOptionsRegistrar implements PipelineOptionsRegistrar {

@Override
public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
return ImmutableList.<Class<? extends PipelineOptions>>builder()
.add(PythonExternalTransformOptions.class)
.build();
}
}
12 changes: 5 additions & 7 deletions sdks/java/transform-service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,18 @@
* limitations under the License.
*/

plugins {
id 'org.apache.beam.module'
id 'com.github.johnrengelman.shadow'
}

apply plugin: 'org.apache.beam.module'

applyJavaNature(
automaticModuleName: 'org.apache.beam.sdk.transform.service',
exportJavadoc: false,
validateShadowJar: false,
shadowClosure: {},
)


description = "Apache Beam :: SDKs :: Java :: Transform Service"
ext.summary = """Contains code that can be used to run an transform service."""

ext.summary = """Contains core code of the transform service."""

// Exclude tests that need a runner
test {
Expand Down
2 changes: 2 additions & 0 deletions sdks/java/transform-service/docker-compose/.env
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@
BEAM_VERSION=$BEAM_VERSION
CREDENTIALS_VOLUME=$CREDENTIALS_VOLUME
GOOGLE_APPLICATION_CREDENTIALS_FILE_NAME=application_default_credentials.json
COMPOSE_PROJECT_NAME=apache.beam.transform.service
TRANSFORM_SERVICE_PORT=$TRANSFORM_SERVICE_PORT
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ services:
restart: on-failure
command: -port 5001
ports:
- "5001:5001"
- "${TRANSFORM_SERVICE_PORT}:5001"
expansion-service-1:
image: "apache/beam_java_expansion_service:${BEAM_VERSION}"
restart: on-failure
Expand Down
61 changes: 61 additions & 0 deletions sdks/java/transform-service/launcher/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
apply plugin: 'org.apache.beam.module'
apply plugin: 'application'
mainClassName = "org.apache.beam.sdk.transformservice.launcher.TransformServiceLauncher"

applyJavaNature(
automaticModuleName: 'org.apache.beam.sdk.transform.service',
exportJavadoc: false,
validateShadowJar: false,
shadowClosure: {},
)


description = "Apache Beam :: SDKs :: Java :: Transform Service :: Launcher"
ext.summary = """Contains code that can be used to run an transform service."""


// Exclude tests that need a runner
test {
systemProperty "beamUseDummyRunner", "true"
useJUnit {
excludeCategories "org.apache.beam.sdk.testing.NeedsRunner"
}
}

dependencies {
shadow library.java.vendored_guava_26_0_jre
shadow library.java.slf4j_api
runtimeOnly library.java.slf4j_jdk14
}

sourceSets {
main {
resources {
srcDirs "../docker-compose"
}
output.resourcesDir = "$buildDir/resources/docker-compose/$name"
}
}

jar {
manifest {
attributes 'Main-Class': application.mainClass
}
}
Loading