From fba34163b84053130c3743f21f5857bfa6a600f9 Mon Sep 17 00:00:00 2001 From: Jan Lukavsky Date: Mon, 30 Sep 2024 10:29:36 +0200 Subject: [PATCH] coverage --- .../beam/util/state/MethodCallUtils.java | 587 +++++++++--------- .../beam/util/state/MethodCallUtilsTest.java | 55 ++ 2 files changed, 353 insertions(+), 289 deletions(-) create mode 100644 beam/core/src/test/java/cz/o2/proxima/beam/util/state/MethodCallUtilsTest.java diff --git a/beam/core/src/main/java/cz/o2/proxima/beam/util/state/MethodCallUtils.java b/beam/core/src/main/java/cz/o2/proxima/beam/util/state/MethodCallUtils.java index 373398c20..efcf595e1 100644 --- a/beam/core/src/main/java/cz/o2/proxima/beam/util/state/MethodCallUtils.java +++ b/beam/core/src/main/java/cz/o2/proxima/beam/util/state/MethodCallUtils.java @@ -18,6 +18,7 @@ import cz.o2.proxima.core.functional.BiConsumer; import cz.o2.proxima.core.util.ExceptionUtils; import cz.o2.proxima.core.util.Pair; +import cz.o2.proxima.internal.com.google.common.annotations.VisibleForTesting; import cz.o2.proxima.internal.com.google.common.base.Preconditions; import cz.o2.proxima.internal.com.google.common.collect.Iterables; import java.lang.annotation.Annotation; @@ -237,136 +238,7 @@ static Map> getStateUpdaters(DoFn d @SuppressWarnings("unchecked") private static @Nullable BiConsumer createUpdater(StateSpec stateSpec) { AtomicReference> consumer = new AtomicReference<>(); - stateSpec.bind( - "dummy", - new StateBinder() { - @Override - public @Nullable ValueState bindValue( - String id, StateSpec> spec, Coder coder) { - consumer.set( - (accessor, value) -> - ((ValueState) accessor) - .write( - ExceptionUtils.uncheckedFactory( - () -> CoderUtils.decodeFromByteArray(coder, value.getValue())))); - return null; - } - - @Override - public @Nullable BagState bindBag( - String id, StateSpec> spec, Coder elemCoder) { - consumer.set( - (accessor, value) -> { - ((BagState) accessor) - .add( - ExceptionUtils.uncheckedFactory( - () -> CoderUtils.decodeFromByteArray(elemCoder, value.getValue()))); - }); - return null; - } - - @Override - public @Nullable SetState bindSet( - String id, StateSpec> spec, Coder elemCoder) { - consumer.set( - (accessor, value) -> { - ((SetState) accessor) - .add( - ExceptionUtils.uncheckedFactory( - () -> CoderUtils.decodeFromByteArray(elemCoder, value.getValue()))); - }); - return null; - } - - @Override - public @Nullable MapState bindMap( - String id, - StateSpec> spec, - Coder mapKeyCoder, - Coder mapValueCoder) { - KvCoder coder = KvCoder.of(mapKeyCoder, mapValueCoder); - consumer.set( - (accessor, value) -> { - KV decoded = - ExceptionUtils.uncheckedFactory( - () -> CoderUtils.decodeFromByteArray(coder, value.getValue())); - ((MapState) accessor).put(decoded.getKey(), decoded.getValue()); - }); - return null; - } - - @Override - public @Nullable OrderedListState bindOrderedList( - String id, StateSpec> spec, Coder elemCoder) { - KvCoder coder = KvCoder.of(elemCoder, InstantCoder.of()); - consumer.set( - (accessor, value) -> { - KV decoded = - ExceptionUtils.uncheckedFactory( - () -> CoderUtils.decodeFromByteArray(coder, value.getValue())); - ((OrderedListState) accessor) - .add(TimestampedValue.of(decoded.getKey(), decoded.getValue())); - }); - return null; - } - - @Override - public @Nullable MultimapState bindMultimap( - String id, - StateSpec> spec, - Coder keyCoder, - Coder valueCoder) { - KvCoder coder = KvCoder.of(keyCoder, valueCoder); - consumer.set( - (accessor, value) -> { - KV decoded = - ExceptionUtils.uncheckedFactory( - () -> CoderUtils.decodeFromByteArray(coder, value.getValue())); - ((MapState) accessor).put(decoded.getKey(), decoded.getValue()); - }); - return null; - } - - @Override - public @Nullable - CombiningState bindCombining( - String id, - StateSpec> spec, - Coder accumCoder, - CombineFn combineFn) { - consumer.set( - (accessor, value) -> { - ((CombiningState) accessor) - .addAccum( - ExceptionUtils.uncheckedFactory( - () -> CoderUtils.decodeFromByteArray(accumCoder, value.getValue()))); - }); - return null; - } - - @Override - public @Nullable - CombiningState bindCombiningWithContext( - String id, - StateSpec> spec, - Coder accumCoder, - CombineFnWithContext combineFn) { - consumer.set( - (accessor, value) -> { - ((CombiningState) accessor) - .addAccum( - ExceptionUtils.uncheckedFactory( - () -> CoderUtils.decodeFromByteArray(accumCoder, value.getValue()))); - }); - return null; - } - - @Override - public @Nullable WatermarkHoldState bindWatermark( - String id, StateSpec spec, TimestampCombiner timestampCombiner) { - return null; - } - }); + stateSpec.bind("dummy", createUpdaterBinder(consumer)); return consumer.get(); } @@ -400,168 +272,305 @@ static LinkedHashMap>> g private static @Nullable BiFunction> createReader( StateSpec stateSpec) { AtomicReference>> res = new AtomicReference<>(); - stateSpec.bind( - "dummy", - new StateBinder() { - @Override - public @Nullable ValueState bindValue( - String id, StateSpec> spec, Coder coder) { - res.set( - (accessor, key) -> { - T value = ((ValueState) accessor).read(); - if (value != null) { - byte[] bytes = + stateSpec.bind("dummy", createStateReaderBinder(res)); + return res.get(); + } + + @VisibleForTesting + static StateBinder createUpdaterBinder(AtomicReference> consumer) { + return new StateBinder() { + @Override + public @Nullable ValueState bindValue( + String id, StateSpec> spec, Coder coder) { + consumer.set( + (accessor, value) -> + ((ValueState) accessor) + .write( ExceptionUtils.uncheckedFactory( - () -> CoderUtils.encodeToByteArray(coder, value)); - return Collections.singletonList(new StateValue(key, id, bytes)); - } - return Collections.emptyList(); - }); - return null; - } - - @Override - public @Nullable BagState bindBag( - String id, StateSpec> spec, Coder elemCoder) { - res.set( - (accessor, key) -> - Iterables.transform( - ((BagState) accessor).read(), - v -> - new StateValue( - key, - id, - ExceptionUtils.uncheckedFactory( - () -> CoderUtils.encodeToByteArray(elemCoder, v))))); - return null; - } - - @Override - public @Nullable SetState bindSet( - String id, StateSpec> spec, Coder elemCoder) { - res.set( - (accessor, key) -> - Iterables.transform( - ((SetState) accessor).read(), - v -> - new StateValue( - key, - id, - ExceptionUtils.uncheckedFactory( - () -> CoderUtils.encodeToByteArray(elemCoder, v))))); - return null; - } - - @Override - public @Nullable MapState bindMap( + () -> CoderUtils.decodeFromByteArray(coder, value.getValue())))); + return null; + } + + @Override + public @Nullable BagState bindBag( + String id, StateSpec> spec, Coder elemCoder) { + consumer.set( + (accessor, value) -> { + ((BagState) accessor) + .add( + ExceptionUtils.uncheckedFactory( + () -> CoderUtils.decodeFromByteArray(elemCoder, value.getValue()))); + }); + return null; + } + + @Override + public @Nullable SetState bindSet( + String id, StateSpec> spec, Coder elemCoder) { + consumer.set( + (accessor, value) -> { + ((SetState) accessor) + .add( + ExceptionUtils.uncheckedFactory( + () -> CoderUtils.decodeFromByteArray(elemCoder, value.getValue()))); + }); + return null; + } + + @Override + public @Nullable MapState bindMap( + String id, + StateSpec> spec, + Coder mapKeyCoder, + Coder mapValueCoder) { + KvCoder coder = KvCoder.of(mapKeyCoder, mapValueCoder); + consumer.set( + (accessor, value) -> { + KV decoded = + ExceptionUtils.uncheckedFactory( + () -> CoderUtils.decodeFromByteArray(coder, value.getValue())); + ((MapState) accessor).put(decoded.getKey(), decoded.getValue()); + }); + return null; + } + + @Override + public @Nullable OrderedListState bindOrderedList( + String id, StateSpec> spec, Coder elemCoder) { + KvCoder coder = KvCoder.of(elemCoder, InstantCoder.of()); + consumer.set( + (accessor, value) -> { + KV decoded = + ExceptionUtils.uncheckedFactory( + () -> CoderUtils.decodeFromByteArray(coder, value.getValue())); + ((OrderedListState) accessor) + .add(TimestampedValue.of(decoded.getKey(), decoded.getValue())); + }); + return null; + } + + @Override + public @Nullable MultimapState bindMultimap( + String id, + StateSpec> spec, + Coder keyCoder, + Coder valueCoder) { + KvCoder coder = KvCoder.of(keyCoder, valueCoder); + consumer.set( + (accessor, value) -> { + KV decoded = + ExceptionUtils.uncheckedFactory( + () -> CoderUtils.decodeFromByteArray(coder, value.getValue())); + ((MapState) accessor).put(decoded.getKey(), decoded.getValue()); + }); + return null; + } + + @Override + public @Nullable + CombiningState bindCombining( String id, - StateSpec> spec, - Coder mapKeyCoder, - Coder mapValueCoder) { - KvCoder coder = KvCoder.of(mapKeyCoder, mapValueCoder); - res.set( - (accessor, key) -> - Iterables.transform( - ((MapState) accessor).entries().read(), - v -> - new StateValue( - key, - id, - ExceptionUtils.uncheckedFactory( - () -> - CoderUtils.encodeToByteArray( - coder, KV.of(v.getKey(), v.getValue())))))); - return null; - } - - @Override - public @Nullable OrderedListState bindOrderedList( - String id, StateSpec> spec, Coder elemCoder) { - KvCoder coder = KvCoder.of(elemCoder, InstantCoder.of()); - res.set( - (accessor, key) -> - Iterables.transform( - ((OrderedListState) accessor).read(), - v -> - new StateValue( - key, - id, - ExceptionUtils.uncheckedFactory( - () -> - CoderUtils.encodeToByteArray( - coder, KV.of(v.getValue(), v.getTimestamp())))))); - return null; - } - - @Override - public @Nullable MultimapState bindMultimap( + StateSpec> spec, + Coder accumCoder, + CombineFn combineFn) { + consumer.set( + (accessor, value) -> { + ((CombiningState) accessor) + .addAccum( + ExceptionUtils.uncheckedFactory( + () -> CoderUtils.decodeFromByteArray(accumCoder, value.getValue()))); + }); + return null; + } + + @Override + public @Nullable + CombiningState bindCombiningWithContext( String id, - StateSpec> spec, - Coder keyCoder, - Coder valueCoder) { - KvCoder coder = KvCoder.of(keyCoder, valueCoder); - res.set( - (accessor, key) -> - Iterables.transform( - ((MultimapState) accessor).entries().read(), - v -> - new StateValue( - key, - id, - ExceptionUtils.uncheckedFactory( - () -> - CoderUtils.encodeToByteArray( - coder, KV.of(v.getKey(), v.getValue())))))); - return null; - } - - @Override - public @Nullable - CombiningState bindCombining( - String id, - StateSpec> spec, - Coder accumCoder, - CombineFn combineFn) { - res.set( - (accessor, key) -> { - AccumT accum = ((CombiningState) accessor).getAccum(); - return Collections.singletonList( - new StateValue( - key, - id, - ExceptionUtils.uncheckedFactory( - () -> CoderUtils.encodeToByteArray(accumCoder, accum)))); - }); - return null; - } - - @Override - public @Nullable - CombiningState bindCombiningWithContext( - String id, - StateSpec> spec, - Coder accumCoder, - CombineFnWithContext combineFn) { - res.set( - (accessor, key) -> { - AccumT accum = ((CombiningState) accessor).getAccum(); - return Collections.singletonList( - new StateValue( - key, - id, - ExceptionUtils.uncheckedFactory( - () -> CoderUtils.encodeToByteArray(accumCoder, accum)))); - }); - return null; - } - - @Override - public @Nullable WatermarkHoldState bindWatermark( - String id, StateSpec spec, TimestampCombiner timestampCombiner) { - return null; - } - }); - return res.get(); + StateSpec> spec, + Coder accumCoder, + CombineFnWithContext combineFn) { + consumer.set( + (accessor, value) -> { + ((CombiningState) accessor) + .addAccum( + ExceptionUtils.uncheckedFactory( + () -> CoderUtils.decodeFromByteArray(accumCoder, value.getValue()))); + }); + return null; + } + + @Override + public @Nullable WatermarkHoldState bindWatermark( + String id, StateSpec spec, TimestampCombiner timestampCombiner) { + return null; + } + }; + } + + @VisibleForTesting + static StateBinder createStateReaderBinder( + AtomicReference>> res) { + + return new StateBinder() { + @Override + public @Nullable ValueState bindValue( + String id, StateSpec> spec, Coder coder) { + res.set( + (accessor, key) -> { + T value = ((ValueState) accessor).read(); + if (value != null) { + byte[] bytes = + ExceptionUtils.uncheckedFactory( + () -> CoderUtils.encodeToByteArray(coder, value)); + return Collections.singletonList(new StateValue(key, id, bytes)); + } + return Collections.emptyList(); + }); + return null; + } + + @Override + public @Nullable BagState bindBag( + String id, StateSpec> spec, Coder elemCoder) { + res.set( + (accessor, key) -> + Iterables.transform( + ((BagState) accessor).read(), + v -> + new StateValue( + key, + id, + ExceptionUtils.uncheckedFactory( + () -> CoderUtils.encodeToByteArray(elemCoder, v))))); + return null; + } + + @Override + public @Nullable SetState bindSet( + String id, StateSpec> spec, Coder elemCoder) { + res.set( + (accessor, key) -> + Iterables.transform( + ((SetState) accessor).read(), + v -> + new StateValue( + key, + id, + ExceptionUtils.uncheckedFactory( + () -> CoderUtils.encodeToByteArray(elemCoder, v))))); + return null; + } + + @Override + public @Nullable MapState bindMap( + String id, + StateSpec> spec, + Coder mapKeyCoder, + Coder mapValueCoder) { + KvCoder coder = KvCoder.of(mapKeyCoder, mapValueCoder); + res.set( + (accessor, key) -> + Iterables.transform( + ((MapState) accessor).entries().read(), + v -> + new StateValue( + key, + id, + ExceptionUtils.uncheckedFactory( + () -> + CoderUtils.encodeToByteArray( + coder, KV.of(v.getKey(), v.getValue())))))); + return null; + } + + @Override + public @Nullable OrderedListState bindOrderedList( + String id, StateSpec> spec, Coder elemCoder) { + KvCoder coder = KvCoder.of(elemCoder, InstantCoder.of()); + res.set( + (accessor, key) -> + Iterables.transform( + ((OrderedListState) accessor).read(), + v -> + new StateValue( + key, + id, + ExceptionUtils.uncheckedFactory( + () -> + CoderUtils.encodeToByteArray( + coder, KV.of(v.getValue(), v.getTimestamp())))))); + return null; + } + + @Override + public @Nullable MultimapState bindMultimap( + String id, + StateSpec> spec, + Coder keyCoder, + Coder valueCoder) { + KvCoder coder = KvCoder.of(keyCoder, valueCoder); + res.set( + (accessor, key) -> + Iterables.transform( + ((MultimapState) accessor).entries().read(), + v -> + new StateValue( + key, + id, + ExceptionUtils.uncheckedFactory( + () -> + CoderUtils.encodeToByteArray( + coder, KV.of(v.getKey(), v.getValue())))))); + return null; + } + + @Override + public @Nullable + CombiningState bindCombining( + String id, + StateSpec> spec, + Coder accumCoder, + CombineFn combineFn) { + res.set( + (accessor, key) -> { + AccumT accum = ((CombiningState) accessor).getAccum(); + return Collections.singletonList( + new StateValue( + key, + id, + ExceptionUtils.uncheckedFactory( + () -> CoderUtils.encodeToByteArray(accumCoder, accum)))); + }); + return null; + } + + @Override + public @Nullable + CombiningState bindCombiningWithContext( + String id, + StateSpec> spec, + Coder accumCoder, + CombineFnWithContext combineFn) { + res.set( + (accessor, key) -> { + AccumT accum = ((CombiningState) accessor).getAccum(); + return Collections.singletonList( + new StateValue( + key, + id, + ExceptionUtils.uncheckedFactory( + () -> CoderUtils.encodeToByteArray(accumCoder, accum)))); + }); + return null; + } + + @Override + public @Nullable WatermarkHoldState bindWatermark( + String id, StateSpec spec, TimestampCombiner timestampCombiner) { + return null; + } + }; } private MethodCallUtils() {} diff --git a/beam/core/src/test/java/cz/o2/proxima/beam/util/state/MethodCallUtilsTest.java b/beam/core/src/test/java/cz/o2/proxima/beam/util/state/MethodCallUtilsTest.java new file mode 100644 index 000000000..9b62ab39b --- /dev/null +++ b/beam/core/src/test/java/cz/o2/proxima/beam/util/state/MethodCallUtilsTest.java @@ -0,0 +1,55 @@ +/* + * Copyright 2017-2024 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.beam.util.state; + +import cz.o2.proxima.core.functional.BiConsumer; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.state.StateBinder; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.transforms.Sum; +import org.junit.Test; + +public class MethodCallUtilsTest { + + @Test + public void testBinders() { + AtomicReference>> tmp = new AtomicReference<>(); + AtomicReference> tmp2 = new AtomicReference<>(); + testBinder(MethodCallUtils.createStateReaderBinder(tmp)); + testBinder(MethodCallUtils.createUpdaterBinder(tmp2)); + } + + private void testBinder(StateBinder binder) { + List> specs = + Arrays.asList( + StateSpecs.bag(), + StateSpecs.value(), + StateSpecs.map(), + StateSpecs.multimap(), + StateSpecs.combining(Sum.ofIntegers()), + StateSpecs.orderedList(VarIntCoder.of())); + specs.forEach(s -> testBinder(s, binder)); + } + + private void testBinder(StateSpec s, StateBinder binder) { + s.bind("dummy", binder); + } +}