-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Solace Read connector #31476
Solace Read connector #31476
Changes from all commits
f404eec
00274a2
399ffcc
62dfa4a
ea202fb
260f645
9057860
a74b6a6
552cab4
5d847b3
23292a1
a69e10a
f4b0d6c
a1ca8b3
8007a33
cff3105
f3eaabe
34c4170
7b24fdb
be74c86
a85c68c
533f122
2f41380
9a317a8
38b6f0c
cf5d24d
aa29100
94101a8
c625069
1521f40
b815286
08b777c
0f63749
e90e69a
f6833d2
df1eb6c
5015b32
2e1c10e
4d02b99
bee8faf
a5b29ed
09368d6
f2d284a
9af3185
60d854d
3847876
a00b997
19b02be
bcc0a51
0d59032
2787260
b46a8f8
1c09b82
81b7f84
d1ada91
15b394c
2e536f9
daf2cfe
93ffdd1
9888b09
bbade93
4689fed
58ddecc
a39f950
da26ab5
63425d7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,8 @@ | |
import com.solacesystems.jcsmp.BytesXMLMessage; | ||
import java.io.ByteArrayOutputStream; | ||
import java.io.IOException; | ||
import org.apache.beam.sdk.schemas.AutoValueSchema; | ||
import org.apache.beam.sdk.schemas.annotations.DefaultSchema; | ||
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; | ||
import org.checkerframework.checker.nullness.qual.Nullable; | ||
import org.slf4j.Logger; | ||
|
@@ -74,6 +76,7 @@ public enum DestinationType { | |
|
||
/** Represents a Solace message destination (either a Topic or a Queue). */ | ||
@AutoValue | ||
@DefaultSchema(AutoValueSchema.class) | ||
public abstract static class Destination { | ||
/** | ||
* Gets the name of the destination. | ||
|
@@ -105,6 +108,7 @@ abstract static class Builder { | |
|
||
/** Represents a Solace message record with its associated metadata. */ | ||
@AutoValue | ||
@DefaultSchema(AutoValueSchema.class) | ||
public abstract static class Record { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have the following questions about this class.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ad 1) The SchemaFieldNumber annotations are necessary here as we stumbled upon error (#30276) related to non-deterministic schema inference. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ad 2, 3 - done. I left only the necessary |
||
/** | ||
* Gets the unique identifier of the message, a string for an application-specific message | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.beam.sdk.io.solace; | ||
|
||
import org.apache.beam.sdk.io.solace.broker.MessageReceiver; | ||
import org.apache.beam.sdk.io.solace.broker.SessionService; | ||
|
||
public class MockEmptySessionService implements SessionService { | ||
|
||
String exceptionMessage = "This is an empty client, use a MockSessionService instead."; | ||
|
||
@Override | ||
public void close() { | ||
throw new UnsupportedOperationException(exceptionMessage); | ||
} | ||
|
||
@Override | ||
public boolean isClosed() { | ||
throw new UnsupportedOperationException(exceptionMessage); | ||
} | ||
|
||
@Override | ||
public MessageReceiver createReceiver() { | ||
throw new UnsupportedOperationException(exceptionMessage); | ||
} | ||
|
||
@Override | ||
public void connect() { | ||
throw new UnsupportedOperationException(exceptionMessage); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.beam.sdk.io.solace; | ||
|
||
import com.solacesystems.jcsmp.JCSMPFactory; | ||
import com.solacesystems.jcsmp.Queue; | ||
import java.io.IOException; | ||
import org.apache.beam.sdk.io.solace.broker.SempClient; | ||
import org.apache.beam.sdk.transforms.SerializableFunction; | ||
|
||
public class MockSempClient implements SempClient { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I heard for this IO there a plan to use https://java.testcontainers.org/modules/solace/? I think this is one of several PRs. Feel free resolve if this is the case. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, the testcontainers will be used for an integration test. A preliminary PR here: #31543 But there are also unit tests, which use the MockClients here. |
||
|
||
private final SerializableFunction<String, Boolean> isQueueNonExclusiveFn; | ||
private final SerializableFunction<String, Long> getBacklogBytesFn; | ||
private final SerializableFunction<String, Integer> createQueueForTopicFn; | ||
|
||
private MockSempClient( | ||
SerializableFunction<String, Boolean> isQueueNonExclusiveFn, | ||
SerializableFunction<String, Long> getBacklogBytesFn, | ||
SerializableFunction<String, Integer> createQueueForTopicFn) { | ||
this.isQueueNonExclusiveFn = isQueueNonExclusiveFn; | ||
this.getBacklogBytesFn = getBacklogBytesFn; | ||
this.createQueueForTopicFn = createQueueForTopicFn; | ||
} | ||
|
||
public static Builder builder() { | ||
return new Builder(); | ||
} | ||
|
||
public static class Builder { | ||
private SerializableFunction<String, Boolean> isQueueNonExclusiveFn = (queueName) -> true; | ||
private SerializableFunction<String, Long> getBacklogBytesFn = (queueName) -> 0L; | ||
private SerializableFunction<String, Integer> createQueueForTopicFn = (queueName) -> 0; | ||
|
||
public Builder setIsQueueNonExclusiveFn( | ||
SerializableFunction<String, Boolean> isQueueNonExclusiveFn) { | ||
this.isQueueNonExclusiveFn = isQueueNonExclusiveFn; | ||
return this; | ||
} | ||
|
||
public Builder setGetBacklogBytesFn(SerializableFunction<String, Long> getBacklogBytesFn) { | ||
this.getBacklogBytesFn = getBacklogBytesFn; | ||
return this; | ||
} | ||
|
||
public Builder setCreateQueueForTopicFn( | ||
SerializableFunction<String, Integer> createQueueForTopicFn) { | ||
this.createQueueForTopicFn = createQueueForTopicFn; | ||
return this; | ||
} | ||
|
||
public MockSempClient build() { | ||
return new MockSempClient(isQueueNonExclusiveFn, getBacklogBytesFn, createQueueForTopicFn); | ||
} | ||
} | ||
|
||
@Override | ||
public boolean isQueueNonExclusive(String queueName) throws IOException { | ||
return isQueueNonExclusiveFn.apply(queueName); | ||
} | ||
|
||
@Override | ||
public Queue createQueueForTopic(String queueName, String topicName) throws IOException { | ||
createQueueForTopicFn.apply(queueName); | ||
return JCSMPFactory.onlyInstance().createQueue(queueName); | ||
} | ||
|
||
@Override | ||
public long getBacklogBytes(String queueName) throws IOException { | ||
return getBacklogBytesFn.apply(queueName); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.beam.sdk.io.solace; | ||
|
||
import org.apache.beam.sdk.io.solace.broker.SempClient; | ||
import org.apache.beam.sdk.io.solace.broker.SempClientFactory; | ||
|
||
public class MockSempClientFactory implements SempClientFactory { | ||
SempClient sempClient; | ||
|
||
public MockSempClientFactory(SempClient sempClient) { | ||
this.sempClient = sempClient; | ||
} | ||
|
||
public static SempClientFactory getDefaultMock() { | ||
return new MockSempClientFactory(MockSempClient.builder().build()); | ||
} | ||
|
||
@Override | ||
public SempClient create() { | ||
return sempClient; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.beam.sdk.io.solace; | ||
|
||
import com.solacesystems.jcsmp.BytesXMLMessage; | ||
import java.io.IOException; | ||
import java.io.Serializable; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import org.apache.beam.sdk.io.solace.broker.MessageReceiver; | ||
import org.apache.beam.sdk.io.solace.broker.SessionService; | ||
import org.apache.beam.sdk.transforms.SerializableFunction; | ||
|
||
public class MockSessionService implements SessionService { | ||
|
||
private final SerializableFunction<Integer, BytesXMLMessage> getRecordFn; | ||
private MessageReceiver messageReceiver = null; | ||
private final int minMessagesReceived; | ||
|
||
public MockSessionService( | ||
SerializableFunction<Integer, BytesXMLMessage> getRecordFn, int minMessagesReceived) { | ||
this.getRecordFn = getRecordFn; | ||
this.minMessagesReceived = minMessagesReceived; | ||
} | ||
|
||
@Override | ||
public void close() {} | ||
|
||
@Override | ||
public boolean isClosed() { | ||
return false; | ||
} | ||
|
||
@Override | ||
public MessageReceiver createReceiver() { | ||
if (messageReceiver == null) { | ||
messageReceiver = new MockReceiver(getRecordFn, minMessagesReceived); | ||
} | ||
return messageReceiver; | ||
} | ||
|
||
@Override | ||
public void connect() {} | ||
|
||
public static class MockReceiver implements MessageReceiver, Serializable { | ||
private final AtomicInteger counter = new AtomicInteger(); | ||
private final SerializableFunction<Integer, BytesXMLMessage> getRecordFn; | ||
private final int minMessagesReceived; | ||
|
||
public MockReceiver( | ||
SerializableFunction<Integer, BytesXMLMessage> getRecordFn, int minMessagesReceived) { | ||
this.getRecordFn = getRecordFn; | ||
this.minMessagesReceived = minMessagesReceived; | ||
} | ||
|
||
@Override | ||
public void start() {} | ||
|
||
@Override | ||
public boolean isClosed() { | ||
return false; | ||
} | ||
|
||
@Override | ||
public BytesXMLMessage receive() throws IOException { | ||
return getRecordFn.apply(counter.getAndIncrement()); | ||
} | ||
|
||
@Override | ||
public boolean isEOF() { | ||
return counter.get() >= minMessagesReceived; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.beam.sdk.io.solace; | ||
|
||
import org.apache.beam.sdk.io.solace.broker.SessionService; | ||
import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory; | ||
|
||
public class MockSessionServiceFactory extends SessionServiceFactory { | ||
SessionService sessionService; | ||
|
||
public MockSessionServiceFactory(SessionService clientService) { | ||
this.sessionService = clientService; | ||
} | ||
|
||
public static SessionServiceFactory getDefaultMock() { | ||
return new MockSessionServiceFactory(new MockEmptySessionService()); | ||
} | ||
|
||
@Override | ||
public SessionService create() { | ||
return sessionService; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm watching whether Nullable is needed but not PR blocking at this point.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this has to be marked as @nullable. The SessionServiceFactory will be initialized when the pipeline is constructed, as one of the arguments to the SolaceIO. But the
queue
can only be set later, in the expand method.Consider the scenario when a user creates a pipeline and specifies to read from a Topic - in this case we need to create a Queue (which is like a Subscription in PubSub) that will read from this Topic
only then we can store the queue reference in the SessionServiceFactory
I was trying to avoid it when designing it, but I couldn't find a better way around it.