Skip to content

Commit

Permalink
[BugFix][Zeta] Fix finding TaskGroup deployment node bug (apache#4449)
Browse files Browse the repository at this point in the history
  • Loading branch information
ic4y authored and EricJoy2048 committed Apr 9, 2023
1 parent d805f4b commit 039acb2
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class ServerConfigOptions {
public static final Option<Integer> JOB_METRICS_BACKUP_INTERVAL =
Options.key("job-metrics-backup-interval")
.intType()
.defaultValue(60)
.defaultValue(10)
.withDescription("The interval (in seconds) of job metrics backups");

public static final Option<ThreadShareMode> TASK_EXECUTION_THREAD_SHARE_MODE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,19 @@ public boolean isSavePointEnd() {
}

protected InvocationFuture<?> sendOperationToMemberNode(TaskOperation operation) {
log.debug(
"Sead Operation : "
+ operation.getClass().getSimpleName()
+ " to "
+ jobMaster.queryTaskGroupAddress(
operation.getTaskLocation().getTaskGroupLocation())
+ " for task group:"
+ operation.getTaskLocation().getTaskGroupLocation());
return NodeEngineUtil.sendOperationToMemberNode(
nodeEngine,
operation,
jobMaster.queryTaskGroupAddress(
operation.getTaskLocation().getTaskGroupLocation().getTaskGroupId()));
operation.getTaskLocation().getTaskGroupLocation()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

Expand Down Expand Up @@ -400,23 +399,23 @@ public void cleanJob() {
removeJobIMap();
}

public Address queryTaskGroupAddress(long taskGroupId) {
for (PipelineLocation pipelineLocation : ownedSlotProfilesIMap.keySet()) {
Optional<TaskGroupLocation> currentVertex =
ownedSlotProfilesIMap.get(pipelineLocation).keySet().stream()
.filter(
taskGroupLocation ->
taskGroupLocation.getTaskGroupId() == taskGroupId)
.findFirst();
if (currentVertex.isPresent()) {
return ownedSlotProfilesIMap
.get(pipelineLocation)
.get(currentVertex.get())
.getWorker();
public Address queryTaskGroupAddress(TaskGroupLocation taskGroupLocation) {

PipelineLocation pipelineLocation =
new PipelineLocation(
taskGroupLocation.getJobId(), taskGroupLocation.getPipelineId());

Map<TaskGroupLocation, SlotProfile> taskGroupLocationSlotProfileMap =
ownedSlotProfilesIMap.get(pipelineLocation);

if (null != taskGroupLocationSlotProfileMap) {
SlotProfile slotProfile = taskGroupLocationSlotProfileMap.get(taskGroupLocation);
if (null != slotProfile) {
return slotProfile.getWorker();
}
}
throw new IllegalArgumentException(
"can't find task group address from task group id: " + taskGroupId);
"can't find task group address from taskGroupLocation: " + taskGroupLocation);
}

public ClassLoader getClassLoader() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,10 @@ private CompletableFuture<Void> schedulerPipeline(SubPlan pipeline) {
pipeline,
jobMaster.getOwnedSlotProfiles(pipeline.getPipelineLocation()));

log.debug("slotProfiles: {}", slotProfiles);
log.debug(
"slotProfiles: {}, PipelineLocation: {}",
slotProfiles,
pipeline.getPipelineLocation());

// To ensure release pipeline resource after new master node active, we need store
// slotProfiles first and then deploy tasks.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,7 @@ public void run() throws Exception {
() ->
server.getCoordinatorService()
.getJobMaster(taskLocation.getJobId())
.queryTaskGroupAddress(
taskLocation
.getTaskGroupLocation()
.getTaskGroupId()),
.queryTaskGroupAddress(taskLocation.getTaskGroupLocation()),
new RetryUtils.RetryMaterial(
Constant.OPERATION_RETRY_TIME,
true,
Expand Down

0 comments on commit 039acb2

Please sign in to comment.