Skip to content

Commit

Permalink
Simplification Tuning and Review (#927)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Jun 20, 2023
1 parent fad76bd commit ae69edb
Show file tree
Hide file tree
Showing 13 changed files with 83 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,10 @@ private static void simpleFetch(Connection nc, JetStream js, String label, int m

printExplanation(label, consumerName, maxMessages, maxBytes);

long start = System.currentTimeMillis();

// create the consumer then use it
int receivedMessages = 0;
long receivedBytes = 0;
long start = System.currentTimeMillis();
try {
FetchConsumer consumer = consumerContext.fetch(fetchConsumeOptions);
Message msg = consumer.nextMessage();
Expand Down Expand Up @@ -150,7 +149,7 @@ private static String generateConsumerName(int maxMessages, int maxBytes) {
}

private static void printSummary(int receivedMessages, long receivedBytes, long elapsed) {
System.out.println("+++ " + receivedBytes + "/" + receivedMessages + " bytes/message(s) were received in " + elapsed + "ms\n");
System.out.println("+++ Fetch executed and " + receivedBytes + "/" + receivedMessages + " bytes/message(s) were received in " + elapsed + "ms\n");
}

private static void printExplanation(String label, String name, int maxMessages, int maxBytes) {
Expand All @@ -159,7 +158,7 @@ private static void printExplanation(String label, String name, int maxMessages,
switch (label) {
case "A":
System.out.println("=== Max bytes (" + maxBytes + ") threshold will be met since the next message would put the byte count over " + maxBytes + " bytes");
System.out.println("=== nextMessage() will return null when consume is done");
System.out.println("=== nextMessage() will return null when consume is done.");
break;
case "B":
System.out.println("=== Fetch max messages (" + maxMessages + ") will be reached before max bytes (" + maxBytes + ")");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,20 @@ private static void simpleFetch(Connection nc, JetStream js, String label, int m

printExplanation(label, consumerName, maxMessages);

long start = System.currentTimeMillis();

// create the consumer then use it
int receivedMessages = 0;
long start = System.currentTimeMillis();
try {
FetchConsumer consumer = consumerContext.fetch(fetchConsumeOptions);
Message msg = consumer.nextMessage();
while (msg != null) {
msg.ack();
if (++receivedMessages == maxMessages) {
break;
msg = null;
}
else {
msg = consumer.nextMessage();
}
msg = consumer.nextMessage();
}
}
catch (JetStreamApiException | JetStreamStatusCheckedException | IOException | InterruptedException e) {
Expand All @@ -128,7 +129,7 @@ private static void simpleFetch(Connection nc, JetStream js, String label, int m
}

private static void printSummary(int received, long elapsed) {
System.out.println("+++ " + received + " message(s) were received in " + elapsed + "ms\n");
System.out.println("+++ Fetch executed and " + received + " message(s) were received in " + elapsed + "ms\n");
}

private static void printExplanation(String label, String name, int maxMessages) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public static void main(String[] args) {

Thread consumeThread = new Thread(() -> {
int count = 0;
long start = System.nanoTime();
long start = System.currentTimeMillis();
try {
IterableConsumer consumer = consumerContext.consume();
System.out.println("Starting main loop.");
Expand All @@ -79,11 +79,11 @@ public static void main(String[] args) {
Thread.sleep(JITTER * 2); // allows more messages to come across
consumer.stop(1000);

System.out.println("Starting post-drain loop.");
System.out.println("Starting post-stop loop.");
Message msg = consumer.nextMessage(1000);
while (msg != null) {
msg.ack();
report("Post Drain Loop Running", System.nanoTime() - start, ++count);
report("Post-stop loop running", System.nanoTime() - start, ++count);
msg = consumer.nextMessage(1000);
}
}
Expand All @@ -96,7 +96,7 @@ public static void main(String[] args) {
// developer interrupted this thread?
return;
}
report("Done", System.nanoTime() - start, count);
report("Done", System.currentTimeMillis() - start, count);
});
consumeThread.start();

Expand All @@ -116,8 +116,7 @@ public static void main(String[] args) {
}
}

private static void report(String label, long elapsedNanos, int count) {
long ms = elapsedNanos / 1_000_000;
private static void report(String label, long ms, int count) {
System.out.println(label + ": Received " + count + " messages in " + ms + "ms.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public static void main(String[] args) {
// once the consumer is stopped, the client will drain messages
System.out.println("Stop the consumer...");
consumer.stop(1000);
Thread.sleep(1000); // enough for messages to drain after stop
}
catch (JetStreamApiException | IOException e) {
// JetStreamApiException:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ public static void main(String[] args) {

int received = 0;
while (received < count) {
long start = System.currentTimeMillis();
try {
long start = System.currentTimeMillis();
Message m = consumerContext.next(1000);
long elapsed = System.currentTimeMillis() - start;
if (m == null) {
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/io/nats/client/FetchConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,14 @@
* SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE
*/
public interface FetchConsumer extends MessageConsumer {
/**
* Read the next message. Return null if the fetch has been fulfilled either
* because max messages or bytes max bytes have been reached,
* or because the fetch was not fulfilled in the timeout set by the fetch options.
* @return the next message for this subscriber or null if there is a timeout
* @throws InterruptedException if one is thrown, in order to propagate it up
* @throws JetStreamStatusCheckedException an exception representing a status that requires attention,
* such as the consumer was deleted on the server in the middle of use.
*/
Message nextMessage() throws InterruptedException, JetStreamStatusCheckedException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ public class JetStreamStatusCheckedException extends Exception {
public JetStreamStatusCheckedException(JetStreamStatusException cause) {
super(cause);
}
}
}
1 change: 0 additions & 1 deletion src/main/java/io/nats/client/SubscribeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ protected static abstract class Builder<B, SO> {
ConsumerConfiguration cc;
long messageAlarmTime = -1;
boolean ordered;
boolean raiseStatusWarnings = true;

protected abstract B getThis();

Expand Down
51 changes: 39 additions & 12 deletions src/main/java/io/nats/client/impl/NatsConsumerContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ public class NatsConsumerContext implements ConsumerContext {

private final NatsStreamContext streamContext;
private final NatsJetStream js;
private final PullSubscribeOptions bindPso;
private ConsumerInfo lastConsumerInfo;

NatsConsumerContext(NatsStreamContext streamContext, ConsumerInfo ci) throws IOException {
this.streamContext = streamContext;
js = new NatsJetStream(streamContext.jsm.conn, streamContext.jsm.jso);
bindPso = PullSubscribeOptions.bind(streamContext.streamName, ci.getName());
lastConsumerInfo = ci;
}

Expand Down Expand Up @@ -67,15 +69,18 @@ public ConsumerInfo getCachedConsumerInfo() {
*/
@Override
public Message next() throws IOException, InterruptedException, JetStreamStatusCheckedException, JetStreamApiException {
return next(DEFAULT_EXPIRES_IN_MILLIS);
return new NextSub(DEFAULT_EXPIRES_IN_MILLIS).next();
}

/**
* {@inheritDoc}
*/
@Override
public Message next(Duration maxWait) throws IOException, InterruptedException, JetStreamStatusCheckedException, JetStreamApiException {
return next(maxWait == null ? DEFAULT_EXPIRES_IN_MILLIS : maxWait.toMillis());
if (maxWait == null) {
return new NextSub(DEFAULT_EXPIRES_IN_MILLIS).next();
}
return next(maxWait.toMillis());
}

/**
Expand All @@ -86,16 +91,39 @@ public Message next(long maxWaitMillis) throws IOException, InterruptedException
if (maxWaitMillis < MIN_EXPIRES_MILLS) {
throw new IllegalArgumentException("Max wait must be at least " + MIN_EXPIRES_MILLS + " milliseconds.");
}
return new NextSub(maxWaitMillis).next();
}

long expires = maxWaitMillis - EXPIRE_ADJUSTMENT;
class NextSub {
private final long maxWaitMillis;
private final NatsJetStreamPullSubscription sub;

public NextSub(long maxWaitMillis) throws JetStreamApiException, IOException {
sub = new SubscriptionMaker().makeSubscription(null);
this.maxWaitMillis = maxWaitMillis;
sub._pull(PullRequestOptions.builder(1).expiresIn(maxWaitMillis - EXPIRE_ADJUSTMENT).build(), false, null);
}

NatsJetStreamPullSubscription sub = new SubscriptionMaker().makeSubscription(null);
sub._pull(PullRequestOptions.builder(1).expiresIn(expires).build(), false, null);
try {
return sub.nextMessage(maxWaitMillis);
Message next() throws JetStreamStatusCheckedException, InterruptedException {
try {
return sub.nextMessage(maxWaitMillis);
}
catch (JetStreamStatusException e) {
throw new JetStreamStatusCheckedException(e);
}
}
catch (JetStreamStatusException e) {
throw new JetStreamStatusCheckedException(e);

@Override
protected void finalize() throws Throwable {
{
try {
sub.unsubscribe();
}
catch (Exception ignore) {
// ignored
}
super.finalize();
}
}
}

Expand Down Expand Up @@ -164,13 +192,12 @@ class SubscriptionMaker {
Dispatcher dispatcher;

public NatsJetStreamPullSubscription makeSubscription(MessageHandler messageHandler) throws IOException, JetStreamApiException {
PullSubscribeOptions pso = PullSubscribeOptions.bind(streamContext.streamName, lastConsumerInfo.getName());
if (messageHandler == null) {
return (NatsJetStreamPullSubscription)js.subscribe(null, pso);
return (NatsJetStreamPullSubscription)js.subscribe(null, bindPso);
}

dispatcher = js.conn.createDispatcher();
return (NatsJetStreamPullSubscription)js.subscribe(null, dispatcher, messageHandler, pso);
return (NatsJetStreamPullSubscription)js.subscribe(null, dispatcher, messageHandler, bindPso);
}
}
}
5 changes: 5 additions & 0 deletions src/main/java/io/nats/client/impl/NatsFetchConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,10 @@ public Message nextMessage() throws InterruptedException, JetStreamStatusChecked
catch (JetStreamStatusException e) {
throw new JetStreamStatusCheckedException(e);
}
catch (IllegalStateException i) {
// this happens if the consumer is stopped, since it is
// drained/unsubscribed, so don't pass it on if it's expected
return null;
}
}
}
5 changes: 5 additions & 0 deletions src/main/java/io/nats/client/impl/NatsIterableConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ public Message nextMessage(Duration timeout) throws InterruptedException, JetStr
catch (JetStreamStatusException e) {
throw new JetStreamStatusCheckedException(e);
}
catch (IllegalStateException i) {
// this happens if the consumer is stopped, since it is
// drained/unsubscribed, so don't pass it on if it's expected
return null;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public StreamInfo getStreamInfo(String streamName, StreamInfoOptions options) th
*/
@Override
public PurgeResponse purgeStream(String streamName) throws IOException, JetStreamApiException {
validateNotNull(streamName, "Stream Name");
String subj = String.format(JSAPI_STREAM_PURGE, streamName);
Message resp = makeRequestResponseRequired(subj, null, jso.getRequestTimeout());
return new PurgeResponse(resp).throwOnHasError();
Expand All @@ -110,6 +111,7 @@ public PurgeResponse purgeStream(String streamName) throws IOException, JetStrea
*/
@Override
public PurgeResponse purgeStream(String streamName, PurgeOptions options) throws IOException, JetStreamApiException {
validateNotNull(streamName, "Stream Name");
validateNotNull(options, "Purge Options");
String subj = String.format(JSAPI_STREAM_PURGE, streamName);
byte[] body = options.toJson().getBytes(StandardCharsets.UTF_8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ protected Message _nextUnmanagedWaitForever(String expectedPullSubject) throws I
}
break;
}
// STATUS_HANDLED, STATUS_TERMINUS and STATUS_ERRORS that aren't for expected pullSubject: check again since waiting forever
// Check again since waiting forever when:
// 1. Any STATUS_HANDLED or STATUS_TERMINUS
// 2. STATUS_ERRORS that aren't for expected pullSubject
}
}
}
Expand All @@ -140,7 +142,9 @@ protected Message _nextUnmanagedNoWait(String expectedPullSubject) throws Interr
}
break;
}
// STATUS_HANDLED: regular messages might have arrived, check again
// Check again when, regular messages might have arrived
// 1. Any STATUS_HANDLED
// 2. STATUS_TERMINUS or STATUS_ERRORS that aren't for expected pullSubject
}
}

Expand Down

0 comments on commit ae69edb

Please sign in to comment.