Skip to content

Commit

Permalink
[FLINK-36788] Fix GlobalCommitter expansion
Browse files Browse the repository at this point in the history
So far, the global committer didn't properly get all properties set during expansion. In some cases, it didn't get expanded at all.

This commit also inlines SinkTransformationTranslatorITCaseBase.java, after the respective V1 implementation was removed. V2 now has the same coverage as V1 used to have.
  • Loading branch information
AHeise committed Dec 2, 2024
1 parent 00e7ad6 commit 3082baa
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,14 @@
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies.GLOBAL_COMMITTER_TRANSFORMATION_NAME;

Expand Down Expand Up @@ -74,11 +77,10 @@ private Collection<Integer> translateInternal(
boolean commitOnInput = batch || !checkpointingEnabled || hasUpstreamCommitter(inputStream);

// Create a global shuffle and add the global committer with parallelism 1.
DataStream<CommittableMessage<CommT>> global = inputStream.global();
final PhysicalTransformation<Void> transformation =
(PhysicalTransformation<Void>)
inputStream
.global()
.transform(
global.transform(
GLOBAL_COMMITTER_TRANSFORMATION_NAME,
Types.VOID,
new GlobalCommitterOperator<>(
Expand All @@ -87,10 +89,20 @@ private Collection<Integer> translateInternal(
commitOnInput))
.getTransformation();
transformation.setChainingStrategy(ChainingStrategy.ALWAYS);
transformation.setName(GLOBAL_COMMITTER_TRANSFORMATION_NAME);
transformation.setParallelism(1);
transformation.setMaxParallelism(1);
return Collections.emptyList();
copySafely(transformation::setName, globalCommitterTransform::getName);
copySafely(transformation::setUid, globalCommitterTransform::getUid);
copySafely(transformation::setUidHash, globalCommitterTransform::getUserProvidedNodeHash);

return Arrays.asList(global.getId(), transformation.getId());
}

/**
 * Copies a single optional property from the transformation being expanded onto the newly
 * created transformation. If the source property was never set ({@code null}), nothing is
 * copied so that defaults remain intact.
 *
 * @param consumer setter on the target transformation
 * @param provider getter on the source transformation; may yield {@code null}
 */
private static <T> void copySafely(Consumer<T> consumer, Supplier<T> provider) {
    final T candidate = provider.get();
    if (candidate == null) {
        // Property was not configured on the source; leave the target untouched.
        return;
    }
    consumer.accept(candidate);
}

/**
Expand All @@ -109,7 +121,6 @@ private static boolean hasUpstreamCommitter(DataStream<?> ds) {
StreamOperatorFactory<?> operatorFactory =
((OneInputTransformation<?, ?>) transformation).getOperatorFactory();
if (operatorFactory instanceof CommitterOperatorFactory) {
// found the committer, so the global committer should commit on input
return true;
}
if (operatorFactory instanceof SinkWriterOperatorFactory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,14 @@
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkState;

/**
* A {@link org.apache.flink.streaming.api.graph.TransformationTranslator} for the {@link
* org.apache.flink.streaming.api.transformations.SinkTransformation}.
* SinkTransformation}.
*/
@Internal
public class SinkTransformationTranslator<Input, Output>
Expand Down Expand Up @@ -177,6 +179,12 @@ private void expand() {

getSinkTransformations(sizeBefore).forEach(context::transform);

repeatUntilConverged(
() ->
getSinkTransformations(sizeBefore).stream()
.flatMap(t -> context.transform(t).stream())
.collect(Collectors.toList()));

disallowUnalignedCheckpoint(getSinkTransformations(sizeBefore));

// Remove all added sink subtransformations to avoid duplications and allow additional
Expand All @@ -188,6 +196,14 @@ private void expand() {
}
}

/**
 * Invokes the given producer repeatedly until two consecutive invocations return equal
 * results (a fixed point). The producer is called at least twice.
 *
 * @param producer side-effecting computation whose result signals convergence
 * @param <R> result type used only for the equality-based convergence check
 */
private <R> void repeatUntilConverged(Supplier<R> producer) {
    R previous;
    R current = producer.get();
    do {
        previous = current;
        current = producer.get();
    } while (!previous.equals(current));
}

private List<Transformation<?>> getSinkTransformations(int sizeBefore) {
return executionEnvironment
.getTransformations()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
Expand All @@ -35,6 +36,7 @@
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

Expand Down Expand Up @@ -69,8 +71,13 @@ private static Collection<Object> data() {

abstract SinkT sinkWithCommitter();

abstract SinkT sinkWithGlobalCommitter();

abstract DataStreamSink<Integer> sinkTo(DataStream<Integer> stream, SinkT sink);

abstract DataStreamSink<Integer> sinkTo(
DataStream<Integer> stream, SinkT sink, CustomSinkOperatorUidHashes hashes);

@TestTemplate
void generateWriterTopology() {
final StreamGraph streamGraph = buildGraph(simpleSink(), runtimeExecutionMode);
Expand Down Expand Up @@ -127,6 +134,62 @@ void testParallelismConfigured() {
testParallelismConfiguredInternal(false);
}

/**
 * Verifies that user-provided uid hashes are propagated to the writer, committer, and global
 * committer operators of the expanded sink topology.
 */
@TestTemplate
void testSettingOperatorUidHash() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final DataStreamSource<Integer> src = env.fromData(1, 2);
final String writerHash = "f6b178ce445dc3ffaa06bad27a51fead";
final String committerHash = "68ac8ae79eae4e3135a54f9689c4aa10";
final String globalCommitterHash = "77e6aa6eeb1643b3765e1e4a7a672f37";
final CustomSinkOperatorUidHashes operatorsUidHashes =
CustomSinkOperatorUidHashes.builder()
.setWriterUidHash(writerHash)
.setCommitterUidHash(committerHash)
.setGlobalCommitterUidHash(globalCommitterHash)
.build();
// Expand the sink with explicit per-operator uid hashes attached.
sinkTo(src, sinkWithGlobalCommitter(), operatorsUidHashes).name(NAME);

final StreamGraph streamGraph = env.getStreamGraph();

// Each sink sub-operator must carry exactly the hash configured for it.
assertThat(findWriter(streamGraph).getUserHash()).isEqualTo(writerHash);
assertThat(findCommitter(streamGraph).getUserHash()).isEqualTo(committerHash);
assertThat(findGlobalCommitter(streamGraph).getUserHash()).isEqualTo(globalCommitterHash);
}

/**
 * Whenever you need to change something in this test case, please think about possible state
 * upgrade problems introduced by your changes: the derived uids below are part of the
 * savepoint-compatibility contract.
 */
@TestTemplate
void testSettingOperatorUids() {
final String sinkUid = "f6b178ce445dc3ffaa06bad27a51fead";
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final DataStreamSource<Integer> src = env.fromData(1, 2);
sinkTo(src, sinkWithGlobalCommitter()).name(NAME).uid(sinkUid);

final StreamGraph streamGraph = env.getStreamGraph();
// Writer keeps the sink uid verbatim; committer and global committer derive theirs from it.
assertThat(findWriter(streamGraph).getTransformationUID()).isEqualTo(sinkUid);
assertThat(findCommitter(streamGraph).getTransformationUID())
.isEqualTo(String.format("Sink Committer: %s", sinkUid));
assertThat(findGlobalCommitter(streamGraph).getTransformationUID())
.isEqualTo(String.format("Sink %s Global Committer", sinkUid));
}

/**
 * Verifies that the operator names of the expanded writer, committer, and global committer are
 * derived from the sink's configured name.
 *
 * <p>Uses {@code @TestTemplate} like the sibling tests in this class
 * ({@code testSettingOperatorUidHash}, {@code testSettingOperatorUids}): the class is driven by
 * {@code ParameterizedTestExtension}, which only handles test templates — a plain
 * {@code @Test} here was inconsistent with the rest of the class.
 */
@TestTemplate
void testSettingOperatorNames() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final DataStreamSource<Integer> src = env.fromData(1, 2);
sinkTo(src, sinkWithGlobalCommitter()).name(NAME);

final StreamGraph streamGraph = env.getStreamGraph();
// Operator names follow the "<sink name>: <role>" convention.
assertThat(findWriter(streamGraph).getOperatorName())
.isEqualTo(String.format("%s: Writer", NAME));
assertThat(findCommitter(streamGraph).getOperatorName())
.isEqualTo(String.format("%s: Committer", NAME));
assertThat(findGlobalCommitter(streamGraph).getOperatorName())
.isEqualTo(String.format("%s: Global Committer", NAME));
}

private void testParallelismConfiguredInternal(boolean setSinkParallelism) {
final StreamGraph streamGraph =
buildGraph(sinkWithCommitter(), runtimeExecutionMode, setSinkParallelism);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
Expand Down Expand Up @@ -57,11 +55,24 @@ class SinkV1TransformationTranslatorITCase
return TestSink.newBuilder().setDefaultCommitter().build();
}

@Override
Sink<Integer, ?, ?, ?> sinkWithGlobalCommitter() {
// V1 test sink configured with both a default committer and a default global committer.
return TestSink.newBuilder().setDefaultCommitter().setDefaultGlobalCommitter().build();
}

@Override
DataStreamSink<Integer> sinkTo(DataStream<Integer> stream, Sink<Integer, ?, ?, ?> sink) {
// Plain V1 sink attachment without custom uid hashes.
return stream.sinkTo(sink);
}

@Override
DataStreamSink<Integer> sinkTo(
DataStream<Integer> stream,
Sink<Integer, ?, ?, ?> sink,
CustomSinkOperatorUidHashes hashes) {
// V1 sink attachment forwarding the user-provided operator uid hashes.
return stream.sinkTo(sink, hashes);
}

@TestTemplate
void generateWriterCommitterGlobalCommitterTopology() {

Expand Down Expand Up @@ -156,53 +167,4 @@ void generateWriterGlobalCommitterTopology() {
1,
1);
}

/**
 * Verifies that user-provided uid hashes reach the writer, committer, and global committer of
 * the expanded V1 sink topology.
 */
@TestTemplate
void testSettingOperatorUidHash() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final DataStreamSource<Integer> src = env.fromData(1, 2);
final String writerHash = "f6b178ce445dc3ffaa06bad27a51fead";
final String committerHash = "68ac8ae79eae4e3135a54f9689c4aa10";
final String globalCommitterHash = "77e6aa6eeb1643b3765e1e4a7a672f37";
final CustomSinkOperatorUidHashes operatorsUidHashes =
CustomSinkOperatorUidHashes.builder()
.setWriterUidHash(writerHash)
.setCommitterUidHash(committerHash)
.setGlobalCommitterUidHash(globalCommitterHash)
.build();
src.sinkTo(
TestSink.newBuilder()
.setDefaultCommitter()
.setDefaultGlobalCommitter()
.build(),
operatorsUidHashes)
.name(NAME);

final StreamGraph streamGraph = env.getStreamGraph();

// Each sink sub-operator must carry exactly the hash configured for it.
assertThat(findWriter(streamGraph).getUserHash()).isEqualTo(writerHash);
assertThat(findCommitter(streamGraph).getUserHash()).isEqualTo(committerHash);
assertThat(findGlobalCommitter(streamGraph).getUserHash()).isEqualTo(globalCommitterHash);
}

/**
 * Whenever you need to change something in this test case, please think about possible state
 * upgrade problems introduced by your changes: the derived uids below are part of the
 * savepoint-compatibility contract.
 */
@TestTemplate
void testSettingOperatorUids() {
final String sinkUid = "f6b178ce445dc3ffaa06bad27a51fead";
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final DataStreamSource<Integer> src = env.fromData(1, 2);
src.sinkTo(TestSink.newBuilder().setDefaultCommitter().setDefaultGlobalCommitter().build())
.name(NAME)
.uid(sinkUid);

final StreamGraph streamGraph = env.getStreamGraph();
// Writer keeps the sink uid verbatim; committer and global committer derive theirs from it.
assertThat(findWriter(streamGraph).getTransformationUID()).isEqualTo(sinkUid);
assertThat(findCommitter(streamGraph).getTransformationUID())
.isEqualTo(String.format("Sink Committer: %s", sinkUid));
assertThat(findGlobalCommitter(streamGraph).getTransformationUID())
.isEqualTo(String.format("Sink %s Global Committer", sinkUid));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,25 @@ Sink<Integer> sinkWithCommitter() {
return TestSinkV2.<Integer>newBuilder().setDefaultCommitter().build();
}

@Override
Sink<Integer> sinkWithGlobalCommitter() {
// V2 equivalent of a global-committer sink: the post-commit topology of the test sink
// adds a global committer (see TestSinkV2WithPostCommitTopology#addPostCommitTopology).
return TestSinkV2.<Integer>newBuilder()
.setDefaultCommitter()
.setWithPostCommitTopology(true)
.build();
}

@Override
DataStreamSink<Integer> sinkTo(DataStream<Integer> stream, Sink<Integer> sink) {
// Plain V2 sink attachment without custom uid hashes.
return stream.sinkTo(sink);
}

@Override
DataStreamSink<Integer> sinkTo(
DataStream<Integer> stream, Sink<Integer> sink, CustomSinkOperatorUidHashes hashes) {
// V2 sink attachment forwarding the user-provided operator uid hashes.
return stream.sinkTo(sink, hashes);
}

@TestTemplate
void testSettingOperatorUidHash() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,25 @@ Sink<Integer> sinkWithCommitter() {
return TestSinkV2.<Integer>newBuilder().setDefaultCommitter().build();
}

@Override
Sink<Integer> sinkWithGlobalCommitter() {
// V2 equivalent of a global-committer sink: the post-commit topology of the test sink
// adds a global committer (see TestSinkV2WithPostCommitTopology#addPostCommitTopology).
return TestSinkV2.<Integer>newBuilder()
.setDefaultCommitter()
.setWithPostCommitTopology(true)
.build();
}

@Override
DataStreamSink<Integer> sinkTo(DataStream<Integer> stream, Sink<Integer> sink) {
// Plain V2 sink attachment without custom uid hashes.
return stream.sinkTo(sink);
}

@Override
DataStreamSink<Integer> sinkTo(
DataStream<Integer> stream, Sink<Integer> sink, CustomSinkOperatorUidHashes hashes) {
// V2 sink attachment forwarding the user-provided operator uid hashes.
return stream.sinkTo(sink, hashes);
}

@TestTemplate
void testSettingOperatorUidHash() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies;
import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
Expand Down Expand Up @@ -225,11 +226,6 @@ public SupportsCommitter<String> asSupportsCommitter() {
public SimpleVersionedSerializer<String> getCommittableSerializer() {
return committableSerializer;
}

@Override
public SinkWriter<InputT> createWriter(WriterInitContext context) throws IOException {
// Redundant override: delegates straight to the superclass implementation unchanged.
return super.createWriter(context);
}
}

// -------------------------------------- Sink With PostCommitTopology -------------------------
Expand All @@ -246,7 +242,8 @@ public TestSinkV2WithPostCommitTopology(

@Override
public void addPostCommitTopology(DataStream<CommittableMessage<String>> committables) {
// Attach a standard global committer downstream of the committer, reusing the default
// committer implementation and this sink's committable serializer.
StandardSinkTopologies.addGlobalCommitter(
committables, DefaultCommitter::new, this::getCommittableSerializer);
}
}

Expand Down Expand Up @@ -468,17 +465,17 @@ public void init() {
/** A {@link Committer} that always re-commits the committables data it received. */
static class RetryOnceCommitter extends DefaultCommitter {

private final Set<CommitRequest<String>> seen = new LinkedHashSet<>();
private final Set<String> seen = new LinkedHashSet<>();

@Override
public void commit(Collection<CommitRequest<String>> committables) {
committables.forEach(
c -> {
if (seen.remove(c)) {
if (seen.remove(c.getCommittable())) {
checkNotNull(committedData);
committedData.add(c);
} else {
seen.add(c);
seen.add(c.getCommittable());
c.retryLater();
}
});
Expand Down
Loading

0 comments on commit 3082baa

Please sign in to comment.