Skip to content

Commit

Permalink
sonar, missing one test
Browse files Browse the repository at this point in the history
  • Loading branch information
je-ik committed Sep 30, 2024
1 parent fba3416 commit 788a3f8
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import cz.o2.proxima.internal.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.internal.com.google.common.base.Preconditions;
import cz.o2.proxima.internal.com.google.common.collect.Iterables;
import java.io.File;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
Expand Down Expand Up @@ -103,7 +102,6 @@
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
import org.apache.beam.sdk.values.TupleTag;
Expand All @@ -122,6 +120,7 @@ public class ExternalStateExpander {
static final String EXPANDER_FLUSH_STATE_NAME = "_expanderFlush";
static final String EXPANDER_TIMER_SPEC = "expanderTimerSpec";
static final String EXPANDER_TIMER_NAME = "_expanderTimer";
static final String DELEGATE_FIELD_NAME = "delegate";

static final TupleTag<StateValue> STATE_TUPLE_TAG = new StateTupleTag() {};

Expand Down Expand Up @@ -179,29 +178,18 @@ private static void validatePipeline(Pipeline pipeline) {
// check that all nodes have unique names
Set<String> names = new HashSet<>();
pipeline.traverseTopologically(
new PipelineVisitor() {
@Override
public void enterPipeline(Pipeline p) {}
new PipelineVisitor.Defaults() {

@Override
public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
Preconditions.checkState(names.add(node.getFullName()));
return CompositeBehavior.ENTER_TRANSFORM;
}

@Override
public void leaveCompositeTransform(TransformHierarchy.Node node) {}

@Override
public void visitPrimitiveTransform(TransformHierarchy.Node node) {
Preconditions.checkState(names.add(node.getFullName()));
}

@Override
public void visitValue(PValue value, TransformHierarchy.Node producer) {}

@Override
public void leavePipeline(Pipeline pipeline) {}
});
}

Expand All @@ -215,12 +203,13 @@ private static PTransformOverride statefulParMultiDoOverride(
parMultiDoReplacementFactory(inputs, stateWriteInstant, nextFlushInstantFn));
}

private static PTransformOverrideFactory parMultiDoReplacementFactory(
@SuppressWarnings({"unchecked", "rawtypes"})
private static PTransformOverrideFactory<?, ?, ?> parMultiDoReplacementFactory(
PCollection<KV<String, StateValue>> inputs,
Instant stateWriteInstant,
UnaryFunction<Instant, Instant> nextFlushInstantFn) {

return new PTransformOverrideFactory() {
return new PTransformOverrideFactory<>() {
@Override
public PTransformReplacement getReplacementTransform(AppliedPTransform transform) {
return replaceParMultiDo(transform, inputs, stateWriteInstant, nextFlushInstantFn);
Expand Down Expand Up @@ -257,7 +246,6 @@ private static PTransformReplacement<PInput, POutput> replaceParMultiDo(
return PTransformReplacement.of(
pMainInput,
transformedParDo(
Objects.requireNonNull(transformName),
transformInputs,
(DoFn) doFn,
mainOutputTag,
Expand All @@ -272,7 +260,6 @@ private static PTransformReplacement<PInput, POutput> replaceParMultiDo(
@SuppressWarnings("unchecked")
private static <K, V, InputT extends KV<K, V>, OutputT>
PTransform<PCollection<InputT>, PCollectionTuple> transformedParDo(
String transformName,
PCollection<StateValue> transformInputs,
DoFn<KV<K, V>, OutputT> doFn,
TupleTag<OutputT> mainOutputTag,
Expand Down Expand Up @@ -389,8 +376,8 @@ DoFn<InputT, OutputT> transformedDoFn(
buddy
.subclass(doFnGeneric)
.name(className)
.defineField("delegate", doFnClass, Visibility.PRIVATE);
builder = addStateAndTimers(doFnClass, inputType, inputCoder, builder);
.defineField(DELEGATE_FIELD_NAME, doFnClass, Visibility.PRIVATE);
builder = addStateAndTimers(doFnClass, inputType, builder);
builder =
builder
.defineConstructor(Visibility.PUBLIC)
Expand All @@ -401,7 +388,7 @@ DoFn<InputT, OutputT> transformedDoFn(
inputCoder,
MethodCall.invoke(
ExceptionUtils.uncheckedFactory(() -> DoFn.class.getConstructor()))
.andThen(FieldAccessor.ofField("delegate").setsArgumentAt(0))));
.andThen(FieldAccessor.ofField(DELEGATE_FIELD_NAME).setsArgumentAt(0))));

builder =
addProcessingMethods(
Expand All @@ -414,8 +401,6 @@ DoFn<InputT, OutputT> transformedDoFn(
nextFlushInstantFn,
builder);
Unloaded<DoFn<InputT, OutputT>> dynamicClass = builder.make();
// FIXME
ExceptionUtils.unchecked(() -> dynamicClass.saveIn(new File("/tmp/dynamic-debug")));
return ExceptionUtils.uncheckedFactory(
() ->
dynamicClass
Expand Down Expand Up @@ -641,7 +626,7 @@ Builder<DoFn<InputT, OutputT>> addProcessingMethod(
builder
.defineMethod(method.getName(), method.getReturnType(), Visibility.PUBLIC)
.withParameters(method.getGenericParameterTypes())
.intercept(MethodCall.invoke(method).onField("delegate").withAllArguments());
.intercept(MethodCall.invoke(method).onField(DELEGATE_FIELD_NAME).withAllArguments());

// retrieve parameter annotations and apply them
Annotation[][] parameterAnnotations = method.getParameterAnnotations();
Expand All @@ -660,7 +645,6 @@ Builder<DoFn<InputT, OutputT>> addProcessingMethod(
Builder<DoFn<InputT, OutputT>> addStateAndTimers(
Class<? extends DoFn<KV<K, V>, OutputT>> doFnClass,
ParameterizedType inputType,
Coder<? extends KV<K, V>> inputCoder,
Builder<DoFn<InputT, OutputT>> builder) {

builder = cloneFields(doFnClass, StateId.class, builder);
Expand Down Expand Up @@ -909,4 +893,7 @@ static Generic bagStateFromInputType(ParameterizedType inputType) {
}

private static class StateTupleTag extends TupleTag<StateValue> {}

// do not construct
private ExternalStateExpander() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ static FlushTimerParameterExpander of(

final LinkedHashMap<TypeId, Pair<Annotation, Type>> processArgs = extractArgs(processElement);
final LinkedHashMap<TypeId, Pair<AnnotationDescription, TypeDefinition>> wrapperArgs =
createWrapperArgs(doFn, inputType, processArgs);
createWrapperArgs(doFn, inputType);
final List<java.util.function.BiFunction<Object[], KV<?, ?>, Object>> processArgsGenerators =
projectArgs(wrapperArgs, processArgs, mainTag, outputType);

Expand All @@ -67,10 +67,7 @@ public Object[] getProcessElementArgs(KV<?, ?> input, Object[] wrapperArgs) {
}

private static LinkedHashMap<TypeId, Pair<AnnotationDescription, TypeDefinition>>
createWrapperArgs(
DoFn<?, ?> doFn,
ParameterizedType inputType,
LinkedHashMap<TypeId, Pair<Annotation, Type>> processArgs) {
createWrapperArgs(DoFn<?, ?> doFn, ParameterizedType inputType) {

List<Pair<Annotation, Type>> states =
Arrays.stream(doFn.getClass().getDeclaredFields())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,7 @@ static Generic getInputKvType(ParameterizedType inputType) {
Type valueType = inputType.getActualTypeArguments()[1];

// generic type: KV<K, V>
Generic kvType = Generic.Builder.parameterizedType(KV.class, keyType, valueType).build();
return kvType;
return Generic.Builder.parameterizedType(KV.class, keyType, valueType).build();
}

private static <T> OutputReceiver<T> fromMultiOutput(
Expand Down Expand Up @@ -295,25 +294,23 @@ static StateBinder createUpdaterBinder(AtomicReference<BiConsumer<Object, StateV
public <T> @Nullable BagState<T> bindBag(
String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder) {
consumer.set(
(accessor, value) -> {
((BagState<T>) accessor)
.add(
ExceptionUtils.uncheckedFactory(
() -> CoderUtils.decodeFromByteArray(elemCoder, value.getValue())));
});
(accessor, value) ->
((BagState<T>) accessor)
.add(
ExceptionUtils.uncheckedFactory(
() -> CoderUtils.decodeFromByteArray(elemCoder, value.getValue()))));
return null;
}

@Override
public <T> @Nullable SetState<T> bindSet(
String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder) {
consumer.set(
(accessor, value) -> {
((SetState<T>) accessor)
.add(
ExceptionUtils.uncheckedFactory(
() -> CoderUtils.decodeFromByteArray(elemCoder, value.getValue())));
});
(accessor, value) ->
((SetState<T>) accessor)
.add(
ExceptionUtils.uncheckedFactory(
() -> CoderUtils.decodeFromByteArray(elemCoder, value.getValue()))));
return null;
}

Expand Down Expand Up @@ -374,12 +371,11 @@ CombiningState<InputT, AccumT, OutputT> bindCombining(
Coder<AccumT> accumCoder,
CombineFn<InputT, AccumT, OutputT> combineFn) {
consumer.set(
(accessor, value) -> {
((CombiningState<InputT, AccumT, OutputT>) accessor)
.addAccum(
ExceptionUtils.uncheckedFactory(
() -> CoderUtils.decodeFromByteArray(accumCoder, value.getValue())));
});
(accessor, value) ->
((CombiningState<InputT, AccumT, OutputT>) accessor)
.addAccum(
ExceptionUtils.uncheckedFactory(
() -> CoderUtils.decodeFromByteArray(accumCoder, value.getValue()))));
return null;
}

Expand All @@ -391,12 +387,11 @@ CombiningState<InputT, AccumT, OutputT> bindCombiningWithContext(
Coder<AccumT> accumCoder,
CombineFnWithContext<InputT, AccumT, OutputT> combineFn) {
consumer.set(
(accessor, value) -> {
((CombiningState<InputT, AccumT, OutputT>) accessor)
.addAccum(
ExceptionUtils.uncheckedFactory(
() -> CoderUtils.decodeFromByteArray(accumCoder, value.getValue())));
});
(accessor, value) ->
((CombiningState<InputT, AccumT, OutputT>) accessor)
.addAccum(
ExceptionUtils.uncheckedFactory(
() -> CoderUtils.decodeFromByteArray(accumCoder, value.getValue()))));
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,9 @@ private static UnaryFunction<Object[], Boolean> createProcessFn(
return false;
}
Instant nextFlush = finishedState.read();
boolean shouldBuffer = nextFlush == null /* we have not finished reading state */
// FIXME: || nextFlush.isBefore(/* timestamp */)
;
boolean shouldBuffer =
nextFlush == null /* we have not finished reading state */
|| nextFlush.isBefore(ts) /* the timestamp if after next flush */;
if (shouldBuffer) {
// store to state
@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import cz.o2.proxima.core.util.SerializableScopedValue;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -40,6 +41,7 @@
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
Expand All @@ -57,6 +59,7 @@
import org.apache.beam.sdk.values.TypeDescriptors;
import org.jetbrains.annotations.NotNull;
import org.joda.time.Instant;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand Down Expand Up @@ -204,6 +207,41 @@ public void testSimpleExpandWithStateStore() {
assertEquals(1, states.size());
}

@Test
@Ignore
public void testStateWithElementEarly() throws CoderException {
Pipeline pipeline = createPipeline();
Instant now = new Instant(0);
PCollection<String> inputs =
pipeline.apply(
TestStream.create(StringUtf8Coder.of())
.addElements(TimestampedValue.of("1", now))
.advanceWatermarkTo(new Instant(0))
.addElements(TimestampedValue.of("3", now.plus(2)))
.advanceWatermarkToInfinity());
PCollection<KV<Integer, String>> withKeys =
inputs.apply(
WithKeys.<Integer, String>of(e -> Integer.parseInt(e) % 2)
.withKeyType(TypeDescriptors.integers()));
PCollection<Long> count = withKeys.apply("sum", ParDo.of(getSumFn()));
PAssert.that(count).containsInAnyOrder(4L);
Map<String, StateValue> states = new HashMap<>();
Pipeline expanded =
ExternalStateExpander.expand(
pipeline,
Create.empty(KvCoder.of(StringUtf8Coder.of(), StateValue.coder())),
now,
current -> current.equals(now) ? now.plus(1) : BoundedWindow.TIMESTAMP_MAX_VALUE,
collectStates(states));
expanded.run();
assertEquals(1, states.size());
assertEquals(
1L,
(long)
CoderUtils.decodeFromByteArray(
VarLongCoder.of(), Iterables.getOnlyElement(states.values()).getValue()));
}

private static PTransform<PCollection<KV<String, StateValue>>, PDone> collectStates(
Map<String, StateValue> states) {

Expand Down

0 comments on commit 788a3f8

Please sign in to comment.