Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

End to End TLS SSL step #8 - Add support for PEM based certificates #17019

Merged
merged 38 commits into from
Nov 12, 2020
Merged
Changes from 1 commit
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
e61d0ab
Enable revapi plugin
mnriem Oct 28, 2020
cc52f55
Enable revapi plugin
mnriem Oct 28, 2020
adb4daf
Adding initial plumbing for user-assigned managed identity
mnriem Oct 29, 2020
b86e7fa
Added support for user-assigned managed identity
mnriem Oct 30, 2020
46b637c
Communication - Purchase Search Follow-up PR (#16887)
jbeauregardb Oct 28, 2020
82cc38f
[Service Bus] Prepare tracing methods for message processor and sched…
YijunXieMS Oct 28, 2020
3307b60
Prepare Azure Core for November 2020 Release (#16924)
alzimmermsft Oct 28, 2020
f6a2000
Sb track2 schedule multiple message validate batch size (#16767)
hemanttanwar Oct 29, 2020
1ff1621
Mgmt: generate avs 2020 03 (#16954)
ChenTanyi Oct 29, 2020
c7a0679
mgmt rename session records by azure-core-test changes (#16921)
xseeseesee Oct 29, 2020
66d9527
Fixing live test failures in Event Hubs. (#16934)
conniey Oct 29, 2020
2b73ce8
Sb t2 schedule multiple message validate batch size (#16959)
hemanttanwar Oct 29, 2020
e2fd457
update MSI usage doc for service bus multi-binder sample (#16736)
Oct 29, 2020
23213a9
Fix redactor to skip redaction for empty key-value pairs values (#16943)
samvaity Oct 29, 2020
ecc9d52
Remove JsonPatchDocument.getOperations(), JsonPatchOperation, JsonPat…
alzimmermsft Oct 29, 2020
8be3526
Delete unused pipeline (#16945)
mikeharder Oct 29, 2020
3d6f053
Update CHANGELOG dates and added new CHANGELOG updates (#16967)
alzimmermsft Oct 29, 2020
321ccb2
Update CommunicationClientCredential.java (#16966)
chrwhit Oct 29, 2020
595b139
Communication - Added release phone number LRO (#16821)
jbeauregardb Oct 29, 2020
e2d4503
[Service Bus] Change getter of boolean values to isXyz() (#15890)
YijunXieMS Oct 29, 2020
16ba326
Fixed diagnostics information and other APIs on cosmos stored procedu…
kushagraThapar Oct 29, 2020
cc20791
Sync eng/common directory with azure-sdk-tools for PR 1146 (#16968)
azure-sdk Oct 29, 2020
6ae58f4
Update VM OS Image (#16976)
alzimmermsft Oct 29, 2020
18c3030
Fix digital twins client not deserializing date times correctly (#16975)
timtay-microsoft Oct 29, 2020
a300aa4
Addressing SpotBugs issues (#16894)
mnriem Oct 30, 2020
a717801
Add manual merge instructions to eng/common workflow (#16971)
azure-sdk Oct 30, 2020
e42ac84
Add PR Validation for Long Paths (#16980)
alzimmermsft Oct 30, 2020
996f3d1
Add etag property to BasicRelationship (#16981)
timtay-microsoft Oct 30, 2020
945d7dd
Test the common Generate_docindex scripts in each lang repo. (#16974)
sima-zhu Oct 30, 2020
4ed9c17
mgmt, appservice onedeploy (#16957)
weidongxu-microsoft Oct 30, 2020
8152105
Increment version for core releases (#17004)
azure-sdk Oct 30, 2020
f7796e2
Close client and check for link id when link is stolen (#16977)
srnagar Oct 30, 2020
bd750d5
Add ServiceBus Session Receiver Client (#16690)
YijunXieMS Oct 30, 2020
0106095
API ServiceBusErrorSource to represent source of error (#16710)
hemanttanwar Oct 30, 2020
895ee2a
[Service Bus] Migration Guide (#17003)
ramya-rao-a Oct 30, 2020
aad67fe
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
mnriem Oct 30, 2020
5f2c8b9
Add PEM support
mnriem Nov 2, 2020
047fcf8
Merge branch 'master' into end-to-end-tls-ssl-8
Nov 12, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add ServiceBus Session Receiver Client (#16690)
  • Loading branch information
YijunXieMS authored and mnriem committed Oct 30, 2020
commit bd750d5fe4a84bf5e255bf3558412927f13acbaa
Original file line number Diff line number Diff line change
@@ -167,6 +167,10 @@
the main ServiceBusClientBuilder. -->
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientBuilderCheck" files="ServiceBusClientBuilder.java"/>

<!-- Suppress method name "acceptNextSession" and "acceptSession" for ServiceBusSessionReceiverAsyncClient and ServiceBusSessionReceiverClient-->
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientCheck" files="ServiceBusSessionReceiverAsyncClient.java"/>
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientCheck" files="ServiceBusSessionReceiverClient.java"/>

<!-- Some classes are named *Builder but are not @ServiceClientBuilder -->
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientBuilderCheck" files="com.azure.core.http.netty.NettyAsyncHttpClientBuilder"/>
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientBuilderCheck" files="com.azure.core.http.okhttp.OkHttpAsyncHttpClientBuilder"/>
29 changes: 15 additions & 14 deletions sdk/servicebus/azure-messaging-servicebus/README.md
Original file line number Diff line number Diff line change
@@ -57,7 +57,7 @@ Both the asynchronous and synchronous Service Bus sender and receiver clients ar
`ServiceBusClientBuilder`. The snippets below create a synchronous Service Bus sender and an asynchronous receiver,
respectively.

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L29-L33 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L30-L34 -->
```java
ServiceBusSenderClient sender = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
@@ -66,7 +66,7 @@ ServiceBusSenderClient sender = new ServiceBusClientBuilder()
.buildClient();
```

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L40-L45 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L41-L46 -->
```java
ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
@@ -102,7 +102,7 @@ refer to [the associated documentation][aad_authorization].

Use the returned token credential to authenticate the client:

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L52-L58 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L53-L59 -->
```java
TokenCredential credential = new DefaultAzureCredentialBuilder()
.build();
@@ -152,7 +152,7 @@ a topic.
The snippet below creates a synchronous [`ServiceBusSenderClient`][ServiceBusSenderClient] to publish a message to a
queue.

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L65-L77 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L66-L78 -->
```java
ServiceBusSenderClient sender = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
@@ -187,7 +187,7 @@ queue or topic/subscriber.
The snippet below creates a [`ServiceBusReceiverClient`][ServiceBusReceiverClient] to receive messages from a topic
subscription.

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L84-L101 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L85-L102 -->
```java
ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
@@ -214,7 +214,7 @@ receiver.close();
The asynchronous [`ServiceBusReceiverAsyncClient`][ServiceBusReceiverAsyncClient] continuously fetches messages until
the `subscription` is disposed.

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L108-L130 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L109-L131 -->
```java
ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
@@ -247,7 +247,7 @@ When a message is received, it can be settled using any of the `complete()`, `ab
overloads. The sample below completes a received message from synchronous
[`ServiceBusReceiverClient`][ServiceBusReceiverClient].

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L145-L151 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L146-L152 -->
```java
// This fetches a batch of 10 messages or until the default operation timeout has elapsed.
receiver.receiveMessages(10).forEach(context -> {
@@ -287,7 +287,7 @@ Create a [`ServiceBusSenderClient`][ServiceBusSenderClient] for a session enable
`ServiceBusMessage.setSessionId(String)` on a `ServiceBusMessage` will publish the message to that session. If the
session does not exist, it is created.

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L164-L168 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L165-L169 -->
```java
// Setting sessionId publishes that message to a specific session, in this case, "greeting".
ServiceBusMessage message = new ServiceBusMessage("Hello world")
@@ -300,25 +300,26 @@ sender.sendMessage(message);

Receivers can fetch messages from a specific session or the first available, unlocked session.

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L175-L181 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L176-L182 -->
```java
// Creates a session-enabled receiver that gets messages from the session "greetings".
ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
.sessionReceiver()
.queueName("<< QUEUE NAME >>")
.sessionId("greetings")
.buildAsyncClient();
Mono<ServiceBusReceiverAsyncClient> receiverAsyncClient = sessionReceiver.acceptSession("greetings");
```

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L188-L193 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L189-L195 -->
```java
// Creates a session-enabled receiver that gets messages from the first available session.
ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
.sessionReceiver()
.queueName("<< QUEUE NAME >>")
.buildAsyncClient();
Mono<ServiceBusReceiverAsyncClient> receiverAsyncClient = sessionReceiver.acceptNextSession();
```

### Create a dead-letter queue Receiver
@@ -328,7 +329,7 @@ The dead-letter queue doesn't need to be explicitly created and can't be deleted
of the main entity. For session enabled or non-session queue or topic subscriptions, the dead-letter receiver can be
created the same way as shown below. Learn more about dead-letter queue [here][dead-letter-queue].

<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L200-L206 -->
<!-- embedme ./src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java#L202-L208 -->
```java
ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
.connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
Original file line number Diff line number Diff line change
@@ -3,7 +3,6 @@

package com.azure.messaging.servicebus;

import com.azure.core.util.CoreUtils;
import com.azure.messaging.servicebus.models.ReceiveMode;

import java.time.Duration;
@@ -16,28 +15,22 @@ class ReceiverOptions {
private final int prefetchCount;
private final boolean enableAutoComplete;
private final String sessionId;
private final boolean isRollingSessionReceiver;
private final Integer maxConcurrentSessions;
private final boolean isSessionReceiver;
private final Duration maxLockRenewDuration;

ReceiverOptions(ReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration,
boolean enableAutoComplete) {
this(receiveMode, prefetchCount, maxLockRenewDuration, enableAutoComplete, null, false, null);
this(receiveMode, prefetchCount, maxLockRenewDuration, enableAutoComplete, null, null);
}

ReceiverOptions(ReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration,
boolean enableAutoComplete, String sessionId, boolean isRollingSessionReceiver,
Integer maxConcurrentSessions) {

boolean enableAutoComplete, String sessionId, Integer maxConcurrentSessions) {
this.receiveMode = receiveMode;
this.prefetchCount = prefetchCount;
this.enableAutoComplete = enableAutoComplete;
this.sessionId = sessionId;
this.isRollingSessionReceiver = isRollingSessionReceiver;
this.maxConcurrentSessions = maxConcurrentSessions;
this.maxLockRenewDuration = maxLockRenewDuration;
this.isSessionReceiver = !CoreUtils.isNullOrEmpty(sessionId) || isRollingSessionReceiver;
}

/**
@@ -90,7 +83,7 @@ boolean isAutoLockRenewEnabled() {
* @return true if it is a session-aware receiver; false otherwise.
*/
boolean isSessionReceiver() {
return isSessionReceiver;
return sessionId != null || maxConcurrentSessions != null;
}

/**
@@ -100,7 +93,7 @@ boolean isSessionReceiver() {
* false} otherwise.
*/
public boolean isRollingSessionReceiver() {
return isRollingSessionReceiver;
return maxConcurrentSessions != null && maxConcurrentSessions > 0 && sessionId == null;
}

/**
Original file line number Diff line number Diff line change
@@ -628,7 +628,6 @@ public final class ServiceBusSessionReceiverClientBuilder {
private int prefetchCount = DEFAULT_PREFETCH_COUNT;
private String queueName;
private ReceiveMode receiveMode = ReceiveMode.PEEK_LOCK;
private String sessionId;
private String subscriptionName;
private String topicName;
private Duration maxAutoLockRenewDuration = MAX_LOCK_RENEW_DEFAULT_DURATION;
@@ -674,7 +673,7 @@ public ServiceBusSessionReceiverClientBuilder maxAutoLockRenewDuration(Duration
* @return The modified {@link ServiceBusSessionReceiverClientBuilder} object.
* @throws IllegalArgumentException if {@code maxConcurrentSessions} is less than 1.
*/
public ServiceBusSessionReceiverClientBuilder maxConcurrentSessions(int maxConcurrentSessions) {
ServiceBusSessionReceiverClientBuilder maxConcurrentSessions(int maxConcurrentSessions) {
if (maxConcurrentSessions < 1) {
throw logger.logExceptionAsError(new IllegalArgumentException(
"maxConcurrentSessions cannot be less than 1."));
@@ -728,18 +727,6 @@ public ServiceBusSessionReceiverClientBuilder receiveMode(ReceiveMode receiveMod
return this;
}

/**
* Sets the session id.
*
* @param sessionId session id.
*
* @return The modified {@link ServiceBusSessionReceiverClientBuilder} object.
*/
public ServiceBusSessionReceiverClientBuilder sessionId(String sessionId) {
this.sessionId = sessionId;
return this;
}

/**
* Sets the name of the subscription in the topic to listen to. <b>{@link #topicName(String)} must also be set.
* </b>
@@ -780,8 +767,8 @@ public ServiceBusSessionReceiverClientBuilder topicName(String topicName) {
* @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String)
* queueName()} or {@link #topicName(String) topicName()}, respectively.
*/
public ServiceBusReceiverAsyncClient buildAsyncClient() {
return buildAsyncClient(true);
ServiceBusReceiverAsyncClient buildAsyncClientForProcessor() {
return buildAsyncClientForProcessor(true);
}

/**
@@ -797,11 +784,11 @@ public ServiceBusReceiverAsyncClient buildAsyncClient() {
* @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String)
* queueName()} or {@link #topicName(String) topicName()}, respectively.
*/
public ServiceBusReceiverClient buildClient() {
return new ServiceBusReceiverClient(buildAsyncClient(false), retryOptions.getTryTimeout());
ServiceBusReceiverClient buildClientForProcessor() {
return new ServiceBusReceiverClient(buildAsyncClientForProcessor(false), retryOptions.getTryTimeout());
}

private ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed) {
private ServiceBusReceiverAsyncClient buildAsyncClientForProcessor(boolean isAutoCompleteAllowed) {
final MessagingEntityType entityType = validateEntityPaths(logger, connectionStringEntityName, topicName,
queueName);
final String entityPath = getEntityPath(logger, entityType, queueName, topicName, subscriptionName,
@@ -822,7 +809,7 @@ private ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAll

final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer);
final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount,
maxAutoLockRenewDuration, enableAutoComplete, sessionId, isRollingSessionReceiver(),
maxAutoLockRenewDuration, enableAutoComplete, null,
maxConcurrentSessions);

final ServiceBusSessionManager sessionManager = new ServiceBusSessionManager(entityPath, entityType,
@@ -834,22 +821,66 @@ maxAutoLockRenewDuration, enableAutoComplete, sessionId, isRollingSessionReceive
}

/**
* This is a rolling session receiver only if maxConcurrentSessions is > 0 AND sessionId is null or empty. If
* there is a sessionId, this is going to be a single, named session receiver.
* Creates an <b>asynchronous</b>, <b>session-aware</b> Service Bus receiver responsible for reading {@link
* ServiceBusMessage messages} from a specific queue or topic.
*
* @return An new {@link ServiceBusSessionReceiverAsyncClient} that receives messages from a queue or topic.
* @throws IllegalStateException if {@link #queueName(String) queueName} or {@link #topicName(String)
* topicName} are not set or, both of these fields are set. It is also thrown if the Service Bus {@link
* #connectionString(String) connectionString} contains an {@code EntityPath} that does not match one set in
* {@link #queueName(String) queueName} or {@link #topicName(String) topicName}. Lastly, if a {@link
* #topicName(String) topicName} is set, but {@link #subscriptionName(String) subscriptionName} is not.
* @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String)
* queueName()} or {@link #topicName(String) topicName()}, respectively.
*/
public ServiceBusSessionReceiverAsyncClient buildAsyncClient() {
return buildAsyncClient(true);
}

/**
* Creates a <b>synchronous</b>, <b>session-aware</b> Service Bus receiver responsible for reading {@link
* ServiceBusMessage messages} from a specific queue or topic.
*
* @return {@code true} if this is an unnamed rolling session receiver; {@code false} otherwise.
* @return An new {@link ServiceBusReceiverClient} that receives messages from a queue or topic.
* @throws IllegalStateException if {@link #queueName(String) queueName} or {@link #topicName(String)
* topicName} are not set or, both of these fields are set. It is also thrown if the Service Bus {@link
* #connectionString(String) connectionString} contains an {@code EntityPath} that does not match one set in
* {@link #queueName(String) queueName} or {@link #topicName(String) topicName}. Lastly, if a {@link
* #topicName(String) topicName} is set, but {@link #subscriptionName(String) subscriptionName} is not.
* @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String)
* queueName()} or {@link #topicName(String) topicName()}, respectively.
*/
private boolean isRollingSessionReceiver() {
if (maxConcurrentSessions == null) {
return false;
public ServiceBusSessionReceiverClient buildClient() {
return new ServiceBusSessionReceiverClient(buildAsyncClient(false),
retryOptions.getTryTimeout());
}

private ServiceBusSessionReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed) {
final MessagingEntityType entityType = validateEntityPaths(logger, connectionStringEntityName, topicName,
queueName);
final String entityPath = getEntityPath(logger, entityType, queueName, topicName, subscriptionName,
SubQueue.NONE);

if (!isAutoCompleteAllowed && enableAutoComplete) {
logger.warning(
"'enableAutoComplete' is not supported in synchronous client except through callback receive.");
enableAutoComplete = false;
} else if (enableAutoComplete && receiveMode == ReceiveMode.RECEIVE_AND_DELETE) {
throw logger.logExceptionAsError(new IllegalStateException(
"'enableAutoComplete' is not valid for RECEIVE_AND_DELETE mode."));
}

if (maxConcurrentSessions < 1) {
throw logger.logExceptionAsError(
new IllegalArgumentException("Maximum number of concurrent sessions must be positive."));
if (receiveMode == ReceiveMode.RECEIVE_AND_DELETE) {
maxAutoLockRenewDuration = Duration.ZERO;
}

return CoreUtils.isNullOrEmpty(sessionId);
final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer);
final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount,
maxAutoLockRenewDuration, enableAutoComplete, null, maxConcurrentSessions);

return new ServiceBusSessionReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(),
entityPath, entityType, receiverOptions, connectionProcessor, tracerProvider, messageSerializer,
ServiceBusClientBuilder.this::onClientClose);
}
}

Loading