diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java index 2f96993719f3d..31a53cec8f69a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal; import java.time.Duration; +import java.util.Collections; import java.util.Map; public interface Suppressed extends NamedOperation> { @@ -48,7 +49,7 @@ interface BufferConfig> { * Create a size-constrained buffer in terms of the maximum number of keys it will store. */ static EagerBufferConfig maxRecords(final long recordLimit) { - return new EagerBufferConfigImpl(recordLimit, Long.MAX_VALUE); + return new EagerBufferConfigImpl(recordLimit, Long.MAX_VALUE, Collections.emptyMap()); } /** @@ -60,7 +61,7 @@ static EagerBufferConfig maxRecords(final long recordLimit) { * Create a size-constrained buffer in terms of the maximum number of bytes it will use. */ static EagerBufferConfig maxBytes(final long byteLimit) { - return new EagerBufferConfigImpl(Long.MAX_VALUE, byteLimit); + return new EagerBufferConfigImpl(Long.MAX_VALUE, byteLimit, Collections.emptyMap()); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java index 74de6ef1d452f..800a2a52bff0f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java @@ -35,18 +35,19 @@ public Suppressed.StrictBufferConfig withNoBound() { return new StrictBufferConfigImpl( Long.MAX_VALUE, Long.MAX_VALUE, - SHUT_DOWN // doesn't matter, given the bounds + SHUT_DOWN, // doesn't matter, given the bounds + getLogConfig() ); } @Override public Suppressed.StrictBufferConfig shutDownWhenFull() { - return new StrictBufferConfigImpl(maxRecords(), maxBytes(), SHUT_DOWN); + return new StrictBufferConfigImpl(maxRecords(), maxBytes(), SHUT_DOWN, getLogConfig()); } @Override public Suppressed.EagerBufferConfig emitEarlyWhenFull() { - return new EagerBufferConfigImpl(maxRecords(), maxBytes()); + return new EagerBufferConfigImpl(maxRecords(), maxBytes(), getLogConfig()); } public abstract boolean isLoggingEnabled(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java index c56532d7f4775..7665e66742358 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java @@ -28,15 +28,9 @@ public class EagerBufferConfigImpl extends BufferConfigInternal logConfig; - public EagerBufferConfigImpl(final long maxRecords, final long maxBytes) { - this.maxRecords = maxRecords; - this.maxBytes = maxBytes; - this.logConfig = Collections.emptyMap(); - } - - private EagerBufferConfigImpl(final long maxRecords, - final long maxBytes, - final Map logConfig) { + public EagerBufferConfigImpl(final long maxRecords, + final long maxBytes, + final Map logConfig) { this.maxRecords = maxRecords; this.maxBytes = maxBytes; this.logConfig = logConfig; @@ -97,16 +91,20 @@ public boolean equals(final Object o) { } final EagerBufferConfigImpl that = (EagerBufferConfigImpl) o; return maxRecords == that.maxRecords && - maxBytes == that.maxBytes; + maxBytes == that.maxBytes && + Objects.equals(getLogConfig(), that.getLogConfig()); } @Override public int hashCode() { - return Objects.hash(maxRecords, maxBytes); + return Objects.hash(maxRecords, maxBytes, getLogConfig()); } @Override public String toString() { - return "EagerBufferConfigImpl{maxRecords=" + maxRecords + ", maxBytes=" + maxBytes + '}'; + return "EagerBufferConfigImpl{maxRecords=" + maxRecords + + ", maxBytes=" + maxBytes + + ", logConfig=" + getLogConfig() + + "}"; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java index 13ffccdfffb85..2ca5ef9b4ee72 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java @@ -41,14 +41,6 @@ public StrictBufferConfigImpl(final long maxRecords, this.logConfig = logConfig; } - public StrictBufferConfigImpl(final long maxRecords, - final long maxBytes, - final BufferFullStrategy bufferFullStrategy) { - this.maxRecords = maxRecords; - this.maxBytes = maxBytes; - this.bufferFullStrategy = bufferFullStrategy; - this.logConfig = Collections.emptyMap(); - } public StrictBufferConfigImpl() { this.maxRecords = Long.MAX_VALUE; @@ -59,12 +51,12 @@ public StrictBufferConfigImpl() { @Override public Suppressed.StrictBufferConfig withMaxRecords(final long recordLimit) { - return new StrictBufferConfigImpl(recordLimit, maxBytes, bufferFullStrategy); + return new StrictBufferConfigImpl(recordLimit, maxBytes, bufferFullStrategy, getLogConfig()); } @Override public Suppressed.StrictBufferConfig withMaxBytes(final long byteLimit) { - return new StrictBufferConfigImpl(maxRecords, byteLimit, bufferFullStrategy); + return new StrictBufferConfigImpl(maxRecords, byteLimit, bufferFullStrategy, getLogConfig()); } @Override @@ -113,18 +105,21 @@ public boolean equals(final Object o) { final StrictBufferConfigImpl that = (StrictBufferConfigImpl) o; return maxRecords == that.maxRecords && maxBytes == that.maxBytes && - bufferFullStrategy == that.bufferFullStrategy; + bufferFullStrategy == that.bufferFullStrategy && + Objects.equals(getLogConfig(), ((StrictBufferConfigImpl) o).getLogConfig()); } @Override public int hashCode() { - return Objects.hash(maxRecords, maxBytes, bufferFullStrategy); + return Objects.hash(maxRecords, maxBytes, bufferFullStrategy, getLogConfig()); } @Override public String toString() { return "StrictBufferConfigImpl{maxKeys=" + maxRecords + ", maxBytes=" + maxBytes + - ", bufferFullStrategy=" + bufferFullStrategy + '}'; + ", bufferFullStrategy=" + bufferFullStrategy + + ", logConfig=" + getLogConfig().toString() + + '}'; } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java index 112b9eb70d231..b799884263555 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java @@ -22,6 +22,8 @@ import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal; import org.junit.Test; +import java.util.Collections; + import static java.lang.Long.MAX_VALUE; import static java.time.Duration.ofMillis; import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes; @@ -46,13 +48,19 @@ public void bufferBuilderShouldBeConsistent() { assertThat( "keys alone should be set", maxRecords(2L), - is(new EagerBufferConfigImpl(2L, MAX_VALUE)) + is(new EagerBufferConfigImpl(2L, MAX_VALUE, Collections.emptyMap())) ); assertThat( "size alone should be set", maxBytes(2L), - is(new EagerBufferConfigImpl(MAX_VALUE, 2L)) + is(new EagerBufferConfigImpl(MAX_VALUE, 2L, Collections.emptyMap())) + ); + + assertThat( + "config should be set even after max records", + maxRecords(2L).withMaxBytes(4L).withLoggingEnabled(Collections.singletonMap("myConfigKey", "myConfigValue")), + is(new EagerBufferConfigImpl(2L, 4L, Collections.singletonMap("myConfigKey", "myConfigValue"))) ); } @@ -91,7 +99,13 @@ public void intermediateEventsShouldAcceptAnyBufferAndSetBounds() { assertThat( "all constraints should be set", untilTimeLimit(ofMillis(2L), maxRecords(3L).withMaxBytes(2L)), - is(new SuppressedInternal<>(null, ofMillis(2), new EagerBufferConfigImpl(3L, 2L), null, false)) + is(new SuppressedInternal<>(null, ofMillis(2), new EagerBufferConfigImpl(3L, 2L, Collections.emptyMap()), null, false)) + ); + + assertThat( + "config is not lost early emit is set", + untilTimeLimit(ofMillis(2), maxRecords(2L).withLoggingEnabled(Collections.singletonMap("myConfigKey", "myConfigValue")).emitEarlyWhenFull()), + is(new SuppressedInternal<>(null, ofMillis(2), new EagerBufferConfigImpl(2L, MAX_VALUE, Collections.singletonMap("myConfigKey", "myConfigValue")), null, false)) ); } @@ -105,13 +119,13 @@ public void finalEventsShouldAcceptStrictBuffersAndSetBounds() { assertThat( untilWindowCloses(maxRecords(2L).shutDownWhenFull()), - is(new FinalResultsSuppressionBuilder<>(null, new StrictBufferConfigImpl(2L, MAX_VALUE, SHUT_DOWN)) + is(new FinalResultsSuppressionBuilder<>(null, new StrictBufferConfigImpl(2L, MAX_VALUE, SHUT_DOWN, Collections.emptyMap())) ) ); assertThat( untilWindowCloses(maxBytes(2L).shutDownWhenFull()), - is(new FinalResultsSuppressionBuilder<>(null, new StrictBufferConfigImpl(MAX_VALUE, 2L, SHUT_DOWN)) + is(new FinalResultsSuppressionBuilder<>(null, new StrictBufferConfigImpl(MAX_VALUE, 2L, SHUT_DOWN, Collections.emptyMap())) ) ); @@ -122,14 +136,79 @@ public void finalEventsShouldAcceptStrictBuffersAndSetBounds() { assertThat( untilWindowCloses(maxRecords(2L).shutDownWhenFull()).withName("name"), - is(new FinalResultsSuppressionBuilder<>("name", new StrictBufferConfigImpl(2L, MAX_VALUE, SHUT_DOWN)) + is(new FinalResultsSuppressionBuilder<>("name", new StrictBufferConfigImpl(2L, MAX_VALUE, SHUT_DOWN, Collections.emptyMap())) ) ); assertThat( untilWindowCloses(maxBytes(2L).shutDownWhenFull()).withName("name"), - is(new FinalResultsSuppressionBuilder<>("name", new StrictBufferConfigImpl(MAX_VALUE, 2L, SHUT_DOWN)) + is(new FinalResultsSuppressionBuilder<>("name", new StrictBufferConfigImpl(MAX_VALUE, 2L, SHUT_DOWN, Collections.emptyMap())) ) ); + + assertThat( + "config is not lost when shutdown when full is set", + untilWindowCloses(maxBytes(2L).withLoggingEnabled(Collections.singletonMap("myConfigKey", "myConfigValue")).shutDownWhenFull()), + is(new FinalResultsSuppressionBuilder<>(null, new StrictBufferConfigImpl(MAX_VALUE, 2L, SHUT_DOWN, Collections.singletonMap("myConfigKey", "myConfigValue")))) + ); + } + + @Test + public void supportLongChainOfMethods() { + final Suppressed.BufferConfig bufferConfig = unbounded() + .emitEarlyWhenFull() + .withMaxRecords(3L) + .withMaxBytes(4L) + .withMaxRecords(5L) + .withMaxBytes(6L); + + assertThat( + "long chain of eager buffer config sets attributes properly", + bufferConfig, + is(new EagerBufferConfigImpl(5L, 6L, Collections.emptyMap())) + ); + assertThat( + "long chain of strict buffer config sets attributes properly", + bufferConfig.shutDownWhenFull(), + is(new StrictBufferConfigImpl(5L, 6L, SHUT_DOWN, Collections.emptyMap())) + ); + + final Suppressed.BufferConfig bufferConfigWithLogging = unbounded() + .withLoggingEnabled(Collections.singletonMap("myConfigKey", "myConfigValue")) + .emitEarlyWhenFull() + .withMaxRecords(3L) + .withMaxBytes(4L) + .withMaxRecords(5L) + .withMaxBytes(6L); + + assertThat( + "long chain of eager buffer config sets attributes properly with logging enabled", + bufferConfigWithLogging, + is(new EagerBufferConfigImpl(5L, 6L, Collections.singletonMap("myConfigKey", "myConfigValue"))) + ); + assertThat( + "long chain of strict buffer config sets attributes properly with logging enabled", + bufferConfigWithLogging.shutDownWhenFull(), + is(new StrictBufferConfigImpl(5L, 6L, SHUT_DOWN, Collections.singletonMap("myConfigKey", "myConfigValue"))) + ); + + final Suppressed.BufferConfig bufferConfigWithLoggingCalledAtTheEnd = unbounded() + .emitEarlyWhenFull() + .withMaxRecords(3L) + .withMaxBytes(4L) + .withMaxRecords(5L) + .withMaxBytes(6L) + .withLoggingEnabled(Collections.singletonMap("myConfigKey", "myConfigValue")); + + assertThat( + "long chain of eager buffer config sets logging even after other setters", + bufferConfigWithLoggingCalledAtTheEnd, + is(new EagerBufferConfigImpl(5L, 6L, Collections.singletonMap("myConfigKey", "myConfigValue"))) + ); + assertThat( + "long chain of strict buffer config sets logging even after other setters", + bufferConfigWithLoggingCalledAtTheEnd.shutDownWhenFull(), + is(new StrictBufferConfigImpl(5L, 6L, SHUT_DOWN, Collections.singletonMap("myConfigKey", "myConfigValue"))) + ); } }