Skip to content

Commit

Permalink
Solace Read connector: integration tests with testcontainers (#31543)
Browse files Browse the repository at this point in the history
* Add integration tests

* Use random exposed ports for testcontainers

* Add info flag to gradle integrationTest command for extensive info
  • Loading branch information
bzablocki committed Jul 9, 2024
1 parent 1db2373 commit 88aa253
Show file tree
Hide file tree
Showing 5 changed files with 321 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .github/workflows/beam_PreCommit_Java_Solace_IO_Direct.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ jobs:
- name: run Solace IO IT script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :sdks:java:io:solace:integrationTest
gradle-command: :sdks:java:io:solace:integrationTest --info
arguments: |
-PdisableSpotlessCheck=true \
-PdisableCheckStyle=true \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,7 @@ class BeamModulePlugin implements Plugin<Project> {
testcontainers_oracle : "org.testcontainers:oracle-xe:$testcontainers_version",
testcontainers_postgresql : "org.testcontainers:postgresql:$testcontainers_version",
testcontainers_rabbitmq : "org.testcontainers:rabbitmq:$testcontainers_version",
testcontainers_solace : "org.testcontainers:solace:$testcontainers_version",
truth : "com.google.truth:truth:1.1.5",
threetenbp : "org.threeten:threetenbp:1.6.8",
vendored_grpc_1_60_1 : "org.apache.beam:beam-vendor-grpc-1_60_1:0.2",
Expand Down
2 changes: 2 additions & 0 deletions sdks/java/io/solace/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ dependencies {
implementation library.java.avro
permitUnusedDeclared library.java.avro
implementation library.java.google_api_common
implementation library.java.threetenbp
implementation library.java.gax
implementation library.java.threetenbp
implementation library.java.google_http_client
Expand All @@ -52,5 +53,6 @@ dependencies {
testImplementation project(path: ":sdks:java:io:common")
testImplementation project(path: ":sdks:java:testing:test-utils")
testRuntimeOnly library.java.slf4j_jdk14
testImplementation library.java.testcontainers_solace
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.it;

import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container.ExecResult;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.solace.Service;
import org.testcontainers.solace.SolaceContainer;
import org.testcontainers.utility.DockerImageName;

public class SolaceContainerManager {

public static final String VPN_NAME = "default";
public static final String PASSWORD = "password";
public static final String USERNAME = "username";
public static final String TOPIC_NAME = "test_topic";
private static final Logger LOG = LoggerFactory.getLogger(SolaceContainerManager.class);
private final SolaceContainer container;
int jcsmpPortMapped = findAvailablePort();
int sempPortMapped = findAvailablePort();

public SolaceContainerManager() throws IOException {
this.container =
new SolaceContainer(DockerImageName.parse("solace/solace-pubsub-standard:10.7")) {
{
addFixedExposedPort(jcsmpPortMapped, 55555);
addFixedExposedPort(sempPortMapped, 8080);
}
}.withVpn(VPN_NAME)
.withCredentials(USERNAME, PASSWORD)
.withTopic(TOPIC_NAME, Service.SMF)
.withLogConsumer(new Slf4jLogConsumer(LOG));
}

public void start() {
container.start();
}

void createQueueWithSubscriptionTopic(String queueName) {
executeCommand(
"curl",
"http://localhost:8080/SEMP/v2/config/msgVpns/" + VPN_NAME + "/topicEndpoints",
"-X",
"GET",
"-u",
"admin:admin");
executeCommand(
"curl",
"http://localhost:8080/SEMP/v2/config/msgVpns/" + VPN_NAME + "/topicEndpoints",
"-X",
"POST",
"-u",
"admin:admin",
"-H",
"Content-Type:application/json",
"-d",
"{\"topicEndpointName\":\""
+ TOPIC_NAME
+ "\",\"accessType\":\"exclusive\",\"permission\":\"modify-topic\",\"ingressEnabled\":true,\"egressEnabled\":true}");
executeCommand(
"curl",
"http://localhost:8080/SEMP/v2/config/msgVpns/" + VPN_NAME + "/queues",
"-X",
"POST",
"-u",
"admin:admin",
"-H",
"Content-Type:application/json",
"-d",
"{\"queueName\":\""
+ queueName
+ "\",\"accessType\":\"non-exclusive\",\"maxMsgSpoolUsage\":200,\"permission\":\"consume\",\"ingressEnabled\":true,\"egressEnabled\":true}");
executeCommand(
"curl",
"http://localhost:8080/SEMP/v2/config/msgVpns/"
+ VPN_NAME
+ "/queues/"
+ queueName
+ "/subscriptions",
"-X",
"POST",
"-u",
"admin:admin",
"-H",
"Content-Type:application/json",
"-d",
"{\"subscriptionTopic\":\"" + TOPIC_NAME + "\"}");
}

private void executeCommand(String... command) {
try {
ExecResult execResult = container.execInContainer(command);
if (execResult.getExitCode() != 0) {
logCommandError(execResult.getStderr(), command);
} else {
LOG.info(execResult.getStdout());
}
} catch (IOException | InterruptedException e) {
logCommandError(e.getMessage(), command);
}
}

private void logCommandError(String error, String... command) {
LOG.error("Could not execute command {}: {}", command, error);
}

public void stop() {
if (container != null) {
container.stop();
}
}

public void getQueueDetails(String queueName) {
executeCommand(
"curl",
"http://localhost:8080/SEMP/v2/monitor/msgVpns/"
+ VPN_NAME
+ "/queues/"
+ queueName
+ "/msgs",
"-X",
"GET",
"-u",
"admin:admin");
}

public void sendToTopic(String payload, List<String> additionalHeaders) {
// https://docs.solace.com/API/RESTMessagingPrtl/Solace-REST-Message-Encoding.htm

List<String> command =
new ArrayList<>(
Arrays.asList(
"curl",
"http://localhost:9000/TOPIC/" + TOPIC_NAME,
"-X",
"POST",
"-u",
USERNAME + ":" + PASSWORD,
"--header",
"Content-Type:application/json",
"-d",
payload));

for (String additionalHeader : additionalHeaders) {
command.add("--header");
command.add(additionalHeader);
}

executeCommand(command.toArray(new String[0]));
}

private static int findAvailablePort() throws IOException {
ServerSocket s = new ServerSocket(0);
try {
return s.getLocalPort();
} finally {
s.close();
try {
// Some systems don't free the port for future use immediately.
Thread.sleep(100);
} catch (InterruptedException exn) {
// ignore
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.it;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.solace.SolaceIO;
import org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionServiceFactory;
import org.apache.beam.sdk.io.solace.broker.BasicAuthSempClientFactory;
import org.apache.beam.sdk.io.solace.data.Solace.Queue;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.joda.time.Duration;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;

public class SolaceIOIT {
private static final String NAMESPACE = SolaceIOIT.class.getName();
private static final String READ_COUNT = "read_count";
private static SolaceContainerManager solaceContainerManager;
private static final TestPipelineOptions readPipelineOptions;

static {
readPipelineOptions = PipelineOptionsFactory.create().as(TestPipelineOptions.class);
readPipelineOptions.setBlockOnRun(false);
readPipelineOptions.as(TestPipelineOptions.class).setBlockOnRun(false);
readPipelineOptions.as(StreamingOptions.class).setStreaming(false);
}

@Rule public final TestPipeline readPipeline = TestPipeline.fromOptions(readPipelineOptions);

@BeforeClass
public static void setup() throws IOException {
solaceContainerManager = new SolaceContainerManager();
solaceContainerManager.start();
}

@AfterClass
public static void afterClass() {
if (solaceContainerManager != null) {
solaceContainerManager.stop();
}
}

@Test
public void testRead() {
String queueName = "test_queue";
solaceContainerManager.createQueueWithSubscriptionTopic(queueName);

// todo this is very slow, needs to be replaced with the SolaceIO.write connector.
int publishMessagesCount = 20;
for (int i = 0; i < publishMessagesCount; i++) {
solaceContainerManager.sendToTopic(
"{\"field_str\":\"value\",\"field_int\":123}",
ImmutableList.of("Solace-Message-ID:m" + i));
}

readPipeline
.apply(
"Read from Solace",
SolaceIO.read()
.from(Queue.fromName(queueName))
.withMaxNumConnections(1)
.withSempClientFactory(
BasicAuthSempClientFactory.builder()
.host("http://localhost:" + solaceContainerManager.sempPortMapped)
.username("admin")
.password("admin")
.vpnName(SolaceContainerManager.VPN_NAME)
.build())
.withSessionServiceFactory(
BasicAuthJcsmpSessionServiceFactory.builder()
.host("localhost:" + solaceContainerManager.jcsmpPortMapped)
.username(SolaceContainerManager.USERNAME)
.password(SolaceContainerManager.PASSWORD)
.vpnName(SolaceContainerManager.VPN_NAME)
.build()))
.apply("Count", ParDo.of(new CountingFn<>(NAMESPACE, READ_COUNT)));

PipelineResult pipelineResult = readPipeline.run();
pipelineResult.waitUntilFinish(Duration.standardSeconds(15));

MetricsReader metricsReader = new MetricsReader(pipelineResult, NAMESPACE);
long actualRecordsCount = metricsReader.getCounterMetric(READ_COUNT);
assertEquals(publishMessagesCount, actualRecordsCount);
}

private static class CountingFn<T> extends DoFn<T, T> {

private final Counter elementCounter;

CountingFn(String namespace, String name) {
elementCounter = Metrics.counter(namespace, name);
}

@ProcessElement
public void processElement(@Element T record, OutputReceiver<T> c) {
elementCounter.inc(1L);
c.output(record);
}
}
}

0 comments on commit 88aa253

Please sign in to comment.