Skip to content

Commit

Permalink
Flatten artifact names by escaping full paths
Browse files Browse the repository at this point in the history
  • Loading branch information
bsidhom committed Mar 27, 2018
1 parent 34f6cc5 commit 083e5b1
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@

package org.apache.beam.runners.core.construction;

import static com.google.common.base.Preconditions.checkState;

import com.google.auto.value.AutoValue;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.BaseEncoding;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
Expand All @@ -43,6 +47,8 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactChunk;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactMetadata;
Expand All @@ -61,6 +67,13 @@ public class ArtifactServiceStager {
// 2 MB per file-request
private static final int DEFAULT_BUFFER_SIZE = 2 * 1024 * 1024;

// Matches a single character that must be escaped when a path is flattened into an artifact
// name: underscore, backslash, forward slash or dot.
private static final Pattern PATH_ESCAPE_PATTERN = Pattern.compile("[_\\\\/.]");
// Two-character replacement for each character matched by PATH_ESCAPE_PATTERN. Values are run
// through Matcher.quoteReplacement so that Matcher.appendReplacement inserts them literally.
private static final Map<String, String> PATH_ESCAPE_MAP = ImmutableMap.of(
"_", Matcher.quoteReplacement("__"),
"\\", Matcher.quoteReplacement("_."),
"/", Matcher.quoteReplacement("._"),
".", Matcher.quoteReplacement(".."));

/**
 * Convenience overload that delegates to {@code overChannel(channel, DEFAULT_BUFFER_SIZE)}.
 *
 * @param channel the channel handed through unchanged to the two-argument overload
 * @return the {@link ArtifactServiceStager} returned by the two-argument overload
 */
public static ArtifactServiceStager overChannel(Channel channel) {
return overChannel(channel, DEFAULT_BUFFER_SIZE);
}
Expand Down Expand Up @@ -135,7 +148,10 @@ public ArtifactMetadata get() throws Exception {
// TODO: Add Retries
PutArtifactResponseObserver responseObserver = new PutArtifactResponseObserver();
StreamObserver<PutArtifactRequest> requestObserver = stub.putArtifact(responseObserver);
ArtifactMetadata metadata = ArtifactMetadata.newBuilder().setName(file.getName()).build();
// HACK: Encode paths into a flat namespace. The SDK harness will attempt to load artifacts
// into a flat directory by name.
String artifactName = escapePath(file.getPath());
ArtifactMetadata metadata = ArtifactMetadata.newBuilder().setName(artifactName).build();
requestObserver.onNext(PutArtifactRequest.newBuilder().setMetadata(metadata).build());

MessageDigest md5Digest = MessageDigest.getInstance("MD5");
Expand Down Expand Up @@ -242,4 +258,16 @@ boolean isSuccess() {

abstract Map<File, Throwable> getFailures();
}

/**
 * Rewrites {@code path} so that every character matched by {@code PATH_ESCAPE_PATTERN} is
 * replaced with its entry in {@code PATH_ESCAPE_MAP}; all other characters are copied as-is.
 *
 * @param path the path to escape
 * @return the escaped form of {@code path}
 * @throws IllegalStateException if a matched character has no entry in the escape map
 */
private static String escapePath(String path) {
  StringBuffer escaped = new StringBuffer();
  Matcher matcher = PATH_ESCAPE_PATTERN.matcher(path);
  while (matcher.find()) {
    String special = matcher.group();
    String substitute = PATH_ESCAPE_MAP.get(special);
    // The pattern and the map are expected to cover the same characters.
    checkState(substitute != null);
    // Copies the text since the previous match, then the substitute for this match.
    matcher.appendReplacement(escaped, substitute);
  }
  // Copy whatever follows the last match and hand back the accumulated text.
  return matcher.appendTail(escaped).toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ public PipelineResult run(Pipeline pipeline) {
throw new RuntimeException(e);
}


PrepareJobRequest prepareJobRequest = PrepareJobRequest.newBuilder()
.setJobName(options.getJobName())
.setPipeline(PipelineTranslation.toProto(pipeline))
Expand Down Expand Up @@ -200,7 +199,7 @@ private ManagedChannelFactory getChannelFactory(PipelineOptions options) {
return channelFactory;
}

protected void replaceTransforms(Pipeline pipeline, boolean streaming) {
private void replaceTransforms(Pipeline pipeline, boolean streaming) {
pipeline.replaceAll(getOverrides(streaming));
}

Expand Down

0 comments on commit 083e5b1

Please sign in to comment.