Improve distributed load #18362
Conversation
private final Queue<LoadSubTask> mRetrySubTasksDLQ = new ArrayDeque<>();
// Only the most recent 1k failures are persisted.
// If more are needed, can turn on debug LOG for this class.
private final Queue<Pair<LoadSubTask, String>> mRecentFailures =
Previously we stored the whole error message for every error. Each entry takes up about 2KB of memory, which I think is too much, and --verbose cannot really display that much information anyway. So I only keep the most recent 1,000 entries to avoid OOM.
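A minimal sketch of this bounded "recent failures" idea, assuming a cap of 1,000 entries and oldest-first eviction; the class and field names here are illustrative stand-ins, not the exact DoraLoadJob code.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;

public class RecentFailureBuffer {
  // Assumption: the cap matches the "most recent 1000" figure from the discussion above.
  private static final int MAX_RECENT_FAILURES = 1000;

  // Each entry pairs a subtask's UFS path with its (possibly truncated) error message.
  private final Queue<Map.Entry<String, String>> mRecentFailures = new ArrayDeque<>();

  public synchronized void record(String ufsPath, String errorMessage) {
    if (mRecentFailures.size() >= MAX_RECENT_FAILURES) {
      mRecentFailures.poll(); // evict the oldest failure so memory stays bounded
    }
    mRecentFailures.offer(new SimpleImmutableEntry<>(ufsPath, errorMessage));
  }
}
```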
LoadSubTask subTaskToRetry = mRetrySubTasksDLQ.poll();
String path = subTaskToRetry.getUfsPath();
try {
  if (GET_LATEST_UFS_ON_RETRY) {
Provide an option to avoid fetching metadata again: getting a UFS status without any concurrency is inefficient, and the files to load (in AI use cases) barely change.
We can potentially delete this option and remove the handling of this situation. If we get a not-found exception here, it means we would get an exception from the worker anyway. Having this just complicates the logic. I would defer to you to make the call.
Yeah, I agree with you. I actually wanted to delete this one, but I didn't want to be too aggressive. If you also think it should be removed, I'd be more than happy to do so.
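For context, a hedged sketch of how such a flag could gate the metadata re-fetch on retry. GET_LATEST_UFS_ON_RETRY comes from the diff above, while MetadataFetcher and prepareRetry are hypothetical stand-ins, not Alluxio APIs.

```java
import java.io.FileNotFoundException;

public class RetryPathSketch {
  /** Hypothetical stand-in for whatever re-reads UFS metadata for a path. */
  interface MetadataFetcher {
    void refresh(String ufsPath) throws FileNotFoundException;
  }

  // Option discussed above: skip the extra metadata call since loaded files rarely change.
  private static final boolean GET_LATEST_UFS_ON_RETRY = true;

  /** Returns true if the subtask should be retried, false if it should be dropped. */
  static boolean prepareRetry(String ufsPath, MetadataFetcher fetcher) {
    try {
      if (GET_LATEST_UFS_ON_RETRY) {
        // Re-check the file in the UFS before retrying.
        fetcher.refresh(ufsPath);
      }
      return true;
    } catch (FileNotFoundException e) {
      // The file is gone from the UFS; the worker would fail anyway, so drop the retry.
      return false;
    }
  }
}
```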
@@ -249,46 +327,6 @@ private Map<WorkerInfo, DoraLoadTask> aggregateSubTasks(List<LoadSubTask> subTas
    return workerToTaskMap;
  }

private List<LoadSubTask> createSubTasks(UfsStatus ufsStatus, Set<WorkerInfo> workers) { |
these have been moved inside the LoadSubTaskIterator
  addFileFailure(subTask.getUfsPath(), type, message);
  mRetryCount.remove(subTask);
  return true;
if (subTask.isRetry() || !isHealthy()) {
Basically we only retry once, so if a subtask has already been retried, just fail it.
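A small sketch of this retry-at-most-once rule, assuming a subtask exposes an isRetry() flag as in the diff above; the surrounding types are simplified stand-ins rather than the actual job classes.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class RetryOncePolicy {
  /** Hypothetical minimal stand-in for the real LoadSubTask. */
  interface LoadSubTask {
    boolean isRetry(); // true if this subtask already came out of the retry queue
  }

  private final Queue<LoadSubTask> mRetrySubTasksDLQ = new ArrayDeque<>();

  /** Returns true if the subtask is finally failed, false if it was queued for one retry. */
  boolean onSubTaskFailure(LoadSubTask subTask, boolean jobHealthy) {
    if (subTask.isRetry() || !jobHealthy) {
      // Already retried once (or the job is unhealthy): record a permanent failure.
      return true;
    }
    mRetrySubTasksDLQ.offer(subTask); // first failure: give it exactly one more chance
    return false;
  }
}
```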
@@ -402,7 +478,8 @@ public int hashCode() {

  @Override
  public boolean isHealthy() {
    long totalFailureCount = mTotalFailureCount.get();
    // Tasks to retry are considered as "failed" tasks when calculating if the job is healthy.
    long totalFailureCount = mTotalFinalFailureCount.get() + mRetrySubTasksDLQ.size();
^^
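The "^^" points at the in-diff comment above. A hedged sketch of that health check, where retry-queue entries count toward the failure total; the threshold value and field names are assumptions for illustration, not the real configuration.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;

public class HealthCheckSketch {
  private final AtomicLong mTotalFinalFailureCount = new AtomicLong();
  private final Queue<Object> mRetrySubTasksDLQ = new ArrayDeque<>();
  // Assumed threshold, e.g. terminate the job once ~5M failures/retries have piled up.
  private final long mFailureThreshold = 5_000_000L;

  public boolean isHealthy() {
    // Subtasks still waiting for retry are treated as failures for this decision,
    // so an ever-growing retry queue fails the job instead of exhausting memory.
    long totalFailureCount = mTotalFinalFailureCount.get() + mRetrySubTasksDLQ.size();
    return totalFailureCount < mFailureThreshold;
  }
}
```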
@@ -38,11 +41,22 @@ public MembershipManagerWorkerProvider(MembershipManager membershipMgr,
    FileSystemContext context) {
  mMembershipManager = membershipMgr;
  mContext = context;
  if (context != null && context.getClusterConf() != null) {
The previous implementation wasn't consistent with the client implementation, where USER_DYNAMIC_CONSISTENT_HASH_RING_ENABLED is checked to determine whether dead workers are included or not.
I'm open to reverting this change, but I feel the current state is inconsistent.
let's keep it for consistency for now.
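A hedged sketch of the consistency point being discussed: the same flag the client checks decides which worker set is handed to the hash ring. The mapping shown (dynamic ring uses live workers only, static ring keeps all registered workers) and all types are assumptions for illustration, not the actual MembershipManagerWorkerProvider code.

```java
import java.util.List;

public class WorkerSelectionSketch {
  /** Hypothetical stand-in for the membership manager. */
  interface MembershipSource {
    List<String> getLiveWorkers();
    List<String> getAllWorkers(); // live + dead
  }

  // Mirrors USER_DYNAMIC_CONSISTENT_HASH_RING_ENABLED from the discussion above.
  private final boolean mDynamicHashRingEnabled;

  WorkerSelectionSketch(boolean dynamicHashRingEnabled) {
    mDynamicHashRingEnabled = dynamicHashRingEnabled;
  }

  List<String> workersForHashRing(MembershipSource members) {
    // Assumed direction: a dynamic ring only sees live workers so it can move with
    // membership; a static ring keeps every registered worker so assignments stay stable.
    return mDynamicHashRingEnabled ? members.getLiveWorkers() : members.getAllWorkers();
  }
}
```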
Task task = tasksQ.peek();
// make sure only 1 task is running at a time
if (task == null || task.getResponseFuture() != null) {
  LOG.debug("head task is {}", (task == null) ? "NULL" : "already running");
  return;
}
if (blkWorkerClientResource == null) {
I'm not sure about this either.
If the change above is applied and USER_DYNAMIC_CONSISTENT_HASH_RING_ENABLED is set to true, a membership change will lead to a subtask being dropped. We need a way to notify the job so that it can fail the task.
I would agree with this change. Previously we didn't add that, since I think in the AI/ML use case the dynamic ring is not that useful: all data needs to be loaded into Alluxio first (and that's where the load command is used).
But I guess adding the file directly to the failures here is also not correct behavior in DoraLoadJob under the dynamic hash ring situation, since we want to assign the task to another worker.
My bad. I meant adding this file to the retry queue instead of just failing it.
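A sketch of the resolution reached in this thread, using hypothetical types: when the ring moves and a worker no longer owns a subtask, the subtask is handed back to the job's retry queue rather than recorded as a file failure, since only the assignment is stale.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

public class RingMoveSketch {
  /** Hypothetical stand-in for the real LoadSubTask. */
  interface SubTask {
    String getUfsPath();
  }

  /** Job side: re-queue instead of failing the file. */
  static final class Job {
    private final Queue<SubTask> mRetrySubTasksDLQ = new ArrayDeque<>();

    void onSubTaskDropped(SubTask subTask) {
      mRetrySubTasksDLQ.offer(subTask); // re-plan onto whichever worker now owns it
    }

    int pendingRetries() {
      return mRetrySubTasksDLQ.size();
    }
  }

  /** Scheduler side: drop subtasks whose target worker is no longer in the ring. */
  static void dropIfReassigned(SubTask subTask, boolean workerStillOwnsIt,
      Consumer<SubTask> notifyJob) {
    if (!workerStillOwnsIt) {
      notifyJob.accept(subTask); // e.g. job::onSubTaskDropped
    }
  }
}
```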
Overall LGTM, thanks for the improvement!
        .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
public static final PropertyKey MASTER_DORA_LOAD_JOB_FAILED_FILE_LIST_DIR =
    stringBuilder(Name.MASTER_DORA_LOAD_JOB_FAILED_FILE_LIST_DIR)
        .setDefaultValue(format("${%s}/distributed_load", Name.WORK_DIR))
Could we put this under job/load? Potentially other jobs would put information in this directory too.
I propose /job_results/load.
}
mRetryDeadLetterQueueSize = job.mRetrySubTasksDLQ.size();
mTimeElapsed =
    (job.mJobFinishTimestamp == -1 ? CommonUtils.getCurrentMs() : job.mJobFinishTimestamp)
Using Optional instead would be more intuitive.
You're right, thanks.
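A sketch of the Optional suggestion, assuming the -1 sentinel on mJobFinishTimestamp is replaced by an OptionalLong; the names are illustrative, not the exact PR code.

```java
import java.util.OptionalLong;

public class ElapsedTimeSketch {
  private final long mJobStartTimestampMs;
  // Empty while the job is still running; set exactly once when the job finishes.
  private volatile OptionalLong mJobFinishTimestampMs = OptionalLong.empty();

  ElapsedTimeSketch(long jobStartTimestampMs) {
    mJobStartTimestampMs = jobStartTimestampMs;
  }

  void markFinished(long nowMs) {
    mJobFinishTimestampMs = OptionalLong.of(nowMs);
  }

  long timeElapsedMs(long nowMs) {
    // Use the finish time if present, otherwise "now": no magic -1 to remember.
    return mJobFinishTimestampMs.orElse(nowMs) - mJobStartTimestampMs;
  }
}
```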
if (mCurrentUfsStatusSubTaskIterator.hasNext()) {
  return mCurrentUfsStatusSubTaskIterator.next();
}
UfsStatus ufsStatus = mUfsStatusIterator.next();
We need to check mUfsStatusIterator.hasNext() before calling next().
Good catch... I let it throw NoSuchElementException now.
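A sketch of the agreed behavior, assuming a simplified two-level iterator in place of LoadSubTaskIterator: hasNext() advances through the inner iterators, and next() surfaces NoSuchElementException once everything is exhausted.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;

public class SubTaskIteratorSketch<T> implements Iterator<T> {
  private final Iterator<Iterator<T>> mOuter; // e.g. one inner iterator per UfsStatus
  private Iterator<T> mCurrent = Collections.emptyIterator();

  SubTaskIteratorSketch(Iterator<Iterator<T>> outer) {
    mOuter = outer;
  }

  @Override
  public boolean hasNext() {
    // Advance to the next non-empty inner iterator, if any.
    while (!mCurrent.hasNext() && mOuter.hasNext()) {
      mCurrent = mOuter.next();
    }
    return mCurrent.hasNext();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      // Matches the resolution above: fail loudly instead of calling next() blindly.
      throw new NoSuchElementException("no more subtasks");
    }
    return mCurrent.next();
  }
}
```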
    batchBuilder.add(mLoadSubTaskIterator.next());
    numSubTasks++;
  } else if (!mRetrySubTasksDLQ.isEmpty()) {
    LoadSubTask subTaskToRetry = mRetrySubTasksDLQ.poll();
A previous strategy was to retry at the end of the load unless there were too many subtasks waiting to be retried (to avoid memory pressure), and that's why we have RETRY_BLOCK_CAPACITY. What do you think?
Yea, this is a really good question... I just followed what we discussed last time in the meeting.
The downside of retrying immediately is that if a worker goes down for a while (e.g. 15 minutes), the immediate retry won't succeed and the load subtask will be counted as a failure...
However, retrying at the end of the loading process, as you mentioned, caused memory pressure. That's why I made a lot of changes to keep the retry queue slim. By my estimate, each retrying subtask in the queue takes about 0.5KB in memory, so 10M subtasks to retry take about 5GB. I also updated the isHealthy() method to take the tasks in the retry queue into account; we can set the job failure threshold to something like 5M to terminate the job if too many tasks pile up in the retry queue.
Actually, I have a better idea though. When we add subtasks to the retry queue, we also attach a timestamp, and we only retry subtasks that are "old" enough (e.g. only those which initially failed at least 15 minutes ago); we can make this configurable.
I'm going to add a TODO here for further improvement (likely finishing it in Q4 for the 3.1 release), but let me know if you think it's necessary to make such a change in this PR. Thanks!
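A sketch of that timestamp-gated retry idea (left as a TODO in the PR), using hypothetical names: entries remember when they first failed and are only polled once they are older than a configurable delay.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class DelayedRetryQueueSketch<T> {
  private static final class Entry<E> {
    final E mSubTask;
    final long mFailedAtMs;

    Entry(E subTask, long failedAtMs) {
      mSubTask = subTask;
      mFailedAtMs = failedAtMs;
    }
  }

  private final Queue<Entry<T>> mQueue = new ArrayDeque<>();
  private final long mMinAgeMs; // e.g. 15 minutes, assumed to be configurable

  DelayedRetryQueueSketch(long minAgeMs) {
    mMinAgeMs = minAgeMs;
  }

  void add(T subTask, long nowMs) {
    mQueue.offer(new Entry<>(subTask, nowMs)); // remember when the first failure happened
  }

  /** Returns the next subtask whose first failure is at least mMinAgeMs old, or null. */
  T pollReady(long nowMs) {
    Entry<T> head = mQueue.peek();
    if (head == null || nowMs - head.mFailedAtMs < mMinAgeMs) {
      return null; // nothing old enough yet; keep scheduling fresh subtasks instead
    }
    return mQueue.poll().mSubTask;
  }
}
```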
LGTM, we can revisit the retry strategy later on
alluxio-bot, merge this please. |
### What changes are proposed in this pull request?

Features:
1. Retry at the end of the load process, only once
2. Persist the failed file list once the load job is done

Improvements:
1. Only keep the most recent 1000 errors to save memory
2. Create a LoadSubTaskIterator and refactor the task generation process
3. Add an option to skip fetching the UFS status again to speed up the retry (defaulted on)
4. --progress displays the elapsed time
5. Add a metric to record error attribution
6. --progress additionally displays subtask-level metrics
7. Fast-fail the task when the hash ring moves
8. --progress --verbose displays information about the subtask (instead of the file name only)

Bug fixes:
1. Fix the wrong behavior where the distributed load still gives all workers when the dynamic hash ring is disabled
2. Fix the incorrect display of the loading progress report

![image](https://github.com/Alluxio/alluxio/assets/6771554/ff2c9ccb-0eed-472d-98cc-9cc53def37ba)

pr-link: #18362
change-id: cid-7fb3f6f9230c04a01f7f466b32726c906e191ee2