diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java index 10336d00d8b5..133c1c094d13 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; @@ -36,7 +35,6 @@ @Internal @DefaultCoder(AvroCoder.class) public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark { - private transient AtomicBoolean activeReader; // BytesXMLMessage is not serializable so if a job restarts from the checkpoint, we cannot retry // these messages here. We relay on Solace's retry mechanism. private transient ArrayDeque ackQueue; @@ -44,18 +42,17 @@ public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark { @SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction private SolaceCheckpointMark() {} - public SolaceCheckpointMark(AtomicBoolean activeReader, List ackQueue) { - this.activeReader = activeReader; + public SolaceCheckpointMark(List ackQueue) { this.ackQueue = new ArrayDeque<>(ackQueue); } @Override public void finalizeCheckpoint() { - if (activeReader == null || !activeReader.get() || ackQueue == null) { + if (ackQueue == null) { return; } - while (ackQueue.size() > 0) { + while (!ackQueue.isEmpty()) { BytesXMLMessage msg = ackQueue.poll(); if (msg != null) { msg.ackMessage(); @@ -79,11 +76,11 @@ public boolean equals(@Nullable Object o) { // content. ArrayList ackList = new ArrayList<>(ackQueue); ArrayList thatAckList = new ArrayList<>(that.ackQueue); - return Objects.equals(activeReader, that.activeReader) && Objects.equals(ackList, thatAckList); + return Objects.equals(ackList, thatAckList); } @Override public int hashCode() { - return Objects.hash(activeReader, ackQueue); + return Objects.hash(ackQueue); } } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java index 0155345a2323..c57b10222fd5 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java @@ -26,7 +26,6 @@ import java.util.ArrayList; import java.util.List; import java.util.NoSuchElementException; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.io.solace.broker.MessageReceiver; @@ -51,7 +50,6 @@ class UnboundedSolaceReader extends UnboundedReader { private @Nullable T solaceMappedRecord; private @Nullable MessageReceiver messageReceiver; private @Nullable SessionService sessionService; - AtomicBoolean active = new AtomicBoolean(true); /** * Queue to place advanced messages before {@link #getCheckpointMark()} be called non-concurrent @@ -116,7 +114,6 @@ public boolean advance() { @Override public void close() { - active.set(false); checkNotNull(sessionService).close(); } @@ -138,7 +135,7 @@ public UnboundedSource.CheckpointMark getCheckpointMark() { ackQueue.add(msg); } } - return new SolaceCheckpointMark(active, ackQueue); + return new SolaceCheckpointMark(ackQueue); } @Override