
Pipeline: large jobs need faster support #207

Closed · ajstewart opened this issue Aug 26, 2020 · 3 comments · Fixed by #211
Labels: enhancement (New feature or request)

ajstewart commented Aug 26, 2020

Loading the entire pilot survey pipeline run into the notebook interface took 38m 30s.

This was always going to be the case, as I purposely went pandas-only just to get things working for now, but with the entire survey it is already becoming slow.

The two-epoch analysis takes 45+ minutes (it hasn't finished yet).

While this is not terrible in the grand scheme of things for the entire Pilot Survey, it is far from ideal when you might want to run the query multiple times with different filters.

The memory usage is minimal, however: running the above used about 6 GB of RAM, so we are not yet at the point where these dataframes cannot fit into a sensible amount of memory.

Dask is probably the answer here, as the two-epoch analysis still requires calculations, although Vaex could also be useful since this is becoming more of a visualisation problem.

That said, the two-epoch functions can definitely be made faster (on the other hand, we may move these calculations to the pipeline stage, askap-vast/vast-pipeline#232).

ajstewart added the enhancement label Aug 26, 2020

marxide commented Aug 26, 2020

Try the function below. It accepts 3 DataFrames:

  • sources_df: read from sources.parquet.
  • associations_df: read from associations.parquet.
  • measurements_df: concatenated DataFrame of all measurements.parquet files found in images.parquet.

I don't know if it's the most efficient, but it takes about 5 mins to run on my laptop for a simulated dataset of ~7.3 million measurements (114 fields * 8,000 sources per field * 8 epochs). Also note that I'm taking the easy way out and only looking at sources with no siblings.

from itertools import combinations
import numpy as np
import pandas as pd

def calculate_two_epoch_metrics(sources_df, associations_df, measurements_df) -> pd.DataFrame:
    # create DataFrame of source measurements without siblings
    associations_no_siblings_df = (
        # only keep sources without siblings
        associations_df.set_index("source_id")
        .loc[sources_df.query("n_sibl == 0").index]
        .reset_index()
        # add peak flux and error
        .join(measurements_df[["flux_peak", "flux_peak_err"]], on="meas_id")
    )

    # create a DataFrame of all measurement pairs per source
    measurement_combinations = (
        associations_no_siblings_df.groupby("source_id")["meas_id"]
        .apply(lambda x: pd.DataFrame(list(combinations(x, 2))))
        .reset_index(level=1, drop=True)
        .rename(columns={0: "meas_id_a", 1: "meas_id_b"})
        .astype(int)
        .reset_index()
    )

    # add the measurement fluxes and errors
    association_fluxes = associations_no_siblings_df.set_index(["source_id", "meas_id"])[["flux_peak", "flux_peak_err"]]
    measurement_combinations = measurement_combinations.join(
        association_fluxes,
        on=["source_id", "meas_id_a"],
    ).join(
        association_fluxes,
        on=["source_id", "meas_id_b"],
        lsuffix="_a",
        rsuffix="_b",
    )

    # calculate 2-epoch metrics
    measurement_combinations["vs"] = (
        (measurement_combinations.flux_peak_a - measurement_combinations.flux_peak_b) / 
        np.hypot(measurement_combinations.flux_peak_err_a, measurement_combinations.flux_peak_err_b)
    )

    measurement_combinations["m"] = 2 * (
        (measurement_combinations.flux_peak_a - measurement_combinations.flux_peak_b)
        / (measurement_combinations.flux_peak_a + measurement_combinations.flux_peak_b)
    )
    return measurement_combinations
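
For reference, a sketch of how this function might be called. The index and column assumptions are mine: sources_df is assumed to be indexed by the source id, the concatenated measurements_df by a hypothetical "id" measurement column (so the join on "meas_id" resolves against it), and measurement_parquet_paths is a placeholder for the list of per-image files gathered from images.parquet.

import pandas as pd

# placeholder: the per-image measurements.parquet paths listed in images.parquet
measurement_parquet_paths = [...]

# assumed to be indexed by source id, matching the .loc lookup inside the function
sources_df = pd.read_parquet("sources.parquet")
associations_df = pd.read_parquet("associations.parquet")

# concatenate all per-image measurement files and index by the (hypothetical)
# measurement id column so the join on "meas_id" works
measurements_df = pd.concat(
    pd.read_parquet(p) for p in measurement_parquet_paths
).set_index("id")

pairs_df = calculate_two_epoch_metrics(sources_df, associations_df, measurements_df)
print(pairs_df[["source_id", "meas_id_a", "meas_id_b", "vs", "m"]].head())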

ajstewart mentioned this issue Aug 27, 2020
ajstewart commented

Thanks for this @marxide. I went back and looked at what I did and it was not good 😅. I had followed the same principle as you above but had failed to pandas-ise the middle section, no idea why not! This meant it was really slow. I've incorporated your pandas code for the building part and it's much better: an all-images pipeline run now takes about 3 minutes to process.

I'll also note here that I've switched the measurement parquet loading to Dask, which can now load the all-images pipeline run in about 1 minute 30 seconds.
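
For reference, a minimal sketch of that kind of Dask-based load; the "measurements_path" column name is a placeholder for wherever images.parquet records the per-image parquet paths.

import dask.dataframe as dd
import pandas as pd

images_df = pd.read_parquet("images.parquet")

# hypothetical column holding the path of each per-image measurements.parquet
measurement_paths = images_df["measurements_path"].tolist()

# Dask opens the many files in parallel; .compute() returns one in-memory
# pandas DataFrame
measurements_df = dd.read_parquet(measurement_paths).compute()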

ajstewart commented

The below assumes that the two-epoch calculations no longer need to be performed by vast-tools.

To document what I've been experimenting with over the past few days: I used a full pilot survey run including Epoch 12. Running this through the pipeline produces ~9.6 million measurements including forced fits, although with an 'epoch based' association this reduces to about 6.5 million or so (a normal association run with forced extractions will break past 10 million measurements).

In a pure pandas environment we have the machines to handle such a dataframe. The measurements dataframe for this run has an in-memory footprint of about 4-5 GB. This is OK, but it starts to become problematic if multiple users wish to open the full pipeline run to play around with: suddenly half the Nimbus memory could be used up by open dataframes.

None of the other parquets are particularly problematic in memory; the sources parquet (~800k rows) is only 200 MB.

In the current v2 rc.3 release the following takes place (a rough sketch follows the list):

  • Parquets are read in using Dask with a .compute() run immediately. This is done because Dask makes it much faster to open many files at once (the measurements are spread over 1600 parquet files).
  • The measurements are then merged with the associations parquet to get the source id.
  • Currently other image information is also merged into the measurements, as this makes loading a 'source' in the vast-tools environment easier, but this will be moved to later in the process in an update.
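
A rough sketch of those steps, continuing from the Dask load above; the "id"/"meas_id" column names and the exact image columns carried over are assumptions.

import pandas as pd

# measurements_df is the computed pandas DataFrame of all measurements,
# with a hypothetical "id" column identifying each measurement
associations_df = pd.read_parquet("associations.parquet")

# attach the source id to every measurement via the associations parquet
measurements_df = measurements_df.merge(
    associations_df[["meas_id", "source_id"]],
    left_on="id",        # hypothetical measurement id column name
    right_on="meas_id",
    how="left",
)

# image information (e.g. paths) is currently merged in the same way; this is
# the step that will be deferred until a source is actually loaded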

Firstly, this process is good for small runs: it's fast and the memory footprint is not too bad.

But for large runs, while it's still relatively fast at about 1 min, the downside is the memory used by the measurements and the merges. If the .compute() is delayed until later in the Dask workflow (i.e. performing the merges and making selections before calling .compute()), the memory usage improves but the run time becomes very slow (10+ mins).

One solution I explored to address the memory problem is Vaex, a library for exploring dataframes out-of-core. However, simply substituting Vaex into the process above raised the following problems:

  • Vaex cannot currently memory-map parquet files so memory usage is still high and opening is not fast.
  • Vaex performance on opening many files is slow (~4 min).
  • Once loaded, the merge can still be slow and use some memory if not done in a specific way.

Vaex can memory-map HDF5 and .arrow files, so to get around these issues it was hugely beneficial to create a measurements.arrow file for the large run. This allows the job to be opened in milliseconds, with the measurements loaded into an out-of-core dataframe with essentially no memory footprint. Selections are fast, and as this file is pre-made, no merges are required (the aforementioned image info, such as paths, is appended only when needed, i.e. when a source is fetched).

This also has the advantage of having the measurements loaded in Vaex, which makes aggregate calculations/filters on ~10 million rows very fast. The measurements.arrow for the full run was 1.4 GB, which is not bad at all.

Given this, it seems a huge advantage to be able to create a measurements.arrow file for large runs, which can be added into the pipeline. Generating the .arrow file is not intensive and takes about 2 mins.
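
A minimal sketch of what that conversion could look like, assuming pyarrow and vaex are available; writing uncompressed Feather V2 (the Arrow IPC file format) is one way to produce a file Vaex can memory-map, and the paths and column names are placeholders.

import pyarrow as pa
import pyarrow.feather as feather
import vaex

# one-off conversion: dump the concatenated measurements DataFrame to an Arrow
# file (uncompressed so it can be memory-mapped)
table = pa.Table.from_pandas(measurements_df)
feather.write_feather(table, "measurements.arrow", compression="uncompressed")

# opening is then near-instant and out-of-core
mdf = vaex.open("measurements.arrow")

# aggregations/filters over ~10 million rows stay fast and memory-light
bright = mdf[mdf.flux_peak > 1.0]  # hypothetical flux cut
print(bright.count())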

vast-tools can then check for the presence of this file in the job directory and open it, falling back to the Dask method if it is not found.
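
As an illustration only, a hypothetical helper along those lines (load_measurements and the path handling are not actual vast-tools API):

from pathlib import Path

import dask.dataframe as dd
import vaex

def load_measurements(job_dir, measurement_paths):
    """Hypothetical helper: prefer the pre-built arrow file, else fall back to Dask."""
    arrow_path = Path(job_dir) / "measurements.arrow"
    if arrow_path.exists():
        # out-of-core, opens in milliseconds
        return vaex.open(str(arrow_path))
    # slower fallback: read the per-image parquets and materialise in pandas
    return dd.read_parquet(measurement_paths).compute()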
