Skip to content

Commit

Permalink
fix fs permission under windows
Browse files Browse the repository at this point in the history
  • Loading branch information
mjurbanski-reef committed Aug 15, 2024
1 parent 72344a1 commit 57c2f5f
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 29 deletions.
24 changes: 20 additions & 4 deletions b2sdk/_internal/scan/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from __future__ import annotations

import logging
import re
import threading
import time
from dataclasses import dataclass
Expand All @@ -20,6 +21,21 @@

logger = logging.getLogger(__name__)

_REMOVE_EXTENDED_PATH_PREFIX = re.compile(r'\\\\\?\\')


def _safe_path_print(path: str) -> str:
"""
Print a path, escaping control characters if necessary.
Windows extended path prefix is removed from the path before printing for better readability.
Since Windows 10 the prefix is not needed.
:param path: a path to print
:return: a path that can be printed
"""
return escape_control_chars(_REMOVE_EXTENDED_PATH_PREFIX.sub('', path))


@dataclass
class ProgressReport:
Expand Down Expand Up @@ -180,7 +196,7 @@ def local_access_error(self, path: str) -> None:
:param path: file path
"""
self.warnings.append(
f'WARNING: {escape_control_chars(path)} could not be accessed (broken symlink?)'
f'WARNING: {_safe_path_print(path)} could not be accessed (broken symlink?)'
)

def local_permission_error(self, path: str) -> None:
Expand All @@ -190,7 +206,7 @@ def local_permission_error(self, path: str) -> None:
:param path: file path
"""
self.warnings.append(
f'WARNING: {escape_control_chars(path)} could not be accessed (no permissions to read?)'
f'WARNING: {_safe_path_print(path)} could not be accessed (no permissions to read?)'
)

def symlink_skipped(self, path: str) -> None:
Expand All @@ -203,7 +219,7 @@ def circular_symlink_skipped(self, path: str) -> None:
:param path: file path
"""
self.warnings.append(
f'WARNING: {escape_control_chars(path)} is a circular symlink, which was already visited. Skipping.'
f'WARNING: {_safe_path_print(path)} is a circular symlink, which was already visited. Skipping.'
)

def invalid_name(self, path: str, error: str) -> None:
Expand All @@ -213,7 +229,7 @@ def invalid_name(self, path: str, error: str) -> None:
:param path: file path
"""
self.warnings.append(
f'WARNING: {escape_control_chars(path)} path contains invalid name ({error}). Skipping.'
f'WARNING: {_safe_path_print(path)} path contains invalid name ({error}). Skipping.'
)


Expand Down
25 changes: 24 additions & 1 deletion pdm.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ classifiers = [
"Programming Language :: Python :: 3.12",
]

[project.optional-dependencies]

[project.urls]
Homepage = "https://github.com/Backblaze/b2-sdk-python"

Expand Down Expand Up @@ -174,6 +172,7 @@ test = [
# remove `and platform_python_implementation!='PyPy'` after dropping Python 3.7 support as that
# will allow us to update pydantic to a version which installs properly under PyPy
"pydantic>=2.0.1,<3; python_version>='3.8' and platform_python_implementation!='PyPy'",
"pywin32>=306; sys_platform == \"win32\"",
]
release = [
"towncrier==23.11.0; python_version>='3.8'",
Expand Down
70 changes: 65 additions & 5 deletions test/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
from glob import glob
from pathlib import Path

try:
import ntsecuritycon
import win32api
import win32security
except ImportError:
pass

import pytest

pytest.register_assert_rewrite('test.unit')
Expand Down Expand Up @@ -195,26 +202,79 @@ def file_info():
return {'key': 'value'}


class PermTool:
def allow_access(self, path):
pass

def deny_access(self, path):
pass


class UnixPermTool(PermTool):
def allow_access(self, path):
path.chmod(0o700)

def deny_access(self, path):
path.chmod(0o000)


class WindowsPermTool(PermTool):
def __init__(self):
self.user_sid = win32security.GetTokenInformation(
win32security.OpenProcessToken(win32api.GetCurrentProcess(), win32security.TOKEN_QUERY),
win32security.TokenUser
)[0]

def allow_access(self, path):
dacl = win32security.ACL()
dacl.AddAccessAllowedAce(
win32security.ACL_REVISION, ntsecuritycon.FILE_ALL_ACCESS, self.user_sid
)

security_desc = win32security.GetFileSecurity(
str(path), win32security.DACL_SECURITY_INFORMATION
)
security_desc.SetSecurityDescriptorDacl(1, dacl, 0)
win32security.SetFileSecurity(
str(path), win32security.DACL_SECURITY_INFORMATION, security_desc
)

def deny_access(self, path):
dacl = win32security.ACL()
dacl.AddAccessDeniedAce(
win32security.ACL_REVISION, ntsecuritycon.FILE_ALL_ACCESS, self.user_sid
)

security_desc = win32security.GetFileSecurity(
str(path), win32security.DACL_SECURITY_INFORMATION
)
security_desc.SetSecurityDescriptorDacl(1, dacl, 0)
win32security.SetFileSecurity(
str(path), win32security.DACL_SECURITY_INFORMATION, security_desc
)


@pytest.fixture
def tmp_path_permission_cleanup(tmp_path):
def fs_perm_tool(tmp_path):
"""
Ensure tmp_path is delete-able after the test.
Important for the tests that mess with filesystem permissions.
"""
yield
perm_tool = UnixPermTool() if os.name != 'nt' else WindowsPermTool()
yield perm_tool

try:
shutil.rmtree(tmp_path)
except OSError:
tmp_path.chmod(0o700)
perm_tool.allow_access(tmp_path)

for root, dirs, files in os.walk(tmp_path, topdown=True):
for name in dirs:
(Path(root) / name).chmod(0o700)
perm_tool.allow_access(Path(root) / name)
for name in files:
file_path = Path(root) / name
file_path.chmod(0o600)
perm_tool.allow_access(file_path)
file_path.unlink()

for root, dirs, files in os.walk(tmp_path, topdown=False):
Expand Down
35 changes: 18 additions & 17 deletions test/unit/scan/test_folder_traversal.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import codecs
import os
import pathlib
import platform
import re
import sys
Expand Down Expand Up @@ -686,7 +687,7 @@ def test_excluded_no_access_check(self, tmp_path):
platform.system() == 'Windows',
reason="Unix-only filesystem permissions are tested",
)
def test_dir_without_exec_permission(self, tmp_path, tmp_path_permission_cleanup):
def test_dir_without_exec_permission(self, tmp_path, fs_perm_tool):
"""Test that a excluded directory/file without permissions emits warnings."""
no_perm_dir = tmp_path / "no_perm_dir"
no_perm_dir.mkdir()
Expand All @@ -705,13 +706,13 @@ def test_dir_without_exec_permission(self, tmp_path, tmp_path_permission_cleanup

# Check that no access warnings are issued for the excluded directory/file
assert set(reporter.warnings) == {
f'WARNING: {tmp_path}/no_perm_dir/file.txt could not be accessed (no permissions to read?)',
f'WARNING: {tmp_path}/no_perm_dir/file2.txt could not be accessed (no permissions to read?)',
f'WARNING: {tmp_path/"no_perm_dir/file.txt"} could not be accessed (no permissions to read?)',
f'WARNING: {tmp_path/"no_perm_dir/file2.txt"} could not be accessed (no permissions to read?)',
}

reporter.close()

def test_without_permissions(self, tmp_path, tmp_path_permission_cleanup):
def test_without_permissions(self, tmp_path, fs_perm_tool):
"""Test that a excluded directory/file without permissions emits warnings."""
no_perm_dir = tmp_path / "no_perm_dir"
no_perm_dir.mkdir()
Expand All @@ -723,28 +724,32 @@ def test_without_permissions(self, tmp_path, tmp_path_permission_cleanup):
(included_dir / "included_file.txt").touch()

# Modify directory permissions to simulate lack of access
(included_dir / "no_perm_file.txt").chmod(0o000)
no_perm_dir.chmod(0o000)
fs_perm_tool.deny_access(included_dir / "no_perm_file.txt")
fs_perm_tool.deny_access(no_perm_dir)

scan_policy = ScanPoliciesManager()
reporter = ProgressReport(sys.stdout, False)

folder = LocalFolder(str(tmp_path))
local_paths = folder.all_files(reporter=reporter, policies_manager=scan_policy)
absolute_paths = [path.absolute_path for path in local_paths]
absolute_paths = [pathlib.Path(path.absolute_path) for path in local_paths]

for absolute_path in absolute_paths:
with open(absolute_path):
pass # TODO REMOVE ME

# Check that only included_dir/included_file.txt was return
assert any('included_file.txt' in path for path in absolute_paths)
assert {path.name for path in absolute_paths} == {"included_file.txt"}

# Check that no access warnings are issued for the excluded directory/file
assert set(reporter.warnings) == {
f'WARNING: {tmp_path}/no_perm_dir could not be accessed (no permissions to read?)',
f'WARNING: {tmp_path}/included_dir/no_perm_file.txt could not be accessed (no permissions to read?)'
f'WARNING: {tmp_path / "no_perm_dir"} could not be accessed (no permissions to read?)',
f'WARNING: {tmp_path / "included_dir/no_perm_file.txt"} could not be accessed (no permissions to read?)'
}

reporter.close()

def test_excluded_without_permissions(self, tmp_path, tmp_path_permission_cleanup):
def test_excluded_without_permissions(self, tmp_path, fs_perm_tool):
"""Test that a excluded directory/file without permissions is not processed and no warning is issued."""
no_perm_dir = tmp_path / "no_perm_dir"
no_perm_dir.mkdir()
Expand All @@ -756,8 +761,8 @@ def test_excluded_without_permissions(self, tmp_path, tmp_path_permission_cleanu
(included_dir / "included_file.txt").touch()

# Modify directory permissions to simulate lack of access
(included_dir / "no_perm_file.txt").chmod(0o000)
no_perm_dir.chmod(0o000)
fs_perm_tool.deny_access(included_dir / "no_perm_file.txt")
fs_perm_tool.deny_access(no_perm_dir)

scan_policy = ScanPoliciesManager(
exclude_dir_regexes=[r"no_perm_dir$"], exclude_file_regexes=[r'.*no_perm_file.txt']
Expand All @@ -768,10 +773,6 @@ def test_excluded_without_permissions(self, tmp_path, tmp_path_permission_cleanu
local_paths = folder.all_files(reporter=reporter, policies_manager=scan_policy)
absolute_paths = [path.absolute_path for path in local_paths]

# Restore directory permissions to clean up
(included_dir / "no_perm_file.txt").chmod(0o755)
no_perm_dir.chmod(0o755)

# Check that only included_dir/included_file.txt was return
assert any('included_file.txt' in path for path in absolute_paths)

Expand Down

0 comments on commit 57c2f5f

Please sign in to comment.