From 3b8ddda10a01ff640ed4cf1ce746d0c19e003180 Mon Sep 17 00:00:00 2001 From: Bartosz Zablocki Date: Mon, 1 Jul 2024 23:24:57 +0200 Subject: [PATCH] Solace Read connector: adding Basic Authentication support (#31541) * Add support for BasicAuth to Solace * Address PR comments * Use `checkStateNotNull` --- .../broker/BasicAuthJcsmpSessionService.java | 148 ++++++++++++++++++ .../BasicAuthJcsmpSessionServiceFactory.java | 74 +++++++++ .../solace/broker/SolaceMessageReceiver.java | 72 +++++++++ 3 files changed, 294 insertions(+) create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.java diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java new file mode 100644 index 0000000000000..7863dbd129ce3 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.broker; + +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +import com.solacesystems.jcsmp.ConsumerFlowProperties; +import com.solacesystems.jcsmp.EndpointProperties; +import com.solacesystems.jcsmp.FlowReceiver; +import com.solacesystems.jcsmp.InvalidPropertiesException; +import com.solacesystems.jcsmp.JCSMPException; +import com.solacesystems.jcsmp.JCSMPFactory; +import com.solacesystems.jcsmp.JCSMPProperties; +import com.solacesystems.jcsmp.JCSMPSession; +import com.solacesystems.jcsmp.Queue; +import java.io.IOException; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.solace.RetryCallableManager; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; + +/** + * A class that manages a connection to a Solace broker using basic authentication. + * + *

This class provides a way to connect to a Solace broker and receive messages from a queue. The + * connection is established using basic authentication. + */ +public class BasicAuthJcsmpSessionService implements SessionService { + private final String queueName; + private final String host; + private final String username; + private final String password; + private final String vpnName; + @Nullable private JCSMPSession jcsmpSession; + private final RetryCallableManager retryCallableManager = RetryCallableManager.create(); + + /** + * Creates a new {@link BasicAuthJcsmpSessionService} with the given parameters. + * + * @param queueName The name of the queue to receive messages from. + * @param host The host name or IP address of the Solace broker. Format: Host[:Port] + * @param username The username to use for authentication. + * @param password The password to use for authentication. + * @param vpnName The name of the VPN to connect to. + */ + public BasicAuthJcsmpSessionService( + String queueName, String host, String username, String password, String vpnName) { + this.queueName = queueName; + this.host = host; + this.username = username; + this.password = password; + this.vpnName = vpnName; + } + + @Override + public void connect() { + retryCallableManager.retryCallable(this::connectSession, ImmutableSet.of(JCSMPException.class)); + } + + @Override + public void close() { + if (isClosed()) { + return; + } + retryCallableManager.retryCallable( + () -> { + checkStateNotNull(jcsmpSession).closeSession(); + return 0; + }, + ImmutableSet.of(IOException.class)); + } + + @Override + public MessageReceiver createReceiver() { + return retryCallableManager.retryCallable( + this::createFlowReceiver, ImmutableSet.of(JCSMPException.class)); + } + + @Override + public boolean isClosed() { + return jcsmpSession == null || jcsmpSession.isClosed(); + } + + private MessageReceiver createFlowReceiver() throws JCSMPException, IOException { + if (isClosed()) { + connectSession(); + } + + Queue queue = JCSMPFactory.onlyInstance().createQueue(queueName); + + ConsumerFlowProperties flowProperties = new ConsumerFlowProperties(); + flowProperties.setEndpoint(queue); + flowProperties.setAckMode(JCSMPProperties.SUPPORTED_MESSAGE_ACK_CLIENT); + + EndpointProperties endpointProperties = new EndpointProperties(); + endpointProperties.setAccessType(EndpointProperties.ACCESSTYPE_NONEXCLUSIVE); + if (jcsmpSession != null) { + return new SolaceMessageReceiver( + createFlowReceiver(jcsmpSession, flowProperties, endpointProperties)); + } + throw new IOException( + "SolaceIO.Read: Could not create a receiver from the Jcsmp session: session object is null."); + } + + // The `@SuppressWarning` is needed here, because the checkerframework reports an error for the + // first argument of the `createFlow` being null, even though the documentation allows it: + // https://docs.solace.com/API-Developer-Online-Ref-Documentation/java/com/solacesystems/jcsmp/JCSMPSession.html#createFlow-com.solacesystems.jcsmp.XMLMessageListener-com.solacesystems.jcsmp.ConsumerFlowProperties-com.solacesystems.jcsmp.EndpointProperties- + @SuppressWarnings("nullness") + private static FlowReceiver createFlowReceiver( + JCSMPSession jcsmpSession, + ConsumerFlowProperties flowProperties, + EndpointProperties endpointProperties) + throws JCSMPException { + return jcsmpSession.createFlow(null, flowProperties, endpointProperties); + } + + private int connectSession() throws JCSMPException { + if (jcsmpSession == null) { + jcsmpSession = createSessionObject(); + } + jcsmpSession.connect(); + return 0; + } + + private JCSMPSession createSessionObject() throws InvalidPropertiesException { + JCSMPProperties properties = new JCSMPProperties(); + properties.setProperty(JCSMPProperties.HOST, host); + properties.setProperty(JCSMPProperties.USERNAME, username); + properties.setProperty(JCSMPProperties.PASSWORD, password); + properties.setProperty(JCSMPProperties.VPN_NAME, vpnName); + + return JCSMPFactory.onlyInstance().createSession(properties); + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java new file mode 100644 index 0000000000000..8cb4ff0af0537 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.broker; + +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +import com.google.auto.value.AutoValue; + +/** + * A factory for creating {@link BasicAuthJcsmpSessionService} instances. Extends {@link + * SessionServiceFactory}. + * + *

This factory provides a way to create {@link BasicAuthJcsmpSessionService} instances with + * authenticate to Solace with Basic Authentication. + */ +@AutoValue +public abstract class BasicAuthJcsmpSessionServiceFactory extends SessionServiceFactory { + public abstract String host(); + + public abstract String username(); + + public abstract String password(); + + public abstract String vpnName(); + + public static Builder builder() { + return new AutoValue_BasicAuthJcsmpSessionServiceFactory.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + + /** + * Set Solace host, format: Host[:Port] e.g. "12.34.56.78", or "[fe80::1]", or + * "12.34.56.78:4444". + */ + public abstract Builder host(String host); + + /** Set Solace username. */ + public abstract Builder username(String username); + /** Set Solace password. */ + public abstract Builder password(String password); + + /** Set Solace vpn name. */ + public abstract Builder vpnName(String vpnName); + + public abstract BasicAuthJcsmpSessionServiceFactory build(); + } + + @Override + public SessionService create() { + return new BasicAuthJcsmpSessionService( + checkStateNotNull(queue, "SolaceIO.Read: Queue is not set.").getName(), + host(), + username(), + password(), + vpnName()); + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.java new file mode 100644 index 0000000000000..e5f129d3ddfcb --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.broker; + +import com.solacesystems.jcsmp.BytesXMLMessage; +import com.solacesystems.jcsmp.FlowReceiver; +import com.solacesystems.jcsmp.JCSMPException; +import com.solacesystems.jcsmp.StaleSessionException; +import java.io.IOException; +import org.apache.beam.sdk.io.solace.RetryCallableManager; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SolaceMessageReceiver implements MessageReceiver { + private static final Logger LOG = LoggerFactory.getLogger(SolaceMessageReceiver.class); + + public static final int DEFAULT_ADVANCE_TIMEOUT_IN_MILLIS = 100; + private final FlowReceiver flowReceiver; + private final RetryCallableManager retryCallableManager = RetryCallableManager.create(); + + public SolaceMessageReceiver(FlowReceiver flowReceiver) { + this.flowReceiver = flowReceiver; + } + + @Override + public void start() { + startFlowReceiver(); + } + + private void startFlowReceiver() { + retryCallableManager.retryCallable( + () -> { + flowReceiver.start(); + return 0; + }, + ImmutableSet.of(JCSMPException.class)); + } + + @Override + public boolean isClosed() { + return flowReceiver == null || flowReceiver.isClosed(); + } + + @Override + public BytesXMLMessage receive() throws IOException { + try { + return flowReceiver.receive(DEFAULT_ADVANCE_TIMEOUT_IN_MILLIS); + } catch (StaleSessionException e) { + LOG.warn("SolaceIO: Caught StaleSessionException, restarting the FlowReceiver."); + startFlowReceiver(); + throw new IOException(e); + } catch (JCSMPException e) { + throw new IOException(e); + } + } +}