Skip to content

Commit

Permalink
Solace Read connector (#31476)
Browse files Browse the repository at this point in the history
* wip solace connector

* wip solace connector

* some checker errors resolved

* all checker errors resolved

* improving unit tests

* respond to pr commments

* Documentation

* Small refactor - move data classes out of the client

* refactor

* Add github action for integration test of Solace

* testing github workflow

* bump testcontainers to 1.19.7 - soalce semp was updated with an admin user access

* Use FlowHandle to acknowledge messages to make SolaceCheckpointMark's fields serializable.

* Handle StaleSessionException error

* Add @internal annotation to mark the SolaceIO API beta and subject to change.

* Improve documentation

* Back to ack based on bytesxmlmessages. Deduplicate default to false.

* update changes.md with Solace read connector

* remove ack by id code

* remove todo comment

* Add licenses to package-info.java files

* Restructure documentation

* update aws test after upgrading testcontainers version.

* Disable publishing docs until the first pass on the master branch

* Remove files from this branch to split PR into smaller chunks

* refactor tests for readability

* revert upgrade of testcontainers - not needed in this PR chunk

* revert upgrade of testcontainers - not needed in this PR chunk

* spotless

* remove IT tests from this pr

* Tech Writer review

* Add a field to Solace.Record mapped from BytesXMLMessage.getAttachmentByteBuffer()

* Add and fix some documentation

* Remove CheckpointMark's reference to the UnboundedSolaceReader - unnecessary.

* Revert "Remove CheckpointMark's reference to the UnboundedSolaceReader - unnecessary."

This reverts commit 2e1c10e.

* Solace project init - github workflow file, gradle module

* Splitting the #31476 - Leaving only PTransform AutoValue configurations

* remove unnecessary dependencies

* remove info from CHANGES.md

* Add watermark-related code

* Remove excessive @nullable annotations on Solace.Record class

* Remove entry from CHANGES.md

* Make Record builder package-private

* Refactor SolaceIO - the constructor of Read takes a configuration builder as an argument

* Change payload and attachments type to immutable ByteString

* Downgrade Record builders access modifiers to package private

* Add documentation

* Add documentation to classes and methods in Solace.java

* typo

* Add SolaceCheckpointMark.java

* Make SolaceCheckpointMark visible for testing

* Remove SolaceRecordCoder and take advantage of @DefaultSchema
  • Loading branch information
bzablocki committed Jun 21, 2024
1 parent 14fd366 commit 18af8c8
Show file tree
Hide file tree
Showing 11 changed files with 2,008 additions and 40 deletions.
8 changes: 8 additions & 0 deletions sdks/java/io/solace/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,16 @@ dependencies {
implementation library.java.slf4j_api
implementation library.java.joda_time
implementation library.java.solace
implementation library.java.vendored_guava_32_1_2_jre
implementation project(":sdks:java:extensions:avro")
implementation library.java.vendored_grpc_1_60_1
implementation library.java.avro
permitUnusedDeclared library.java.avro
implementation library.java.vendored_grpc_1_60_1

testImplementation library.java.junit
testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration")
testImplementation project(path: ":sdks:java:testing:test-utils", configuration: "testRuntimeMigration")
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,36 @@
*/
package org.apache.beam.sdk.io.solace.broker;

import com.solacesystems.jcsmp.Queue;
import java.io.Serializable;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* This abstract class serves as a blueprint for creating `SessionService` objects. It introduces a
* queue property and mandates the implementation of a create() method in concrete subclasses.
*/
public abstract class SessionServiceFactory implements Serializable {

/**
* A reference to a Queue object. This is set when the pipline is constructed (in the {@link
* org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)} method).
* This could be used to associate the created SessionService with a specific queue for message
* handling.
*/
@Nullable Queue queue;

/**
* This is the core method that subclasses must implement. It defines how to construct and return
* a SessionService object.
*/
public abstract SessionService create();

/**
* This method is called in the {@link
* org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)} method
* to set the Queue reference.
*/
public void setQueue(Queue queue) {
this.queue = queue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.solacesystems.jcsmp.BytesXMLMessage;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
Expand Down Expand Up @@ -74,6 +76,7 @@ public enum DestinationType {

/** Represents a Solace message destination (either a Topic or a Queue). */
@AutoValue
@DefaultSchema(AutoValueSchema.class)
public abstract static class Destination {
/**
* Gets the name of the destination.
Expand Down Expand Up @@ -105,6 +108,7 @@ abstract static class Builder {

/** Represents a Solace message record with its associated metadata. */
@AutoValue
@DefaultSchema(AutoValueSchema.class)
public abstract static class Record {
/**
* Gets the unique identifier of the message, a string for an application-specific message
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace;

import org.apache.beam.sdk.io.solace.broker.MessageReceiver;
import org.apache.beam.sdk.io.solace.broker.SessionService;

public class MockEmptySessionService implements SessionService {

String exceptionMessage = "This is an empty client, use a MockSessionService instead.";

@Override
public void close() {
throw new UnsupportedOperationException(exceptionMessage);
}

@Override
public boolean isClosed() {
throw new UnsupportedOperationException(exceptionMessage);
}

@Override
public MessageReceiver createReceiver() {
throw new UnsupportedOperationException(exceptionMessage);
}

@Override
public void connect() {
throw new UnsupportedOperationException(exceptionMessage);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace;

import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.Queue;
import java.io.IOException;
import org.apache.beam.sdk.io.solace.broker.SempClient;
import org.apache.beam.sdk.transforms.SerializableFunction;

public class MockSempClient implements SempClient {

private final SerializableFunction<String, Boolean> isQueueNonExclusiveFn;
private final SerializableFunction<String, Long> getBacklogBytesFn;
private final SerializableFunction<String, Integer> createQueueForTopicFn;

private MockSempClient(
SerializableFunction<String, Boolean> isQueueNonExclusiveFn,
SerializableFunction<String, Long> getBacklogBytesFn,
SerializableFunction<String, Integer> createQueueForTopicFn) {
this.isQueueNonExclusiveFn = isQueueNonExclusiveFn;
this.getBacklogBytesFn = getBacklogBytesFn;
this.createQueueForTopicFn = createQueueForTopicFn;
}

public static Builder builder() {
return new Builder();
}

public static class Builder {
private SerializableFunction<String, Boolean> isQueueNonExclusiveFn = (queueName) -> true;
private SerializableFunction<String, Long> getBacklogBytesFn = (queueName) -> 0L;
private SerializableFunction<String, Integer> createQueueForTopicFn = (queueName) -> 0;

public Builder setIsQueueNonExclusiveFn(
SerializableFunction<String, Boolean> isQueueNonExclusiveFn) {
this.isQueueNonExclusiveFn = isQueueNonExclusiveFn;
return this;
}

public Builder setGetBacklogBytesFn(SerializableFunction<String, Long> getBacklogBytesFn) {
this.getBacklogBytesFn = getBacklogBytesFn;
return this;
}

public Builder setCreateQueueForTopicFn(
SerializableFunction<String, Integer> createQueueForTopicFn) {
this.createQueueForTopicFn = createQueueForTopicFn;
return this;
}

public MockSempClient build() {
return new MockSempClient(isQueueNonExclusiveFn, getBacklogBytesFn, createQueueForTopicFn);
}
}

@Override
public boolean isQueueNonExclusive(String queueName) throws IOException {
return isQueueNonExclusiveFn.apply(queueName);
}

@Override
public Queue createQueueForTopic(String queueName, String topicName) throws IOException {
createQueueForTopicFn.apply(queueName);
return JCSMPFactory.onlyInstance().createQueue(queueName);
}

@Override
public long getBacklogBytes(String queueName) throws IOException {
return getBacklogBytesFn.apply(queueName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace;

import org.apache.beam.sdk.io.solace.broker.SempClient;
import org.apache.beam.sdk.io.solace.broker.SempClientFactory;

public class MockSempClientFactory implements SempClientFactory {
SempClient sempClient;

public MockSempClientFactory(SempClient sempClient) {
this.sempClient = sempClient;
}

public static SempClientFactory getDefaultMock() {
return new MockSempClientFactory(MockSempClient.builder().build());
}

@Override
public SempClient create() {
return sempClient;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace;

import com.solacesystems.jcsmp.BytesXMLMessage;
import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.sdk.io.solace.broker.MessageReceiver;
import org.apache.beam.sdk.io.solace.broker.SessionService;
import org.apache.beam.sdk.transforms.SerializableFunction;

public class MockSessionService implements SessionService {

private final SerializableFunction<Integer, BytesXMLMessage> getRecordFn;
private MessageReceiver messageReceiver = null;
private final int minMessagesReceived;

public MockSessionService(
SerializableFunction<Integer, BytesXMLMessage> getRecordFn, int minMessagesReceived) {
this.getRecordFn = getRecordFn;
this.minMessagesReceived = minMessagesReceived;
}

@Override
public void close() {}

@Override
public boolean isClosed() {
return false;
}

@Override
public MessageReceiver createReceiver() {
if (messageReceiver == null) {
messageReceiver = new MockReceiver(getRecordFn, minMessagesReceived);
}
return messageReceiver;
}

@Override
public void connect() {}

public static class MockReceiver implements MessageReceiver, Serializable {
private final AtomicInteger counter = new AtomicInteger();
private final SerializableFunction<Integer, BytesXMLMessage> getRecordFn;
private final int minMessagesReceived;

public MockReceiver(
SerializableFunction<Integer, BytesXMLMessage> getRecordFn, int minMessagesReceived) {
this.getRecordFn = getRecordFn;
this.minMessagesReceived = minMessagesReceived;
}

@Override
public void start() {}

@Override
public boolean isClosed() {
return false;
}

@Override
public BytesXMLMessage receive() throws IOException {
return getRecordFn.apply(counter.getAndIncrement());
}

@Override
public boolean isEOF() {
return counter.get() >= minMessagesReceived;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace;

import org.apache.beam.sdk.io.solace.broker.SessionService;
import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory;

public class MockSessionServiceFactory extends SessionServiceFactory {
SessionService sessionService;

public MockSessionServiceFactory(SessionService clientService) {
this.sessionService = clientService;
}

public static SessionServiceFactory getDefaultMock() {
return new MockSessionServiceFactory(new MockEmptySessionService());
}

@Override
public SessionService create() {
return sessionService;
}
}
Loading

0 comments on commit 18af8c8

Please sign in to comment.