Skip to content

Commit

Permalink
Adds a utility for starting up the Transform Service from Java (apach…
Browse files Browse the repository at this point in the history
…e#26834)

* Adds a utility for starting up the Transform Service from Java and adds an option for enabling it

* Address reviewer comments

* Addressing reviewer comments
  • Loading branch information
chamikaramj authored and cushon committed May 24, 2024
1 parent ebac08b commit 16de5b4
Show file tree
Hide file tree
Showing 11 changed files with 525 additions and 29 deletions.
1 change: 1 addition & 0 deletions sdks/java/extensions/python/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencies {
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(path: ":runners:core-construction-java")
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":sdks:java:transform-service:launcher", configuration: "shadow")
testImplementation library.java.junit
testImplementation library.java.hamcrest
testImplementation project(":runners:core-construction-java").sourceSets.test.output
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
Expand All @@ -45,8 +46,10 @@
import org.apache.beam.sdk.schemas.utils.StaticSchemaInference;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transformservice.launcher.TransformServiceLauncher;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.PythonCallableSource;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
Expand Down Expand Up @@ -447,30 +450,64 @@ public OutputT expand(InputT input) {
15000);
return apply(input, expansionService, payload);
} else {
OutputT output = null;
int port = PythonService.findAvailablePort();
ImmutableList.Builder<String> args = ImmutableList.builder();
args.add("--port=" + port, "--fully_qualified_name_glob=*", "--pickle_library=cloudpickle");
if (!extraPackages.isEmpty()) {
File requirementsFile = File.createTempFile("requirements", ".txt");
requirementsFile.deleteOnExit();
try (Writer fout =
new OutputStreamWriter(
new FileOutputStream(requirementsFile.getAbsolutePath()), Charsets.UTF_8)) {
for (String pkg : extraPackages) {
fout.write(pkg);
fout.write('\n');
PipelineOptionsFactory.register(PythonExternalTransformOptions.class);
if (input
.getPipeline()
.getOptions()
.as(PythonExternalTransformOptions.class)
.getUseTransformService()) {
// A unique project name ensures that this expansion gets a dedicated instance of the
// transform service.
String projectName = UUID.randomUUID().toString();
TransformServiceLauncher service = TransformServiceLauncher.forProject(projectName, port);
service.setBeamVersion(ReleaseInfo.getReleaseInfo().getSdkVersion());
// TODO(https://github.com/apache/beam/issues/26833): add support for installing extra
// packages.
if (!extraPackages.isEmpty()) {
throw new RuntimeException(
"Transform Service does not support installing extra packages yet");
}
try {
// Starting the transform service.
service.start();
// Waiting the service to be ready.
service.waitTillUp(15000);
// Expanding the transform.
output = apply(input, String.format("localhost:%s", port), payload);
} finally {
// Shutting down the transform service.
service.shutdown();
}
return output;
} else {

ImmutableList.Builder<String> args = ImmutableList.builder();
args.add(
"--port=" + port, "--fully_qualified_name_glob=*", "--pickle_library=cloudpickle");
if (!extraPackages.isEmpty()) {
File requirementsFile = File.createTempFile("requirements", ".txt");
requirementsFile.deleteOnExit();
try (Writer fout =
new OutputStreamWriter(
new FileOutputStream(requirementsFile.getAbsolutePath()), Charsets.UTF_8)) {
for (String pkg : extraPackages) {
fout.write(pkg);
fout.write('\n');
}
}
args.add("--requirements_file=" + requirementsFile.getAbsolutePath());
}
PythonService service =
new PythonService(
"apache_beam.runners.portability.expansion_service_main", args.build())
.withExtraPackages(extraPackages);
try (AutoCloseable p = service.start()) {
// allow more time waiting for the port ready for transient expansion service setup.
PythonService.waitForPort("localhost", port, 60000);
return apply(input, String.format("localhost:%s", port), payload);
}
args.add("--requirements_file=" + requirementsFile.getAbsolutePath());
}
PythonService service =
new PythonService(
"apache_beam.runners.portability.expansion_service_main", args.build())
.withExtraPackages(extraPackages);
try (AutoCloseable p = service.start()) {
// allow more time waiting for the port ready for transient expansion service setup.
PythonService.waitForPort("localhost", port, 60000);
return apply(input, String.format("localhost:%s", port), payload);
}
}
} catch (RuntimeException exn) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.python;

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

/** Pipeline options for {@link PythonExternalTransform}. */
public interface PythonExternalTransformOptions extends PipelineOptions {

@Description("Use Docker Compose based Beam Transform Service to expand transforms.")
@Default.Boolean(false)
boolean getUseTransformService();

void setUseTransformService(boolean useTransformService);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.python;

import com.google.auto.service.AutoService;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;

/** A registrar for {@link PythonExternalTransformOptions}. */
@AutoService(PipelineOptionsRegistrar.class)
@Internal
public class PythonExternalTransformOptionsRegistrar implements PipelineOptionsRegistrar {

@Override
public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
return ImmutableList.<Class<? extends PipelineOptions>>builder()
.add(PythonExternalTransformOptions.class)
.build();
}
}
12 changes: 5 additions & 7 deletions sdks/java/transform-service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,18 @@
* limitations under the License.
*/

plugins {
id 'org.apache.beam.module'
id 'com.github.johnrengelman.shadow'
}

apply plugin: 'org.apache.beam.module'

applyJavaNature(
automaticModuleName: 'org.apache.beam.sdk.transform.service',
exportJavadoc: false,
validateShadowJar: false,
shadowClosure: {},
)


description = "Apache Beam :: SDKs :: Java :: Transform Service"
ext.summary = """Contains code that can be used to run an transform service."""

ext.summary = """Contains core code of the transform service."""

// Exclude tests that need a runner
test {
Expand Down
2 changes: 2 additions & 0 deletions sdks/java/transform-service/docker-compose/.env
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@
BEAM_VERSION=$BEAM_VERSION
CREDENTIALS_VOLUME=$CREDENTIALS_VOLUME
GOOGLE_APPLICATION_CREDENTIALS_FILE_NAME=application_default_credentials.json
COMPOSE_PROJECT_NAME=apache.beam.transform.service
TRANSFORM_SERVICE_PORT=$TRANSFORM_SERVICE_PORT
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ services:
restart: on-failure
command: -port 5001
ports:
- "5001:5001"
- "${TRANSFORM_SERVICE_PORT}:5001"
expansion-service-1:
image: "apache/beam_java_expansion_service:${BEAM_VERSION}"
restart: on-failure
Expand Down
62 changes: 62 additions & 0 deletions sdks/java/transform-service/launcher/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
apply plugin: 'org.apache.beam.module'
apply plugin: 'application'
mainClassName = "org.apache.beam.sdk.transformservice.launcher.TransformServiceLauncher"

applyJavaNature(
automaticModuleName: 'org.apache.beam.sdk.transform.service',
exportJavadoc: false,
validateShadowJar: false,
shadowClosure: {},
)


description = "Apache Beam :: SDKs :: Java :: Transform Service :: Launcher"
ext.summary = """Contains code that can be used to run an transform service."""


// Exclude tests that need a runner
test {
systemProperty "beamUseDummyRunner", "true"
useJUnit {
excludeCategories "org.apache.beam.sdk.testing.NeedsRunner"
}
}

dependencies {
shadow library.java.vendored_guava_26_0_jre
shadow library.java.slf4j_api
shadow library.java.args4j
runtimeOnly library.java.slf4j_jdk14
}

sourceSets {
main {
resources {
srcDirs "../docker-compose"
}
output.resourcesDir = "$buildDir/resources/docker-compose/$name"
}
}

jar {
manifest {
attributes 'Main-Class': application.mainClass
}
}
Loading

0 comments on commit 16de5b4

Please sign in to comment.