From 47461f8cb18e8d66af277eac35051e154840d94e Mon Sep 17 00:00:00 2001 From: Bartosz Zablocki Date: Tue, 18 Jun 2024 18:02:59 +0200 Subject: [PATCH] Add interfaces for broker-helper classes --- .../sdk/io/solace/broker/MessageReceiver.java | 59 +++++++++++++++++++ .../beam/sdk/io/solace/broker/SempClient.java | 49 +++++++++++++++ .../io/solace/broker/SempClientFactory.java | 11 +++- .../sdk/io/solace/broker/SessionService.java | 50 ++++++++++++++++ .../solace/broker/SessionServiceFactory.java | 9 ++- 5 files changed, 176 insertions(+), 2 deletions(-) create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClient.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java new file mode 100644 index 000000000000..199a83e322bd --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.broker; + +import com.solacesystems.jcsmp.BytesXMLMessage; +import java.io.IOException; + +/** + * Interface for receiving messages from a Solace broker. + * + *

Implementations of this interface are responsible for managing the connection to the broker + * and for receiving messages from the broker. + */ +public interface MessageReceiver { + /** + * Starts the message receiver. + * + *

This method is called in the {@link + * org.apache.beam.sdk.io.solace.read.UnboundedSolaceReader#start()} method. + */ + void start(); + + /** + * Returns {@literal true} if the message receiver is closed, {@literal false} otherwise. + * + *

A message receiver is closed when it is no longer able to receive messages. + */ + boolean isClosed(); + + /** + * Receives a message from the broker. + * + *

This method will block until a message is received. + */ + BytesXMLMessage receive() throws IOException; + + /** + * Test clients may return {@literal true} to signal that all expected messages have been pulled + * and the test may complete. Real clients should always return {@literal false}. + */ + default boolean isEOF() { + return false; + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClient.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClient.java new file mode 100644 index 000000000000..465f37c14036 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClient.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.broker; + +import com.solacesystems.jcsmp.Queue; +import java.io.IOException; +import java.io.Serializable; + +/** + * This interface defines methods for interacting with a Solace message broker using the Solace + * Element Management Protocol (SEMP). SEMP provides a way to manage and monitor various aspects of + * the broker, including queues and topics. + */ +public interface SempClient extends Serializable { + + /** + * Determines if the specified queue is non-exclusive. In Solace, non-exclusive queues allow + * multiple consumers to receive messages from the queue. + */ + boolean isQueueNonExclusive(String queueName) throws IOException; + + /** + * This is only called when a user requests to read data from a topic. This method creates a new + * queue on the Solace broker and associates it with the specified topic. This ensures that + * messages published to the topic are delivered to the queue, allowing consumers to receive them. + */ + Queue createQueueForTopic(String queueName, String topicName) throws IOException; + + /** + * Retrieves the size of the backlog (in bytes) for the specified queue. The backlog represents + * the amount of data in messages that are waiting to be delivered to consumers. + */ + long getBacklogBytes(String queueName) throws IOException; +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClientFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClientFactory.java index b5cb53e14b39..79f690fde175 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClientFactory.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClientFactory.java @@ -23,4 +23,13 @@ * This interface serves as a blueprint for creating SempClient objects, which are used to interact * with a Solace message broker using the Solace Element Management Protocol (SEMP). */ -public interface SempClientFactory extends Serializable {} +public interface SempClientFactory extends Serializable { + + /** + * This method is the core of the factory interface. It defines how to construct and return a + * SempClient object. Implementations of this interface will provide the specific logic for + * creating a client instance, which might involve connecting to the broker, handling + * authentication, and configuring other settings. + */ + SempClient create(); +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java new file mode 100644 index 000000000000..cd368865f0c3 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.broker; + +import java.io.Serializable; + +/** + * The SessionService interface provides a set of methods for managing a session with the Solace + * messaging system. It allows for establishing a connection, creating a message-receiver object, + * checking if the connection is closed or not, and gracefully closing the session. + */ +public interface SessionService extends Serializable { + + /** + * Establishes a connection to the service. This could involve providing connection details like + * host, port, VPN name, username, and password. + */ + void connect(); + + /** Gracefully closes the connection to the service. */ + void close(); + + /** + * Checks whether the connection to the service is currently closed. This method is called when an + * `UnboundedSolaceReader` is starting to read messages - a session will be created if this + * returns true. + */ + boolean isClosed(); + + /** + * Creates a MessageReceiver object for receiving messages from Solace. Typically, this object is + * created from the session instance. + */ + MessageReceiver createReceiver(); +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java index ab1f55ae7a9c..9b4ef99eba77 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java @@ -23,4 +23,11 @@ * This abstract class serves as a blueprint for creating `SessionService` objects. It introduces a * queue property and mandates the implementation of a create() method in concrete subclasses. */ -public abstract class SessionServiceFactory implements Serializable {} +public abstract class SessionServiceFactory implements Serializable { + + /** + * This is the core method that subclasses must implement. It defines how to construct and return + * a SessionService object. + */ + public abstract SessionService create(); +}