From 9cdb6ae4bd81212e97fc4fe827428dfc78fc7d83 Mon Sep 17 00:00:00 2001 From: Andrii Ieroshenko Date: Wed, 27 Mar 2024 12:27:21 -0700 Subject: [PATCH] copy staging folder to output folder after job runs (SUCESS or FAILURE) --- jupyter_scheduler/executors.py | 36 ++++++++++++++++---- jupyter_scheduler/job_files_manager.py | 31 ++++++++++++----- jupyter_scheduler/models.py | 1 + jupyter_scheduler/scheduler.py | 46 +++++++++++++++----------- jupyter_scheduler/utils.py | 36 ++++++++++++++++++-- src/components/job-row.tsx | 32 +++++++++++++++++- src/handler.ts | 3 +- src/model.ts | 2 +- 8 files changed, 149 insertions(+), 38 deletions(-) diff --git a/jupyter_scheduler/executors.py b/jupyter_scheduler/executors.py index 7a0308c52..bbd559378 100644 --- a/jupyter_scheduler/executors.py +++ b/jupyter_scheduler/executors.py @@ -11,10 +11,10 @@ import nbformat from nbconvert.preprocessors import CellExecutionError, ExecutePreprocessor -from jupyter_scheduler.models import DescribeJob, JobFeature, Status +from jupyter_scheduler.models import DescribeJob, JobFeature, JobFile, Status from jupyter_scheduler.orm import Job, create_session from jupyter_scheduler.parameterize import add_parameters -from jupyter_scheduler.utils import get_utc_timestamp +from jupyter_scheduler.utils import copy_directory, get_utc_timestamp class ExecutionManager(ABC): @@ -29,11 +29,19 @@ class ExecutionManager(ABC): _model = None _db_session = None - def __init__(self, job_id: str, root_dir: str, db_url: str, staging_paths: Dict[str, str]): + def __init__( + self, + job_id: str, + root_dir: str, + db_url: str, + staging_paths: Dict[str, str], + output_dir: str, + ): self.job_id = job_id self.staging_paths = staging_paths self.root_dir = root_dir self.db_url = db_url + self.output_dir = output_dir @property def model(self): @@ -131,13 +139,13 @@ def execute(self): if job.parameters: nb = add_parameters(nb, job.parameters) - notebook_dir = os.path.dirname(self.staging_paths["input"]) + staging_dir = os.path.dirname(self.staging_paths["input"]) ep = ExecutePreprocessor( - kernel_name=nb.metadata.kernelspec["name"], store_widget_state=True, cwd=notebook_dir + kernel_name=nb.metadata.kernelspec["name"], store_widget_state=True, cwd=staging_dir ) try: - ep.preprocess(nb, {"metadata": {"path": notebook_dir}}) + ep.preprocess(nb, {"metadata": {"path": staging_dir}}) except CellExecutionError as e: raise e finally: @@ -147,6 +155,22 @@ def execute(self): with fsspec.open(self.staging_paths[output_format], "w", encoding="utf-8") as f: f.write(output) + self.copy_staged_files_to_output() + + def copy_staged_files_to_output(self): + """Copies snapshot of the original notebook and staged input files from the staging directory to the output directory and includes them into job_files.""" + staging_dir = os.path.dirname(self.staging_paths["input"]) + copied_files = copy_directory( + source_dir=staging_dir, destination_dir=self.output_dir, base_dir=self.root_dir + ) + + if copied_files: + for rel_path in copied_files: + if not any(job_file.file_path == rel_path for job_file in self.model.job_files): + self.model.job_files.append( + JobFile(display_name="File", file_format="File", file_path=rel_path) + ) + def supported_features(cls) -> Dict[JobFeature, bool]: return { JobFeature.job_name: True, diff --git a/jupyter_scheduler/job_files_manager.py b/jupyter_scheduler/job_files_manager.py index 78dbb4375..0bc6c23cc 100644 --- a/jupyter_scheduler/job_files_manager.py +++ b/jupyter_scheduler/job_files_manager.py @@ -21,7 +21,9 @@ async def copy_from_staging(self, job_id: str, redownload: Optional[bool] = Fals job = await ensure_async(self.scheduler.get_job(job_id, False)) staging_paths = await ensure_async(self.scheduler.get_staging_paths(job)) output_filenames = self.scheduler.get_job_filenames(job) - output_dir = self.scheduler.get_local_output_path(job) + output_dir = self.scheduler.get_local_output_path( + input_filename=job.input_filename, job_id=job.job_id, root_dir_relative=True + ) p = Process( target=Downloader( @@ -30,6 +32,7 @@ async def copy_from_staging(self, job_id: str, redownload: Optional[bool] = Fals staging_paths=staging_paths, output_dir=output_dir, redownload=redownload, + include_staging_files=job.package_input_folder, ).download ) p.start() @@ -43,22 +46,34 @@ def __init__( staging_paths: Dict[str, str], output_dir: str, redownload: bool, + include_staging_files: bool = False, ): self.output_formats = output_formats self.output_filenames = output_filenames self.staging_paths = staging_paths self.output_dir = output_dir self.redownload = redownload + self.include_staging_files = include_staging_files def generate_filepaths(self): """A generator that produces filepaths""" - output_formats = self.output_formats + ["input"] - - for output_format in output_formats: - input_filepath = self.staging_paths[output_format] - output_filepath = os.path.join(self.output_dir, self.output_filenames[output_format]) - if not os.path.exists(output_filepath) or self.redownload: - yield input_filepath, output_filepath + if self.include_staging_files: + staging_dir = os.path.dirname(self.staging_paths["input"]) + for root, _, files in os.walk(staging_dir): + for file in files: + input_filepath = os.path.join(root, file) + relative_path = os.path.relpath(input_filepath, staging_dir) + output_filepath = os.path.join(self.output_dir, relative_path) + yield input_filepath, output_filepath + else: + output_formats = self.output_formats + ["input"] + for output_format in output_formats: + input_filepath = self.staging_paths[output_format] + output_filepath = os.path.join( + self.output_dir, self.output_filenames[output_format] + ) + if not os.path.exists(output_filepath) or self.redownload: + yield input_filepath, output_filepath def download_tar(self, archive_format: str = "tar"): archive_filepath = self.staging_paths[archive_format] diff --git a/jupyter_scheduler/models.py b/jupyter_scheduler/models.py index 242fa68de..917e341f1 100644 --- a/jupyter_scheduler/models.py +++ b/jupyter_scheduler/models.py @@ -147,6 +147,7 @@ class DescribeJob(BaseModel): status_message: Optional[str] = None downloaded: bool = False package_input_folder: Optional[bool] = None + output_folder: Optional[str] = None class Config: orm_mode = True diff --git a/jupyter_scheduler/scheduler.py b/jupyter_scheduler/scheduler.py index ba6fd5506..a1b24b1ab 100644 --- a/jupyter_scheduler/scheduler.py +++ b/jupyter_scheduler/scheduler.py @@ -39,7 +39,7 @@ UpdateJobDefinition, ) from jupyter_scheduler.orm import Job, JobDefinition, create_session -from jupyter_scheduler.utils import create_output_directory, create_output_filename +from jupyter_scheduler.utils import copy_directory, create_output_directory, create_output_filename class BaseScheduler(LoggingConfigurable): @@ -289,7 +289,10 @@ def add_job_files(self, model: DescribeJob): mapping = self.environments_manager.output_formats_mapping() job_files = [] output_filenames = self.get_job_filenames(model) - output_dir = os.path.relpath(self.get_local_output_path(model), self.root_dir) + output_dir = self.get_local_output_path( + input_filename=model.input_filename, job_id=model.job_id, root_dir_relative=True + ) + for output_format in model.output_formats: filename = output_filenames[output_format] output_path = os.path.join(output_dir, filename) @@ -316,13 +319,20 @@ def add_job_files(self, model: DescribeJob): model.job_files = job_files model.downloaded = all(job_file.file_path for job_file in job_files) - def get_local_output_path(self, model: DescribeJob) -> str: + def get_local_output_path( + self, input_filename: str, job_id: str, root_dir_relative: Optional[bool] = False + ) -> str: """Returns the local output directory path where all the job files will be downloaded from the staging location. """ - output_dir_name = create_output_directory(model.input_filename, model.job_id) - return os.path.join(self.root_dir, self.output_directory, output_dir_name) + output_dir_name = create_output_directory(input_filename, job_id) + if root_dir_relative: + return os.path.relpath( + os.path.join(self.root_dir, self.output_directory, output_dir_name), self.root_dir + ) + else: + return os.path.join(self.root_dir, self.output_directory, output_dir_name) class Scheduler(BaseScheduler): @@ -375,20 +385,10 @@ def copy_input_folder(self, input_uri: str, nb_copy_to_path: str): """Copies the input file along with the input directory to the staging directory""" input_dir_path = os.path.dirname(os.path.join(self.root_dir, input_uri)) staging_dir = os.path.dirname(nb_copy_to_path) - - # Copy the input file - self.copy_input_file(input_uri, nb_copy_to_path) - - # Copy the rest of the input folder excluding the input file - for item in os.listdir(input_dir_path): - source = os.path.join(input_dir_path, item) - destination = os.path.join(staging_dir, item) - if os.path.isdir(source): - shutil.copytree(source, destination) - elif os.path.isfile(source) and item != os.path.basename(input_uri): - with fsspec.open(source) as src_file: - with fsspec.open(destination, "wb") as output_file: - output_file.write(src_file.read()) + copy_directory( + source_dir=input_dir_path, + destination_dir=staging_dir, + ) def create_job(self, model: CreateJob) -> str: if not model.job_definition_id and not self.file_exists(model.input_uri): @@ -439,6 +439,10 @@ def create_job(self, model: CreateJob) -> str: staging_paths=staging_paths, root_dir=self.root_dir, db_url=self.db_url, + output_dir=self.get_local_output_path( + input_filename=model.input_filename, + job_id=job.job_id, + ), ).process ) p.start() @@ -489,6 +493,10 @@ def list_jobs(self, query: ListJobsQuery) -> ListJobsResponse: for job in jobs: model = DescribeJob.from_orm(job) self.add_job_files(model=model) + if model.package_input_folder: + model.output_folder = self.get_local_output_path( + input_filename=model.input_filename, job_id=model.job_id, root_dir_relative=True + ) jobs_list.append(model) list_jobs_response = ListJobsResponse( diff --git a/jupyter_scheduler/utils.py b/jupyter_scheduler/utils.py index d0b8bee11..2aeac7922 100644 --- a/jupyter_scheduler/utils.py +++ b/jupyter_scheduler/utils.py @@ -1,10 +1,11 @@ import json import os -import pathlib from datetime import datetime, timezone -from typing import Optional +import shutil +from typing import List, Optional from uuid import UUID +import fsspec import pytz from croniter import croniter from nbformat import NotebookNode @@ -84,3 +85,34 @@ def get_localized_timestamp(timezone) -> int: tz = pytz.timezone(timezone) local_date = datetime.now(tz=tz) return int(local_date.timestamp() * 1000) + + +def copy_directory( + source_dir: str, + destination_dir: str, + base_dir: Optional[str] = None, # This is the new argument + exclude_files: Optional[List[str]] = [], +) -> List[str]: + """Copies content of source_dir to destination_dir excluding exclude_files. + Returns a list of relative paths to copied files, relative to base_dir if provided. + """ + copied_files = [] + for item in os.listdir(source_dir): + source = os.path.join(source_dir, item) + destination = os.path.join(destination_dir, item) + if os.path.isdir(source): + shutil.copytree(source, destination, ignore=shutil.ignore_patterns(*exclude_files)) + for dirpath, _, filenames in os.walk(destination): + for filename in filenames: + rel_path = os.path.relpath( + os.path.join(dirpath, filename), base_dir if base_dir else destination_dir + ) + copied_files.append(rel_path) + elif os.path.isfile(source) and item not in exclude_files: + with fsspec.open(source, "rb") as source_file: + with fsspec.open(destination, "wb") as output_file: + output_file.write(source_file.read()) + rel_path = os.path.relpath(destination, base_dir if base_dir else destination_dir) + copied_files.append(rel_path) + + return copied_files diff --git a/src/components/job-row.tsx b/src/components/job-row.tsx index 97bbfff69..b55fceb00 100644 --- a/src/components/job-row.tsx +++ b/src/components/job-row.tsx @@ -75,6 +75,33 @@ function JobFiles(props: { ); } +function FilesDirectoryLink(props: { + job: Scheduler.IDescribeJob; + app: JupyterFrontEnd; +}): JSX.Element | null { + if (!props.job.package_input_folder || !props.job.output_folder) { + return null; + } + const trans = useTranslator('jupyterlab'); + return ( + { + e.preventDefault(); + props.app.commands.execute('filebrowser:open-path', { + path: props.job.output_folder + }); + }} + > + {trans.__('Files')} + + ); +} + type DownloadFilesButtonProps = { app: JupyterFrontEnd; job: Scheduler.IDescribeJob; @@ -90,7 +117,7 @@ function DownloadFilesButton(props: DownloadFilesButtonProps) { return ( { setDownloading(true); @@ -167,6 +194,9 @@ export function buildJobRow( /> )} + {(job.status === 'COMPLETED' || job.status === 'FAILED') && ( + + )} , , translateStatus(job.status), diff --git a/src/handler.ts b/src/handler.ts index f1f2d59e3..0084c0417 100644 --- a/src/handler.ts +++ b/src/handler.ts @@ -466,7 +466,8 @@ export namespace Scheduler { start_time?: number; end_time?: number; downloaded: boolean; - packageInputFolder?: boolean; + package_input_folder?: boolean; + output_folder?: string; } export interface ICreateJobResponse { diff --git a/src/model.ts b/src/model.ts index 7dcf01196..cd4f7740a 100644 --- a/src/model.ts +++ b/src/model.ts @@ -388,7 +388,7 @@ export function convertDescribeJobtoJobDetail( startTime: describeJob.start_time, endTime: describeJob.end_time, downloaded: describeJob.downloaded, - packageInputFolder: describeJob.packageInputFolder + packageInputFolder: describeJob.package_input_folder }; }