From d6cb76ed04e3dfe56bb46ca12a608f9c735e055c Mon Sep 17 00:00:00 2001 From: rickardoberg Date: Tue, 17 Dec 2024 15:00:16 +0800 Subject: [PATCH] Change the blocking strategy to use a reactive backpressure implementation where requests are sent to the server. Amortize the cost by only sending requests at most every 512 request, unless the requests are all small. --- .../com/eventstore/dbclient/AbstractRead.java | 4 +- .../eventstore/dbclient/ReadSubscription.java | 46 +++++++++++++------ 2 files changed, 33 insertions(+), 17 deletions(-) diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/AbstractRead.java b/db-client-java/src/main/java/com/eventstore/dbclient/AbstractRead.java index 3f9df65a..8fd05e80 100644 --- a/db-client-java/src/main/java/com/eventstore/dbclient/AbstractRead.java +++ b/db-client-java/src/main/java/com/eventstore/dbclient/AbstractRead.java @@ -35,10 +35,9 @@ protected AbstractRead(GrpcClient client, OptionsBase options) { @SuppressWarnings("unchecked") public void subscribe(Subscriber subscriber) { ReadSubscription readSubscription = new ReadSubscription(subscriber); - subscriber.onSubscribe(readSubscription); - CompletableFuture result = new CompletableFuture<>(); this.client.run(channel -> { + StreamsOuterClass.ReadReq request = StreamsOuterClass.ReadReq.newBuilder() .setOptions(createOptions()) .build(); @@ -108,6 +107,7 @@ private void handleError(Throwable t) { readSubscription.onError(t); } }); + subscriber.onSubscribe(readSubscription); return result; }).exceptionally(t -> { readSubscription.onError(t); diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/ReadSubscription.java b/db-client-java/src/main/java/com/eventstore/dbclient/ReadSubscription.java index df4ddf5b..b5b538ff 100644 --- a/db-client-java/src/main/java/com/eventstore/dbclient/ReadSubscription.java +++ b/db-client-java/src/main/java/com/eventstore/dbclient/ReadSubscription.java @@ -8,17 +8,15 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; + +import static java.lang.Math.min; class ReadSubscription implements Subscription { private final Subscriber subscriber; private ClientCallStreamObserver streamObserver; private final AtomicLong requested = new AtomicLong(0); + private final AtomicLong outstandingRequested = new AtomicLong(0); private final AtomicBoolean terminated = new AtomicBoolean(false); - private final Lock lock = new ReentrantLock(); - private final Condition hasRequested = lock.newCondition(); ReadSubscription(Subscriber subscriber) { this.subscriber = subscriber; @@ -26,6 +24,7 @@ class ReadSubscription implements Subscription { public void setStreamObserver(ClientCallStreamObserver streamObserver) { this.streamObserver = streamObserver; + streamObserver.disableAutoRequestWithInitial(0); } public void onError(Throwable error) { @@ -42,15 +41,11 @@ public void onError(Throwable error) { } public void onNext(ReadMessage message) { - lock.lock(); - while (requested.get() == 0 && !terminated.get()) { - hasRequested.awaitUninterruptibly(); - } if (!terminated.get()) { + outstandingRequested.decrementAndGet(); subscriber.onNext(message); - requested.decrementAndGet(); + request0(0); } - lock.unlock(); } public void onCompleted() { @@ -64,10 +59,31 @@ public void request(long n) { if (n <= 0) { subscriber.onError(new IllegalArgumentException("non-positive subscription request: " + n)); } - lock.lock(); - requested.updateAndGet(current -> current + n); - hasRequested.signal(); - lock.unlock(); + + request0(n); + } + + private void request0(long n) + { + long bufferRequestSize = 512*3/4; + long currentRequested = requested.addAndGet(n); + long toRequest = outstandingRequested.get(); + if (currentRequested > 0) + { + if (toRequest == 0) + { + toRequest = min(currentRequested, 512); + } else if (toRequest <= bufferRequestSize) + { + toRequest = min(currentRequested, 512-toRequest); + } else { + return; + } + + requested.addAndGet(-toRequest); + outstandingRequested.addAndGet(toRequest); + streamObserver.request((int)toRequest); + } } @Override