Skip to content

Commit

Permalink
Merge pull request #878: [proxima-direct-core] refactor WatermarkFact…
Browse files Browse the repository at this point in the history
…ory to setup() and create()
  • Loading branch information
je-ik authored Feb 26, 2024
2 parents 3fcfcf7 + 41e2074 commit deaf968
Show file tree
Hide file tree
Showing 23 changed files with 209 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public void testWithMultiplePartitions() throws InterruptedException {
direct,
createTestFamily(
event,
URI.create("kafka-test://brokers/topic-" + UUID.randomUUID().toString()),
URI.create("kafka-test://brokers/topic-" + UUID.randomUUID()),
ImmutableMap.of(
LocalKafkaCommitLogDescriptor.CFG_NUM_PARTITIONS,
numPartitions,
Expand Down Expand Up @@ -346,15 +346,21 @@ public static class FiniteElementsWatermarkEstimatorFactory implements Watermark
private static final Map<String, Map<Integer, Boolean>> CONSUMED_ELEMENTS =
new ConcurrentHashMap<>();

String name;
int numElements;

@Override
public void setup(Map<String, Object> cfg, WatermarkIdlePolicyFactory idlePolicyFactory) {
name = cfg.get(WatermarkConfiguration.prefixedKey("name")).toString();
numElements =
Integer.parseInt(cfg.get(WatermarkConfiguration.prefixedKey("numElements")).toString());
}

@Override
public WatermarkEstimator create(
Map<String, Object> cfg, WatermarkIdlePolicyFactory idlePolicyFactory) {
public WatermarkEstimator create() {

return new WatermarkEstimator() {

final String name = cfg.get(WatermarkConfiguration.prefixedKey("name")).toString();
final int numElements =
Integer.parseInt(cfg.get(WatermarkConfiguration.prefixedKey("numElements")).toString());
long watermark = Watermarks.MIN_WATERMARK;
final Map<Integer, Boolean> selfElements =
CONSUMED_ELEMENTS.computeIfAbsent(name, k -> new ConcurrentHashMap<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,14 @@

/** Factory that creates {@link WatermarkEstimator} instances. */
public interface WatermarkEstimatorFactory extends Serializable {
WatermarkEstimator create(Map<String, Object> cfg, WatermarkIdlePolicyFactory idlePolicyFactory);

/**
* Configure the estimator factory based on the provided configuration.
*
* @param cfg the configuration
* @param idlePolicyFactory the configured idle policy factory
*/
void setup(Map<String, Object> cfg, WatermarkIdlePolicyFactory idlePolicyFactory);

/**
* Create a new watermark estimator.
*
* <p>This method takes no arguments; any configuration the estimator needs is expected to have
* been passed to {@link #setup(Map, WatermarkIdlePolicyFactory)} beforehand.
*
* @return the created {@link WatermarkEstimator}
*/
WatermarkEstimator create();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,9 @@

/** Factory that creates {@link WatermarkIdlePolicy} instances. */
public interface WatermarkIdlePolicyFactory extends Serializable {
WatermarkIdlePolicy create(Map<String, Object> cfg);

/**
* Configure the factory based on the provided configuration.
*
* @param cfg the configuration
*/
void setup(Map<String, Object> cfg);

/**
* Create a new watermark idle policy.
*
* <p>This method takes no arguments; any configuration the policy needs is expected to have been
* passed to {@link #setup(Map)} beforehand.
*
* @return the created {@link WatermarkIdlePolicy}
*/
WatermarkIdlePolicy create();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,17 @@
import cz.o2.proxima.core.time.WatermarkIdlePolicy;
import cz.o2.proxima.core.time.WatermarkIdlePolicyFactory;
import cz.o2.proxima.core.time.Watermarks;
import cz.o2.proxima.internal.com.google.common.base.MoreObjects;
import java.util.Map;
import java.util.Optional;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

/**
* This estimators generates watermarks assuming that elements arrive out of order, but only to a
* certain degree defined by configuration: watermark.max-out-of-orderness.
*/
@Slf4j
public class BoundedOutOfOrdernessWatermarkEstimator extends AbstractWatermarkEstimator {
private static final long serialVersionUID = 1L;

Expand All @@ -44,6 +47,7 @@ public class BoundedOutOfOrdernessWatermarkEstimator extends AbstractWatermarkEs

private BoundedOutOfOrdernessWatermarkEstimator(
long maxOutOfOrderness, long minWatermark, WatermarkIdlePolicy idlePolicy) {

super(idlePolicy);
this.maxOutOfOrderness = maxOutOfOrderness;
this.minWatermark = minWatermark;
Expand Down Expand Up @@ -81,20 +85,33 @@ public void setMinWatermark(long minWatermark) {
*/
public static class Factory implements WatermarkEstimatorFactory {
private static final long serialVersionUID = 1L;
@Getter private long maxOutOfOrderness;
private WatermarkIdlePolicyFactory idlePolicyFactory;

@Override
public WatermarkEstimator create(
Map<String, Object> cfg, WatermarkIdlePolicyFactory idlePolicyFactory) {
final long maxOutOfOrderness =
public void setup(Map<String, Object> cfg, WatermarkIdlePolicyFactory idlePolicyFactory) {
this.maxOutOfOrderness =
Optional.ofNullable(cfg.get(prefixedKey(MAX_OUT_OF_ORDERNESS_MS)))
.map(v -> Long.valueOf(v.toString()))
.orElse(DEFAULT_MAX_OUT_OF_ORDERNESS_MS);
this.idlePolicyFactory = idlePolicyFactory;
}

@Override
public WatermarkEstimator create() {
return BoundedOutOfOrdernessWatermarkEstimator.newBuilder()
.withMaxOutOfOrderness(maxOutOfOrderness)
.withWatermarkIdlePolicy(idlePolicyFactory.create(cfg))
.withWatermarkIdlePolicy(idlePolicyFactory.create())
.build();
}

/** Describes this factory by its out-of-orderness bound and its idle policy factory. */
@Override
public String toString() {
  final MoreObjects.ToStringHelper description = MoreObjects.toStringHelper(this);
  description.add("maxOutOfOrderness", maxOutOfOrderness);
  description.add("idlePolicyFactory", idlePolicyFactory);
  return description.toString();
}
}

/**
Expand Down Expand Up @@ -140,4 +157,12 @@ public BoundedOutOfOrdernessWatermarkEstimator build() {
maxOutOfOrderness, minWatermark, watermarkIdlePolicy);
}
}

/** Describes this estimator by its out-of-orderness bound and its minimal watermark. */
@Override
public String toString() {
  final MoreObjects.ToStringHelper description = MoreObjects.toStringHelper(this);
  description.add("maxOutOfOrderness", maxOutOfOrderness);
  description.add("minWatermark", minWatermark);
  return description.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ public class NotProgressingWatermarkIdlePolicy implements WatermarkIdlePolicy {
public static class Factory implements WatermarkIdlePolicyFactory {

@Override
public WatermarkIdlePolicy create(Map<String, Object> cfg) {
public void setup(Map<String, Object> cfg) {}

/**
 * Creates the idle policy; it is constructed without any arguments.
 *
 * @return a new {@link NotProgressingWatermarkIdlePolicy}
 */
@Override
public WatermarkIdlePolicy create() {
  final WatermarkIdlePolicy policy = new NotProgressingWatermarkIdlePolicy();
  return policy;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,19 @@
*/
package cz.o2.proxima.direct.core.time;

import cz.o2.proxima.core.storage.StreamElement;
import cz.o2.proxima.core.time.AbstractWatermarkEstimator;
import cz.o2.proxima.core.time.WatermarkEstimator;
import cz.o2.proxima.core.time.WatermarkEstimatorFactory;
import cz.o2.proxima.core.time.WatermarkIdlePolicy;
import cz.o2.proxima.core.time.WatermarkIdlePolicyFactory;
import cz.o2.proxima.core.time.Watermarks;
import java.util.Map;

/** Estimates watermark as processing time. */
public class ProcessingTimeWatermarkEstimator extends AbstractWatermarkEstimator {
public class ProcessingTimeWatermarkEstimator implements WatermarkEstimator {
private static final long serialVersionUID = 1L;
private final TimestampSupplier timestampSupplier;
private long minWatermark;

ProcessingTimeWatermarkEstimator(
long minWatermark, TimestampSupplier timestampSupplier, WatermarkIdlePolicy idlePolicy) {
super(idlePolicy);
ProcessingTimeWatermarkEstimator(long minWatermark, TimestampSupplier timestampSupplier) {
this.minWatermark = minWatermark;
this.timestampSupplier = timestampSupplier;
}
Expand All @@ -41,57 +36,44 @@ public class ProcessingTimeWatermarkEstimator extends AbstractWatermarkEstimator
* Creates an instance of {@link cz.o2.proxima.direct.core.time.ProcessingTimeWatermarkEstimator}.
*/
public static class Factory implements WatermarkEstimatorFactory {

private static final long serialVersionUID = 1L;

@Override
public WatermarkEstimator create(
Map<String, Object> cfg, WatermarkIdlePolicyFactory idlePolicyFactory) {
return ProcessingTimeWatermarkEstimator.newBuilder()
.withWatermarkIdlePolicy(idlePolicyFactory.create(cfg))
.build();
public void setup(Map<String, Object> cfg, WatermarkIdlePolicyFactory idlePolicyFactory) {}

/**
 * Creates the estimator from a freshly obtained builder, without customizing it any further.
 *
 * @return a new {@link ProcessingTimeWatermarkEstimator}
 */
@Override
public WatermarkEstimator create() {
  final ProcessingTimeWatermarkEstimator.Builder builder =
      ProcessingTimeWatermarkEstimator.newBuilder();
  return builder.build();
}
}

/** Builder of the {@link ProcessingTimeWatermarkEstimator}. */
public static class Builder {
private final long minWatermark;
private final TimestampSupplier timestampSupplier;
private final WatermarkIdlePolicy watermarkIdlePolicy;

Builder() {
this(
Watermarks.MIN_WATERMARK,
System::currentTimeMillis,
new NotProgressingWatermarkIdlePolicy());
this(Watermarks.MIN_WATERMARK, System::currentTimeMillis);
}

private Builder(
long minWatermark, TimestampSupplier timestampSupplier, WatermarkIdlePolicy idlePolicy) {
private Builder(long minWatermark, TimestampSupplier timestampSupplier) {
this.minWatermark = minWatermark;
this.timestampSupplier = timestampSupplier;
this.watermarkIdlePolicy = idlePolicy;
}

public ProcessingTimeWatermarkEstimator.Builder withMinWatermark(long minWatermark) {
return new ProcessingTimeWatermarkEstimator.Builder(
minWatermark, timestampSupplier, watermarkIdlePolicy);
return new ProcessingTimeWatermarkEstimator.Builder(minWatermark, timestampSupplier);
}

public ProcessingTimeWatermarkEstimator.Builder withTimestampSupplier(
TimestampSupplier timestampSupplier) {
return new ProcessingTimeWatermarkEstimator.Builder(
minWatermark, timestampSupplier, watermarkIdlePolicy);
}

public ProcessingTimeWatermarkEstimator.Builder withWatermarkIdlePolicy(
WatermarkIdlePolicy watermarkIdlePolicy) {
return new ProcessingTimeWatermarkEstimator.Builder(
minWatermark, timestampSupplier, watermarkIdlePolicy);
return new ProcessingTimeWatermarkEstimator.Builder(minWatermark, timestampSupplier);
}

public ProcessingTimeWatermarkEstimator build() {
return new ProcessingTimeWatermarkEstimator(
minWatermark, timestampSupplier, watermarkIdlePolicy);
return new ProcessingTimeWatermarkEstimator(minWatermark, timestampSupplier);
}
}

Expand All @@ -100,13 +82,10 @@ public static ProcessingTimeWatermarkEstimator.Builder newBuilder() {
}

@Override
protected long estimateWatermark() {
public long getWatermark() {
return Math.max(timestampSupplier.get(), minWatermark);
}

@Override
protected void updateWatermark(StreamElement element) {}

@Override
public void setMinWatermark(long minWatermark) {
this.minWatermark = minWatermark;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import cz.o2.proxima.core.time.WatermarkIdlePolicy;
import cz.o2.proxima.core.time.WatermarkIdlePolicyFactory;
import cz.o2.proxima.core.time.Watermarks;
import cz.o2.proxima.internal.com.google.common.base.MoreObjects;
import java.util.Map;
import java.util.Optional;
import lombok.Getter;
Expand Down Expand Up @@ -47,10 +48,10 @@ public class SkewedProcessingTimeIdlePolicy implements WatermarkIdlePolicy {

public static class Factory implements WatermarkIdlePolicyFactory {

@Override
public WatermarkIdlePolicy create(Map<String, Object> cfg) {
long timestampSkew;
@Getter long timestampSkew;

@Override
public void setup(Map<String, Object> cfg) {
// Check for legacy configuration outside watermark config
if (cfg.containsKey(TIMESTAMP_SKEW)) {
log.warn(
Expand All @@ -68,9 +69,17 @@ public WatermarkIdlePolicy create(Map<String, Object> cfg) {
.map(v -> Long.valueOf(v.toString()))
.orElse(DEFAULT_TIMESTAMP_SKEW);
}
}

/**
 * Creates the idle policy using the timestamp skew currently held by this factory.
 *
 * @return a new {@link SkewedProcessingTimeIdlePolicy}
 */
@Override
public WatermarkIdlePolicy create() {
  final WatermarkIdlePolicy policy = new SkewedProcessingTimeIdlePolicy(timestampSkew);
  return policy;
}

/** Describes this factory by the timestamp skew it holds. */
@Override
public String toString() {
  final MoreObjects.ToStringHelper description = MoreObjects.toStringHelper(this);
  description.add("timestampSkew", timestampSkew);
  return description.toString();
}
}

@Override
Expand All @@ -82,4 +91,12 @@ public long getIdleWatermark() {
public void idle(long currentWatermark) {
this.currentWatermark = timestampSupplier.get() - timestampSkew;
}

/** Describes this idle policy by its timestamp skew and its timestamp supplier. */
@Override
public String toString() {
  final MoreObjects.ToStringHelper description = MoreObjects.toStringHelper(this);
  description.add("timestampSkew", timestampSkew);
  description.add("timestampSupplier", timestampSupplier);
  return description.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import cz.o2.proxima.core.time.WatermarkIdlePolicy;
import cz.o2.proxima.core.time.WatermarkIdlePolicyFactory;
import cz.o2.proxima.internal.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.internal.com.google.common.base.MoreObjects;
import cz.o2.proxima.internal.com.google.common.base.Preconditions;
import java.util.Arrays;
import java.util.Map;
Expand Down Expand Up @@ -54,22 +55,30 @@ public class UnboundedOutOfOrdernessWatermarkEstimator extends AbstractWatermark

public static class Factory implements WatermarkEstimatorFactory {

@Override
public WatermarkEstimator create(
Map<String, Object> cfg, WatermarkIdlePolicyFactory idlePolicyFactory) {
private long durationMs;
private long stepMs;
private long allowedTimestampSkew;
private long minWatermark;
private WatermarkIdlePolicyFactory idlePolicyFactory;

long durationMs = getConfiguration(ESTIMATE_DURATION_MS, cfg, DEFAULT_ESTIMATE_DURATION_MS);
long stepMs = getConfiguration(STEP_MS, cfg, DEFAULT_STEP_MS);
long allowedTimestampSkew =
@Override
public void setup(Map<String, Object> cfg, WatermarkIdlePolicyFactory idlePolicyFactory) {
this.durationMs = getConfiguration(ESTIMATE_DURATION_MS, cfg, DEFAULT_ESTIMATE_DURATION_MS);
this.stepMs = getConfiguration(STEP_MS, cfg, DEFAULT_STEP_MS);
this.allowedTimestampSkew =
getConfiguration(ALLOWED_TIMESTAMP_SKEW, cfg, DEFAULT_ALLOWED_TIMESTAMP_SKEW);
long minWatermark = getConfiguration(MIN_WATERMARK, cfg, DEFAULT_MIN_WATERMARK);
this.minWatermark = getConfiguration(MIN_WATERMARK, cfg, DEFAULT_MIN_WATERMARK);
this.idlePolicyFactory = idlePolicyFactory;
}

@Override
public WatermarkEstimator create() {
return UnboundedOutOfOrdernessWatermarkEstimator.newBuilder()
.withAllowedTimestampSkew(allowedTimestampSkew)
.withDurationMs(durationMs)
.withStepMs(stepMs)
.withMinWatermark(minWatermark)
.withWatermarkIdlePolicy(idlePolicyFactory.create(cfg))
.withWatermarkIdlePolicy(idlePolicyFactory.create())
.build();
}

Expand All @@ -79,6 +88,17 @@ private long getConfiguration(
.map(v -> Long.valueOf(v.toString()))
.orElse(defaultValue);
}

/** Describes this factory by every setting it holds, including the idle policy factory. */
@Override
public String toString() {
  final MoreObjects.ToStringHelper description = MoreObjects.toStringHelper(this);
  description.add("allowedTimestampSkew", this.allowedTimestampSkew);
  description.add("minWatermark", this.minWatermark);
  description.add("stepMs", this.stepMs);
  description.add("durationMs", this.durationMs);
  description.add("idlePolicyFactory", this.idlePolicyFactory);
  return description.toString();
}
}

/** Builder of the {@link UnboundedOutOfOrdernessWatermarkEstimator}. */
Expand Down
Loading

0 comments on commit deaf968

Please sign in to comment.