Skip to content

Commit

Permalink
Merge pull request apache#8 from apache/master
Browse files Browse the repository at this point in the history
rebase
  • Loading branch information
xumingmin authored Apr 7, 2017
2 parents 8b83472 + a8edbb8 commit 38f4b61
Show file tree
Hide file tree
Showing 131 changed files with 4,985 additions and 1,442 deletions.
12 changes: 9 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,17 @@
<google-cloud-bigdataoss.version>1.4.5</google-cloud-bigdataoss.version>
<google-cloud-dataflow-java-proto-library-all.version>0.5.160304</google-cloud-dataflow-java-proto-library-all.version>
<guava.version>20.0</guava.version>
<grpc.version>1.0.1</grpc.version>
<grpc.version>1.2.0</grpc.version>
<grpc-google-common-protos.version>0.1.0</grpc-google-common-protos.version>
<hamcrest.version>1.3</hamcrest.version>
<jackson.version>2.7.2</jackson.version>
<findbugs.version>3.0.1</findbugs.version>
<joda.version>2.4</joda.version>
<junit.version>4.12</junit.version>
<mockito.version>1.9.5</mockito.version>
<netty.version>4.1.6.Final</netty.version>
<netty.version>4.1.8.Final</netty.version>
<os-maven-plugin.version>1.5.0.Final</os-maven-plugin.version>
<protobuf.version>3.1.0</protobuf.version>
<protobuf.version>3.2.0</protobuf.version>
<pubsub.version>v1-rev10-1.22.0</pubsub.version>
<slf4j.version>1.7.14</slf4j.version>
<spark.version>1.6.2</spark.version>
Expand Down Expand Up @@ -446,6 +446,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-hadoop-input-format</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-core-construction-java</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.MapState;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.SetState;
Expand Down Expand Up @@ -139,12 +139,12 @@ public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
}

@Override
public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
bindCombiningValue(
StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
final CombineFn<InputT, AccumT, OutputT> combineFn) {
return new ApexAccumulatorCombiningState<>(
return new ApexCombiningState<>(
namespace,
address,
accumCoder,
Expand All @@ -161,22 +161,22 @@ public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
}

@Override
public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
bindKeyedCombiningValue(
StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
return new ApexAccumulatorCombiningState<>(
return new ApexCombiningState<>(
namespace,
address,
accumCoder,
key, combineFn);
}

@Override
public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT>
public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
bindKeyedCombiningValueWithContext(
StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> accumCoder,
KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
return bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
Expand Down Expand Up @@ -323,14 +323,14 @@ public OutputTimeFn<? super W> getOutputTimeFn() {

}

private final class ApexAccumulatorCombiningState<K, InputT, AccumT, OutputT>
private final class ApexCombiningState<K, InputT, AccumT, OutputT>
extends AbstractState<AccumT>
implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
implements CombiningState<InputT, AccumT, OutputT> {
private final K key;
private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;

private ApexAccumulatorCombiningState(StateNamespace namespace,
StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
private ApexCombiningState(StateNamespace namespace,
StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
Coder<AccumT> coder,
K key, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
super(namespace, address, coder);
Expand All @@ -339,7 +339,7 @@ private ApexAccumulatorCombiningState(StateNamespace namespace,
}

@Override
public ApexAccumulatorCombiningState<K, InputT, AccumT, OutputT> readLater() {
public ApexCombiningState<K, InputT, AccumT, OutputT> readLater() {
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.GroupingState;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.ValueState;
import org.apache.beam.sdk.util.state.WatermarkHoldState;
Expand All @@ -58,7 +58,7 @@ public class ApexStateInternalsTest {

private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
StateTags.value("stringValue", StringUtf8Coder.of());
private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>>
private static final StateTag<Object, CombiningState<Integer, int[], Integer>>
SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal(
"sumInteger", VarIntCoder.of(), Sum.ofIntegers());
private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
Expand Down Expand Up @@ -148,7 +148,7 @@ public void testMergeBagIntoNewNamespace() throws Exception {

@Test
public void testCombiningValue() throws Exception {
CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);

// State instances are cached, but depend on the namespace.
assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR));
Expand All @@ -168,7 +168,7 @@ public void testCombiningValue() throws Exception {

@Test
public void testCombiningIsEmpty() throws Exception {
CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);

assertThat(value.isEmpty().read(), Matchers.is(true));
ReadableState<Boolean> readFuture = value.isEmpty();
Expand All @@ -181,9 +181,9 @@ public void testCombiningIsEmpty() throws Exception {

@Test
public void testMergeCombiningValueIntoSource() throws Exception {
AccumulatorCombiningState<Integer, int[], Integer> value1 =
CombiningState<Integer, int[], Integer> value1 =
underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
AccumulatorCombiningState<Integer, int[], Integer> value2 =
CombiningState<Integer, int[], Integer> value2 =
underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);

value1.add(5);
Expand All @@ -202,11 +202,11 @@ public void testMergeCombiningValueIntoSource() throws Exception {

@Test
public void testMergeCombiningValueIntoNewNamespace() throws Exception {
AccumulatorCombiningState<Integer, int[], Integer> value1 =
CombiningState<Integer, int[], Integer> value1 =
underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR);
AccumulatorCombiningState<Integer, int[], Integer> value2 =
CombiningState<Integer, int[], Integer> value2 =
underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR);
AccumulatorCombiningState<Integer, int[], Integer> value3 =
CombiningState<Integer, int[], Integer> value3 =
underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR);

value1.add(5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Materialization;
import org.apache.beam.sdk.transforms.Materializations;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
Expand Down Expand Up @@ -349,6 +351,14 @@ public void createViewWithViewFnDifferentViewFn() {
PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder());
ViewFn<Iterable<WindowedValue<?>>, Iterable<Integer>> viewFn =
new ViewFn<Iterable<WindowedValue<?>>, Iterable<Integer>>() {
@Override
public Materialization<Iterable<WindowedValue<?>>> getMaterialization() {
@SuppressWarnings({"rawtypes", "unchecked"})
Materialization<Iterable<WindowedValue<?>>> materialization =
(Materialization) Materializations.iterable();
return materialization;
}

@Override
public Iterable<Integer> apply(Iterable<WindowedValue<?>> contents) {
return Collections.emptyList();
Expand Down
Loading

0 comments on commit 38f4b61

Please sign in to comment.