-
Notifications
You must be signed in to change notification settings - Fork 644
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Converted from Alpakka JMS * Jakarta Messaging 3.1 * jakarta.jms package instead of javax.jms
- Loading branch information
Showing
69 changed files
with
11,927 additions
and
13 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
# Browse | ||
|
||
## Browsing messages | ||
|
||
The browse source streams the messages in a queue **without consuming them**. | ||
|
||
Unlike the other sources, the browse source will complete after browsing all the messages currently on the queue. | ||
|
||
Scala | ||
: @@snip [snip](/jakarta-jms/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala) { #browse-source } | ||
|
||
Java | ||
: @@snip [snip](/jakarta-jms/src/test/java/docs/javadsl/JmsConnectorsTest.java) { #browse-source } | ||
|
||
A JMS `selector` can be used to filter the messages. Otherwise it will browse the entire content of the queue. | ||
|
||
|
||
**Notes:** | ||
|
||
* Messages may be arriving and expiring while the scan is done. | ||
* The JMS API does not require the content of an enumeration to be a static snapshot of queue content. Whether these changes are visible or not depends on the JMS provider. | ||
* A message must not be returned by a QueueBrowser before its delivery time has been reached. | ||
|
||
|
||
|
||
## Configure JMS browse | ||
|
||
To connect to the JMS broker, first define an appropriate @javadoc[jakarta.jms.ConnectionFactory](jakarta.jms.ConnectionFactory). The Alpakka tests and all examples use Active MQ. | ||
|
||
Scala | ||
: @@snip [snip](/jakarta-jms/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala) { #connection-factory } | ||
|
||
Java | ||
: @@snip [snip](/jakarta-jms/src/test/java/docs/javadsl/JmsConnectorsTest.java) { #connection-factory } | ||
|
||
|
||
The created @javadoc[ConnectionFactory](jakarta.jms.ConnectionFactory) is then used for the creation of the different JMS sources. | ||
|
||
|
||
The `JmsBrowseSettings` factories allow for passing the actor system to read from the default `alpakka.jakarta-jms.browse` section, or you may pass a `Config` instance which is resolved to a section of the same structure. | ||
|
||
Scala | ||
: @@snip [snip](/jakarta-jms/src/test/scala/docs/scaladsl/JmsSettingsSpec.scala) { #browse-settings } | ||
|
||
Java | ||
: @@snip [snip](/jakarta-jms/src/test/java/docs/javadsl/JmsSettingsTest.java) { #consumer-settings } | ||
|
||
|
||
The Alpakka Jakarta Messaging browse soruce is configured via default settings in the [HOCON](https://github.com/lightbend/config#using-hocon-the-json-superset) config file section `alpakka.jakarta-jms.browse` in your `application.conf`, and settings may be tweaked in the code using the `withXyz` methods. On the second tab the section from `reference.conf` shows the structure to use for configuring multiple set-ups. | ||
|
||
Table | ||
: Setting | Description | Default Value | | ||
------------------------|----------------------------------------------------------------------|---------------------| | ||
connectionFactory | Factory to use for creating JMS connections | Must be set in code | | ||
destination | The queue to browse | Must be set in code | | ||
credentials | JMS broker credentials | Empty | | ||
connectionRetrySettings | Retry characteristics if the connection failed to be established or is taking a long time. | See @ref[Connection Retries](producer.md#connection-retries) | ||
|
||
reference.conf | ||
: @@snip [snip](/jakarta-jms/src/main/resources/reference.conf) { #browse } | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,209 @@ | ||
# Consumer | ||
|
||
The Alpakka Jakarta Messaging connector offers consuming JMS messages from topics or queues: | ||
|
||
* Read `jakarta.jms.Message`s from an Akka Streams source | ||
* Allow for client acknowledgement to the JMS broker | ||
* Allow for JMS transactions | ||
* Read raw JVM types from an Akka Streams Source | ||
|
||
The JMS message model supports several types of message bodies in (see @javadoc[jakarta.jms.Message](jakarta.jms.Message)), which may be created directly from the Akka Stream elements, or in wrappers to access more advanced features. | ||
|
||
|
||
## Receiving messages | ||
|
||
@apidoc[jakartajms.*.JmsConsumer$] offers factory methods to consume JMS messages in a number of ways. | ||
|
||
This examples shows how to listen to a JMS queue and emit @javadoc[jakarta.jms.Message](jakarta.jms.Message) elements into the stream. | ||
|
||
The materialized value @apidoc[jakartajms.*.JmsConsumerControl] is used to shut down the consumer (it is a @apidoc[KillSwitch]) and offers the possibility to inspect the connectivity state of the consumer. | ||
|
||
Scala | ||
: @@snip [snip](/jakarta-jms/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala) { #jms-source } | ||
|
||
Java | ||
: @@snip [snip](/jakarta-jms/src/test/java/docs/javadsl/JmsConnectorsTest.java) { #jms-source } | ||
|
||
|
||
## Configure JMS consumers | ||
|
||
To connect to the JMS broker, first define an appropriate @javadoc[jakarta.jms.ConnectionFactory](jakarta.jms.ConnectionFactory). The Alpakka tests and all examples use Active MQ. | ||
|
||
Scala | ||
: @@snip [snip](/jakarta-jms/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala) { #connection-factory } | ||
|
||
Java | ||
: @@snip [snip](/jakarta-jms/src/test/java/docs/javadsl/JmsConnectorsTest.java) { #connection-factory } | ||
|
||
|
||
The created @javadoc[ConnectionFactory](jakarta.jms.ConnectionFactory) is then used for the creation of the different JMS sources. | ||
|
||
The @apidoc[jakartajms.JmsConsumerSettings$] factories allow for passing the actor system to read from the default `alpakka.jakarta-jms.consumer` section, or you may pass a `Config` instance which is resolved to a section of the same structure. | ||
|
||
Scala | ||
: @@snip [snip](/jakarta-jms/src/test/scala/docs/scaladsl/JmsSettingsSpec.scala) { #consumer-settings } | ||
|
||
Java | ||
: @@snip [snip](/jakarta-jms/src/test/java/docs/javadsl/JmsSettingsTest.java) { #consumer-settings } | ||
|
||
The Alpakka Jakarta Messaging consumer is configured via default settings in the [HOCON](https://github.com/lightbend/config#using-hocon-the-json-superset) config file section `alpakka.jakarta-jms.consumer` in your `application.conf`, and settings may be tweaked in the code using the `withXyz` methods. On the second tab the section from `reference.conf` shows the structure to use for configuring multiple set-ups. | ||
|
||
Table | ||
: Setting | Description | Default Value | | ||
------------------------|----------------------------------------------------------------------|---------------------| | ||
connectionFactory | Factory to use for creating JMS connections | Must be set in code | | ||
destination | Destination (queue or topic) to send JMS messages to | Must be set in code | | ||
credentials | JMS broker credentials | Empty | | ||
connectionRetrySettings | Retry characteristics if the connection failed to be established or is taking a long time. | See @ref[Connection Retries](producer.md#connection-retries) | ||
sessionCount | Number of parallel sessions to use for receiving JMS messages. | defaults to `1` | | ||
bufferSize | Maximum number of messages to prefetch before applying backpressure. | 100 | | ||
ackTimeout | For use with JMS transactions, only: maximum time given to a message to be committed or rolled back. | 1 second | | ||
maxAckInterval | For use with AckSource, only: The max duration before the queued acks are sent to the broker | Empty | | ||
maxPendingAcks | For use with AckSource, only: The amount of acks that get queued before being sent to the broker | 100 | | ||
selector | JMS selector expression (see [below](#using-jms-selectors)) | Empty | | ||
connectionStatusSubscriptionTimeout | 5 seconds | Time to wait for subscriber of connection status events before starting to discard them | | ||
|
||
reference.conf | ||
: @@snip [snip](/jakarta-jms/src/main/resources/reference.conf) { #consumer } | ||
|
||
|
||
### Broker specific destinations | ||
|
||
To reach out to special features of the JMS broker, destinations can be created as `CustomDestination` which takes a factory method for creating destinations. | ||
|
||
Scala | ||
: @@snip [snip](/jakarta-jms/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala) { #custom-destination } | ||
|
||
Java | ||
: @@snip [snip](/jakarta-jms/src/test/java/docs/javadsl/JmsConnectorsTest.java) { #custom-destination } | ||
|
||
|
||
## Using JMS client acknowledgement | ||
|
||
Client acknowledgement ensures a message is successfully received by the consumer and notifies the JMS broker for every message. Due to the threading details in JMS brokers, this special source is required (see the explanation below). | ||
|
||
Scala | ||
: @@snip [snip](/jakarta-jms/src/test/scala/docs/scaladsl/JmsBufferedAckConnectorsSpec.scala) { #source } | ||
|
||
Java | ||
: @@snip [snip](/jakarta-jms/src/test/java/docs/javadsl/JmsBufferedAckConnectorsTest.java) { #source } | ||
|
||
The `sessionCount` parameter controls the number of JMS sessions to run in parallel. | ||
|
||
|
||
**Notes:** | ||
|
||
* Using multiple sessions increases throughput, especially if acknowledging message by message is desired. | ||
* Messages may arrive out of order if `sessionCount` is larger than 1. | ||
* Message-by-message acknowledgement can be achieved by setting `bufferSize` to 0, thus disabling buffering. The outstanding messages before backpressure will be the `sessionCount`. | ||
* If buffering is enabled then it's possible for messages to remain in the buffer and never be acknowledged (or acknowledged after a long time) when no new elements arrive to reach the `maxPendingAcks` threshold. By setting `maxAckInterval` messages will be acknowledged after the defined interval or number of pending acks, whichever comes first. | ||
* The default `AcknowledgeMode` is `ClientAcknowledge` but can be overridden to custom `AcknowledgeMode`s, even implementation-specific ones by setting the `AcknowledgeMode` in the `JmsConsumerSettings` when creating the stream. | ||
|
||
@@@ warning | ||
|
||
Using a regular `JmsConsumer` with `AcknowledgeMode.ClientAcknowledge` and using `message.acknowledge()` from the stream is not compliant with the JMS specification and can cause issues for some message brokers. `message.acknowledge()` in many cases acknowledges the session and not the message itself, contrary to what the API makes you believe. | ||
|
||
Use this `JmsConsumer.ackSource` as shown above instead. | ||
|
||
@@@ | ||
|
||
|
||
## Using JMS transactions | ||
|
||
JMS transactions may be used with this connector. Be aware that transactions are a heavy-weight tool and may not perform very good. | ||
|
||
Scala | ||
: @@snip [snip](/jakarta-jms/src/test/scala/docs/scaladsl/JmsTxConnectorsSpec.scala) { #source } | ||
|
||
Java | ||
: @@snip [snip](/jakarta-jms/src/test/java/docs/javadsl/JmsTxConnectorsTest.java) { #source } | ||
|
||
The `sessionCount` parameter controls the number of JMS sessions to run in parallel. | ||
|
||
The `ackTimeout` parameter controls the maximum time given to a message to be committed or rolled back. If the message times out it will automatically be rolled back. This is to prevent stream from starvation if the application fails to commit or rollback a message, or if the message errors out and the stream is resumed by a `decider`. | ||
|
||
**Notes:** | ||
|
||
* Higher throughput is achieved by increasing the `sessionCount`. | ||
* Messages will arrive out of order if `sessionCount` is larger than 1. | ||
* Buffering is not supported in transaction mode. The `bufferSize` is ignored. | ||
* The default `AcknowledgeMode` is `SessionTransacted` but can be overridden to custom `AcknowledgeMode`s, even implementation-specific ones by setting the `AcknowledgeMode` in the `JmsConsumerSettings` when creating the stream. | ||
|
||
|
||
|
||
## Using JMS selectors | ||
|
||
Create a @javadoc[jakarta.jms.Message](jakarta.jms.Message) source specifying a [JMS selector expression](https://docs.oracle.com/cd/E19798-01/821-1841/bncer/index.html): | ||
Verify that we are only receiving messages according to the selector: | ||
|
||
Scala | ||
: @@snip [snip](/jakarta-jms/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala) { #source-with-selector } | ||
|
||
Java | ||
: @@snip [snip](/jakarta-jms/src/test/java/docs/javadsl/JmsConnectorsTest.java) { #source-with-selector } | ||
|
||
|
||
## Raw JVM type sources | ||
|
||
| Stream element type | Alpakka source factory | | ||
|-----------------------------------------------------------|--------------------------| | ||
| `String` | [`JmsConsumer.textSource`](#text-sources) | | ||
| @scala[`Array[Byte]`]@java[`byte[]`] | [`JmsConsumer.bytesSource`](#byte-array-sources) | | ||
| @scala[`Map[String, AnyRef]`]@java[`Map<String, Object>`] | [`JmsConsumer.mapSource`](#map-messages-sources) | | ||
| `Object` (`java.io.Serializable`) | [`JmsConsumer.objectSource`](#object-sources) | | ||
|
||
### Text sources | ||
|
||
The `textSource` emits the received message body as String: | ||
|
||
Scala | ||
: @@snip [snip](/jakarta-jms/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala) { #text-source } | ||
|
||
Java | ||
: @@snip [snip](/jakarta-jms/src/test/java/docs/javadsl/JmsConnectorsTest.java) { #text-source } | ||
|
||
|
||
### Byte array sources | ||
|
||
The `bytesSource` emits the received message body as byte array: | ||
|
||
Scala | ||
: @@snip [snip](/jakarta-jms/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala) { #bytearray-source } | ||
|
||
Java | ||
: @@snip [snip](/jakarta-jms/src/test/java/docs/javadsl/JmsConnectorsTest.java) { #bytearray-source } | ||
|
||
|
||
### Map sources | ||
|
||
The `mapSource` emits the received message body as @scala[Map[String, Object]]@java[Map<String, Object>]: | ||
|
||
Scala | ||
: @@snip [snip](/jakarta-jms/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala) { #map-source } | ||
|
||
Java | ||
: @@snip [snip](/jakarta-jms/src/test/java/docs/javadsl/JmsConnectorsTest.java) { #map-source } | ||
|
||
|
||
### Object sources | ||
|
||
The `objectSource` emits the received message body as deserialized JVM instance. As serialization may be a security concern, JMS clients require special configuration to allow this. The example shows how to configure ActiveMQ connection factory to support serialization. See [ActiveMQ Security](https://activemq.apache.org/objectmessage.html) for more information on this. | ||
|
||
Scala | ||
: @@snip [snip](/jakarta-jms/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala) { #object-source } | ||
|
||
Java | ||
: @@snip [snip](/jakarta-jms/src/test/java/docs/javadsl/JmsConnectorsTest.java) { #object-source } | ||
|
||
|
||
## Request / Reply | ||
|
||
The request / reply pattern can be implemented by streaming a @apidoc[jakartajms.*.JmsConsumer$] | ||
to a @apidoc[jakartajms.*.JmsProducer$], | ||
with a stage in between that extracts the `ReplyTo` and `CorrelationID` from the original message and adds them to the response. | ||
|
||
Scala | ||
: @@snip [snip](/jakarta-jms/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala) { #request-reply } | ||
|
||
Java | ||
: @@snip [snip](/jakarta-jms/src/test/java/docs/javadsl/JmsConnectorsTest.java) { #request-reply } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
# Jakarta Messaging (JMS) | ||
|
||
@@@ note { title="Jakarta Messaging (JMS)" } | ||
|
||
The Jakarta Messaging API (formerly Java Message Service or JMS API) is a Java application programming interface (API) for message-oriented middleware. It provides generic messaging models, able to handle the producer–consumer problem, that can be used to facilitate the sending and receiving of messages between software systems. Jakarta Messaging is a part of [Jakarta EE]((https://jakarta.ee)) and was originally defined by a specification developed at Sun Microsystems before being guided by the Java Community Process. | ||
|
||
-- [Wikipedia](https://en.wikipedia.org/wiki/Jakarta_Messaging) | ||
|
||
@@@ | ||
|
||
The Alpakka Jakarta Messaging connector provides Akka Stream sources and sinks to connect to Jakarta Messaging providers. | ||
|
||
@@project-info{ projectId="jakarta-jms" } | ||
|
||
## Artifacts | ||
|
||
The Akka dependencies are available from Akka's library repository. To access them there, you need to configure the URL for this repository. | ||
|
||
@@repository [sbt,Maven,Gradle] { | ||
id="akka-repository" | ||
name="Akka library repository" | ||
url="https://repo.akka.io/maven" | ||
} | ||
|
||
Additionally, add the dependencies as below. | ||
|
||
@@dependency [sbt,Maven,Gradle] { | ||
group=com.lightbend.akka | ||
artifact=akka-stream-alpakka-jakarta-jms_$scala.binary.version$ | ||
version=$project.version$ | ||
} | ||
|
||
@@toc { depth=2 } | ||
|
||
@@@ index | ||
|
||
* [p](producer.md) | ||
* [c](consumer.md) | ||
* [c](browse.md) | ||
|
||
@@@ |
Oops, something went wrong.