Skip to content

Commit

Permalink
pythonGH-127381: pathlib ABCs: remove PathBase.samefile() and rarer…
Browse files Browse the repository at this point in the history
… `is_*()`

Remove `PathBase.samefile()`, which is fairly specific to the local FS, and
relies on `stat()`, which we're aiming to remove from `PathBase`.

Also remove `PathBase.is_mount()`, `is_junction()`, `is_block_device()`,
`is_char_device()`, `is_fifo()` and `is_socket()`. These rely on POSIX
file type numbers that we're aiming to remove from the `PathBase` API.
  • Loading branch information
barneygale committed Dec 6, 2024
1 parent 5b6635f commit 1e09d70
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 186 deletions.
98 changes: 2 additions & 96 deletions Lib/pathlib/_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import posixpath
from errno import EINVAL
from glob import _GlobberBase, _no_recurse_symlinks
from stat import S_ISDIR, S_ISLNK, S_ISREG, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO
from stat import S_ISDIR, S_ISLNK, S_ISREG
from pathlib._os import copyfileobj


Expand Down Expand Up @@ -472,26 +472,6 @@ def is_file(self, *, follow_symlinks=True):
except (OSError, ValueError):
return False

def is_mount(self):
"""
Check if this path is a mount point
"""
# Need to exist and be a dir
if not self.exists() or not self.is_dir():
return False

try:
parent_dev = self.parent.stat().st_dev
except OSError:
return False

dev = self.stat().st_dev
if dev != parent_dev:
return True
ino = self.stat().st_ino
parent_ino = self.parent.stat().st_ino
return ino == parent_ino

def is_symlink(self):
"""
Whether this path is a symbolic link.
Expand All @@ -501,77 +481,6 @@ def is_symlink(self):
except (OSError, ValueError):
return False

def is_junction(self):
"""
Whether this path is a junction.
"""
# Junctions are a Windows-only feature, not present in POSIX nor the
# majority of virtual filesystems. There is no cross-platform idiom
# to check for junctions (using stat().st_mode).
return False

def is_block_device(self):
"""
Whether this path is a block device.
"""
try:
return S_ISBLK(self.stat().st_mode)
except (OSError, ValueError):
return False

def is_char_device(self):
"""
Whether this path is a character device.
"""
try:
return S_ISCHR(self.stat().st_mode)
except (OSError, ValueError):
return False

def is_fifo(self):
"""
Whether this path is a FIFO.
"""
try:
return S_ISFIFO(self.stat().st_mode)
except (OSError, ValueError):
return False

def is_socket(self):
"""
Whether this path is a socket.
"""
try:
return S_ISSOCK(self.stat().st_mode)
except (OSError, ValueError):
return False

def samefile(self, other_path):
"""Return whether other_path is the same or not as this file
(as returned by os.path.samefile()).
"""
st = self.stat()
try:
other_st = other_path.stat()
except AttributeError:
other_st = self.with_segments(other_path).stat()
return (st.st_ino == other_st.st_ino and
st.st_dev == other_st.st_dev)

def _ensure_different_file(self, other_path):
"""
Raise OSError(EINVAL) if both paths refer to the same file.
"""
try:
if not self.samefile(other_path):
return
except (OSError, ValueError):
return
err = OSError(EINVAL, "Source and target are the same file")
err.filename = str(self)
err.filename2 = str(other_path)
raise err

def _ensure_distinct_path(self, other_path):
"""
Raise OSError(EINVAL) if the other path is within this path.
Expand Down Expand Up @@ -845,7 +754,6 @@ def _copy_file(self, target):
"""
Copy the contents of this file to the given target.
"""
self._ensure_different_file(target)
with self.open('rb') as source_f:
try:
with target.open('wb') as target_f:
Expand Down Expand Up @@ -953,9 +861,7 @@ def _delete(self):
"""
Delete this file or directory (including all sub-directories).
"""
if self.is_symlink() or self.is_junction():
self.unlink()
elif self.is_dir():
if self.is_dir(follow_symlinks=False):
self._rmtree()
else:
self.unlink()
Expand Down
80 changes: 73 additions & 7 deletions Lib/pathlib/_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
import os
import posixpath
import sys
from errno import EXDEV
from errno import EINVAL, EXDEV
from glob import _StringGlobber
from itertools import chain
from stat import S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO
from _collections_abc import Sequence

try:
Expand Down Expand Up @@ -596,6 +597,69 @@ def is_junction(self):
"""
return os.path.isjunction(self)

def is_block_device(self):
"""
Whether this path is a block device.
"""
try:
return S_ISBLK(self.stat().st_mode)
except (OSError, ValueError):
return False

def is_char_device(self):
"""
Whether this path is a character device.
"""
try:
return S_ISCHR(self.stat().st_mode)
except (OSError, ValueError):
return False

def is_fifo(self):
"""
Whether this path is a FIFO.
"""
try:
return S_ISFIFO(self.stat().st_mode)
except (OSError, ValueError):
return False

def is_socket(self):
"""
Whether this path is a socket.
"""
try:
return S_ISSOCK(self.stat().st_mode)
except (OSError, ValueError):
return False


def samefile(self, other_path):
"""Return whether other_path is the same or not as this file
(as returned by os.path.samefile()).
"""
st = self.stat()
try:
other_st = other_path.stat()
except AttributeError:
other_st = self.with_segments(other_path).stat()
return (st.st_ino == other_st.st_ino and
st.st_dev == other_st.st_dev)

def _ensure_different_file(self, other_path):
"""
Raise OSError(EINVAL) if both paths refer to the same file.
"""
try:
if not self.samefile(other_path):
return
except (OSError, ValueError):
return
err = OSError(EINVAL, "Source and target are the same file")
err.filename = str(self)
err.filename2 = str(other_path)
raise err

def open(self, mode='r', buffering=-1, encoding=None,
errors=None, newline=None):
"""
Expand Down Expand Up @@ -809,19 +873,21 @@ def mkdir(self, mode=0o777, parents=False, exist_ok=False):
_read_metadata = read_file_metadata
_write_metadata = write_file_metadata

if copyfile:
def _copy_file(self, target):
"""
Copy the contents of this file to the given target.
"""
def _copy_file(self, target):
"""
Copy the contents of this file to the given target.
"""
if copyfile:
try:
target = os.fspath(target)
except TypeError:
if not isinstance(target, PathBase):
raise
PathBase._copy_file(self, target)
else:
copyfile(os.fspath(self), target)
return
self._ensure_different_file(target)
PathBase._copy_file(self, target)

def chmod(self, mode, *, follow_symlinks=True):
"""
Expand Down
77 changes: 75 additions & 2 deletions Lib/test/test_pathlib/test_pathlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -1365,13 +1365,31 @@ def test_lstat_nosymlink(self):
st = p.stat()
self.assertEqual(st, p.lstat())

def test_is_junction(self):
def test_is_junction_false(self):
P = self.cls(self.base)
self.assertFalse((P / 'fileA').is_junction())
self.assertFalse((P / 'dirA').is_junction())
self.assertFalse((P / 'non-existing').is_junction())
self.assertFalse((P / 'fileA' / 'bah').is_junction())
self.assertFalse((P / 'fileA\udfff').is_junction())
self.assertFalse((P / 'fileA\x00').is_junction())

def test_is_junction_true(self):
P = self.cls(self.base)

with mock.patch.object(P.parser, 'isjunction'):
self.assertEqual(P.is_junction(), P.parser.isjunction.return_value)
P.parser.isjunction.assert_called_once_with(P)

def test_is_fifo_false(self):
P = self.cls(self.base)
self.assertFalse((P / 'fileA').is_fifo())
self.assertFalse((P / 'dirA').is_fifo())
self.assertFalse((P / 'non-existing').is_fifo())
self.assertFalse((P / 'fileA' / 'bah').is_fifo())
self.assertIs((P / 'fileA\udfff').is_fifo(), False)
self.assertIs((P / 'fileA\x00').is_fifo(), False)

@unittest.skipUnless(hasattr(os, "mkfifo"), "os.mkfifo() required")
@unittest.skipIf(sys.platform == "vxworks",
"fifo requires special path on VxWorks")
Expand All @@ -1387,6 +1405,15 @@ def test_is_fifo_true(self):
self.assertIs(self.cls(self.base, 'myfifo\udfff').is_fifo(), False)
self.assertIs(self.cls(self.base, 'myfifo\x00').is_fifo(), False)

def test_is_socket_false(self):
P = self.cls(self.base)
self.assertFalse((P / 'fileA').is_socket())
self.assertFalse((P / 'dirA').is_socket())
self.assertFalse((P / 'non-existing').is_socket())
self.assertFalse((P / 'fileA' / 'bah').is_socket())
self.assertIs((P / 'fileA\udfff').is_socket(), False)
self.assertIs((P / 'fileA\x00').is_socket(), False)

@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
@unittest.skipIf(
is_emscripten, "Unix sockets are not implemented on Emscripten."
Expand All @@ -1410,6 +1437,24 @@ def test_is_socket_true(self):
self.assertIs(self.cls(self.base, 'mysock\udfff').is_socket(), False)
self.assertIs(self.cls(self.base, 'mysock\x00').is_socket(), False)

def test_is_block_device_false(self):
P = self.cls(self.base)
self.assertFalse((P / 'fileA').is_block_device())
self.assertFalse((P / 'dirA').is_block_device())
self.assertFalse((P / 'non-existing').is_block_device())
self.assertFalse((P / 'fileA' / 'bah').is_block_device())
self.assertIs((P / 'fileA\udfff').is_block_device(), False)
self.assertIs((P / 'fileA\x00').is_block_device(), False)

def test_is_char_device_false(self):
P = self.cls(self.base)
self.assertFalse((P / 'fileA').is_char_device())
self.assertFalse((P / 'dirA').is_char_device())
self.assertFalse((P / 'non-existing').is_char_device())
self.assertFalse((P / 'fileA' / 'bah').is_char_device())
self.assertIs((P / 'fileA\udfff').is_char_device(), False)
self.assertIs((P / 'fileA\x00').is_char_device(), False)

def test_is_char_device_true(self):
# os.devnull should generally be a char device.
P = self.cls(os.devnull)
Expand All @@ -1421,14 +1466,42 @@ def test_is_char_device_true(self):
self.assertIs(self.cls(f'{os.devnull}\udfff').is_char_device(), False)
self.assertIs(self.cls(f'{os.devnull}\x00').is_char_device(), False)

def test_is_mount_root(self):
def test_is_mount(self):
P = self.cls(self.base)
self.assertFalse((P / 'fileA').is_mount())
self.assertFalse((P / 'dirA').is_mount())
self.assertFalse((P / 'non-existing').is_mount())
self.assertFalse((P / 'fileA' / 'bah').is_mount())
if self.can_symlink:
self.assertFalse((P / 'linkA').is_mount())
if os.name == 'nt':
R = self.cls('c:\\')
else:
R = self.cls('/')
self.assertTrue(R.is_mount())
self.assertFalse((R / '\udfff').is_mount())

def test_samefile(self):
parser = self.parser
fileA_path = parser.join(self.base, 'fileA')
fileB_path = parser.join(self.base, 'dirB', 'fileB')
p = self.cls(fileA_path)
pp = self.cls(fileA_path)
q = self.cls(fileB_path)
self.assertTrue(p.samefile(fileA_path))
self.assertTrue(p.samefile(pp))
self.assertFalse(p.samefile(fileB_path))
self.assertFalse(p.samefile(q))
# Test the non-existent file case
non_existent = parser.join(self.base, 'foo')
r = self.cls(non_existent)
self.assertRaises(FileNotFoundError, p.samefile, r)
self.assertRaises(FileNotFoundError, p.samefile, non_existent)
self.assertRaises(FileNotFoundError, r.samefile, p)
self.assertRaises(FileNotFoundError, r.samefile, non_existent)
self.assertRaises(FileNotFoundError, r.samefile, r)
self.assertRaises(FileNotFoundError, r.samefile, non_existent)

def test_passing_kwargs_errors(self):
with self.assertRaises(TypeError):
self.cls(foo="bar")
Expand Down
Loading

0 comments on commit 1e09d70

Please sign in to comment.