diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java index 9f02d7a9495..7054c8ac72d 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java @@ -35,8 +35,6 @@ import java.util.Map; import java.util.Queue; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; @SuppressWarnings("MagicNumber") @Slf4j @@ -113,43 +111,6 @@ public void close() throws IOException { } } - public CompletableFuture registryScheduleFlushTask( - ScheduledExecutorService scheduledExecutorService) { - // todo Register when the job started, Unload at the end(pause/cancel/crash) of the job - CompletableFuture completedFuture = new CompletableFuture(); - Runnable scheduleFlushTask = - new Runnable() { - @Override - public void run() { - if (!prepareClose - && shuffleBufferSize > 0 - && System.currentTimeMillis() - lastModify - > shuffleBatchFlushInterval) { - - try { - shuffleFlush(); - } catch (Exception e) { - log.error("Execute schedule task error.", e); - } - } - - // submit next task - if (!prepareClose) { - Runnable nextScheduleFlushTask = this; - scheduledExecutorService.schedule( - nextScheduleFlushTask, - shuffleBatchFlushInterval, - TimeUnit.MILLISECONDS); - } else { - completedFuture.complete(true); - } - } - }; - scheduledExecutorService.schedule( - scheduleFlushTask, shuffleBatchFlushInterval, TimeUnit.MILLISECONDS); - return completedFuture; - } - private synchronized void shuffleItem(Record record) { String shuffleKey = shuffleStrategy.createShuffleKey(record, pipelineId, taskIndex); shuffleBuffer.computeIfAbsent(shuffleKey, key -> new LinkedList<>()).add(record); @@ -160,8 +121,6 @@ private synchronized void shuffleItem(Record record) { && System.currentTimeMillis() - lastModify > shuffleBatchFlushInterval)) { shuffleFlush(); } - - lastModify = System.currentTimeMillis(); } private synchronized void shuffleFlush() { @@ -185,5 +144,6 @@ private synchronized void shuffleFlush() { shuffleQueueBatch.clear(); } shuffleBufferSize = 0; + lastModify = System.currentTimeMillis(); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java index 6c3559a975e..b32ba1c5243 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java @@ -43,7 +43,7 @@ public class ShuffleSourceFlowLifeCycle extends AbstractFlowLifeCycle private final ShuffleAction shuffleAction; private final int shuffleBatchSize; private final IQueue>[] shuffles; - private List> unsentBuffer; + private Map>> unsentBufferMap = new HashMap<>(); private final Map alignedBarriers = new HashMap<>(); private long currentCheckpointId = Long.MAX_VALUE; private int alignedBarriersCounter = 0; @@ -71,6 +71,8 @@ public void collect(Collector> collector) throws Exception { for (int i = 0; i < shuffles.length; i++) { IQueue> shuffleQueue = shuffles[i]; + List> unsentBuffer = + unsentBufferMap.computeIfAbsent(i, k -> new LinkedList<>()); if (shuffleQueue.size() == 0) { emptyShuffleQueueCount++; continue; @@ -84,9 +86,9 @@ public void collect(Collector> collector) throws Exception { List> shuffleBatch = new LinkedList<>(); if (alignedBarriersCounter > 0) { shuffleBatch.add(shuffleQueue.take()); - } else if (unsentBuffer != null && !unsentBuffer.isEmpty()) { - shuffleBatch = unsentBuffer; - unsentBuffer = null; + } else if (!unsentBuffer.isEmpty()) { + shuffleBatch.addAll(unsentBuffer); + unsentBuffer.clear(); } shuffleQueue.drainTo(shuffleBatch, shuffleBatchSize); @@ -121,9 +123,8 @@ public void collect(Collector> collector) throws Exception { } if (recordIndex + 1 < shuffleBatch.size()) { - unsentBuffer = - new LinkedList<>( - shuffleBatch.subList(recordIndex + 1, shuffleBatch.size())); + unsentBuffer.addAll( + shuffleBatch.subList(recordIndex + 1, shuffleBatch.size())); } break; } else {