Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix media repository failing when media store path contains symlinks #11446

Merged
merged 6 commits into from
Dec 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11446.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug introduced in 1.47.1 where the media repository would fail to work if the media store path contained any symbolic links.
115 changes: 71 additions & 44 deletions synapse/rest/media/v1/filepath.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,47 +43,75 @@ def _wrapped(self: "MediaFilePaths", *args: Any, **kwargs: Any) -> str:
)


def _wrap_with_jail_check(func: GetPathMethod) -> GetPathMethod:
def _wrap_with_jail_check(relative: bool) -> Callable[[GetPathMethod], GetPathMethod]:
"""Wraps a path-returning method to check that the returned path(s) do not escape
the media store directory.

The path-returning method may return either a single path, or a list of paths.

The check is not expected to ever fail, unless `func` is missing a call to
`_validate_path_component`, or `_validate_path_component` is buggy.

Args:
func: The `MediaFilePaths` method to wrap. The method may return either a single
path, or a list of paths. Returned paths may be either absolute or relative.
relative: A boolean indicating whether the wrapped method returns paths relative
to the media store directory.

Returns:
The method, wrapped with a check to ensure that the returned path(s) lie within
the media store directory. Raises a `ValueError` if the check fails.
A method which will wrap a path-returning method, adding a check to ensure that
the returned path(s) lie within the media store directory. The check will raise
a `ValueError` if it fails.
"""

@functools.wraps(func)
def _wrapped(
self: "MediaFilePaths", *args: Any, **kwargs: Any
) -> Union[str, List[str]]:
path_or_paths = func(self, *args, **kwargs)

if isinstance(path_or_paths, list):
paths_to_check = path_or_paths
else:
paths_to_check = [path_or_paths]

for path in paths_to_check:
# path may be an absolute or relative path, depending on the method being
# wrapped. When "appending" an absolute path, `os.path.join` discards the
# previous path, which is desired here.
normalized_path = os.path.normpath(os.path.join(self.real_base_path, path))
if (
os.path.commonpath([normalized_path, self.real_base_path])
!= self.real_base_path
):
Comment on lines -77 to -81
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The bug was that path was an absolute, un-canonicalized path, so normalized_path was un-canonicalized while self.real_base_path was.

raise ValueError(f"Invalid media store path: {path!r}")

return path_or_paths

return cast(GetPathMethod, _wrapped)
def _wrap_with_jail_check_inner(func: GetPathMethod) -> GetPathMethod:
@functools.wraps(func)
def _wrapped(
self: "MediaFilePaths", *args: Any, **kwargs: Any
) -> Union[str, List[str]]:
path_or_paths = func(self, *args, **kwargs)

if isinstance(path_or_paths, list):
paths_to_check = path_or_paths
else:
paths_to_check = [path_or_paths]

for path in paths_to_check:
# Construct the path that will ultimately be used.
# We cannot guess whether `path` is relative to the media store
# directory, since the media store directory may itself be a relative
# path.
if relative:
path = os.path.join(self.base_path, path)
normalized_path = os.path.normpath(path)

# Now that `normpath` has eliminated `../`s and `./`s from the path,
# `os.path.commonpath` can be used to check whether it lies within the
# media store directory.
if (
os.path.commonpath([normalized_path, self.normalized_base_path])
!= self.normalized_base_path
):
# The path resolves to outside the media store directory,
# or `self.base_path` is `.`, which is an unlikely configuration.
raise ValueError(f"Invalid media store path: {path!r}")

# Note that `os.path.normpath`/`abspath` has a subtle caveat:
# `a/b/c/../c` will normalize to `a/b/c`, but the former refers to a
# different path if `a/b/c` is a symlink. That is, the check above is
# not perfect and may allow a certain restricted subset of untrustworthy
# paths through. Since the check above is secondary to the main
# `_validate_path_component` checks, it's less important for it to be
# perfect.
#
# As an alternative, `os.path.realpath` will resolve symlinks, but
# proves problematic if there are symlinks inside the media store.
# eg. if `url_store/` is symlinked to elsewhere, its canonical path
# won't match that of the main media store directory.

return path_or_paths

return cast(GetPathMethod, _wrapped)

return _wrap_with_jail_check_inner


ALLOWED_CHARACTERS = set(
Expand Down Expand Up @@ -127,9 +155,7 @@ class MediaFilePaths:

def __init__(self, primary_base_path: str):
self.base_path = primary_base_path

# The media store directory, with all symlinks resolved.
self.real_base_path = os.path.realpath(primary_base_path)
self.normalized_base_path = os.path.normpath(self.base_path)

# Refuse to initialize if paths cannot be validated correctly for the current
# platform.
Expand All @@ -140,7 +166,7 @@ def __init__(self, primary_base_path: str):
# for certain homeservers there, since ":"s aren't allowed in paths.
assert os.name == "posix"

@_wrap_with_jail_check
@_wrap_with_jail_check(relative=True)
def local_media_filepath_rel(self, media_id: str) -> str:
return os.path.join(
"local_content",
Expand All @@ -151,7 +177,7 @@ def local_media_filepath_rel(self, media_id: str) -> str:

local_media_filepath = _wrap_in_base_path(local_media_filepath_rel)

@_wrap_with_jail_check
@_wrap_with_jail_check(relative=True)
def local_media_thumbnail_rel(
self, media_id: str, width: int, height: int, content_type: str, method: str
) -> str:
Expand All @@ -167,7 +193,7 @@ def local_media_thumbnail_rel(

local_media_thumbnail = _wrap_in_base_path(local_media_thumbnail_rel)

@_wrap_with_jail_check
@_wrap_with_jail_check(relative=False)
def local_media_thumbnail_dir(self, media_id: str) -> str:
"""
Retrieve the local store path of thumbnails of a given media_id
Expand All @@ -185,7 +211,7 @@ def local_media_thumbnail_dir(self, media_id: str) -> str:
_validate_path_component(media_id[4:]),
)

@_wrap_with_jail_check
@_wrap_with_jail_check(relative=True)
def remote_media_filepath_rel(self, server_name: str, file_id: str) -> str:
return os.path.join(
"remote_content",
Expand All @@ -197,7 +223,7 @@ def remote_media_filepath_rel(self, server_name: str, file_id: str) -> str:

remote_media_filepath = _wrap_in_base_path(remote_media_filepath_rel)

@_wrap_with_jail_check
@_wrap_with_jail_check(relative=True)
def remote_media_thumbnail_rel(
self,
server_name: str,
Expand All @@ -223,7 +249,7 @@ def remote_media_thumbnail_rel(
# Legacy path that was used to store thumbnails previously.
# Should be removed after some time, when most of the thumbnails are stored
# using the new path.
@_wrap_with_jail_check
@_wrap_with_jail_check(relative=True)
def remote_media_thumbnail_rel_legacy(
self, server_name: str, file_id: str, width: int, height: int, content_type: str
) -> str:
Expand All @@ -238,6 +264,7 @@ def remote_media_thumbnail_rel_legacy(
_validate_path_component(file_name),
)

@_wrap_with_jail_check(relative=False)
def remote_media_thumbnail_dir(self, server_name: str, file_id: str) -> str:
return os.path.join(
self.base_path,
Expand All @@ -248,7 +275,7 @@ def remote_media_thumbnail_dir(self, server_name: str, file_id: str) -> str:
_validate_path_component(file_id[4:]),
)

@_wrap_with_jail_check
@_wrap_with_jail_check(relative=True)
def url_cache_filepath_rel(self, media_id: str) -> str:
if NEW_FORMAT_ID_RE.match(media_id):
# Media id is of the form <DATE><RANDOM_STRING>
Expand All @@ -268,7 +295,7 @@ def url_cache_filepath_rel(self, media_id: str) -> str:

url_cache_filepath = _wrap_in_base_path(url_cache_filepath_rel)

@_wrap_with_jail_check
@_wrap_with_jail_check(relative=False)
def url_cache_filepath_dirs_to_delete(self, media_id: str) -> List[str]:
"The dirs to try and remove if we delete the media_id file"
if NEW_FORMAT_ID_RE.match(media_id):
Expand All @@ -290,7 +317,7 @@ def url_cache_filepath_dirs_to_delete(self, media_id: str) -> List[str]:
),
]

@_wrap_with_jail_check
@_wrap_with_jail_check(relative=True)
def url_cache_thumbnail_rel(
self, media_id: str, width: int, height: int, content_type: str, method: str
) -> str:
Expand Down Expand Up @@ -318,7 +345,7 @@ def url_cache_thumbnail_rel(

url_cache_thumbnail = _wrap_in_base_path(url_cache_thumbnail_rel)

@_wrap_with_jail_check
@_wrap_with_jail_check(relative=True)
def url_cache_thumbnail_directory_rel(self, media_id: str) -> str:
# Media id is of the form <DATE><RANDOM_STRING>
# E.g.: 2017-09-28-fsdRDt24DS234dsf
Expand All @@ -341,7 +368,7 @@ def url_cache_thumbnail_directory_rel(self, media_id: str) -> str:
url_cache_thumbnail_directory_rel
)

@_wrap_with_jail_check
@_wrap_with_jail_check(relative=False)
def url_cache_thumbnail_dirs_to_delete(self, media_id: str) -> List[str]:
"The dirs to try and remove if we delete the media_id thumbnails"
# Media id is of the form <DATE><RANDOM_STRING>
Expand Down
109 changes: 108 additions & 1 deletion tests/rest/media/v1/test_filepath.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
import os
from typing import Iterable

from synapse.rest.media.v1.filepath import MediaFilePaths
from synapse.rest.media.v1.filepath import MediaFilePaths, _wrap_with_jail_check

from tests import unittest

Expand Down Expand Up @@ -486,3 +487,109 @@ def _test_path_validation(
f"{value!r} unexpectedly passed validation: "
f"{method} returned {path_or_list!r}"
)


class MediaFilePathsJailTestCase(unittest.TestCase):
def _check_relative_path(self, filepaths: MediaFilePaths, path: str) -> None:
"""Passes a relative path through the jail check.

Args:
filepaths: The `MediaFilePaths` instance.
path: A path relative to the media store directory.

Raises:
ValueError: If the jail check fails.
"""

@_wrap_with_jail_check(relative=True)
def _make_relative_path(self: MediaFilePaths, path: str) -> str:
return path

_make_relative_path(filepaths, path)

def _check_absolute_path(self, filepaths: MediaFilePaths, path: str) -> None:
"""Passes an absolute path through the jail check.

Args:
filepaths: The `MediaFilePaths` instance.
path: A path relative to the media store directory.

Raises:
ValueError: If the jail check fails.
"""

@_wrap_with_jail_check(relative=False)
def _make_absolute_path(self: MediaFilePaths, path: str) -> str:
return os.path.join(self.base_path, path)

_make_absolute_path(filepaths, path)

def test_traversal_inside(self) -> None:
"""Test the jail check for paths that stay within the media directory."""
# Despite the `../`s, these paths still lie within the media directory and it's
# expected for the jail check to allow them through.
# These paths ought to trip the other checks in place and should never be
# returned.
filepaths = MediaFilePaths("/media_store")
path = "url_cache/2020-01-02/../../GerZNDnDZVjsOtar"
self._check_relative_path(filepaths, path)
self._check_absolute_path(filepaths, path)

def test_traversal_outside(self) -> None:
"""Test that the jail check fails for paths that escape the media directory."""
filepaths = MediaFilePaths("/media_store")
path = "url_cache/2020-01-02/../../../GerZNDnDZVjsOtar"
with self.assertRaises(ValueError):
self._check_relative_path(filepaths, path)
with self.assertRaises(ValueError):
self._check_absolute_path(filepaths, path)

def test_traversal_reentry(self) -> None:
"""Test the jail check for paths that exit and re-enter the media directory."""
# These paths lie outside the media directory if it is a symlink, and inside
# otherwise. Ideally the check should fail, but this proves difficult.
# This test documents the behaviour for this edge case.
# These paths ought to trip the other checks in place and should never be
# returned.
filepaths = MediaFilePaths("/media_store")
path = "url_cache/2020-01-02/../../../media_store/GerZNDnDZVjsOtar"
self._check_relative_path(filepaths, path)
self._check_absolute_path(filepaths, path)

def test_symlink(self) -> None:
"""Test that a symlink does not cause the jail check to fail."""
media_store_path = self.mktemp()

# symlink the media store directory
os.symlink("/mnt/synapse/media_store", media_store_path)

# Test that relative and absolute paths don't trip the check
# NB: `media_store_path` is a relative path
filepaths = MediaFilePaths(media_store_path)
self._check_relative_path(filepaths, "url_cache/2020-01-02/GerZNDnDZVjsOtar")
self._check_absolute_path(filepaths, "url_cache/2020-01-02/GerZNDnDZVjsOtar")

filepaths = MediaFilePaths(os.path.abspath(media_store_path))
self._check_relative_path(filepaths, "url_cache/2020-01-02/GerZNDnDZVjsOtar")
self._check_absolute_path(filepaths, "url_cache/2020-01-02/GerZNDnDZVjsOtar")

def test_symlink_subdirectory(self) -> None:
"""Test that a symlinked subdirectory does not cause the jail check to fail."""
media_store_path = self.mktemp()
os.mkdir(media_store_path)

# symlink `url_cache/`
os.symlink(
"/mnt/synapse/media_store_url_cache",
os.path.join(media_store_path, "url_cache"),
)

# Test that relative and absolute paths don't trip the check
# NB: `media_store_path` is a relative path
filepaths = MediaFilePaths(media_store_path)
self._check_relative_path(filepaths, "url_cache/2020-01-02/GerZNDnDZVjsOtar")
self._check_absolute_path(filepaths, "url_cache/2020-01-02/GerZNDnDZVjsOtar")

filepaths = MediaFilePaths(os.path.abspath(media_store_path))
self._check_relative_path(filepaths, "url_cache/2020-01-02/GerZNDnDZVjsOtar")
self._check_absolute_path(filepaths, "url_cache/2020-01-02/GerZNDnDZVjsOtar")