Skip to content

Commit

Permalink
Merge pull request apache#16 from apache/master
Browse files Browse the repository at this point in the history
sync up from master
  • Loading branch information
xumingmin authored Aug 30, 2017
2 parents 417ff43 + d64f2cc commit 4ac4b75
Show file tree
Hide file tree
Showing 556 changed files with 38,470 additions and 7,090 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ sdks/python/apache_beam/portability/api/*pb2*.*
.apt_generated/
.settings/

# Ignore Visual Studio Code files.
.vscode/

# The build process generates the dependency-reduced POM, but it shouldn't be
# committed.
dependency-reduced-pom.xml
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,5 @@ mavenJob('beam_PostCommit_Java_ValidatesRunner_Gearpump') {
'Run Gearpump ValidatesRunner')

// Maven goals for this job.
goals('-B -e clean verify -am -pl runners/gearpump -DforkCount=0 -DvalidatesRunnerPipelineOptions=\'[ "--runner=TestGearpumpRunner", "--streaming=false" ]\'')
goals('-B -e clean verify -am -pl runners/gearpump -DforkCount=0 -DvalidatesRunnerPipelineOptions=\'[ "--runner=TestGearpumpRunner"]\'')
}
34 changes: 34 additions & 0 deletions .test-infra/kubernetes/postgres/pkb-config-local.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# This file is a pkb benchmark configuration file, used when running the IO ITs
# that use this data store. It allows users to run tests when they are on a
# separate network from the kubernetes cluster by reading the postgres IP
# address from the LoadBalancer service.
#
# This file defines pipeline options to pass to beam, as well as how to derive
# the values for those pipeline options from kubernetes (where appropriate.)

static_pipeline_options:
- postgresUsername: postgres
- postgresPassword: uuinkks
- postgresDatabaseName: postgres
- postgresSsl: false
dynamic_pipeline_options:
- name: postgresServerName
type: LoadBalancerIp
serviceName: postgres-for-dev
32 changes: 32 additions & 0 deletions .test-infra/kubernetes/postgres/pkb-config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# This file is a pkb benchmark configuration file, used when running the IO ITs
# that use this data store.
#
# This file defines pipeline options to pass to beam, as well as how to derive
# the values for those pipeline options from kubernetes (where appropriate.)

static_pipeline_options:
- postgresUsername: postgres
- postgresPassword: uuinkks
- postgresDatabaseName: postgres
- postgresSsl: false
dynamic_pipeline_options:
- name: postgresServerName
type: NodePortIp
podLabel: name=postgres
14 changes: 0 additions & 14 deletions examples/java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -365,20 +365,7 @@
</profiles>

<build>

<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<beamUseDummyRunner />
<beamTestPipelineOptions>
</beamTestPipelineOptions>
</systemPropertyVariables>
</configuration>
</plugin>

<!-- Coverage analysis for unit tests. -->
<plugin>
<groupId>org.jacoco</groupId>
Expand Down Expand Up @@ -518,7 +505,6 @@
</dependency>

<!-- Test dependencies -->

<!--
For testing the example itself, use the direct runner. This is separate from
the use of ValidatesRunner tests for testing a particular runner.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.HashSet;
import java.util.Set;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringDelegateCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
Expand Down Expand Up @@ -154,11 +153,6 @@ public ReadDocuments(Iterable<URI> uris) {
this.uris = uris;
}

@Override
public Coder<?> getDefaultOutputCoder() {
return KvCoder.of(StringDelegateCoder.of(URI.class), StringUtf8Coder.of());
}

@Override
public PCollection<KV<URI, String>> expand(PBegin input) {
Pipeline pipeline = input.getPipeline();
Expand All @@ -179,9 +173,11 @@ public PCollection<KV<URI, String>> expand(PBegin input) {
uriString = uri.toString();
}

PCollection<KV<URI, String>> oneUriToLines = pipeline
.apply("TextIO.Read(" + uriString + ")", TextIO.read().from(uriString))
.apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri));
PCollection<KV<URI, String>> oneUriToLines =
pipeline
.apply("TextIO.Read(" + uriString + ")", TextIO.read().from(uriString))
.apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri))
.setCoder(KvCoder.of(StringDelegateCoder.of(URI.class), StringUtf8Coder.of()));

urisToLines = urisToLines.and(oneUriToLines);
}
Expand Down
23 changes: 12 additions & 11 deletions examples/java8/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,18 @@
</dependency>
</dependencies>
</profile>

<!-- Include the Apache Gearpump (incubating) runner with -P gearpump-runner -->
<profile>
<id>gearpump-runner</id>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-gearpump</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
</profiles>

<build>
Expand Down Expand Up @@ -178,17 +190,6 @@
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<beamTestPipelineOptions>
</beamTestPipelineOptions>
</systemPropertyVariables>
</configuration>
</plugin>

<!-- Coverage analysis for unit tests. -->
<plugin>
<groupId>org.jacoco</groupId>
Expand Down
51 changes: 45 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@
<apex.kryo.version>2.24.0</apex.kryo.version>
<api-common.version>1.0.0-rc2</api-common.version>
<avro.version>1.8.2</avro.version>
<bigquery.version>v2-rev295-1.22.0</bigquery.version>
<bigquery.version>v2-rev355-1.22.0</bigquery.version>
<bigtable.version>0.9.7.1</bigtable.version>
<cloudresourcemanager.version>v1-rev6-1.22.0</cloudresourcemanager.version>
<pubsubgrpc.version>0.1.0</pubsubgrpc.version>
<clouddebugger.version>v2-rev8-1.22.0</clouddebugger.version>
<dataflow.version>v1b3-rev198-1.20.0</dataflow.version>
<dataflow.version>v1b3-rev198-1.22.0</dataflow.version>
<dataflow.proto.version>0.5.160222</dataflow.proto.version>
<datastore.client.version>1.4.0</datastore.client.version>
<datastore.proto.version>1.3.0</datastore.proto.version>
Expand Down Expand Up @@ -159,17 +159,25 @@
<groovy-maven-plugin.version>2.0</groovy-maven-plugin.version>
<surefire-plugin.version>2.20</surefire-plugin.version>
<failsafe-plugin.version>2.20</failsafe-plugin.version>
<maven-compiler-plugin.version>3.6.1</maven-compiler-plugin.version>
<maven-compiler-plugin.version>3.6.2</maven-compiler-plugin.version>
<maven-dependency-plugin.version>3.0.1</maven-dependency-plugin.version>
<maven-enforcer-plugin.version>3.0.0-M1</maven-enforcer-plugin.version>
<maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
<maven-jar-plugin.version>3.0.2</maven-jar-plugin.version>
<maven-javadoc-plugin.version>3.0.0-M1</maven-javadoc-plugin.version>
<maven-resources-plugin.version>3.0.2</maven-resources-plugin.version>
<maven-shade-plugin.version>3.0.0</maven-shade-plugin.version>
<reproducible-build-maven-plugin.version>0.3</reproducible-build-maven-plugin.version>

<compiler.error.flag>-Werror</compiler.error.flag>
<compiler.default.pkginfo.flag>-Xpkginfo:always</compiler.default.pkginfo.flag>
<compiler.default.exclude>nothing</compiler.default.exclude>
<gax-grpc.version>0.20.0</gax-grpc.version>

<!-- standard binary for kubectl -->
<kubectl>kubectl</kubectl>
<!-- the standard location for kubernete's config file -->
<kubeconfig>${user.home}/.kube/config</kubeconfig>
</properties>

<packaging>pom</packaging>
Expand Down Expand Up @@ -266,6 +274,11 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
</plugin>

<plugin>
<groupId>io.github.zlika</groupId>
<artifactId>reproducible-build-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</profile>
Expand Down Expand Up @@ -519,7 +532,13 @@
<version>${project.version}</version>
</dependency>

<dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-solr</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-hadoop-input-format</artifactId>
<version>${project.version}</version>
Expand Down Expand Up @@ -574,6 +593,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-gearpump</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-examples-java</artifactId>
Expand Down Expand Up @@ -1398,13 +1423,14 @@
here, we leave things simple here. -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.10.4</version>
<version>${maven-javadoc-plugin.version}</version>
<configuration>
<additionalparam>${beam.javadoc_opts}</additionalparam>
<windowtitle>Apache Beam SDK for Java, version ${project.version} API</windowtitle>
<doctitle>Apache Beam SDK for Java, version ${project.version}</doctitle>
<use>false</use>
<quiet>true</quiet>
<notimestamp>true</notimestamp>
</configuration>
</plugin>

Expand Down Expand Up @@ -1767,6 +1793,19 @@
</execution>
</executions>
</plugin>

<plugin>
<groupId>io.github.zlika</groupId>
<artifactId>reproducible-build-maven-plugin</artifactId>
<version>${reproducible-build-maven-plugin.version}</version>
<executions>
<execution>
<goals>
<goal>strip-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</pluginManagement>

Expand All @@ -1789,7 +1828,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-enforcer-plugin</artifactId>
<version>1.4.1</version>
<version>${maven-enforcer-plugin.version}</version>
<executions>
<execution>
<id>enforce</id>
Expand Down
8 changes: 0 additions & 8 deletions runners/apex/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,6 @@
<artifactId>malhar-library</artifactId>
<version>${apex.malhar.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.apache.apex</groupId>
<artifactId>apex-engine</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,8 @@ public static <ElemT, ViewT> CreateApexPCollectionView<ElemT, ViewT> of(

@Override
public PCollection<ElemT> expand(PCollection<ElemT> input) {
return PCollection.<ElemT>createPrimitiveOutputInternal(
input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
.setCoder(input.getCoder());
return PCollection.createPrimitiveOutputInternal(
input.getPipeline(), input.getWindowingStrategy(), input.isBounded(), input.getCoder());
}

public PCollectionView<ViewT> getView() {
Expand Down Expand Up @@ -380,8 +379,9 @@ static class SplittableParDoOverrideFactory<InputT, OutputT> implements PTransfo
public PTransformReplacement<PCollection<InputT>, PCollectionTuple> getReplacementTransform(
AppliedPTransform<PCollection<InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>>
transform) {
return PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(transform),
SplittableParDo.forJavaParDo(transform.getTransform()));
return PTransformReplacement.of(
PTransformReplacements.getSingletonMainInput(transform),
SplittableParDo.forAppliedParDo(transform));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,12 @@ static <T> void flattenCollections(List<PCollection<T>> collections, Map<PCollec
}

if (collections.size() > 2) {
PCollection<T> intermediateCollection = intermediateCollection(collection,
collection.getCoder());
PCollection<T> intermediateCollection =
PCollection.createPrimitiveOutputInternal(
collection.getPipeline(),
collection.getWindowingStrategy(),
collection.isBounded(),
collection.getCoder());
context.addOperator(operator, operator.out, intermediateCollection);
remainingCollections.add(intermediateCollection);
} else {
Expand All @@ -135,11 +139,4 @@ static <T> void flattenCollections(List<PCollection<T>> collections, Map<PCollec
}
}

static <T> PCollection<T> intermediateCollection(PCollection<T> input, Coder<T> outputCoder) {
PCollection<T> output = PCollection.createPrimitiveOutputInternal(input.getPipeline(),
input.getWindowingStrategy(), input.isBounded());
output.setCoder(outputCoder);
return output;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,11 @@ private static PCollection<?> unionSideInputs(
}

PCollection<Object> resultCollection =
FlattenPCollectionTranslator.intermediateCollection(
firstSideInput, firstSideInput.getCoder());
PCollection.createPrimitiveOutputInternal(
firstSideInput.getPipeline(),
firstSideInput.getWindowingStrategy(),
firstSideInput.isBounded(),
firstSideInput.getCoder());
FlattenPCollectionTranslator.flattenCollections(
sourceCollections, unionTags, resultCollection, context);
return resultCollection;
Expand Down
Loading

0 comments on commit 4ac4b75

Please sign in to comment.