feat: backport dataset_tools to coffea 0.7.x #1036
Conversation
I played around with querying from this branch and it seems to be working fine.

diff --git a/src/coffea/dataset_tools/dataset_query.py b/coffea/dataset_tools/dataset_query.py
index 905daa12..0f9f90a8 100644
--- a/src/coffea/dataset_tools/dataset_query.py
+++ b/coffea/dataset_tools/dataset_query.py
@@ -7,7 +7,6 @@ from collections import defaultdict
from typing import List
import yaml
-from dask.distributed import Client
from rich import print
from rich.console import Console
from rich.prompt import Confirm, IntPrompt, Prompt
@@ -15,7 +14,6 @@ from rich.table import Table
from rich.tree import Tree
from . import rucio_utils
-from .preprocess import preprocess
def print_dataset_query(query, dataset_list, console, selected=[]):
@@ -101,8 +99,6 @@ class DataDiscoveryCLI:
self.sites_regex = None
self.last_replicas_results = None
self.final_output = None
- self.preprocessed_total = None
- self.preprocessed_available = None
self.replica_results = defaultdict(list)
self.replica_results_metadata = {}
@@ -118,7 +114,6 @@ class DataDiscoveryCLI:
"replicas",
"list-replicas",
"save",
- "preprocess",
"allow-sites",
"block-sites",
"regex-sites",
@@ -145,7 +140,6 @@ Some basic commands:
- [bold cyan]block-sites[/]: Exclude grid sites from the available sites for replicas query
- [bold cyan]regex-sites[/]: Select sites with a regex for replica queries: e.g. "T[123]_(FR|IT|BE|CH|DE)_\w+"
- [bold cyan]save[/]: Save the replicas query results to file (json or yaml) for further processing
- - [bold cyan]preprocess[/]: Preprocess the replicas with dask and save the fileset for further processing with uproot/coffea
- [bold cyan]help[/]: Print this help message
"""
)
@@ -168,8 +162,6 @@ Some basic commands:
self.do_list_replicas()
elif command == "save":
self.do_save()
- elif command == "preprocess":
- self.do_preprocess()
elif command == "allow-sites":
self.do_allowlist_sites()
elif command == "block-sites":
@@ -520,46 +512,6 @@ Some basic commands:
json.dump(self.final_output, file, indent=2)
print(f"[green]File {filename} saved!")
- def do_preprocess(
- self,
- output_file=None,
- step_size=None,
- align_to_clusters=None,
- scheduler_url=None,
- ):
- """Perform preprocessing for concrete fileset extraction.
- Args: output_file [step_size] [align to file cluster boundaries] [dask scheduler url]
- """
- if not output_file:
- output_file = Prompt.ask(
- "[yellow bold]Output name", default="output_preprocessing"
- )
- if step_size is None:
- step_size = IntPrompt.ask("[yellow bold]Step size", default=None)
- if align_to_clusters is None:
- align_to_clusters = Confirm.ask(
- "[yellow bold]Align to clusters", default=True
- )
-
- # init a local Dask cluster
- with self.console.status(
- "[red] Preprocessing files to extract available chunks with dask[/]"
- ):
- with Client(scheduler_url) as _:
- self.preprocessed_available, self.preprocessed_total = preprocess(
- self.final_output,
- step_size=step_size,
- align_clusters=align_to_clusters,
- skip_bad_files=True,
- )
- with gzip.open(f"{output_file}_available.json.gz", "wt") as file:
- print(f"Saved available fileset chunks to {output_file}_available.json.gz")
- json.dump(self.preprocessed_total, file, indent=2)
- with gzip.open(f"{output_file}_all.json.gz", "wt") as file:
- print(f"Saved all fileset chunks to {output_file}_all.json.gz")
- json.dump(self.preprocessed_available, file, indent=2)
- return self.preprocessed_total, self.preprocessed_available
-
def load_dataset_definition(
self,
dataset_definition,
@@ -631,27 +583,6 @@ if __name__ == "__main__":
required=False,
default="output_dataset.json",
)
- parser.add_argument(
- "-fo",
- "--fileset-output",
- help="Output name for fileset",
- type=str,
- required=False,
- default="output_fileset",
- )
- parser.add_argument(
- "-p",
- "--preprocess",
- help="Preprocess with dask",
- action="store_true",
- default=False,
- )
- parser.add_argument(
- "--step-size", help="Step size for preprocessing", type=int, default=500000
- )
- parser.add_argument(
- "--scheduler-url", help="Dask scheduler url", type=str, default=None
- )
parser.add_argument(
"-as",
"--allow-sites",
@@ -706,13 +637,6 @@ if __name__ == "__main__":
# Save
if args.output:
cli.do_save(filename=args.output)
- if args.preprocess:
- cli.do_preprocess(
- output_file=args.fileset_output,
- step_size=args.step_size,
- scheduler_url=args.scheduler_url,
- align_to_clusters=False,
- )
if args.cli:
cli.start_cli()
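For context on the querying mentioned above, here is a minimal sketch of driving the backported CLI from Python. The class and method names (DataDiscoveryCLI, load_dataset_definition, do_save, start_cli) are taken from the diff; the dataset-definition layout and the dataset name are illustrative assumptions, not something this PR specifies.

```python
# Hypothetical usage sketch of the backported dataset_tools CLI.
# DataDiscoveryCLI, load_dataset_definition, do_save and start_cli appear
# in the diff above; the definition dict layout and the dataset name are
# placeholders/assumptions.
from coffea.dataset_tools.dataset_query import DataDiscoveryCLI

cli = DataDiscoveryCLI()

# Query Rucio for the datasets in the definition and resolve their replicas.
cli.load_dataset_definition(
    {
        "/SomeDataset/Run2018A-SomeCampaign-v1/NANOAOD": {
            "short_name": "SomeDataset",
            "metadata": {"era": "2018A"},
        }
    }
)

# Save the resolved replicas for later processing with uproot/coffea 0.7.x.
cli.do_save(filename="output_dataset.json")

# Or work interactively with the commands listed in the help
# (replicas, list-replicas, save, allow-sites, block-sites, regex-sites, ...):
# cli.start_cli()
```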
I also set the version to 0.7.23 (let me know if you don't like that), and added rucio optional dependencies to …
I'll take a look at it later this week. Thanks!
The dataset tools now output filesets where "files" are of the form …
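For illustration only: upstream dataset_tools emit filesets whose "files" entry maps each file path to the tree name inside the file. Whether the backport reproduces exactly this layout is an assumption here, and the paths below are placeholders.

```python
# Illustrative fileset layout (assumption based on upstream dataset_tools):
# "files" maps a file path to the name of the tree to read from it.
fileset = {
    "SomeDataset": {
        "files": {
            "root://xrootd.example.org//store/SomeDataset/file_1.root": "Events",
            "root://xrootd.example.org//store/SomeDataset/file_2.root": "Events",
        },
        "metadata": {"era": "2018A"},
    }
}
```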
Thanks @iasonkrom, I think your changes are fine. I think we can remove …
Reading. cc @lgray
The only issue being that the …
Done. Should we also add the binder tutorial in 0.7 and just remove the preprocessing shell? Other than that, this should be ready.
Yeah, add another notebook for docs if you have time, that would be great!
Done. Should be ready to review!
This PR aims to backport @valsdav's dataset tools to coffea 0.7.x.
The preprocessing part has to be removed since that is tied to coffea 202x.