
Vine: Prune Files in Dask Graph during Evaluation #3851

Open
cmoore24-24 opened this issue May 30, 2024 · 24 comments
Labels: bug, TaskVine

@cmoore24-24

Issue: I am currently running into an issue when trying to run a process with Task/Dask Vine, wherein the process inevitably fails when the input dataset is too large. The symptoms are:

  • The process crashes because a Dask task has failed 4 times, which exits the process
  • The process gets caught in a loop of recovery tasks, producing (and presumably completing) recovery tasks indefinitely, never finishing the main process and requiring a manual interrupt

Use Case: Using DaskVine to perform large calculations on QCD sample ROOT files and write the results to Parquet files. The total size of the dataset(s) that fail is ~170 GB. I have independently worked around this issue by manually cutting the datasets into batches; doing one quarter of the dataset at a time has been a successful workaround thus far.

Manager Options:

  • Min Workers = Max Workers = 280
  • Cores = 12
  • Memory = cores*19000 = 228000 MB
  • Disk = cores*20000 = 240000 MB
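For reference, a worker request along these lines would look roughly like the following with the TaskVine factory (the batch type and manager name are placeholders, not the exact submission I used):

# Rough sketch of the worker request above via the TaskVine factory.
# The batch type and manager name are placeholders, not the actual submission.
import ndcctools.taskvine as vine

factory = vine.Factory(batch_type="condor", manager_name="my-dask-workflow")
factory.min_workers = 280       # Min Workers
factory.max_workers = 280       # Max Workers
factory.cores = 12              # Cores per worker
factory.memory = 12 * 19000     # 228000 MB
factory.disk = 12 * 20000       # 240000 MB

with factory:
    pass  # run the DaskVine computation here while workers are provisioned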

I will work on recreating this scenario on my end so I can provide the vine-run-info logs.

@dthain
Member

dthain commented May 30, 2024

@BarrySlyDelgado I think you mentioned that the distributed storage plot may come in handy here, is that present in our repo so that Connor can try it out?

@dthain added the bug and TaskVine labels on May 30, 2024
@BarrySlyDelgado
Contributor

Once the logs are generated we should see which tasks are failing and which workers are failing (if any). I currently suspect that the accumulation of intermediate results may cause workers to fail. This is somewhat of a knock-on effect of TaskVine scheduling tasks to the workers that hold the most bytes of data dependencies without checking whether data transfers will cause those workers to exceed their allocation.
The tool that generates the distributed storage plot is not currently present in the repo, but I'll add it.

@cmoore24-24
Author

Apologies for taking so long to get logs; for the past few days, I haven't been able to get more than ~20 workers at a time, probably a combination of my bad priority and heavy use by other users. I will keep trying to make a faithful recreation of the issue for the purposes of logs.

@dthain
Member

dthain commented Jun 3, 2024

Hmm, does the crash occur when using the large dataset with a small number of workers? (Our first rough hypothesis would suggest that a crash is more likely with fewer workers...)

@cmoore24-24
Author

I haven't let it go long enough to get to a crash, but I can tell you the behavior where essentially countless recovery tasks are produced occurs almost immediately.

@cmoore24-24
Author

Hi @BarrySlyDelgado, it looks like I'm going to have trouble getting a faithful recreation; even backing off on the memory and disk I request, I can't get more than 70 workers. That said, I did let a run go to completion after discussing with Doug above. There were quite a few task failures and many recovery tasks, but this run actually managed to complete, which greatly surprised me. I've never been able to get this program to run all the way through when I was requesting 150+ workers; could there be such a thing as too many workers?

Either way, I've attached the logs from this run here in case they would be helpful.
logs.zip

@BarrySlyDelgado
Contributor

At the time of writing, there seems to be a limited number of machines in the Condor pool, which would explain why you were not getting your requested number of workers. I'll take a look at the logs to see what I can get from them. Also, do you have the code that writes the parquet files?

@cmoore24-24
Author

Sure, here's the code that does the skimming: https://github.com/cmoore24-24/hgg/blob/main/notebooks/skims/skimmer/skimmer.py

@BarrySlyDelgado
Contributor

I have some potential insights that will probably need to be investigated further. I've plotted the average task runtime (left y-axis, seconds) and the number of tasks in each category (right y-axis):
Screenshot from 2024-06-04 14-57-40
There are roughly 500 tasks within the workflow that take roughly an hour on average to complete. It may be possible that some workers are getting evicted in that time period, possibly causing some of the retries. For the most part, each worker has sufficient cache space for the data it is handling, and we see a relatively small number of worker stoppages during workflow execution (among those that have been scheduled tasks) in this graph:
Screenshot from 2024-06-04 15-30-13
Here, each + represents when a task is scheduled to a worker, and the color represents the category. What is strange is that most of the purple tasks accumulate on a single worker, raising its cache utilization near its limit. If this is a similar occurrence on other runs, it's possible that this could be a point of failure.

To go forward, I think we should investigate what the long-running red tasks are and why the purple tasks seem to be less distributed across workers.

FYI @dthain

@dthain
Member

dthain commented Jun 5, 2024

Very interesting, the plots really help here!

It would not be hard to imagine a case where the scheduler prefers to run jobs on the nodes that already have the most data, and thus prefers the same node over and over again until it is full.

The scheduler certainly has access to quite a lot of information about storage utilization, but is not currently making use of it. We could do a better job there.

@dthain
Member

dthain commented Jun 12, 2024

(Giving Barry a break on this one while he is away for the summer.)

@JinZhou5042 is going to start looking into this matter. The key issue as we understand it is that the DaskVine executor is not cleaning up intermediate files as they are consumed. We need two things to make this work:
1 - The TaskVine API needs a way to indicate that a file's data is no longer needed, without forgetting the logical existence of the file. This looks something like m.prune_file(f), which deletes all of the replicas. However, the manager will still be capable of recreating/replicating the file if failures make it needed again.
2 - The DaskVine executor then needs to invoke prune_file to clean up replicas once they are fully consumed by all known tasks. It should only undeclare_file at the very end, when nothing more is needed. (A rough sketch of this bookkeeping is below.)
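To make the intent concrete, here is a minimal sketch of that executor-side bookkeeping, assuming the proposed prune_file() exists; the graph representation and helper below are illustrative, not the actual DaskVine code:

# Illustrative sketch only (not the actual DaskVine code): prune a temp file's replicas
# once every task that reads it has finished, and undeclare only at the very end.
# "manager" is a taskvine.Manager assumed to provide the proposed prune_file() method.

def make_prune_callback(manager, graph, key_to_file):
    """graph: dict mapping each key to the list of keys it depends on.
       key_to_file: dict mapping each key to the vine temp file holding its result."""
    # Count how many unfinished tasks still read each key.
    remaining = {key: 0 for key in graph}
    for key, deps in graph.items():
        for dep in deps:
            remaining[dep] = remaining.get(dep, 0) + 1

    def on_task_complete(key):
        for dep in graph[key]:
            remaining[dep] -= 1
            if remaining[dep] == 0:
                manager.prune_file(key_to_file[dep])  # drop replicas, keep the logical file
    return on_task_complete

# Only at the very end, when nothing more can need the data:
#   for f in key_to_file.values():
#       manager.undeclare_file(f)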

@dthain changed the title from "vine: Task Vine processes crash when input dataset is too large" to "Vine: Prune Files in Dask Graph during Evaluation" on Jun 12, 2024
@dthain
Member

dthain commented Jun 24, 2024

@JinZhou5042 please set up a time to talk with @btovar to discuss DaskVine and make sure you understand what is going on there. Then, you can put in m.prune_file(f) where appropriate.

@JinZhou5042
Member

JinZhou5042 commented Jul 10, 2024

I've now been able to run this application to completion consistently within 5 hours, and there is still room to make it better. There are several layers of problems:

1. Not enough workers

image
For this run I requested only 50 workers, each with 200 GB of disk. The total disk space (50 * 200 GB) could not hold the volume of output files from the earlier stages, so the workers kept filling up their disks, disconnecting from the manager, reconnecting, triggering a notable number of recovery tasks to re-create the lost files, and then filling their disks again. As a result, the program made only a little progress at the beginning while consuming extensive resources.

To solve this problem, one solution is to increase the number of workers and the amount of disk space on each worker.

image

For this run I had 200 workers, each with 200 GB. As seen, the disk pressure was largely reduced, mainly because the manager submitted far fewer recovery tasks than in the previous run.

2. The workers retain stale files

By doubling the resources, we reduce the chance that a worker fails, so fewer files are lost and fewer recovery tasks are submitted. However, another significant issue is that, if we look at the graph structure of this application (it consists of 26 homogeneous subgraphs like the following one), most of the output files are consumed by at most 2 tasks.

subgraph_17

That is, once a file has been consumed by all of its child tasks it will no longer be needed, so we can prune it from the workers. This means removing it from the workers' disks, allowing us to clean up stale files as we proceed.

image

For this run, I requested 100 workers, each with 200 GB. As shown, the peak disk usage across all workers dropped from 200 GB to only 110 GB, and no workers ran out of disk and had to disconnect.

3. Recover lost files immediately

However, at this point I was still unable to run this application to completion. It usually runs smoothly through the first 90% of tasks but gets stuck in the final stages, submitting recovery tasks in an endless cycle. If we look at the figure of task execution details across all workers, we find that although no workers fail from filling up the disk, several workers still fail for other reasons:

image

Here the blue rectangles represent regular tasks and the pink rectangles represent recovery tasks. A recovery task is submitted when a file is lost due to a worker disconnection. It is interesting to see that the recovery tasks run very sparsely.

If we look at detailed information about task 56605, focusing on when each of its input files was first produced and then re-created by a recovery task, we can see that some files, for example temp-rnd-ywvbbmtzkaosdbx, were initially produced 17205.95 s ago but then recreated 2753.60 s ago.

image

That was because the workers holding this file disconnected, so the manager submitted recovery task 56606 to recreate it. Here is the file's lifetime, including which workers held it and when it was staged in and staged out.
image

Surprisingly, there is a huge gap between when the file was lost on workers 16 and 186 due to worker disconnections and when it was re-created on worker 163! In other words, we do not recover lost files promptly; recovery waits until the manager finally notices the file is an input to another task. This makes the situation worse because the workflow has 26 homogeneous subgraphs, and each subgraph has a final task requesting 100+ input files. Lost input files are not re-created until all of the other inputs have been created and the manager finally schedules the target task. Submitting recovery tasks earlier would definitely reduce the overall execution time.

I am pleased to find that we already have a mechanism to recover lost files promptly: setting the tuning parameter transfer-temps-recovery to 1.
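For reference, this can be set from the Python side roughly as follows (a minimal sketch; only the parameter named above is assumed):

# Sketch: ask the manager to re-create lost temp files as soon as they go missing,
# rather than waiting until a downstream task needs them.
import ndcctools.taskvine as vine

m = vine.Manager(port=9123)
m.tune("transfer-temps-recovery", 1)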

At this stage, the application completes successfully much more often.

4. Recover all lost input files in a single shot

Note that the recovery tasks were still running sparsely at the end. That should not be the case even if the manager does not submit recovery tasks promptly, and I still see recovery tasks being created and submitted endlessly at the end. Looking at the submit time of each recovery task for the final fat task, it turned out that those recovery tasks were submitted one by one.

If we look at the source code where the manager checks whether the input files are ready for a task:

static int vine_manager_check_inputs_available(struct vine_manager *q, struct vine_task *t)
{
	struct vine_mount *m;
	LIST_ITERATE(t->input_mounts, m)
	{
		struct vine_file *f = m->file;
		if (f->type == VINE_TEMP && f->state == VINE_FILE_STATE_CREATED) {
			if (!vine_file_replica_table_exists_somewhere(q, f->cached_name)) {
				vine_manager_consider_recovery_task(q, f, f->recovery_task);
				return 0;
			}
		}
	}
	return 1;
}

This function returns 0 immediately when it finds that an input file has been lost. But in this graph the final task requires 100+ input files; if we lose several of them, we take an exceedingly long time to run the recovery tasks one by one, and while we are recovering the previously lost files, we lose other files again. This is the culprit behind the application failing to run to completion more often than not.

I modified the code so that it submits all the recovery tasks in one shot. With this change, the application always finishes within 5 hours, and the performance figure looks pretty good.

static int vine_manager_check_inputs_available(struct vine_manager *q, struct vine_task *t)
{
	struct vine_mount *m;
	int found = 1;
	// consider all input mounts
	LIST_ITERATE(t->input_mounts, m)
	{
		struct vine_file *f = m->file;
		if (f->type == VINE_TEMP && f->state == VINE_FILE_STATE_CREATED) {
			if (!vine_file_replica_table_exists_somewhere(q, f->cached_name)) {
				vine_manager_consider_recovery_task(q, f, f->recovery_task);
				found = 0;
			}
		}
	}
	return found;
}
image

5. Give tasks that consume temp files higher priority than regular tasks

This is a supplementary change that may make the application run faster. Note that the depth of the subgraphs is not large, mostly around 5, so there might be thousands of tasks in the ready queue; some are root tasks and some request input files produced by other tasks. If we prefer to run the tasks that can consume some temp files, we get a better chance to clean up the worker disks earlier.

image

I hacked together some code to implement this idea, and it seems to slightly reduce the total time and relieve the disk pressure. (A rough illustration of the idea from the application side is below.)
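For illustration only (this is not the scheduler change I made): a similar preference could be approximated from the application side by raising the priority of tasks whose inputs are temp files, assuming the usual Task.set_priority call:

# Sketch: prefer tasks that consume existing temp files so their inputs can be pruned sooner.
# This only approximates, from the application side, the scheduler-side change described above.

def submit_with_priority(manager, task, consumes_temp_inputs):
    # consumes_temp_inputs: True if any of the task's inputs are temp files produced by
    # earlier tasks (the caller determines this from the Dask graph).
    if consumes_temp_inputs:
        task.set_priority(10)   # run consumers first, freeing worker disk earlier
    else:
        task.set_priority(0)    # root tasks can wait
    return manager.submit(task)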

With the mentioned solutions, most of my runs complete in around 5 hours, which seems good.

@JinZhou5042
Member

Two ways to improve it:

1. Reduce worker disconnections

While the workers are no longer failing because their disks fill up, at the beginning of every run several workers still disconnect unexpectedly. I am assuming that some tasks are exhausting worker memory, causing the disconnections. Unfortunately, those tasks are usually long tasks. If a task ran for 3000 s before a worker disconnection, we have to spend another 3000 s on the recovery task, which lengthens the total program execution time.

If we had a mechanism on the worker to detect that a task is about to consume a substantial amount of memory, the worker could reduce its advertised cores so the manager stops scheduling more tasks to it, or swap process state from memory to disk and resume later when the memory pressure is relieved, or sacrifice some short tasks in favor of long tasks (since short tasks are cheap to recover). That could further reduce the overall execution time from 5 hours to 4 hours or even less.

2. Dynamic worker cores

This application has more parallel tasks at the beginning and far fewer at the end. If we could assign more cores to the workers at the start and gradually scale down as we go, resource waste would be reduced. (A rough sketch of one way to approximate this is below.)
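One rough way to approximate this from the application side would be to shrink the factory's worker ceiling as the number of waiting tasks drops. This is only a sketch; the stats field name below is an assumption, not verified API:

# Sketch only: shrink the worker pool as parallelism drops.
# Assumes the factory can be adjusted while it is running and that the manager exposes
# a count of waiting tasks (the field name here is an assumption).
import time

def scale_down_workers(manager, factory, floor=20):
    while not manager.empty():
        waiting = manager.stats.tasks_waiting        # assumed stats field
        factory.max_workers = max(floor, waiting)    # keep at least `floor` workers
        time.sleep(60)                               # re-evaluate once a minute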

@JinZhou5042
Member

@cmoore24-24 @dthain FYI

@dthain
Member

dthain commented Jul 10, 2024

Also @colinthomas-z80 FYI

@dthain
Member

dthain commented Jul 10, 2024

@JinZhou5042 I hope you are keeping all of the logs from these runs handy, because you are putting together a nice body of evidence for a paper on the benefits/costs of using in-cluster storage. And I also like how you have expressed that we have several different issues, each of which can be addressed separately. Please go ahead and propose separate PRs for each of those little issues.

@dthain
Member

dthain commented Jul 10, 2024

Now, I'm not sure about your comment above about workers crashing because of memory usage. What should happen is that the worker monitors the memory usage of the task as it runs (assuming the resource monitor is turned on). If the task exceeds its allocated memory, the worker should kill it and return it to the manager, without the worker itself crashing.

But if the worker itself is crashing, then something else is going on.

Are you able to obtain the logs of crashed workers? (This should be possible if using the --debug-workers option to the factory.)
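For reference, enabling the monitor and per-task limits looks roughly like this (the command and values are illustrative, not taken from this workflow):

# Sketch: with monitoring enabled, a task that exceeds its memory allocation is
# killed and returned to the manager instead of taking the whole worker down.
import ndcctools.taskvine as vine

m = vine.Manager(port=9123)
m.enable_monitoring()                 # measure each task's actual resource usage

t = vine.Task("python skimmer.py")    # illustrative command
t.set_cores(1)
t.set_memory(19000)                   # MB; illustrative per-task limit
m.submit(t)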

@JinZhou5042
Member

JinZhou5042 commented Jul 10, 2024

I am not sure I understand correctly, but there has to be a reason that several workers disconnect at around the same time. It is obvious that those workers are not filling up their disks; there might be other reasons, but the only one I can think of is that the workers are running out of memory, probably because of some memory-intensive tasks.

@JinZhou5042
Member

I'll try to use the --debug-workers option on the factory and investigate the reason.

@JinZhou5042
Member

Though the issue with the endless loop of recovery task submissions has been resolved, and we are now confident that the application can complete, there remains a critical problem that introduces significant uncertainty into the total completion time of the application.

Here is a scatter plot of the execution time of each task, where the x-axis is the task id and the y-axis is the time. The distribution is highly imbalanced: the majority of tasks finish much faster than the rest.

task_execution_time_distribution (3)

Indeed, if we look at the CDF of task execution time, 90% of tasks finish within 500 s and 95% within 1300 s.

task_execution_time_distribution (4)

In a computing cluster, the total runtime of this application actually depends on the stability of the worker connections, and from my point of view this can be roughly categorized into the following scenarios:

  1. If no workers are lost, the entire task can be completed within 4 hours.
  2. If one or two workers are occasionally lost, some additional time will be required to run recovery tasks.
  3. If workers are frequently lost, especially those handling long tasks, a significant amount of extra time will be needed to run recovery tasks.
  4. If each worker can only remain connected for an average of around 5000 seconds, tasks with runtimes of 10000 seconds will repeatedly fail, submit recovery tasks, and then fail again, continuing this cycle. This will greatly extend the total runtime.

If we are lucky enough to have reliable workers, we get a view like this, where blue marks regular tasks, red marks failed tasks, and pink marks recovery tasks. It looks good because we only lose a couple of workers, we lose them in the early stages, and the failing tasks have not been running for long, so they do not extend the total runtime by much:

task_execution_details (2)

However, if we frequently lose workers, we need notable extra time to run recovery tasks, especially for long tasks (longer than 5000 s):

task_execution_details

Therefore, at this stage, I think the key to improving overall performance is to reduce the failure probability of long tasks or to decrease the cost of recovering them. I think there are 2 ways to achieve this:

  1. Implement checkpointing for long tasks on each worker. For example, once the worker sees that a task has been running for 1000 s, it creates a checkpoint and returns the file to the manager. If the task fails, the manager can resume the task from the last checkpoint. If the task completes successfully, the checkpoint file is deleted.

  2. For long tasks, submit duplicate copies to reduce the cost of recovering from scratch. For example, if a task has been running for 1000 seconds, immediately submit an identical copy to another worker. This way, if a worker fails, we do not have to start the task from the beginning. If a task has multiple copies, delete the remaining copies once the first one completes. This method should be particularly useful for applications with highly imbalanced task execution time distributions, as there are notable idle resources in the tail. (A rough sketch of this replication loop follows below.)
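A rough sketch of the replication loop described in point 2; clone_task and cancel_task here are placeholders for whatever mechanism we end up using, not existing TaskVine calls:

# Illustrative sketch only: speculative replication for long-running tasks.
# clone_task() and cancel_task() are placeholders, not existing TaskVine API.
import time

REPLICATE_AFTER = 1000  # seconds a task may run before a duplicate is submitted

def maybe_replicate(manager, running, replicas, clone_task):
    """running: task id -> (task, start_time); replicas: original id -> replica id."""
    now = time.time()
    for tid, (task, start) in running.items():
        if now - start > REPLICATE_AFTER and tid not in replicas:
            replicas[tid] = manager.submit(clone_task(task))  # duplicate command + inputs

def on_complete(manager, task, replicas, cancel_task):
    # Whichever copy finishes first wins; drop the other copy if one exists.
    other = replicas.pop(task.id, None)
    if other is not None:
        cancel_task(manager, other)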

@JinZhou5042
Member

There are 183 distinct task categories in this application (183 different kinds of functions). I plotted the task count and the average/max execution time of each category:

task-category-information-container

It shows that:

  • The average execution time is positively correlated with the maximum execution time.
  • Long tasks are concentrated in a few categories.
  • Categories with fewer tasks usually have shorter execution times, so we can ignore them.

This indicates that we can infer whether the task execution times across the entire graph are evenly or unevenly distributed based on the current running conditions, even without running all the tasks. If the distribution is imbalanced, we can identify which categories might contain long tasks based on the current performance of different task types, and then apply the following optimizations to these categories:

  1. Increase the priority of scheduling these long tasks. Running long tasks before short tasks can reduce the total runtime.
  2. Submit more replica tasks for a few specific categories to reduce the overhead of long task failures and resubmissions.
  3. In a fully serverless platform, we could even save the historical execution performance of each category and use better algorithms to determine whether and how many replica tasks to submit for tasks in a particular category.

@JinZhou5042
Member

JinZhou5042 commented Jul 15, 2024

The idea is to replicate long tasks on idle workers, keeping those underutilized resources busy for better failure tolerance. For example, for the following DAG (where a red star marks a long task) running on 2 workers, each with 3 cores, we first schedule tasks 1 and 2.

image

After they finish, tasks 3, 4, 5 and 6 are ready to execute.
image

Tasks 3, 4, 5 complete; schedule tasks 7, 8, 9, 10, 11 to run. Among them, tasks 6, 7, 9 are long tasks.

image

Then tasks 8, 10, 11 complete; schedule task 13 to run.

image

Now the 4 running tasks are all long, and all of the subsequent tasks depend on their completion. Here is the thing: if one of the workers crashes, the cost of the recovery tasks for it depends on the duration of the longest one. Given that there are idle, underutilized resources, we can replicate the longest tasks from one worker to another, so that we do not spend too much time re-running tasks from scratch if one worker crashes.

image

Then tasks 7, 9 and 13 complete; delete their replicas and schedule task 15. Now there are only 2 tasks running and one of them has been running for a while. As there are idle resources, submit a replica on another worker.

image

Task 6 completes; delete its replica and schedule task 12 to run.
image

Finally we have a long task, task 14; replicate it on 2 workers.

image

I think this approach might help make the overall execution time more consistent on such an imbalanced task graph, especially when a small portion of tasks run extremely long compared to the others and the worker connections are unreliable.

@JinZhou5042
Member

Evidence that the total execution time depends on the recovery cost of long tasks, rather than on losing workers or filling up the disk:

For this run I had 100 workers; the total runtime was 33231 s.

execution_details

For the second run I had at most 84 workers, and the total time was less than half of the first (14652 s).

execution_details (1)

The first run took less time to finish the first 90% of tasks but considerably longer to finish the tail, while the second took longer on the first 90% but finished the remaining tasks quickly.

Both runs suffered worker crashes in the earlier phases; however, the second one had more reliable worker connections later, and not many long tasks had to be recovered. I believe that is why it was about twice as fast as the first.
