From 8eb09bfd6d7b14b1a69dfa9517b565d6f3e7926b Mon Sep 17 00:00:00 2001 From: Bartosz Zablocki Date: Thu, 25 Jul 2024 20:35:46 +0200 Subject: [PATCH] Explicitly close the FlowReceiver (#31982) --- .../broker/BasicAuthJcsmpSessionService.java | 17 +++++++++++------ .../sdk/io/solace/broker/MessageReceiver.java | 3 +++ .../io/solace/broker/SolaceMessageReceiver.java | 7 +++++++ .../beam/sdk/io/solace/MockSessionService.java | 3 +++ 4 files changed, 24 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java index df814b5c2be1..2137d574b09a 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java @@ -46,6 +46,7 @@ public class BasicAuthJcsmpSessionService extends SessionService { private final String password; private final String vpnName; @Nullable private JCSMPSession jcsmpSession; + @Nullable private MessageReceiver messageReceiver; private final RetryCallableManager retryCallableManager = RetryCallableManager.create(); /** @@ -73,12 +74,14 @@ public void connect() { @Override public void close() { - if (isClosed()) { - return; - } retryCallableManager.retryCallable( () -> { - checkStateNotNull(jcsmpSession).closeSession(); + if (messageReceiver != null) { + messageReceiver.close(); + } + if (!isClosed()) { + checkStateNotNull(jcsmpSession).closeSession(); + } return 0; }, ImmutableSet.of(IOException.class)); @@ -86,8 +89,10 @@ public void close() { @Override public MessageReceiver createReceiver() { - return retryCallableManager.retryCallable( - this::createFlowReceiver, ImmutableSet.of(JCSMPException.class)); + this.messageReceiver = + retryCallableManager.retryCallable( + this::createFlowReceiver, ImmutableSet.of(JCSMPException.class)); + return this.messageReceiver; } @Override diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java index 199a83e322bd..95f989bd1be9 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java @@ -49,6 +49,9 @@ public interface MessageReceiver { */ BytesXMLMessage receive() throws IOException; + /** Closes the message receiver. */ + void close(); + /** * Test clients may return {@literal true} to signal that all expected messages have been pulled * and the test may complete. Real clients should always return {@literal false}. diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.java index e5f129d3ddfc..d548d2049a5b 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.java @@ -69,4 +69,11 @@ public BytesXMLMessage receive() throws IOException { throw new IOException(e); } } + + @Override + public void close() { + if (!isClosed()) { + this.flowReceiver.close(); + } + } } diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java index 207cfef9c62c..a4d6a42ef302 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java @@ -92,6 +92,9 @@ public BytesXMLMessage receive() throws IOException { return getRecordFn.apply(counter.getAndIncrement()); } + @Override + public void close() {} + @Override public boolean isEOF() { return counter.get() >= minMessagesReceived;