Skip to content

Commit

Permalink
Remove CheckpointMark's reference to the UnboundedSolaceReader - unne…
Browse files Browse the repository at this point in the history
…cessary.
  • Loading branch information
bzablocki committed Jun 10, 2024
1 parent 5015b32 commit 2e1c10e
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
Expand All @@ -36,26 +35,24 @@
@Internal
@DefaultCoder(AvroCoder.class)
public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark {
private transient AtomicBoolean activeReader;
// BytesXMLMessage is not serializable so if a job restarts from the checkpoint, we cannot retry
// these messages here. We relay on Solace's retry mechanism.
private transient ArrayDeque<BytesXMLMessage> ackQueue;

@SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
private SolaceCheckpointMark() {}

public SolaceCheckpointMark(AtomicBoolean activeReader, List<BytesXMLMessage> ackQueue) {
this.activeReader = activeReader;
public SolaceCheckpointMark(List<BytesXMLMessage> ackQueue) {
this.ackQueue = new ArrayDeque<>(ackQueue);
}

@Override
public void finalizeCheckpoint() {
if (activeReader == null || !activeReader.get() || ackQueue == null) {
if (ackQueue == null) {
return;
}

while (ackQueue.size() > 0) {
while (!ackQueue.isEmpty()) {
BytesXMLMessage msg = ackQueue.poll();
if (msg != null) {
msg.ackMessage();
Expand All @@ -79,11 +76,11 @@ public boolean equals(@Nullable Object o) {
// content.
ArrayList<BytesXMLMessage> ackList = new ArrayList<>(ackQueue);
ArrayList<BytesXMLMessage> thatAckList = new ArrayList<>(that.ackQueue);
return Objects.equals(activeReader, that.activeReader) && Objects.equals(ackList, thatAckList);
return Objects.equals(ackList, thatAckList);
}

@Override
public int hashCode() {
return Objects.hash(activeReader, ackQueue);
return Objects.hash(ackQueue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.io.solace.broker.MessageReceiver;
Expand All @@ -51,7 +50,6 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
private @Nullable T solaceMappedRecord;
private @Nullable MessageReceiver messageReceiver;
private @Nullable SessionService sessionService;
AtomicBoolean active = new AtomicBoolean(true);

/**
* Queue to place advanced messages before {@link #getCheckpointMark()} be called non-concurrent
Expand Down Expand Up @@ -116,7 +114,6 @@ public boolean advance() {

@Override
public void close() {
active.set(false);
checkNotNull(sessionService).close();
}

Expand All @@ -138,7 +135,7 @@ public UnboundedSource.CheckpointMark getCheckpointMark() {
ackQueue.add(msg);
}
}
return new SolaceCheckpointMark(active, ackQueue);
return new SolaceCheckpointMark(ackQueue);
}

@Override
Expand Down

0 comments on commit 2e1c10e

Please sign in to comment.