receiverAsyncClient = sessionReceiver.acceptNextSession();
```
### Create a dead-letter queue Receiver
@@ -328,7 +329,7 @@ The dead-letter queue doesn't need to be explicitly created and can't be deleted
of the main entity. For session enabled or non-session queue or topic subscriptions, the dead-letter receiver can be
created the same way as shown below. Learn more about dead-letter queue [here][dead-letter-queue].
-
+
```java
ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiverOptions.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiverOptions.java
index bc388dfc4232b..a3748b714d98d 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiverOptions.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiverOptions.java
@@ -3,7 +3,6 @@
package com.azure.messaging.servicebus;
-import com.azure.core.util.CoreUtils;
import com.azure.messaging.servicebus.models.ReceiveMode;
import java.time.Duration;
@@ -16,28 +15,22 @@ class ReceiverOptions {
private final int prefetchCount;
private final boolean enableAutoComplete;
private final String sessionId;
- private final boolean isRollingSessionReceiver;
private final Integer maxConcurrentSessions;
- private final boolean isSessionReceiver;
private final Duration maxLockRenewDuration;
ReceiverOptions(ReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration,
boolean enableAutoComplete) {
- this(receiveMode, prefetchCount, maxLockRenewDuration, enableAutoComplete, null, false, null);
+ this(receiveMode, prefetchCount, maxLockRenewDuration, enableAutoComplete, null, null);
}
ReceiverOptions(ReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration,
- boolean enableAutoComplete, String sessionId, boolean isRollingSessionReceiver,
- Integer maxConcurrentSessions) {
-
+ boolean enableAutoComplete, String sessionId, Integer maxConcurrentSessions) {
this.receiveMode = receiveMode;
this.prefetchCount = prefetchCount;
this.enableAutoComplete = enableAutoComplete;
this.sessionId = sessionId;
- this.isRollingSessionReceiver = isRollingSessionReceiver;
this.maxConcurrentSessions = maxConcurrentSessions;
this.maxLockRenewDuration = maxLockRenewDuration;
- this.isSessionReceiver = !CoreUtils.isNullOrEmpty(sessionId) || isRollingSessionReceiver;
}
/**
@@ -90,7 +83,7 @@ boolean isAutoLockRenewEnabled() {
* @return true if it is a session-aware receiver; false otherwise.
*/
boolean isSessionReceiver() {
- return isSessionReceiver;
+ return sessionId != null || maxConcurrentSessions != null;
}
/**
@@ -100,7 +93,7 @@ boolean isSessionReceiver() {
* false} otherwise.
*/
public boolean isRollingSessionReceiver() {
- return isRollingSessionReceiver;
+ return maxConcurrentSessions != null && maxConcurrentSessions > 0 && sessionId == null;
}
/**
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java
index 5a97bf81f9912..98b47ad619e04 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java
@@ -628,7 +628,6 @@ public final class ServiceBusSessionReceiverClientBuilder {
private int prefetchCount = DEFAULT_PREFETCH_COUNT;
private String queueName;
private ReceiveMode receiveMode = ReceiveMode.PEEK_LOCK;
- private String sessionId;
private String subscriptionName;
private String topicName;
private Duration maxAutoLockRenewDuration = MAX_LOCK_RENEW_DEFAULT_DURATION;
@@ -674,7 +673,7 @@ public ServiceBusSessionReceiverClientBuilder maxAutoLockRenewDuration(Duration
* @return The modified {@link ServiceBusSessionReceiverClientBuilder} object.
* @throws IllegalArgumentException if {@code maxConcurrentSessions} is less than 1.
*/
- public ServiceBusSessionReceiverClientBuilder maxConcurrentSessions(int maxConcurrentSessions) {
+ ServiceBusSessionReceiverClientBuilder maxConcurrentSessions(int maxConcurrentSessions) {
if (maxConcurrentSessions < 1) {
throw logger.logExceptionAsError(new IllegalArgumentException(
"maxConcurrentSessions cannot be less than 1."));
@@ -728,18 +727,6 @@ public ServiceBusSessionReceiverClientBuilder receiveMode(ReceiveMode receiveMod
return this;
}
- /**
- * Sets the session id.
- *
- * @param sessionId session id.
- *
- * @return The modified {@link ServiceBusSessionReceiverClientBuilder} object.
- */
- public ServiceBusSessionReceiverClientBuilder sessionId(String sessionId) {
- this.sessionId = sessionId;
- return this;
- }
-
/**
* Sets the name of the subscription in the topic to listen to. {@link #topicName(String)} must also be set.
*
@@ -780,8 +767,8 @@ public ServiceBusSessionReceiverClientBuilder topicName(String topicName) {
* @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String)
* queueName()} or {@link #topicName(String) topicName()}, respectively.
*/
- public ServiceBusReceiverAsyncClient buildAsyncClient() {
- return buildAsyncClient(true);
+ ServiceBusReceiverAsyncClient buildAsyncClientForProcessor() {
+ return buildAsyncClientForProcessor(true);
}
/**
@@ -797,11 +784,11 @@ public ServiceBusReceiverAsyncClient buildAsyncClient() {
* @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String)
* queueName()} or {@link #topicName(String) topicName()}, respectively.
*/
- public ServiceBusReceiverClient buildClient() {
- return new ServiceBusReceiverClient(buildAsyncClient(false), retryOptions.getTryTimeout());
+ ServiceBusReceiverClient buildClientForProcessor() {
+ return new ServiceBusReceiverClient(buildAsyncClientForProcessor(false), retryOptions.getTryTimeout());
}
- private ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed) {
+ private ServiceBusReceiverAsyncClient buildAsyncClientForProcessor(boolean isAutoCompleteAllowed) {
final MessagingEntityType entityType = validateEntityPaths(logger, connectionStringEntityName, topicName,
queueName);
final String entityPath = getEntityPath(logger, entityType, queueName, topicName, subscriptionName,
@@ -822,7 +809,7 @@ private ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAll
final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer);
final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount,
- maxAutoLockRenewDuration, enableAutoComplete, sessionId, isRollingSessionReceiver(),
+ maxAutoLockRenewDuration, enableAutoComplete, null,
maxConcurrentSessions);
final ServiceBusSessionManager sessionManager = new ServiceBusSessionManager(entityPath, entityType,
@@ -834,22 +821,66 @@ maxAutoLockRenewDuration, enableAutoComplete, sessionId, isRollingSessionReceive
}
/**
- * This is a rolling session receiver only if maxConcurrentSessions is > 0 AND sessionId is null or empty. If
- * there is a sessionId, this is going to be a single, named session receiver.
+ * Creates an asynchronous, session-aware Service Bus receiver responsible for reading {@link
+ * ServiceBusMessage messages} from a specific queue or topic.
+ *
+ * @return An new {@link ServiceBusSessionReceiverAsyncClient} that receives messages from a queue or topic.
+ * @throws IllegalStateException if {@link #queueName(String) queueName} or {@link #topicName(String)
+ * topicName} are not set or, both of these fields are set. It is also thrown if the Service Bus {@link
+ * #connectionString(String) connectionString} contains an {@code EntityPath} that does not match one set in
+ * {@link #queueName(String) queueName} or {@link #topicName(String) topicName}. Lastly, if a {@link
+ * #topicName(String) topicName} is set, but {@link #subscriptionName(String) subscriptionName} is not.
+ * @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String)
+ * queueName()} or {@link #topicName(String) topicName()}, respectively.
+ */
+ public ServiceBusSessionReceiverAsyncClient buildAsyncClient() {
+ return buildAsyncClient(true);
+ }
+
+ /**
+ * Creates a synchronous, session-aware Service Bus receiver responsible for reading {@link
+ * ServiceBusMessage messages} from a specific queue or topic.
*
- * @return {@code true} if this is an unnamed rolling session receiver; {@code false} otherwise.
+ * @return An new {@link ServiceBusReceiverClient} that receives messages from a queue or topic.
+ * @throws IllegalStateException if {@link #queueName(String) queueName} or {@link #topicName(String)
+ * topicName} are not set or, both of these fields are set. It is also thrown if the Service Bus {@link
+ * #connectionString(String) connectionString} contains an {@code EntityPath} that does not match one set in
+ * {@link #queueName(String) queueName} or {@link #topicName(String) topicName}. Lastly, if a {@link
+ * #topicName(String) topicName} is set, but {@link #subscriptionName(String) subscriptionName} is not.
+ * @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String)
+ * queueName()} or {@link #topicName(String) topicName()}, respectively.
*/
- private boolean isRollingSessionReceiver() {
- if (maxConcurrentSessions == null) {
- return false;
+ public ServiceBusSessionReceiverClient buildClient() {
+ return new ServiceBusSessionReceiverClient(buildAsyncClient(false),
+ retryOptions.getTryTimeout());
+ }
+
+ private ServiceBusSessionReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed) {
+ final MessagingEntityType entityType = validateEntityPaths(logger, connectionStringEntityName, topicName,
+ queueName);
+ final String entityPath = getEntityPath(logger, entityType, queueName, topicName, subscriptionName,
+ SubQueue.NONE);
+
+ if (!isAutoCompleteAllowed && enableAutoComplete) {
+ logger.warning(
+ "'enableAutoComplete' is not supported in synchronous client except through callback receive.");
+ enableAutoComplete = false;
+ } else if (enableAutoComplete && receiveMode == ReceiveMode.RECEIVE_AND_DELETE) {
+ throw logger.logExceptionAsError(new IllegalStateException(
+ "'enableAutoComplete' is not valid for RECEIVE_AND_DELETE mode."));
}
- if (maxConcurrentSessions < 1) {
- throw logger.logExceptionAsError(
- new IllegalArgumentException("Maximum number of concurrent sessions must be positive."));
+ if (receiveMode == ReceiveMode.RECEIVE_AND_DELETE) {
+ maxAutoLockRenewDuration = Duration.ZERO;
}
- return CoreUtils.isNullOrEmpty(sessionId);
+ final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer);
+ final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount,
+ maxAutoLockRenewDuration, enableAutoComplete, null, maxConcurrentSessions);
+
+ return new ServiceBusSessionReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(),
+ entityPath, entityType, receiverOptions, connectionProcessor, tracerProvider, messageSerializer,
+ ServiceBusClientBuilder.this::onClientClose);
}
}
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java
index 6151a7387dc25..d7daa14710447 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java
@@ -63,21 +63,17 @@
* {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.receiveWithReceiveAndDeleteMode}
*
* Receive messages from a specific session
- * To fetch messages from a specific session, set {@link ServiceBusSessionReceiverClientBuilder#sessionId(String)}.
+ *
To fetch messages from a specific session, switch to {@link ServiceBusSessionReceiverClientBuilder} and
+ * build the session receiver client. Use {@link ServiceBusSessionReceiverAsyncClient#acceptSession(String)} to create a
+ * session-bound {@link ServiceBusReceiverAsyncClient}.
*
* {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#sessionId}
*
- * Process messages from multiple sessions
- * To process messages from multiple sessions, set
- * {@link ServiceBusSessionReceiverClientBuilder#maxConcurrentSessions(int)}. This will process in parallel at most
- * {@code maxConcurrentSessions}. In addition, when all the messages in a session have been consumed, it will find the
- * next available session to process.
- * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#multiplesessions}
- *
* Process messages from the first available session
* To process messages from the first available session, switch to {@link ServiceBusSessionReceiverClientBuilder} and
- * build the receiver client. It will find the first available session to process messages from.
- * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#singlesession}
+ * build the session receiver client. Use {@link ServiceBusSessionReceiverAsyncClient#acceptNextSession()} to
+ * find the first available session to process messages from.
+ * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#nextsession}
*
* Rate limiting consumption of messages from Service Bus resource
* For message receivers that need to limit the number of messages they receive at a given time, they can use
@@ -374,28 +370,13 @@ public Mono deadLetter(ServiceBusReceivedMessage message, DeadLetterOption
}
/**
- * Gets the state of a session given its identifier.
- *
- * @param sessionId Identifier of session to get.
+ * Gets the state of the session if this receiver is a session receiver.
*
* @return The session state or an empty Mono if there is no state set for the session.
* @throws IllegalStateException if the receiver is a non-session receiver.
*/
- public Mono getSessionState(String sessionId) {
- if (isDisposed.get()) {
- return monoError(logger, new IllegalStateException(
- String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "getSessionState")));
- } else if (!receiverOptions.isSessionReceiver()) {
- return monoError(logger, new IllegalStateException("Cannot get session state on a non-session receiver."));
- }
-
- if (sessionManager != null) {
- return sessionManager.getSessionState(sessionId);
- } else {
- return connectionProcessor
- .flatMap(connection -> connection.getManagementNode(entityPath, entityType))
- .flatMap(channel -> channel.getSessionState(sessionId, getLinkName(sessionId)));
- }
+ public Mono getSessionState() {
+ return getSessionState(receiverOptions.getSessionId());
}
/**
@@ -421,12 +402,11 @@ public Mono peekMessage() {
* @throws IllegalStateException if the receiver is disposed.
* @see Message browsing
*/
- public Mono peekMessage(String sessionId) {
+ Mono peekMessage(String sessionId) {
if (isDisposed.get()) {
return monoError(logger, new IllegalStateException(
String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "peek")));
}
-
return connectionProcessor
.flatMap(connection -> connection.getManagementNode(entityPath, entityType))
.flatMap(channel -> {
@@ -467,12 +447,11 @@ public Mono peekMessageAt(long sequenceNumber) {
* @return A peeked {@link ServiceBusReceivedMessage}.
* @see Message browsing
*/
- public Mono peekMessageAt(long sequenceNumber, String sessionId) {
+ Mono peekMessageAt(long sequenceNumber, String sessionId) {
if (isDisposed.get()) {
return monoError(logger, new IllegalStateException(
String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "peekAt")));
}
-
return connectionProcessor
.flatMap(connection -> connection.getManagementNode(entityPath, entityType))
.flatMap(node -> node.peek(sequenceNumber, sessionId, getLinkName(sessionId)));
@@ -501,7 +480,7 @@ public Flux peekMessages(int maxMessages) {
* @throws IllegalArgumentException if {@code maxMessages} is not a positive integer.
* @see Message browsing
*/
- public Flux peekMessages(int maxMessages, String sessionId) {
+ Flux peekMessages(int maxMessages, String sessionId) {
if (isDisposed.get()) {
return fluxError(logger, new IllegalStateException(
String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "peekBatch")));
@@ -564,7 +543,7 @@ public Flux peekMessagesAt(int maxMessages, long sequ
* @throws IllegalArgumentException if {@code maxMessages} is not a positive integer.
* @see Message browsing
*/
- public Flux peekMessagesAt(int maxMessages, long sequenceNumber, String sessionId) {
+ Flux peekMessagesAt(int maxMessages, long sequenceNumber, String sessionId) {
if (isDisposed.get()) {
return fluxError(logger, new IllegalStateException(
String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "peekBatchAt")));
@@ -637,7 +616,7 @@ public Mono receiveDeferredMessage(long sequenceNumbe
*
* @return A deferred message with the matching {@code sequenceNumber}.
*/
- public Mono receiveDeferredMessage(long sequenceNumber, String sessionId) {
+ Mono receiveDeferredMessage(long sequenceNumber, String sessionId) {
return connectionProcessor
.flatMap(connection -> connection.getManagementNode(entityPath, entityType))
.flatMap(node -> node.receiveDeferredMessages(receiverOptions.getReceiveMode(),
@@ -677,13 +656,12 @@ public Flux receiveDeferredMessages(Iterable se
*
* @return An {@link IterableStream} of deferred {@link ServiceBusReceivedMessage messages}.
*/
- public Flux receiveDeferredMessages(Iterable sequenceNumbers,
+ Flux receiveDeferredMessages(Iterable sequenceNumbers,
String sessionId) {
if (isDisposed.get()) {
return fluxError(logger, new IllegalStateException(
String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "receiveDeferredMessageBatch")));
}
-
return connectionProcessor
.flatMap(connection -> connection.getManagementNode(entityPath, entityType))
.flatMapMany(node -> node.receiveDeferredMessages(receiverOptions.getReceiveMode(),
@@ -794,34 +772,18 @@ public Mono renewMessageLock(ServiceBusReceivedMessage message, Duration m
}
/**
- * Renews the session lock.
- *
- * @param sessionId Identifier of session to get.
+ * Renews the session lock if this receiver is a session receiver.
*
* @return The next expiration time for the session lock.
* @throws IllegalStateException if the receiver is a non-session receiver.
*/
- public Mono renewSessionLock(String sessionId) {
- if (isDisposed.get()) {
- return monoError(logger, new IllegalStateException(
- String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "renewSessionLock")));
- } else if (!receiverOptions.isSessionReceiver()) {
- return monoError(logger, new IllegalStateException("Cannot renew session lock on a non-session receiver."));
- }
-
- final String linkName = sessionManager != null
- ? sessionManager.getLinkName(sessionId)
- : null;
-
- return connectionProcessor
- .flatMap(connection -> connection.getManagementNode(entityPath, entityType))
- .flatMap(channel -> channel.renewSessionLock(sessionId, linkName));
+ public Mono renewSessionLock() {
+ return renewSessionLock(receiverOptions.getSessionId());
}
/**
- * Starts the auto lock renewal for a session id.
+ * Starts the auto lock renewal for the session this receiver works for.
*
- * @param sessionId Id for the session to renew.
* @param maxLockRenewalDuration Maximum duration to keep renewing the session lock.
*
* @return A lock renewal operation for the message.
@@ -829,55 +791,20 @@ public Mono renewSessionLock(String sessionId) {
* @throws IllegalArgumentException if {@code sessionId} is an empty string.
* @throws IllegalStateException if the receiver is a non-session receiver or the receiver is disposed.
*/
- public Mono renewSessionLock(String sessionId, Duration maxLockRenewalDuration) {
- if (isDisposed.get()) {
- return monoError(logger, new IllegalStateException(
- String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "getAutoRenewSessionLock")));
- } else if (!receiverOptions.isSessionReceiver()) {
- return monoError(logger, new IllegalStateException(
- "Cannot renew session lock on a non-session receiver."));
- } else if (maxLockRenewalDuration == null) {
- return monoError(logger, new NullPointerException("'maxLockRenewalDuration' cannot be null."));
- } else if (maxLockRenewalDuration.isNegative()) {
- return monoError(logger, new IllegalArgumentException(
- "'maxLockRenewalDuration' cannot be negative."));
- } else if (Objects.isNull(sessionId)) {
- return monoError(logger, new NullPointerException("'sessionId' cannot be null."));
- } else if (sessionId.isEmpty()) {
- return monoError(logger, new IllegalArgumentException("'sessionId' cannot be empty."));
- }
-
- final LockRenewalOperation operation = new LockRenewalOperation(sessionId, maxLockRenewalDuration, true,
- this::renewSessionLock);
-
- renewalContainer.addOrUpdate(sessionId, OffsetDateTime.now().plus(maxLockRenewalDuration), operation);
- return operation.getCompletionOperation();
+ public Mono renewSessionLock(Duration maxLockRenewalDuration) {
+ return this.renewSessionLock(receiverOptions.getSessionId(), maxLockRenewalDuration);
}
/**
- * Sets the state of a session given its identifier.
+ * Sets the state of the session this receiver works for.
*
- * @param sessionId Identifier of session to get.
* @param sessionState State to set on the session.
*
* @return A Mono that completes when the session is set
* @throws IllegalStateException if the receiver is a non-session receiver.
*/
- public Mono setSessionState(String sessionId, byte[] sessionState) {
- if (isDisposed.get()) {
- return monoError(logger, new IllegalStateException(
- String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "setSessionState")));
- } else if (!receiverOptions.isSessionReceiver()) {
- return monoError(logger, new IllegalStateException("Cannot set session state on a non-session receiver."));
- }
-
- final String linkName = sessionManager != null
- ? sessionManager.getLinkName(sessionId)
- : null;
-
- return connectionProcessor
- .flatMap(connection -> connection.getManagementNode(entityPath, entityType))
- .flatMap(channel -> channel.setSessionState(sessionId, sessionState, linkName));
+ public Mono setSessionState(byte[] sessionState) {
+ return this.setSessionState(receiverOptions.getSessionId(), sessionState);
}
/**
@@ -1136,4 +1063,76 @@ private String getLinkName(String sessionId) {
return existing != null ? existing.getLinkName() : null;
}
}
+
+ Mono renewSessionLock(String sessionId) {
+ if (isDisposed.get()) {
+ return monoError(logger, new IllegalStateException(
+ String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "renewSessionLock")));
+ } else if (!receiverOptions.isSessionReceiver()) {
+ return monoError(logger, new IllegalStateException("Cannot renew session lock on a non-session receiver."));
+ }
+ final String linkName = sessionManager != null
+ ? sessionManager.getLinkName(sessionId)
+ : null;
+
+ return connectionProcessor
+ .flatMap(connection -> connection.getManagementNode(entityPath, entityType))
+ .flatMap(channel -> channel.renewSessionLock(sessionId, linkName));
+ }
+
+ Mono renewSessionLock(String sessionId, Duration maxLockRenewalDuration) {
+ if (isDisposed.get()) {
+ return monoError(logger, new IllegalStateException(
+ String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "getAutoRenewSessionLock")));
+ } else if (!receiverOptions.isSessionReceiver()) {
+ return monoError(logger, new IllegalStateException(
+ "Cannot renew session lock on a non-session receiver."));
+ } else if (maxLockRenewalDuration == null) {
+ return monoError(logger, new NullPointerException("'maxLockRenewalDuration' cannot be null."));
+ } else if (maxLockRenewalDuration.isNegative()) {
+ return monoError(logger, new IllegalArgumentException(
+ "'maxLockRenewalDuration' cannot be negative."));
+ } else if (Objects.isNull(sessionId)) {
+ return monoError(logger, new NullPointerException("'sessionId' cannot be null."));
+ } else if (sessionId.isEmpty()) {
+ return monoError(logger, new IllegalArgumentException("'sessionId' cannot be empty."));
+ }
+ final LockRenewalOperation operation = new LockRenewalOperation(sessionId,
+ maxLockRenewalDuration, true, this::renewSessionLock);
+
+ renewalContainer.addOrUpdate(sessionId, OffsetDateTime.now().plus(maxLockRenewalDuration), operation);
+ return operation.getCompletionOperation();
+ }
+
+ Mono setSessionState(String sessionId, byte[] sessionState) {
+ if (isDisposed.get()) {
+ return monoError(logger, new IllegalStateException(
+ String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "setSessionState")));
+ } else if (!receiverOptions.isSessionReceiver()) {
+ return monoError(logger, new IllegalStateException("Cannot set session state on a non-session receiver."));
+ }
+ final String linkName = sessionManager != null
+ ? sessionManager.getLinkName(sessionId)
+ : null;
+
+ return connectionProcessor
+ .flatMap(connection -> connection.getManagementNode(entityPath, entityType))
+ .flatMap(channel -> channel.setSessionState(sessionId, sessionState, linkName));
+ }
+
+ Mono getSessionState(String sessionId) {
+ if (isDisposed.get()) {
+ return monoError(logger, new IllegalStateException(
+ String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "getSessionState")));
+ } else if (!receiverOptions.isSessionReceiver()) {
+ return monoError(logger, new IllegalStateException("Cannot get session state on a non-session receiver."));
+ }
+ if (sessionManager != null) {
+ return sessionManager.getSessionState(sessionId);
+ } else {
+ return connectionProcessor
+ .flatMap(connection -> connection.getManagementNode(entityPath, entityType))
+ .flatMap(channel -> channel.getSessionState(sessionId, getLinkName(sessionId)));
+ }
+ }
}
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java
index 38c550a96a151..66acf0c326d57 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java
@@ -210,15 +210,13 @@ public void deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions opti
}
/**
- * Gets the state of a session given its identifier.
- *
- * @param sessionId Identifier of session to get.
+ * Gets the state of the session if this receiver is a session receiver.
*
* @return The session state or null if there is no state set for the session.
* @throws IllegalStateException if the receiver is a non-session receiver.
*/
- public byte[] getSessionState(String sessionId) {
- return asyncClient.getSessionState(sessionId).block(operationTimeout);
+ public byte[] getSessionState() {
+ return this.getSessionState(asyncClient.getReceiverOptions().getSessionId());
}
/**
@@ -230,7 +228,7 @@ public byte[] getSessionState(String sessionId) {
* @see Message browsing
*/
public ServiceBusReceivedMessage peekMessage() {
- return asyncClient.peekMessage().block(operationTimeout);
+ return this.peekMessage(asyncClient.getReceiverOptions().getSessionId());
}
/**
@@ -243,10 +241,9 @@ public ServiceBusReceivedMessage peekMessage() {
* @return A peeked {@link ServiceBusReceivedMessage}.
* @see Message browsing
*/
- public ServiceBusReceivedMessage peekMessage(String sessionId) {
+ ServiceBusReceivedMessage peekMessage(String sessionId) {
return asyncClient.peekMessage(sessionId).block(operationTimeout);
}
-
/**
* Starting from the given sequence number, reads next the active message without changing the state of the receiver
* or the message source.
@@ -257,7 +254,7 @@ public ServiceBusReceivedMessage peekMessage(String sessionId) {
* @see Message browsing
*/
public ServiceBusReceivedMessage peekMessageAt(long sequenceNumber) {
- return asyncClient.peekMessageAt(sequenceNumber).block(operationTimeout);
+ return this.peekMessageAt(sequenceNumber, asyncClient.getReceiverOptions().getSessionId());
}
/**
@@ -270,7 +267,7 @@ public ServiceBusReceivedMessage peekMessageAt(long sequenceNumber) {
* @return A peeked {@link ServiceBusReceivedMessage}.
* @see Message browsing
*/
- public ServiceBusReceivedMessage peekMessageAt(long sequenceNumber, String sessionId) {
+ ServiceBusReceivedMessage peekMessageAt(long sequenceNumber, String sessionId) {
return asyncClient.peekMessageAt(sequenceNumber, sessionId).block(operationTimeout);
}
@@ -284,18 +281,7 @@ public ServiceBusReceivedMessage peekMessageAt(long sequenceNumber, String sessi
* @see Message browsing
*/
public IterableStream peekMessages(int maxMessages) {
- if (maxMessages <= 0) {
- throw logger.logExceptionAsError(new IllegalArgumentException(
- "'maxMessages' cannot be less than or equal to 0. maxMessages: " + maxMessages));
- }
-
- final Flux messages = asyncClient.peekMessages(maxMessages)
- .timeout(operationTimeout);
-
- // Subscribe so we can kick off this operation.
- messages.subscribe();
-
- return new IterableStream<>(messages);
+ return this.peekMessages(maxMessages, asyncClient.getReceiverOptions().getSessionId());
}
/**
@@ -308,7 +294,7 @@ public IterableStream peekMessages(int maxMessages) {
* @throws IllegalArgumentException if {@code maxMessages} is not a positive integer.
* @see Message browsing
*/
- public IterableStream peekMessages(int maxMessages, String sessionId) {
+ IterableStream peekMessages(int maxMessages, String sessionId) {
if (maxMessages <= 0) {
throw logger.logExceptionAsError(new IllegalArgumentException(
"'maxMessages' cannot be less than or equal to 0. maxMessages: " + maxMessages));
@@ -335,18 +321,7 @@ public IterableStream peekMessages(int maxMessages, S
* @see Message browsing
*/
public IterableStream peekMessagesAt(int maxMessages, long sequenceNumber) {
- if (maxMessages <= 0) {
- throw logger.logExceptionAsError(new IllegalArgumentException(
- "'maxMessages' cannot be less than or equal to 0. maxMessages: " + maxMessages));
- }
-
- final Flux messages = asyncClient.peekMessagesAt(maxMessages, sequenceNumber)
- .timeout(operationTimeout);
-
- // Subscribe so we can kick off this operation.
- messages.subscribe();
-
- return new IterableStream<>(messages);
+ return this.peekMessagesAt(maxMessages, sequenceNumber, asyncClient.getReceiverOptions().getSessionId());
}
/**
@@ -361,7 +336,7 @@ public IterableStream peekMessagesAt(int maxMessages,
* @throws IllegalArgumentException if {@code maxMessages} is not a positive integer.
* @see Message browsing
*/
- public IterableStream peekMessagesAt(int maxMessages, long sequenceNumber,
+ IterableStream peekMessagesAt(int maxMessages, long sequenceNumber,
String sessionId) {
if (maxMessages <= 0) {
throw logger.logExceptionAsError(new IllegalArgumentException(
@@ -431,7 +406,7 @@ public IterableStream receiveMessages(int maxM
* @return A deferred message with the matching {@code sequenceNumber}.
*/
public ServiceBusReceivedMessage receiveDeferredMessage(long sequenceNumber) {
- return asyncClient.receiveDeferredMessage(sequenceNumber).block(operationTimeout);
+ return this.receiveDeferredMessage(sequenceNumber, asyncClient.getReceiverOptions().getSessionId());
}
/**
@@ -444,7 +419,7 @@ public ServiceBusReceivedMessage receiveDeferredMessage(long sequenceNumber) {
*
* @return A deferred message with the matching {@code sequenceNumber}.
*/
- public ServiceBusReceivedMessage receiveDeferredMessage(long sequenceNumber, String sessionId) {
+ ServiceBusReceivedMessage receiveDeferredMessage(long sequenceNumber, String sessionId) {
return asyncClient.receiveDeferredMessage(sequenceNumber, sessionId).block(operationTimeout);
}
@@ -457,13 +432,7 @@ public ServiceBusReceivedMessage receiveDeferredMessage(long sequenceNumber, Str
* @return An {@link IterableStream} of deferred {@link ServiceBusReceivedMessage messages}.
*/
public IterableStream receiveDeferredMessageBatch(Iterable sequenceNumbers) {
- final Flux messages = asyncClient.receiveDeferredMessages(sequenceNumbers)
- .timeout(operationTimeout);
-
- // Subscribe so we can kick off this operation.
- messages.subscribe();
-
- return new IterableStream<>(messages);
+ return this.receiveDeferredMessageBatch(sequenceNumbers, asyncClient.getReceiverOptions().getSessionId());
}
/**
@@ -475,7 +444,7 @@ public IterableStream