Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task history handling when missing data #2285

Merged
merged 3 commits into from
Apr 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,6 @@ public class SingularityConfiguration extends Configuration {
private boolean skipPersistingTooLongTaskIds = false;

private boolean allowEmptyRequestInstances = false;
private boolean verifyTaskDataWrites = false;

public long getAskDriverToKillTasksAgainAfterMillis() {
return askDriverToKillTasksAgainAfterMillis;
Expand Down Expand Up @@ -2152,12 +2151,4 @@ public boolean allowEmptyRequestInstances() {
/**
 * Sets the {@code allowEmptyRequestInstances} flag on this configuration.
 *
 * @param allowEmptyRequestInstances new value for the flag
 */
public void setAllowEmptyRequestInstances(boolean allowEmptyRequestInstances) {
this.allowEmptyRequestInstances = allowEmptyRequestInstances;
}

/**
 * Returns the current value of the {@code verifyTaskDataWrites} flag.
 *
 * @return the stored {@code verifyTaskDataWrites} value
 */
public boolean isVerifyTaskDataWrites() {
return verifyTaskDataWrites;
}

/**
 * Sets the {@code verifyTaskDataWrites} flag on this configuration.
 *
 * @param verifyTaskDataWrites new value for the flag
 */
public void setVerifyTaskDataWrites(boolean verifyTaskDataWrites) {
this.verifyTaskDataWrites = verifyTaskDataWrites;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1315,15 +1315,29 @@ public void createTaskAndDeletePendingTask(SingularityTask task) {
public Optional<SingularityTask> tryRepairTask(SingularityTaskId taskId) {
try {
Optional<SingularityTask> maybeTask = getTask(taskId); // checks zkCache for task data
String path = getTaskPath(taskId);
if (maybeTask.isPresent() && !exists(path)) {
LOG.info("Found info for task {} from cache not in zk node, rewriting", taskId);
save(path, maybeTask.map(taskTranscoder::toBytes));
leaderCache.putActiveTask(taskId);
if (maybeTask.isPresent() && repairFoundTask(maybeTask.get())) {
return maybeTask;
}
return maybeTask;
} catch (Exception e) {
return Optional.empty();
LOG.error("Could not find or repair task data for {}", taskId, e);
}
return Optional.empty();
}

/**
 * Re-writes the data for an already-located task.
 *
 * <p>The serialized task is saved at its task path, the task id is recorded as
 * active in the leader cache, and the task object is stored in the task cache
 * under the same path. Any exception raised along the way is logged and
 * reported through the return value rather than propagated.
 *
 * @param task the task whose data should be written again
 * @return {@code true} when every write step completed, {@code false} when an
 *     exception was caught
 */
public boolean repairFoundTask(SingularityTask task) {
  boolean repaired;
  try {
    // Resolve the path first so a failure here skips the "rewriting" log line.
    String taskPath = getTaskPath(task.getTaskId());
    LOG.info(
      "Found info for task {} from cache not in zk node, rewriting",
      task.getTaskId()
    );
    save(taskPath, Optional.of(taskTranscoder.toBytes(task)));
    // Caches are only touched once the save call has returned normally.
    leaderCache.putActiveTask(task.getTaskId());
    taskCache.set(taskPath, task);
    repaired = true;
  } catch (Exception e) {
    LOG.error("Could not repair task data for {}", task.getTaskId(), e);
    repaired = false;
  }
  return repaired;
}

Expand Down Expand Up @@ -1406,13 +1420,6 @@ private void createTaskAndDeletePendingTaskPrivate(SingularityTask task)
// Not checking isActive here, already called within offer check flow
leaderCache.putActiveTask(task.getTaskId());
taskCache.set(path, task);
if (configuration.isVerifyTaskDataWrites()) {
Optional<SingularityTask> maybeTask = getTaskCheckCache(task.getTaskId(), true);
if (!maybeTask.isPresent()) {
LOG.error("Found empty task after write for {}", task.getTaskId());
saveTaskDeletePendingInTransaction(hasErr, path, task, taskStatusHolder);
}
}
} catch (KeeperException.NodeExistsException nee) {
LOG.error("Task or active path already existed for {}", task.getTaskId());
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.data.DeployManager;
import com.hubspot.singularity.data.TaskManager;
import com.hubspot.singularity.mesos.SingularitySchedulerLock;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
Expand Down Expand Up @@ -42,6 +43,7 @@ public class SingularityTaskHistoryPersister
private final DeployManager deployManager;
private final HistoryManager historyManager;
private final int agentReregisterTimeoutSeconds;
private final SingularitySchedulerLock singularitySchedulerLock;

@Inject
public SingularityTaskHistoryPersister(
Expand All @@ -50,6 +52,7 @@ public SingularityTaskHistoryPersister(
DeployManager deployManager,
HistoryManager historyManager,
SingularityManagedThreadPoolFactory managedThreadPoolFactory,
SingularitySchedulerLock singularitySchedulerLock,
@Named(SingularityHistoryModule.PERSISTER_LOCK) ReentrantLock persisterLock,
@Named(
SingularityHistoryModule.LAST_TASK_PERSISTER_SUCCESS
Expand All @@ -61,6 +64,7 @@ public SingularityTaskHistoryPersister(
this.deployManager = deployManager;
this.agentReregisterTimeoutSeconds =
configuration.getMesosConfiguration().getAgentReregisterTimeoutSeconds();
this.singularitySchedulerLock = singularitySchedulerLock;
}

@Override
Expand Down Expand Up @@ -88,22 +92,30 @@ public void runActionOnPoll() {
() -> {
try {
LOG.debug("Checking request {}", requestId);
List<SingularityTaskId> taskIds = taskManager.getTaskIdsForRequest(
requestId
List<SingularityTaskId> taskIds = singularitySchedulerLock.runWithRequestLockAndReturn(
() -> {
List<SingularityTaskId> ids = taskManager.getTaskIdsForRequest(
requestId
);
ids.removeAll(taskManager.getActiveTaskIdsForRequest(requestId));
ids.removeAll(taskManager.getLBCleanupTasks());
List<SingularityPendingDeploy> pendingDeploys = deployManager.getPendingDeploys();
ids =
ids
.stream()
.filter(
taskId ->
!isPartOfPendingDeploy(pendingDeploys, taskId) &&
!couldReturnWithRecoveredAgent(taskId)
)
.sorted(SingularityTaskId.STARTED_AT_COMPARATOR_DESC)
.collect(Collectors.toList());
return ids;
},
requestId,
"task history persister fetch"
);
taskIds.removeAll(taskManager.getActiveTaskIdsForRequest(requestId));
taskIds.removeAll(taskManager.getLBCleanupTasks());
List<SingularityPendingDeploy> pendingDeploys = deployManager.getPendingDeploys();
taskIds =
taskIds
.stream()
.filter(
taskId ->
!isPartOfPendingDeploy(pendingDeploys, taskId) &&
!couldReturnWithRecoveredAgent(taskId)
)
.sorted(SingularityTaskId.STARTED_AT_COMPARATOR_DESC)
.collect(Collectors.toList());

int forRequest = 0;
int transferred = 0;
for (SingularityTaskId taskId : taskIds) {
Expand Down
Loading