Skip to content

Commit

Permalink
KAFKA-12749: Changelog topic config on suppressed KTable lost (#10664)
Browse files Browse the repository at this point in the history
Refactored logConfig to be passed appropriately when using shutDownWhenFull or emitEarlyWhenFull. Removed the constructor that doesn't accept a logConfig parameter so you're forced to specify it explicitly, whether it's empty/unspecified or not.

Co-authored-by: Bruno Cadonna <cadonna@apache.org>

Reviewers: Walker Carlson <wcarlson@confluent.io>, Bruno Cadonna <cadonna@apache.org>
  • Loading branch information
vichu authored Jun 3, 2021
1 parent c2c08b4 commit 93dca8e
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;

public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {
Expand All @@ -48,7 +49,7 @@ interface BufferConfig<BC extends BufferConfig<BC>> {
* Create a size-constrained buffer in terms of the maximum number of keys it will store.
*/
static EagerBufferConfig maxRecords(final long recordLimit) {
return new EagerBufferConfigImpl(recordLimit, Long.MAX_VALUE);
return new EagerBufferConfigImpl(recordLimit, Long.MAX_VALUE, Collections.emptyMap());
}

/**
Expand All @@ -60,7 +61,7 @@ static EagerBufferConfig maxRecords(final long recordLimit) {
* Create a size-constrained buffer in terms of the maximum number of bytes it will use.
*/
static EagerBufferConfig maxBytes(final long byteLimit) {
return new EagerBufferConfigImpl(Long.MAX_VALUE, byteLimit);
return new EagerBufferConfigImpl(Long.MAX_VALUE, byteLimit, Collections.emptyMap());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,19 @@ public Suppressed.StrictBufferConfig withNoBound() {
return new StrictBufferConfigImpl(
Long.MAX_VALUE,
Long.MAX_VALUE,
SHUT_DOWN // doesn't matter, given the bounds
SHUT_DOWN, // doesn't matter, given the bounds
getLogConfig()
);
}

@Override
public Suppressed.StrictBufferConfig shutDownWhenFull() {
return new StrictBufferConfigImpl(maxRecords(), maxBytes(), SHUT_DOWN);
return new StrictBufferConfigImpl(maxRecords(), maxBytes(), SHUT_DOWN, getLogConfig());
}

@Override
public Suppressed.EagerBufferConfig emitEarlyWhenFull() {
return new EagerBufferConfigImpl(maxRecords(), maxBytes());
return new EagerBufferConfigImpl(maxRecords(), maxBytes(), getLogConfig());
}

public abstract boolean isLoggingEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,9 @@ public class EagerBufferConfigImpl extends BufferConfigInternal<Suppressed.Eager
private final long maxBytes;
private final Map<String, String> logConfig;

public EagerBufferConfigImpl(final long maxRecords, final long maxBytes) {
this.maxRecords = maxRecords;
this.maxBytes = maxBytes;
this.logConfig = Collections.emptyMap();
}

private EagerBufferConfigImpl(final long maxRecords,
final long maxBytes,
final Map<String, String> logConfig) {
public EagerBufferConfigImpl(final long maxRecords,
final long maxBytes,
final Map<String, String> logConfig) {
this.maxRecords = maxRecords;
this.maxBytes = maxBytes;
this.logConfig = logConfig;
Expand Down Expand Up @@ -97,16 +91,20 @@ public boolean equals(final Object o) {
}
final EagerBufferConfigImpl that = (EagerBufferConfigImpl) o;
return maxRecords == that.maxRecords &&
maxBytes == that.maxBytes;
maxBytes == that.maxBytes &&
Objects.equals(getLogConfig(), that.getLogConfig());
}

@Override
public int hashCode() {
return Objects.hash(maxRecords, maxBytes);
return Objects.hash(maxRecords, maxBytes, getLogConfig());
}

@Override
public String toString() {
return "EagerBufferConfigImpl{maxRecords=" + maxRecords + ", maxBytes=" + maxBytes + '}';
return "EagerBufferConfigImpl{maxRecords=" + maxRecords +
", maxBytes=" + maxBytes +
", logConfig=" + getLogConfig() +
"}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,6 @@ public StrictBufferConfigImpl(final long maxRecords,
this.logConfig = logConfig;
}

public StrictBufferConfigImpl(final long maxRecords,
final long maxBytes,
final BufferFullStrategy bufferFullStrategy) {
this.maxRecords = maxRecords;
this.maxBytes = maxBytes;
this.bufferFullStrategy = bufferFullStrategy;
this.logConfig = Collections.emptyMap();
}

public StrictBufferConfigImpl() {
this.maxRecords = Long.MAX_VALUE;
Expand All @@ -59,12 +51,12 @@ public StrictBufferConfigImpl() {

@Override
public Suppressed.StrictBufferConfig withMaxRecords(final long recordLimit) {
return new StrictBufferConfigImpl(recordLimit, maxBytes, bufferFullStrategy);
return new StrictBufferConfigImpl(recordLimit, maxBytes, bufferFullStrategy, getLogConfig());
}

@Override
public Suppressed.StrictBufferConfig withMaxBytes(final long byteLimit) {
return new StrictBufferConfigImpl(maxRecords, byteLimit, bufferFullStrategy);
return new StrictBufferConfigImpl(maxRecords, byteLimit, bufferFullStrategy, getLogConfig());
}

@Override
Expand Down Expand Up @@ -113,18 +105,21 @@ public boolean equals(final Object o) {
final StrictBufferConfigImpl that = (StrictBufferConfigImpl) o;
return maxRecords == that.maxRecords &&
maxBytes == that.maxBytes &&
bufferFullStrategy == that.bufferFullStrategy;
bufferFullStrategy == that.bufferFullStrategy &&
Objects.equals(getLogConfig(), ((StrictBufferConfigImpl) o).getLogConfig());
}

@Override
public int hashCode() {
return Objects.hash(maxRecords, maxBytes, bufferFullStrategy);
return Objects.hash(maxRecords, maxBytes, bufferFullStrategy, getLogConfig());
}

@Override
public String toString() {
return "StrictBufferConfigImpl{maxKeys=" + maxRecords +
", maxBytes=" + maxBytes +
", bufferFullStrategy=" + bufferFullStrategy + '}';
", bufferFullStrategy=" + bufferFullStrategy +
", logConfig=" + getLogConfig().toString() +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
import org.junit.Test;

import java.util.Collections;

import static java.lang.Long.MAX_VALUE;
import static java.time.Duration.ofMillis;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes;
Expand All @@ -46,13 +48,19 @@ public void bufferBuilderShouldBeConsistent() {
assertThat(
"keys alone should be set",
maxRecords(2L),
is(new EagerBufferConfigImpl(2L, MAX_VALUE))
is(new EagerBufferConfigImpl(2L, MAX_VALUE, Collections.emptyMap()))
);

assertThat(
"size alone should be set",
maxBytes(2L),
is(new EagerBufferConfigImpl(MAX_VALUE, 2L))
is(new EagerBufferConfigImpl(MAX_VALUE, 2L, Collections.emptyMap()))
);

assertThat(
"config should be set even after max records",
maxRecords(2L).withMaxBytes(4L).withLoggingEnabled(Collections.singletonMap("myConfigKey", "myConfigValue")),
is(new EagerBufferConfigImpl(2L, 4L, Collections.singletonMap("myConfigKey", "myConfigValue")))
);
}

Expand Down Expand Up @@ -91,7 +99,13 @@ public void intermediateEventsShouldAcceptAnyBufferAndSetBounds() {
assertThat(
"all constraints should be set",
untilTimeLimit(ofMillis(2L), maxRecords(3L).withMaxBytes(2L)),
is(new SuppressedInternal<>(null, ofMillis(2), new EagerBufferConfigImpl(3L, 2L), null, false))
is(new SuppressedInternal<>(null, ofMillis(2), new EagerBufferConfigImpl(3L, 2L, Collections.emptyMap()), null, false))
);

assertThat(
"config is not lost early emit is set",
untilTimeLimit(ofMillis(2), maxRecords(2L).withLoggingEnabled(Collections.singletonMap("myConfigKey", "myConfigValue")).emitEarlyWhenFull()),
is(new SuppressedInternal<>(null, ofMillis(2), new EagerBufferConfigImpl(2L, MAX_VALUE, Collections.singletonMap("myConfigKey", "myConfigValue")), null, false))
);
}

Expand All @@ -105,13 +119,13 @@ public void finalEventsShouldAcceptStrictBuffersAndSetBounds() {

assertThat(
untilWindowCloses(maxRecords(2L).shutDownWhenFull()),
is(new FinalResultsSuppressionBuilder<>(null, new StrictBufferConfigImpl(2L, MAX_VALUE, SHUT_DOWN))
is(new FinalResultsSuppressionBuilder<>(null, new StrictBufferConfigImpl(2L, MAX_VALUE, SHUT_DOWN, Collections.emptyMap()))
)
);

assertThat(
untilWindowCloses(maxBytes(2L).shutDownWhenFull()),
is(new FinalResultsSuppressionBuilder<>(null, new StrictBufferConfigImpl(MAX_VALUE, 2L, SHUT_DOWN))
is(new FinalResultsSuppressionBuilder<>(null, new StrictBufferConfigImpl(MAX_VALUE, 2L, SHUT_DOWN, Collections.emptyMap()))
)
);

Expand All @@ -122,14 +136,79 @@ public void finalEventsShouldAcceptStrictBuffersAndSetBounds() {

assertThat(
untilWindowCloses(maxRecords(2L).shutDownWhenFull()).withName("name"),
is(new FinalResultsSuppressionBuilder<>("name", new StrictBufferConfigImpl(2L, MAX_VALUE, SHUT_DOWN))
is(new FinalResultsSuppressionBuilder<>("name", new StrictBufferConfigImpl(2L, MAX_VALUE, SHUT_DOWN, Collections.emptyMap()))
)
);

assertThat(
untilWindowCloses(maxBytes(2L).shutDownWhenFull()).withName("name"),
is(new FinalResultsSuppressionBuilder<>("name", new StrictBufferConfigImpl(MAX_VALUE, 2L, SHUT_DOWN))
is(new FinalResultsSuppressionBuilder<>("name", new StrictBufferConfigImpl(MAX_VALUE, 2L, SHUT_DOWN, Collections.emptyMap()))
)
);

assertThat(
"config is not lost when shutdown when full is set",
untilWindowCloses(maxBytes(2L).withLoggingEnabled(Collections.singletonMap("myConfigKey", "myConfigValue")).shutDownWhenFull()),
is(new FinalResultsSuppressionBuilder<>(null, new StrictBufferConfigImpl(MAX_VALUE, 2L, SHUT_DOWN, Collections.singletonMap("myConfigKey", "myConfigValue"))))
);
}

@Test
public void supportLongChainOfMethods() {
final Suppressed.BufferConfig<Suppressed.EagerBufferConfig> bufferConfig = unbounded()
.emitEarlyWhenFull()
.withMaxRecords(3L)
.withMaxBytes(4L)
.withMaxRecords(5L)
.withMaxBytes(6L);

assertThat(
"long chain of eager buffer config sets attributes properly",
bufferConfig,
is(new EagerBufferConfigImpl(5L, 6L, Collections.emptyMap()))
);
assertThat(
"long chain of strict buffer config sets attributes properly",
bufferConfig.shutDownWhenFull(),
is(new StrictBufferConfigImpl(5L, 6L, SHUT_DOWN, Collections.emptyMap()))
);

final Suppressed.BufferConfig<Suppressed.EagerBufferConfig> bufferConfigWithLogging = unbounded()
.withLoggingEnabled(Collections.singletonMap("myConfigKey", "myConfigValue"))
.emitEarlyWhenFull()
.withMaxRecords(3L)
.withMaxBytes(4L)
.withMaxRecords(5L)
.withMaxBytes(6L);

assertThat(
"long chain of eager buffer config sets attributes properly with logging enabled",
bufferConfigWithLogging,
is(new EagerBufferConfigImpl(5L, 6L, Collections.singletonMap("myConfigKey", "myConfigValue")))
);
assertThat(
"long chain of strict buffer config sets attributes properly with logging enabled",
bufferConfigWithLogging.shutDownWhenFull(),
is(new StrictBufferConfigImpl(5L, 6L, SHUT_DOWN, Collections.singletonMap("myConfigKey", "myConfigValue")))
);

final Suppressed.BufferConfig<Suppressed.EagerBufferConfig> bufferConfigWithLoggingCalledAtTheEnd = unbounded()
.emitEarlyWhenFull()
.withMaxRecords(3L)
.withMaxBytes(4L)
.withMaxRecords(5L)
.withMaxBytes(6L)
.withLoggingEnabled(Collections.singletonMap("myConfigKey", "myConfigValue"));

assertThat(
"long chain of eager buffer config sets logging even after other setters",
bufferConfigWithLoggingCalledAtTheEnd,
is(new EagerBufferConfigImpl(5L, 6L, Collections.singletonMap("myConfigKey", "myConfigValue")))
);
assertThat(
"long chain of strict buffer config sets logging even after other setters",
bufferConfigWithLoggingCalledAtTheEnd.shutDownWhenFull(),
is(new StrictBufferConfigImpl(5L, 6L, SHUT_DOWN, Collections.singletonMap("myConfigKey", "myConfigValue")))
);
}
}

0 comments on commit 93dca8e

Please sign in to comment.