From c3bd8ca4521c5693fa2b24531d952a2759c7f885 Mon Sep 17 00:00:00 2001 From: Bartosz Zablocki Date: Tue, 2 Jul 2024 12:17:56 +0200 Subject: [PATCH 1/3] Add integration tests --- .../beam/gradle/BeamModulePlugin.groovy | 1 + sdks/java/io/solace/build.gradle | 2 + .../io/solace/it/SolaceContainerManager.java | 168 ++++++++++++++++++ .../beam/sdk/io/solace/it/SolaceIOIT.java | 128 +++++++++++++ 4 files changed, 299 insertions(+) create mode 100644 sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceContainerManager.java create mode 100644 sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOIT.java diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index ba6279a13490..4b7dd4ed781e 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -900,6 +900,7 @@ class BeamModulePlugin implements Plugin { testcontainers_oracle : "org.testcontainers:oracle-xe:$testcontainers_version", testcontainers_postgresql : "org.testcontainers:postgresql:$testcontainers_version", testcontainers_rabbitmq : "org.testcontainers:rabbitmq:$testcontainers_version", + testcontainers_solace : "org.testcontainers:solace:$testcontainers_version", truth : "com.google.truth:truth:1.1.5", threetenbp : "org.threeten:threetenbp:1.6.8", vendored_grpc_1_60_1 : "org.apache.beam:beam-vendor-grpc-1_60_1:0.2", diff --git a/sdks/java/io/solace/build.gradle b/sdks/java/io/solace/build.gradle index 2d720ab7d929..e22016c79075 100644 --- a/sdks/java/io/solace/build.gradle +++ b/sdks/java/io/solace/build.gradle @@ -41,6 +41,7 @@ dependencies { implementation library.java.avro permitUnusedDeclared library.java.avro implementation library.java.google_api_common + implementation library.java.threetenbp implementation library.java.gax implementation library.java.threetenbp implementation library.java.google_http_client @@ -52,5 +53,6 @@ dependencies { testImplementation project(path: ":sdks:java:io:common") testImplementation project(path: ":sdks:java:testing:test-utils") testRuntimeOnly library.java.slf4j_jdk14 + testImplementation library.java.testcontainers_solace testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") } diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceContainerManager.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceContainerManager.java new file mode 100644 index 000000000000..e9c3fe7dfcb6 --- /dev/null +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceContainerManager.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.it; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.solace.Service; +import org.testcontainers.solace.SolaceContainer; +import org.testcontainers.utility.DockerImageName; + +public class SolaceContainerManager { + + public static final String VPN_NAME = "default"; + public static final String PASSWORD = "password"; + public static final String USERNAME = "username"; + public static final String TOPIC_NAME = "test_topic"; + private static final Logger LOG = LoggerFactory.getLogger(SolaceContainerManager.class); + private final SolaceContainer container; + + public SolaceContainerManager() { + this.container = + new SolaceContainer(DockerImageName.parse("solace/solace-pubsub-standard:10.7")) { + { + addFixedExposedPort(55555, 55555); + addFixedExposedPort(9000, 9000); + addFixedExposedPort(8080, 8080); + addFixedExposedPort(80, 80); + } + }.withVpn(VPN_NAME) + .withCredentials(USERNAME, PASSWORD) + // .withExposedPorts(Service.SMF.getPort()); + .withTopic(TOPIC_NAME, Service.SMF) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + container.addExposedPort(8080); + container.addExposedPort(55555); + } + + public void start() { + container.start(); + } + + void createQueueWithSubscriptionTopic(String queueName) { + executeCommand( + "curl", + "http://localhost:8080/SEMP/v2/config/msgVpns/" + VPN_NAME + "/topicEndpoints", + "-X", + "POST", + "-u", + "admin:admin", + "-H", + "Content-Type:application/json", + "-d", + "{\"topicEndpointName\":\"" + + TOPIC_NAME + + "\",\"accessType\":\"exclusive\",\"permission\":\"modify-topic\",\"ingressEnabled\":true,\"egressEnabled\":true}"); + executeCommand( + "curl", + "http://localhost:8080/SEMP/v2/config/msgVpns/" + VPN_NAME + "/queues", + "-X", + "POST", + "-u", + "admin:admin", + "-H", + "Content-Type:application/json", + "-d", + "{\"queueName\":\"" + + queueName + + "\",\"accessType\":\"non-exclusive\",\"maxMsgSpoolUsage\":200,\"permission\":\"consume\",\"ingressEnabled\":true,\"egressEnabled\":true}"); + executeCommand( + "curl", + "http://localhost:8080/SEMP/v2/config/msgVpns/" + + VPN_NAME + + "/queues/" + + queueName + + "/subscriptions", + "-X", + "POST", + "-u", + "admin:admin", + "-H", + "Content-Type:application/json", + "-d", + "{\"subscriptionTopic\":\"" + TOPIC_NAME + "\"}"); + } + + private void executeCommand(String... command) { + try { + org.testcontainers.containers.Container.ExecResult execResult = + container.execInContainer(command); + if (execResult.getExitCode() != 0) { + logCommandError(execResult.getStderr(), command); + } else { + LOG.info(execResult.getStdout()); + } + } catch (IOException | InterruptedException e) { + logCommandError(e.getMessage(), command); + } + } + + private void logCommandError(String error, String... command) { + LOG.error("Could not execute command {}: {}", command, error); + } + + public void stop() { + if (container != null) { + container.stop(); + } + } + + public void getQueueDetails(String queueName) { + executeCommand( + "curl", + "http://localhost:8080/SEMP/v2/monitor/msgVpns/" + + VPN_NAME + + "/queues/" + + queueName + + "/msgs", + "-X", + "GET", + "-u", + "admin:admin"); + } + + public void sendToTopic(String payload, List additionalHeaders) { + // https://docs.solace.com/API/RESTMessagingPrtl/Solace-REST-Message-Encoding.htm + + List command = + new ArrayList<>( + Arrays.asList( + "curl", + "http://localhost:9000/TOPIC/" + TOPIC_NAME, + "-X", + "POST", + "-u", + USERNAME + ":" + PASSWORD, + "--header", + "Content-Type:application/json", + "-d", + payload)); + + for (String additionalHeader : additionalHeaders) { + command.add("--header"); + command.add(additionalHeader); + } + + executeCommand(command.toArray(new String[0])); + } +} diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOIT.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOIT.java new file mode 100644 index 000000000000..7dcd1829609a --- /dev/null +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOIT.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.it; + +import static org.junit.Assert.assertEquals; + +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.io.solace.SolaceIO; +import org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionServiceFactory; +import org.apache.beam.sdk.io.solace.broker.BasicAuthSempClientFactory; +import org.apache.beam.sdk.io.solace.data.Solace.Queue; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.StreamingOptions; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.apache.beam.sdk.testutils.metrics.MetricsReader; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.joda.time.Duration; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +public class SolaceIOIT { + private static final String NAMESPACE = SolaceIOIT.class.getName(); + private static final String READ_COUNT = "read_count"; + private static SolaceContainerManager solaceContainerManager; + private static final TestPipelineOptions readPipelineOptions; + + static { + readPipelineOptions = PipelineOptionsFactory.create().as(TestPipelineOptions.class); + readPipelineOptions.setBlockOnRun(false); + readPipelineOptions.as(TestPipelineOptions.class).setBlockOnRun(false); + readPipelineOptions.as(StreamingOptions.class).setStreaming(false); + } + + @Rule public final TestPipeline readPipeline = TestPipeline.fromOptions(readPipelineOptions); + + @BeforeClass + public static void setup() { + solaceContainerManager = new SolaceContainerManager(); + solaceContainerManager.start(); + } + + @AfterClass + public static void afterClass() { + if (solaceContainerManager != null) { + solaceContainerManager.stop(); + } + } + + @Test + public void testRead() { + String queueName = "test_queue"; + solaceContainerManager.createQueueWithSubscriptionTopic(queueName); + + // todo this is very slow, needs to be replaced with the SolaceIO.write connector. + int publishMessagesCount = 20; + for (int i = 0; i < publishMessagesCount; i++) { + solaceContainerManager.sendToTopic( + "{\"field_str\":\"value\",\"field_int\":123}", + ImmutableList.of("Solace-Message-ID:m" + i)); + } + + readPipeline + .apply( + "Read from Solace", + SolaceIO.read() + .from(Queue.fromName(queueName)) + .withMaxNumConnections(1) + .withSempClientFactory( + BasicAuthSempClientFactory.builder() + .host("http://localhost:8080") + .username("admin") + .password("admin") + .vpnName(SolaceContainerManager.VPN_NAME) + .build()) + .withSessionServiceFactory( + BasicAuthJcsmpSessionServiceFactory.builder() + .host("localhost") + .username(SolaceContainerManager.USERNAME) + .password(SolaceContainerManager.PASSWORD) + .vpnName(SolaceContainerManager.VPN_NAME) + .build())) + .apply("Count", ParDo.of(new CountingFn<>(NAMESPACE, READ_COUNT))); + + PipelineResult pipelineResult = readPipeline.run(); + pipelineResult.waitUntilFinish(Duration.standardSeconds(15)); + + MetricsReader metricsReader = new MetricsReader(pipelineResult, NAMESPACE); + long actualRecordsCount = metricsReader.getCounterMetric(READ_COUNT); + assertEquals(publishMessagesCount, actualRecordsCount); + } + + private static class CountingFn extends DoFn { + + private final Counter elementCounter; + + CountingFn(String namespace, String name) { + elementCounter = Metrics.counter(namespace, name); + } + + @ProcessElement + public void processElement(@Element T record, OutputReceiver c) { + elementCounter.inc(1L); + c.output(record); + } + } +} From d2912c4a2d2777ec1f50f60b6280e5ac0b6b5a06 Mon Sep 17 00:00:00 2001 From: Bartosz Zablocki Date: Wed, 3 Jul 2024 21:09:47 +0000 Subject: [PATCH 2/3] Use random exposed ports for testcontainers --- .../io/solace/it/SolaceContainerManager.java | 40 ++++++++++++++----- .../beam/sdk/io/solace/it/SolaceIOIT.java | 7 ++-- 2 files changed, 34 insertions(+), 13 deletions(-) diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceContainerManager.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceContainerManager.java index e9c3fe7dfcb6..6d2b3a27ffd0 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceContainerManager.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceContainerManager.java @@ -18,11 +18,13 @@ package org.apache.beam.sdk.io.solace.it; import java.io.IOException; +import java.net.ServerSocket; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container.ExecResult; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.solace.Service; import org.testcontainers.solace.SolaceContainer; @@ -36,23 +38,20 @@ public class SolaceContainerManager { public static final String TOPIC_NAME = "test_topic"; private static final Logger LOG = LoggerFactory.getLogger(SolaceContainerManager.class); private final SolaceContainer container; + int jcsmpPortMapped = findAvailablePort(); + int sempPortMapped = findAvailablePort(); - public SolaceContainerManager() { + public SolaceContainerManager() throws IOException { this.container = new SolaceContainer(DockerImageName.parse("solace/solace-pubsub-standard:10.7")) { { - addFixedExposedPort(55555, 55555); - addFixedExposedPort(9000, 9000); - addFixedExposedPort(8080, 8080); - addFixedExposedPort(80, 80); + addFixedExposedPort(jcsmpPortMapped, 55555); + addFixedExposedPort(sempPortMapped, 8080); } }.withVpn(VPN_NAME) .withCredentials(USERNAME, PASSWORD) - // .withExposedPorts(Service.SMF.getPort()); .withTopic(TOPIC_NAME, Service.SMF) .withLogConsumer(new Slf4jLogConsumer(LOG)); - container.addExposedPort(8080); - container.addExposedPort(55555); } public void start() { @@ -60,6 +59,13 @@ public void start() { } void createQueueWithSubscriptionTopic(String queueName) { + executeCommand( + "curl", + "http://localhost:8080/SEMP/v2/config/msgVpns/" + VPN_NAME + "/topicEndpoints", + "-X", + "GET", + "-u", + "admin:admin"); executeCommand( "curl", "http://localhost:8080/SEMP/v2/config/msgVpns/" + VPN_NAME + "/topicEndpoints", @@ -105,8 +111,7 @@ void createQueueWithSubscriptionTopic(String queueName) { private void executeCommand(String... command) { try { - org.testcontainers.containers.Container.ExecResult execResult = - container.execInContainer(command); + ExecResult execResult = container.execInContainer(command); if (execResult.getExitCode() != 0) { logCommandError(execResult.getStderr(), command); } else { @@ -165,4 +170,19 @@ public void sendToTopic(String payload, List additionalHeaders) { executeCommand(command.toArray(new String[0])); } + + private static int findAvailablePort() throws IOException { + ServerSocket s = new ServerSocket(0); + try { + return s.getLocalPort(); + } finally { + s.close(); + try { + // Some systems don't free the port for future use immediately. + Thread.sleep(100); + } catch (InterruptedException exn) { + // ignore + } + } + } } diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOIT.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOIT.java index 7dcd1829609a..35ee7595352d 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOIT.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOIT.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertEquals; +import java.io.IOException; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.solace.SolaceIO; import org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionServiceFactory; @@ -56,7 +57,7 @@ public class SolaceIOIT { @Rule public final TestPipeline readPipeline = TestPipeline.fromOptions(readPipelineOptions); @BeforeClass - public static void setup() { + public static void setup() throws IOException { solaceContainerManager = new SolaceContainerManager(); solaceContainerManager.start(); } @@ -89,14 +90,14 @@ public void testRead() { .withMaxNumConnections(1) .withSempClientFactory( BasicAuthSempClientFactory.builder() - .host("http://localhost:8080") + .host("http://localhost:" + solaceContainerManager.sempPortMapped) .username("admin") .password("admin") .vpnName(SolaceContainerManager.VPN_NAME) .build()) .withSessionServiceFactory( BasicAuthJcsmpSessionServiceFactory.builder() - .host("localhost") + .host("localhost:" + solaceContainerManager.jcsmpPortMapped) .username(SolaceContainerManager.USERNAME) .password(SolaceContainerManager.PASSWORD) .vpnName(SolaceContainerManager.VPN_NAME) From 6f3d351975cd463d45a4a5317e78522e57b74d4f Mon Sep 17 00:00:00 2001 From: Bartosz Zablocki Date: Tue, 9 Jul 2024 14:04:30 +0000 Subject: [PATCH 3/3] Add info flag to gradle integrationTest command for extensive info --- .github/workflows/beam_PreCommit_Java_Solace_IO_Direct.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/beam_PreCommit_Java_Solace_IO_Direct.yml b/.github/workflows/beam_PreCommit_Java_Solace_IO_Direct.yml index 40a533b42a3b..5aeaaec11dec 100644 --- a/.github/workflows/beam_PreCommit_Java_Solace_IO_Direct.yml +++ b/.github/workflows/beam_PreCommit_Java_Solace_IO_Direct.yml @@ -95,7 +95,7 @@ jobs: - name: run Solace IO IT script uses: ./.github/actions/gradle-command-self-hosted-action with: - gradle-command: :sdks:java:io:solace:integrationTest + gradle-command: :sdks:java:io:solace:integrationTest --info arguments: | -PdisableSpotlessCheck=true \ -PdisableCheckStyle=true \