Skip to content

Commit

Permalink
Updates the Java Expansion Service container to support gRPC ALTS aut…
Browse files Browse the repository at this point in the history
…hentication (#31352)

* Updates the Java Expansion Service container to support Google ALTS authentication

* Fix spotless

* Updates the description
  • Loading branch information
chamikaramj authored May 22, 2024
1 parent 649e33c commit 33fccac
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 1 deletion.
4 changes: 4 additions & 0 deletions sdks/java/expansion-service/container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ var (
port = flag.Int("port", 0, "Port for the expansion service (required)")
dependencies_dir = flag.String("dependencies_dir", "", "A directory containing the set of jar files to load transforms from (required)")
config_file = flag.String("config_file", "", "Expansion service config YAML file. (required)")
use_alts = flag.Bool("use_alts", false, "Starts an Expansion Service with support for gRPC ALTS authentication")
)

const entrypoint = "org.apache.beam.sdk.expansion.service.ExpansionService"
Expand Down Expand Up @@ -90,6 +91,9 @@ func main() {
if *config_file != "" {
args = append(args, fmt.Sprintf("--expansionServiceConfigFile=%s", *config_file))
}
if *use_alts {
args = append(args, "--useAltsServer=true")
}

log.Printf("Executing: java %v", strings.Join(args, " "))
log.Fatalf("Java exited: %v", execx.Execute("java", args...))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ServerBuilder;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.alts.AltsServerBuilder;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat;
Expand Down Expand Up @@ -823,8 +824,15 @@ public static void main(String[] args) throws Exception {
System.out.println("\nDid not find any registered transforms or SchemaTransforms.\n");
}

boolean useAlts = options.as(ExpansionServiceOptions.class).getUseAltsServer();
ServerBuilder serverBuilder =
ServerBuilder.forPort(port).addService(service).addService(new ArtifactRetrievalService());
useAlts ? AltsServerBuilder.forPort(port) : ServerBuilder.forPort(port);

if (useAlts) {
LOG.info("Running with gRPC ALTS authentication.");
}

serverBuilder.addService(service).addService(new ArtifactRetrievalService());
if (options.as(ExpansionServiceOptions.class).getAlsoStartLoopbackWorker()) {
serverBuilder.addService(new ExternalWorkerService(options));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ public interface ExpansionServiceOptions extends PipelineOptions {

void setExpansionServiceConfig(ExpansionServiceConfig configFile);

@Description("Starts an Expansion Service with support for gRPC ALTS authentication.")
@Default.Boolean(false)
boolean getUseAltsServer();

void setUseAltsServer(boolean useAltsServer);

/**
* Loads the allow list from {@link #getJavaClassLookupAllowlistFile}, defaulting to an empty
* {@link JavaClassLookupTransformProvider.AllowList}.
Expand Down
4 changes: 4 additions & 0 deletions sdks/java/io/expansion-service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ applyJavaNature(
shadowClosure: {},
)

shadowJar {
mergeServiceFiles()
}

description = "Apache Beam :: SDKs :: Java :: IO :: Expansion Service"
ext.summary = "Expansion service serving several Java IOs"

Expand Down

0 comments on commit 33fccac

Please sign in to comment.