diff --git a/sdks/java/io/solace/build.gradle b/sdks/java/io/solace/build.gradle index c317b5666189..506145f35290 100644 --- a/sdks/java/io/solace/build.gradle +++ b/sdks/java/io/solace/build.gradle @@ -35,8 +35,16 @@ dependencies { implementation library.java.slf4j_api implementation library.java.joda_time implementation library.java.solace + implementation library.java.vendored_guava_32_1_2_jre implementation project(":sdks:java:extensions:avro") implementation library.java.vendored_grpc_1_60_1 implementation library.java.avro permitUnusedDeclared library.java.avro + implementation library.java.vendored_grpc_1_60_1 + + testImplementation library.java.junit + testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration") + testImplementation project(path: ":sdks:java:testing:test-utils", configuration: "testRuntimeMigration") + testRuntimeOnly library.java.slf4j_jdk14 + testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java index ca8cd615ac68..e6b0dd34b184 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java @@ -17,26 +17,196 @@ */ package org.apache.beam.sdk.io.solace; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; import com.solacesystems.jcsmp.BytesXMLMessage; +import com.solacesystems.jcsmp.Destination; import com.solacesystems.jcsmp.JCSMPFactory; import com.solacesystems.jcsmp.Queue; import com.solacesystems.jcsmp.Topic; +import java.io.IOException; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.solace.broker.SempClientFactory; +import org.apache.beam.sdk.io.solace.broker.SessionService; import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory; import org.apache.beam.sdk.io.solace.data.Solace; +import org.apache.beam.sdk.io.solace.data.Solace.SolaceRecordMapper; +import org.apache.beam.sdk.io.solace.read.UnboundedSolaceSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +/** + * A {@link PTransform} to read and write from/to Solace event + * broker. + * + *

Note: Internal use only; this API is beta and subject to change. + * + *

Reading from Solace

+ * + * To read from Solace, use the {@link SolaceIO#read()} or {@link SolaceIO#read(TypeDescriptor, + * SerializableFunction, SerializableFunction)}. + * + *

No-argument {@link SolaceIO#read()} top-level method

+ * + *

This method returns a PCollection of {@link Solace.Record} objects. It uses a default mapper + * ({@link SolaceRecordMapper#map(BytesXMLMessage)}) to map from the received {@link + * BytesXMLMessage} from Solace, to the {@link Solace.Record} objects. + * + *

By default, it also uses a {@link BytesXMLMessage#getSenderTimestamp()} for watermark + * estimation. This {@link SerializableFunction} can be overridden with {@link + * Read#withTimestampFn(SerializableFunction)} method. + * + *

When using this method, the Coders are inferred automatically. + * + *

Advanced {@link SolaceIO#read(TypeDescriptor, SerializableFunction, SerializableFunction)} + * top-level method

+ * + *

With this method, the user can: + * + *

+ * + *

Reading from a queue ({@link Read#from(Solace.Queue)}} or a topic ({@link + * Read#from(Solace.Topic)})

+ * + *

Regardless of the top-level read method choice, the user can specify whether to read from a + * Queue - {@link Read#from(Solace.Queue)}, or a Topic {@link Read#from(Solace.Topic)}. + * + *

Note: when a user specifies to read from a Topic, the connector will create a matching Queue + * and a Subscription. The user must ensure that the SEMP API is reachable from the driver program + * and must provide credentials that have `write` permission to the SEMP Config API. The created Queue + * will be non-exclusive. The Queue will not be deleted when the pipeline is terminated. + * + *

Note: If the user specifies to read from a Queue, the driver program + * will execute a call to the SEMP API to check if the Queue is `exclusive` or `non-exclusive`. The + * user must ensure that the SEMP API is reachable from the driver program and provide credentials + * with `read` permission to the {@link Read#withSempClientFactory(SempClientFactory)}. + * + *

Usage example

+ * + *

The no-arg {@link SolaceIO#read()} method

+ * + *

The minimal example - reading from an existing Queue, using the no-arg {@link SolaceIO#read()} + * method, with all the default configuration options. + * + *

{@code
+ * PCollection events =
+ *   pipeline.apply(
+ *     SolaceIO.read()
+ *         .from(Queue.fromName("your-queue-name"))
+ *         .withSempClientFactory(
+ *                 BasicAuthSempClientFactory.builder()
+ *                         .host("your-host-name-with-protocol") // e.g. "http://12.34.56.78:8080"
+ *                         .username("semp-username")
+ *                         .password("semp-password")
+ *                         .vpnName("vpn-name")
+ *                         .build())
+ *         .withSessionServiceFactory(
+ *                 BasicAuthJcsmpSessionServiceFactory.builder()
+ *                         .host("your-host-name")
+ *                               // e.g. "12.34.56.78", or "[fe80::1]", or "12.34.56.78:4444"
+ *                         .username("username")
+ *                         .password("password")
+ *                         .vpnName("vpn-name")
+ *                         .build()));
+ * }
+ * + *

The advanced {@link SolaceIO#read(TypeDescriptor, SerializableFunction, + * SerializableFunction)} method

+ * + *

When using this method you can specify a custom output PCollection type and a custom timestamp + * function. + * + *

{@code
+ * @DefaultSchema(JavaBeanSchema.class)
+ * public static class SimpleRecord {
+ *    public String payload;
+ *    public String messageId;
+ *    public Instant timestamp;
+ *
+ *    public SimpleRecord() {}
+ *
+ *    public SimpleRecord(String payload, String messageId, Instant timestamp) {
+ *        this.payload = payload;
+ *        this.messageId = messageId;
+ *        this.timestamp = timestamp;
+ *    }
+ * }
+ *
+ * private static SimpleRecord toSimpleRecord(BytesXMLMessage record) {
+ *    if (record == null) {
+ *        return null;
+ *    }
+ *    return new SimpleRecord(
+ *            new String(record.getBytes(), StandardCharsets.UTF_8),
+ *            record.getApplicationMessageId(),
+ *            record.getSenderTimestamp() != null
+ *                    ? Instant.ofEpochMilli(record.getSenderTimestamp())
+ *                    : Instant.now());
+ * }
+ *
+ * PCollection events =
+ *  pipeline.apply(
+ *      SolaceIO.read(
+ *                      TypeDescriptor.of(SimpleRecord.class),
+ *                      record -> toSimpleRecord(record),
+ *                      record -> record.timestamp)
+ *              .from(Topic.fromName("your-topic-name"))
+ *              .withSempClientFactory(...)
+ *              .withSessionServiceFactory(...);
+ *
+ *
+ * }
+ * + *

Authentication

+ * + *

When reading from Solace, the user must use {@link + * Read#withSessionServiceFactory(SessionServiceFactory)} to create a JCSMP session and {@link + * Read#withSempClientFactory(SempClientFactory)} to authenticate to the SEMP API. + * + *

See {@link Read#withSessionServiceFactory(SessionServiceFactory)} for session authentication. + * The connector provides implementation of the {@link SessionServiceFactory} using the Basic + * Authentication: {@link org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionService}. + * + *

For the authentication to the SEMP API ({@link Read#withSempClientFactory(SempClientFactory)}) + * the connector provides {@link org.apache.beam.sdk.io.solace.broker.BasicAuthSempClientFactory} to + * authenticate using the Basic Authentication. + */ +@Internal public class SolaceIO { + public static final SerializableFunction SENDER_TIMESTAMP_FUNCTION = + (record) -> { + Long senderTimestamp = record != null ? record.getSenderTimestamp() : null; + if (senderTimestamp != null) { + return Instant.ofEpochMilli(senderTimestamp); + } else { + return Instant.now(); + } + }; private static final boolean DEFAULT_DEDUPLICATE_RECORDS = false; /** Get a {@link Topic} object from the topic name. */ @@ -49,17 +219,84 @@ static Queue queueFromName(String queueName) { return JCSMPFactory.onlyInstance().createQueue(queueName); } - @AutoValue - public abstract static class Read extends PTransform> { + /** + * Convert to a JCSMP destination from a schema-enabled {@link + * org.apache.beam.sdk.io.solace.data.Solace.Destination}. + * + *

This method returns a {@link Destination}, which may be either a {@link Topic} or a {@link + * Queue} + */ + public static Destination convertToJcsmpDestination(Solace.Destination destination) { + if (destination.getType().equals(Solace.DestinationType.TOPIC)) { + return topicFromName(checkNotNull(destination.getName())); + } else if (destination.getType().equals(Solace.DestinationType.QUEUE)) { + return queueFromName(checkNotNull(destination.getName())); + } else { + throw new IllegalArgumentException( + "SolaceIO.Write: Unknown destination type: " + destination.getType()); + } + } + + /** + * Create a {@link Read} transform, to read from Solace. The ingested records will be mapped to + * the {@link Solace.Record} objects. + */ + public static Read read() { + return new Read( + Read.Configuration.builder() + .setTypeDescriptor(TypeDescriptor.of(Solace.Record.class)) + .setParseFn(SolaceRecordMapper::map) + .setTimestampFn(SENDER_TIMESTAMP_FUNCTION) + .setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS)); + } + /** + * Create a {@link Read} transform, to read from Solace. Specify a {@link SerializableFunction} to + * map incoming {@link BytesXMLMessage} records, to the object of your choice. You also need to + * specify a {@link TypeDescriptor} for your class and the timestamp function which returns an + * {@link Instant} from the record. + * + *

The type descriptor will be used to infer a coder from CoderRegistry or Schema Registry. You + * can initialize a new TypeDescriptor in the following manner: + * + *

{@code
+   * TypeDescriptor typeDescriptor = TypeDescriptor.of(YourOutputType.class);
+   * }
+ */ + public static Read read( + TypeDescriptor typeDescriptor, + SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> parseFn, + SerializableFunction timestampFn) { + checkState(typeDescriptor != null, "SolaceIO.Read: typeDescriptor must not be null"); + checkState(parseFn != null, "SolaceIO.Read: parseFn must not be null"); + checkState(timestampFn != null, "SolaceIO.Read: timestampFn must not be null"); + return new Read( + Read.Configuration.builder() + .setTypeDescriptor(typeDescriptor) + .setParseFn(parseFn) + .setTimestampFn(timestampFn) + .setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS)); + } + + public static class Read extends PTransform> { + + private static final Logger LOG = LoggerFactory.getLogger(Read.class); + + @VisibleForTesting final Configuration.Builder configurationBuilder; + + public Read(Configuration.Builder configurationBuilder) { + this.configurationBuilder = configurationBuilder; + } /** Set the queue name to read from. Use this or the `from(Topic)` method. */ public Read from(Solace.Queue queue) { - return toBuilder().setQueue(queueFromName(queue.getName())).build(); + configurationBuilder.setQueue(queueFromName(queue.getName())); + return this; } /** Set the topic name to read from. Use this or the `from(Queue)` method. */ public Read from(Solace.Topic topic) { - return toBuilder().setTopic(topicFromName(topic.getName())).build(); + configurationBuilder.setTopic(topicFromName(topic.getName())); + return this; } /** @@ -74,9 +311,10 @@ public Read from(Solace.Topic topic) { public Read withTimestampFn(SerializableFunction timestampFn) { checkState( timestampFn != null, - "SolaceIO.Read: timestamp function must be set or use the" - + " `Read.readSolaceRecords()` method"); - return toBuilder().setTimestampFn(timestampFn).build(); + "SolaceIO.Read: timestampFn must not be null. This function must be set or " + + "use the no-argument `Read.read()` method"); + configurationBuilder.setTimestampFn(timestampFn); + return this; } /** @@ -87,7 +325,8 @@ public Read withTimestampFn(SerializableFunction timestampFn) { * `desiredNumberOfSplits` is set by the runner. */ public Read withMaxNumConnections(Integer maxNumConnections) { - return toBuilder().setMaxNumConnections(maxNumConnections).build(); + configurationBuilder.setMaxNumConnections(maxNumConnections); + return this; } /** @@ -97,7 +336,8 @@ public Read withMaxNumConnections(Integer maxNumConnections) { * which is always set by Solace. */ public Read withDeduplicateRecords(boolean deduplicateRecords) { - return toBuilder().setDeduplicateRecords(deduplicateRecords).build(); + configurationBuilder.setDeduplicateRecords(deduplicateRecords); + return this; } /** @@ -134,7 +374,8 @@ public Read withDeduplicateRecords(boolean deduplicateRecords) { */ public Read withSempClientFactory(SempClientFactory sempClientFactory) { checkState(sempClientFactory != null, "SolaceIO.Read: sempClientFactory must not be null."); - return toBuilder().setSempClientFactory(sempClientFactory).build(); + configurationBuilder.setSempClientFactory(sempClientFactory); + return this; } /** @@ -175,63 +416,157 @@ public Read withSempClientFactory(SempClientFactory sempClientFactory) { public Read withSessionServiceFactory(SessionServiceFactory sessionServiceFactory) { checkState( sessionServiceFactory != null, "SolaceIO.Read: sessionServiceFactory must not be null."); - return toBuilder().setSessionServiceFactory(sessionServiceFactory).build(); + configurationBuilder.setSessionServiceFactory(sessionServiceFactory); + return this; } - abstract @Nullable Queue getQueue(); + @AutoValue + abstract static class Configuration { - abstract @Nullable Topic getTopic(); + abstract @Nullable Queue getQueue(); - abstract @Nullable SerializableFunction getTimestampFn(); + abstract @Nullable Topic getTopic(); - abstract @Nullable Integer getMaxNumConnections(); + abstract SerializableFunction getTimestampFn(); - abstract boolean getDeduplicateRecords(); + abstract @Nullable Integer getMaxNumConnections(); - abstract SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> getParseFn(); + abstract boolean getDeduplicateRecords(); - abstract @Nullable SempClientFactory getSempClientFactory(); + abstract SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> getParseFn(); - abstract @Nullable SessionServiceFactory getSessionServiceFactory(); + abstract SempClientFactory getSempClientFactory(); - abstract TypeDescriptor getTypeDescriptor(); + abstract SessionServiceFactory getSessionServiceFactory(); - public static Builder builder() { - Builder builder = new org.apache.beam.sdk.io.solace.AutoValue_SolaceIO_Read.Builder(); - builder.setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS); - return builder; - } + abstract TypeDescriptor getTypeDescriptor(); - abstract Builder toBuilder(); + public static Builder builder() { + Builder builder = + new org.apache.beam.sdk.io.solace.AutoValue_SolaceIO_Read_Configuration.Builder(); + return builder; + } - @AutoValue.Builder - public abstract static class Builder { + @AutoValue.Builder + public abstract static class Builder { - abstract Builder setQueue(Queue queue); + abstract Builder setQueue(Queue queue); - abstract Builder setTopic(Topic topic); + abstract Builder setTopic(Topic topic); - abstract Builder setTimestampFn(SerializableFunction timestampFn); + abstract Builder setTimestampFn(SerializableFunction timestampFn); - abstract Builder setMaxNumConnections(Integer maxNumConnections); + abstract Builder setMaxNumConnections(Integer maxNumConnections); - abstract Builder setDeduplicateRecords(boolean deduplicateRecords); + abstract Builder setDeduplicateRecords(boolean deduplicateRecords); - abstract Builder setParseFn( - SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> parseFn); + abstract Builder setParseFn( + SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> parseFn); - abstract Builder setSempClientFactory(SempClientFactory brokerServiceFactory); + abstract Builder setSempClientFactory(SempClientFactory brokerServiceFactory); - abstract Builder setSessionServiceFactory(SessionServiceFactory sessionServiceFactory); + abstract Builder setSessionServiceFactory(SessionServiceFactory sessionServiceFactory); - abstract Builder setTypeDescriptor(TypeDescriptor typeDescriptor); + abstract Builder setTypeDescriptor(TypeDescriptor typeDescriptor); - abstract Read build(); + abstract Configuration build(); + } + } + + @Override + public void validate(@Nullable PipelineOptions options) { + Configuration configuration = configurationBuilder.build(); + checkState( + (configuration.getQueue() == null ^ configuration.getTopic() == null), + "SolaceIO.Read: One of the Solace {Queue, Topic} must be set."); } @Override public PCollection expand(PBegin input) { - throw new UnsupportedOperationException(""); + Configuration configuration = configurationBuilder.build(); + SempClientFactory sempClientFactory = configuration.getSempClientFactory(); + String jobName = input.getPipeline().getOptions().getJobName(); + Queue initializedQueue = + initializeQueueForTopicIfNeeded( + configuration.getQueue(), configuration.getTopic(), jobName, sempClientFactory); + + SessionServiceFactory sessionServiceFactory = configuration.getSessionServiceFactory(); + sessionServiceFactory.setQueue(initializedQueue); + + Coder coder = inferCoder(input.getPipeline(), configuration.getTypeDescriptor()); + + return input.apply( + org.apache.beam.sdk.io.Read.from( + new UnboundedSolaceSource<>( + initializedQueue, + sempClientFactory, + sessionServiceFactory, + configuration.getMaxNumConnections(), + configuration.getDeduplicateRecords(), + coder, + configuration.getTimestampFn(), + configuration.getParseFn()))); + } + + @VisibleForTesting + Coder inferCoder(Pipeline pipeline, TypeDescriptor typeDescriptor) { + Coder coderFromCoderRegistry = getFromCoderRegistry(pipeline, typeDescriptor); + if (coderFromCoderRegistry != null) { + return coderFromCoderRegistry; + } + + Coder coderFromSchemaRegistry = getFromSchemaRegistry(pipeline, typeDescriptor); + if (coderFromSchemaRegistry != null) { + return coderFromSchemaRegistry; + } + + throw new RuntimeException( + "SolaceIO.Read: Cannot infer a coder for the TypeDescriptor. Annotate your" + + " output class with @DefaultSchema annotation or create a coder manually" + + " and register it in the CoderRegistry."); + } + + private @Nullable Coder getFromSchemaRegistry( + Pipeline pipeline, TypeDescriptor typeDescriptor) { + try { + return pipeline.getSchemaRegistry().getSchemaCoder(typeDescriptor); + } catch (NoSuchSchemaException e) { + return null; + } + } + + private @Nullable Coder getFromCoderRegistry( + Pipeline pipeline, TypeDescriptor typeDescriptor) { + try { + return pipeline.getCoderRegistry().getCoder(typeDescriptor); + } catch (CannotProvideCoderException e) { + return null; + } + } + + private Queue initializeQueueForTopicIfNeeded( + @Nullable Queue queue, + @Nullable Topic topic, + String jobName, + SempClientFactory sempClientFactory) { + Queue initializedQueue; + if (queue != null) { + return queue; + } else { + String queueName = String.format("queue-%s-%s", topic, jobName); + try { + String topicName = checkNotNull(topic).getName(); + initializedQueue = sempClientFactory.create().createQueueForTopic(queueName, topicName); + LOG.warn( + "SolaceIO.Read: A new queue {} was created. The Queue will not be" + + " deleted when this job finishes. Make sure to remove it yourself" + + " when not needed.", + initializedQueue.getName()); + return initializedQueue; + } catch (IOException e) { + throw new RuntimeException(e); + } + } } } } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java index 9b4ef99eba77..7d1dee7a1187 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java @@ -17,7 +17,9 @@ */ package org.apache.beam.sdk.io.solace.broker; +import com.solacesystems.jcsmp.Queue; import java.io.Serializable; +import org.checkerframework.checker.nullness.qual.Nullable; /** * This abstract class serves as a blueprint for creating `SessionService` objects. It introduces a @@ -25,9 +27,26 @@ */ public abstract class SessionServiceFactory implements Serializable { + /** + * A reference to a Queue object. This is set when the pipline is constructed (in the {@link + * org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)} method). + * This could be used to associate the created SessionService with a specific queue for message + * handling. + */ + @Nullable Queue queue; + /** * This is the core method that subclasses must implement. It defines how to construct and return * a SessionService object. */ public abstract SessionService create(); + + /** + * This method is called in the {@link + * org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)} method + * to set the Queue reference. + */ + public void setQueue(Queue queue) { + this.queue = queue; + } } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java index 79057445a4e4..97d688bff23f 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java @@ -21,6 +21,8 @@ import com.solacesystems.jcsmp.BytesXMLMessage; import java.io.ByteArrayOutputStream; import java.io.IOException; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; @@ -74,6 +76,7 @@ public enum DestinationType { /** Represents a Solace message destination (either a Topic or a Queue). */ @AutoValue + @DefaultSchema(AutoValueSchema.class) public abstract static class Destination { /** * Gets the name of the destination. @@ -105,6 +108,7 @@ abstract static class Builder { /** Represents a Solace message record with its associated metadata. */ @AutoValue + @DefaultSchema(AutoValueSchema.class) public abstract static class Record { /** * Gets the unique identifier of the message, a string for an application-specific message diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java new file mode 100644 index 000000000000..285c1cb8a7e8 --- /dev/null +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace; + +import org.apache.beam.sdk.io.solace.broker.MessageReceiver; +import org.apache.beam.sdk.io.solace.broker.SessionService; + +public class MockEmptySessionService implements SessionService { + + String exceptionMessage = "This is an empty client, use a MockSessionService instead."; + + @Override + public void close() { + throw new UnsupportedOperationException(exceptionMessage); + } + + @Override + public boolean isClosed() { + throw new UnsupportedOperationException(exceptionMessage); + } + + @Override + public MessageReceiver createReceiver() { + throw new UnsupportedOperationException(exceptionMessage); + } + + @Override + public void connect() { + throw new UnsupportedOperationException(exceptionMessage); + } +} diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSempClient.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSempClient.java new file mode 100644 index 000000000000..d4703237371a --- /dev/null +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSempClient.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace; + +import com.solacesystems.jcsmp.JCSMPFactory; +import com.solacesystems.jcsmp.Queue; +import java.io.IOException; +import org.apache.beam.sdk.io.solace.broker.SempClient; +import org.apache.beam.sdk.transforms.SerializableFunction; + +public class MockSempClient implements SempClient { + + private final SerializableFunction isQueueNonExclusiveFn; + private final SerializableFunction getBacklogBytesFn; + private final SerializableFunction createQueueForTopicFn; + + private MockSempClient( + SerializableFunction isQueueNonExclusiveFn, + SerializableFunction getBacklogBytesFn, + SerializableFunction createQueueForTopicFn) { + this.isQueueNonExclusiveFn = isQueueNonExclusiveFn; + this.getBacklogBytesFn = getBacklogBytesFn; + this.createQueueForTopicFn = createQueueForTopicFn; + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private SerializableFunction isQueueNonExclusiveFn = (queueName) -> true; + private SerializableFunction getBacklogBytesFn = (queueName) -> 0L; + private SerializableFunction createQueueForTopicFn = (queueName) -> 0; + + public Builder setIsQueueNonExclusiveFn( + SerializableFunction isQueueNonExclusiveFn) { + this.isQueueNonExclusiveFn = isQueueNonExclusiveFn; + return this; + } + + public Builder setGetBacklogBytesFn(SerializableFunction getBacklogBytesFn) { + this.getBacklogBytesFn = getBacklogBytesFn; + return this; + } + + public Builder setCreateQueueForTopicFn( + SerializableFunction createQueueForTopicFn) { + this.createQueueForTopicFn = createQueueForTopicFn; + return this; + } + + public MockSempClient build() { + return new MockSempClient(isQueueNonExclusiveFn, getBacklogBytesFn, createQueueForTopicFn); + } + } + + @Override + public boolean isQueueNonExclusive(String queueName) throws IOException { + return isQueueNonExclusiveFn.apply(queueName); + } + + @Override + public Queue createQueueForTopic(String queueName, String topicName) throws IOException { + createQueueForTopicFn.apply(queueName); + return JCSMPFactory.onlyInstance().createQueue(queueName); + } + + @Override + public long getBacklogBytes(String queueName) throws IOException { + return getBacklogBytesFn.apply(queueName); + } +} diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSempClientFactory.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSempClientFactory.java new file mode 100644 index 000000000000..3e64c3d9bfe1 --- /dev/null +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSempClientFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace; + +import org.apache.beam.sdk.io.solace.broker.SempClient; +import org.apache.beam.sdk.io.solace.broker.SempClientFactory; + +public class MockSempClientFactory implements SempClientFactory { + SempClient sempClient; + + public MockSempClientFactory(SempClient sempClient) { + this.sempClient = sempClient; + } + + public static SempClientFactory getDefaultMock() { + return new MockSempClientFactory(MockSempClient.builder().build()); + } + + @Override + public SempClient create() { + return sempClient; + } +} diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java new file mode 100644 index 000000000000..7b14da138c64 --- /dev/null +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace; + +import com.solacesystems.jcsmp.BytesXMLMessage; +import java.io.IOException; +import java.io.Serializable; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.beam.sdk.io.solace.broker.MessageReceiver; +import org.apache.beam.sdk.io.solace.broker.SessionService; +import org.apache.beam.sdk.transforms.SerializableFunction; + +public class MockSessionService implements SessionService { + + private final SerializableFunction getRecordFn; + private MessageReceiver messageReceiver = null; + private final int minMessagesReceived; + + public MockSessionService( + SerializableFunction getRecordFn, int minMessagesReceived) { + this.getRecordFn = getRecordFn; + this.minMessagesReceived = minMessagesReceived; + } + + @Override + public void close() {} + + @Override + public boolean isClosed() { + return false; + } + + @Override + public MessageReceiver createReceiver() { + if (messageReceiver == null) { + messageReceiver = new MockReceiver(getRecordFn, minMessagesReceived); + } + return messageReceiver; + } + + @Override + public void connect() {} + + public static class MockReceiver implements MessageReceiver, Serializable { + private final AtomicInteger counter = new AtomicInteger(); + private final SerializableFunction getRecordFn; + private final int minMessagesReceived; + + public MockReceiver( + SerializableFunction getRecordFn, int minMessagesReceived) { + this.getRecordFn = getRecordFn; + this.minMessagesReceived = minMessagesReceived; + } + + @Override + public void start() {} + + @Override + public boolean isClosed() { + return false; + } + + @Override + public BytesXMLMessage receive() throws IOException { + return getRecordFn.apply(counter.getAndIncrement()); + } + + @Override + public boolean isEOF() { + return counter.get() >= minMessagesReceived; + } + } +} diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionServiceFactory.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionServiceFactory.java new file mode 100644 index 000000000000..603a30ad2c90 --- /dev/null +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionServiceFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace; + +import org.apache.beam.sdk.io.solace.broker.SessionService; +import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory; + +public class MockSessionServiceFactory extends SessionServiceFactory { + SessionService sessionService; + + public MockSessionServiceFactory(SessionService clientService) { + this.sessionService = clientService; + } + + public static SessionServiceFactory getDefaultMock() { + return new MockSessionServiceFactory(new MockEmptySessionService()); + } + + @Override + public SessionService create() { + return sessionService; + } +} diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOTest.java new file mode 100644 index 000000000000..bd9d5d401b54 --- /dev/null +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOTest.java @@ -0,0 +1,597 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import com.solacesystems.jcsmp.BytesXMLMessage; +import com.solacesystems.jcsmp.Destination; +import com.solacesystems.jcsmp.Queue; +import com.solacesystems.jcsmp.Topic; +import com.solacesystems.jcsmp.impl.ReplicationGroupMessageIdImpl; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; +import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; +import org.apache.beam.sdk.io.solace.SolaceIO.Read; +import org.apache.beam.sdk.io.solace.SolaceIO.Read.Configuration; +import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory; +import org.apache.beam.sdk.io.solace.data.Solace; +import org.apache.beam.sdk.io.solace.data.Solace.Record; +import org.apache.beam.sdk.io.solace.data.SolaceDataUtils; +import org.apache.beam.sdk.io.solace.data.SolaceDataUtils.SimpleRecord; +import org.apache.beam.sdk.io.solace.read.SolaceCheckpointMark; +import org.apache.beam.sdk.io.solace.read.UnboundedSolaceSource; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class SolaceIOTest { + + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + + private Read getDefaultRead() { + return SolaceIO.read() + .from(Solace.Queue.fromName("queue")) + .withSempClientFactory(MockSempClientFactory.getDefaultMock()) + .withSessionServiceFactory(MockSessionServiceFactory.getDefaultMock()) + .withMaxNumConnections(1); + } + + private Read getDefaultReadForTopic() { + return SolaceIO.read() + .from(Solace.Topic.fromName("topic")) + .withSempClientFactory(MockSempClientFactory.getDefaultMock()) + .withSessionServiceFactory(MockSessionServiceFactory.getDefaultMock()) + .withMaxNumConnections(1); + } + + private static BytesXMLMessage getOrNull(Integer index, List messages) { + return index != null && index < messages.size() ? messages.get(index) : null; + } + + private static UnboundedSolaceSource getSource(Read spec, TestPipeline pipeline) { + Configuration configuration = spec.configurationBuilder.build(); + return new UnboundedSolaceSource<>( + configuration.getQueue(), + configuration.getSempClientFactory(), + configuration.getSessionServiceFactory(), + configuration.getMaxNumConnections(), + configuration.getDeduplicateRecords(), + spec.inferCoder(pipeline, configuration.getTypeDescriptor()), + configuration.getTimestampFn(), + configuration.getParseFn()); + } + + @Test + public void testReadMessages() { + // Broker that creates input data + MockSessionService mockClientService = + new MockSessionService( + index -> { + List messages = + ImmutableList.of( + SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"), + SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"), + SolaceDataUtils.getBytesXmlMessage("payload_test2", "452")); + return getOrNull(index, messages); + }, + 3); + + SessionServiceFactory fakeSessionServiceFactory = + new MockSessionServiceFactory(mockClientService); + + // Expected data + List expected = new ArrayList<>(); + expected.add(SolaceDataUtils.getSolaceRecord("payload_test0", "450")); + expected.add(SolaceDataUtils.getSolaceRecord("payload_test1", "451")); + expected.add(SolaceDataUtils.getSolaceRecord("payload_test2", "452")); + + // Run the pipeline + PCollection events = + pipeline.apply( + "Read from Solace", + getDefaultRead().withSessionServiceFactory(fakeSessionServiceFactory)); + + // Assert results + PAssert.that(events).containsInAnyOrder(expected); + pipeline.run(); + } + + @Test + public void testReadMessagesWithDeduplication() { + // Broker that creates input data + MockSessionService mockClientService = + new MockSessionService( + index -> { + List messages = + ImmutableList.of( + SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"), + SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"), + SolaceDataUtils.getBytesXmlMessage("payload_test2", "451")); + return getOrNull(index, messages); + }, + 3); + + SessionServiceFactory fakeSessionServiceFactory = + new MockSessionServiceFactory(mockClientService); + + // Expected data + List expected = new ArrayList<>(); + expected.add(SolaceDataUtils.getSolaceRecord("payload_test0", "450")); + expected.add(SolaceDataUtils.getSolaceRecord("payload_test1", "451")); + + // Run the pipeline + PCollection events = + pipeline.apply( + "Read from Solace", + getDefaultRead() + .withSessionServiceFactory(fakeSessionServiceFactory) + .withDeduplicateRecords(true)); + // Assert results + PAssert.that(events).containsInAnyOrder(expected); + pipeline.run(); + } + + @Test + public void testReadMessagesWithoutDeduplication() { + // Broker that creates input data + MockSessionService mockClientService = + new MockSessionService( + index -> { + List messages = + ImmutableList.of( + SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"), + SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"), + SolaceDataUtils.getBytesXmlMessage("payload_test2", "451")); + return getOrNull(index, messages); + }, + 3); + SessionServiceFactory fakeSessionServiceFactory = + new MockSessionServiceFactory(mockClientService); + + // Expected data + List expected = new ArrayList<>(); + expected.add(SolaceDataUtils.getSolaceRecord("payload_test0", "450")); + expected.add(SolaceDataUtils.getSolaceRecord("payload_test1", "451")); + expected.add(SolaceDataUtils.getSolaceRecord("payload_test2", "451")); + + // Run the pipeline + + PCollection events = + pipeline.apply( + "Read from Solace", + getDefaultRead().withSessionServiceFactory(fakeSessionServiceFactory)); + // Assert results + PAssert.that(events).containsInAnyOrder(expected); + pipeline.run(); + } + + @Test + public void testReadMessagesWithDeduplicationOnReplicationGroupMessageId() { + // Broker that creates input data + MockSessionService mockClientService = + new MockSessionService( + index -> { + List messages = + ImmutableList.of( + SolaceDataUtils.getBytesXmlMessage( + "payload_test0", null, null, new ReplicationGroupMessageIdImpl(2L, 1L)), + SolaceDataUtils.getBytesXmlMessage( + "payload_test1", null, null, new ReplicationGroupMessageIdImpl(2L, 2L)), + SolaceDataUtils.getBytesXmlMessage( + "payload_test2", null, null, new ReplicationGroupMessageIdImpl(2L, 2L))); + return getOrNull(index, messages); + }, + 3); + + SessionServiceFactory fakeSessionServiceFactory = + new MockSessionServiceFactory(mockClientService); + + // Expected data + List expected = new ArrayList<>(); + expected.add( + SolaceDataUtils.getSolaceRecord( + "payload_test0", null, new ReplicationGroupMessageIdImpl(2L, 1L))); + expected.add( + SolaceDataUtils.getSolaceRecord( + "payload_test1", null, new ReplicationGroupMessageIdImpl(2L, 2L))); + + // Run the pipeline + PCollection events = + pipeline.apply( + "Read from Solace", + getDefaultRead() + .withSessionServiceFactory(fakeSessionServiceFactory) + .withDeduplicateRecords(true)); + // Assert results + PAssert.that(events).containsInAnyOrder(expected); + pipeline.run(); + } + + @Test + public void testReadWithCoderAndParseFnAndTimestampFn() { + // Broker that creates input data + MockSessionService mockClientService = + new MockSessionService( + index -> { + List messages = + ImmutableList.of( + SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"), + SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"), + SolaceDataUtils.getBytesXmlMessage("payload_test2", "452")); + return getOrNull(index, messages); + }, + 3); + SessionServiceFactory fakeSessionServiceFactory = + new MockSessionServiceFactory(mockClientService); + + // Expected data + List expected = new ArrayList<>(); + expected.add(new SimpleRecord("payload_test0", "450")); + expected.add(new SimpleRecord("payload_test1", "451")); + expected.add(new SimpleRecord("payload_test2", "452")); + + // Run the pipeline + PCollection events = + pipeline.apply( + "Read from Solace", + SolaceIO.read( + TypeDescriptor.of(SimpleRecord.class), + input -> + new SimpleRecord( + new String(input.getBytes(), StandardCharsets.UTF_8), + input.getApplicationMessageId()), + input -> Instant.ofEpochMilli(1708100477061L)) + .from(Solace.Queue.fromName("queue")) + .withSempClientFactory(MockSempClientFactory.getDefaultMock()) + .withSessionServiceFactory(fakeSessionServiceFactory) + .withMaxNumConnections(1)); + + // Assert results + PAssert.that(events).containsInAnyOrder(expected); + pipeline.run(); + } + + @Test + public void testNoQueueAndTopicSet() { + Read spec = SolaceIO.read(); + assertThrows(IllegalStateException.class, () -> spec.validate(pipeline.getOptions())); + } + + @Test + public void testSplitsForExclusiveQueue() throws Exception { + MockSempClient mockSempClient = + MockSempClient.builder().setIsQueueNonExclusiveFn((q) -> false).build(); + + Read spec = + SolaceIO.read() + .from(Solace.Queue.fromName("queue")) + .withSempClientFactory(new MockSempClientFactory(mockSempClient)) + .withSessionServiceFactory(MockSessionServiceFactory.getDefaultMock()); + + int desiredNumSplits = 5; + + UnboundedSolaceSource initialSource = getSource(spec, pipeline); + List> splits = + initialSource.split(desiredNumSplits, PipelineOptionsFactory.create()); + assertEquals(1, splits.size()); + } + + @Test + public void testSplitsForNonExclusiveQueueWithMaxNumConnections() throws Exception { + Read spec = getDefaultRead().withMaxNumConnections(3); + + int desiredNumSplits = 5; + + UnboundedSolaceSource initialSource = getSource(spec, pipeline); + List> splits = + initialSource.split(desiredNumSplits, PipelineOptionsFactory.create()); + assertEquals(3, splits.size()); + } + + @Test + public void testSplitsForNonExclusiveQueueWithMaxNumConnectionsRespectDesired() throws Exception { + Read spec = getDefaultRead().withMaxNumConnections(10); + int desiredNumSplits = 5; + + UnboundedSolaceSource initialSource = getSource(spec, pipeline); + List> splits = + initialSource.split(desiredNumSplits, PipelineOptionsFactory.create()); + assertEquals(5, splits.size()); + } + + @Test + public void testCreateQueueForTopic() { + AtomicInteger createQueueForTopicFnCounter = new AtomicInteger(0); + MockSempClient mockSempClient = + MockSempClient.builder() + .setCreateQueueForTopicFn((q) -> createQueueForTopicFnCounter.incrementAndGet()) + .build(); + + Read spec = + getDefaultReadForTopic().withSempClientFactory(new MockSempClientFactory(mockSempClient)); + spec.expand(PBegin.in(TestPipeline.create())); + // check if createQueueForTopic was executed + assertEquals(1, createQueueForTopicFnCounter.get()); + } + + @Test + public void testCheckpointMark() throws Exception { + AtomicInteger countConsumedMessages = new AtomicInteger(0); + AtomicInteger countAckMessages = new AtomicInteger(0); + + // Broker that creates input data + MockSessionService mockClientService = + new MockSessionService( + index -> { + List messages = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + messages.add( + SolaceDataUtils.getBytesXmlMessage( + "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet())); + } + countConsumedMessages.incrementAndGet(); + return getOrNull(index, messages); + }, + 10); + + SessionServiceFactory fakeSessionServiceFactory = + new MockSessionServiceFactory(mockClientService); + Read spec = getDefaultRead().withSessionServiceFactory(fakeSessionServiceFactory); + + UnboundedSolaceSource initialSource = getSource(spec, pipeline); + UnboundedReader reader = + initialSource.createReader(PipelineOptionsFactory.create(), null); + + // start the reader and move to the first record + assertTrue(reader.start()); + + // consume 3 messages (NB: start already consumed the first message) + for (int i = 0; i < 3; i++) { + assertTrue(String.format("Failed at %d-th message", i), reader.advance()); + } + + // check if 4 messages were consumed + assertEquals(4, countConsumedMessages.get()); + + // check if no messages were acknowledged yet + assertEquals(0, countAckMessages.get()); + + // finalize the checkpoint + reader.getCheckpointMark().finalizeCheckpoint(); + + // check if messages were acknowledged + assertEquals(4, countAckMessages.get()); + } + + @Test + public void testCheckpointMarkAndFinalizeSeparately() throws Exception { + AtomicInteger countConsumedMessages = new AtomicInteger(0); + AtomicInteger countAckMessages = new AtomicInteger(0); + + // Broker that creates input data + MockSessionService mockClientService = + new MockSessionService( + index -> { + List messages = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + messages.add( + SolaceDataUtils.getBytesXmlMessage( + "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet())); + } + countConsumedMessages.incrementAndGet(); + return getOrNull(index, messages); + }, + 10); + SessionServiceFactory fakeSessionServiceFactory = + new MockSessionServiceFactory(mockClientService); + + Read spec = + getDefaultRead() + .withSessionServiceFactory(fakeSessionServiceFactory) + .withMaxNumConnections(4); + + UnboundedSolaceSource initialSource = getSource(spec, pipeline); + + UnboundedReader reader = + initialSource.createReader(PipelineOptionsFactory.create(), null); + + // start the reader and move to the first record + assertTrue(reader.start()); + + // consume 3 messages (NB: start already consumed the first message) + for (int i = 0; i < 3; i++) { + assertTrue(String.format("Failed at %d-th message", i), reader.advance()); + } + + // create checkpoint but don't finalize yet + CheckpointMark checkpointMark = reader.getCheckpointMark(); + + // consume 2 more messages + reader.advance(); + reader.advance(); + + // check if messages are still not acknowledged + assertEquals(0, countAckMessages.get()); + + // acknowledge from the first checkpoint + checkpointMark.finalizeCheckpoint(); + + // only messages from the first checkpoint are acknowledged + assertEquals(4, countAckMessages.get()); + } + + @Test + public void testCheckpointMarkSafety() throws Exception { + + final int messagesToProcess = 100; + + AtomicInteger countConsumedMessages = new AtomicInteger(0); + AtomicInteger countAckMessages = new AtomicInteger(0); + + // Broker that creates input data + MockSessionService mockClientService = + new MockSessionService( + index -> { + List messages = new ArrayList<>(); + for (int i = 0; i < messagesToProcess; i++) { + messages.add( + SolaceDataUtils.getBytesXmlMessage( + "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet())); + } + countConsumedMessages.incrementAndGet(); + return getOrNull(index, messages); + }, + 10); + + SessionServiceFactory fakeSessionServiceFactory = + new MockSessionServiceFactory(mockClientService); + Read spec = + getDefaultRead() + .withSessionServiceFactory(fakeSessionServiceFactory) + .withMaxNumConnections(4); + + UnboundedSolaceSource initialSource = getSource(spec, pipeline); + + UnboundedReader reader = + initialSource.createReader(PipelineOptionsFactory.create(), null); + + // start the reader and move to the first record + assertTrue(reader.start()); + + // consume half the messages (NB: start already consumed the first message) + for (int i = 0; i < (messagesToProcess / 2) - 1; i++) { + assertTrue(reader.advance()); + } + + // the messages are still pending in the queue (no ACK yet) + assertEquals(0, countAckMessages.get()); + + // we finalize the checkpoint for the already-processed messages while simultaneously + // consuming the remainder of messages from the queue + Thread runner = + new Thread( + () -> { + try { + for (int i = 0; i < messagesToProcess / 2; i++) { + assertTrue(reader.advance()); + } + } catch (IOException ex) { + throw new RuntimeException(ex); + } + }); + runner.start(); + reader.getCheckpointMark().finalizeCheckpoint(); + + // Concurrency issues would cause an exception to be thrown before this method exits, + // failing the test + runner.join(); + } + + @Test + public void testDefaultCoder() { + Coder coder = + new UnboundedSolaceSource<>(null, null, null, 0, false, null, null, null) + .getCheckpointMarkCoder(); + CoderProperties.coderSerializable(coder); + } + + @Test + public void testDestinationTopicQueueCreation() { + String topicName = "some-topic"; + String queueName = "some-queue"; + Topic topic = SolaceIO.topicFromName(topicName); + Queue queue = SolaceIO.queueFromName(queueName); + + Destination dest = topic; + assertTrue(dest instanceof Topic); + assertFalse(dest instanceof Queue); + assertEquals(topicName, dest.getName()); + + dest = queue; + assertTrue(dest instanceof Queue); + assertFalse(dest instanceof Topic); + assertEquals(queueName, dest.getName()); + + Record r = SolaceDataUtils.getSolaceRecord("payload_test0", "450"); + dest = SolaceIO.convertToJcsmpDestination(r.getDestination()); + assertTrue(dest instanceof Topic); + assertFalse(dest instanceof Queue); + } + + @Test + public void testTopicEncoding() { + MockSessionService mockClientService = + new MockSessionService( + index -> { + List messages = + ImmutableList.of( + SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"), + SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"), + SolaceDataUtils.getBytesXmlMessage("payload_test2", "452")); + return getOrNull(index, messages); + }, + 3); + + SessionServiceFactory fakeSessionServiceFactory = + new MockSessionServiceFactory(mockClientService); + + // Run + PCollection events = + pipeline.apply( + "Read from Solace", + getDefaultRead().withSessionServiceFactory(fakeSessionServiceFactory)); + + // Run the pipeline + PCollection destAreTopics = + events.apply( + MapElements.into(TypeDescriptors.booleans()) + .via( + r -> { + Destination dest = SolaceIO.convertToJcsmpDestination(r.getDestination()); + return dest instanceof Topic; + })); + + List expected = ImmutableList.of(true, true, true); + + // Assert results + PAssert.that(destAreTopics).containsInAnyOrder(expected); + pipeline.run(); + } +} diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceDataUtils.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceDataUtils.java new file mode 100644 index 000000000000..2e953150c6d3 --- /dev/null +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceDataUtils.java @@ -0,0 +1,708 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.data; + +import com.solacesystems.jcsmp.BytesXMLMessage; +import com.solacesystems.jcsmp.DeliveryMode; +import com.solacesystems.jcsmp.Destination; +import com.solacesystems.jcsmp.JCSMPException; +import com.solacesystems.jcsmp.JCSMPFactory; +import com.solacesystems.jcsmp.MessageType; +import com.solacesystems.jcsmp.ReplicationGroupMessageId; +import com.solacesystems.jcsmp.SDTMap; +import com.solacesystems.jcsmp.User_Cos; +import com.solacesystems.jcsmp.impl.ReplicationGroupMessageIdImpl; +import java.io.IOException; +import java.io.InputStream; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import org.apache.beam.sdk.schemas.JavaBeanSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; +import org.checkerframework.checker.nullness.qual.Nullable; + +public class SolaceDataUtils { + public static final ReplicationGroupMessageId DEFAULT_REPLICATION_GROUP_ID = + new ReplicationGroupMessageIdImpl(1L, 136L); + + @DefaultSchema(JavaBeanSchema.class) + public static class SimpleRecord { + public String payload; + public String messageId; + + public SimpleRecord() {} + + public SimpleRecord(String payload, String messageId) { + this.payload = payload; + this.messageId = messageId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof SimpleRecord)) { + return false; + } + SimpleRecord that = (SimpleRecord) o; + return Objects.equals(payload, that.payload) && Objects.equals(messageId, that.messageId); + } + + @Override + public int hashCode() { + return Objects.hash(payload, messageId); + } + + @Override + public String toString() { + return "SimpleRecord{" + + "payload='" + + payload + + '\'' + + ", messageId='" + + messageId + + '\'' + + '}'; + } + } + + public static Solace.Record getSolaceRecord(String payload, String messageId) { + return getSolaceRecord(payload, messageId, null); + } + + public static Solace.Record getSolaceRecord( + String payload, + String messageId, + @Nullable ReplicationGroupMessageId replicationGroupMessageId) { + String replicationGroupMessageIdString = + replicationGroupMessageId != null + ? replicationGroupMessageId.toString() + : DEFAULT_REPLICATION_GROUP_ID.toString(); + + return Solace.Record.builder() + .setPayload(ByteString.copyFrom(payload, StandardCharsets.UTF_8)) + .setMessageId(messageId) + .setDestination( + Solace.Destination.builder() + .setName("destination-topic") + .setType(Solace.DestinationType.TOPIC) + .build()) + .setExpiration(1000L) + .setPriority(0) + .setReceiveTimestamp(1708100477067L) + .setRedelivered(false) + .setReplyTo(null) + .setSequenceNumber(null) + .setTimeToLive(1000L) + .setSenderTimestamp(null) + .setReplicationGroupMessageId(replicationGroupMessageIdString) + .setAttachmentBytes(ByteString.EMPTY) + .build(); + } + + public static BytesXMLMessage getBytesXmlMessage(String payload, String messageId) { + return getBytesXmlMessage(payload, messageId, null, null); + } + + public static BytesXMLMessage getBytesXmlMessage( + String payload, String messageId, SerializableFunction ackMessageFn) { + return getBytesXmlMessage(payload, messageId, ackMessageFn, null); + } + + public static BytesXMLMessage getBytesXmlMessage( + String payload, + String messageId, + SerializableFunction ackMessageFn, + ReplicationGroupMessageId replicationGroupMessageId) { + long receiverTimestamp = 1708100477067L; + long expiration = 1000L; + long timeToLive = 1000L; + String destination = "destination-topic"; + + ReplicationGroupMessageId useReplicationGroupId = + replicationGroupMessageId != null + ? replicationGroupMessageId + : DEFAULT_REPLICATION_GROUP_ID; + return new BytesXMLMessage() { + + @Override + public byte[] getBytes() { + return payload.getBytes(StandardCharsets.UTF_8); + } + + @Override + public int getContentLength() { + return payload.getBytes(StandardCharsets.UTF_8).length; + } + + @Override + public int readBytes(byte[] arg0) { + return 0; + } + + @Override + public int readBytes(byte[] arg0, int arg1) { + return 0; + } + + @Override + public void rewindContent() {} + + @Override + public void writeBytes(byte[] arg0) {} + + @Override + public void writeBytes(byte[] arg0, int arg1, int arg2) {} + + @Override + public void ackMessage() { + if (ackMessageFn != null) { + ackMessageFn.apply(0); + } + } + + @Override + public void clearAttachment() {} + + @Override + public void clearBinaryMetadataBytes(int arg0) {} + + @Override + public void clearContent() {} + + @Override + public void clearQueueNameLocation() {} + + @Override + public void clearTopicNameLocation() {} + + @Override + public String dump() { + return null; + } + + @Override + public String dump(int arg0) { + return null; + } + + @Override + public long getAckMessageId() { + return 0; + } + + @Override + public String getAppMessageID() { + return null; + } + + @Override + public String getAppMessageType() { + return null; + } + + @Override + public String getApplicationMessageId() { + return messageId; + } + + @Override + public String getApplicationMessageType() { + return null; + } + + @Override + public ByteBuffer getAttachmentByteBuffer() { + return null; + } + + @Override + public int getAttachmentContentLength() { + return 0; + } + + @Override + public int getBinaryMetadataContentLength(int arg0) { + return 0; + } + + @Override + public Collection getBinaryMetadataTypes() { + return null; + } + + @Override + public Long getCacheRequestId() { + return null; + } + + @Override + public List getConsumerIdList() { + return null; + } + + @Override + public String getCorrelationId() { + return null; + } + + @Override + public Object getCorrelationKey() { + return null; + } + + @Override + public User_Cos getCos() { + return null; + } + + @Override + public boolean getDeliverToOne() { + return false; + } + + @Override + public int getDeliveryCount() throws UnsupportedOperationException { + return 0; + } + + @Override + public DeliveryMode getDeliveryMode() { + return null; + } + + @Override + public Destination getDestination() { + return JCSMPFactory.onlyInstance().createTopic(destination); + } + + @Override + public String getDestinationTopicSuffix() { + return null; + } + + @Override + public boolean getDiscardIndication() { + return false; + } + + @Override + public long getExpiration() { + return expiration; + } + + @Override + public String getHTTPContentEncoding() { + return null; + } + + @Override + public String getHTTPContentType() { + return null; + } + + @Override + public String getMessageId() { + return null; + } + + @Override + public long getMessageIdLong() { + return 0; + } + + @Override + public MessageType getMessageType() { + return null; + } + + @Override + public int getPriority() { + return 0; + } + + @Override + public SDTMap getProperties() { + return null; + } + + @Override + public int getQueueNameLength() { + return 0; + } + + @Override + public int getQueueNameOffset() { + return 0; + } + + @Override + public long getReceiveTimestamp() { + return receiverTimestamp; + } + + @Override + public boolean getRedelivered() { + return false; + } + + @Override + public ReplicationGroupMessageId getReplicationGroupMessageId() { + // this is always set by Solace + return useReplicationGroupId; + } + + @Override + public Destination getReplyTo() { + return null; + } + + @Override + public String getReplyToSuffix() { + return null; + } + + @Override + public Long getSendTimestamp() { + return null; + } + + @Override + public String getSenderID() { + return null; + } + + @Override + public String getSenderId() { + return null; + } + + @Override + public Long getSenderTimestamp() { + return null; + } + + @Override + public Long getSequenceNumber() { + return null; + } + + @Override + public byte getStructuredMsgType() { + return 0x2; + } + + @Override + public boolean getTQDiscardIndication() { + return false; + } + + @Override + public long getTimeToLive() { + return timeToLive; + } + + @Override + public int getTopicNameLength() { + return 5; + } + + @Override + public int getTopicNameOffset() { + return 0; + } + + @Override + public Long getTopicSequenceNumber() { + return null; + } + + @Override + public byte[] getUserData() { + return null; + } + + @Override + public boolean hasAttachment() { + return false; + } + + @Override + public boolean hasBinaryMetadata(int arg0) { + return false; + } + + @Override + public boolean hasContent() { + return false; + } + + @Override + public boolean hasUserData() { + return false; + } + + @Override + public boolean isAckImmediately() { + return false; + } + + @Override + public boolean isCacheMessage() { + return false; + } + + @Override + public boolean isDMQEligible() { + return false; + } + + @Override + public boolean isDeliveryCountSupported() { + return false; + } + + @Override + public boolean isElidingEligible() { + return false; + } + + @Override + public boolean isReadOnly() { + return false; + } + + @Override + public boolean isReplyMessage() { + return false; + } + + @Override + public boolean isStructuredMsg() { + return false; + } + + @Override + public boolean isSuspect() { + return false; + } + + @Override + public int readAttachmentBytes(byte[] arg0) { + return 0; + } + + @Override + public int readAttachmentBytes(byte[] arg0, int arg1) { + return 0; + } + + @Override + public int readAttachmentBytes(int arg0, byte[] arg1, int arg2, int arg3) { + return 0; + } + + @Override + public int readBinaryMetadataBytes(int arg0, byte[] arg1) { + return 0; + } + + @Override + public int readContentBytes(byte[] arg0) { + return 0; + } + + @Override + public int readContentBytes(byte[] arg0, int arg1) { + return 0; + } + + @Override + public int readContentBytes(int arg0, byte[] arg1, int arg2, int arg3) { + return 0; + } + + @Override + public void rejectMessage() {} + + @Override + public void reset() {} + + @Override + public void resetPayload() {} + + @Override + public void rewindAttachment() {} + + @Override + public void setAckImmediately(boolean arg0) {} + + @Override + public void setAppMessageID(String arg0) {} + + @Override + public void setAppMessageType(String arg0) {} + + @Override + public void setApplicationMessageId(String arg0) {} + + @Override + public void setApplicationMessageType(String arg0) {} + + @Override + public void setAsReplyMessage(boolean arg0) {} + + @Override + public void setCorrelationId(String arg0) {} + + @Override + public void setCorrelationKey(Object arg0) {} + + @Override + public void setCos(User_Cos arg0) {} + + @Override + public void setDMQEligible(boolean arg0) {} + + @Override + public void setDeliverToOne(boolean arg0) {} + + @Override + public void setDeliveryMode(DeliveryMode arg0) {} + + @Override + public void setElidingEligible(boolean arg0) {} + + @Override + public void setExpiration(long arg0) {} + + @Override + public void setHTTPContentEncoding(String arg0) {} + + @Override + public void setHTTPContentType(String arg0) {} + + @Override + public void setMessageType(MessageType arg0) {} + + @Override + public void setPriority(int arg0) {} + + @Override + public void setProperties(SDTMap arg0) {} + + @Override + public void setQueueNameLocation(int arg0, int arg1) {} + + @Override + public void setReadOnly() {} + + @Override + public void setReplyTo(Destination arg0) {} + + @Override + public void setReplyToSuffix(String arg0) {} + + @Override + public void setSendTimestamp(long arg0) {} + + @Override + public void setSenderID(String arg0) {} + + @Override + public void setSenderId(String arg0) {} + + @Override + public void setSenderTimestamp(long arg0) {} + + @Override + public void setSequenceNumber(long arg0) {} + + @Override + public void setStructuredMsg(boolean arg0) {} + + @Override + public void setStructuredMsgType(byte arg0) {} + + @Override + public void setTimeToLive(long arg0) {} + + @Override + public void setTopicNameLocation(int arg0, int arg1) {} + + @Override + public void setUserData(byte[] arg0) {} + + @Override + public void settle(Outcome arg0) throws JCSMPException {} + + @Override + public int writeAttachment(byte[] arg0) { + return 0; + } + + @Override + public int writeAttachment(InputStream arg0) throws IOException { + return 0; + } + + @Override + public int writeAttachment(byte[] arg0, int arg1, int arg2) throws BufferUnderflowException { + return 0; + } + + @Override + public int writeBinaryMetadataBytes(int arg0, byte[] arg1) { + return 0; + } + + @Override + public int writeBinaryMetadataBytes(int arg0, byte[] arg1, int arg2, int arg3) + throws BufferUnderflowException { + return 0; + } + + @Override + public int writeNewAttachment(byte[] arg0) { + return 0; + } + + @Override + public int writeNewAttachment(InputStream arg0) throws IOException { + return 0; + } + + @Override + public int writeNewAttachment(byte[] arg0, int arg1, int arg2) + throws BufferUnderflowException { + return 0; + } + + @Override + public int writeNewAttachment(InputStream arg0, int arg1, int arg2) throws IOException { + return 0; + } + }; + } +}