Skip to content

Commit

Permalink
Fix sanity check for path traversal attack
Browse files Browse the repository at this point in the history
- Previous versions do not detect the attack in some case
   - fixed it by call resolve()
   - resolve() converts "/hoge/fuga/../../../tmp/evil.sh" to be "/tmp/evil.sh" then
     relative_to() can detect path traversal attack.
- Add path checker in writef() and writestr() methods
  - When pass arcname as evil path such as "../../../../tmp/evil.sh"
    it raises ValueError
- Add test case of bad path detection
- extraction: check symlink and junction is under target folder
- Fix relative_path_marker removal
- Don't put windows file namespace to output file path

Signed-off-by: Hiroshi Miura <miurahr@linux.com>
  • Loading branch information
miurahr committed Oct 31, 2022
1 parent 04e3af5 commit 1bb43f1
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 53 deletions.
3 changes: 0 additions & 3 deletions MANIFEST.in
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ include *.rst
include *.svg
include *.toml
include LICENSE
include .coveragerc
include tox.ini
include py7zr/py.typed
recursive-include py7zr *.py
recursive-include tests *.py
Expand All @@ -25,7 +23,6 @@ recursive-include docs *.yml
recursive-include docs *.odp
recursive-include docs *.pdf
recursive-include docs *.svg
recursive-include docs *.txt
recursive-include docs Makefile
prune .github
exclude .gitignore
Expand Down
60 changes: 58 additions & 2 deletions py7zr/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import _hashlib # type: ignore # noqa

import py7zr.win32compat
from py7zr import Bad7zFile
from py7zr.win32compat import is_windows_native_python, is_windows_unc_path

# String used at the beginning of relative paths
RELATIVE_PATH_MARKER = "./"
Expand Down Expand Up @@ -265,7 +267,7 @@ def from_now():
def islink(path):
"""
Cross-platform islink implementation.
Supports Windows NT symbolic links and reparse points.
Support Windows NT symbolic links and reparse points.
"""
is_symlink = os.path.islink(str(path))
if sys.version_info >= (3, 8) or sys.platform != "win32" or sys.getwindowsversion()[0] < 6:
Expand All @@ -280,7 +282,7 @@ def islink(path):
def readlink(path: Union[str, pathlib.Path], *, dir_fd=None) -> Union[str, pathlib.Path]:
"""
Cross-platform compat implementation of os.readlink and Path.readlink().
Supports Windows NT symbolic links and reparse points.
Support Windows NT symbolic links and reparse points.
When called with path argument as pathlike(str), return result as a pathlike(str).
When called with Path object, return also Path object.
When called with path argument as bytes, return result as a bytes.
Expand Down Expand Up @@ -431,3 +433,57 @@ def remove_relative_path_marker(path: str) -> str:
processed_path = path[len(RELATIVE_PATH_MARKER) :]

return processed_path


def get_sanitized_output_path(fname: str, path: Optional[pathlib.Path]) -> pathlib.Path:
"""
check f.filename has invalid directory traversals
do following but is_relative_to introduced in py 3.9,
so I replaced it with relative_to. when condition is not satisfied, raise ValueError
if not pathlib.Path(...).joinpath(remove_relative_path_marker(outname)).is_relative_to(...):
raise Bad7zFile
"""
if path is None:
try:
pathlib.Path(os.getcwd()).joinpath(fname).resolve().relative_to(os.getcwd())
outfile = pathlib.Path(remove_relative_path_marker(fname))
except ValueError:
raise Bad7zFile(f"Specified path is bad: {fname}")
else:
try:
outfile = path.joinpath(remove_relative_path_marker(fname))
outfile.resolve().relative_to(path)
except ValueError:
raise Bad7zFile(f"Specified path is bad: {fname}")
return outfile


def check_archive_path(arcname: str) -> bool:
path = pathlib.Path("/foo/boo/fuga/hoge/a90sufoiasj09/dafj08sajfa/") # dummy path
return is_target_path_valid(path, path.joinpath(arcname))


def is_target_path_valid(path: pathlib.Path, target: pathlib.Path) -> bool:
try:
if path.is_absolute():
target.resolve().relative_to(path)
else:
target.resolve().relative_to(pathlib.Path(os.getcwd()).joinpath(path))
except ValueError:
return False
return True


def check_win32_file_namespace(pathname: pathlib.Path) -> pathlib.Path:
# When python on Windows and not python on Cygwin,
# Add win32 file namespace to exceed Microsoft Windows
# path length limitation to 260 bytes
# ref.
# https://docs.microsoft.com/en-us/windows/win32/fileio/naming-a-file
# In editions of Windows before Windows 10 version 1607,
# the maximum length for a path is MAX_PATH, which is defined as
# 260 characters. In later versions of Windows, changing a registry key
# or select option when python installation is required to remove the limit.
if is_windows_native_python() and pathname.is_absolute() and not is_windows_unc_path(pathname):
pathname = pathlib.WindowsPath("\\\\?\\" + str(pathname))
return pathname
98 changes: 51 additions & 47 deletions py7zr/py7zr.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,13 @@
MemIO,
NullIO,
calculate_crc32,
check_archive_path,
filetime_to_dt,
get_sanitized_output_path,
is_target_path_valid,
readlink,
remove_relative_path_marker,
)
from py7zr.properties import DEFAULT_FILTERS, FILTER_DEFLATE64, MAGIC_7Z, get_default_blocksize, get_memory_limit
from py7zr.win32compat import is_windows_native_python, is_windows_unc_path

if sys.platform.startswith("win"):
import _winapi
Expand Down Expand Up @@ -567,34 +568,10 @@ def _extract(
break
i += 1
fnames.append(outname)
# check f.filename has invalid directory traversals
if path is None:
# do following but is_relative_to introduced in py 3.9
# so I replaced it with relative_to. when condition is not satisfied, raise ValueError
# if not pathlib.Path(...).joinpath(remove_relative_path_marker(outname)).is_relative_to(...):
# raise Bad7zFile
try:
pathlib.Path(os.getcwd()).joinpath(remove_relative_path_marker(outname)).relative_to(os.getcwd())
except ValueError:
raise Bad7zFile
outfilename = pathlib.Path(remove_relative_path_marker(outname))
if path is None or path.is_absolute():
outfilename = get_sanitized_output_path(outname, path)
else:
outfilename = path.joinpath(remove_relative_path_marker(outname))
try:
outfilename.relative_to(path)
except ValueError:
raise Bad7zFile
# When python on Windows and not python on Cygwin,
# Add win32 file namespace to exceed microsoft windows
# path length limitation to 260 bytes
# ref.
# https://docs.microsoft.com/en-us/windows/win32/fileio/naming-a-file
# In editions of Windows before Windows 10 version 1607,
# the maximum length for a path is MAX_PATH, which is defined as
# 260 characters. In later versions of Windows, changing a registry key
# or select option when python installation is required to remove the limit.
if is_windows_native_python() and outfilename.is_absolute() and not is_windows_unc_path(outfilename):
outfilename = pathlib.WindowsPath("\\\\?\\" + str(outfilename))
outfilename = get_sanitized_output_path(outname, pathlib.Path(os.getcwd()).joinpath(path))
if targets is not None and f.filename not in targets:
self.worker.register_filelike(f.id, None)
continue
Expand Down Expand Up @@ -634,12 +611,14 @@ def _extract(
if callback is not None:
self.worker.extract(
self.fp,
path,
parallel=(not self.password_protected and not self._filePassed),
q=self.q,
)
else:
self.worker.extract(
self.fp,
path,
parallel=(not self.password_protected and not self._filePassed),
)

Expand Down Expand Up @@ -1040,6 +1019,11 @@ def writed(self, targets: Dict[str, IO[Any]]) -> None:
self.writef(input, target)

def writef(self, bio: IO[Any], arcname: str):
if not check_archive_path(arcname):
raise ValueError(f"Specified path is bad: {arcname}")
return self._writef(bio, arcname)

def _writef(self, bio: IO[Any], arcname: str):
if isinstance(bio, io.BytesIO):
size = bio.getbuffer().nbytes
elif isinstance(bio, io.TextIOBase):
Expand Down Expand Up @@ -1069,12 +1053,17 @@ def writef(self, bio: IO[Any], arcname: str):
self.files.append(file_info)

def writestr(self, data: Union[str, bytes, bytearray, memoryview], arcname: str):
if not check_archive_path(arcname):
raise ValueError(f"Specified path is bad: {arcname}")
return self._writestr(data, arcname)

def _writestr(self, data: Union[str, bytes, bytearray, memoryview], arcname: str):
if not isinstance(arcname, str):
raise ValueError("Unsupported arcname")
if isinstance(data, str):
self.writef(io.BytesIO(data.encode("UTF-8")), arcname)
self._writef(io.BytesIO(data.encode("UTF-8")), arcname)
elif isinstance(data, bytes) or isinstance(data, bytearray) or isinstance(data, memoryview):
self.writef(io.BytesIO(data), arcname)
self._writef(io.BytesIO(bytes(data)), arcname)
else:
raise ValueError("Unsupported data type.")

Expand Down Expand Up @@ -1131,7 +1120,9 @@ def testzip(self) -> Optional[str]:
for f in self.files:
self.worker.register_filelike(f.id, None)
try:
self.worker.extract(self.fp, parallel=(not self.password_protected), skip_notarget=False) # TODO: print progress
self.worker.extract(
self.fp, None, parallel=(not self.password_protected), skip_notarget=False
) # TODO: print progress
except CrcError as crce:
return crce.args[2]
else:
Expand Down Expand Up @@ -1200,7 +1191,7 @@ def __init__(self, files, src_start: int, header, mp=False) -> None:
else:
self.concurrent = Thread

def extract(self, fp: BinaryIO, parallel: bool, skip_notarget=True, q=None) -> None:
def extract(self, fp: BinaryIO, path: Optional[pathlib.Path], parallel: bool, skip_notarget=True, q=None) -> None:
"""Extract worker method to handle 7zip folder and decompress each files."""
if hasattr(self.header, "main_streams") and self.header.main_streams is not None:
src_end = self.src_start + self.header.main_streams.packinfo.packpositions[-1]
Expand All @@ -1209,6 +1200,7 @@ def extract(self, fp: BinaryIO, parallel: bool, skip_notarget=True, q=None) -> N
self.extract_single(
fp,
self.files,
path,
self.src_start,
src_end,
q,
Expand All @@ -1219,14 +1211,15 @@ def extract(self, fp: BinaryIO, parallel: bool, skip_notarget=True, q=None) -> N
positions = self.header.main_streams.packinfo.packpositions
empty_files = [f for f in self.files if f.emptystream]
if not parallel:
self.extract_single(fp, empty_files, 0, 0, q)
self.extract_single(fp, empty_files, path, 0, 0, q)
for i in range(numfolders):
if skip_notarget:
if not any([self.target_filepath.get(f.id, None) for f in folders[i].files]):
continue
self.extract_single(
fp,
folders[i].files,
path,
self.src_start + positions[i],
self.src_start + positions[i + 1],
q,
Expand All @@ -1236,7 +1229,7 @@ def extract(self, fp: BinaryIO, parallel: bool, skip_notarget=True, q=None) -> N
if getattr(fp, "name", None) is None:
raise InternalError("Caught unknown variable status error")
filename: str = getattr(fp, "name", "") # do not become "" but it is for type check.
self.extract_single(open(filename, "rb"), empty_files, 0, 0, q)
self.extract_single(open(filename, "rb"), empty_files, path, 0, 0, q)
concurrent_tasks = []
exc_q: queue.Queue = queue.Queue()
for i in range(numfolders):
Expand All @@ -1248,6 +1241,7 @@ def extract(self, fp: BinaryIO, parallel: bool, skip_notarget=True, q=None) -> N
args=(
filename,
folders[i].files,
path,
self.src_start + positions[i],
self.src_start + positions[i + 1],
q,
Expand All @@ -1266,12 +1260,13 @@ def extract(self, fp: BinaryIO, parallel: bool, skip_notarget=True, q=None) -> N
raise exc_info[1].with_traceback(exc_info[2])
else:
empty_files = [f for f in self.files if f.emptystream]
self.extract_single(fp, empty_files, 0, 0, q)
self.extract_single(fp, empty_files, path, 0, 0, q)

def extract_single(
self,
fp: Union[BinaryIO, str],
files,
path,
src_start: int,
src_end: int,
q: Optional[queue.Queue],
Expand All @@ -1287,7 +1282,7 @@ def extract_single(
if isinstance(fp, str):
fp = open(fp, "rb")
fp.seek(src_start)
self._extract_single(fp, files, src_end, q, skip_notarget)
self._extract_single(fp, files, path, src_end, q, skip_notarget)
except Exception as e:
if exc_q is None:
raise e
Expand All @@ -1299,6 +1294,7 @@ def _extract_single(
self,
fp: BinaryIO,
files,
path,
src_end: int,
q: Optional[queue.Queue],
skip_notarget=True,
Expand Down Expand Up @@ -1331,20 +1327,28 @@ def _extract_single(
with io.BytesIO() as ofp:
self.decompress(fp, f.folder, ofp, f.uncompressed, f.compressed, src_end)
dst: str = ofp.read().decode("utf-8")
# fileish.unlink(missing_ok=True) > py3.7
if fileish.exists():
fileish.unlink()
if sys.platform == "win32": # hint for mypy
_winapi.CreateJunction(str(fileish), dst) # noqa
if is_target_path_valid(path, fileish.parent.joinpath(dst)):
# fileish.unlink(missing_ok=True) > py3.7
if fileish.exists():
fileish.unlink()
if sys.platform == "win32": # hint for mypy
_winapi.CreateJunction(str(fileish), dst) # noqa
else:
raise Bad7zFile("Junction point out of target directory.")
elif f.is_symlink and not isinstance(fileish, MemIO):
with io.BytesIO() as omfp:
self.decompress(fp, f.folder, omfp, f.uncompressed, f.compressed, src_end)
omfp.seek(0)
sym_target = pathlib.Path(omfp.read().decode("utf-8"))
# fileish.unlink(missing_ok=True) > py3.7
if fileish.exists():
fileish.unlink()
fileish.symlink_to(sym_target)
dst = omfp.read().decode("utf-8")
# check sym_target points inside an archive target?
if is_target_path_valid(path, fileish.parent.joinpath(dst)):
sym_target = pathlib.Path(dst)
# fileish.unlink(missing_ok=True) > py3.7
if fileish.exists():
fileish.unlink()
fileish.symlink_to(sym_target)
else:
raise Bad7zFile("Symlink point out of target directory.")
else:
with fileish.open(mode="wb") as obfp:
crc32 = self.decompress(fp, f.folder, obfp, f.uncompressed, f.compressed, src_end)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ def test_skip():
for i, cf in enumerate(archive.files):
assert cf is not None
archive.worker.register_filelike(cf.id, None)
archive.worker.extract(archive.fp, parallel=True)
archive.worker.extract(archive.fp, None, parallel=True)
archive.close()


Expand Down
48 changes: 48 additions & 0 deletions tests/test_zipslip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import os

import pytest

from py7zr import SevenZipFile
from py7zr.exceptions import Bad7zFile
from py7zr.helpers import check_archive_path, get_sanitized_output_path
from py7zr.properties import FILTER_LZMA2, PRESET_DEFAULT

testdata_path = os.path.join(os.path.dirname(__file__), "data")


@pytest.mark.misc
def test_check_archive_path():
bad_path = "../../.../../../../../../tmp/evil.sh"
assert not check_archive_path(bad_path)


@pytest.mark.misc
def test_get_sanitized_output_path_1(tmp_path):
bad_path = "../../.../../../../../../tmp/evil.sh"
with pytest.raises(Bad7zFile):
get_sanitized_output_path(bad_path, tmp_path)


@pytest.mark.misc
def test_get_sanitized_output_path_2(tmp_path):
good_path = "good.sh"
expected = tmp_path.joinpath(good_path)
assert expected == get_sanitized_output_path(good_path, tmp_path)


@pytest.mark.misc
def test_extract_path_traversal_attack(tmp_path):
my_filters = [
{"id": FILTER_LZMA2, "preset": PRESET_DEFAULT},
]
target = tmp_path.joinpath("target.7z")
good_data = b"#!/bin/sh\necho good\n"
good_path = "good.sh"
bad_data = b"!#/bin/sh\necho bad\n"
bad_path = "../../.../../../../../../tmp/evil.sh"
with SevenZipFile(target, "w", filters=my_filters) as archive:
archive.writestr(good_data, good_path)
archive._writestr(bad_data, bad_path) # bypass a path check
with pytest.raises(Bad7zFile):
with SevenZipFile(target, "r") as archive:
archive.extractall(path=tmp_path)

0 comments on commit 1bb43f1

Please sign in to comment.