Skip to content

Commit

Permalink
Revert "Adds a utility for starting up the Transform Service from Java (
Browse files Browse the repository at this point in the history
apache#26834)"

This reverts commit e1b4ed3.
  • Loading branch information
chamikaramj committed Jun 2, 2023
1 parent e1b4ed3 commit 5a5c5d3
Show file tree
Hide file tree
Showing 11 changed files with 29 additions and 525 deletions.
1 change: 0 additions & 1 deletion sdks/java/extensions/python/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ dependencies {
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(path: ":runners:core-construction-java")
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":sdks:java:transform-service:launcher", configuration: "shadow")
testImplementation library.java.junit
testImplementation library.java.hamcrest
testImplementation project(":runners:core-construction-java").sourceSets.test.output
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
Expand All @@ -46,10 +45,8 @@
import org.apache.beam.sdk.schemas.utils.StaticSchemaInference;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transformservice.launcher.TransformServiceLauncher;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.PythonCallableSource;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
Expand Down Expand Up @@ -450,64 +447,30 @@ public OutputT expand(InputT input) {
15000);
return apply(input, expansionService, payload);
} else {
OutputT output = null;
int port = PythonService.findAvailablePort();
PipelineOptionsFactory.register(PythonExternalTransformOptions.class);
if (input
.getPipeline()
.getOptions()
.as(PythonExternalTransformOptions.class)
.getUseTransformService()) {
// A unique project name ensures that this expansion gets a dedicated instance of the
// transform service.
String projectName = UUID.randomUUID().toString();
TransformServiceLauncher service = TransformServiceLauncher.forProject(projectName, port);
service.setBeamVersion(ReleaseInfo.getReleaseInfo().getSdkVersion());
// TODO(https://github.com/apache/beam/issues/26833): add support for installing extra
// packages.
if (!extraPackages.isEmpty()) {
throw new RuntimeException(
"Transform Service does not support installing extra packages yet");
}
try {
// Starting the transform service.
service.start();
// Waiting the service to be ready.
service.waitTillUp(15000);
// Expanding the transform.
output = apply(input, String.format("localhost:%s", port), payload);
} finally {
// Shutting down the transform service.
service.shutdown();
}
return output;
} else {

ImmutableList.Builder<String> args = ImmutableList.builder();
args.add(
"--port=" + port, "--fully_qualified_name_glob=*", "--pickle_library=cloudpickle");
if (!extraPackages.isEmpty()) {
File requirementsFile = File.createTempFile("requirements", ".txt");
requirementsFile.deleteOnExit();
try (Writer fout =
new OutputStreamWriter(
new FileOutputStream(requirementsFile.getAbsolutePath()), Charsets.UTF_8)) {
for (String pkg : extraPackages) {
fout.write(pkg);
fout.write('\n');
}
ImmutableList.Builder<String> args = ImmutableList.builder();
args.add("--port=" + port, "--fully_qualified_name_glob=*", "--pickle_library=cloudpickle");
if (!extraPackages.isEmpty()) {
File requirementsFile = File.createTempFile("requirements", ".txt");
requirementsFile.deleteOnExit();
try (Writer fout =
new OutputStreamWriter(
new FileOutputStream(requirementsFile.getAbsolutePath()), Charsets.UTF_8)) {
for (String pkg : extraPackages) {
fout.write(pkg);
fout.write('\n');
}
args.add("--requirements_file=" + requirementsFile.getAbsolutePath());
}
PythonService service =
new PythonService(
"apache_beam.runners.portability.expansion_service_main", args.build())
.withExtraPackages(extraPackages);
try (AutoCloseable p = service.start()) {
// allow more time waiting for the port ready for transient expansion service setup.
PythonService.waitForPort("localhost", port, 60000);
return apply(input, String.format("localhost:%s", port), payload);
}
args.add("--requirements_file=" + requirementsFile.getAbsolutePath());
}
PythonService service =
new PythonService(
"apache_beam.runners.portability.expansion_service_main", args.build())
.withExtraPackages(extraPackages);
try (AutoCloseable p = service.start()) {
// allow more time waiting for the port ready for transient expansion service setup.
PythonService.waitForPort("localhost", port, 60000);
return apply(input, String.format("localhost:%s", port), payload);
}
}
} catch (RuntimeException exn) {
Expand Down

This file was deleted.

This file was deleted.

12 changes: 7 additions & 5 deletions sdks/java/transform-service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,20 @@
* limitations under the License.
*/

apply plugin: 'org.apache.beam.module'
plugins {
id 'org.apache.beam.module'
id 'com.github.johnrengelman.shadow'
}

apply plugin: 'org.apache.beam.module'
applyJavaNature(
automaticModuleName: 'org.apache.beam.sdk.transform.service',
exportJavadoc: false,
validateShadowJar: false,
shadowClosure: {},
)


description = "Apache Beam :: SDKs :: Java :: Transform Service"
ext.summary = """Contains core code of the transform service."""
ext.summary = """Contains code that can be used to run an transform service."""


// Exclude tests that need a runner
test {
Expand Down
2 changes: 0 additions & 2 deletions sdks/java/transform-service/docker-compose/.env
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,3 @@
BEAM_VERSION=$BEAM_VERSION
CREDENTIALS_VOLUME=$CREDENTIALS_VOLUME
GOOGLE_APPLICATION_CREDENTIALS_FILE_NAME=application_default_credentials.json
COMPOSE_PROJECT_NAME=apache.beam.transform.service
TRANSFORM_SERVICE_PORT=$TRANSFORM_SERVICE_PORT
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ services:
restart: on-failure
command: -port 5001
ports:
- "${TRANSFORM_SERVICE_PORT}:5001"
- "5001:5001"
expansion-service-1:
image: "apache/beam_java_expansion_service:${BEAM_VERSION}"
restart: on-failure
Expand Down
62 changes: 0 additions & 62 deletions sdks/java/transform-service/launcher/build.gradle

This file was deleted.

Loading

0 comments on commit 5a5c5d3

Please sign in to comment.