
TaskVine Futures: High latency on chained tasks. #3892

Closed
gpauloski opened this issue Jul 30, 2024 · 47 comments · Fixed by #3902

@gpauloski
Contributor

This is related to PR #3876, but different enough that I figured I'd open it as a separate issue to track.

I was using the vine.FuturesExecutor and getting pretty poor task latency (> 1s) with no-op tasks (using a compute-zen3 node at CHI@TACC). To figure out if the problem was my vine.FuturesExecutor configuration, I also tested with the Parsl TaskVineExecutor and got similarly bad performance.

I wrote a script to test round-trip task time, and it seems the vine.FuturesExecutor and TaskVineExecutor are much slower than creating PythonTasks directly. Is there a reason for this performance difference, or have I configured something wrong? I also see quite a lot of variance in the results for the "executor" and "parsl" configurations (as low as 0.25 and as high as 10 tasks/s). The "basic" configuration is a pretty consistent 25 tasks/s.

$ python taskvine.py --tasks 100 basic
Basic -- 25.752 tasks/s
$ python taskvine.py --tasks 100 executor
Executor -- 4.396 tasks/s
$ python taskvine.py --tasks 100 parsl
Parsl -- 1.394 tasks/s
import argparse
import time

import ndcctools.taskvine as vine

from parsl.concurrent import ParslPoolExecutor
from parsl.config import Config
from parsl.executors.taskvine import (
    TaskVineExecutor,
    TaskVineFactoryConfig,
    TaskVineManagerConfig,
)


def noop() -> None:
    pass


def basic(tasks: int) -> None:
    manager = vine.Manager(9123)
    factory = vine.Factory(manager=manager)
    factory.min_workers = 1
    factory.start()

    start = time.perf_counter()
    for i in range(tasks):
        task = vine.PythonTask(noop)
        manager.submit(task)

        while not manager.empty():
            result = manager.wait(5)
            if result:
                if isinstance(result.output, vine.PythonTaskNoResult):
                    print(f'Task {result.id} failed.')
    total = time.perf_counter() - start
    print(f'Basic -- {tasks/total:.3f} tasks/s')


def executor(tasks: int) -> None:
    with vine.FuturesExecutor(
        manager_name='taskvine-manager',
        opts={'min_workers': 1},
    ) as executor:
        start = time.perf_counter()
        for i in range(tasks):
            future = executor.submit(noop)
            future.result()
        total = time.perf_counter() - start
        print(f'Executor -- {tasks/total:.3f} tasks/s')


def parsl(tasks: int) -> None:
    config = Config(
        executors=[
            TaskVineExecutor(
                label="parsl-vine-example",
                manager_config=TaskVineManagerConfig(),
                factory_config=TaskVineFactoryConfig(min_workers=1),
            )
        ],
        run_dir='parsl-runinfo',
    )
    with ParslPoolExecutor(config) as executor:
        start = time.perf_counter()
        for i in range(tasks):
            future = executor.submit(noop)
            future.result()
        total = time.perf_counter() - start
        print(f'Parsl -- {tasks/total:.3f} tasks/s')


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('method', choices=['basic', 'executor', 'parsl'])
    parser.add_argument('--tasks', default=10, type=int)
    args = parser.parse_args()

    if args.method == 'basic':
        basic(args.tasks)
    elif args.method == 'executor':
        executor(args.tasks)
    elif args.method == 'parsl':
        parsl(args.tasks)


if __name__ == '__main__':
    main()

@benclifford pointed me at issue #3432, which seems like a similar performance problem. It's unclear from the comments if the issue was only resolved in Work Queue or also in TaskVine.

@dthain dthain added bug For modifications that fix a flaw in the code. TaskVine labels Jul 30, 2024
@dthain
Member

dthain commented Jul 30, 2024

@BarrySlyDelgado you are the futures expert, can you help out here?

@BarrySlyDelgado
Contributor

I'm not familiar with the issue with the Parsl TaskVine Executor. However, TaskVine futures were initially implemented using TaskVine's temp files, which are files intended to be left out on the cluster. When result() is called on a future, a new retriever task is spawned to read the temporary file and return it to the manager. Essentially, for N independent tasks, retrieving the output of all N tasks eventually creates 2N tasks. This is less apparent in workflows with highly coupled tasks; e.g., a reduction workflow that produces a single result spawns only N+1 tasks. Functionality has since been added to the TaskVine manager that allows directly streaming the results stored in temp files, but it has yet to be integrated into futures. I will have to look further to see what is causing the inconsistency.

@gpauloski If you have any of the TaskVine logs from these runs, is there a way you can send them to me?

@dthain
Member

dthain commented Jul 30, 2024

Also @colinthomas-z80 can you give us some insight into the performance gap between plain TaskVine and Parsl+TaskVine?

@colinthomas-z80
Contributor

The task throughput improvement for Work Queue was applied to TaskVine as well in Parsl #3038.

While those PRs did improve throughput, the submission pattern they address is one where a task is submitted and waited on until completion before the next task is submitted, and so on. This pattern unfortunately exposes the cost of communicating tasks to the executor one at a time, and it remains inefficient.

If your application allows, it would be better to submit the tasks without waiting on the result after each submission, instead appending the futures to a list and waiting on them with as_completed. This way Parsl will hand all of the tasks to TaskVine in short order.
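
For illustration, a minimal sketch of that pattern using the standard library's as_completed (reusing executor, noop, and tasks from the script above, and assuming the executor's futures are concurrent.futures-compatible, as Parsl's are):

from concurrent.futures import as_completed

# Submit every task up front so the executor can hand them all
# to TaskVine immediately...
futures = [executor.submit(noop) for _ in range(tasks)]

# ...then consume results in whatever order they complete.
for future in as_completed(futures):
    future.result()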

@benclifford

Parsl+TaskVine integrated performance is disappointing to me, but let's not tangle that discussion too much here with @gpauloski's original pure-TaskVine issue...

@gpauloski
Contributor Author

@BarrySlyDelgado, do you have a preferred way of sharing the logs?

@colinthomas-z80, I am running a benchmark, so the application is as simple as "submit N tasks given N workers and submit another task when one completes." The actual benchmark does use as_completed. You can see the code here (and docs) if you are curious.

I understand that this kind of benchmark doesn't play to TaskVine's strengths (it's not the only benchmark I am hoping to run), but I'm still surprised by the delta between the basic example and the FuturesExecutor. I ran this script on a single node with an NVMe drive, so I wouldn't expect the additional task for result retrieval to reduce throughput by an order of magnitude.

To @benclifford's point, the Parsl stuff may very well be a red herring. I used the TaskVineExecutor to try and sanity check if I had configured the FuturesExecutor wrong and stumbled upon more odd performance.

@dthain
Member

dthain commented Jul 31, 2024

Ok, to summarize, we really have two different things going on here:

First - TaskVine Futures have a high latency when sequentially chained. This has two subproblems:

1 - The benchmark is calling future.result() for every task, while the intention for TaskVine futures is to (usually) pass the future directly to the recipient, so it doesn't need to be transferred back to the manager.

@gpauloski would it make sense to pass the future to the next function, instead of evaluating it? The method of transferring the result back to the manager involves submitting another task for that purpose.
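
For example (an illustrative sketch; produce and consume are hypothetical functions, and this assumes the FuturesExecutor accepts one of its own futures as a task argument):

# Chain tasks by passing the future itself instead of its value, so
# the intermediate result can stay out on the cluster rather than
# making a round trip through the manager.
f1 = executor.submit(produce)       # produce() runs on a worker
f2 = executor.submit(consume, f1)   # consume() receives f1's output
print(f2.result())                  # only the final value comes back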

@BarrySlyDelgado would it make sense to call File.contents instead?

Second, it seems that the Parsl-TaskVine executor still has some higher than expected latency when dealing with sequentially chained tasks. I'm going to separate that out into issue #3894 since that is a separate problem.

I agree that the sequentially-chained-task is not really our target workflow, but nonetheless, it does measure the submit-wait latency and improving that will have a variety of good downstream effects.

@dthain dthain changed the title Slow and inconsistent TaskVine performance TaskVine Futures: High latency on chained tasks. Jul 31, 2024
@BarrySlyDelgado
Contributor

@gpauloski logs can be emailed to me directly - blsydelg@nd.edu

@BarrySlyDelgado
Contributor

@dthain it would be better to have an option to directly stream the contents of a given file. However, it seems .contents only works for buffers and local files.

@dthain
Member

dthain commented Jul 31, 2024

Hmm, do you have a test case showing that it doesn't work?
In principle, it should.

@dthain
Member

dthain commented Jul 31, 2024

Oh, wait a minute, there are several layers in play here:
However, note that File.contents() and vine_file_contents() fetch the contents of a file but return null if it has not been materialized. They don't know about futures. Something else in the futures layer would have to wait for the producing task to exit before calling them.
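
Roughly something like this (purely an illustrative sketch; the method names are placeholders, not the actual TaskVine API):

# Hypothetical futures-layer logic: block until the producing task
# exits so the temp file is materialized, then fetch its bytes.
def future_result(manager, producing_task, output_file):
    while not producing_task.completed():
        manager.wait(5)  # let the manager make progress
    return manager.fetch_file(output_file)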

@BarrySlyDelgado
Contributor

Right, within the Python wrapper, File.contents assumes the file is a local file or buffer. I assumed File.contents would have done the fetching itself. fetch_file makes this easier.

@dthain
Member

dthain commented Jul 31, 2024

Ah, my mistake for adding to the confusion:

  • vine_fetch_file does what you want.
  • vine_file_contents only works on buffers, which is unfortunate.
  • VineFile.contents seems to be a strange hybrid of the two. I think it would be better if the functionality was fully in the C library, and this method just called the appropriate thing.

@gpauloski
Contributor Author

I dug a bit more into my logs and noticed that my no-ops are not really no-ops and were taking ~0.5s. This turns out to be my module import overhead, so I'll probably need to figure out how to incorporate a "serverless" mode like in the Parsl executor.
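
(As an aside, CPython's built-in import profiler is a quick way to confirm where that ~0.5s goes; the modules named here are just examples from my script:)

$ python -X importtime -c "import parsl, ndcctools.taskvine" 2> imports.log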

I still observe about a 50% slowdown when calling result() after each task, but I guess that makes sense given Barry's explanation of the retriever tasks. Getting rid of all my imports, I now get ~24 tasks/s with chained tasks and ~13 tasks/s with a bag of independent tasks.

I do notice that the 0.5s import overhead is present in both the actual task and the retriever task, so the total overhead ends up being 1s. Does the retriever have the side effect of importing the dependencies of my task, and if so, is that intended? The retrieval task is very fast when I run a minimal example with no imports.

@BarrySlyDelgado, here are the logs (with the 0.5s imports for all task types) if you want to look further.

gpauloski-taskvine-logs.tar.gz

@BarrySlyDelgado
Contributor

#3899 fixes the high latency for the futures example above. @gpauloski, were you able to observe the overhead from imports with this fix as well? It is not intended that regular tasks also bring along imports. This could be an issue with how we are serializing the functions.

@gpauloski
Contributor Author

@BarrySlyDelgado yeah, I still see the high import costs with the fix (<1 task/s). Commenting out all the imports gets me back to ~25 tasks/s.

@gpauloski
Contributor Author

@JinZhou5042 here are the logs for two different runs with TaPS. The first run is a bag of tasks, and the second is a chain of tasks. All tasks are no-ops and I get ~2 tasks/sec in both cases (it was previously around ~1 task/sec before Barry's fix in #3899). Each directory contains the vine logs and some other logging by TaPS.

taps-taskvine-issue-3892.tar.gz

I used this commit 7c0510b of TaskVine (PR #3899) and this branch of TaPS https://github.com/proxystore/taps/tree/taskvine-issue-3892.

To reproduce, I ran this for the bag of tasks run:

$ python -m taps.run synthetic --structure bag --task-count 16 --bag-max-running 1 --executor taskvine --taskvine-workers 1
...
[2024-08-05 18:53:21.820] APP   (taps.apps.synthetic) :: Completed 16/16 (rate: 2.06 tasks/s)

And this for the sequential run:

$ python -m taps.run synthetic --structure sequential --task-count 16 --bag-max-running 1 --executor taskvine --taskvine-workers 1
...
[2024-08-05 18:57:25.605] APP   (taps.apps.synthetic) :: Task completion rate: 2.052 tasks/s

TaPS records each function execution time with a decorator. Looking at the tasks.jsonl file, each function took less than one millisecond (execution_end_time - execution_start_time). In the vine-run-info/most-recent/vine-logs/debug file, I see that tasks are taking on average ~0.48s which matches up with ~2 tasks per second.
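
(For reference, the per-task times can be read back out of tasks.jsonl like this; a minimal sketch, with the field names as mentioned above:)

import json

# Print each task's recorded function execution time.
with open('tasks.jsonl') as f:
    for line in f:
        record = json.loads(line)
        print(record['execution_end_time'] - record['execution_start_time'])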

@JinZhou5042
Member

@gpauloski I git cloned your repo but didn't find taps.run under that directory. Is there another way to build it?

@gpauloski
Contributor Author

taps.run is an executable module rather than a script, so you will need to install TaPS into the same Conda environment that has TaskVine installed.

git clone ...
cd taps
git checkout taskvine-issue-3892
pip install -e .

@JinZhou5042
Member

That works! I observe a very similar performance issue to the one you reported. Is there a way to modify the source code to enable TaskVine's serverless mode?

@gpauloski
Contributor Author

To enable serverless mode, would you need to modify the app or the executor? I ask because there is a strong separation between the two in TaPS; i.e., you won't be able to modify the app to add any TaskVine-specific features, rather you would have to enable it within the FuturesExecutor.

For example, the app I used above is defined here taps/apps/synthetic.py and uses the run_bag_of_tasks() and run_sequential() functions specifically. However, these functions submit tasks to an AppEngine rather than TaskVine FuturesExecutor directly. (This is how we can write apps that work with any Executor implementation.) In this case, the AppEngine is essentially a wrapper around the FuturesExecutor created in TaskVineConfig.get_executor() (defined in taps/executor/taskvine.py).

I did start trying to modify the FuturesExecutor to use the TaskVine libraries (the commented out code in taps/executor/taskvine.py). So you could certainly modify a subclassed FuturesExecutor there if you want, and change TaskVineConfig.get_executor() to return the subclass.

@JinZhou5042
Member

JinZhou5042 commented Aug 6, 2024

Thanks for your explanation! I've already been able to modify both parts and successfully used the serverless mode, but I am still unable to use engine.submit for serverless function calls, because in TaskVine, function calls are specified by their names (i.e., str objects), while the executor in TaPS specifies each task as a callable object (_TaskWrapper). Maybe there is a way to combine the two, but I haven't quite found a solution at the moment.

Initially, the way to install a library looks like this:

libtask = engine.executor.create_library_from_functions(
    "test-library", noop_task, import_modules=[uuid, Data, generate_data, random], add_env=False
)
engine.executor.manager.tune('watch-library-logfiles', 1)
engine.executor.install_library(libtask)
engine.executor.enable_serverless('test-library')

This hoists all of the required packages, functions, and classes into the preamble of the library to reduce loading overhead.

The way to submit a task is like this (I used engine.executor.submit instead of engine.submit):

engine.executor.submit(
    noop_task,
    generate_data(task_data_bytes),
    output_size=task_data_bytes,
    sleep=task_sleep,
    task_id=uuid.uuid4(),
)

These are mostly the changes I made in the application part.

I initially thought it would speed up the runtime, because the modules are hoisted and the functions can focus on running the code. However, I got very similar performance compared to the non-serverless mode:
[screenshot: throughput similar to non-serverless mode]

That said, hoisting is not really the problem; something is going wrong in the functions. Since, for this particular application, all arguments are simple data structures (int, float, and uuid.UUID) except data, and the function itself is not doing anything expensive, I commented out the data argument and instead call generate_data inside the function, which looks like:

def noop_task(
    output_size: int,
    sleep: float,
    task_id: uuid.UUID | None = None,
) -> Data:
    data = [generate_data(output_size)]
    
    start = time.perf_counter_ns()
    # Validate the data is real
    assert all(len(d.raw) >= 0 for d in data)
    result = generate_data(output_size)
    elapsed = (time.perf_counter_ns() - start) / 1e9

    # Remove elapsed time for generating result from remaining sleep time.
    time.sleep(max(0, sleep - elapsed))
    return result

And the task submission:

task = engine.executor.submit(
    noop_task,
    output_size=task_data_bytes,
    sleep=task_sleep,
    task_id=uuid.uuid4(),
)

I immediately got a performance boost:
[screenshot: improved throughput]

@JinZhou5042
Member

However, it's worth noting that moving generate_data from the argument list to inside the function doesn't improve performance in non-serverless mode.

That said, I guess there are two separate issues: one is that non-serverless mode doesn't reuse the repeated context and introduces some overhead, and the other is that passing a function call's result as one of the arguments somehow induces notable overhead in TaskVine, which I never saw when I tested on my laptop.

@JinZhou5042
Member

Per the discussion with @dthain this morning, the slowdown was caused by deserialization of a class object (Data in this case). return Data(raw) in generate_data is very slow (~0.8 tasks/s) while return raw is very fast (~70 tasks/s). Our current hypothesis is that deserializing Python objects in TaskVine is somehow expensive; we will do more experiments on that.

@gpauloski
Contributor Author

Thanks, @JinZhou5042! This is super informative.

I also ran into the issue of including functions in a library that were not defined top-level in a module (e.g., wrapped/decorated functions, partial functions, etc.). That said, I have been considering changing TaPS in a future update to support statically decorating the app task functions, so I think we could better support TaskVine serverless after that. (I probably won't get to that feature until the end of the month though.)

Nice work narrowing it down to the result serialization. This is helpful---I could, in the meantime, run experiments just returning the raw data. The Data type is there to add some indirection to solve an incompatibility with two other libraries, specifically, ProxyStore and Ray. So it would be fine to change it for the TaskVine experiments (commit for reference).

@JinZhou5042
Member

@gpauloski I am not sure you will get such a speedup just by not passing a Python object to the task, because your code is using non-serverless mode, and when I tried that yesterday it didn't give me any speedup.

@JinZhou5042
Member

But we are pretty sure that loading the TaskVine-generated pkl files with cloudpickle is the culprit behind the latency.

I did a quick test loading, with cloudpickle, files created by TaskVine and by my little script. Essentially they contain the same data, dumped the same way, running on the same machine, but the time to load each of them is significantly different:

[screenshot: load times for the two pkl files]

@gpauloski
Contributor Author

Oh, I believe I understand now. Just to make sure, the slow serialization of Data is only present in serverless mode when the function that returns Data is included in the library. And for my original code that does not use serverless, serializing Data is fast and the slowdown is mostly due to imports. Is that all correct?

I wonder if the deserialization is also triggering imports as a side effect?

@benclifford

Deserialization of stuff will often trigger imports. If you have a particular binary pickle file/dump that is "bad", I can help you understand what it's actually doing on deserialization if you attach it here.
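
For instance, a minimal demonstration (assuming a data.pkl dumped by reference, like the TaskVine-created file discussed above):

import sys
import cloudpickle

assert 'taps.apps.synthetic' not in sys.modules
with open('data.pkl', 'rb') as f:
    cloudpickle.load(f)
# Prints True: unpickling imported the module as a side effect.
print('taps.apps.synthetic' in sys.modules)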

@dthain
Member

dthain commented Aug 6, 2024

Jin, the two different pickled examples have the same size. Do they also have exactly the same content? (diff)

@JinZhou5042
Member

@dthain They don't have the same content; the taskvine-created pkl file has 89 bytes while my little test pkl file has 1227 bytes.

@JinZhou5042
Member

Since the one created by TaskVine is much smaller but turned out to be slow, I am assuming it does some expensive 'import' work, trying to import the taps module while deserializing...

@BarrySlyDelgado
Contributor

Some of the discussion from #3901 may be relevant here.

@JinZhou5042
Member

I tried copying Data from the taps source code into my test script:

import os
import sys
import random
import cloudpickle


class Data:
    def __init__(self, raw: bytes) -> None:
        self.raw = raw
    def __len__(self) -> int:
        return len(self.raw)
    def __sizeof__(self) -> int:
        return sys.getsizeof(self.raw)


def generate_data(size: int) -> Data:
    max_bytes = int(1e9)
    if sys.version_info >= (3, 9) and size < max_bytes:
        raw = random.randbytes(size)
    else:
        raw = os.urandom(size)
    return Data(raw)


event = {'fn_args': (generate_data(0),), 'fn_kwargs': {}}

with open('test.pkl', 'wb') as f:
    cloudpickle.dump(event, f)

And it turns out that both files have the same file size, yet the first pickled file is slow to load while the second is fast:
[screenshot: pkl file sizes and load times]

@dthain
Member

dthain commented Aug 6, 2024

Ok, I think we finally ran the issue to ground.

The Problem: @benclifford was correct in that the un-cloudpickling of the Data object resulted in a horrific number of imports (12,000 system calls) processed on each reference, which were lost because we fork for each function execution. Imports in the function itself are not such a problem because the function is an intrinsic part of the serverless process and only gets loaded once.

Workaround: The application could avoid this by only sending primitive types as arguments and results. Probably not a good solution for complex applications.

Solution: @JinZhou5042 is putting together a few small PRs that will connect the FuturesExecutor to use the serverless mode properly, and then adjust the serverless mode to perform argument un-cloudpickling in the parent process (rather than the child) so that imports are only suffered once.

More to follow tonight and tomorrow.

@JinZhou5042
Member

I just went through the cloudpickle docs, and it seems that there are two ways of serializing: by value or by reference. An important difference between cloudpickle and pickle is that cloudpickle can serialize a function or class by value, whereas pickle can only serialize it by reference:

Serialization by reference treats functions and classes as attributes of modules, and pickles them through instructions that trigger the import of their module at load time. Serialization by reference is thus limited in that it assumes that the module containing the function or class is available/importable in the unpickling environment. This assumption breaks when pickling constructs defined in an interactive session, a case that is automatically detected by cloudpickle, that pickles such constructs by value.

By default, TaskVine's cloudpickle dumps arguments by reference, which records the required modules and triggers their import when loading.

However, if we leverage the newer feature provided by cloudpickle, which is to explicitly indicate which module we want to serialize by value, we don't keep the reference and will not trigger imports on loading. For example -

This took me 0.8s or so while deserializing:

import cloudpickle
from taps.apps.synthetic import Data

with open('data.pkl', 'wb') as f:
    cloudpickle.dump(Data(None), f)

And this only took me around 0.0001s:

import cloudpickle
from taps.apps.synthetic import Data
import taps

cloudpickle.register_pickle_by_value(taps)
with open('data.pkl', 'wb') as f:
    cloudpickle.dump(Data(None), f)

@JinZhou5042
Member

JinZhou5042 commented Aug 6, 2024

Given that it's dumping data by reference, hoisting imports should definitely help.

With data generated by this code (which is to dump by reference):

import cloudpickle
from taps.apps.synthetic import Data

with open('data.pkl', 'wb') as f:
    cloudpickle.dump(Data(None), f)

Deserializing after hoisting import taps doesn't give me a performance improvement; it took about 0.8s:

import time
import taps
import cloudpickle

time_start = time.time()
with open('data.pkl', 'rb') as f:
    cloudpickle.load(f)
print(time.time() - time_start)

However, deserializing after import taps.apps.synthetic does improve the loading performance; it took about 0.0001s:

import time
import cloudpickle
import taps.apps.synthetic

time_start = time.time()
with open('data.pkl', 'rb') as f:
    cloudpickle.load(f)
print(time.time() - time_start)

@JinZhou5042
Member

That said, we have three approaches -

  1. Deserialize arguments before forking, which caches the needed environment for all subsequent function calls, but adds some overhead from deserializing argument files and switching the working directory in the library process.
  2. Explicitly register the module to serialize by value, but this increases the file size and adds a burden on file transfer.
  3. Extract the modules we want to hoist. For example, the type of one of the arguments is taps.apps.synthetic.Data, so we could do an import taps.apps.synthetic in the parent process. But the problem is that arguments are passed after the library is created, so we wouldn't know which modules to hoist at library creation time.

I am going to adopt the first solution here and would welcome a cleaner way to solve it; a sketch of the idea follows.
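
(A minimal illustration of approach 1, with hypothetical names rather than the actual TaskVine library code: load the arguments in the parent so any imports they trigger happen once, then fork per call.)

import os
import cloudpickle

def handle_function_call(event_path, fn):
    # Deserialize in the parent: any by-reference imports triggered
    # here stay cached in this process for every subsequent call.
    with open(event_path, 'rb') as f:
        event = cloudpickle.load(f)

    pid = os.fork()
    if pid == 0:
        # The child inherits the already-imported modules via fork,
        # so it only runs the function and writes out the result.
        result = fn(*event['fn_args'], **event['fn_kwargs'])
        with open('result.pkl', 'wb') as f:
            cloudpickle.dump(result, f)
        os._exit(0)
    os.waitpid(pid, 0)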

@JinZhou5042
Member

JinZhou5042 commented Aug 7, 2024

@gpauloski You could make some small changes to your application to support the serverless mode, and you are likely to get better performance once #3902 and #3903 are merged. Take run_bag_of_tasks as an example:

def run_bag_of_tasks(
    engine: AppEngine,
    task_count: int,
    task_data_bytes: int,
    task_sleep: float,
    max_running_tasks: int,
) -> None:
    """Run bag of tasks workflow."""
    max_running_tasks = min(max_running_tasks, task_count)
    start = time.monotonic()

    taskvine_serverless = True
    if taskvine_serverless:
        # declare the library
        libtask = engine.executor.create_library_from_functions(
            "test-library", noop_task, hoisting_modules=[Data, uuid, generate_data, random], add_env=False
        )
        engine.executor.install_library(libtask)

        for i in range(1, task_count + 1):
            # declare the function call
            function_call = engine.executor.future_funcall(
                "test-library", "noop_task",
                generate_data(task_data_bytes),
                output_size=task_data_bytes,
                sleep=task_sleep,
                task_id=uuid.uuid4(),
            )
            task_future = engine.executor.submit(function_call)

            # wait for the completion of the function call
            task_future.result()
            
            rate = i / (time.monotonic() - start)
            logger.log(
                APP_LOG_LEVEL,
                f'Completed {i}/{task_count} tasks '
                f'(rate: {rate:.2f} tasks/s)'
            )

On my machine I got this throughput (compared to ~2 tasks/s previously):

[screenshot: throughput after the changes]

@benclifford

Looks like you've moved beyond this a bit now, but for completeness here is some dissection of the two pickle files from comment #3892 (comment).

I've disassembled the two files using pickletools (which is built into Python). I'm not expecting you to understand the whole assembly output, but I included it alongside the discussion for completeness.

$ python3 -m pickletools taskvine.pkl 
    0: \x80 PROTO      5
    2: \x95 FRAME      78
   11: }    EMPTY_DICT
   12: \x94 MEMOIZE    (as 0)
   13: (    MARK
   14: \x8c     SHORT_BINUNICODE 'fn_args'
   23: \x94     MEMOIZE    (as 1)
   24: \x8c     SHORT_BINUNICODE 'taps.apps.synthetic'
   45: \x94     MEMOIZE    (as 2)
   46: \x8c     SHORT_BINUNICODE 'Data'
   52: \x94     MEMOIZE    (as 3)
   53: \x93     STACK_GLOBAL
   54: \x94     MEMOIZE    (as 4)
   55: )        EMPTY_TUPLE
   56: \x81     NEWOBJ
   57: \x94     MEMOIZE    (as 5)
   58: }        EMPTY_DICT
   59: \x94     MEMOIZE    (as 6)
   60: \x8c     SHORT_BINUNICODE 'raw'
   65: \x94     MEMOIZE    (as 7)
   66: C        SHORT_BINBYTES b''
   68: \x94     MEMOIZE    (as 8)
   69: s        SETITEM
   70: b        BUILD
   71: \x85     TUPLE1
   72: \x94     MEMOIZE    (as 9)
   73: \x8c     SHORT_BINUNICODE 'fn_kwargs'
   84: \x94     MEMOIZE    (as 10)
   85: }        EMPTY_DICT
   86: \x94     MEMOIZE    (as 11)
   87: u        SETITEMS   (MARK at 13)
   88: .    STOP
highest protocol among opcodes = 4

One description of this taskvine.pkl is roughly import taps.apps.synthetic.Data; x = taps.apps.synthetic.Data(); set some attributes on x; return x - the STACK_GLOBAL opcode is the rough equivalent of import in this pickle bytecode, and it imports the things on the top of the stack (so usually right before it in the listing).

The other much longer pickle says:

$ python3 -m pickletools localtest.pkl 
    0: \x80 PROTO      5
    2: \x95 FRAME      1216
   11: }    EMPTY_DICT
   12: \x94 MEMOIZE    (as 0)
   13: (    MARK
   14: \x8c     SHORT_BINUNICODE 'fn_args'
   23: \x94     MEMOIZE    (as 1)
   24: \x8c     SHORT_BINUNICODE 'cloudpickle.cloudpickle'
   49: \x94     MEMOIZE    (as 2)
   50: \x8c     SHORT_BINUNICODE '_make_skeleton_class'
   72: \x94     MEMOIZE    (as 3)
   73: \x93     STACK_GLOBAL
   74: \x94     MEMOIZE    (as 4)
   75: (        MARK
   76: \x8c         SHORT_BINUNICODE 'builtins'
   86: \x94         MEMOIZE    (as 5)
   87: \x8c         SHORT_BINUNICODE 'type'
   93: \x94         MEMOIZE    (as 6)
   94: \x93         STACK_GLOBAL
   95: \x94         MEMOIZE    (as 7)
   96: \x8c         SHORT_BINUNICODE 'Data'
  102: \x94         MEMOIZE    (as 8)
  103: h            BINGET     5
  105: \x8c         SHORT_BINUNICODE 'object'
  113: \x94         MEMOIZE    (as 9)
  114: \x93         STACK_GLOBAL
  115: \x94         MEMOIZE    (as 10)
  116: \x85         TUPLE1
  117: \x94         MEMOIZE    (as 11)
  118: }            EMPTY_DICT
  119: \x94         MEMOIZE    (as 12)
  120: \x8c         SHORT_BINUNICODE '__module__'
  132: \x94         MEMOIZE    (as 13)
  133: \x8c         SHORT_BINUNICODE '__main__'
  143: \x94         MEMOIZE    (as 14)
  144: s            SETITEM
  145: \x8c         SHORT_BINUNICODE '7d3a6bb5b2c04f15b4d7510a01effcae'
  179: \x94         MEMOIZE    (as 15)
  180: N            NONE
  181: t            TUPLE      (MARK at 75)
  182: \x94     MEMOIZE    (as 16)
  183: R        REDUCE
  184: \x94     MEMOIZE    (as 17)
  185: h        BINGET     2
  187: \x8c     SHORT_BINUNICODE '_class_setstate'
  204: \x94     MEMOIZE    (as 18)
  205: \x93     STACK_GLOBAL
  206: \x94     MEMOIZE    (as 19)
  207: h        BINGET     17
  209: }        EMPTY_DICT
  210: \x94     MEMOIZE    (as 20)
  211: (        MARK
  212: h            BINGET     13
  214: h            BINGET     14
  216: \x8c         SHORT_BINUNICODE '__init__'
  226: \x94         MEMOIZE    (as 21)
  227: h            BINGET     2
  229: \x8c         SHORT_BINUNICODE '_make_function'
  245: \x94         MEMOIZE    (as 22)
  246: \x93         STACK_GLOBAL
  247: \x94         MEMOIZE    (as 23)
  248: (            MARK
  249: h                BINGET     2
  251: \x8c             SHORT_BINUNICODE '_builtin_type'
  266: \x94             MEMOIZE    (as 24)
  267: \x93             STACK_GLOBAL
  268: \x94             MEMOIZE    (as 25)
  269: \x8c             SHORT_BINUNICODE 'CodeType'
  279: \x94             MEMOIZE    (as 26)
  280: \x85             TUPLE1
  281: \x94             MEMOIZE    (as 27)
  282: R                REDUCE
  283: \x94             MEMOIZE    (as 28)
  284: (                MARK
  285: K                    BININT1    2
  287: K                    BININT1    0
  289: K                    BININT1    0
  291: K                    BININT1    2
  293: K                    BININT1    2
  295: K                    BININT1    3
  297: C                    SHORT_BINBYTES b'\x97\x00|\x01|\x00_\x00\x00\x00\x00\x00\x00\x00\x00\x00d\x00S\x00'
  319: \x94                 MEMOIZE    (as 29)
  320: N                    NONE
  321: \x85                 TUPLE1
  322: \x94                 MEMOIZE    (as 30)
  323: \x8c                 SHORT_BINUNICODE 'raw'
  328: \x94                 MEMOIZE    (as 31)
  329: \x85                 TUPLE1
  330: \x94                 MEMOIZE    (as 32)
  331: \x8c                 SHORT_BINUNICODE 'self'
  337: \x94                 MEMOIZE    (as 33)
  338: h                    BINGET     31
  340: \x86                 TUPLE2
  341: \x94                 MEMOIZE    (as 34)
  342: \x8c                 SHORT_BINUNICODE '/afs/crc.nd.edu/user/j/jzhou24/taps/t1.py'
  385: \x94                 MEMOIZE    (as 35)
  386: h                    BINGET     21
  388: \x8c                 SHORT_BINUNICODE 'Data.__init__'
  403: \x94                 MEMOIZE    (as 36)
  404: K                    BININT1    9
  406: C                    SHORT_BINBYTES b'\x80\x00\xd8\x13\x16\x88\x04\x8c\x08\x88\x08\x88\x08'
  421: \x94                 MEMOIZE    (as 37)
  422: C                    SHORT_BINBYTES b''
  424: \x94                 MEMOIZE    (as 38)
  425: )                    EMPTY_TUPLE
  426: )                    EMPTY_TUPLE
  427: t                    TUPLE      (MARK at 284)
  428: \x94             MEMOIZE    (as 39)
  429: R                REDUCE
  430: \x94             MEMOIZE    (as 40)
  431: }                EMPTY_DICT
  432: \x94             MEMOIZE    (as 41)
  433: (                MARK
  434: \x8c                 SHORT_BINUNICODE '__package__'
  447: \x94                 MEMOIZE    (as 42)
  448: N                    NONE
  449: \x8c                 SHORT_BINUNICODE '__name__'
  459: \x94                 MEMOIZE    (as 43)
  460: h                    BINGET     14
  462: \x8c                 SHORT_BINUNICODE '__file__'
  472: \x94                 MEMOIZE    (as 44)
  473: h                    BINGET     35
  475: u                    SETITEMS   (MARK at 433)
  476: N                NONE
  477: N                NONE
  478: N                NONE
  479: t                TUPLE      (MARK at 248)
  480: \x94         MEMOIZE    (as 45)
  481: R            REDUCE
  482: \x94         MEMOIZE    (as 46)
  483: h            BINGET     2
  485: \x8c         SHORT_BINUNICODE '_function_setstate'
  505: \x94         MEMOIZE    (as 47)
  506: \x93         STACK_GLOBAL
  507: \x94         MEMOIZE    (as 48)
  508: h            BINGET     46
  510: }            EMPTY_DICT
  511: \x94         MEMOIZE    (as 49)
  512: }            EMPTY_DICT
  513: \x94         MEMOIZE    (as 50)
  514: (            MARK
  515: h                BINGET     43
  517: h                BINGET     21
  519: \x8c             SHORT_BINUNICODE '__qualname__'
  533: \x94             MEMOIZE    (as 51)
  534: h                BINGET     36
  536: \x8c             SHORT_BINUNICODE '__annotations__'
  553: \x94             MEMOIZE    (as 52)
  554: }                EMPTY_DICT
  555: \x94             MEMOIZE    (as 53)
  556: (                MARK
  557: h                    BINGET     31
  559: h                    BINGET     5
  561: \x8c                 SHORT_BINUNICODE 'bytes'
  568: \x94                 MEMOIZE    (as 54)
  569: \x93                 STACK_GLOBAL
  570: \x94                 MEMOIZE    (as 55)
  571: \x8c                 SHORT_BINUNICODE 'return'
  579: \x94                 MEMOIZE    (as 56)
  580: N                    NONE
  581: u                    SETITEMS   (MARK at 556)
  582: \x8c             SHORT_BINUNICODE '__kwdefaults__'
  598: \x94             MEMOIZE    (as 57)
  599: N                NONE
  600: \x8c             SHORT_BINUNICODE '__defaults__'
  614: \x94             MEMOIZE    (as 58)
  615: N                NONE
  616: h                BINGET     13
  618: h                BINGET     14
  620: \x8c             SHORT_BINUNICODE '__doc__'
  629: \x94             MEMOIZE    (as 59)
  630: N                NONE
  631: \x8c             SHORT_BINUNICODE '__closure__'
  644: \x94             MEMOIZE    (as 60)
  645: N                NONE
  646: \x8c             SHORT_BINUNICODE '_cloudpickle_submodules'
  671: \x94             MEMOIZE    (as 61)
  672: ]                EMPTY_LIST
  673: \x94             MEMOIZE    (as 62)
  674: \x8c             SHORT_BINUNICODE '__globals__'
  687: \x94             MEMOIZE    (as 63)
  688: }                EMPTY_DICT
  689: \x94             MEMOIZE    (as 64)
  690: u                SETITEMS   (MARK at 514)
  691: \x86         TUPLE2
  692: \x94         MEMOIZE    (as 65)
  693: \x86         TUPLE2
  694: R            REDUCE
  695: 0            POP
  696: \x8c         SHORT_BINUNICODE '__len__'
  705: \x94         MEMOIZE    (as 66)
  706: h            BINGET     23
  708: (            MARK
  709: h                BINGET     28
  711: (                MARK
  712: K                    BININT1    1
  714: K                    BININT1    0
  716: K                    BININT1    0
  718: K                    BININT1    1
  720: K                    BININT1    3
  722: K                    BININT1    3
  724: C                    SHORT_BINBYTES b'\x97\x00t\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00|\x00j\x01\x00\x00\x00\x00\x00\x00\x00\x00\xa6\x01\x00\x00\xab\x01\x00\x00\x00\x00\x00\x00\x00\x00S\x00'
  768: \x94                 MEMOIZE    (as 67)
  769: h                    BINGET     30
  771: \x8c                 SHORT_BINUNICODE 'len'
  776: \x94                 MEMOIZE    (as 68)
  777: h                    BINGET     31
  779: \x86                 TUPLE2
  780: \x94                 MEMOIZE    (as 69)
  781: h                    BINGET     33
  783: \x85                 TUPLE1
  784: \x94                 MEMOIZE    (as 70)
  785: h                    BINGET     35
  787: h                    BINGET     66
  789: \x8c                 SHORT_BINUNICODE 'Data.__len__'
  803: \x94                 MEMOIZE    (as 71)
  804: K                    BININT1    11
  806: C                    SHORT_BINBYTES b'\x80\x00\xdd\x0f\x12\x904\x948\x89}\x8c}\xd0\x08\x1c'
  824: \x94                 MEMOIZE    (as 72)
  825: h                    BINGET     38
  827: )                    EMPTY_TUPLE
  828: )                    EMPTY_TUPLE
  829: t                    TUPLE      (MARK at 711)
  830: \x94             MEMOIZE    (as 73)
  831: R                REDUCE
  832: \x94             MEMOIZE    (as 74)
  833: h                BINGET     41
  835: N                NONE
  836: N                NONE
  837: N                NONE
  838: t                TUPLE      (MARK at 708)
  839: \x94         MEMOIZE    (as 75)
  840: R            REDUCE
  841: \x94         MEMOIZE    (as 76)
  842: h            BINGET     48
  844: h            BINGET     76
  846: }            EMPTY_DICT
  847: \x94         MEMOIZE    (as 77)
  848: }            EMPTY_DICT
  849: \x94         MEMOIZE    (as 78)
  850: (            MARK
  851: h                BINGET     43
  853: h                BINGET     66
  855: h                BINGET     51
  857: h                BINGET     71
  859: h                BINGET     52
  861: }                EMPTY_DICT
  862: \x94             MEMOIZE    (as 79)
  863: h                BINGET     56
  865: h                BINGET     5
  867: \x8c             SHORT_BINUNICODE 'int'
  872: \x94             MEMOIZE    (as 80)
  873: \x93             STACK_GLOBAL
  874: \x94             MEMOIZE    (as 81)
  875: s                SETITEM
  876: h                BINGET     57
  878: N                NONE
  879: h                BINGET     58
  881: N                NONE
  882: h                BINGET     13
  884: h                BINGET     14
  886: h                BINGET     59
  888: N                NONE
  889: h                BINGET     60
  891: N                NONE
  892: h                BINGET     61
  894: ]                EMPTY_LIST
  895: \x94             MEMOIZE    (as 82)
  896: h                BINGET     63
  898: }                EMPTY_DICT
  899: \x94             MEMOIZE    (as 83)
  900: u                SETITEMS   (MARK at 850)
  901: \x86         TUPLE2
  902: \x94         MEMOIZE    (as 84)
  903: \x86         TUPLE2
  904: R            REDUCE
  905: 0            POP
  906: \x8c         SHORT_BINUNICODE '__sizeof__'
  918: \x94         MEMOIZE    (as 85)
  919: h            BINGET     23
  921: (            MARK
  922: h                BINGET     28
  924: (                MARK
  925: K                    BININT1    1
  927: K                    BININT1    0
  929: K                    BININT1    0
  931: K                    BININT1    1
  933: K                    BININT1    3
  935: K                    BININT1    3
  937: C                    SHORT_BINBYTES b'\x97\x00t\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00j\x01\x00\x00\x00\x00\x00\x00\x00\x00|\x00j\x02\x00\x00\x00\x00\x00\x00\x00\x00\xa6\x01\x00\x00\xab\x01\x00\x00\x00\x00\x00\x00\x00\x00S\x00'
  991: \x94                 MEMOIZE    (as 86)
  992: h                    BINGET     30
  994: \x8c                 SHORT_BINUNICODE 'sys'
  999: \x94                 MEMOIZE    (as 87)
 1000: \x8c                 SHORT_BINUNICODE 'getsizeof'
 1011: \x94                 MEMOIZE    (as 88)
 1012: h                    BINGET     31
 1014: \x87                 TUPLE3
 1015: \x94                 MEMOIZE    (as 89)
 1016: h                    BINGET     33
 1018: \x85                 TUPLE1
 1019: \x94                 MEMOIZE    (as 90)
 1020: h                    BINGET     35
 1022: h                    BINGET     85
 1024: \x8c                 SHORT_BINUNICODE 'Data.__sizeof__'
 1041: \x94                 MEMOIZE    (as 91)
 1042: K                    BININT1    13
 1044: C                    SHORT_BINBYTES b'\x80\x00\xdd\x0f\x12\x8c}\x98T\x9cX\xd1\x0f&\xd4\x0f&\xd0\x08&'
 1066: \x94                 MEMOIZE    (as 92)
 1067: h                    BINGET     38
 1069: )                    EMPTY_TUPLE
 1070: )                    EMPTY_TUPLE
 1071: t                    TUPLE      (MARK at 924)
 1072: \x94             MEMOIZE    (as 93)
 1073: R                REDUCE
 1074: \x94             MEMOIZE    (as 94)
 1075: h                BINGET     41
 1077: N                NONE
 1078: N                NONE
 1079: N                NONE
 1080: t                TUPLE      (MARK at 921)
 1081: \x94         MEMOIZE    (as 95)
 1082: R            REDUCE
 1083: \x94         MEMOIZE    (as 96)
 1084: h            BINGET     48
 1086: h            BINGET     96
 1088: }            EMPTY_DICT
 1089: \x94         MEMOIZE    (as 97)
 1090: }            EMPTY_DICT
 1091: \x94         MEMOIZE    (as 98)
 1092: (            MARK
 1093: h                BINGET     43
 1095: h                BINGET     85
 1097: h                BINGET     51
 1099: h                BINGET     91
 1101: h                BINGET     52
 1103: }                EMPTY_DICT
 1104: \x94             MEMOIZE    (as 99)
 1105: h                BINGET     56
 1107: h                BINGET     81
 1109: s                SETITEM
 1110: h                BINGET     57
 1112: N                NONE
 1113: h                BINGET     58
 1115: N                NONE
 1116: h                BINGET     13
 1118: h                BINGET     14
 1120: h                BINGET     59
 1122: N                NONE
 1123: h                BINGET     60
 1125: N                NONE
 1126: h                BINGET     61
 1128: ]                EMPTY_LIST
 1129: \x94             MEMOIZE    (as 100)
 1130: h                BINGET     63
 1132: }                EMPTY_DICT
 1133: \x94             MEMOIZE    (as 101)
 1134: h                BINGET     87
 1136: h                BINGET     2
 1138: \x8c             SHORT_BINUNICODE 'subimport'
 1149: \x94             MEMOIZE    (as 102)
 1150: \x93             STACK_GLOBAL
 1151: \x94             MEMOIZE    (as 103)
 1152: \x8c             SHORT_BINUNICODE 'sys'
 1157: \x94             MEMOIZE    (as 104)
 1158: \x85             TUPLE1
 1159: \x94             MEMOIZE    (as 105)
 1160: R                REDUCE
 1161: \x94             MEMOIZE    (as 106)
 1162: s                SETITEM
 1163: u                SETITEMS   (MARK at 1092)
 1164: \x86         TUPLE2
 1165: \x94         MEMOIZE    (as 107)
 1166: \x86         TUPLE2
 1167: R            REDUCE
 1168: 0            POP
 1169: h            BINGET     59
 1171: N            NONE
 1172: \x8c         SHORT_BINUNICODE '__slotnames__'
 1187: \x94         MEMOIZE    (as 108)
 1188: ]            EMPTY_LIST
 1189: \x94         MEMOIZE    (as 109)
 1190: u            SETITEMS   (MARK at 211)
 1191: }        EMPTY_DICT
 1192: \x94     MEMOIZE    (as 110)
 1193: \x86     TUPLE2
 1194: \x94     MEMOIZE    (as 111)
 1195: \x86     TUPLE2
 1196: R        REDUCE
 1197: 0        POP
 1198: )        EMPTY_TUPLE
 1199: \x81     NEWOBJ
 1200: \x94     MEMOIZE    (as 112)
 1201: }        EMPTY_DICT
 1202: \x94     MEMOIZE    (as 113)
 1203: h        BINGET     31
 1205: h        BINGET     38
 1207: s        SETITEM
 1208: b        BUILD
 1209: \x85     TUPLE1
 1210: \x94     MEMOIZE    (as 114)
 1211: \x8c     SHORT_BINUNICODE 'fn_kwargs'
 1222: \x94     MEMOIZE    (as 115)
 1223: }        EMPTY_DICT
 1224: \x94     MEMOIZE    (as 116)
 1225: u        SETITEMS   (MARK at 13)
 1226: .    STOP
highest protocol among opcodes = 4

which says (instead of importing that class) ... "here is the definition for a completely new class". So you'll see in there things like the actual function definition for __init__ as compiled bytecode.

dill behaves pretty much the same way, although it might have different heuristics than cloudpickle for deciding when to send an import and when to send the entire definition.

I usually prefer the import route - which is the opposite direction to the way this thread has been going! -

because (in no particular order)

  • I feel like moving bytecode between Python processes is an unholy abomination that causes all kinds of awfulness when moving between versions - Python bytecode is not intended to be portable outside of the environment it was compiled in
  • because I have the Parsl HighThroughputExecutor experience of running many tasks inside a single Python worker process and so amortising the cost of imports across all tasks run by that worker (a 30s import time per task is very different than a 30s import time for a worker that's running tasks for an hour)
  • because sending all these definitions can sometimes get accidentally, ridiculously big and cause a different (size-based) performance problem

That doesn't mean I think you should go that way too, but I feel like the preferences I have developed are well founded on past experience with Parsl.

@JinZhou5042
Member

@benclifford That's really informative, thank you for sharing your valuable insights! The pickletools output aligns perfectly with the actual test cases. I totally agree that Python's bytecode heavily depends on import behavior, and deserializing bytecode sent by value for each function can lead to unexpected performance issues. Your experience further validates that #3902 has taken a sound approach.

@JinZhou5042
Member

@gpauloski Here is the script to build the most recent cctools from GitHub; you can find details here:

# install the required packages via conda
conda install -y gcc_linux-64 gxx_linux-64 gdb m4 perl swig make zlib libopenssl-static openssl conda-pack packaging cloudpickle flake8 clang-format threadpoolctl

# git clone cctools
git clone git@github.com:cooperative-computing-lab/cctools.git

# step into the repo
cd cctools

# build packages
./configure --with-base-dir $CONDA_PREFIX --prefix $CONDA_PREFIX
make -j8
make install

# now you should be good to use the most recent cctools
vine_worker --version

I think this would be the fastest way to pick up the features we just merged. If you would be more comfortable getting the newest version from conda, please let us know and we'll release a new version ASAP!

@gpauloski
Contributor Author

Thanks, @JinZhou5042! I'm hoping to give it a try later today.

@JinZhou5042
Member

@gpauloski Sounds good! Please feel free to email me (jzhou24@nd.edu) if you have any issues installing cctools or enabling serverless mode!

@gpauloski
Contributor Author

gpauloski commented Aug 7, 2024

I reinstalled TaskVine and got serverless working. I'm getting ~120 tasks/s now. Thanks for the quick debugging and PRs, @dthain and @JinZhou5042. I appreciate it!

@JinZhou5042
Member

@gpauloski You are so welcome!
