From 5015b32f14cfb546ce3267f3e0fc597ec51a13f7 Mon Sep 17 00:00:00 2001 From: Bartosz Zablocki Date: Mon, 10 Jun 2024 16:02:37 +0200 Subject: [PATCH] Add and fix some documentation --- .../apache/beam/sdk/io/solace/SolaceIO.java | 56 ++++++++++--------- .../sdk/io/solace/broker/MessageReceiver.java | 24 +++++++- .../beam/sdk/io/solace/broker/SempClient.java | 18 ++++++ .../io/solace/broker/SempClientFactory.java | 11 ++++ .../sdk/io/solace/broker/SessionService.java | 20 +++++++ .../solace/broker/SessionServiceFactory.java | 20 +++++++ 6 files changed, 123 insertions(+), 26 deletions(-) diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java index 8c4d1d7d44bd6..5e5eb93ff1b3b 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java @@ -51,11 +51,12 @@ import org.slf4j.LoggerFactory; /** - * A {@link PTransform} to read and write from/to Solace event broker. + * A {@link PTransform} to read and write from/to Solace event + * broker. * *

Note: this API is beta and subject to change. * - *

Reading

+ *

Reading from Solace

* * To read from Solace, use the {@link SolaceIO#read()} or {@link SolaceIO#read(TypeDescriptor, * SerializableFunction, SerializableFunction)}. @@ -115,20 +116,21 @@ * PCollection events = * pipeline.apply( * SolaceIO.read() - * .from(Queue.fromName(options.getSolaceReadQueue())) + * .from(Queue.fromName("your-queue-name")) * .withSempClientFactory( * BasicAuthSempClientFactory.builder() - * .host("http://" + options.getSolaceHost() + ":8080") - * .username(options.getSolaceUsername()) - * .password(options.getSolacePassword()) - * .vpnName(options.getSolaceVpnName()) + * .host("your-host-name-with-protocol") // e.g. "http://12.34.56.78:8080" + * .username("semp-username") + * .password("semp-password") + * .vpnName("vpn-name") * .build()) * .withSessionServiceFactory( * BasicAuthJcsmpSessionServiceFactory.builder() - * .host(options.getSolaceHost()) - * .username(options.getSolaceUsername()) - * .password(options.getSolacePassword()) - * .vpnName(options.getSolaceVpnName()) + * .host("your-host-name") + * // e.g. "12.34.56.78", or "[fe80::1]", or "12.34.56.78:4444" + * .username("username") + * .password("password") + * .vpnName("vpn-name") * .build())); * } * @@ -172,7 +174,7 @@ * TypeDescriptor.of(SimpleRecord.class), * record -> toSimpleRecord(record), * record -> record.timestamp) - * .from(Topic.fromName(options.getSolaceReadTopic())) + * .from(Topic.fromName("your-topic-name")) * .withSempClientFactory(...) * .withSessionServiceFactory(...); * @@ -340,8 +342,8 @@ public Read withDeduplicateRecords(boolean deduplicateRecords) { * * *

An existing implementation of the SempClientFactory includes {@link - * org.apache.beam.sdk.io.solace.broker.BasicAuthSempClientFactory} which implements the Basic - * Authentication to Solace. + * org.apache.beam.sdk.io.solace.broker.BasicAuthSempClientFactory} which implements connection + * to the SEMP with the Basic Authentication method. * *

To use it, specify the credentials with the builder methods. * @@ -350,11 +352,11 @@ public Read withDeduplicateRecords(boolean deduplicateRecords) { *

{@code
      * .withSempClientFactory(
      *         BasicAuthSempClientFactory.builder()
-     *                 .host("http://" + options.getSolaceHost() + ":8080")
-     *                 .username(options.getSolaceUsername())
-     *                 .password(options.getSolacePassword())
-     *                 .vpnName(options.getSolaceVpnName())
-     *                 .build())
+     *               .host("your-host-name-with-protocol") // e.g. "http://12.34.56.78:8080"
+     *               .username("username")
+     *               .password("password")
+     *               .vpnName("vpn-name")
+     *               .build())
      * }
*/ public Read withSempClientFactory(SempClientFactory sempClientFactory) { @@ -382,15 +384,19 @@ public Read withSempClientFactory(SempClientFactory sempClientFactory) { * *

To use it, specify the credentials with the builder methods. * * - *

The format of the host is `[Protocol://]Host[:Port]` * + *

The host is the IPv4 or IPv6 or host name of the appliance. IPv5 addresses must be encoded + * in brackets ([]). For example, "12.34.56.78", or "[fe80::1]". If connecting to a non-default + * port, it can be specified here using the "Host:Port" format. For example, "12.34.56.78:4444", + * or "[fe80::1]:4444". * *

{@code
      * BasicAuthJcsmpSessionServiceFactory.builder()
-     *         .host(options.getSolaceHost())
-     *         .username(options.getSolaceUsername())
-     *         .password(options.getSolacePassword())
-     *         .vpnName(options.getSolaceVpnName())
-     *         .build()));
+     *     .host("your-host-name")
+     *           // e.g. "12.34.56.78", or "[fe80::1]", or "12.34.56.78:4444"
+     *     .username("semp-username")
+     *     .password("semp-password")
+     *     .vpnName("vpn-name")
+     *     .build()));
      * }
*/ public Read withSessionServiceFactory(SessionServiceFactory sessionServiceFactory) { diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java index baf9e3beb038e..199a83e322bdc 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java @@ -20,16 +20,38 @@ import com.solacesystems.jcsmp.BytesXMLMessage; import java.io.IOException; +/** + * Interface for receiving messages from a Solace broker. + * + *

Implementations of this interface are responsible for managing the connection to the broker + * and for receiving messages from the broker. + */ public interface MessageReceiver { + /** + * Starts the message receiver. + * + *

This method is called in the {@link + * org.apache.beam.sdk.io.solace.read.UnboundedSolaceReader#start()} method. + */ void start(); + /** + * Returns {@literal true} if the message receiver is closed, {@literal false} otherwise. + * + *

A message receiver is closed when it is no longer able to receive messages. + */ boolean isClosed(); + /** + * Receives a message from the broker. + * + *

This method will block until a message is received. + */ BytesXMLMessage receive() throws IOException; /** * Test clients may return {@literal true} to signal that all expected messages have been pulled - * and the test may complete. Real clients will return {@literal false}. + * and the test may complete. Real clients should always return {@literal false}. */ default boolean isEOF() { return false; diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClient.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClient.java index 571010316ee80..465f37c140363 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClient.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClient.java @@ -21,11 +21,29 @@ import java.io.IOException; import java.io.Serializable; +/** + * This interface defines methods for interacting with a Solace message broker using the Solace + * Element Management Protocol (SEMP). SEMP provides a way to manage and monitor various aspects of + * the broker, including queues and topics. + */ public interface SempClient extends Serializable { + /** + * Determines if the specified queue is non-exclusive. In Solace, non-exclusive queues allow + * multiple consumers to receive messages from the queue. + */ boolean isQueueNonExclusive(String queueName) throws IOException; + /** + * This is only called when a user requests to read data from a topic. This method creates a new + * queue on the Solace broker and associates it with the specified topic. This ensures that + * messages published to the topic are delivered to the queue, allowing consumers to receive them. + */ Queue createQueueForTopic(String queueName, String topicName) throws IOException; + /** + * Retrieves the size of the backlog (in bytes) for the specified queue. The backlog represents + * the amount of data in messages that are waiting to be delivered to consumers. + */ long getBacklogBytes(String queueName) throws IOException; } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClientFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClientFactory.java index 3967ed1ab8842..79f690fde1752 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClientFactory.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClientFactory.java @@ -19,6 +19,17 @@ import java.io.Serializable; +/** + * This interface serves as a blueprint for creating SempClient objects, which are used to interact + * with a Solace message broker using the Solace Element Management Protocol (SEMP). + */ public interface SempClientFactory extends Serializable { + + /** + * This method is the core of the factory interface. It defines how to construct and return a + * SempClient object. Implementations of this interface will provide the specific logic for + * creating a client instance, which might involve connecting to the broker, handling + * authentication, and configuring other settings. + */ SempClient create(); } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java index af276c2f3af64..cd368865f0c39 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java @@ -19,12 +19,32 @@ import java.io.Serializable; +/** + * The SessionService interface provides a set of methods for managing a session with the Solace + * messaging system. It allows for establishing a connection, creating a message-receiver object, + * checking if the connection is closed or not, and gracefully closing the session. + */ public interface SessionService extends Serializable { + + /** + * Establishes a connection to the service. This could involve providing connection details like + * host, port, VPN name, username, and password. + */ void connect(); + /** Gracefully closes the connection to the service. */ void close(); + /** + * Checks whether the connection to the service is currently closed. This method is called when an + * `UnboundedSolaceReader` is starting to read messages - a session will be created if this + * returns true. + */ boolean isClosed(); + /** + * Creates a MessageReceiver object for receiving messages from Solace. Typically, this object is + * created from the session instance. + */ MessageReceiver createReceiver(); } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java index d4f698d8299cc..7d1dee7a11874 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java @@ -21,11 +21,31 @@ import java.io.Serializable; import org.checkerframework.checker.nullness.qual.Nullable; +/** + * This abstract class serves as a blueprint for creating `SessionService` objects. It introduces a + * queue property and mandates the implementation of a create() method in concrete subclasses. + */ public abstract class SessionServiceFactory implements Serializable { + + /** + * A reference to a Queue object. This is set when the pipline is constructed (in the {@link + * org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)} method). + * This could be used to associate the created SessionService with a specific queue for message + * handling. + */ @Nullable Queue queue; + /** + * This is the core method that subclasses must implement. It defines how to construct and return + * a SessionService object. + */ public abstract SessionService create(); + /** + * This method is called in the {@link + * org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)} method + * to set the Queue reference. + */ public void setQueue(Queue queue) { this.queue = queue; }