Merge pull request apache#10 from apache/DSL_SQL
rebase for dsl_sql_init
xumingmin authored Apr 10, 2017
2 parents 38f4b61 + 870e42d commit a2014ba
Showing 52 changed files with 1,346 additions and 1,181 deletions.
56 changes: 56 additions & 0 deletions dsls/pom.xml
@@ -0,0 +1,56 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-parent</artifactId>
<version>0.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>beam-dsls-parent</artifactId>
<name>Apache Beam :: DSLs</name>

<modules>
<!-- <module>sql</module> -->
</modules>

<build>
<pluginManagement>
<plugins>
<!-- DSLs will generally offer test suites for runners, as sdks/java does. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<id>default-test-jar</id>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</pluginManagement>
</build>

</project>
1 change: 1 addition & 0 deletions pom.xml
@@ -152,6 +152,7 @@
<module>sdks/java/build-tools</module>
<module>sdks</module>
<module>runners</module>
<module>dsls</module>
<module>examples</module>
<!-- sdks/java/javadoc builds project-wide Javadoc. It has to run last. -->
<module>sdks/java/javadoc</module>
ApexRunner.java
@@ -67,8 +67,6 @@
* A {@link PipelineRunner} that translates the
* pipeline to an Apex DAG and executes it on an Apex cluster.
*
* <p>Currently execution is always in embedded mode,
* launch on Hadoop cluster will be added in subsequent iteration.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
ApexYarnLauncher.java
@@ -173,56 +173,57 @@ public void shutdown(ShutdownMode arg0) throws LauncherException {
* @throws IOException when dependency information cannot be read
*/
public static List<File> getYarnDeployDependencies() throws IOException {
InputStream dependencyTree = ApexRunner.class.getResourceAsStream("dependency-tree");
BufferedReader br = new BufferedReader(new InputStreamReader(dependencyTree));
String line = null;
List<String> excludes = new ArrayList<>();
int excludeLevel = Integer.MAX_VALUE;
while ((line = br.readLine()) != null) {
for (int i = 0; i < line.length(); i++) {
char c = line.charAt(i);
if (Character.isLetter(c)) {
if (i > excludeLevel) {
excludes.add(line.substring(i));
} else {
if (line.substring(i).startsWith("org.apache.hadoop")) {
excludeLevel = i;
excludes.add(line.substring(i));
} else {
excludeLevel = Integer.MAX_VALUE;
try (InputStream dependencyTree = ApexRunner.class.getResourceAsStream("dependency-tree")) {
try (BufferedReader br = new BufferedReader(new InputStreamReader(dependencyTree))) {
String line;
List<String> excludes = new ArrayList<>();
int excludeLevel = Integer.MAX_VALUE;
while ((line = br.readLine()) != null) {
for (int i = 0; i < line.length(); i++) {
char c = line.charAt(i);
if (Character.isLetter(c)) {
if (i > excludeLevel) {
excludes.add(line.substring(i));
} else {
if (line.substring(i).startsWith("org.apache.hadoop")) {
excludeLevel = i;
excludes.add(line.substring(i));
} else {
excludeLevel = Integer.MAX_VALUE;
}
}
break;
}
}
break;
}
}
}
br.close();

Set<String> excludeJarFileNames = Sets.newHashSet();
for (String exclude : excludes) {
String[] mvnc = exclude.split(":");
String fileName = mvnc[1] + "-";
if (mvnc.length == 6) {
fileName += mvnc[4] + "-" + mvnc[3]; // with classifier
} else {
fileName += mvnc[3];
}
fileName += ".jar";
excludeJarFileNames.add(fileName);
}

ClassLoader classLoader = ApexYarnLauncher.class.getClassLoader();
URL[] urls = ((URLClassLoader) classLoader).getURLs();
List<File> dependencyJars = new ArrayList<>();
for (int i = 0; i < urls.length; i++) {
File f = new File(urls[i].getFile());
// dependencies can also be directories in the build reactor,
// the Apex client will automatically create jar files for those.
if (f.exists() && !excludeJarFileNames.contains(f.getName())) {
dependencyJars.add(f);
Set<String> excludeJarFileNames = Sets.newHashSet();
for (String exclude : excludes) {
String[] mvnc = exclude.split(":");
String fileName = mvnc[1] + "-";
if (mvnc.length == 6) {
fileName += mvnc[4] + "-" + mvnc[3]; // with classifier
} else {
fileName += mvnc[3];
}
fileName += ".jar";
excludeJarFileNames.add(fileName);
}

ClassLoader classLoader = ApexYarnLauncher.class.getClassLoader();
URL[] urls = ((URLClassLoader) classLoader).getURLs();
List<File> dependencyJars = new ArrayList<>();
for (int i = 0; i < urls.length; i++) {
File f = new File(urls[i].getFile());
// dependencies can also be directories in the build reactor,
// the Apex client will automatically create jar files for those.
if (f.exists() && !excludeJarFileNames.contains(f.getName())) {
dependencyJars.add(f);
}
}
return dependencyJars;
}
}
return dependencyJars;
}
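For reference, the exclusion logic in getYarnDeployDependencies() assumes mvn dependency:tree coordinates of the form groupId:artifactId:type:version:scope (five fields), or groupId:artifactId:type:classifier:version:scope (six fields) when a classifier is present. A minimal standalone sketch of the coordinate-to-filename mapping; the sample coordinates are illustrative and not taken from the commit:

public class CoordinateToJarName {

  // Mirrors the fileName construction in getYarnDeployDependencies().
  static String jarName(String coordinate) {
    String[] mvnc = coordinate.split(":");
    String fileName = mvnc[1] + "-";
    if (mvnc.length == 6) {
      fileName += mvnc[4] + "-" + mvnc[3]; // version-classifier
    } else {
      fileName += mvnc[3]; // version only
    }
    return fileName + ".jar";
  }

  public static void main(String[] args) {
    // groupId:artifactId:type:version:scope
    System.out.println(jarName("org.apache.hadoop:hadoop-common:jar:2.7.3:provided"));
    // prints hadoop-common-2.7.3.jar
    // groupId:artifactId:type:classifier:version:scope
    System.out.println(jarName("org.apache.hadoop:hadoop-yarn-common:jar:tests:2.7.3:test"));
    // prints hadoop-yarn-common-2.7.3-tests.jar
  }
}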

/**
@@ -238,17 +239,17 @@ public static void createJar(File dir, File jarFile) throws IOException {
throw new RuntimeException("Failed to remove " + jarFile);
}
URI uri = URI.create("jar:" + jarFile.toURI());
try (final FileSystem zipfs = FileSystems.newFileSystem(uri, env);) {
try (final FileSystem zipfs = FileSystems.newFileSystem(uri, env)) {

File manifestFile = new File(dir, JarFile.MANIFEST_NAME);
Files.createDirectory(zipfs.getPath("META-INF"));
final OutputStream out = Files.newOutputStream(zipfs.getPath(JarFile.MANIFEST_NAME));
if (!manifestFile.exists()) {
new Manifest().write(out);
} else {
FileUtils.copyFile(manifestFile, out);
try (final OutputStream out = Files.newOutputStream(zipfs.getPath(JarFile.MANIFEST_NAME))) {
if (!manifestFile.exists()) {
new Manifest().write(out);
} else {
FileUtils.copyFile(manifestFile, out);
}
}
out.close();

final java.nio.file.Path root = dir.toPath();
Files.walkFileTree(root, new java.nio.file.SimpleFileVisitor<Path>() {
Expand All @@ -274,9 +275,9 @@ public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
String name = relativePath + file.getFileName();
if (!JarFile.MANIFEST_NAME.equals(name)) {
final OutputStream out = Files.newOutputStream(zipfs.getPath(name));
FileUtils.copyFile(file.toFile(), out);
out.close();
try (final OutputStream out = Files.newOutputStream(zipfs.getPath(name))) {
FileUtils.copyFile(file.toFile(), out);
}
}
return super.visitFile(file, attrs);
}
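The recurring change in this file replaces manual close() calls with try-with-resources, so the streams opened for the manifest and for each jar entry are closed even when an exception interrupts the copy. A minimal sketch of the pattern outside the Apex code, using hypothetical file names:

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class TryWithResourcesSketch {

  static void copy(File src, File dst) throws IOException {
    // Both streams are closed automatically, in reverse declaration order,
    // even if the copy loop throws; the pre-change code relied on an explicit
    // out.close() that was skipped on the exception path.
    try (InputStream in = new FileInputStream(src);
         OutputStream out = new FileOutputStream(dst)) {
      byte[] buf = new byte[8192];
      int n;
      while ((n = in.read(buf)) != -1) {
        out.write(buf, 0, n);
      }
    }
  }

  public static void main(String[] args) throws IOException {
    copy(new File("manifest.in"), new File("manifest.out")); // hypothetical paths
  }
}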
GroupByKeyTranslator.java
@@ -31,9 +31,9 @@ class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKey<K, V>

@Override
public void translate(GroupByKey<K, V> transform, TranslationContext context) {
PCollection<KV<K, V>> input = (PCollection<KV<K, V>>) context.getInput();
PCollection<KV<K, V>> input = context.getInput();
ApexGroupByKeyOperator<K, V> group = new ApexGroupByKeyOperator<>(context.getPipelineOptions(),
input, context.<K>stateInternalsFactory()
input, context.getStateBackend()
);
context.addOperator(group, group.output);
context.addStream(input, group.input);
ParDoTranslator.java
@@ -82,23 +82,22 @@ public void translate(ParDo.MultiOutput<InputT, OutputT> transform, TranslationC
}

List<TaggedPValue> outputs = context.getOutputs();
PCollection<InputT> input = (PCollection<InputT>) context.getInput();
PCollection<InputT> input = context.getInput();
List<PCollectionView<?>> sideInputs = transform.getSideInputs();
Coder<InputT> inputCoder = input.getCoder();
WindowedValueCoder<InputT> wvInputCoder =
FullWindowedValueCoder.of(
inputCoder, input.getWindowingStrategy().getWindowFn().windowCoder());

ApexParDoOperator<InputT, OutputT> operator =
new ApexParDoOperator<>(
ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(
context.getPipelineOptions(),
doFn,
transform.getMainOutputTag(),
transform.getSideOutputTags().getAll(),
((PCollection<InputT>) context.getInput()).getWindowingStrategy(),
input.getWindowingStrategy(),
sideInputs,
wvInputCoder,
context.<Void>stateInternalsFactory());
context.getStateBackend());

Map<PCollection<?>, OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size());
for (TaggedPValue output : outputs) {
@@ -126,15 +125,15 @@ public void translate(ParDo.MultiOutput<InputT, OutputT> transform, TranslationC
context.addOperator(operator, ports);
context.addStream(context.getInput(), operator.input);
if (!sideInputs.isEmpty()) {
addSideInputs(operator, sideInputs, context);
addSideInputs(operator.sideInput1, sideInputs, context);
}
}

static void addSideInputs(
ApexParDoOperator<?, ?> operator,
Operator.InputPort<?> sideInputPort,
List<PCollectionView<?>> sideInputs,
TranslationContext context) {
Operator.InputPort<?>[] sideInputPorts = {operator.sideInput1};
Operator.InputPort<?>[] sideInputPorts = {sideInputPort};
if (sideInputs.size() > sideInputPorts.length) {
PCollection<?> unionCollection = unionSideInputs(sideInputs, context);
context.addStream(unionCollection, sideInputPorts[0]);
TranslationContext.java
@@ -31,9 +31,9 @@
import java.util.Map;
import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.runners.apex.translation.utils.ApexStateInternals;
import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
import org.apache.beam.runners.apex.translation.utils.CoderAdapterStreamCodec;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -89,16 +89,16 @@ public List<TaggedPValue> getInputs() {
return getCurrentTransform().getInputs();
}

public PValue getInput() {
return Iterables.getOnlyElement(getCurrentTransform().getInputs()).getValue();
public <InputT extends PValue> InputT getInput() {
return (InputT) Iterables.getOnlyElement(getCurrentTransform().getInputs()).getValue();
}

public List<TaggedPValue> getOutputs() {
return getCurrentTransform().getOutputs();
}

public PValue getOutput() {
return Iterables.getOnlyElement(getCurrentTransform().getOutputs()).getValue();
public <OutputT extends PValue> OutputT getOutput() {
return (OutputT) Iterables.getOnlyElement(getCurrentTransform().getOutputs()).getValue();
}

private AppliedPTransform<?, ?, ?> getCurrentTransform() {
@@ -192,10 +192,10 @@ public void populateDAG(DAG dag) {
}

/**
* Return the {@link StateInternalsFactory} for the pipeline translation.
* Return the state backend for the pipeline translation.
* @return
*/
public <K> StateInternalsFactory<K> stateInternalsFactory() {
return new ApexStateInternals.ApexStateInternalsFactory();
public ApexStateBackend getStateBackend() {
return new ApexStateInternals.ApexStateBackend();
}
}
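The getInput()/getOutput() change above moves the unchecked cast from every call site into the context: the return type is inferred from the assignment target, which is why the GroupByKey and ParDo translator diffs earlier in this commit can drop their explicit (PCollection<...>) casts. A minimal sketch of the idiom with hypothetical stand-in types, not taken from the commit:

import java.util.ArrayList;
import java.util.List;

public class InferredCastSketch {

  interface PValue {}

  static class StringCollection implements PValue {
    final List<String> elements = new ArrayList<>();
  }

  static class Context {
    private final PValue input;

    Context(PValue input) {
      this.input = input;
    }

    // The single unchecked cast lives here; the caller's assignment target
    // determines InputT, so no cast appears at the call site.
    @SuppressWarnings("unchecked")
    <InputT extends PValue> InputT getInput() {
      return (InputT) input;
    }
  }

  public static void main(String[] args) {
    Context context = new Context(new StringCollection());
    StringCollection input = context.getInput(); // InputT inferred as StringCollection
    System.out.println(input.elements.size());
  }
}

The trade-off of this pattern is that a mismatched assignment target only fails at run time with a ClassCastException rather than at compile time.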
WindowAssignTranslator.java
@@ -18,61 +18,35 @@

package org.apache.beam.runners.apex.translation;

import java.util.Collections;
import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
import org.apache.beam.runners.core.AssignWindowsDoFn;
import org.apache.beam.runners.core.DoFnAdapters;
import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.runners.apex.translation.operators.ApexProcessFnOperator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

/**
* {@link Window} is translated to {link ApexParDoOperator} that wraps an {@link
* AssignWindowsDoFn}.
* {@link Window} is translated to {@link ApexProcessFnOperator#assignWindows}.
*/
class WindowAssignTranslator<T> implements TransformTranslator<Window.Assign<T>> {
private static final long serialVersionUID = 1L;

@Override
public void translate(Window.Assign<T> transform, TranslationContext context) {
PCollection<T> output = (PCollection<T>) context.getOutput();
PCollection<T> input = (PCollection<T>) context.getInput();
@SuppressWarnings("unchecked")
WindowingStrategy<T, BoundedWindow> windowingStrategy =
(WindowingStrategy<T, BoundedWindow>) output.getWindowingStrategy();
PCollection<T> output = context.getOutput();
PCollection<T> input = context.getInput();

OldDoFn<T, T> fn =
(transform.getWindowFn() == null)
? DoFnAdapters.toOldDoFn(new IdentityFn<T>())
: new AssignWindowsDoFn<>(transform.getWindowFn());
if (transform.getWindowFn() == null) {
// no work to do
context.addAlias(output, input);
} else {
@SuppressWarnings("unchecked")
WindowFn<T, BoundedWindow> windowFn = (WindowFn<T, BoundedWindow>) transform.getWindowFn();
ApexProcessFnOperator<T> operator = ApexProcessFnOperator.assignWindows(windowFn,
context.getPipelineOptions());
context.addOperator(operator, operator.outputPort);
context.addStream(context.getInput(), operator.inputPort);
}

ApexParDoOperator<T, T> operator =
new ApexParDoOperator<T, T>(
context.getPipelineOptions().as(ApexPipelineOptions.class),
fn,
new TupleTag<T>(),
TupleTagList.empty().getAll(),
windowingStrategy,
Collections.<PCollectionView<?>>emptyList(),
WindowedValue.getFullCoder(
input.getCoder(), windowingStrategy.getWindowFn().windowCoder()),
context.<Void>stateInternalsFactory());
context.addOperator(operator, operator.output);
context.addStream(context.getInput(), operator.input);
}

private static class IdentityFn<T> extends DoFn<T, T> {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element());
}
}
}
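For context on the non-aliasing branch: assigning windows means mapping each element's timestamp through the WindowFn, which is what ApexProcessFnOperator#assignWindows now handles instead of wrapping an AssignWindowsDoFn in a ParDo operator. A hedged, standalone illustration using the Beam SDK's FixedWindows directly; the API surface is assumed to match the 0.7.0-SNAPSHOT codebase of this commit:

import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class AssignWindowSketch {

  public static void main(String[] args) {
    // Partitioning WindowFns such as FixedWindows assign exactly one window per timestamp.
    FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(10));
    Instant timestamp = new Instant(0).plus(Duration.standardMinutes(25));
    IntervalWindow window = windowFn.assignWindow(timestamp);
    // A 10-minute fixed window containing minute 25 spans [00:20, 00:30).
    System.out.println(window.start() + " .. " + window.end());
  }
}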