Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X committed Jul 20, 2023
1 parent 72c8eb4 commit ce17aac
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,7 @@ public void init() throws Exception {
}

@NonNull @Override
public ProgressState call() throws Exception {
return progress.toState();
}
public abstract ProgressState call() throws Exception;

public TaskLocation getTaskLocation() {
return this.taskLocation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class SeaTunnelSourceCollector<T> implements Collector<T> {

private final Meter sourceReceivedQPS;

private volatile long rowCountThisPollNext;
private volatile boolean emptyThisPollNext;

public SeaTunnelSourceCollector(
Object checkpointLock,
Expand All @@ -56,7 +56,7 @@ public SeaTunnelSourceCollector(
public void collect(T row) {
try {
sendRecordToNext(new Record<>(row));
rowCountThisPollNext++;
emptyThisPollNext = false;
sourceReceivedCount.inc();
sourceReceivedQPS.markEvent();
} catch (IOException e) {
Expand All @@ -69,12 +69,12 @@ public Object getCheckpointLock() {
return checkpointLock;
}

public long getRowCountThisPollNext() {
return this.rowCountThisPollNext;
public boolean isEmptyThisPollNext() {
return emptyThisPollNext;
}

public void resetRowCountThisPollNext() {
this.rowCountThisPollNext = 0;
public void resetEmptyThisPollNext() {
this.emptyThisPollNext = true;
}

public void sendRecordToNext(Record<?> record) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class SinkAggregatedCommitterTask<CommandInfoT, AggregatedCommitInfoT>

private static final long serialVersionUID = 5906594537520393503L;

private SeaTunnelTaskState currState;
private volatile SeaTunnelTaskState currState;
private final SinkAction<?, ?, CommandInfoT, AggregatedCommitInfoT> sink;
private final int maxWriterSize;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,13 @@ public void close() throws IOException {
public void collect() throws Exception {
if (!prepareClose) {
reader.pollNext(collector);
if (collector.getRowCountThisPollNext() == 0) {
if (collector.isEmptyThisPollNext()) {
Thread.sleep(100);
} else {
collector.resetRowCountThisPollNext();
collector.resetEmptyThisPollNext();
}
} else {
Thread.sleep(100);
}
}

Expand Down

0 comments on commit ce17aac

Please sign in to comment.