Skip to content

Commit

Permalink
handled unsigned and default values better (#635)
Browse files Browse the repository at this point in the history
* fixed copy paste issue with valueOrDefault

* handled unsigned and default values better

* handled unsigned and default values better

* removing using getters until review

* addFieldWhenGtZero

* middle ground on server value awareness

* handle pull vs push server defaults

* handle pull vs push server defaults

* handle pull vs push server defaults

* test organization

* test coverage

* future proof and self documenting code for unsigned consumer config vars

* don't need server values

* test UlongChangeHelper
  • Loading branch information
scottf authored Apr 26, 2022
1 parent 98f7a26 commit ce2c647
Show file tree
Hide file tree
Showing 12 changed files with 314 additions and 256 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,7 @@ Subscription creation has many checks to make sure that a valid, operable subscr
| JsSubExistingConsumerCannotBeModified | SUB | 90016 | Existing consumer cannot be modified. |
| JsSubConsumerNotFoundRequiredInBind | SUB | 90017 | Consumer not found, required in bind mode. |
| JsSubOrderedNotAllowOnQueues | SUB | 90018 | Ordered consumer not allowed on queues. |
| JsSubPushCantHaveMaxBatch | SUB | 90019 | Push subscriptions cannot supply max batch. |

### Message Acknowledgements

Expand Down
171 changes: 114 additions & 57 deletions src/main/java/io/nats/client/api/ConsumerConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@
*/
public class ConsumerConfiguration implements JsonSerializable {

public static final Duration MIN_ACK_WAIT = Duration.ofNanos(1);
public static final DeliverPolicy DEFAULT_DELIVER_POLICY = DeliverPolicy.All;
public static final AckPolicy DEFAULT_ACK_POLICY = AckPolicy.Explicit;
public static final ReplayPolicy DEFAULT_REPLAY_POLICY = ReplayPolicy.Instant;

public static final Duration MIN_IDLE_HEARTBEAT = Duration.ofMillis(100);

protected final DeliverPolicy deliverPolicy;
Expand All @@ -54,20 +57,16 @@ public class ConsumerConfiguration implements JsonSerializable {
protected final Duration idleHeartbeat;
protected final Duration maxExpires;
protected final Duration inactiveThreshold;
protected final Long startSeq;
protected final Long startSeq; // server side this is unsigned
protected final Long maxDeliver;
protected final Long rateLimit;
protected final Long rateLimit; // server side this is unsigned
protected final Long maxAckPending;
protected final Long maxPullWaiting;
protected final Long maxBatch;
protected final Boolean flowControl;
protected final Boolean headersOnly;
protected final List<Duration> backoff;

private static DeliverPolicy GetOrDefault(DeliverPolicy p) { return p == null ? DeliverPolicy.All : p; }
private static AckPolicy GetOrDefault(AckPolicy p) { return p == null ? AckPolicy.Explicit : p; }
private static ReplayPolicy GetOrDefault(ReplayPolicy p) { return p == null ? ReplayPolicy.Instant : p; }

protected ConsumerConfiguration(ConsumerConfiguration cc) {
this.deliverPolicy = cc.deliverPolicy;
this.ackPolicy = cc.ackPolicy;
Expand Down Expand Up @@ -97,13 +96,13 @@ protected ConsumerConfiguration(ConsumerConfiguration cc) {
// for the response from the server
ConsumerConfiguration(String json) {
Matcher m = DELIVER_POLICY_RE.matcher(json);
deliverPolicy = GetOrDefault(m.find() ? DeliverPolicy.get(m.group(1)) : null);
deliverPolicy = m.find() ? DeliverPolicy.get(m.group(1)) : null;

m = ACK_POLICY_RE.matcher(json);
ackPolicy = GetOrDefault(m.find() ? AckPolicy.get(m.group(1)) : null);
ackPolicy = m.find() ? AckPolicy.get(m.group(1)) : null;

m = REPLAY_POLICY_RE.matcher(json);
replayPolicy = GetOrDefault(m.find() ? ReplayPolicy.get(m.group(1)) : null);
replayPolicy = m.find() ? ReplayPolicy.get(m.group(1)) : null;

description = JsonUtils.readString(json, DESCRIPTION_RE);
durable = JsonUtils.readString(json, DURABLE_NAME_RE);
Expand All @@ -125,8 +124,8 @@ protected ConsumerConfiguration(ConsumerConfiguration cc) {
maxPullWaiting = JsonUtils.readLong(json, MAX_WAITING_RE);
maxBatch = JsonUtils.readLong(json, MAX_BATCH_RE);

flowControl = JsonUtils.readBoolean(json, FLOW_CONTROL_RE);
headersOnly = JsonUtils.readBoolean(json, HEADERS_ONLY_RE);
flowControl = JsonUtils.readBoolean(json, FLOW_CONTROL_RE, null);
headersOnly = JsonUtils.readBoolean(json, HEADERS_ONLY_RE, null);

backoff = JsonUtils.getDurationList(BACKOFF, json);
}
Expand Down Expand Up @@ -175,16 +174,16 @@ public String toJson() {
JsonUtils.addField(sb, DELIVER_SUBJECT, deliverSubject);
JsonUtils.addField(sb, DELIVER_GROUP, deliverGroup);
JsonUtils.addField(sb, DELIVER_POLICY, GetOrDefault(deliverPolicy).toString());
JsonUtils.addField(sb, OPT_START_SEQ, startSeq);
JsonUtils.addFieldWhenGtZero(sb, OPT_START_SEQ, startSeq);
JsonUtils.addField(sb, OPT_START_TIME, startTime);
JsonUtils.addField(sb, ACK_POLICY, GetOrDefault(ackPolicy).toString());
JsonUtils.addFieldAsNanos(sb, ACK_WAIT, ackWait);
JsonUtils.addField(sb, MAX_DELIVER, maxDeliver);
JsonUtils.addFieldWhenGtZero(sb, MAX_DELIVER, maxDeliver);
JsonUtils.addField(sb, MAX_ACK_PENDING, maxAckPending);
JsonUtils.addField(sb, FILTER_SUBJECT, filterSubject);
JsonUtils.addField(sb, REPLAY_POLICY, GetOrDefault(replayPolicy).toString());
JsonUtils.addField(sb, SAMPLE_FREQ, sampleFrequency);
JsonUtils.addField(sb, RATE_LIMIT_BPS, rateLimit);
JsonUtils.addFieldWhenGtZero(sb, RATE_LIMIT_BPS, rateLimit);
JsonUtils.addFieldAsNanos(sb, IDLE_HEARTBEAT, idleHeartbeat);
JsonUtils.addFldWhenTrue(sb, FLOW_CONTROL, flowControl);
JsonUtils.addField(sb, ApiConstants.MAX_WAITING, maxPullWaiting);
Expand Down Expand Up @@ -241,7 +240,7 @@ public DeliverPolicy getDeliverPolicy() {
* @return the start sequence.
*/
public long getStartSequence() {
return CcChangeHelper.START_SEQ.valueOrDefault(startSeq);
return UlongChangeHelper.START_SEQ.getOrUnset(startSeq);
}

/**
Expand Down Expand Up @@ -273,7 +272,7 @@ public Duration getAckWait() {
* @return the max delivery amount.
*/
public long getMaxDeliver() {
return CcChangeHelper.MAX_DELIVER.valueOrDefault(maxDeliver);
return LongChangeHelper.MAX_DELIVER.getOrUnset(maxDeliver);
}

/**
Expand All @@ -297,15 +296,15 @@ public ReplayPolicy getReplayPolicy() {
* @return the rate limit in bits per second
*/
public long getRateLimit() {
return CcChangeHelper.RATE_LIMIT.valueOrDefault(rateLimit);
return UlongChangeHelper.RATE_LIMIT.getOrUnset(rateLimit);
}

/**
* Gets the maximum ack pending configuration.
* @return maximum ack pending.
*/
public long getMaxAckPending() {
return CcChangeHelper.MAX_ACK_PENDING.valueOrDefault(maxAckPending);
return LongChangeHelper.MAX_ACK_PENDING.getOrUnset(maxAckPending);
}

/**
Expand Down Expand Up @@ -338,7 +337,7 @@ public boolean isFlowControl() {
* @return the max pull waiting
*/
public long getMaxPullWaiting() {
return CcChangeHelper.MAX_PULL_WAITING.valueOrDefault(maxPullWaiting);
return LongChangeHelper.MAX_PULL_WAITING.getOrUnset(maxPullWaiting);
}

/**
Expand All @@ -354,7 +353,7 @@ public boolean isHeadersOnly() {
* @return the max batch size
*/
public Long getMaxBatch() {
return CcChangeHelper.MAX_BATCH.valueOrDefault(maxBatch);
return LongChangeHelper.MAX_BATCH.getOrUnset(maxBatch);
}

/**
Expand Down Expand Up @@ -614,7 +613,7 @@ public Builder deliverGroup(String group) {
* @return Builder
*/
public Builder startSequence(Long sequence) {
this.startSeq = sequence;
this.startSeq = UlongChangeHelper.START_SEQ.forBuilder(sequence);
return this;
}

Expand All @@ -624,7 +623,7 @@ public Builder startSequence(Long sequence) {
* @return Builder
*/
public Builder startSequence(long sequence) {
this.startSeq = sequence;
this.startSeq = UlongChangeHelper.START_SEQ.forBuilder(sequence);
return this;
}

Expand Down Expand Up @@ -654,7 +653,7 @@ public Builder ackPolicy(AckPolicy policy) {
* @return Builder
*/
public Builder ackWait(Duration timeout) {
this.ackWait = validateDurationNotRequiredNotLessThanMin(timeout, MIN_ACK_WAIT);
this.ackWait = DurationChangeHelper.ACK_WAIT.forBuilder(timeout);
return this;
}

Expand All @@ -664,7 +663,7 @@ public Builder ackWait(Duration timeout) {
* @return Builder
*/
public Builder ackWait(long timeoutMillis) {
this.ackWait = validateDurationNotRequiredNotLessThanMin(timeoutMillis, MIN_ACK_WAIT);
this.ackWait = DurationChangeHelper.ACK_WAIT.forBuilder(Duration.ofMillis(timeoutMillis));
return this;
}

Expand All @@ -674,7 +673,7 @@ public Builder ackWait(long timeoutMillis) {
* @return Builder
*/
public Builder maxDeliver(Long maxDeliver) {
this.maxDeliver = maxDeliver;
this.maxDeliver = LongChangeHelper.MAX_DELIVER.forBuilder(maxDeliver);
return this;
}

Expand All @@ -684,7 +683,7 @@ public Builder maxDeliver(Long maxDeliver) {
* @return Builder
*/
public Builder maxDeliver(long maxDeliver) {
this.maxDeliver = maxDeliver;
this.maxDeliver = LongChangeHelper.MAX_DELIVER.forBuilder(maxDeliver);
return this;
}

Expand Down Expand Up @@ -724,7 +723,7 @@ public Builder sampleFrequency(String frequency) {
* @return Builder
*/
public Builder rateLimit(Long bitsPerSecond) {
this.rateLimit = bitsPerSecond;
this.rateLimit = UlongChangeHelper.RATE_LIMIT.forBuilder(bitsPerSecond);
return this;
}

Expand All @@ -734,7 +733,7 @@ public Builder rateLimit(Long bitsPerSecond) {
* @return Builder
*/
public Builder rateLimit(long bitsPerSecond) {
this.rateLimit = bitsPerSecond;
this.rateLimit = UlongChangeHelper.RATE_LIMIT.forBuilder(bitsPerSecond);
return this;
}

Expand All @@ -744,7 +743,7 @@ public Builder rateLimit(long bitsPerSecond) {
* @return Builder
*/
public Builder maxAckPending(Long maxAckPending) {
this.maxAckPending = maxAckPending;
this.maxAckPending = LongChangeHelper.MAX_ACK_PENDING.forBuilder(maxAckPending);
return this;
}

Expand All @@ -754,7 +753,7 @@ public Builder maxAckPending(Long maxAckPending) {
* @return Builder
*/
public Builder maxAckPending(long maxAckPending) {
this.maxAckPending = maxAckPending;
this.maxAckPending = LongChangeHelper.MAX_ACK_PENDING.forBuilder(maxAckPending);
return this;
}

Expand Down Expand Up @@ -845,7 +844,7 @@ public Builder inactiveThreshold(long inactiveThreshold) {
* @return Builder
*/
public Builder maxPullWaiting(Long maxPullWaiting) {
this.maxPullWaiting = maxPullWaiting;
this.maxPullWaiting = LongChangeHelper.MAX_PULL_WAITING.forBuilder(maxPullWaiting);
return this;
}

Expand All @@ -855,7 +854,7 @@ public Builder maxPullWaiting(Long maxPullWaiting) {
* @return Builder
*/
public Builder maxPullWaiting(long maxPullWaiting) {
this.maxPullWaiting = maxPullWaiting;
this.maxPullWaiting = LongChangeHelper.MAX_PULL_WAITING.forBuilder(maxPullWaiting);
return this;
}

Expand All @@ -865,7 +864,7 @@ public Builder maxPullWaiting(long maxPullWaiting) {
* @return Builder
*/
public Builder maxBatch(Long maxBatch) {
this.maxBatch = maxBatch;
this.maxBatch = LongChangeHelper.MAX_BATCH.forBuilder(maxBatch);
return this;
}

Expand All @@ -875,7 +874,7 @@ public Builder maxBatch(Long maxBatch) {
* @return Builder
*/
public Builder maxBatch(long maxBatch) {
this.maxBatch = maxBatch;
this.maxBatch = LongChangeHelper.MAX_BATCH.forBuilder(maxBatch);
return this;
}

Expand Down Expand Up @@ -974,39 +973,97 @@ public String toString() {
'}';
}

private static DeliverPolicy GetOrDefault(DeliverPolicy p) { return p == null ? DEFAULT_DELIVER_POLICY : p; }
private static AckPolicy GetOrDefault(AckPolicy p) { return p == null ? DEFAULT_ACK_POLICY : p; }
private static ReplayPolicy GetOrDefault(ReplayPolicy p) { return p == null ? DEFAULT_REPLAY_POLICY : p; }

/**
* INTERNAL CLASS ONLY, SUBJECT TO CHANGE
* Helper class to identify the initial or default value of a field
* and to make it easy to compare to other instances where the server provides
* a default value.
*/
public enum CcChangeHelper {
START_SEQ(1, -1),
MAX_DELIVER(1, -1),
RATE_LIMIT(1, -1),
MAX_ACK_PENDING(0, 0),
MAX_PULL_WAITING(0, 0),
MAX_BATCH(1, -1),
ACK_WAIT(Duration.ZERO.toNanos(), Duration.ofSeconds(30).toNanos());
* Helper class to manage min / default / unset / server values.
*/
public enum LongChangeHelper {
MAX_DELIVER(1, -1), // 0 is treated the same as -1 on the server, which is why the server doesn't omit this
MAX_ACK_PENDING(0, -1),
MAX_PULL_WAITING(0, -1),
MAX_BATCH(0, -1);

public final long Min;
public final long Default;
public final long Unset;

LongChangeHelper(long min, long unset) {
Min = min;
Unset = unset;
}

public long getOrUnset(Long val) {
return val == null ? Unset : val;
}

public boolean wouldBeChange(Long user, Long server) {
return user != null && !user.equals(getOrUnset(server));
}

public Long forBuilder(Long proposed) {
return proposed == null || proposed < Min ? Unset : proposed;
}
}

/**
* INTERNAL CLASS ONLY, SUBJECT TO CHANGE
* Helper class to manage min / default / unset / server values.
*/
public enum UlongChangeHelper {
START_SEQ(1, 0),
RATE_LIMIT(1, 0);

public final long Min;
public final long Unset;

UlongChangeHelper(long min, long unset) {
Min = min;
Unset = unset;
}

public long getOrUnset(Long val) {
return val == null ? Unset : val;
}

public boolean wouldBeChange(Long user, Long server) {
return user != null && !user.equals(getOrUnset(server));
}

public Long forBuilder(Long proposed) {
return proposed == null || proposed < Min ? Unset : proposed;
}
}

/**
* INTERNAL CLASS ONLY, SUBJECT TO CHANGE
* Helper class to manage min / default / unset / server values.
*/
public enum DurationChangeHelper {
ACK_WAIT(); // Nanos

public final Duration Min;
public final Duration Unset;
public final long MinNanos;

CcChangeHelper(long min, long dflt) {
this.Min = min;
this.Default = dflt;
DurationChangeHelper() {
Min = Duration.ofNanos(1);
Unset = Duration.ZERO;
MinNanos = 1;
}

public long valueOrDefault(Long val) {
return val == null ? START_SEQ.Default : val;
public Duration getOrUnset(Duration val) {
return val == null ? Unset : val;
}

public boolean wouldBeChange(Long user, Long srvr) {
return user != null && user != Default && !user.equals(srvr);
public boolean wouldBeChange(Duration user, Duration server) {
return user != null && !user.equals(getOrUnset(server));
}

public boolean wouldBeChange(Duration user, Duration srvr) {
return user != null && !user.equals(srvr);
public Duration forBuilder(Duration proposed) {
return proposed == null || proposed.toNanos() < MinNanos ? Unset : proposed;
}
}
}
Loading

0 comments on commit ce2c647

Please sign in to comment.