Skip to content

Commit

Permalink
copy staging folder to output folder after job runs (SUCESS or FAILURE)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrii-i committed Mar 27, 2024
1 parent f55e355 commit 9cdb6ae
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 38 deletions.
36 changes: 30 additions & 6 deletions jupyter_scheduler/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
import nbformat
from nbconvert.preprocessors import CellExecutionError, ExecutePreprocessor

from jupyter_scheduler.models import DescribeJob, JobFeature, Status
from jupyter_scheduler.models import DescribeJob, JobFeature, JobFile, Status
from jupyter_scheduler.orm import Job, create_session
from jupyter_scheduler.parameterize import add_parameters
from jupyter_scheduler.utils import get_utc_timestamp
from jupyter_scheduler.utils import copy_directory, get_utc_timestamp


class ExecutionManager(ABC):
Expand All @@ -29,11 +29,19 @@ class ExecutionManager(ABC):
_model = None
_db_session = None

def __init__(self, job_id: str, root_dir: str, db_url: str, staging_paths: Dict[str, str]):
def __init__(
self,
job_id: str,
root_dir: str,
db_url: str,
staging_paths: Dict[str, str],
output_dir: str,
):
self.job_id = job_id
self.staging_paths = staging_paths
self.root_dir = root_dir
self.db_url = db_url
self.output_dir = output_dir

@property
def model(self):
Expand Down Expand Up @@ -131,13 +139,13 @@ def execute(self):
if job.parameters:
nb = add_parameters(nb, job.parameters)

notebook_dir = os.path.dirname(self.staging_paths["input"])
staging_dir = os.path.dirname(self.staging_paths["input"])
ep = ExecutePreprocessor(
kernel_name=nb.metadata.kernelspec["name"], store_widget_state=True, cwd=notebook_dir
kernel_name=nb.metadata.kernelspec["name"], store_widget_state=True, cwd=staging_dir
)

try:
ep.preprocess(nb, {"metadata": {"path": notebook_dir}})
ep.preprocess(nb, {"metadata": {"path": staging_dir}})
except CellExecutionError as e:
raise e
finally:
Expand All @@ -147,6 +155,22 @@ def execute(self):
with fsspec.open(self.staging_paths[output_format], "w", encoding="utf-8") as f:
f.write(output)

self.copy_staged_files_to_output()

def copy_staged_files_to_output(self):
"""Copies snapshot of the original notebook and staged input files from the staging directory to the output directory and includes them into job_files."""
staging_dir = os.path.dirname(self.staging_paths["input"])
copied_files = copy_directory(
source_dir=staging_dir, destination_dir=self.output_dir, base_dir=self.root_dir
)

if copied_files:
for rel_path in copied_files:
if not any(job_file.file_path == rel_path for job_file in self.model.job_files):
self.model.job_files.append(
JobFile(display_name="File", file_format="File", file_path=rel_path)
)

def supported_features(cls) -> Dict[JobFeature, bool]:
return {
JobFeature.job_name: True,
Expand Down
31 changes: 23 additions & 8 deletions jupyter_scheduler/job_files_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ async def copy_from_staging(self, job_id: str, redownload: Optional[bool] = Fals
job = await ensure_async(self.scheduler.get_job(job_id, False))
staging_paths = await ensure_async(self.scheduler.get_staging_paths(job))
output_filenames = self.scheduler.get_job_filenames(job)
output_dir = self.scheduler.get_local_output_path(job)
output_dir = self.scheduler.get_local_output_path(
input_filename=job.input_filename, job_id=job.job_id, root_dir_relative=True
)

p = Process(
target=Downloader(
Expand All @@ -30,6 +32,7 @@ async def copy_from_staging(self, job_id: str, redownload: Optional[bool] = Fals
staging_paths=staging_paths,
output_dir=output_dir,
redownload=redownload,
include_staging_files=job.package_input_folder,
).download
)
p.start()
Expand All @@ -43,22 +46,34 @@ def __init__(
staging_paths: Dict[str, str],
output_dir: str,
redownload: bool,
include_staging_files: bool = False,
):
self.output_formats = output_formats
self.output_filenames = output_filenames
self.staging_paths = staging_paths
self.output_dir = output_dir
self.redownload = redownload
self.include_staging_files = include_staging_files

def generate_filepaths(self):
"""A generator that produces filepaths"""
output_formats = self.output_formats + ["input"]

for output_format in output_formats:
input_filepath = self.staging_paths[output_format]
output_filepath = os.path.join(self.output_dir, self.output_filenames[output_format])
if not os.path.exists(output_filepath) or self.redownload:
yield input_filepath, output_filepath
if self.include_staging_files:
staging_dir = os.path.dirname(self.staging_paths["input"])
for root, _, files in os.walk(staging_dir):
for file in files:
input_filepath = os.path.join(root, file)
relative_path = os.path.relpath(input_filepath, staging_dir)
output_filepath = os.path.join(self.output_dir, relative_path)
yield input_filepath, output_filepath
else:
output_formats = self.output_formats + ["input"]
for output_format in output_formats:
input_filepath = self.staging_paths[output_format]
output_filepath = os.path.join(
self.output_dir, self.output_filenames[output_format]
)
if not os.path.exists(output_filepath) or self.redownload:
yield input_filepath, output_filepath

def download_tar(self, archive_format: str = "tar"):
archive_filepath = self.staging_paths[archive_format]
Expand Down
1 change: 1 addition & 0 deletions jupyter_scheduler/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ class DescribeJob(BaseModel):
status_message: Optional[str] = None
downloaded: bool = False
package_input_folder: Optional[bool] = None
output_folder: Optional[str] = None

class Config:
orm_mode = True
Expand Down
46 changes: 27 additions & 19 deletions jupyter_scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
UpdateJobDefinition,
)
from jupyter_scheduler.orm import Job, JobDefinition, create_session
from jupyter_scheduler.utils import create_output_directory, create_output_filename
from jupyter_scheduler.utils import copy_directory, create_output_directory, create_output_filename


class BaseScheduler(LoggingConfigurable):
Expand Down Expand Up @@ -289,7 +289,10 @@ def add_job_files(self, model: DescribeJob):
mapping = self.environments_manager.output_formats_mapping()
job_files = []
output_filenames = self.get_job_filenames(model)
output_dir = os.path.relpath(self.get_local_output_path(model), self.root_dir)
output_dir = self.get_local_output_path(
input_filename=model.input_filename, job_id=model.job_id, root_dir_relative=True
)

for output_format in model.output_formats:
filename = output_filenames[output_format]
output_path = os.path.join(output_dir, filename)
Expand All @@ -316,13 +319,20 @@ def add_job_files(self, model: DescribeJob):
model.job_files = job_files
model.downloaded = all(job_file.file_path for job_file in job_files)

def get_local_output_path(self, model: DescribeJob) -> str:
def get_local_output_path(
self, input_filename: str, job_id: str, root_dir_relative: Optional[bool] = False
) -> str:
"""Returns the local output directory path
where all the job files will be downloaded
from the staging location.
"""
output_dir_name = create_output_directory(model.input_filename, model.job_id)
return os.path.join(self.root_dir, self.output_directory, output_dir_name)
output_dir_name = create_output_directory(input_filename, job_id)
if root_dir_relative:
return os.path.relpath(
os.path.join(self.root_dir, self.output_directory, output_dir_name), self.root_dir
)
else:
return os.path.join(self.root_dir, self.output_directory, output_dir_name)


class Scheduler(BaseScheduler):
Expand Down Expand Up @@ -375,20 +385,10 @@ def copy_input_folder(self, input_uri: str, nb_copy_to_path: str):
"""Copies the input file along with the input directory to the staging directory"""
input_dir_path = os.path.dirname(os.path.join(self.root_dir, input_uri))
staging_dir = os.path.dirname(nb_copy_to_path)

# Copy the input file
self.copy_input_file(input_uri, nb_copy_to_path)

# Copy the rest of the input folder excluding the input file
for item in os.listdir(input_dir_path):
source = os.path.join(input_dir_path, item)
destination = os.path.join(staging_dir, item)
if os.path.isdir(source):
shutil.copytree(source, destination)
elif os.path.isfile(source) and item != os.path.basename(input_uri):
with fsspec.open(source) as src_file:
with fsspec.open(destination, "wb") as output_file:
output_file.write(src_file.read())
copy_directory(
source_dir=input_dir_path,
destination_dir=staging_dir,
)

def create_job(self, model: CreateJob) -> str:
if not model.job_definition_id and not self.file_exists(model.input_uri):
Expand Down Expand Up @@ -439,6 +439,10 @@ def create_job(self, model: CreateJob) -> str:
staging_paths=staging_paths,
root_dir=self.root_dir,
db_url=self.db_url,
output_dir=self.get_local_output_path(
input_filename=model.input_filename,
job_id=job.job_id,
),
).process
)
p.start()
Expand Down Expand Up @@ -489,6 +493,10 @@ def list_jobs(self, query: ListJobsQuery) -> ListJobsResponse:
for job in jobs:
model = DescribeJob.from_orm(job)
self.add_job_files(model=model)
if model.package_input_folder:
model.output_folder = self.get_local_output_path(
input_filename=model.input_filename, job_id=model.job_id, root_dir_relative=True
)
jobs_list.append(model)

list_jobs_response = ListJobsResponse(
Expand Down
36 changes: 34 additions & 2 deletions jupyter_scheduler/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import json
import os
import pathlib
from datetime import datetime, timezone
from typing import Optional
import shutil
from typing import List, Optional
from uuid import UUID

import fsspec
import pytz
from croniter import croniter
from nbformat import NotebookNode
Expand Down Expand Up @@ -84,3 +85,34 @@ def get_localized_timestamp(timezone) -> int:
tz = pytz.timezone(timezone)
local_date = datetime.now(tz=tz)
return int(local_date.timestamp() * 1000)


def copy_directory(
source_dir: str,
destination_dir: str,
base_dir: Optional[str] = None, # This is the new argument
exclude_files: Optional[List[str]] = [],
) -> List[str]:
"""Copies content of source_dir to destination_dir excluding exclude_files.
Returns a list of relative paths to copied files, relative to base_dir if provided.
"""
copied_files = []
for item in os.listdir(source_dir):
source = os.path.join(source_dir, item)
destination = os.path.join(destination_dir, item)
if os.path.isdir(source):
shutil.copytree(source, destination, ignore=shutil.ignore_patterns(*exclude_files))
for dirpath, _, filenames in os.walk(destination):
for filename in filenames:
rel_path = os.path.relpath(
os.path.join(dirpath, filename), base_dir if base_dir else destination_dir
)
copied_files.append(rel_path)
elif os.path.isfile(source) and item not in exclude_files:
with fsspec.open(source, "rb") as source_file:
with fsspec.open(destination, "wb") as output_file:
output_file.write(source_file.read())
rel_path = os.path.relpath(destination, base_dir if base_dir else destination_dir)
copied_files.append(rel_path)

return copied_files
32 changes: 31 additions & 1 deletion src/components/job-row.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,33 @@ function JobFiles(props: {
);
}

function FilesDirectoryLink(props: {
job: Scheduler.IDescribeJob;
app: JupyterFrontEnd;
}): JSX.Element | null {
if (!props.job.package_input_folder || !props.job.output_folder) {
return null;
}
const trans = useTranslator('jupyterlab');
return (
<Link
href={`/lab/tree/${props.job.output_folder}`}
title={trans.__(
'Open output directory with files for "%1"',
props.job.name
)}
onClick={e => {
e.preventDefault();
props.app.commands.execute('filebrowser:open-path', {
path: props.job.output_folder
});
}}
>
{trans.__('Files')}
</Link>
);
}

type DownloadFilesButtonProps = {
app: JupyterFrontEnd;
job: Scheduler.IDescribeJob;
Expand All @@ -90,7 +117,7 @@ function DownloadFilesButton(props: DownloadFilesButtonProps) {
return (
<IconButton
aria-label="download"
title={trans.__('Download job files for "%1"', props.job.name)}
title={trans.__('Download output files for "%1"', props.job.name)}
disabled={downloading}
onClick={async () => {
setDownloading(true);
Expand Down Expand Up @@ -167,6 +194,9 @@ export function buildJobRow(
/>
)}
<JobFiles job={job} app={app} />
{(job.status === 'COMPLETED' || job.status === 'FAILED') && (
<FilesDirectoryLink job={job} app={app} />
)}
</>,
<Timestamp job={job} />,
translateStatus(job.status),
Expand Down
3 changes: 2 additions & 1 deletion src/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,8 @@ export namespace Scheduler {
start_time?: number;
end_time?: number;
downloaded: boolean;
packageInputFolder?: boolean;
package_input_folder?: boolean;
output_folder?: string;
}

export interface ICreateJobResponse {
Expand Down
2 changes: 1 addition & 1 deletion src/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ export function convertDescribeJobtoJobDetail(
startTime: describeJob.start_time,
endTime: describeJob.end_time,
downloaded: describeJob.downloaded,
packageInputFolder: describeJob.packageInputFolder
packageInputFolder: describeJob.package_input_folder
};
}

Expand Down

0 comments on commit 9cdb6ae

Please sign in to comment.