Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Zeta] Reduce the operation count of imap_running_job_metrics #4861

Merged
merged 4 commits into from
Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,6 @@ public class Constant {
public static final String IMAP_CHECKPOINT_ID = "engine_checkpoint-id-map";

public static final String IMAP_RUNNING_JOB_METRICS = "engine_runningJobMetrics";

public static final Long IMAP_RUNNING_JOB_METRICS_KEY = 1L;
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.master.JobHistoryService;
import org.apache.seatunnel.engine.server.master.JobMaster;
import org.apache.seatunnel.engine.server.metrics.JobMetricsUtil;
import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManagerFactory;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
Expand All @@ -53,6 +55,7 @@
import com.hazelcast.spi.impl.NodeEngineImpl;
import lombok.NonNull;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -126,6 +129,8 @@ public class CoordinatorService {
*/
private IMap<PipelineLocation, Map<TaskGroupLocation, SlotProfile>> ownedSlotProfilesIMap;

private IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap;

/** If this node is a master node */
private volatile boolean isActive = false;

Expand Down Expand Up @@ -191,6 +196,7 @@ private void initCoordinatorService() {
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_STATE_TIMESTAMPS);
ownedSlotProfilesIMap =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_OWNED_SLOT_PROFILES);
metricsImap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);

jobHistoryService =
new JobHistoryService(
Expand Down Expand Up @@ -256,6 +262,7 @@ private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull JobI
runningJobStateTimestampsIMap,
ownedSlotProfilesIMap,
runningJobInfoIMap,
metricsImap,
engineConfig);

// If Job Status is CANCELLING , set needRestore to false
Expand Down Expand Up @@ -419,6 +426,7 @@ public PassiveCompletableFuture<Void> submitJob(long jobId, Data jobImmutableInf
runningJobStateTimestampsIMap,
ownedSlotProfilesIMap,
runningJobInfoIMap,
metricsImap,
engineConfig);
executorService.submit(
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,6 @@ public class TaskExecutionService implements DynamicMetricsProvider {

private final ScheduledExecutorService scheduledExecutorService;

private CountDownLatch waitClusterStarted;

public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties properties) {
seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
Expand Down Expand Up @@ -493,51 +491,60 @@ public void provideDynamicMetrics(
task.provideDynamicMetrics(copy3, context);
});
});
updateMetricsContextInImap();
} catch (Throwable t) {
logger.warning("Dynamic metric collection failed", t);
throw t;
}
}

private synchronized void updateMetricsContextInImap() {
if (!nodeEngine.getNode().getState().equals(NodeState.ACTIVE)) {
logger.warning(
String.format(
"The Node is not ready yet, Node state %s,looking forward to the next "
+ "scheduling",
nodeEngine.getNode().getState()));
return;
}
IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
Map<TaskGroupLocation, TaskGroupContext> contextMap = new HashMap<>();
contextMap.putAll(finishedExecutionContexts);
contextMap.putAll(executionContexts);
try {
if (!nodeEngine.getNode().getState().equals(NodeState.ACTIVE)) {
HashMap<TaskLocation, SeaTunnelMetricsContext> localMap = new HashMap<>();
contextMap.forEach(
(taskGroupLocation, taskGroupContext) -> {
taskGroupContext
.getTaskGroup()
.getTasks()
.forEach(
task -> {
// MetricsContext only exists in SeaTunnelTask
if (task instanceof SeaTunnelTask) {
SeaTunnelTask seaTunnelTask = (SeaTunnelTask) task;
if (null != seaTunnelTask.getMetricsContext()) {
localMap.put(
seaTunnelTask.getTaskLocation(),
seaTunnelTask.getMetricsContext());
}
}
});
});
if (localMap.size() > 0) {
try {
metricsImap.lock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
HashMap<TaskLocation, SeaTunnelMetricsContext> centralMap =
metricsImap.computeIfAbsent(
Constant.IMAP_RUNNING_JOB_METRICS_KEY, k -> new HashMap<>());
centralMap.putAll(localMap);
metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY, centralMap);
} catch (Exception e) {
logger.warning(
String.format(
"The Node is not ready yet, Node state %s,looking forward to the next "
+ "scheduling",
nodeEngine.getNode().getState()));
return;
"The Imap acquisition failed due to the hazelcast node being offline or restarted, and will be retried next time",
e);
} finally {
metricsImap.unlock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
}

IMap<TaskLocation, SeaTunnelMetricsContext> map =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
contextMap.forEach(
(taskGroupLocation, taskGroupContext) -> {
taskGroupContext
.getTaskGroup()
.getTasks()
.forEach(
task -> {
// MetricsContext only exists in SeaTunnelTask
if (task instanceof SeaTunnelTask) {
SeaTunnelTask seaTunnelTask = (SeaTunnelTask) task;
if (null != seaTunnelTask.getMetricsContext()) {
map.put(
seaTunnelTask.getTaskLocation(),
seaTunnelTask.getMetricsContext());
}
}
});
});
} catch (Exception e) {
logger.warning(
"The Imap acquisition failed due to the hazelcast node being offline or restarted, and will be retried next time",
e);
}
this.printTaskExecutionRuntimeInfo();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;

import java.util.HashMap;

public class TaskExecutionContext {

private final Task task;
Expand All @@ -56,9 +58,13 @@ public ILogger getLogger() {
}

public SeaTunnelMetricsContext getOrCreateMetricsContext(TaskLocation taskLocation) {
IMap<TaskLocation, SeaTunnelMetricsContext> map =
IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> map =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
return map.computeIfAbsent(taskLocation, k -> new SeaTunnelMetricsContext());
HashMap<TaskLocation, SeaTunnelMetricsContext> centralMap =
map.get(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
return centralMap == null || centralMap.get(taskLocation) == null
? new SeaTunnelMetricsContext()
: centralMap.get(taskLocation);
}

public <T> T getTask() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;

Expand Down Expand Up @@ -139,6 +140,8 @@ public class JobMaster {

private final IMap<Long, JobInfo> runningJobInfoIMap;

private final IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap;

/** If the job or pipeline cancel by user, needRestore will be false */
@Getter private volatile boolean needRestore = true;

Expand All @@ -154,6 +157,7 @@ public JobMaster(
@NonNull IMap runningJobStateTimestampsIMap,
@NonNull IMap ownedSlotProfilesIMap,
@NonNull IMap<Long, JobInfo> runningJobInfoIMap,
@NonNull IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap,
EngineConfig engineConfig) {
this.jobImmutableInformationData = jobImmutableInformationData;
this.nodeEngine = nodeEngine;
Expand All @@ -169,6 +173,7 @@ public JobMaster(
this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
this.runningJobInfoIMap = runningJobInfoIMap;
this.engineConfig = engineConfig;
this.metricsImap = metricsImap;
}

public void init(long initializationTimestamp, boolean restart, boolean canRestoreAgain)
Expand Down Expand Up @@ -545,17 +550,27 @@ public void removeMetricsContext(
PipelineLocation pipelineLocation, PipelineStatus pipelineStatus) {
if (pipelineStatus.equals(PipelineStatus.FINISHED) && !checkpointManager.isSavePointEnd()
|| pipelineStatus.equals(PipelineStatus.CANCELED)) {
IMap<TaskLocation, SeaTunnelMetricsContext> map =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
map.keySet().stream()
.filter(
taskLocation -> {
return taskLocation
.getTaskGroupLocation()
.getPipelineLocation()
.equals(pipelineLocation);
})
.forEach(map::remove);
try {
metricsImap.lock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
HashMap<TaskLocation, SeaTunnelMetricsContext> centralMap =
metricsImap.get(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
if (centralMap != null) {
List<TaskLocation> collect =
centralMap.keySet().stream()
.filter(
taskLocation -> {
return taskLocation
.getTaskGroupLocation()
.getPipelineLocation()
.equals(pipelineLocation);
})
.collect(Collectors.toList());
collect.forEach(centralMap::remove);
metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY, centralMap);
}
} finally {
metricsImap.unlock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
}
}
}

Expand Down