Skip to content

Commit

Permalink
Add and fix some documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
bzablocki committed Jun 10, 2024
1 parent df1eb6c commit 5015b32
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,12 @@
import org.slf4j.LoggerFactory;

/**
* A {@link PTransform} to read and write from/to Solace event broker.
* A {@link PTransform} to read and write from/to <a href="https://solace.com/">Solace</a> event
* broker.
*
* <p>Note: this API is beta and subject to change.
*
* <h2>Reading</h2>
* <h2>Reading from Solace</h2>
*
* To read from Solace, use the {@link SolaceIO#read()} or {@link SolaceIO#read(TypeDescriptor,
* SerializableFunction, SerializableFunction)}.
Expand Down Expand Up @@ -115,20 +116,21 @@
* PCollection<Solace.Record> events =
* pipeline.apply(
* SolaceIO.read()
* .from(Queue.fromName(options.getSolaceReadQueue()))
* .from(Queue.fromName("your-queue-name"))
* .withSempClientFactory(
* BasicAuthSempClientFactory.builder()
* .host("http://" + options.getSolaceHost() + ":8080")
* .username(options.getSolaceUsername())
* .password(options.getSolacePassword())
* .vpnName(options.getSolaceVpnName())
* .host("your-host-name-with-protocol") // e.g. "http://12.34.56.78:8080"
* .username("semp-username")
* .password("semp-password")
* .vpnName("vpn-name")
* .build())
* .withSessionServiceFactory(
* BasicAuthJcsmpSessionServiceFactory.builder()
* .host(options.getSolaceHost())
* .username(options.getSolaceUsername())
* .password(options.getSolacePassword())
* .vpnName(options.getSolaceVpnName())
* .host("your-host-name")
* // e.g. "12.34.56.78", or "[fe80::1]", or "12.34.56.78:4444"
* .username("username")
* .password("password")
* .vpnName("vpn-name")
* .build()));
* }</pre>
*
Expand Down Expand Up @@ -172,7 +174,7 @@
* TypeDescriptor.of(SimpleRecord.class),
* record -> toSimpleRecord(record),
* record -> record.timestamp)
* .from(Topic.fromName(options.getSolaceReadTopic()))
* .from(Topic.fromName("your-topic-name"))
* .withSempClientFactory(...)
* .withSessionServiceFactory(...);
*
Expand Down Expand Up @@ -340,8 +342,8 @@ public Read<T> withDeduplicateRecords(boolean deduplicateRecords) {
* </ul>
*
* <p>An existing implementation of the SempClientFactory includes {@link
* org.apache.beam.sdk.io.solace.broker.BasicAuthSempClientFactory} which implements the Basic
* Authentication to Solace.
* org.apache.beam.sdk.io.solace.broker.BasicAuthSempClientFactory} which implements connection
* to the SEMP with the Basic Authentication method.
*
* <p>To use it, specify the credentials with the builder methods.
*
Expand All @@ -350,11 +352,11 @@ public Read<T> withDeduplicateRecords(boolean deduplicateRecords) {
* <pre>{@code
* .withSempClientFactory(
* BasicAuthSempClientFactory.builder()
* .host("http://" + options.getSolaceHost() + ":8080")
* .username(options.getSolaceUsername())
* .password(options.getSolacePassword())
* .vpnName(options.getSolaceVpnName())
* .build())
* .host("your-host-name-with-protocol") // e.g. "http://12.34.56.78:8080"
* .username("username")
* .password("password")
* .vpnName("vpn-name")
* .build())
* }</pre>
*/
public Read<T> withSempClientFactory(SempClientFactory sempClientFactory) {
Expand Down Expand Up @@ -382,15 +384,19 @@ public Read<T> withSempClientFactory(SempClientFactory sempClientFactory) {
*
* <p>To use it, specify the credentials with the builder methods. *
*
* <p>The format of the host is `[Protocol://]Host[:Port]` *
* <p>The host is the IPv4 or IPv6 or host name of the appliance. IPv5 addresses must be encoded
* in brackets ([]). For example, "12.34.56.78", or "[fe80::1]". If connecting to a non-default
* port, it can be specified here using the "Host:Port" format. For example, "12.34.56.78:4444",
* or "[fe80::1]:4444".
*
* <pre>{@code
* BasicAuthJcsmpSessionServiceFactory.builder()
* .host(options.getSolaceHost())
* .username(options.getSolaceUsername())
* .password(options.getSolacePassword())
* .vpnName(options.getSolaceVpnName())
* .build()));
* .host("your-host-name")
* // e.g. "12.34.56.78", or "[fe80::1]", or "12.34.56.78:4444"
* .username("semp-username")
* .password("semp-password")
* .vpnName("vpn-name")
* .build()));
* }</pre>
*/
public Read<T> withSessionServiceFactory(SessionServiceFactory sessionServiceFactory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,38 @@
import com.solacesystems.jcsmp.BytesXMLMessage;
import java.io.IOException;

/**
* Interface for receiving messages from a Solace broker.
*
* <p>Implementations of this interface are responsible for managing the connection to the broker
* and for receiving messages from the broker.
*/
public interface MessageReceiver {
/**
* Starts the message receiver.
*
* <p>This method is called in the {@link
* org.apache.beam.sdk.io.solace.read.UnboundedSolaceReader#start()} method.
*/
void start();

/**
* Returns {@literal true} if the message receiver is closed, {@literal false} otherwise.
*
* <p>A message receiver is closed when it is no longer able to receive messages.
*/
boolean isClosed();

/**
* Receives a message from the broker.
*
* <p>This method will block until a message is received.
*/
BytesXMLMessage receive() throws IOException;

/**
* Test clients may return {@literal true} to signal that all expected messages have been pulled
* and the test may complete. Real clients will return {@literal false}.
* and the test may complete. Real clients should always return {@literal false}.
*/
default boolean isEOF() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,29 @@
import java.io.IOException;
import java.io.Serializable;

/**
* This interface defines methods for interacting with a Solace message broker using the Solace
* Element Management Protocol (SEMP). SEMP provides a way to manage and monitor various aspects of
* the broker, including queues and topics.
*/
public interface SempClient extends Serializable {

/**
* Determines if the specified queue is non-exclusive. In Solace, non-exclusive queues allow
* multiple consumers to receive messages from the queue.
*/
boolean isQueueNonExclusive(String queueName) throws IOException;

/**
* This is only called when a user requests to read data from a topic. This method creates a new
* queue on the Solace broker and associates it with the specified topic. This ensures that
* messages published to the topic are delivered to the queue, allowing consumers to receive them.
*/
Queue createQueueForTopic(String queueName, String topicName) throws IOException;

/**
* Retrieves the size of the backlog (in bytes) for the specified queue. The backlog represents
* the amount of data in messages that are waiting to be delivered to consumers.
*/
long getBacklogBytes(String queueName) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,17 @@

import java.io.Serializable;

/**
* This interface serves as a blueprint for creating SempClient objects, which are used to interact
* with a Solace message broker using the Solace Element Management Protocol (SEMP).
*/
public interface SempClientFactory extends Serializable {

/**
* This method is the core of the factory interface. It defines how to construct and return a
* SempClient object. Implementations of this interface will provide the specific logic for
* creating a client instance, which might involve connecting to the broker, handling
* authentication, and configuring other settings.
*/
SempClient create();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,32 @@

import java.io.Serializable;

/**
* The SessionService interface provides a set of methods for managing a session with the Solace
* messaging system. It allows for establishing a connection, creating a message-receiver object,
* checking if the connection is closed or not, and gracefully closing the session.
*/
public interface SessionService extends Serializable {

/**
* Establishes a connection to the service. This could involve providing connection details like
* host, port, VPN name, username, and password.
*/
void connect();

/** Gracefully closes the connection to the service. */
void close();

/**
* Checks whether the connection to the service is currently closed. This method is called when an
* `UnboundedSolaceReader` is starting to read messages - a session will be created if this
* returns true.
*/
boolean isClosed();

/**
* Creates a MessageReceiver object for receiving messages from Solace. Typically, this object is
* created from the session instance.
*/
MessageReceiver createReceiver();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,31 @@
import java.io.Serializable;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* This abstract class serves as a blueprint for creating `SessionService` objects. It introduces a
* queue property and mandates the implementation of a create() method in concrete subclasses.
*/
public abstract class SessionServiceFactory implements Serializable {

/**
* A reference to a Queue object. This is set when the pipline is constructed (in the {@link
* org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)} method).
* This could be used to associate the created SessionService with a specific queue for message
* handling.
*/
@Nullable Queue queue;

/**
* This is the core method that subclasses must implement. It defines how to construct and return
* a SessionService object.
*/
public abstract SessionService create();

/**
* This method is called in the {@link
* org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)} method
* to set the Queue reference.
*/
public void setQueue(Queue queue) {
this.queue = queue;
}
Expand Down

0 comments on commit 5015b32

Please sign in to comment.