diff --git a/pom.xml b/pom.xml
index 4ecfdd7c268c0..09f39859b0416 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,7 +119,7 @@
1.4.5
0.5.160304
20.0
- 1.0.1
+ 1.2.0
0.1.0
1.3
2.7.2
@@ -127,9 +127,9 @@
2.4
4.12
1.9.5
- 4.1.6.Final
+ 4.1.8.Final
1.5.0.Final
- 3.1.0
+ 3.2.0
v1-rev10-1.22.0
1.7.14
1.6.2
@@ -446,6 +446,12 @@
${project.version}
+
+ org.apache.beam
+ beam-sdks-java-io-hadoop-input-format
+ ${project.version}
+
+
org.apache.beam
beam-runners-core-construction-java
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
index 763436642f70e..c59afc5961ca8 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
@@ -43,8 +43,8 @@
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.util.CombineFnUtil;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.MapState;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.SetState;
@@ -139,12 +139,12 @@ public MapState bindMap(
}
@Override
- public AccumulatorCombiningState
+ public CombiningState
bindCombiningValue(
- StateTag super K, AccumulatorCombiningState> address,
+ StateTag super K, CombiningState> address,
Coder accumCoder,
final CombineFn combineFn) {
- return new ApexAccumulatorCombiningState<>(
+ return new ApexCombiningState<>(
namespace,
address,
accumCoder,
@@ -161,12 +161,12 @@ public WatermarkHoldState bindWatermark(
}
@Override
- public AccumulatorCombiningState
+ public CombiningState
bindKeyedCombiningValue(
- StateTag super K, AccumulatorCombiningState> address,
+ StateTag super K, CombiningState> address,
Coder accumCoder,
KeyedCombineFn super K, InputT, AccumT, OutputT> combineFn) {
- return new ApexAccumulatorCombiningState<>(
+ return new ApexCombiningState<>(
namespace,
address,
accumCoder,
@@ -174,9 +174,9 @@ public WatermarkHoldState bindWatermark(
}
@Override
- public AccumulatorCombiningState
+ public CombiningState
bindKeyedCombiningValueWithContext(
- StateTag super K, AccumulatorCombiningState> address,
+ StateTag super K, CombiningState> address,
Coder accumCoder,
KeyedCombineFnWithContext super K, InputT, AccumT, OutputT> combineFn) {
return bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c));
@@ -323,14 +323,14 @@ public OutputTimeFn super W> getOutputTimeFn() {
}
- private final class ApexAccumulatorCombiningState
+ private final class ApexCombiningState
extends AbstractState
- implements AccumulatorCombiningState {
+ implements CombiningState {
private final K key;
private final KeyedCombineFn super K, InputT, AccumT, OutputT> combineFn;
- private ApexAccumulatorCombiningState(StateNamespace namespace,
- StateTag super K, AccumulatorCombiningState> address,
+ private ApexCombiningState(StateNamespace namespace,
+ StateTag super K, CombiningState> address,
Coder coder,
K key, KeyedCombineFn super K, InputT, AccumT, OutputT> combineFn) {
super(namespace, address, coder);
@@ -339,7 +339,7 @@ private ApexAccumulatorCombiningState(StateNamespace namespace,
}
@Override
- public ApexAccumulatorCombiningState readLater() {
+ public ApexCombiningState readLater() {
return this;
}
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
index 3e83a7fd89023..4f4ecfb3fc81c 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
@@ -35,9 +35,9 @@
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.GroupingState;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.ValueState;
import org.apache.beam.sdk.util.state.WatermarkHoldState;
@@ -58,7 +58,7 @@ public class ApexStateInternalsTest {
private static final StateTag