diff --git a/beam/core/src/test/java/cz/o2/proxima/beam/core/direct/io/CommitLogReadTest.java b/beam/core/src/test/java/cz/o2/proxima/beam/core/direct/io/CommitLogReadTest.java index 5775fc094..dd7248ffa 100644 --- a/beam/core/src/test/java/cz/o2/proxima/beam/core/direct/io/CommitLogReadTest.java +++ b/beam/core/src/test/java/cz/o2/proxima/beam/core/direct/io/CommitLogReadTest.java @@ -168,7 +168,7 @@ public void testWithMultiplePartitions() throws InterruptedException { direct, createTestFamily( event, - URI.create("kafka-test://brokers/topic-" + UUID.randomUUID().toString()), + URI.create("kafka-test://brokers/topic-" + UUID.randomUUID()), ImmutableMap.of( LocalKafkaCommitLogDescriptor.CFG_NUM_PARTITIONS, numPartitions, @@ -346,15 +346,21 @@ public static class FiniteElementsWatermarkEstimatorFactory implements Watermark private static final Map> CONSUMED_ELEMENTS = new ConcurrentHashMap<>(); + String name; + int numElements; + + @Override + public void setup(Map cfg, WatermarkIdlePolicyFactory idlePolicyFactory) { + name = cfg.get(WatermarkConfiguration.prefixedKey("name")).toString(); + numElements = + Integer.parseInt(cfg.get(WatermarkConfiguration.prefixedKey("numElements")).toString()); + } + @Override - public WatermarkEstimator create( - Map cfg, WatermarkIdlePolicyFactory idlePolicyFactory) { + public WatermarkEstimator create() { return new WatermarkEstimator() { - final String name = cfg.get(WatermarkConfiguration.prefixedKey("name")).toString(); - final int numElements = - Integer.parseInt(cfg.get(WatermarkConfiguration.prefixedKey("numElements")).toString()); long watermark = Watermarks.MIN_WATERMARK; final Map selfElements = CONSUMED_ELEMENTS.computeIfAbsent(name, k -> new ConcurrentHashMap<>()); diff --git a/core/src/main/java/cz/o2/proxima/core/time/WatermarkEstimatorFactory.java b/core/src/main/java/cz/o2/proxima/core/time/WatermarkEstimatorFactory.java index 8e03d560c..c4119d981 100644 --- a/core/src/main/java/cz/o2/proxima/core/time/WatermarkEstimatorFactory.java +++ b/core/src/main/java/cz/o2/proxima/core/time/WatermarkEstimatorFactory.java @@ -20,5 +20,14 @@ /** Factory creates watermark estimator instance. */ public interface WatermarkEstimatorFactory extends Serializable { - WatermarkEstimator create(Map cfg, WatermarkIdlePolicyFactory idlePolicyFactory); + + /** + * Configure the estimator factory based on provided configuration. + * + * @param cfg the confguration + * @param idlePolicyFactory the configured idle policy factory + */ + void setup(Map cfg, WatermarkIdlePolicyFactory idlePolicyFactory); + + WatermarkEstimator create(); } diff --git a/core/src/main/java/cz/o2/proxima/core/time/WatermarkIdlePolicyFactory.java b/core/src/main/java/cz/o2/proxima/core/time/WatermarkIdlePolicyFactory.java index ee51a1a0e..db7c475e4 100644 --- a/core/src/main/java/cz/o2/proxima/core/time/WatermarkIdlePolicyFactory.java +++ b/core/src/main/java/cz/o2/proxima/core/time/WatermarkIdlePolicyFactory.java @@ -20,5 +20,9 @@ /** Factory creates watermark idle policy instance. */ public interface WatermarkIdlePolicyFactory extends Serializable { - WatermarkIdlePolicy create(Map cfg); + + /** Configure the factory based on the provided configuration. */ + void setup(Map cfg); + + WatermarkIdlePolicy create(); } diff --git a/direct/core/src/main/java/cz/o2/proxima/direct/core/time/BoundedOutOfOrdernessWatermarkEstimator.java b/direct/core/src/main/java/cz/o2/proxima/direct/core/time/BoundedOutOfOrdernessWatermarkEstimator.java index 063fa6a15..66d7eb85a 100644 --- a/direct/core/src/main/java/cz/o2/proxima/direct/core/time/BoundedOutOfOrdernessWatermarkEstimator.java +++ b/direct/core/src/main/java/cz/o2/proxima/direct/core/time/BoundedOutOfOrdernessWatermarkEstimator.java @@ -24,14 +24,17 @@ import cz.o2.proxima.core.time.WatermarkIdlePolicy; import cz.o2.proxima.core.time.WatermarkIdlePolicyFactory; import cz.o2.proxima.core.time.Watermarks; +import cz.o2.proxima.internal.com.google.common.base.MoreObjects; import java.util.Map; import java.util.Optional; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; /** * This estimators generates watermarks assuming that elements arrive out of order, but only to a * certain degree defined by configuration: watermark.max-out-of-orderness. */ +@Slf4j public class BoundedOutOfOrdernessWatermarkEstimator extends AbstractWatermarkEstimator { private static final long serialVersionUID = 1L; @@ -44,6 +47,7 @@ public class BoundedOutOfOrdernessWatermarkEstimator extends AbstractWatermarkEs private BoundedOutOfOrdernessWatermarkEstimator( long maxOutOfOrderness, long minWatermark, WatermarkIdlePolicy idlePolicy) { + super(idlePolicy); this.maxOutOfOrderness = maxOutOfOrderness; this.minWatermark = minWatermark; @@ -81,20 +85,33 @@ public void setMinWatermark(long minWatermark) { */ public static class Factory implements WatermarkEstimatorFactory { private static final long serialVersionUID = 1L; + @Getter private long maxOutOfOrderness; + private WatermarkIdlePolicyFactory idlePolicyFactory; @Override - public WatermarkEstimator create( - Map cfg, WatermarkIdlePolicyFactory idlePolicyFactory) { - final long maxOutOfOrderness = + public void setup(Map cfg, WatermarkIdlePolicyFactory idlePolicyFactory) { + this.maxOutOfOrderness = Optional.ofNullable(cfg.get(prefixedKey(MAX_OUT_OF_ORDERNESS_MS))) .map(v -> Long.valueOf(v.toString())) .orElse(DEFAULT_MAX_OUT_OF_ORDERNESS_MS); + this.idlePolicyFactory = idlePolicyFactory; + } + @Override + public WatermarkEstimator create() { return BoundedOutOfOrdernessWatermarkEstimator.newBuilder() .withMaxOutOfOrderness(maxOutOfOrderness) - .withWatermarkIdlePolicy(idlePolicyFactory.create(cfg)) + .withWatermarkIdlePolicy(idlePolicyFactory.create()) .build(); } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("maxOutOfOrderness", maxOutOfOrderness) + .add("idlePolicyFactory", idlePolicyFactory) + .toString(); + } } /** @@ -140,4 +157,12 @@ public BoundedOutOfOrdernessWatermarkEstimator build() { maxOutOfOrderness, minWatermark, watermarkIdlePolicy); } } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("maxOutOfOrderness", maxOutOfOrderness) + .add("minWatermark", minWatermark) + .toString(); + } } diff --git a/direct/core/src/main/java/cz/o2/proxima/direct/core/time/NotProgressingWatermarkIdlePolicy.java b/direct/core/src/main/java/cz/o2/proxima/direct/core/time/NotProgressingWatermarkIdlePolicy.java index 08fe1209a..17e7da09b 100644 --- a/direct/core/src/main/java/cz/o2/proxima/direct/core/time/NotProgressingWatermarkIdlePolicy.java +++ b/direct/core/src/main/java/cz/o2/proxima/direct/core/time/NotProgressingWatermarkIdlePolicy.java @@ -28,7 +28,10 @@ public class NotProgressingWatermarkIdlePolicy implements WatermarkIdlePolicy { public static class Factory implements WatermarkIdlePolicyFactory { @Override - public WatermarkIdlePolicy create(Map cfg) { + public void setup(Map cfg) {} + + @Override + public WatermarkIdlePolicy create() { return new NotProgressingWatermarkIdlePolicy(); } } diff --git a/direct/core/src/main/java/cz/o2/proxima/direct/core/time/ProcessingTimeWatermarkEstimator.java b/direct/core/src/main/java/cz/o2/proxima/direct/core/time/ProcessingTimeWatermarkEstimator.java index e2170d6bb..49eff8fb7 100644 --- a/direct/core/src/main/java/cz/o2/proxima/direct/core/time/ProcessingTimeWatermarkEstimator.java +++ b/direct/core/src/main/java/cz/o2/proxima/direct/core/time/ProcessingTimeWatermarkEstimator.java @@ -15,24 +15,19 @@ */ package cz.o2.proxima.direct.core.time; -import cz.o2.proxima.core.storage.StreamElement; -import cz.o2.proxima.core.time.AbstractWatermarkEstimator; import cz.o2.proxima.core.time.WatermarkEstimator; import cz.o2.proxima.core.time.WatermarkEstimatorFactory; -import cz.o2.proxima.core.time.WatermarkIdlePolicy; import cz.o2.proxima.core.time.WatermarkIdlePolicyFactory; import cz.o2.proxima.core.time.Watermarks; import java.util.Map; /** Estimates watermark as processing time. */ -public class ProcessingTimeWatermarkEstimator extends AbstractWatermarkEstimator { +public class ProcessingTimeWatermarkEstimator implements WatermarkEstimator { private static final long serialVersionUID = 1L; private final TimestampSupplier timestampSupplier; private long minWatermark; - ProcessingTimeWatermarkEstimator( - long minWatermark, TimestampSupplier timestampSupplier, WatermarkIdlePolicy idlePolicy) { - super(idlePolicy); + ProcessingTimeWatermarkEstimator(long minWatermark, TimestampSupplier timestampSupplier) { this.minWatermark = minWatermark; this.timestampSupplier = timestampSupplier; } @@ -41,14 +36,15 @@ public class ProcessingTimeWatermarkEstimator extends AbstractWatermarkEstimator * Creates an instance of {@link cz.o2.proxima.direct.core.time.ProcessingTimeWatermarkEstimator}. */ public static class Factory implements WatermarkEstimatorFactory { + private static final long serialVersionUID = 1L; @Override - public WatermarkEstimator create( - Map cfg, WatermarkIdlePolicyFactory idlePolicyFactory) { - return ProcessingTimeWatermarkEstimator.newBuilder() - .withWatermarkIdlePolicy(idlePolicyFactory.create(cfg)) - .build(); + public void setup(Map cfg, WatermarkIdlePolicyFactory idlePolicyFactory) {} + + @Override + public WatermarkEstimator create() { + return ProcessingTimeWatermarkEstimator.newBuilder().build(); } } @@ -56,42 +52,28 @@ public WatermarkEstimator create( public static class Builder { private final long minWatermark; private final TimestampSupplier timestampSupplier; - private final WatermarkIdlePolicy watermarkIdlePolicy; Builder() { - this( - Watermarks.MIN_WATERMARK, - System::currentTimeMillis, - new NotProgressingWatermarkIdlePolicy()); + this(Watermarks.MIN_WATERMARK, System::currentTimeMillis); } - private Builder( - long minWatermark, TimestampSupplier timestampSupplier, WatermarkIdlePolicy idlePolicy) { + private Builder(long minWatermark, TimestampSupplier timestampSupplier) { this.minWatermark = minWatermark; this.timestampSupplier = timestampSupplier; - this.watermarkIdlePolicy = idlePolicy; } public ProcessingTimeWatermarkEstimator.Builder withMinWatermark(long minWatermark) { - return new ProcessingTimeWatermarkEstimator.Builder( - minWatermark, timestampSupplier, watermarkIdlePolicy); + return new ProcessingTimeWatermarkEstimator.Builder(minWatermark, timestampSupplier); } public ProcessingTimeWatermarkEstimator.Builder withTimestampSupplier( TimestampSupplier timestampSupplier) { - return new ProcessingTimeWatermarkEstimator.Builder( - minWatermark, timestampSupplier, watermarkIdlePolicy); - } - public ProcessingTimeWatermarkEstimator.Builder withWatermarkIdlePolicy( - WatermarkIdlePolicy watermarkIdlePolicy) { - return new ProcessingTimeWatermarkEstimator.Builder( - minWatermark, timestampSupplier, watermarkIdlePolicy); + return new ProcessingTimeWatermarkEstimator.Builder(minWatermark, timestampSupplier); } public ProcessingTimeWatermarkEstimator build() { - return new ProcessingTimeWatermarkEstimator( - minWatermark, timestampSupplier, watermarkIdlePolicy); + return new ProcessingTimeWatermarkEstimator(minWatermark, timestampSupplier); } } @@ -100,13 +82,10 @@ public static ProcessingTimeWatermarkEstimator.Builder newBuilder() { } @Override - protected long estimateWatermark() { + public long getWatermark() { return Math.max(timestampSupplier.get(), minWatermark); } - @Override - protected void updateWatermark(StreamElement element) {} - @Override public void setMinWatermark(long minWatermark) { this.minWatermark = minWatermark; diff --git a/direct/core/src/main/java/cz/o2/proxima/direct/core/time/SkewedProcessingTimeIdlePolicy.java b/direct/core/src/main/java/cz/o2/proxima/direct/core/time/SkewedProcessingTimeIdlePolicy.java index 38f54f529..b5e459b2e 100644 --- a/direct/core/src/main/java/cz/o2/proxima/direct/core/time/SkewedProcessingTimeIdlePolicy.java +++ b/direct/core/src/main/java/cz/o2/proxima/direct/core/time/SkewedProcessingTimeIdlePolicy.java @@ -20,6 +20,7 @@ import cz.o2.proxima.core.time.WatermarkIdlePolicy; import cz.o2.proxima.core.time.WatermarkIdlePolicyFactory; import cz.o2.proxima.core.time.Watermarks; +import cz.o2.proxima.internal.com.google.common.base.MoreObjects; import java.util.Map; import java.util.Optional; import lombok.Getter; @@ -47,10 +48,10 @@ public class SkewedProcessingTimeIdlePolicy implements WatermarkIdlePolicy { public static class Factory implements WatermarkIdlePolicyFactory { - @Override - public WatermarkIdlePolicy create(Map cfg) { - long timestampSkew; + @Getter long timestampSkew; + @Override + public void setup(Map cfg) { // Check for legacy configuration outside watermark config if (cfg.containsKey(TIMESTAMP_SKEW)) { log.warn( @@ -68,9 +69,17 @@ public WatermarkIdlePolicy create(Map cfg) { .map(v -> Long.valueOf(v.toString())) .orElse(DEFAULT_TIMESTAMP_SKEW); } + } + @Override + public WatermarkIdlePolicy create() { return new SkewedProcessingTimeIdlePolicy(timestampSkew); } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("timestampSkew", timestampSkew).toString(); + } } @Override @@ -82,4 +91,12 @@ public long getIdleWatermark() { public void idle(long currentWatermark) { this.currentWatermark = timestampSupplier.get() - timestampSkew; } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("timestampSkew", timestampSkew) + .add("timestampSupplier", timestampSupplier) + .toString(); + } } diff --git a/direct/core/src/main/java/cz/o2/proxima/direct/core/time/UnboundedOutOfOrdernessWatermarkEstimator.java b/direct/core/src/main/java/cz/o2/proxima/direct/core/time/UnboundedOutOfOrdernessWatermarkEstimator.java index dcf033144..7f4d782b1 100644 --- a/direct/core/src/main/java/cz/o2/proxima/direct/core/time/UnboundedOutOfOrdernessWatermarkEstimator.java +++ b/direct/core/src/main/java/cz/o2/proxima/direct/core/time/UnboundedOutOfOrdernessWatermarkEstimator.java @@ -25,6 +25,7 @@ import cz.o2.proxima.core.time.WatermarkIdlePolicy; import cz.o2.proxima.core.time.WatermarkIdlePolicyFactory; import cz.o2.proxima.internal.com.google.common.annotations.VisibleForTesting; +import cz.o2.proxima.internal.com.google.common.base.MoreObjects; import cz.o2.proxima.internal.com.google.common.base.Preconditions; import java.util.Arrays; import java.util.Map; @@ -54,22 +55,30 @@ public class UnboundedOutOfOrdernessWatermarkEstimator extends AbstractWatermark public static class Factory implements WatermarkEstimatorFactory { - @Override - public WatermarkEstimator create( - Map cfg, WatermarkIdlePolicyFactory idlePolicyFactory) { + private long durationMs; + private long stepMs; + private long allowedTimestampSkew; + private long minWatermark; + private WatermarkIdlePolicyFactory idlePolicyFactory; - long durationMs = getConfiguration(ESTIMATE_DURATION_MS, cfg, DEFAULT_ESTIMATE_DURATION_MS); - long stepMs = getConfiguration(STEP_MS, cfg, DEFAULT_STEP_MS); - long allowedTimestampSkew = + @Override + public void setup(Map cfg, WatermarkIdlePolicyFactory idlePolicyFactory) { + this.durationMs = getConfiguration(ESTIMATE_DURATION_MS, cfg, DEFAULT_ESTIMATE_DURATION_MS); + this.stepMs = getConfiguration(STEP_MS, cfg, DEFAULT_STEP_MS); + this.allowedTimestampSkew = getConfiguration(ALLOWED_TIMESTAMP_SKEW, cfg, DEFAULT_ALLOWED_TIMESTAMP_SKEW); - long minWatermark = getConfiguration(MIN_WATERMARK, cfg, DEFAULT_MIN_WATERMARK); + this.minWatermark = getConfiguration(MIN_WATERMARK, cfg, DEFAULT_MIN_WATERMARK); + this.idlePolicyFactory = idlePolicyFactory; + } + @Override + public WatermarkEstimator create() { return UnboundedOutOfOrdernessWatermarkEstimator.newBuilder() .withAllowedTimestampSkew(allowedTimestampSkew) .withDurationMs(durationMs) .withStepMs(stepMs) .withMinWatermark(minWatermark) - .withWatermarkIdlePolicy(idlePolicyFactory.create(cfg)) + .withWatermarkIdlePolicy(idlePolicyFactory.create()) .build(); } @@ -79,6 +88,17 @@ private long getConfiguration( .map(v -> Long.valueOf(v.toString())) .orElse(defaultValue); } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("allowedTimestampSkew", this.allowedTimestampSkew) + .add("minWatermark", this.minWatermark) + .add("stepMs", this.stepMs) + .add("durationMs", this.durationMs) + .add("idlePolicyFactory", this.idlePolicyFactory) + .toString(); + } } /** Builder of the {@link UnboundedOutOfOrdernessWatermarkEstimator}. */ diff --git a/direct/core/src/main/java/cz/o2/proxima/direct/core/time/WatermarkConfiguration.java b/direct/core/src/main/java/cz/o2/proxima/direct/core/time/WatermarkConfiguration.java index 37dbdbfb2..6ec179096 100644 --- a/direct/core/src/main/java/cz/o2/proxima/direct/core/time/WatermarkConfiguration.java +++ b/direct/core/src/main/java/cz/o2/proxima/direct/core/time/WatermarkConfiguration.java @@ -68,7 +68,7 @@ public static String prefixedKey(String cfgName) { */ protected abstract WatermarkEstimatorFactory getDefaultEstimatorFactory(); - protected void configure() { + protected final void configure() { watermarkIdlePolicyFactory = Optional.ofNullable(cfg.get(prefixedKey(CFG_IDLE_POLICY_FACTORY))) .map(Object::toString) @@ -81,9 +81,7 @@ protected void configure() { .map(cls -> Classpath.newInstance(cls, WatermarkEstimatorFactory.class)) .orElse(getDefaultEstimatorFactory()); - log.debug( - "Configured watermark with watermarkEstimatorFactory {}, idlePolicyFactory {}", - watermarkEstimatorFactory, - watermarkIdlePolicyFactory); + watermarkIdlePolicyFactory.setup(cfg); + watermarkEstimatorFactory.setup(cfg, watermarkIdlePolicyFactory); } } diff --git a/direct/core/src/test/java/cz/o2/proxima/direct/core/time/BoundedOutOfOrdernessWatermarkEstimatorTest.java b/direct/core/src/test/java/cz/o2/proxima/direct/core/time/BoundedOutOfOrdernessWatermarkEstimatorTest.java index 9c93eab19..9c2cc746b 100644 --- a/direct/core/src/test/java/cz/o2/proxima/direct/core/time/BoundedOutOfOrdernessWatermarkEstimatorTest.java +++ b/direct/core/src/test/java/cz/o2/proxima/direct/core/time/BoundedOutOfOrdernessWatermarkEstimatorTest.java @@ -159,14 +159,15 @@ public void testFactory() { ImmutableMap.of(prefixedKey(MAX_OUT_OF_ORDERNESS_MS), OUT_OF_ORDERNESS); WatermarkIdlePolicyFactory idlePolicyFactory = mock(WatermarkIdlePolicyFactory.class); - when(idlePolicyFactory.create(cfg)).thenReturn(mock(WatermarkIdlePolicy.class)); + when(idlePolicyFactory.create()).thenReturn(mock(WatermarkIdlePolicy.class)); WatermarkEstimatorFactory factory = new BoundedOutOfOrdernessWatermarkEstimator.Factory(); + factory.setup(cfg, idlePolicyFactory); BoundedOutOfOrdernessWatermarkEstimator watermarkEstimator = - (BoundedOutOfOrdernessWatermarkEstimator) factory.create(cfg, idlePolicyFactory); + (BoundedOutOfOrdernessWatermarkEstimator) factory.create(); assertEquals(OUT_OF_ORDERNESS, watermarkEstimator.getMaxOutOfOrderness()); - verify(idlePolicyFactory, times(1)).create(cfg); + verify(idlePolicyFactory, times(1)).create(); } private StreamElement element(long ts) { diff --git a/direct/core/src/test/java/cz/o2/proxima/direct/core/time/NotProgressingWatermarkIdlePolicyTest.java b/direct/core/src/test/java/cz/o2/proxima/direct/core/time/NotProgressingWatermarkIdlePolicyTest.java index 82c71954a..157150282 100644 --- a/direct/core/src/test/java/cz/o2/proxima/direct/core/time/NotProgressingWatermarkIdlePolicyTest.java +++ b/direct/core/src/test/java/cz/o2/proxima/direct/core/time/NotProgressingWatermarkIdlePolicyTest.java @@ -54,7 +54,6 @@ public void testGetIdleWatermarkMonotonicity() { for (int i = 0; i < 100; i++) { long previousWatermark = policy.getIdleWatermark(); policy.idle(random.nextLong()); - assertTrue(previousWatermark <= policy.getIdleWatermark()); } } @@ -62,7 +61,8 @@ public void testGetIdleWatermarkMonotonicity() { @Test public void testFactory() { WatermarkIdlePolicyFactory factory = new NotProgressingWatermarkIdlePolicy.Factory(); - WatermarkIdlePolicy policy = factory.create(Collections.emptyMap()); + factory.setup(Collections.emptyMap()); + WatermarkIdlePolicy policy = factory.create(); assertNotNull(policy); assertEquals(NotProgressingWatermarkIdlePolicy.class, policy.getClass()); } diff --git a/direct/core/src/test/java/cz/o2/proxima/direct/core/time/SkewedProcessingTimeIdlePolicyTest.java b/direct/core/src/test/java/cz/o2/proxima/direct/core/time/SkewedProcessingTimeIdlePolicyTest.java index 391ee7a63..b48f5e833 100644 --- a/direct/core/src/test/java/cz/o2/proxima/direct/core/time/SkewedProcessingTimeIdlePolicyTest.java +++ b/direct/core/src/test/java/cz/o2/proxima/direct/core/time/SkewedProcessingTimeIdlePolicyTest.java @@ -76,8 +76,8 @@ public void testGetIdleWatermarkMonotonicity() { @Test public void testFactory() { WatermarkIdlePolicyFactory factory = new SkewedProcessingTimeIdlePolicy.Factory(); - SkewedProcessingTimeIdlePolicy policy = - (SkewedProcessingTimeIdlePolicy) factory.create(Collections.emptyMap()); + factory.setup(Collections.emptyMap()); + SkewedProcessingTimeIdlePolicy policy = (SkewedProcessingTimeIdlePolicy) factory.create(); assertNotNull(policy); assertEquals(DEFAULT_TIMESTAMP_SKEW, policy.getTimestampSkew()); } @@ -88,7 +88,8 @@ public void testFactoryConfig() { ImmutableMap.of(prefixedKey(SkewedProcessingTimeIdlePolicy.TIMESTAMP_SKEW), TIMESTAMP_SKEW); WatermarkIdlePolicyFactory factory = new SkewedProcessingTimeIdlePolicy.Factory(); - SkewedProcessingTimeIdlePolicy policy = (SkewedProcessingTimeIdlePolicy) factory.create(cfg); + factory.setup(cfg); + SkewedProcessingTimeIdlePolicy policy = (SkewedProcessingTimeIdlePolicy) factory.create(); assertEquals(TIMESTAMP_SKEW, policy.getTimestampSkew()); } @@ -98,7 +99,8 @@ public void testFactoryLegacyConfig() { ImmutableMap.of(SkewedProcessingTimeIdlePolicy.TIMESTAMP_SKEW, TIMESTAMP_SKEW); WatermarkIdlePolicyFactory factory = new SkewedProcessingTimeIdlePolicy.Factory(); - SkewedProcessingTimeIdlePolicy policy = (SkewedProcessingTimeIdlePolicy) factory.create(cfg); + factory.setup(cfg); + SkewedProcessingTimeIdlePolicy policy = (SkewedProcessingTimeIdlePolicy) factory.create(); assertEquals(TIMESTAMP_SKEW, policy.getTimestampSkew()); } @@ -115,7 +117,8 @@ public void testFactoryLegacyAndNewConfig() { legacyTimestampSkew); WatermarkIdlePolicyFactory factory = new SkewedProcessingTimeIdlePolicy.Factory(); - SkewedProcessingTimeIdlePolicy policy = (SkewedProcessingTimeIdlePolicy) factory.create(cfg); + factory.setup(cfg); + SkewedProcessingTimeIdlePolicy policy = (SkewedProcessingTimeIdlePolicy) factory.create(); // Legacy config has higher priority assertEquals(legacyTimestampSkew, policy.getTimestampSkew()); } diff --git a/direct/core/src/test/java/cz/o2/proxima/direct/core/time/UnboundedOutOfOrdernessWatermarkEstimatorTest.java b/direct/core/src/test/java/cz/o2/proxima/direct/core/time/UnboundedOutOfOrdernessWatermarkEstimatorTest.java index eb4265164..a96a9056a 100644 --- a/direct/core/src/test/java/cz/o2/proxima/direct/core/time/UnboundedOutOfOrdernessWatermarkEstimatorTest.java +++ b/direct/core/src/test/java/cz/o2/proxima/direct/core/time/UnboundedOutOfOrdernessWatermarkEstimatorTest.java @@ -58,7 +58,7 @@ public void testInitializedSameStamp() { for (int i = 0; i < 3; i++) { assertEquals( UnboundedOutOfOrdernessWatermarkEstimator.DEFAULT_MIN_WATERMARK, est.getWatermark()); - stamp.accumulateAndGet(250, (a, b) -> a + b); + stamp.accumulateAndGet(250, Long::sum); est.add(stamp.get()); } assertEquals(550, est.getWatermark()); @@ -74,7 +74,7 @@ public void testInitializedIncreasingStamp() { public void testCreateBacklog() { UnboundedOutOfOrdernessWatermarkEstimator est = createEstimator(); testTimestampIncreaseInitializes(est); - stamp.accumulateAndGet(250, (a, b) -> a + b); + stamp.accumulateAndGet(250, Long::sum); est.add(stamp.get() - 1000); assertEquals(stamp.get() - 450, est.getWatermark()); testTimestampIncreaseInitializes(est); @@ -83,7 +83,7 @@ public void testCreateBacklog() { private UnboundedOutOfOrdernessWatermarkEstimator testTimestampIncreaseInitializes( UnboundedOutOfOrdernessWatermarkEstimator est) { for (int i = 0; i < 10; i++) { - stamp.accumulateAndGet(250, (a, b) -> a + b); + stamp.accumulateAndGet(250, Long::sum); est.add(stamp.get() - i * 5); } assertEquals(stamp.get() - 200, est.getWatermark()); @@ -130,16 +130,17 @@ public void testFactory() { prefixedKey(ALLOWED_TIMESTAMP_SKEW), allowedTimestampSkew); WatermarkIdlePolicyFactory idlePolicyFactory = mock(WatermarkIdlePolicyFactory.class); - when(idlePolicyFactory.create(cfg)).thenReturn(mock(WatermarkIdlePolicy.class)); + when(idlePolicyFactory.create()).thenReturn(mock(WatermarkIdlePolicy.class)); WatermarkEstimatorFactory factory = new UnboundedOutOfOrdernessWatermarkEstimator.Factory(); + factory.setup(cfg, idlePolicyFactory); UnboundedOutOfOrdernessWatermarkEstimator watermarkEstimator = - (UnboundedOutOfOrdernessWatermarkEstimator) factory.create(cfg, idlePolicyFactory); + (UnboundedOutOfOrdernessWatermarkEstimator) factory.create(); assertEquals(estimateDurationMs, watermarkEstimator.getEstimateDurationMs()); assertEquals(stepMs, watermarkEstimator.getStepMs()); assertEquals(allowedTimestampSkew, watermarkEstimator.getAllowedTimestampSkew()); - verify(idlePolicyFactory, times(1)).create(cfg); + verify(idlePolicyFactory, times(1)).create(); } private UnboundedOutOfOrdernessWatermarkEstimator createEstimator() { diff --git a/direct/core/src/test/java/cz/o2/proxima/direct/core/time/WatermarkConfigurationTest.java b/direct/core/src/test/java/cz/o2/proxima/direct/core/time/WatermarkConfigurationTest.java index 855506ac4..4bb053af8 100644 --- a/direct/core/src/test/java/cz/o2/proxima/direct/core/time/WatermarkConfigurationTest.java +++ b/direct/core/src/test/java/cz/o2/proxima/direct/core/time/WatermarkConfigurationTest.java @@ -89,9 +89,11 @@ protected WatermarkEstimatorFactory getDefaultEstimatorFactory() { } public static class IdlePolicy implements WatermarkIdlePolicyFactory { + @Override + public void setup(Map cfg) {} @Override - public WatermarkIdlePolicy create(Map cfg) { + public WatermarkIdlePolicy create() { return null; } } @@ -99,8 +101,10 @@ public WatermarkIdlePolicy create(Map cfg) { public static class Estimator implements WatermarkEstimatorFactory { @Override - public WatermarkEstimator create( - Map cfg, WatermarkIdlePolicyFactory idlePolicyFactory) { + public void setup(Map cfg, WatermarkIdlePolicyFactory idlePolicyFactory) {} + + @Override + public WatermarkEstimator create() { return null; } } @@ -109,7 +113,10 @@ public WatermarkEstimator create( public static class CustomIdlePolicyFactory implements WatermarkIdlePolicyFactory { @Override - public WatermarkIdlePolicy create(Map cfg) { + public void setup(Map cfg) {} + + @Override + public WatermarkIdlePolicy create() { return null; } } @@ -117,8 +124,10 @@ public WatermarkIdlePolicy create(Map cfg) { public static class CustomWatermarkEstimatorFactory implements WatermarkEstimatorFactory { @Override - public WatermarkEstimator create( - Map cfg, WatermarkIdlePolicyFactory idlePolicyFactory) { + public void setup(Map cfg, WatermarkIdlePolicyFactory idlePolicyFactory) {} + + @Override + public WatermarkEstimator create() { return null; } } diff --git a/direct/io-kafka/src/main/java/cz/o2/proxima/direct/io/kafka/KafkaAccessor.java b/direct/io-kafka/src/main/java/cz/o2/proxima/direct/io/kafka/KafkaAccessor.java index bf3538246..8ad8ee7ed 100644 --- a/direct/io-kafka/src/main/java/cz/o2/proxima/direct/io/kafka/KafkaAccessor.java +++ b/direct/io-kafka/src/main/java/cz/o2/proxima/direct/io/kafka/KafkaAccessor.java @@ -41,7 +41,6 @@ import java.util.Optional; import java.util.Properties; import javax.annotation.Nullable; -import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.AdminClient; @@ -108,34 +107,25 @@ public class KafkaAccessor extends SerializableAbstractStorage implements DataAc @Getter @Nullable private final String topicPattern; - @Getter(AccessLevel.PACKAGE) - private final Map cfg; + @Getter private final Map cfg; - @Getter(AccessLevel.PACKAGE) - private Partitioner partitioner = new KeyPartitioner(); + @Getter private Partitioner partitioner = new KeyPartitioner(); - @Getter(AccessLevel.PACKAGE) - private long consumerPollInterval = 100; + @Getter private long consumerPollInterval = 100; - @Getter(AccessLevel.PACKAGE) - private long maxBytesPerSec = Long.MAX_VALUE; + @Getter private long maxBytesPerSec = Long.MAX_VALUE; - @Getter(AccessLevel.PACKAGE) - private int maxPollRecords = 500; + @Getter private int maxPollRecords = 500; - @Getter(AccessLevel.PACKAGE) - private long autoCommitIntervalMs = Long.MAX_VALUE; + @Getter private long autoCommitIntervalMs = Long.MAX_VALUE; - @Getter(AccessLevel.PACKAGE) - private long logStaleCommitIntervalMs = 60_000L; + @Getter private long logStaleCommitIntervalMs = 60_000L; - @Getter(AccessLevel.PACKAGE) - private long assignmentTimeoutMillis = 10_000L; + @Getter private long assignmentTimeoutMillis = 10_000L; - @Getter(AccessLevel.PACKAGE) - private KafkaWatermarkConfiguration watermarkConfiguration; + @Getter private KafkaWatermarkConfiguration watermarkConfiguration; - Class> serializerClass; + @Getter Class> serializerClass; public KafkaAccessor(EntityDescriptor entity, URI uri, Map cfg) { diff --git a/direct/io-kafka/src/main/java/cz/o2/proxima/direct/io/kafka/KafkaLogReader.java b/direct/io-kafka/src/main/java/cz/o2/proxima/direct/io/kafka/KafkaLogReader.java index 6cac79725..55358742c 100644 --- a/direct/io-kafka/src/main/java/cz/o2/proxima/direct/io/kafka/KafkaLogReader.java +++ b/direct/io-kafka/src/main/java/cz/o2/proxima/direct/io/kafka/KafkaLogReader.java @@ -24,8 +24,6 @@ import cz.o2.proxima.core.storage.commitlog.Position; import cz.o2.proxima.core.time.PartitionedWatermarkEstimator; import cz.o2.proxima.core.time.WatermarkEstimator; -import cz.o2.proxima.core.time.WatermarkEstimatorFactory; -import cz.o2.proxima.core.time.WatermarkIdlePolicyFactory; import cz.o2.proxima.core.time.Watermarks; import cz.o2.proxima.core.util.ExceptionUtils; import cz.o2.proxima.direct.core.Context; @@ -945,11 +943,7 @@ List getCommittedTopicOffsets( } private WatermarkEstimator createWatermarkEstimator() { - final WatermarkIdlePolicyFactory idlePolicyFactory = - accessor.getWatermarkConfiguration().getWatermarkIdlePolicyFactory(); - final WatermarkEstimatorFactory estimatorFactory = - accessor.getWatermarkConfiguration().getWatermarkEstimatorFactory(); - return estimatorFactory.create(cfg, idlePolicyFactory); + return accessor.getWatermarkConfiguration().getWatermarkEstimatorFactory().create(); } }; } diff --git a/direct/io-kafka/src/test/java/cz/o2/proxima/direct/io/kafka/KafkaWatermarkConfigurationTest.java b/direct/io-kafka/src/test/java/cz/o2/proxima/direct/io/kafka/KafkaWatermarkConfigurationTest.java index b63a0d4f6..0dfbd8ac0 100644 --- a/direct/io-kafka/src/test/java/cz/o2/proxima/direct/io/kafka/KafkaWatermarkConfigurationTest.java +++ b/direct/io-kafka/src/test/java/cz/o2/proxima/direct/io/kafka/KafkaWatermarkConfigurationTest.java @@ -33,11 +33,11 @@ public void testConfigureDefault() { Map cfg = ImmutableMap.of("timestamp-skew", 10L); KafkaWatermarkConfiguration configuration = new KafkaWatermarkConfiguration(cfg); WatermarkIdlePolicyFactory policyFactory = configuration.getWatermarkIdlePolicyFactory(); + configuration.getWatermarkEstimatorFactory().setup(cfg, policyFactory); BoundedOutOfOrdernessWatermarkEstimator estimator = (BoundedOutOfOrdernessWatermarkEstimator) - configuration.getWatermarkEstimatorFactory().create(cfg, policyFactory); - SkewedProcessingTimeIdlePolicy policy = - (SkewedProcessingTimeIdlePolicy) policyFactory.create(cfg); + configuration.getWatermarkEstimatorFactory().create(); + SkewedProcessingTimeIdlePolicy policy = (SkewedProcessingTimeIdlePolicy) policyFactory.create(); assertNotNull(estimator); assertNotNull(policy); diff --git a/direct/io-kafka/src/test/java/cz/o2/proxima/direct/io/kafka/LocalKafkaCommitLogDescriptor.java b/direct/io-kafka/src/test/java/cz/o2/proxima/direct/io/kafka/LocalKafkaCommitLogDescriptor.java index ff93af045..7804e51de 100644 --- a/direct/io-kafka/src/test/java/cz/o2/proxima/direct/io/kafka/LocalKafkaCommitLogDescriptor.java +++ b/direct/io-kafka/src/test/java/cz/o2/proxima/direct/io/kafka/LocalKafkaCommitLogDescriptor.java @@ -201,7 +201,7 @@ private void configure(URI uri, Map cfg) { public KafkaConsumerFactory createConsumerFactory() { ElementSerializer serializer = getSerializer(); - return new KafkaConsumerFactory( + return new KafkaConsumerFactory<>( getUri(), new Properties(), serializer.keySerde(), serializer.valueSerde()) { @Override @@ -239,8 +239,7 @@ public KafkaConsumer create( private List allPartitions() { List ret = new ArrayList<>(); for (int i = 0; i < numPartitions; i++) { - int id = i; - ret.add(new PartitionWithTopic(getTopic(), id)); + ret.add(new PartitionWithTopic(getTopic(), i)); } return ret; } diff --git a/direct/io-kafka/src/test/java/cz/o2/proxima/direct/io/kafka/LocalKafkaCommitLogDescriptorTest.java b/direct/io-kafka/src/test/java/cz/o2/proxima/direct/io/kafka/LocalKafkaCommitLogDescriptorTest.java index 6c5c2e5e8..fef40eb5b 100644 --- a/direct/io-kafka/src/test/java/cz/o2/proxima/direct/io/kafka/LocalKafkaCommitLogDescriptorTest.java +++ b/direct/io-kafka/src/test/java/cz/o2/proxima/direct/io/kafka/LocalKafkaCommitLogDescriptorTest.java @@ -177,7 +177,7 @@ public void testSinglePartitionWriteAndConsumeBySingleConsumerRunAfterWrite() AttributeFamilyDescriptor testFamily = createTestFamily(entity, storageUri, partitionsCfg(1)); Accessor accessor = kafka.createAccessor(direct, testFamily); assertTrue(accessor.isAcceptable(testFamily)); - LocalKafkaWriter writer = accessor.newWriter(); + LocalKafkaWriter writer = accessor.newWriter(); KafkaConsumer consumer = accessor.createConsumerFactory().create(); CountDownLatch latch = new CountDownLatch(1); writer.write( @@ -3318,8 +3318,10 @@ public static final class FixedWatermarkEstimatorFactory implements WatermarkEst public static final long FIXED_WATERMARK = 333L; @Override - public WatermarkEstimator create( - Map cfg, WatermarkIdlePolicyFactory idlePolicyFactory) { + public void setup(Map cfg, WatermarkIdlePolicyFactory idlePolicyFactory) {} + + @Override + public WatermarkEstimator create() { return new WatermarkEstimator() { @Override public long getWatermark() { @@ -3336,7 +3338,10 @@ public static final class FixedWatermarkIdlePolicyFactory implements WatermarkId public static final long FIXED_IDLE_WATERMARK = 555L; @Override - public WatermarkIdlePolicy create(Map cfg) { + public void setup(Map cfg) {} + + @Override + public WatermarkIdlePolicy create() { return () -> FIXED_IDLE_WATERMARK; } } diff --git a/direct/io-pubsub/module/src/main/java/cz/o2/proxima/direct/io/pubsub/PubSubReader.java b/direct/io-pubsub/module/src/main/java/cz/o2/proxima/direct/io/pubsub/PubSubReader.java index a8cf2f89c..e5c6dde44 100644 --- a/direct/io-pubsub/module/src/main/java/cz/o2/proxima/direct/io/pubsub/PubSubReader.java +++ b/direct/io-pubsub/module/src/main/java/cz/o2/proxima/direct/io/pubsub/PubSubReader.java @@ -39,7 +39,6 @@ import cz.o2.proxima.core.storage.commitlog.Position; import cz.o2.proxima.core.time.WatermarkEstimator; import cz.o2.proxima.core.time.WatermarkEstimatorFactory; -import cz.o2.proxima.core.time.WatermarkIdlePolicyFactory; import cz.o2.proxima.core.time.WatermarkSupplier; import cz.o2.proxima.direct.core.Context; import cz.o2.proxima.direct.core.commitlog.CommitLogObserver; @@ -58,7 +57,6 @@ import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -151,28 +149,26 @@ private interface PubSubConsumer extends Serializable { } private final PubSubAccessor accessor; - private final Map cfg; private final Context context; private final String project; private final String topic; private final int maxAckDeadline; private final int subscriptionAckDeadline; private final boolean subscriptionAutoCreate; - private final PubSubWatermarkConfiguration watermarkConfiguration; + private final WatermarkEstimatorFactory watermarkFactory; private transient ExecutorService executor; PubSubReader(PubSubAccessor accessor, Context context) { super(accessor.getEntityDescriptor(), accessor.getUri()); this.accessor = accessor; - this.cfg = accessor.getCfg(); this.context = context; this.project = accessor.getProject(); this.topic = accessor.getTopic(); this.maxAckDeadline = accessor.getMaxAckDeadline(); this.subscriptionAckDeadline = accessor.getSubscriptionAckDeadline(); this.subscriptionAutoCreate = accessor.isSubscriptionAutoCreate(); - this.watermarkConfiguration = accessor.getWatermarkConfiguration(); + this.watermarkFactory = accessor.getWatermarkConfiguration().getWatermarkEstimatorFactory(); } @Override @@ -421,13 +417,9 @@ Subscriber newSubscriber(ProjectSubscriptionName subscription, MessageReceiver r } WatermarkEstimator createWatermarkEstimator(long minWatermark) { - final WatermarkIdlePolicyFactory idlePolicyFactory = - watermarkConfiguration.getWatermarkIdlePolicyFactory(); - final WatermarkEstimatorFactory estimatorFactory = - watermarkConfiguration.getWatermarkEstimatorFactory(); - final WatermarkEstimator estimator = estimatorFactory.create(cfg, idlePolicyFactory); - estimator.setMinWatermark(minWatermark); - return estimator; + WatermarkEstimator res = watermarkFactory.create(); + res.setMinWatermark(minWatermark); + return res; } private void createSubscription( diff --git a/direct/io-pubsub/module/src/main/java/cz/o2/proxima/direct/io/pubsub/PubSubWatermarkConfiguration.java b/direct/io-pubsub/module/src/main/java/cz/o2/proxima/direct/io/pubsub/PubSubWatermarkConfiguration.java index 445980342..f69c7ebcd 100644 --- a/direct/io-pubsub/module/src/main/java/cz/o2/proxima/direct/io/pubsub/PubSubWatermarkConfiguration.java +++ b/direct/io-pubsub/module/src/main/java/cz/o2/proxima/direct/io/pubsub/PubSubWatermarkConfiguration.java @@ -24,6 +24,7 @@ import cz.o2.proxima.core.time.WatermarkIdlePolicyFactory; import cz.o2.proxima.direct.core.time.SkewedProcessingTimeIdlePolicy; import cz.o2.proxima.direct.core.time.UnboundedOutOfOrdernessWatermarkEstimator; +import cz.o2.proxima.direct.core.time.UnboundedOutOfOrdernessWatermarkEstimator.Factory; import cz.o2.proxima.direct.core.time.WatermarkConfiguration; import java.util.HashMap; import java.util.Map; @@ -63,6 +64,8 @@ static class PubSubWatermarkEstimatorFactory implements WatermarkEstimatorFactor private final long defaultEstimateDuration; private final long defaultAllowedTimestampSkew; + private UnboundedOutOfOrdernessWatermarkEstimator.Factory wrappedFactory; + PubSubWatermarkEstimatorFactory( long defaultEstimateDuration, long defaultAllowedTimestampSkew) { this.defaultEstimateDuration = defaultEstimateDuration; @@ -70,8 +73,7 @@ static class PubSubWatermarkEstimatorFactory implements WatermarkEstimatorFactor } @Override - public WatermarkEstimator create( - Map cfg, WatermarkIdlePolicyFactory idlePolicyFactory) { + public void setup(Map cfg, WatermarkIdlePolicyFactory idlePolicyFactory) { // Preserves backward compatible behaviour by adding default values to config. HashMap newConfig = new HashMap<>(cfg); @@ -88,8 +90,14 @@ public WatermarkEstimator create( newConfig.putIfAbsent(prefixedKey(ESTIMATE_DURATION_MS), defaultEstimateDuration); newConfig.putIfAbsent(prefixedKey(ALLOWED_TIMESTAMP_SKEW), defaultAllowedTimestampSkew); - return new UnboundedOutOfOrdernessWatermarkEstimator.Factory() - .create(newConfig, idlePolicyFactory); + idlePolicyFactory.setup(newConfig); + this.wrappedFactory = new Factory(); + wrappedFactory.setup(newConfig, idlePolicyFactory); + } + + @Override + public WatermarkEstimator create() { + return wrappedFactory.create(); } } } diff --git a/direct/io-pubsub/module/src/test/java/cz/o2/proxima/direct/io/pubsub/PubSubReaderTest.java b/direct/io-pubsub/module/src/test/java/cz/o2/proxima/direct/io/pubsub/PubSubReaderTest.java index bbd019f73..b45744ad8 100644 --- a/direct/io-pubsub/module/src/test/java/cz/o2/proxima/direct/io/pubsub/PubSubReaderTest.java +++ b/direct/io-pubsub/module/src/test/java/cz/o2/proxima/direct/io/pubsub/PubSubReaderTest.java @@ -112,8 +112,10 @@ Subscriber newSubscriber(ProjectSubscriptionName subscription, MessageReceiver r public static class TestWatermarkEstimatorFactory implements WatermarkEstimatorFactory { @Override - public WatermarkEstimator create( - Map cfg, WatermarkIdlePolicyFactory idlePolicyFactory) { + public void setup(Map cfg, WatermarkIdlePolicyFactory idlePolicyFactory) {} + + @Override + public WatermarkEstimator create() { return UnboundedOutOfOrdernessWatermarkEstimator.newBuilder() .withDurationMs(1) .withStepMs(1) diff --git a/direct/io-pubsub/module/src/test/java/cz/o2/proxima/direct/io/pubsub/PubSubWatermarkConfigurationTest.java b/direct/io-pubsub/module/src/test/java/cz/o2/proxima/direct/io/pubsub/PubSubWatermarkConfigurationTest.java index eeefccd0a..a72afb914 100644 --- a/direct/io-pubsub/module/src/test/java/cz/o2/proxima/direct/io/pubsub/PubSubWatermarkConfigurationTest.java +++ b/direct/io-pubsub/module/src/test/java/cz/o2/proxima/direct/io/pubsub/PubSubWatermarkConfigurationTest.java @@ -38,9 +38,8 @@ public void testConfigureDefault() { WatermarkIdlePolicyFactory policyFactory = configuration.getWatermarkIdlePolicyFactory(); UnboundedOutOfOrdernessWatermarkEstimator estimator = (UnboundedOutOfOrdernessWatermarkEstimator) - configuration.getWatermarkEstimatorFactory().create(cfg, policyFactory); - SkewedProcessingTimeIdlePolicy policy = - (SkewedProcessingTimeIdlePolicy) policyFactory.create(cfg); + configuration.getWatermarkEstimatorFactory().create(); + SkewedProcessingTimeIdlePolicy policy = (SkewedProcessingTimeIdlePolicy) policyFactory.create(); assertNotNull(estimator); assertNotNull(policy);