diff --git a/.test-infra/jenkins/job_PreCommit_Java_IOs.groovy b/.test-infra/jenkins/job_PreCommit_Java_IOs.groovy index 8de6a386aae4c..a248231102f27 100644 --- a/.test-infra/jenkins/job_PreCommit_Java_IOs.groovy +++ b/.test-infra/jenkins/job_PreCommit_Java_IOs.groovy @@ -185,6 +185,9 @@ def additionalTasks = [ snowflake: [ ':sdks:java:io:snowflake:expansion-service:build', ], + jms: [ + ':sdks:java:io:jms:integrationTest', + ], ] // In case the test suite name is different from the project folder name diff --git a/CHANGES.md b/CHANGES.md index c5a592114d180..a89c7f271267c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -63,6 +63,7 @@ * BigQuery Storage Write API is now available in Python SDK via cross-language ([#21961](https://github.com/apache/beam/issues/21961)). * Added HbaseIO support for writing RowMutations (ordered by rowkey) to Hbase (Java) ([#25830](https://github.com/apache/beam/issues/25830)). * Added fileio transforms MatchFiles, MatchAll and ReadMatches (Go) ([#25779](https://github.com/apache/beam/issues/25779)). +* Add integration test for JmsIO + fix issue with multiple connections (Java) ([#25887](https://github.com/apache/beam/issues/25887)). ## New Features / Improvements diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 2c224ee153606..f461579d5ca22 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -555,6 +555,7 @@ class BeamModulePlugin implements Plugin { def powermock_version = "2.0.9" // Try to keep protobuf_version consistent with the protobuf version in google_cloud_platform_libraries_bom def protobuf_version = "3.21.12" + def qpid_jms_client_version = "0.61.0" def quickcheck_version = "1.0" def sbe_tool_version = "1.25.1" def singlestore_jdbc_version = "1.1.4" @@ -565,6 +566,7 @@ class BeamModulePlugin implements Plugin { def testcontainers_version = "1.17.3" def arrow_version = "5.0.0" def jmh_version = "1.34" + def jupiter_version = "5.7.0" // Export Spark versions, so they are defined in a single place only project.ext.spark3_version = spark3_version @@ -746,6 +748,9 @@ class BeamModulePlugin implements Plugin { json_org : "org.json:json:20220320", // Keep in sync with everit-json-schema / google_cloud_platform_libraries_bom transitive deps. everit_json_schema : "com.github.erosb:everit-json-schema:${everit_json_version}", junit : "junit:junit:4.13.1", + jupiter_api : "org.junit.jupiter:junit-jupiter-api:$jupiter_version", + jupiter_engine : "org.junit.jupiter:junit-jupiter-engine:$jupiter_version", + jupiter_params : "org.junit.jupiter:junit-jupiter-params:$jupiter_version", kafka : "org.apache.kafka:kafka_2.11:$kafka_version", kafka_clients : "org.apache.kafka:kafka-clients:$kafka_version", log4j : "log4j:log4j:1.2.17", @@ -778,6 +783,7 @@ class BeamModulePlugin implements Plugin { proto_google_cloud_spanner_v1 : "com.google.api.grpc:proto-google-cloud-spanner-v1", // google_cloud_platform_libraries_bom sets version proto_google_cloud_spanner_admin_database_v1: "com.google.api.grpc:proto-google-cloud-spanner-admin-database-v1", // google_cloud_platform_libraries_bom sets version proto_google_common_protos : "com.google.api.grpc:proto-google-common-protos", // google_cloud_platform_libraries_bom sets version + qpid_jms_client : "org.apache.qpid:qpid-jms-client:$qpid_jms_client_version", sbe_tool : "uk.co.real-logic:sbe-tool:$sbe_tool_version", singlestore_jdbc : "com.singlestore:singlestore-jdbc-client:$singlestore_jdbc_version", slf4j_api : "org.slf4j:slf4j-api:$slf4j_version", diff --git a/sdks/java/io/jms/build.gradle b/sdks/java/io/jms/build.gradle index 63db9be4f045d..5ecc0ec19d575 100644 --- a/sdks/java/io/jms/build.gradle +++ b/sdks/java/io/jms/build.gradle @@ -20,6 +20,8 @@ plugins { id 'org.apache.beam.module' } applyJavaNature( automaticModuleName: 'org.apache.beam.sdk.io.jms', ) +provideIntegrationTestingDependencies() +enableJavaPerformanceTesting() description = "Apache Beam :: SDKs :: Java :: IO :: JMS" ext.summary = """IO to read and write to JMS (Java Messaging Service) @@ -31,6 +33,7 @@ dependencies { implementation library.java.slf4j_api implementation library.java.joda_time implementation "org.apache.geronimo.specs:geronimo-jms_1.1_spec:1.1.1" + testImplementation library.java.activemq_amqp testImplementation library.java.activemq_broker testImplementation library.java.activemq_jaas testImplementation library.java.activemq_kahadb_store @@ -38,6 +41,10 @@ dependencies { testImplementation library.java.hamcrest testImplementation library.java.junit testImplementation library.java.mockito_core + testImplementation library.java.mockito_inline + testImplementation library.java.qpid_jms_client + testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration") + testImplementation project(path: ":sdks:java:testing:test-utils", configuration: "testRuntimeMigration") testRuntimeOnly library.java.slf4j_jdk14 testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") } diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index adfa919808c75..b9249fc758399 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -68,6 +68,7 @@ import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -988,17 +989,16 @@ private static class JmsConnection implements Serializable { private transient @Initialized Destination destination; private transient @Initialized MessageProducer producer; - private boolean isProducerNeedsToBeCreated = true; private final JmsIO.Write spec; private final Counter connectionErrors = Metrics.counter(JMS_IO_PRODUCER_METRIC_NAME, CONNECTION_ERRORS_METRIC_NAME); - public JmsConnection(Write spec) { + JmsConnection(Write spec) { this.spec = spec; } - public void start() throws JMSException { - if (isProducerNeedsToBeCreated) { + void connect() throws JMSException { + if (this.producer == null) { ConnectionFactory connectionFactory = spec.getConnectionFactory(); if (spec.getUsername() != null) { this.connection = @@ -1008,7 +1008,6 @@ public void start() throws JMSException { } this.connection.setExceptionListener( exception -> { - this.isProducerNeedsToBeCreated = true; this.connectionErrors.inc(); }); this.connection.start(); @@ -1021,12 +1020,11 @@ public void start() throws JMSException { this.destination = session.createTopic(spec.getTopic()); } // Create producer with null destination. Destination will be set with producer.send(). - this.producer = this.session.createProducer(null); - this.isProducerNeedsToBeCreated = false; + startProducer(); } } - public void publishMessage(T input) throws JMSException, JmsIOException { + void publishMessage(T input) throws JMSException, JmsIOException { Destination destinationToSendTo = destination; try { Message message = spec.getValueMapper().apply(input, session); @@ -1043,24 +1041,30 @@ public void publishMessage(T input) throws JMSException, JmsIOException { } } - public void close() throws JMSException { - isProducerNeedsToBeCreated = true; + void startProducer() throws JMSException { + this.producer = this.session.createProducer(null); + } + + void closeProducer() throws JMSException { if (producer != null) { producer.close(); producer = null; } - if (session != null) { - session.close(); - session = null; - } - if (connection != null) { - try { - // If the connection failed, stopping the connection will throw a JMSException - connection.stop(); - } catch (JMSException exception) { - LOG.warn("The connection couldn't be closed", exception); + } + + void close() { + try { + closeProducer(); + if (session != null) { + session.close(); } - connection.close(); + if (connection != null) { + connection.close(); + } + } catch (JMSException exception) { + LOG.warn("The connection couldn't be closed", exception); + } finally { + session = null; connection = null; } } @@ -1083,8 +1087,10 @@ static class JmsIOProducerFn extends DoFn { } @Setup - public void setup() { - RetryConfiguration retryConfiguration = checkStateNotNull(spec.getRetryConfiguration()); + public void setup() throws JMSException { + this.jmsConnection.connect(); + RetryConfiguration retryConfiguration = + MoreObjects.firstNonNull(spec.getRetryConfiguration(), RetryConfiguration.create()); retryBackOff = FluentBackoff.DEFAULT .withInitialBackoff(checkStateNotNull(retryConfiguration.getInitialDuration())) @@ -1094,7 +1100,7 @@ public void setup() { @StartBundle public void startBundle() throws JMSException { - this.jmsConnection.start(); + this.jmsConnection.startProducer(); } @ProcessElement @@ -1130,11 +1136,11 @@ private void publishMessage(T input) @FinishBundle public void finishBundle() throws JMSException { - this.jmsConnection.close(); + this.jmsConnection.closeProducer(); } @Teardown - public void tearDown() throws JMSException { + public void tearDown() { this.jmsConnection.close(); } } diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/RetryConfiguration.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/RetryConfiguration.java index b98b6e15343e0..013ac278e56e8 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/RetryConfiguration.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/RetryConfiguration.java @@ -26,6 +26,7 @@ @AutoValue public abstract class RetryConfiguration implements Serializable { + private static final Integer DEFAULT_MAX_ATTEMPTS = 5; private static final Duration DEFAULT_INITIAL_BACKOFF = Duration.standardSeconds(15); private static final Duration DEFAULT_MAX_CUMULATIVE_BACKOFF = Duration.standardDays(1000); @@ -35,6 +36,10 @@ public abstract class RetryConfiguration implements Serializable { abstract @Nullable Duration getInitialDuration(); + public static RetryConfiguration create() { + return create(DEFAULT_MAX_ATTEMPTS, null, null); + } + public static RetryConfiguration create(int maxAttempts) { return create(maxAttempts, null, null); } diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/CommonJms.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/CommonJms.java new file mode 100644 index 0000000000000..9670e60e345cd --- /dev/null +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/CommonJms.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.jms; + +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import javax.jms.BytesMessage; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.security.AuthenticationUser; +import org.apache.activemq.security.SimpleAuthenticationPlugin; +import org.apache.activemq.store.memory.MemoryPersistenceAdapter; +import org.apache.activemq.transport.TransportFactory; +import org.apache.activemq.transport.amqp.AmqpTransportFactory; + +/** + * A common test fixture to create a broker and connection factories for {@link JmsIOIT} & {@link + * JmsIOTest}. + */ +public class CommonJms implements Serializable { + private static final String BROKER_WITHOUT_PREFETCH_PARAM = "?jms.prefetchPolicy.all=0&"; + + static final String USERNAME = "test_user"; + static final String PASSWORD = "test_password"; + static final String QUEUE = "test_queue"; + static final String TOPIC = "test_topic"; + + private final String brokerUrl; + private final Integer brokerPort; + private final String forceAsyncAcksParam; + private transient BrokerService broker; + + protected ConnectionFactory connectionFactory; + protected final Class connectionFactoryClass; + protected ConnectionFactory connectionFactoryWithSyncAcksAndWithoutPrefetch; + + public CommonJms( + String brokerUrl, + Integer brokerPort, + String forceAsyncAcksParam, + Class connectionFactoryClass) { + this.brokerUrl = brokerUrl; + this.brokerPort = brokerPort; + this.forceAsyncAcksParam = forceAsyncAcksParam; + this.connectionFactoryClass = connectionFactoryClass; + } + + void startBroker() throws Exception { + broker = new BrokerService(); + broker.setUseJmx(false); + broker.setPersistenceAdapter(new MemoryPersistenceAdapter()); + TransportFactory.registerTransportFactory("amqp", new AmqpTransportFactory()); + if (connectionFactoryClass != ActiveMQConnectionFactory.class) { + broker.addConnector(String.format("%s:%d?transport.transformer=jms", brokerUrl, brokerPort)); + } else { + broker.addConnector(brokerUrl); + } + broker.setBrokerName("localhost"); + broker.setPopulateJMSXUserID(true); + broker.setUseAuthenticatedPrincipalForJMSXUserID(true); + broker.getManagementContext().setCreateConnector(false); + + // enable authentication + List users = new ArrayList<>(); + // username and password to use to connect to the broker. + // This user has users privilege (able to browse, consume, produce, list destinations) + users.add(new AuthenticationUser(USERNAME, PASSWORD, "users")); + SimpleAuthenticationPlugin plugin = new SimpleAuthenticationPlugin(users); + BrokerPlugin[] plugins = new BrokerPlugin[] {plugin}; + broker.setPlugins(plugins); + + broker.start(); + broker.waitUntilStarted(); + + // create JMS connection factory + connectionFactory = connectionFactoryClass.getConstructor(String.class).newInstance(brokerUrl); + connectionFactoryWithSyncAcksAndWithoutPrefetch = + connectionFactoryClass + .getConstructor(String.class) + .newInstance(brokerUrl + BROKER_WITHOUT_PREFETCH_PARAM + forceAsyncAcksParam); + } + + void stopBroker() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + broker = null; + } + + Class getConnectionFactoryClass() { + return this.connectionFactoryClass; + } + + ConnectionFactory getConnectionFactory() { + return this.connectionFactory; + } + + ConnectionFactory getConnectionFactoryWithSyncAcksAndWithoutPrefetch() { + return this.connectionFactoryWithSyncAcksAndWithoutPrefetch; + } + + /** A test class that maps a {@link javax.jms.BytesMessage} into a {@link String}. */ + public static class BytesMessageToStringMessageMapper implements JmsIO.MessageMapper { + + @Override + public String mapMessage(Message message) throws Exception { + BytesMessage bytesMessage = (BytesMessage) message; + + byte[] bytes = new byte[(int) bytesMessage.getBodyLength()]; + + return new String(bytes, StandardCharsets.UTF_8); + } + } +} diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOIT.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOIT.java new file mode 100644 index 0000000000000..d7cd01f815861 --- /dev/null +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOIT.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.jms; + +import static org.apache.beam.sdk.io.jms.CommonJms.PASSWORD; +import static org.apache.beam.sdk.io.jms.CommonJms.QUEUE; +import static org.apache.beam.sdk.io.jms.CommonJms.USERNAME; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.io.Serializable; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; +import java.util.function.Function; +import javax.jms.ConnectionFactory; +import javax.jms.TextMessage; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.io.common.IOITHelper; +import org.apache.beam.sdk.io.common.IOTestPipelineOptions; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.StreamingOptions; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testutils.NamedTestResult; +import org.apache.beam.sdk.testutils.metrics.IOITMetrics; +import org.apache.beam.sdk.testutils.metrics.MetricsReader; +import org.apache.beam.sdk.testutils.metrics.TimeMonitor; +import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.joda.time.Duration; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * A performance test of {@link JmsIO} on a Jms Broker. + * + *

Usage: + * + *

+ *  ./gradlew integrationTest -p sdks/java/io/jms -DintegrationTestPipelineOptions='[ \
+ *    "--jmsBrokerHost=amqp://host", \
+ *    "--jmsBrokerPort=5672", \
+ *    "--localJmsBrokerEnabled=false", \
+ *    "--readTimeout=10" \
+ *    ]' \
+ *  --tests org.apache.beam.sdk.io.jms.JmsIOIT \
+ *  -DintegrationTestRunner=direct
+ * 
+ * + * The default values for each parameter are: - jmsBrokerHost: amqp://localhost - jmsBrokerPort: + * 5672 - localJmsBrokerEnabled: true - readTimeout: 30 + */ +@RunWith(Parameterized.class) +public class JmsIOIT implements Serializable { + + private static final String NAMESPACE = JmsIOIT.class.getName(); + private static final String READ_TIME_METRIC = "read_time"; + private static final String WRITE_TIME_METRIC = "write_time"; + private static final String READ_ELEMENT_METRIC_NAME = "jms_read_element_count"; + + /** JmsIO options. */ + public interface JmsIOITOptions extends IOTestPipelineOptions, StreamingOptions { + @Description("Host name for Jms Broker. By default, 'amqp://localhost'") + @Default.String("amqp://localhost") + String getJmsBrokerHost(); + + void setJmsBrokerHost(String host); + + @Description("Port for Jms Broker") + @Default.Integer(5672) + Integer getJmsBrokerPort(); + + void setJmsBrokerPort(Integer port); + + @Description("Enabling Jms Broker locally") + @Default.Boolean(true) + boolean isLocalJmsBrokerEnabled(); + + void setLocalJmsBrokerEnabled(boolean isEnabled); + + @Description("JMS Read Timeout in seconds") + @Default.Integer(30) + Integer getReadTimeout(); + + void setReadTimeout(Integer timeout); + } + + private static final JmsIOITOptions OPTIONS = + IOITHelper.readIOTestPipelineOptions(JmsIOITOptions.class); + + private static final InfluxDBSettings settings = + InfluxDBSettings.builder() + .withHost(OPTIONS.getInfluxHost()) + .withDatabase(OPTIONS.getInfluxDatabase()) + .withMeasurement(OPTIONS.getInfluxMeasurement()) + .get(); + + @Rule public transient TestPipeline pipelineWrite = TestPipeline.create(); + @Rule public transient TestPipeline pipelineRead = TestPipeline.create(); + + @Parameterized.Parameters(name = "with client class {3}") + public static Collection connectionFactories() { + return Arrays.asList( + new Object[] { + "vm://localhost", 5672, "jms.sendAcksAsync=false", ActiveMQConnectionFactory.class + }, + new Object[] { + "amqp://localhost", 5672, "jms.forceAsyncAcks=false", JmsConnectionFactory.class + }); + } + + private final CommonJms commonJms; + private ConnectionFactory connectionFactory; + private Class connectionFactoryClass; + + public JmsIOIT( + String brokerUrl, + Integer brokerPort, + String forceAsyncAcksParam, + Class connectionFactoryClass) { + this.commonJms = + new CommonJms( + OPTIONS.isLocalJmsBrokerEnabled() ? brokerUrl : OPTIONS.getJmsBrokerHost(), + OPTIONS.isLocalJmsBrokerEnabled() ? brokerPort : OPTIONS.getJmsBrokerPort(), + forceAsyncAcksParam, + connectionFactoryClass); + } + + @Before + public void setup() throws Exception { + if (OPTIONS.isLocalJmsBrokerEnabled()) { + this.commonJms.startBroker(); + connectionFactory = this.commonJms.getConnectionFactory(); + connectionFactoryClass = this.commonJms.getConnectionFactoryClass(); + } + } + + @After + public void tearDown() throws Exception { + if (OPTIONS.isLocalJmsBrokerEnabled()) { + this.commonJms.stopBroker(); + connectionFactory = null; + connectionFactoryClass = null; + } + } + + @Test + public void testPublishingThenReadingAll() throws IOException { + PipelineResult writeResult = publishingMessages(); + PipelineResult.State writeState = writeResult.waitUntilFinish(); + assertNotEquals(PipelineResult.State.FAILED, writeState); + + PipelineResult readResult = readMessages(); + PipelineResult.State readState = + readResult.waitUntilFinish(Duration.standardSeconds(OPTIONS.getReadTimeout())); + // A workaround to stop the pipeline for waiting for too long + cancelIfTimeouted(readResult, readState); + assertNotEquals(PipelineResult.State.FAILED, readState); + + MetricsReader metricsReader = new MetricsReader(readResult, NAMESPACE); + long actualRecords = metricsReader.getCounterMetric(READ_ELEMENT_METRIC_NAME); + + assertTrue( + String.format( + "actual number of records %d smaller than expected: %d.", + actualRecords, OPTIONS.getNumberOfRecords()), + OPTIONS.getNumberOfRecords() <= actualRecords); + collectAndPublishMetrics(writeResult, readResult); + } + + private void cancelIfTimeouted(PipelineResult readResult, PipelineResult.State readState) + throws IOException { + if (readState == null) { + readResult.cancel(); + } + } + + private PipelineResult readMessages() { + pipelineRead.getOptions().as(JmsIOITOptions.class).setStreaming(true); + pipelineRead.getOptions().as(JmsIOITOptions.class).setBlockOnRun(false); + pipelineRead + .apply( + "Read Messages", + JmsIO.readMessage() + .withQueue(QUEUE) + .withUsername(USERNAME) + .withPassword(PASSWORD) + .withCoder(SerializableCoder.of(String.class)) + .withConnectionFactory(connectionFactory) + .withMessageMapper(getJmsMessageMapper())) + .apply(ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC))) + .apply("Counting element", ParDo.of(new CountingFn(NAMESPACE, READ_ELEMENT_METRIC_NAME))); + return pipelineRead.run(); + } + + private PipelineResult publishingMessages() { + pipelineWrite + .apply("Generate Sequence Data", GenerateSequence.from(0).to(OPTIONS.getNumberOfRecords())) + .apply("Convert to String", ParDo.of(new ToString())) + .apply("Collect write time", ParDo.of(new TimeMonitor<>(NAMESPACE, WRITE_TIME_METRIC))) + .apply( + "Publish to Jms Broker", + JmsIO.write() + .withQueue(QUEUE) + .withUsername(USERNAME) + .withPassword(PASSWORD) + .withValueMapper(new TextMessageMapper()) + .withConnectionFactory(connectionFactory)); + return pipelineWrite.run(); + } + + private void collectAndPublishMetrics(PipelineResult writeResult, PipelineResult readResult) { + String uuid = UUID.randomUUID().toString(); + String timestamp = Instant.now().toString(); + + Set> readSuppliers = + getMetricsSuppliers(uuid, timestamp, READ_TIME_METRIC); + Set> writeSuppliers = + getMetricsSuppliers(uuid, timestamp, WRITE_TIME_METRIC); + + IOITMetrics readMetrics = + new IOITMetrics(readSuppliers, readResult, NAMESPACE, uuid, timestamp); + IOITMetrics writeMetrics = + new IOITMetrics(writeSuppliers, writeResult, NAMESPACE, uuid, timestamp); + + readMetrics.publishToInflux(settings); + writeMetrics.publishToInflux(settings); + } + + private Set> getMetricsSuppliers( + String uuid, String timestamp, String metric) { + Set> suppliers = new HashSet<>(); + suppliers.add(getTimeMetric(uuid, timestamp, metric)); + return suppliers; + } + + private Function getTimeMetric( + final String uuid, final String timestamp, final String metricName) { + return reader -> { + long startTime = reader.getStartTimeMetric(metricName); + long endTime = reader.getEndTimeMetric(metricName); + return NamedTestResult.create(uuid, timestamp, metricName, (endTime - startTime) / 1e3); + }; + } + + static class ToString extends DoFn { + @ProcessElement + public void processElement(@Element Long element, OutputReceiver outputReceiver) { + outputReceiver.output(String.format("Message %d", element)); + } + } + + private JmsIO.MessageMapper getJmsMessageMapper() { + return rawMessage -> + connectionFactoryClass == JmsConnectionFactory.class + ? ((TextMessage) rawMessage).getText() + : ((ActiveMQTextMessage) rawMessage).getText(); + } + + private static class CountingFn extends DoFn { + + private final Counter elementCounter; + + CountingFn(String namespace, String name) { + elementCounter = Metrics.counter(namespace, name); + } + + @ProcessElement + public void processElement() { + elementCounter.inc(1L); + } + } +} diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index b55e58bfae708..10f3ec7317cb5 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -18,6 +18,10 @@ package org.apache.beam.sdk.io.jms; import static org.apache.beam.sdk.io.UnboundedSource.UnboundedReader.BACKLOG_UNKNOWN; +import static org.apache.beam.sdk.io.jms.CommonJms.PASSWORD; +import static org.apache.beam.sdk.io.jms.CommonJms.QUEUE; +import static org.apache.beam.sdk.io.jms.CommonJms.TOPIC; +import static org.apache.beam.sdk.io.jms.CommonJms.USERNAME; import static org.apache.beam.sdk.io.jms.JmsIO.Writer.JMS_IO_PRODUCER_METRIC_NAME; import static org.apache.beam.sdk.io.jms.JmsIO.Writer.PUBLICATION_RETRIES_METRIC_NAME; import static org.hamcrest.CoreMatchers.allOf; @@ -51,6 +55,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Enumeration; import java.util.List; @@ -68,12 +73,7 @@ import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerPlugin; -import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.security.AuthenticationUser; -import org.apache.activemq.security.SimpleAuthenticationPlugin; -import org.apache.activemq.store.memory.MemoryPersistenceAdapter; import org.apache.activemq.util.Callback; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.Coder; @@ -92,70 +92,73 @@ import org.apache.beam.sdk.transforms.SerializableBiFunction; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables; +import org.apache.qpid.jms.JmsAcknowledgeCallback; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.jms.message.JmsTextMessage; import org.joda.time.Duration; import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** Tests of {@link JmsIO}. */ -@RunWith(JUnit4.class) +@RunWith(Parameterized.class) @SuppressWarnings({ "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) }) public class JmsIOTest { - private static final String BROKER_URL = "vm://localhost"; + private static final Logger LOG = LoggerFactory.getLogger(JmsIOTest.class); + private final RetryConfiguration retryConfiguration = + RetryConfiguration.create(1, Duration.standardSeconds(1), null); + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); - private static final String USERNAME = "test_user"; - private static final String PASSWORD = "test_password"; - private static final String QUEUE = "test_queue"; - private static final String TOPIC = "test_topic"; + @Parameterized.Parameters(name = "with client class {3}") + public static Collection connectionFactories() { + return Arrays.asList( + new Object[] { + "vm://localhost", 5672, "jms.sendAcksAsync=false", ActiveMQConnectionFactory.class + }, + new Object[] { + "amqp://localhost", 5672, "jms.forceAsyncAcks=false", JmsConnectionFactory.class + }); + } - private BrokerService broker; + private final CommonJms commonJms; private ConnectionFactory connectionFactory; + private Class connectionFactoryClass; private ConnectionFactory connectionFactoryWithSyncAcksAndWithoutPrefetch; - @Rule public final transient TestPipeline pipeline = TestPipeline.create(); - - private final RetryConfiguration retryConfiguration = - RetryConfiguration.create(1, Duration.standardSeconds(1), null); + public JmsIOTest( + String brokerUrl, + Integer brokerPort, + String forceAsyncAcksParam, + Class connectionFactoryClass) { + this.commonJms = + new CommonJms(brokerUrl, brokerPort, forceAsyncAcksParam, connectionFactoryClass); + } @Before - public void startBroker() throws Exception { - broker = new BrokerService(); - broker.setUseJmx(false); - broker.setPersistenceAdapter(new MemoryPersistenceAdapter()); - broker.addConnector(BROKER_URL); - broker.setBrokerName("localhost"); - broker.setPopulateJMSXUserID(true); - broker.setUseAuthenticatedPrincipalForJMSXUserID(true); - - // enable authentication - List users = new ArrayList<>(); - // username and password to use to connect to the broker. - // This user has users privilege (able to browse, consume, produce, list destinations) - users.add(new AuthenticationUser(USERNAME, PASSWORD, "users")); - SimpleAuthenticationPlugin plugin = new SimpleAuthenticationPlugin(users); - BrokerPlugin[] plugins = new BrokerPlugin[] {plugin}; - broker.setPlugins(plugins); - - broker.start(); - - // create JMS connection factory - connectionFactory = new ActiveMQConnectionFactory(BROKER_URL); + public void beforeEeach() throws Exception { + this.commonJms.startBroker(); + connectionFactory = this.commonJms.getConnectionFactory(); + connectionFactoryClass = this.commonJms.getConnectionFactoryClass(); connectionFactoryWithSyncAcksAndWithoutPrefetch = - new ActiveMQConnectionFactory( - BROKER_URL + "?jms.prefetchPolicy.all=0&jms.sendAcksAsync=false"); + this.commonJms.getConnectionFactoryWithSyncAcksAndWithoutPrefetch(); } @After - public void stopBroker() throws Exception { - broker.stop(); + public void tearDown() throws Exception { + this.commonJms.stopBroker(); + connectionFactory = null; + connectionFactoryClass = null; + connectionFactoryWithSyncAcksAndWithoutPrefetch = null; } private void runPipelineExpectingJmsConnectException(String innerMessage) { @@ -170,8 +173,11 @@ private void runPipelineExpectingJmsConnectException(String innerMessage) { @Test public void testAuthenticationRequired() { pipeline.apply(JmsIO.read().withConnectionFactory(connectionFactory).withQueue(QUEUE)); - - runPipelineExpectingJmsConnectException("User name [null] or password is invalid."); + String errorMessage = + this.connectionFactoryClass == ActiveMQConnectionFactory.class + ? "User name [null] or password is invalid." + : "Client failed to authenticate using SASL: ANONYMOUS"; + runPipelineExpectingJmsConnectException(errorMessage); } @Test @@ -183,7 +189,11 @@ public void testAuthenticationWithBadPassword() { .withUsername(USERNAME) .withPassword("BAD")); - runPipelineExpectingJmsConnectException("User name [" + USERNAME + "] or password is invalid."); + String errorMessage = + this.connectionFactoryClass == ActiveMQConnectionFactory.class + ? "User name [" + USERNAME + "] or password is invalid." + : "Client failed to authenticate using SASL: PLAIN"; + runPipelineExpectingJmsConnectException(errorMessage); } @Test @@ -248,7 +258,7 @@ public void testReadBytesMessages() throws Exception { .withPassword(PASSWORD) .withMaxNumRecords(1) .withCoder(SerializableCoder.of(String.class)) - .withMessageMapper(new BytesMessageToStringMessageMapper())); + .withMessageMapper(new CommonJms.BytesMessageToStringMessageMapper())); PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(1L); pipeline.run(); @@ -462,6 +472,39 @@ public void testCheckpointMark() throws Exception { assertEquals(0, count(QUEUE)); } + private Function getJmsMessageAck(Class connectorClass) { + final int delay = 10; + return connectorClass == JmsConnectionFactory.class + ? (JmsTextMessage message) -> { + final JmsAcknowledgeCallback originalCallback = message.getAcknowledgeCallback(); + JmsAcknowledgeCallback jmsAcknowledgeCallbackMock = + Mockito.mock(JmsAcknowledgeCallback.class); + try { + Mockito.doAnswer( + invocation -> { + Thread.sleep(delay); + originalCallback.acknowledge(); + return null; + }) + .when(jmsAcknowledgeCallbackMock) + .acknowledge(); + } catch (JMSException exception) { + LOG.error("An exception occurred while adding 10s delay", exception); + } + message.setAcknowledgeCallback(jmsAcknowledgeCallbackMock); + return message; + } + : (ActiveMQMessage message) -> { + final Callback originalCallback = message.getAcknowledgeCallback(); + message.setAcknowledgeCallback( + () -> { + Thread.sleep(delay); + originalCallback.execute(); + }); + return message; + }; + } + @Test public void testCheckpointMarkSafety() throws Exception { @@ -489,13 +532,15 @@ public void testCheckpointMarkSafety() throws Exception { session.close(); connection.close(); + Function jmsMessageAck = getJmsMessageAck(this.connectionFactoryClass); + // create a JmsIO.Read with a decorated ConnectionFactory which will introduce a delay in // sending // acknowledgements - this should help uncover threading issues around checkpoint management. JmsIO.Read spec = JmsIO.read() .withConnectionFactory( - withSlowAcks(connectionFactoryWithSyncAcksAndWithoutPrefetch, 10)) + withSlowAcks(connectionFactoryWithSyncAcksAndWithoutPrefetch, jmsMessageAck)) .withUsername(USERNAME) .withPassword(PASSWORD) .withQueue(QUEUE); @@ -839,7 +884,34 @@ public void testWriteMessagesWithErrors() throws Exception { count++; } assertEquals(3, count); - System.out.println(count); + } + + @Test + public void testWriteMessageToStaticTopicWithoutRetryPolicy() throws Exception { + Instant now = Instant.now(); + String messageText = now.toString(); + List data = Collections.singletonList(messageText); + + Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createTopic(TOPIC)); + + WriteJmsResult output = + pipeline + .apply(Create.of(data)) + .apply( + JmsIO.write() + .withConnectionFactory(connectionFactory) + .withValueMapper(new TextMessageMapper()) + .withTopic(TOPIC) + .withUsername(USERNAME) + .withPassword(PASSWORD)); + PAssert.that(output.getFailedMessages()).empty(); + pipeline.run(); + Message message = consumer.receive(1000); + assertNotNull(message); + assertNull(consumer.receiveNoWait()); } private int count(String queue) throws Exception { @@ -856,25 +928,13 @@ private int count(String queue) throws Exception { return count; } - /** A test class that maps a {@link javax.jms.BytesMessage} into a {@link String}. */ - public static class BytesMessageToStringMessageMapper implements JmsIO.MessageMapper { - - @Override - public String mapMessage(Message message) throws Exception { - BytesMessage bytesMessage = (BytesMessage) message; - - byte[] bytes = new byte[(int) bytesMessage.getBodyLength()]; - - return new String(bytes, StandardCharsets.UTF_8); - } - } - /* * A utility method which replaces a ConnectionFactory with one where calling receiveNoWait() -- i.e. pulling a * message -- will return a message with its acknowledgement callback decorated to include a sleep for a specified * duration. This gives the effect of ensuring messages take at least {@code delay} milliseconds to be processed. */ - private ConnectionFactory withSlowAcks(ConnectionFactory factory, long delay) { + private ConnectionFactory withSlowAcks( + ConnectionFactory factory, Function resultTransformer) { return proxyMethod( factory, ConnectionFactory.class, @@ -894,16 +954,7 @@ private ConnectionFactory withSlowAcks(ConnectionFactory factory, long delay) { consumer, MessageConsumer.class, "receiveNoWait", - (ActiveMQMessage message) -> { - final Callback originalCallback = - message.getAcknowledgeCallback(); - message.setAcknowledgeCallback( - () -> { - Thread.sleep(delay); - originalCallback.execute(); - }); - return message; - })))); + resultTransformer)))); } /* diff --git a/sdks/java/io/pulsar/build.gradle b/sdks/java/io/pulsar/build.gradle index 9ee14b0933ed8..73d34f71b8c32 100644 --- a/sdks/java/io/pulsar/build.gradle +++ b/sdks/java/io/pulsar/build.gradle @@ -38,8 +38,9 @@ dependencies { permitUsedUndeclared "org.apache.pulsar:pulsar-client-admin-api:$pulsar_version" implementation project(path: ":sdks:java:core", configuration: "shadow") - testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0' - testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0' + + testImplementation library.java.jupiter_api + testRuntimeOnly library.java.jupiter_engine testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") testImplementation "org.testcontainers:pulsar:1.15.3" testImplementation "org.assertj:assertj-core:2.9.1" diff --git a/sdks/java/io/splunk/build.gradle b/sdks/java/io/splunk/build.gradle index 2e3c486d5ab9b..8f719cdf06786 100644 --- a/sdks/java/io/splunk/build.gradle +++ b/sdks/java/io/splunk/build.gradle @@ -40,7 +40,7 @@ dependencies { testImplementation library.java.junit testImplementation group: 'org.mock-server', name: 'mockserver-junit-rule', version: '5.10.0' testImplementation group: 'org.mock-server', name: 'mockserver-client-java', version: '5.10.0' - testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-api', version: '5.6.2' + testImplementation library.java.jupiter_api testRuntimeOnly library.java.slf4j_jdk14 testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") }