Skip to content

Commit

Permalink
Merge pull request #262 from SciCatProject/all-files-in-source-folder
Browse files Browse the repository at this point in the history
Prevent uploading outside source folder
  • Loading branch information
jl-wynen authored Jan 9, 2025
2 parents ca36437 + 298bc56 commit 490482e
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 4 deletions.
32 changes: 28 additions & 4 deletions src/scitacean/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ def upload_new_dataset_now(self, dataset: Dataset) -> Dataset:
Note the error message if that happens.
"""
dataset = dataset.replace(source_folder=self._source_folder_for(dataset))
files_to_upload = _files_to_upload(dataset.files)
files_to_upload = _files_to_upload(dataset, self.file_transfer)
self.scicat.validate_dataset_model(dataset.make_upload_model())
with self._connect_for_file_upload(dataset, files_to_upload) as con:
# TODO check if any remote file is out of date.
Expand Down Expand Up @@ -1327,15 +1327,39 @@ def is_up_to_date(file: File) -> bool:
]


def _files_to_upload(files: Iterable[File]) -> list[File]:
for file in files:
def _files_to_upload(
dataset: Dataset,
file_transfer: FileTransfer | None,
) -> list[File]:
for file in dataset.files:
if file.is_on_local and file.is_on_remote:
raise ValueError(
f"Refusing to upload file at remote_path={file.remote_path} "
"because it is both on local and remote and it is unclear what "
"to do. If you want to perform the upload, set the local path to None."
)
return [file for file in files if file.local_path is not None]

to_upload = [file for file in dataset.files if file.is_on_local]
if not to_upload:
return []

if file_transfer is not None:
source_folder = file_transfer.source_folder_for(dataset)
outside = [
(file.remote_path, file.local_path)
for file in to_upload
if not (source_folder / file.remote_path)
.resolve()
.is_relative_to(source_folder)
]
if outside:
raise ValueError(
"Refusing to upload files that would be placed outside of the "
f"source folder '{source_folder.posix}': {[str(l) for _, l in outside]}"
f" with remote paths {[r.posix for r, _ in outside]}."
)

return to_upload


class _NullUploadConnection:
Expand Down
30 changes: 30 additions & 0 deletions src/scitacean/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,36 @@ def parent(self) -> RemotePath:
return RemotePath(base)
return RemotePath(parts[0] or base)

def is_absolute(self) -> bool:
"""Return True if the path is absolute."""
return self._path.startswith("/")

def resolve(self) -> RemotePath:
"""Resolve any parent segments in a path.
This does *not* resolve symlinks or the current working directory like
:meth:`pathlib.Path.resolve`.
Returns
-------
:
A new remote path with resolved parent segments.
"""
parts: list[str] = []
for part in self._path.split("/"):
if part == ".." and parts:
parts.pop()
else:
parts.append(part)
res = RemotePath(*parts)
if self.is_absolute():
res._path = "/" + res._path
return res

def is_relative_to(self, other: RemotePath) -> bool:
"""Check whether this path is relative to another."""
return self.posix.startswith(other.posix)

def truncated(self, max_length: int = 255) -> RemotePath:
"""Return a new remote path with all path segments truncated.
Expand Down
23 changes: 23 additions & 0 deletions tests/filesystem_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,29 @@ def test_remote_path_parent() -> None:
assert RemotePath("relative/sub").parent == RemotePath("relative")


def test_remote_path_resolve() -> None:
assert RemotePath("").resolve() == RemotePath("")
assert RemotePath("/").resolve() == RemotePath("/")
assert RemotePath("/a").resolve() == RemotePath("/a")
assert RemotePath("file").resolve() == RemotePath("file")
assert RemotePath("../up").resolve() == RemotePath("../up")
assert RemotePath("base/../up").resolve() == RemotePath("up")
assert RemotePath("/base/../up").resolve() == RemotePath("/up")
assert RemotePath("/base/../mid/../up").resolve() == RemotePath("/up")
assert RemotePath("/base/mid/../up").resolve() == RemotePath("/base/up")
assert RemotePath("base/mid/../up").resolve() == RemotePath("base/up")
assert RemotePath("base/mid/inner/../../file").resolve() == RemotePath("base/file")


def test_remote_path_is_relative_to() -> None:
path = RemotePath("/source/nested/file.txt")
assert path.is_relative_to(RemotePath("/"))
assert path.is_relative_to(RemotePath("/source"))
assert path.is_relative_to(RemotePath("/source/nested"))
assert not path.is_relative_to(RemotePath("/other-top"))
assert not path.is_relative_to(RemotePath("/source/other-nested"))


def test_remote_path_truncated() -> None:
assert RemotePath("something-long.txt").truncated(10) == "someth.txt"
assert RemotePath("longlonglong/short").truncated(5) == "longl/short"
Expand Down
15 changes: 15 additions & 0 deletions tests/upload_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,21 @@ def test_upload_uploads_files_to_source_folder(
)


@pytest.mark.parametrize("remote_prefix", ["../relative/", "/absolute/"])
def test_upload_rejects_files_outside_of_source_folder(
client: FakeClient,
dataset_with_files: Dataset,
fs: FakeFilesystem,
remote_prefix: str,
) -> None:
make_file(fs, path="bad", contents=b"This wants to be outside the source folder")
dataset_with_files.add_files(
File.from_local("bad", remote_path=remote_prefix + "bad")
)
with pytest.raises(ValueError, match="outside of the source folder"):
client.upload_new_dataset_now(dataset_with_files)


def test_upload_does_not_create_dataset_if_file_upload_fails(
dataset_with_files: Dataset, fs: FakeFilesystem
) -> None:
Expand Down

0 comments on commit 490482e

Please sign in to comment.