Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Service Bus] Allow 0 prefetch and dynamically use batch size to request link credits #17546

Merged
merged 33 commits into from
Nov 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
fd1843a
Link credit size adjustment
YijunXieMS Nov 12, 2020
deadef5
Add API to block and flow link credit to the service.
YijunXieMS Nov 13, 2020
945f1c8
Use addCreditsBlocking instead of addCredits to avoid too many credit…
YijunXieMS Nov 13, 2020
98d07dd
Request fewer credits in the 2nd request if the 1st request returns f…
YijunXieMS Nov 13, 2020
ef52f71
Rename addCreditsBlocking to addCreditsInstantly
YijunXieMS Nov 13, 2020
36c4302
Update amqp-core dependency version
YijunXieMS Nov 13, 2020
1a5c4c4
Use remaining instead of bufferMessages.size() to calculate num of re…
YijunXieMS Nov 13, 2020
d43457e
Add backpressure in request for async receiveMessages()
YijunXieMS Nov 16, 2020
34c70ce
Use limitRate instead of autoConnect for back pressure.
YijunXieMS Nov 16, 2020
e7aa8bc
rename receiveMessagesNoConnection to receiveMessagesNoBackPressure
YijunXieMS Nov 16, 2020
152e193
Add back pressure, adjust link credits using prefetch and request siz…
YijunXieMS Nov 17, 2020
65cdcf6
Fix unit test
YijunXieMS Nov 18, 2020
c4a14f0
Merge branch 'master' into sb_receiver_tuning
YijunXieMS Nov 18, 2020
41d08ab
Small change (add final to variable)
YijunXieMS Nov 18, 2020
124baab
Add unreleased core-amqp
YijunXieMS Nov 18, 2020
5743dcb
SessionReceiver uses reactor prefetch 1 instead of default 256
YijunXieMS Nov 18, 2020
8986b2f
Dispose SynchronousMessageSubscriber before closing async client in s…
YijunXieMS Nov 18, 2020
39bac24
Prefetch 1 in instead fo default 256
YijunXieMS Nov 19, 2020
5572e15
Add some code comments
YijunXieMS Nov 19, 2020
530e258
set low tide 0 in limitRate() to disable replenish
YijunXieMS Nov 19, 2020
aee463c
use limitRate in async client and remove the limit in processor
YijunXieMS Nov 19, 2020
bafc0e4
autoConnect in receiveMessages() so it can be subscribed multiple times.
YijunXieMS Nov 19, 2020
037cbbc
Enable subscribe twice test.
YijunXieMS Nov 19, 2020
3774d66
Merge branch 'master' into sb_receiver_tuning
YijunXieMS Nov 19, 2020
09fca43
Use publish / autoConnect to support multiple subscribers.
YijunXieMS Nov 20, 2020
fe7ee04
put addCreditInstantly in synchronized block.
YijunXieMS Nov 20, 2020
59aee7b
Format change for checkstyle
YijunXieMS Nov 20, 2020
8ea5ba6
Use addCredits instead of addCreditsInstantly.
YijunXieMS Nov 20, 2020
24e3421
Use addCredits instead of addCreditsInstantly
YijunXieMS Nov 20, 2020
de311a8
Remove autoConnect
YijunXieMS Nov 20, 2020
29085d3
Remove test case that subscribe receiveMessages() twice.
YijunXieMS Nov 20, 2020
4c837b5
Use addCredits instead of addCreditsInstantly (update test)
YijunXieMS Nov 20, 2020
3c23542
Checkstyle
YijunXieMS Nov 20, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public interface AmqpReceiveLink extends AmqpLink {
Flux<Message> receive();

/**
* Adds the specified number of credits to the link.
* Schedule to adds the specified number of credits to the link.
*
* The number of link credits initialises to zero. It is the application's responsibility to call this method to
* allow the receiver to receive {@code credits} more deliveries.
Expand All @@ -34,6 +34,21 @@ public interface AmqpReceiveLink extends AmqpLink {
*/
void addCredits(int credits);

/**
* Adds the specified number of credits to the link.
*
* The number of link credits initialises to zero. It is the application's responsibility to call this method to
* allow the receiver to receive {@code credits} more deliveries.
*
* It will update the credits in local memory instantly so {@link #getCredits()} will get
* the updated credits immediately. But the service side may get the credits added with a latency.
* As a contrast, {@link #getCredits()} may return an unchanged value for a short while after
* {@link #addCredits(int)} is called to schedule the credit addition and before the job dispatcher executes it.
*
* @param credits Number of credits to add to the receive link.
*/
void addCreditsInstantly(int credits);

/**
* Gets the current number of credits this link has.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ public void addCredits(int credits) {
}
}

@Override
public void addCreditsInstantly(int credits) {
receiver.flow(credits);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of the calls into Proton-j Reactor is done via ReactorDispatcher because of thread safety,
As commented here ..
https://github.com/Azure/azure-sdk-for-java/blob/master/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorDispatcher.java#L22
Would this be okay to call this api directly ? @srnagar Would this be okay here ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be tested for thread-safety. If proton-j doesn't support adding credits in a thread-safe manner we might add incorrect number of credits to the link and can potentially result in data loss if the SB mode is RECEIVE_AND_DELETE

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I simply put it into a synchronized block. The performance overhead should be minimal.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed back to addCredits()

}

@Override
public int getCredits() {
return receiver.getRemoteCredit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ class ServiceBusAsyncConsumer implements AutoCloseable {
this.linkProcessor = linkProcessor;
this.messageSerializer = messageSerializer;
this.processor = linkProcessor
.map(message -> this.messageSerializer.deserialize(message, ServiceBusReceivedMessage.class))
.publish(receiverOptions.getPrefetchCount())
.autoConnect(1);
Comment on lines -38 to -39
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this removed?

Copy link
Contributor Author

@YijunXieMS YijunXieMS Nov 19, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is removed because the publish( ) method requests the upstream (link processor) with its own request count (the reactor prefetch). With publish(), the SynchronousMessageSubscriber can't pass its own request count to link processor. The request is used to adjust link credits when prefetch = 0 (default value).

Updated to still remove publish() from this place and updated the async client's receiveMessages() to publish and autoConnect. So the user can subscribe it more than once. For SynchronousMessageSubscriber, I use an internal API to avoid the publish() side effect.

.map(message -> this.messageSerializer.deserialize(message, ServiceBusReceivedMessage.class));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public final class ServiceBusClientBuilder {

// Using 0 pre-fetch count for both receive modes, to avoid message lock lost exceptions in application
// receiving messages at a slow rate. Applications can set it to a higher value if they need better performance.
private static final int DEFAULT_PREFETCH_COUNT = 1;
private static final int DEFAULT_PREFETCH_COUNT = 0;
private static final String NAME_KEY = "name";
private static final String VERSION_KEY = "version";
private static final String UNKNOWN = "UNKNOWN";
Expand Down Expand Up @@ -670,11 +670,13 @@ public ServiceBusSessionProcessorClientBuilder maxConcurrentSessions(int maxConc

/**
* Sets the prefetch count of the processor. For both {@link ServiceBusReceiveMode#PEEK_LOCK PEEK_LOCK} and
* {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1.
* {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 0.
*
* Prefetch speeds up the message flow by aiming to have a message readily available for local retrieval when
* and before the application starts the processor.
* Setting a non-zero value will prefetch that number of messages. Setting the value to zero turns prefetch off.
* Using a non-zero prefetch risks of losing messages even though it has better performance.
* @see <a href="https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-prefetch">Service Bus Prefetch</a>
*
* @param prefetchCount The prefetch count.
*
Expand Down Expand Up @@ -1442,9 +1444,9 @@ ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed) {
}

private void validateAndThrow(int prefetchCount) {
if (prefetchCount < 1) {
if (prefetchCount < 0) {
throw logger.logExceptionAsError(new IllegalArgumentException(String.format(
"prefetchCount (%s) cannot be less than 1.", prefetchCount)));
"prefetchCount (%s) cannot be less than 0.", prefetchCount)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,18 @@ Flux<ServiceBusReceivedMessage> peekMessagesAt(int maxMessages, long sequenceNum
* @return An <b>infinite</b> stream of messages from the Service Bus entity.
*/
public Flux<ServiceBusReceivedMessage> receiveMessages() {
return receiveMessagesWithContext()
// Without limitRate(), if the user calls receiveMessages().subscribe(), it will call
// ServiceBusReceiveLinkProcessor.request(long request) where request = Long.MAX_VALUE.
// We turn this one-time non-backpressure request to continuous requests with backpressure.
// If receiverOptions.prefetchCount is set to non-zero, it will be passed to ServiceBusReceiveLinkProcessor
// to auto-refill the prefetch buffer. A request will retrieve one message from this buffer.
// If receiverOptions.prefetchCount is 0 (default value),
// the request will add a link credit so one message is retrieved from the service.
return receiveMessagesNoBackPressure().limitRate(1, 0);
}

Flux<ServiceBusReceivedMessage> receiveMessagesNoBackPressure() {
return receiveMessagesWithContext(0)
.handle((serviceBusMessageContext, sink) -> {
if (serviceBusMessageContext.hasError()) {
sink.error(serviceBusMessageContext.getThrowable());
Expand All @@ -598,6 +609,10 @@ public Flux<ServiceBusReceivedMessage> receiveMessages() {
* @return An <b>infinite</b> stream of messages from the Service Bus entity.
*/
Flux<ServiceBusMessageContext> receiveMessagesWithContext() {
return receiveMessagesWithContext(1);
}

Flux<ServiceBusMessageContext> receiveMessagesWithContext(int highTide) {
final Flux<ServiceBusMessageContext> messageFlux = sessionManager != null
? sessionManager.receive()
: getOrCreateConsumer().receive().map(ServiceBusMessageContext::new);
Expand All @@ -610,16 +625,19 @@ Flux<ServiceBusMessageContext> receiveMessagesWithContext() {
withAutoLockRenewal = messageFlux;
}

final Flux<ServiceBusMessageContext> withAutoComplete;
Flux<ServiceBusMessageContext> result;
if (receiverOptions.isEnableAutoComplete()) {
withAutoComplete = new FluxAutoComplete(withAutoLockRenewal, completionLock,
result = new FluxAutoComplete(withAutoLockRenewal, completionLock,
context -> context.getMessage() != null ? complete(context.getMessage()) : Mono.empty(),
context -> context.getMessage() != null ? abandon(context.getMessage()) : Mono.empty());
} else {
withAutoComplete = withAutoLockRenewal;
result = withAutoLockRenewal;
}

return withAutoComplete
if (highTide > 0) {
result = result.limitRate(highTide, 0);
}
return result
.onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RECEIVE));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,12 +575,12 @@ public void rollbackTransaction(ServiceBusTransactionContext transactionContext)
*/
@Override
public void close() {
asyncClient.close();

SynchronousMessageSubscriber messageSubscriber = synchronousMessageSubscriber.getAndSet(null);
if (messageSubscriber != null && !messageSubscriber.isDisposed()) {
messageSubscriber.dispose();
}

asyncClient.close();
}

/**
Expand All @@ -590,19 +590,20 @@ public void close() {
private void queueWork(int maximumMessageCount, Duration maxWaitTime,
FluxSink<ServiceBusReceivedMessage> emitter) {
final long id = idGenerator.getAndIncrement();
final SynchronousReceiveWork work = new SynchronousReceiveWork(id, maximumMessageCount, maxWaitTime, emitter);

final int prefetch = asyncClient.getReceiverOptions().getPrefetchCount();
final int toRequest = prefetch != 0 ? Math.min(maximumMessageCount, prefetch) : maximumMessageCount;
final SynchronousReceiveWork work = new SynchronousReceiveWork(id,
toRequest,
maxWaitTime, emitter);
SynchronousMessageSubscriber messageSubscriber = synchronousMessageSubscriber.get();
if (messageSubscriber == null) {
long prefetch = asyncClient.getReceiverOptions().getPrefetchCount();
SynchronousMessageSubscriber newSubscriber = new SynchronousMessageSubscriber(prefetch, work);

SynchronousMessageSubscriber newSubscriber = new SynchronousMessageSubscriber(toRequest, work);
if (!synchronousMessageSubscriber.compareAndSet(null, newSubscriber)) {
newSubscriber.dispose();
SynchronousMessageSubscriber existing = synchronousMessageSubscriber.get();
existing.queueWork(work);
} else {
asyncClient.receiveMessages().subscribeWith(newSubscriber);
asyncClient.receiveMessagesNoBackPressure().subscribeWith(newSubscriber);
}
} else {
messageSubscriber.queueWork(work);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ private Flux<ServiceBusMessageContext> getSession(Scheduler scheduler, boolean d
onSessionRequest(1L);
}
}))
.publishOn(scheduler);
.publishOn(scheduler, 1);
}

private Mono<ServiceBusManagementNode> getManagementNode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,23 @@ class ServiceBusSessionReceiver implements AutoCloseable {
this.receiveLink = receiveLink;
this.lockContainer = new LockContainer<>(ServiceBusConstants.OPERATION_TIMEOUT);

receiveLink.setEmptyCreditListener(() -> 1);
receiveLink.setEmptyCreditListener(() -> 0);

final Flux<ServiceBusMessageContext> receivedMessagesFlux = receiveLink
.receive()
.publishOn(scheduler)
.doOnSubscribe(subscription -> {
logger.verbose("Adding prefetch to receive link.");
receiveLink.addCredits(prefetch);
if (prefetch > 0) {
receiveLink.addCredits(prefetch);
}
})
.doOnRequest(request -> { // request is of type long.
if (prefetch == 0) { // add "request" number of credits
receiveLink.addCredits((int) request);
} else { // keep total credits "prefetch" if prefetch is not 0.
receiveLink.addCredits(Math.max(0, prefetch - receiveLink.getCredits()));
}
})
.takeUntilOther(cancelReceiveProcessor)
.map(message -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ protected void hookOnSubscribe(Subscription subscription) {

if (Operators.setOnce(UPSTREAM, this, subscription)) {
this.subscription = subscription;
remaining.addAndGet(requested);
subscription.request(requested);
subscriberInitialized = true;
drain();
} else {
Expand Down Expand Up @@ -140,7 +138,6 @@ private void drainQueue() {

while ((currentWork = workQueue.peek()) != null
&& (!currentWork.isProcessingStarted() || bufferMessages.size() > 0)) {

// Additional check for safety, but normally this work should never be terminal
if (currentWork.isTerminal()) {
// This work already finished by either timeout or no more messages to send, process next work.
Expand All @@ -155,6 +152,9 @@ private void drainQueue() {
// timer to complete the currentWork in case of timeout trigger
currentTimeoutOperation = getTimeoutOperation(currentWork);
currentWork.startedProcessing();
final long calculatedRequest = currentWork.getNumberOfEvents() - remaining.get();
remaining.addAndGet(calculatedRequest);
subscription.request(calculatedRequest);
}

// Send messages to currentWork from buffer
Expand All @@ -174,15 +174,6 @@ private void drainQueue() {
currentTimeoutOperation.dispose();
}
logger.verbose("The work [{}] is complete.", currentWork.getId());
} else {
// Since this work is not complete, find out how much we should request from upstream
long creditToAdd = currentWork.getRemaining() - (remaining.get() + bufferMessages.size());
if (creditToAdd > 0) {
remaining.addAndGet(creditToAdd);
subscription.request(creditToAdd);
logger.verbose("Requesting [{}] from upstream for work [{}].", creditToAdd,
currentWork.getId());
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public class ServiceBusReceiveLinkProcessor extends FluxProcessor<ServiceBusRece
private final Object queueLock = new Object();
private final AtomicBoolean isTerminated = new AtomicBoolean();
private final AtomicInteger retryAttempts = new AtomicInteger();
private final AtomicBoolean linkCreditsAdded = new AtomicBoolean();
private final AtomicReference<String> linkName = new AtomicReference<>();

// Queue containing all the prefetched messages.
Expand Down Expand Up @@ -200,12 +199,7 @@ public void onNext(ServiceBusReceiveLink next) {
oldSubscription = currentLinkSubscriptions;

currentLink = next;
next.setEmptyCreditListener(() -> {
final int creditsToAdd = getCreditsToAdd(0);
linkCreditsAdded.set(creditsToAdd > 0);

return creditsToAdd;
});
next.setEmptyCreditListener(() -> 0);
YijunXieMS marked this conversation as resolved.
Show resolved Hide resolved

currentLinkSubscriptions = Disposables.composite(
next.receive().publishOn(Schedulers.boundedElastic()).subscribe(message -> {
Expand Down Expand Up @@ -499,6 +493,9 @@ private void drainQueue() {
if (receiveMode != ServiceBusReceiveMode.PEEK_LOCK) {
pendingMessages.decrementAndGet();
}
if (prefetch > 0) { // re-fill messageQueue if there is prefetch configured.
checkAndAddCredits(currentLink);
}
} catch (Exception e) {
logger.error("Exception occurred while handling downstream onNext operation.", e);
throw logger.logExceptionAsError(Exceptions.propagate(
Expand Down Expand Up @@ -545,18 +542,14 @@ private void checkAndAddCredits(AmqpReceiveLink link) {
return;
}

// Credits have already been added to the link. We won't try again.
if (linkCreditsAdded.getAndSet(true)) {
return;
}

final int credits = getCreditsToAdd(link.getCredits());
linkCreditsAdded.set(credits > 0);

logger.info("Link credits to add. Credits: '{}'", credits);
synchronized (lock) {
final int linkCredits = link.getCredits();
final int credits = getCreditsToAdd(linkCredits);
logger.info("Link credits='{}', Link credits to add: '{}'", linkCredits, credits);

if (credits > 0) {
link.addCredits(credits);
if (credits > 0) {
link.addCredits(credits);
}
}
}

Expand All @@ -571,22 +564,40 @@ private int getCreditsToAdd(int linkCredits) {
}

final int creditsToAdd;
if (messageQueue.isEmpty() && !hasBackpressure) {
creditsToAdd = prefetch;
final int expectedTotalCredit;
if (prefetch == 0) {
if (r <= Integer.MAX_VALUE) {
expectedTotalCredit = (int) r;
} else {
//This won't really happen in reality.
//For async client, receiveMessages() calls "return receiveMessagesNoBackPressure().limitRate(1, 0);".
//So it will request one by one from this link processor, even though the user's request has no
//back pressure.
//For sync client, the sync subscriber has back pressure.
//The request count uses the the argument of method receiveMessages(int maxMessages).
//It's at most Integer.MAX_VALUE.
expectedTotalCredit = Integer.MAX_VALUE;
YijunXieMS marked this conversation as resolved.
Show resolved Hide resolved
}
} else {
synchronized (queueLock) {
final int queuedMessages = pendingMessages.get();
final int pending = queuedMessages + linkCredits;

if (hasBackpressure) {
creditsToAdd = Math.max(Long.valueOf(r).intValue() - pending, 0);
} else {
// If the queue has less than 1/3 of the prefetch, then add the difference to keep the queue full.
creditsToAdd = minimumNumberOfMessages >= queuedMessages
? Math.max(prefetch - pending, 1)
: 0;
}
expectedTotalCredit = prefetch;
}
logger.info("linkCredits: '{}', expectedTotalCredit: '{}'", linkCredits, expectedTotalCredit);

synchronized (queueLock) {
final int queuedMessages = pendingMessages.get();
final int pending = queuedMessages + linkCredits;

if (hasBackpressure) {
creditsToAdd = Math.max(expectedTotalCredit - pending, 0);
} else {
// If the queue has less than 1/3 of the prefetch, then add the difference to keep the queue full.
creditsToAdd = minimumNumberOfMessages >= queuedMessages
? Math.max(expectedTotalCredit - pending, 0)
: 0;
}
logger.info("prefetch: '{}', requested: '{}', linkCredits: '{}', expectedTotalCredit: '{}', queuedMessages:"
+ "'{}', creditsToAdd: '{}', messageQueue.size(): '{}'", getPrefetch(), r, linkCredits,
expectedTotalCredit, queuedMessages, creditsToAdd, messageQueue.size());
}

return creditsToAdd;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ void invalidPrefetch() {
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK);

// Act & Assert
assertThrows(IllegalArgumentException.class, () -> receiverBuilder.prefetchCount(0));
assertThrows(IllegalArgumentException.class, () -> receiverBuilder.prefetchCount(-1));
}

@MethodSource("getProxyConfigurations")
Expand Down
Loading