Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#25887] fix(JmsIO): issue with multiple connection open #25887 #25945

Merged
merged 4 commits into from
Apr 5, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .test-infra/jenkins/job_PreCommit_Java_IOs.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ def additionalTasks = [
snowflake: [
':sdks:java:io:snowflake:expansion-service:build',
],
jms: [
':sdks:java:io:jms:integrationTest',
],
]

ioModules.forEach {
Expand Down
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
* BigQuery Storage Write API is now available in Python SDK via cross-language ([#21961](https://github.com/apache/beam/issues/21961)).
* Added HbaseIO support for writing RowMutations (ordered by rowkey) to Hbase (Java) ([#25830](https://github.com/apache/beam/issues/25830)).
* Added fileio transforms MatchFiles, MatchAll and ReadMatches (Go) ([#25779](https://github.com/apache/beam/issues/25779)).
* Add integration test for JmsIO + fix issue with multiple connections (Java) ([#25887](https://github.com/apache/beam/issues/25887)).

## New Features / Improvements

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,7 @@ class BeamModulePlugin implements Plugin<Project> {
def powermock_version = "2.0.9"
// Try to keep protobuf_version consistent with the protobuf version in google_cloud_platform_libraries_bom
def protobuf_version = "3.21.12"
def qpid_jms_client_version = "0.61.0"
def quickcheck_version = "1.0"
def sbe_tool_version = "1.25.1"
def singlestore_jdbc_version = "1.1.4"
Expand All @@ -565,6 +566,7 @@ class BeamModulePlugin implements Plugin<Project> {
def testcontainers_version = "1.17.3"
def arrow_version = "5.0.0"
def jmh_version = "1.34"
def jupiter_version = "5.7.0"

// Export Spark versions, so they are defined in a single place only
project.ext.spark3_version = spark3_version
Expand Down Expand Up @@ -746,6 +748,9 @@ class BeamModulePlugin implements Plugin<Project> {
json_org : "org.json:json:20220320", // Keep in sync with everit-json-schema / google_cloud_platform_libraries_bom transitive deps.
everit_json_schema : "com.github.erosb:everit-json-schema:${everit_json_version}",
junit : "junit:junit:4.13.1",
jupiter_api : "org.junit.jupiter:junit-jupiter-api:$jupiter_version",
jupiter_engine : "org.junit.jupiter:junit-jupiter-engine:$jupiter_version",
jupiter_params : "org.junit.jupiter:junit-jupiter-params:$jupiter_version",
kafka : "org.apache.kafka:kafka_2.11:$kafka_version",
kafka_clients : "org.apache.kafka:kafka-clients:$kafka_version",
log4j : "log4j:log4j:1.2.17",
Expand Down Expand Up @@ -778,6 +783,7 @@ class BeamModulePlugin implements Plugin<Project> {
proto_google_cloud_spanner_v1 : "com.google.api.grpc:proto-google-cloud-spanner-v1", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_spanner_admin_database_v1: "com.google.api.grpc:proto-google-cloud-spanner-admin-database-v1", // google_cloud_platform_libraries_bom sets version
proto_google_common_protos : "com.google.api.grpc:proto-google-common-protos", // google_cloud_platform_libraries_bom sets version
qpid_jms_client : "org.apache.qpid:qpid-jms-client:$qpid_jms_client_version",
sbe_tool : "uk.co.real-logic:sbe-tool:$sbe_tool_version",
singlestore_jdbc : "com.singlestore:singlestore-jdbc-client:$singlestore_jdbc_version",
slf4j_api : "org.slf4j:slf4j-api:$slf4j_version",
Expand Down
7 changes: 7 additions & 0 deletions sdks/java/io/jms/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ plugins { id 'org.apache.beam.module' }
applyJavaNature(
automaticModuleName: 'org.apache.beam.sdk.io.jms',
)
provideIntegrationTestingDependencies()
enableJavaPerformanceTesting()

description = "Apache Beam :: SDKs :: Java :: IO :: JMS"
ext.summary = """IO to read and write to JMS (Java Messaging Service)
Expand All @@ -31,13 +33,18 @@ dependencies {
implementation library.java.slf4j_api
implementation library.java.joda_time
implementation "org.apache.geronimo.specs:geronimo-jms_1.1_spec:1.1.1"
testImplementation library.java.activemq_amqp
testImplementation library.java.activemq_broker
testImplementation library.java.activemq_jaas
testImplementation library.java.activemq_kahadb_store
testImplementation library.java.activemq_client
testImplementation library.java.hamcrest
testImplementation library.java.junit
testImplementation library.java.mockito_core
testImplementation library.java.mockito_inline
testImplementation library.java.qpid_jms_client
testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration")
testImplementation project(path: ":sdks:java:testing:test-utils", configuration: "testRuntimeMigration")
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
Expand Down Expand Up @@ -988,17 +989,16 @@ private static class JmsConnection<T> implements Serializable {
private transient @Initialized Destination destination;
private transient @Initialized MessageProducer producer;

private boolean isProducerNeedsToBeCreated = true;
private final JmsIO.Write<T> spec;
private final Counter connectionErrors =
Metrics.counter(JMS_IO_PRODUCER_METRIC_NAME, CONNECTION_ERRORS_METRIC_NAME);

public JmsConnection(Write<T> spec) {
JmsConnection(Write<T> spec) {
this.spec = spec;
}

public void start() throws JMSException {
if (isProducerNeedsToBeCreated) {
void connect() throws JMSException {
if (this.producer == null) {
ConnectionFactory connectionFactory = spec.getConnectionFactory();
if (spec.getUsername() != null) {
this.connection =
Expand All @@ -1008,7 +1008,6 @@ public void start() throws JMSException {
}
this.connection.setExceptionListener(
exception -> {
this.isProducerNeedsToBeCreated = true;
this.connectionErrors.inc();
});
this.connection.start();
Expand All @@ -1021,12 +1020,11 @@ public void start() throws JMSException {
this.destination = session.createTopic(spec.getTopic());
}
// Create producer with null destination. Destination will be set with producer.send().
this.producer = this.session.createProducer(null);
this.isProducerNeedsToBeCreated = false;
startProducer();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may replace the direct assignment of producer with startProducer() thus make it clear that producer is opened in single code path.

Also, I see " isProducerNeedsToBeCreated" is removed in several places and connect() is only called in DoFn's setup. Can we get rid of this flag now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We added the flag in case of a failed connection it will create a new connection based on exceptionListener. Do you think it would be better to check with the producer if it's null ?

if (producer == null) {
// open connection
connection.setExceptionListener(exception -> {
    connectionCounter.inc();
    // if there is an issue with the connection, we will close session, connection & producer so it can be recreated it
    close();
});
// create new producer
}

Copy link
Contributor

@Abacn Abacn Apr 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both is fine, if the flag is used then it should be consistent. For now if closeProducer() is called, producer is closed and set to null, but isProducerNeedsToBeCreated is still false.

}

public void publishMessage(T input) throws JMSException, JmsIOException {
void publishMessage(T input) throws JMSException, JmsIOException {
Destination destinationToSendTo = destination;
try {
Message message = spec.getValueMapper().apply(input, session);
Expand All @@ -1043,24 +1041,30 @@ public void publishMessage(T input) throws JMSException, JmsIOException {
}
}

public void close() throws JMSException {
isProducerNeedsToBeCreated = true;
void startProducer() throws JMSException {
this.producer = this.session.createProducer(null);
}

void closeProducer() throws JMSException {
if (producer != null) {
producer.close();
producer = null;
}
if (session != null) {
session.close();
session = null;
}
if (connection != null) {
try {
// If the connection failed, stopping the connection will throw a JMSException
connection.stop();
} catch (JMSException exception) {
LOG.warn("The connection couldn't be closed", exception);
}

void close() {
try {
closeProducer();
if (session != null) {
session.close();
}
connection.close();
if (connection != null) {
connection.close();
}
} catch (JMSException exception) {
LOG.warn("The connection couldn't be closed", exception);
} finally {
session = null;
connection = null;
}
}
Expand All @@ -1083,8 +1087,10 @@ static class JmsIOProducerFn<T> extends DoFn<T, T> {
}

@Setup
public void setup() {
RetryConfiguration retryConfiguration = checkStateNotNull(spec.getRetryConfiguration());
public void setup() throws JMSException {
this.jmsConnection.connect();
RetryConfiguration retryConfiguration =
MoreObjects.firstNonNull(spec.getRetryConfiguration(), RetryConfiguration.create());
retryBackOff =
FluentBackoff.DEFAULT
.withInitialBackoff(checkStateNotNull(retryConfiguration.getInitialDuration()))
Expand All @@ -1094,7 +1100,7 @@ public void setup() {

@StartBundle
public void startBundle() throws JMSException {
this.jmsConnection.start();
this.jmsConnection.startProducer();
}

@ProcessElement
Expand Down Expand Up @@ -1130,11 +1136,11 @@ private void publishMessage(T input)

@FinishBundle
public void finishBundle() throws JMSException {
this.jmsConnection.close();
this.jmsConnection.closeProducer();
}

@Teardown
public void tearDown() throws JMSException {
public void tearDown() {
this.jmsConnection.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

@AutoValue
public abstract class RetryConfiguration implements Serializable {
private static final Integer DEFAULT_MAX_ATTEMPTS = 5;
private static final Duration DEFAULT_INITIAL_BACKOFF = Duration.standardSeconds(15);
private static final Duration DEFAULT_MAX_CUMULATIVE_BACKOFF = Duration.standardDays(1000);

Expand All @@ -35,6 +36,10 @@ public abstract class RetryConfiguration implements Serializable {

abstract @Nullable Duration getInitialDuration();

public static RetryConfiguration create() {
return create(DEFAULT_MAX_ATTEMPTS, null, null);
}

public static RetryConfiguration create(int maxAttempts) {
return create(maxAttempts, null, null);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.jms;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.jms.BytesMessage;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.security.AuthenticationUser;
import org.apache.activemq.security.SimpleAuthenticationPlugin;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.amqp.AmqpTransportFactory;

/**
* A common test fixture to create a broker and connection factories for {@link JmsIOIT} & {@link
* JmsIOTest}.
*/
public class CommonJms implements Serializable {
Amraneze marked this conversation as resolved.
Show resolved Hide resolved
private static final String BROKER_WITHOUT_PREFETCH_PARAM = "?jms.prefetchPolicy.all=0&";

static final String USERNAME = "test_user";
static final String PASSWORD = "test_password";
static final String QUEUE = "test_queue";
static final String TOPIC = "test_topic";

private final String brokerUrl;
private final Integer brokerPort;
private final String forceAsyncAcksParam;
private transient BrokerService broker;

protected ConnectionFactory connectionFactory;
protected final Class<? extends ConnectionFactory> connectionFactoryClass;
protected ConnectionFactory connectionFactoryWithSyncAcksAndWithoutPrefetch;

public CommonJms(
String brokerUrl,
Integer brokerPort,
String forceAsyncAcksParam,
Class<? extends ConnectionFactory> connectionFactoryClass) {
this.brokerUrl = brokerUrl;
this.brokerPort = brokerPort;
this.forceAsyncAcksParam = forceAsyncAcksParam;
this.connectionFactoryClass = connectionFactoryClass;
}

void startBroker() throws Exception {
broker = new BrokerService();
broker.setUseJmx(false);
broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
TransportFactory.registerTransportFactory("amqp", new AmqpTransportFactory());
if (connectionFactoryClass != ActiveMQConnectionFactory.class) {
broker.addConnector(String.format("%s:%d?transport.transformer=jms", brokerUrl, brokerPort));
} else {
broker.addConnector(brokerUrl);
}
broker.setBrokerName("localhost");
broker.setPopulateJMSXUserID(true);
broker.setUseAuthenticatedPrincipalForJMSXUserID(true);
broker.getManagementContext().setCreateConnector(false);

// enable authentication
List<AuthenticationUser> users = new ArrayList<>();
// username and password to use to connect to the broker.
// This user has users privilege (able to browse, consume, produce, list destinations)
users.add(new AuthenticationUser(USERNAME, PASSWORD, "users"));
SimpleAuthenticationPlugin plugin = new SimpleAuthenticationPlugin(users);
BrokerPlugin[] plugins = new BrokerPlugin[] {plugin};
broker.setPlugins(plugins);

broker.start();
broker.waitUntilStarted();

// create JMS connection factory
connectionFactory = connectionFactoryClass.getConstructor(String.class).newInstance(brokerUrl);
connectionFactoryWithSyncAcksAndWithoutPrefetch =
connectionFactoryClass
.getConstructor(String.class)
.newInstance(brokerUrl + BROKER_WITHOUT_PREFETCH_PARAM + forceAsyncAcksParam);
}

void stopBroker() throws Exception {
broker.stop();
broker.waitUntilStopped();
broker = null;
}

Class<? extends ConnectionFactory> getConnectionFactoryClass() {
return this.connectionFactoryClass;
}

ConnectionFactory getConnectionFactory() {
return this.connectionFactory;
}

ConnectionFactory getConnectionFactoryWithSyncAcksAndWithoutPrefetch() {
return this.connectionFactoryWithSyncAcksAndWithoutPrefetch;
}

/** A test class that maps a {@link javax.jms.BytesMessage} into a {@link String}. */
public static class BytesMessageToStringMessageMapper implements JmsIO.MessageMapper<String> {

@Override
public String mapMessage(Message message) throws Exception {
BytesMessage bytesMessage = (BytesMessage) message;

byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];

return new String(bytes, StandardCharsets.UTF_8);
}
}
}
Loading