From 083e5b132c11286f97a7e7c907bb2ab283169cb3 Mon Sep 17 00:00:00 2001 From: Ben Sidhom Date: Tue, 27 Mar 2018 15:36:51 -0700 Subject: [PATCH] Flatten artifact names by escaping full paths --- .../construction/ArtifactServiceStager.java | 30 ++++++++++++++++++- .../runners/reference/PortableRunner.java | 3 +- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ArtifactServiceStager.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ArtifactServiceStager.java index 47e979d5654f9..58866f13e6482 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ArtifactServiceStager.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ArtifactServiceStager.java @@ -18,7 +18,11 @@ package org.apache.beam.runners.core.construction; +import static com.google.common.base.Preconditions.checkState; + import com.google.auto.value.AutoValue; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import com.google.common.io.BaseEncoding; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -43,6 +47,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import javax.annotation.Nullable; import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactChunk; import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactMetadata; @@ -61,6 +67,13 @@ public class ArtifactServiceStager { // 2 MB per file-request private static final int DEFAULT_BUFFER_SIZE = 2 * 1024 * 1024; + private static final Pattern PATH_ESCAPE_PATTERN = Pattern.compile("[_\\\\/.]"); + private static final Map PATH_ESCAPE_MAP = ImmutableMap.of( + "_", Matcher.quoteReplacement("__"), + "\\", Matcher.quoteReplacement("_."), + "/", Matcher.quoteReplacement("._"), + ".", Matcher.quoteReplacement("..")); + public static ArtifactServiceStager overChannel(Channel channel) { return overChannel(channel, DEFAULT_BUFFER_SIZE); } @@ -135,7 +148,10 @@ public ArtifactMetadata get() throws Exception { // TODO: Add Retries PutArtifactResponseObserver responseObserver = new PutArtifactResponseObserver(); StreamObserver requestObserver = stub.putArtifact(responseObserver); - ArtifactMetadata metadata = ArtifactMetadata.newBuilder().setName(file.getName()).build(); + // HACK: Encode paths into a flat namespace. The SDK harness will attempt to load artifacts + // into a flat directory by name. + String artifactName = escapePath(file.getPath()); + ArtifactMetadata metadata = ArtifactMetadata.newBuilder().setName(artifactName).build(); requestObserver.onNext(PutArtifactRequest.newBuilder().setMetadata(metadata).build()); MessageDigest md5Digest = MessageDigest.getInstance("MD5"); @@ -242,4 +258,16 @@ boolean isSuccess() { abstract Map getFailures(); } + + private static String escapePath(String path) { + Matcher m = PATH_ESCAPE_PATTERN.matcher(path); + StringBuffer result = new StringBuffer(); + while (m.find()) { + String replacement = PATH_ESCAPE_MAP.get(m.group()); + checkState(replacement != null); + m.appendReplacement(result, replacement); + } + m.appendTail(result); + return result.toString(); + } } diff --git a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/PortableRunner.java b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/PortableRunner.java index 0f5b9ffa724d8..3d56a170aacb5 100644 --- a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/PortableRunner.java +++ b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/PortableRunner.java @@ -136,7 +136,6 @@ public PipelineResult run(Pipeline pipeline) { throw new RuntimeException(e); } - PrepareJobRequest prepareJobRequest = PrepareJobRequest.newBuilder() .setJobName(options.getJobName()) .setPipeline(PipelineTranslation.toProto(pipeline)) @@ -200,7 +199,7 @@ private ManagedChannelFactory getChannelFactory(PipelineOptions options) { return channelFactory; } - protected void replaceTransforms(Pipeline pipeline, boolean streaming) { + private void replaceTransforms(Pipeline pipeline, boolean streaming) { pipeline.replaceAll(getOverrides(streaming)); }