From 5097dfcd6d99c0359cddfafe06ea7268a90f7f22 Mon Sep 17 00:00:00 2001 From: liuli Date: Thu, 25 May 2023 23:09:37 +0800 Subject: [PATCH 1/3] [Improve][Zeta] Reduce the operation count of imap_running_job_metrics --- .../seatunnel/engine/common/Constant.java | 2 + .../engine/server/TaskExecutionService.java | 78 ++++++++++--------- .../execution/TaskExecutionContext.java | 10 ++- .../engine/server/master/JobMaster.java | 36 ++++++--- 4 files changed, 78 insertions(+), 48 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java index 12e13bc1949..8d52a444aa7 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java @@ -56,4 +56,6 @@ public class Constant { public static final String IMAP_CHECKPOINT_ID = "engine_checkpoint-id-map"; public static final String IMAP_RUNNING_JOB_METRICS = "engine_runningJobMetrics"; + + public static final Long IMAP_RUNNING_JOB_METRICS_KEY = 1L; } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java index 0b95baded64..a169c9fc2a7 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java @@ -127,14 +127,15 @@ public class TaskExecutionService implements DynamicMetricsProvider { private final SeaTunnelConfig seaTunnelConfig; private final ScheduledExecutorService scheduledExecutorService; - - private CountDownLatch waitClusterStarted; + private final IMap> metricsImap; public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties properties) { seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig(); this.hzInstanceName = nodeEngine.getHazelcastInstance().getName(); this.nodeEngine = nodeEngine; this.logger = nodeEngine.getLoggingService().getLogger(TaskExecutionService.class); + this.metricsImap = + nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS); MetricsRegistry registry = nodeEngine.getMetricsRegistry(); MetricDescriptor descriptor = @@ -446,7 +447,6 @@ public void provideDynamicMetrics( task.provideDynamicMetrics(copy3, context); }); }); - updateMetricsContextInImap(); } catch (Throwable t) { logger.warning("Dynamic metric collection failed", t); throw t; @@ -454,43 +454,51 @@ public void provideDynamicMetrics( } private synchronized void updateMetricsContextInImap() { + if (!nodeEngine.getNode().getState().equals(NodeState.ACTIVE)) { + logger.warning( + String.format( + "The Node is not ready yet, Node state %s,looking forward to the next " + + "scheduling", + nodeEngine.getNode().getState())); + return; + } Map contextMap = new HashMap<>(); contextMap.putAll(finishedExecutionContexts); contextMap.putAll(executionContexts); - try { - if (!nodeEngine.getNode().getState().equals(NodeState.ACTIVE)) { + HashMap localMap = new HashMap<>(); + contextMap.forEach( + (taskGroupLocation, taskGroupContext) -> { + taskGroupContext + .getTaskGroup() + .getTasks() + .forEach( + task -> { + // MetricsContext only exists in SeaTunnelTask + if (task instanceof SeaTunnelTask) { + SeaTunnelTask seaTunnelTask = (SeaTunnelTask) task; + if (null != seaTunnelTask.getMetricsContext()) { + localMap.put( + seaTunnelTask.getTaskLocation(), + seaTunnelTask.getMetricsContext()); + } + } + }); + }); + if (localMap.size() > 0) { + try { + metricsImap.lock(Constant.IMAP_RUNNING_JOB_METRICS_KEY); + HashMap centralMap = + metricsImap.computeIfAbsent( + Constant.IMAP_RUNNING_JOB_METRICS_KEY, k -> new HashMap<>()); + centralMap.putAll(localMap); + metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY, centralMap); + } catch (Exception e) { logger.warning( - String.format( - "The Node is not ready yet, Node state %s,looking forward to the next " - + "scheduling", - nodeEngine.getNode().getState())); - return; + "The Imap acquisition failed due to the hazelcast node being offline or restarted, and will be retried next time", + e); + } finally { + metricsImap.unlock(Constant.IMAP_RUNNING_JOB_METRICS_KEY); } - - IMap map = - nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS); - contextMap.forEach( - (taskGroupLocation, taskGroupContext) -> { - taskGroupContext - .getTaskGroup() - .getTasks() - .forEach( - task -> { - // MetricsContext only exists in SeaTunnelTask - if (task instanceof SeaTunnelTask) { - SeaTunnelTask seaTunnelTask = (SeaTunnelTask) task; - if (null != seaTunnelTask.getMetricsContext()) { - map.put( - seaTunnelTask.getTaskLocation(), - seaTunnelTask.getMetricsContext()); - } - } - }); - }); - } catch (Exception e) { - logger.warning( - "The Imap acquisition failed due to the hazelcast node being offline or restarted, and will be retried next time", - e); } this.printTaskExecutionRuntimeInfo(); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java index 50faac1512b..14f993b9ece 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java @@ -29,6 +29,8 @@ import com.hazelcast.spi.impl.operationservice.Operation; import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture; +import java.util.HashMap; + public class TaskExecutionContext { private final Task task; @@ -52,9 +54,13 @@ public ILogger getLogger() { } public SeaTunnelMetricsContext getOrCreateMetricsContext(TaskLocation taskLocation) { - IMap map = + IMap> map = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS); - return map.computeIfAbsent(taskLocation, k -> new SeaTunnelMetricsContext()); + HashMap centralMap = + map.get(Constant.IMAP_RUNNING_JOB_METRICS_KEY); + return centralMap == null || centralMap.get(taskLocation) == null + ? new SeaTunnelMetricsContext() + : centralMap.get(taskLocation); } public T getTask() { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index de9da0a1bbd..a545eea68fd 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -140,6 +140,8 @@ public class JobMaster { private final IMap runningJobInfoIMap; + private final IMap> metricsImap; + /** If the job or pipeline cancel by user, needRestore will be false */ @Getter private volatile boolean needRestore = true; @@ -170,6 +172,8 @@ public JobMaster( this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap; this.runningJobInfoIMap = runningJobInfoIMap; this.engineConfig = engineConfig; + this.metricsImap = + nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS); } public void init(long initializationTimestamp, boolean restart, boolean canRestoreAgain) @@ -538,17 +542,27 @@ public void removeMetricsContext( PipelineLocation pipelineLocation, PipelineStatus pipelineStatus) { if (pipelineStatus.equals(PipelineStatus.FINISHED) && !checkpointManager.isSavePointEnd() || pipelineStatus.equals(PipelineStatus.CANCELED)) { - IMap map = - nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS); - map.keySet().stream() - .filter( - taskLocation -> { - return taskLocation - .getTaskGroupLocation() - .getPipelineLocation() - .equals(pipelineLocation); - }) - .forEach(map::remove); + try { + metricsImap.lock(Constant.IMAP_RUNNING_JOB_METRICS_KEY); + HashMap centralMap = + metricsImap.get(Constant.IMAP_RUNNING_JOB_METRICS_KEY); + if (centralMap != null) { + List collect = + centralMap.keySet().stream() + .filter( + taskLocation -> { + return taskLocation + .getTaskGroupLocation() + .getPipelineLocation() + .equals(pipelineLocation); + }) + .collect(Collectors.toList()); + collect.forEach(centralMap::remove); + metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY, centralMap); + } + } finally { + metricsImap.unlock(Constant.IMAP_RUNNING_JOB_METRICS_KEY); + } } } From c2945665cebc2d4af6ff38348c521f9b5efd6295 Mon Sep 17 00:00:00 2001 From: liuli Date: Mon, 5 Jun 2023 12:00:53 +0800 Subject: [PATCH 2/3] fix err --- .../org/apache/seatunnel/engine/server/master/JobMaster.java | 1 + 1 file changed, 1 insertion(+) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index ba77ac21450..509e9e0a986 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -81,6 +81,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch; From 9be81139a800f0848cb905fd70755de0d453fc01 Mon Sep 17 00:00:00 2001 From: liuli Date: Mon, 5 Jun 2023 14:10:55 +0800 Subject: [PATCH 3/3] [Bugfix][Zeta] Fixed metricsImap initialization issues --- .../seatunnel/engine/server/CoordinatorService.java | 8 ++++++++ .../seatunnel/engine/server/TaskExecutionService.java | 5 ++--- .../apache/seatunnel/engine/server/master/JobMaster.java | 4 ++-- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java index d38f2e02810..bf913a42452 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java @@ -37,9 +37,11 @@ import org.apache.seatunnel.engine.server.execution.ExecutionState; import org.apache.seatunnel.engine.server.execution.TaskExecutionState; import org.apache.seatunnel.engine.server.execution.TaskGroupLocation; +import org.apache.seatunnel.engine.server.execution.TaskLocation; import org.apache.seatunnel.engine.server.master.JobHistoryService; import org.apache.seatunnel.engine.server.master.JobMaster; import org.apache.seatunnel.engine.server.metrics.JobMetricsUtil; +import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext; import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager; import org.apache.seatunnel.engine.server.resourcemanager.ResourceManagerFactory; import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile; @@ -53,6 +55,7 @@ import com.hazelcast.spi.impl.NodeEngineImpl; import lombok.NonNull; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -126,6 +129,8 @@ public class CoordinatorService { */ private IMap> ownedSlotProfilesIMap; + private IMap> metricsImap; + /** If this node is a master node */ private volatile boolean isActive = false; @@ -191,6 +196,7 @@ private void initCoordinatorService() { nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_STATE_TIMESTAMPS); ownedSlotProfilesIMap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_OWNED_SLOT_PROFILES); + metricsImap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS); jobHistoryService = new JobHistoryService( @@ -256,6 +262,7 @@ private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull JobI runningJobStateTimestampsIMap, ownedSlotProfilesIMap, runningJobInfoIMap, + metricsImap, engineConfig); // If Job Status is CANCELLING , set needRestore to false @@ -419,6 +426,7 @@ public PassiveCompletableFuture submitJob(long jobId, Data jobImmutableInf runningJobStateTimestampsIMap, ownedSlotProfilesIMap, runningJobInfoIMap, + metricsImap, engineConfig); executorService.submit( () -> { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java index 8747f9b207c..dd92e610425 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java @@ -133,15 +133,12 @@ public class TaskExecutionService implements DynamicMetricsProvider { private final SeaTunnelConfig seaTunnelConfig; private final ScheduledExecutorService scheduledExecutorService; - private final IMap> metricsImap; public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties properties) { seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig(); this.hzInstanceName = nodeEngine.getHazelcastInstance().getName(); this.nodeEngine = nodeEngine; this.logger = nodeEngine.getLoggingService().getLogger(TaskExecutionService.class); - this.metricsImap = - nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS); MetricsRegistry registry = nodeEngine.getMetricsRegistry(); MetricDescriptor descriptor = @@ -509,6 +506,8 @@ private synchronized void updateMetricsContextInImap() { nodeEngine.getNode().getState())); return; } + IMap> metricsImap = + nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS); Map contextMap = new HashMap<>(); contextMap.putAll(finishedExecutionContexts); contextMap.putAll(executionContexts); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index 509e9e0a986..241a6dbcb5e 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -157,6 +157,7 @@ public JobMaster( @NonNull IMap runningJobStateTimestampsIMap, @NonNull IMap ownedSlotProfilesIMap, @NonNull IMap runningJobInfoIMap, + @NonNull IMap> metricsImap, EngineConfig engineConfig) { this.jobImmutableInformationData = jobImmutableInformationData; this.nodeEngine = nodeEngine; @@ -172,8 +173,7 @@ public JobMaster( this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap; this.runningJobInfoIMap = runningJobInfoIMap; this.engineConfig = engineConfig; - this.metricsImap = - nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS); + this.metricsImap = metricsImap; } public void init(long initializationTimestamp, boolean restart, boolean canRestoreAgain)