Skip to content

Commit

Permalink
Add is_resource() and contents() (python#37)
Browse files Browse the repository at this point in the history
Support for is_resource() and contents()
  • Loading branch information
warsaw authored Dec 5, 2017
1 parent 3e9cda3 commit e82b567
Show file tree
Hide file tree
Showing 14 changed files with 492 additions and 54 deletions.
6 changes: 4 additions & 2 deletions importlib_resources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@


if sys.version_info >= (3,):
from importlib_resources._py3 import open, path, read
from importlib_resources._py3 import (
contents, is_resource, open, path, read)
else:
from importlib_resources._py2 import open, path, read
from importlib_resources._py2 import (
contents, is_resource, open, path, read)
114 changes: 114 additions & 0 deletions importlib_resources/_py2.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import errno
import tempfile

from ._compat import FileNotFoundError
Expand All @@ -7,6 +8,7 @@
from importlib import import_module
from io import BytesIO, open as io_open
from pathlib2 import Path
from zipfile import ZipFile


def _get_package(package):
Expand Down Expand Up @@ -120,3 +122,115 @@ def path(package, file_name):
os.remove(raw_path)
except FileNotFoundError:
pass


def is_resource(package, file_name):
    """True if file_name is a resource inside package.

    Directories are *not* resources.

    The check first looks at the file system next to the package's
    ``__file__``. If the entry is neither a file nor a directory there, the
    package is assumed to live in a zip archive and the archive's table of
    contents is inspected instead.

    Raises AssertionError if the name is reported by contents() but cannot
    be found in the archive's table of contents.
    """
    package = _get_package(package)
    # The return value is deliberately unused; presumably this call only
    # validates file_name -- confirm against _normalize_path().
    _normalize_path(file_name)
    try:
        package_contents = set(contents(package))
    except OSError as error:
        if error.errno not in (errno.ENOENT, errno.ENOTDIR):
            # We won't hit this in the Python 2 tests, so it'll appear
            # uncovered.  We could mock os.listdir() to return a non-ENOENT or
            # ENOTDIR, but then we'd have to depend on another external
            # library since Python 2 doesn't have unittest.mock.  It's not
            # worth it.
            raise                                   # pragma: ge3
        return False
    if file_name not in package_contents:
        return False
    # Just because the given file_name lives as an entry in the package's
    # contents doesn't necessarily mean it's a resource.  Directories are not
    # resources, so let's try to find out if it's a directory or not.
    package_directory = Path(package.__file__).parent
    path = package_directory / file_name
    if path.is_file():
        return True
    if path.is_dir():
        return False
    # If it's not a file and it's not a directory, what is it?  Well, this
    # means the file doesn't exist on the file system, so it probably lives
    # inside a zip file.  We have to crack open the zip, look at its table of
    # contents, and make sure that this entry doesn't have sub-entries.
    archive_path = package.__loader__.archive       # type: ignore
    with ZipFile(archive_path) as zf:
        toc = zf.namelist()
    relpath = package_directory.relative_to(archive_path)
    candidate_path = relpath / file_name
    for entry in toc:                               # pragma: nobranch
        try:
            relative_to_candidate = Path(entry).relative_to(candidate_path)
        except ValueError:
            # The two paths aren't relative to each other so we can ignore it.
            continue
        # Directories are usually implicit in a zip file, so we infer
        # 'directory-ness' from the number of path components relative to
        # the candidate: more than zero means the candidate has sub-entries
        # and is therefore a directory.  Zero additional parts means the
        # entry *is* the candidate -- but archives may also carry explicit
        # directory entries, whose names end in '/', and those must not be
        # mistaken for files.
        return (len(relative_to_candidate.parts) == 0
                and not entry.endswith('/'))
    # I think it's impossible to get here.  It would mean that we are looking
    # for a resource in a zip file, there's an entry matching it in the return
    # value of contents(), but we never actually found it in the zip's table of
    # contents.
    raise AssertionError('Impossible situation')


def contents(package):
    """Return the list of entries in package.

    Note that not all entries are resources.  Specifically, directories are
    not considered resources.  Use `is_resource()` on each entry returned here
    to check if it is a resource or not.

    This is a generator: entries are produced lazily, and any OSError from
    listing the package directory surfaces on iteration.  When the directory
    does not exist on the file system (ENOENT/ENOTDIR) and the package's
    loader has an ``archive`` attribute, the entries are derived from that
    zip archive's table of contents instead; otherwise the OSError is
    re-raised.
    """
    package = _get_package(package)
    package_directory = Path(package.__file__).parent
    try:
        # Python 2 doesn't support `yield from`.  We fall back to using
        # os.listdir() here to simplify the returning of just the name.
        for entry in os.listdir(str(package_directory)):
            yield entry
    except OSError as error:
        if error.errno not in (errno.ENOENT, errno.ENOTDIR):
            # We won't hit this in the Python 2 tests, so it'll appear
            # uncovered.  We could mock os.listdir() to return a non-ENOENT or
            # ENOTDIR, but then we'd have to depend on another external
            # library since Python 2 doesn't have unittest.mock.  It's not
            # worth it.
            raise                                   # pragma: ge3
        # The package is probably in a zip file.
        archive_path = getattr(package.__loader__, 'archive', None)
        if archive_path is None:
            raise
        relpath = package_directory.relative_to(archive_path)
        with ZipFile(archive_path) as zf:
            toc = zf.namelist()
        names_seen = set()                          # type: Set
        for filename in toc:
            # Only entries that actually live under the package directory
            # (relative to the archive root) belong to this package.  Merely
            # slicing off the leading components is not enough: an entry of
            # a sibling package at the same depth would be misreported.
            try:
                subparts = Path(filename).relative_to(relpath).parts
            except ValueError:
                continue
            if not subparts:
                # The entry is the package directory itself.
                continue
            # A single remaining component names something directly inside
            # the package; more than one means the entry lives in a
            # subdirectory, which we report by its first component.  Each
            # name is yielded only once, even when a subdirectory shows up
            # both as an explicit directory entry and as a parent of files.
            name = subparts[0]
            if name not in names_seen:
                names_seen.add(name)
                yield name
170 changes: 142 additions & 28 deletions importlib_resources/_py3.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
from io import BytesIO, TextIOWrapper
from pathlib import Path
from types import ModuleType
from typing import Iterator, Union
from typing import Iterator, Optional, Set, Union # noqa: F401
from typing import cast
from typing.io import IO
from zipfile import ZipFile


Package = Union[ModuleType, str]
Expand Down Expand Up @@ -47,42 +48,53 @@ def _normalize_path(path) -> str:
return file_name


def _get_resource_reader(
        package: ModuleType) -> Optional[resources_abc.ResourceReader]:
    """Return package's loader as a ResourceReader, or None.

    The loader is treated as a ResourceReader when it exposes an
    ``open_resource`` attribute.
    """
    # Duck-type on the attribute instead of using an issubclass() check:
    # apparently abc's __subclasscheck__() hook wants to create a weak
    # reference to the object, but zipimport.zipimporter does not support
    # weak references, resulting in a TypeError.
    loader = package.__spec__.loader
    if not hasattr(loader, 'open_resource'):
        return None
    return cast(resources_abc.ResourceReader, loader)


def open(package: Package,
         file_name: FileName,
         encoding: str = None,
         errors: str = None) -> IO:
    """Return a file-like object opened for reading of the resource.

    If the package's loader is a resource reader, the resource is opened
    through it.  Otherwise the file is opened directly from the directory of
    the package's ``__spec__.origin``, falling back to the loader's
    ``get_data()`` when that fails with IOError.

    With ``encoding=None`` the direct file-system open uses binary mode;
    otherwise it uses text mode with the given ``encoding`` and ``errors``.

    Raises FileNotFoundError if the resource cannot be found by either the
    file system or the loader.
    """
    file_name = _normalize_path(file_name)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return _wrap_file(reader.open_resource(file_name), encoding, errors)
    # Using pathlib doesn't work well here due to the lack of 'strict'
    # argument for pathlib.Path.resolve() prior to Python 3.6.
    absolute_package_path = os.path.abspath(package.__spec__.origin)
    package_path = os.path.dirname(absolute_package_path)
    full_path = os.path.join(package_path, file_name)
    if encoding is None:
        args = dict(mode='rb')
    else:
        args = dict(mode='r', encoding=encoding, errors=errors)
    try:
        return builtins_open(full_path, **args)     # type: ignore
    except IOError:
        # Just assume the loader is a resource loader; all the relevant
        # importlib.machinery loaders are and an AttributeError for
        # get_data() will make it clear what is needed from the loader.
        loader = cast(ResourceLoader, package.__spec__.loader)
        try:
            data = loader.get_data(full_path)
        except IOError:
            package_name = package.__spec__.name
            message = '{!r} resource not found in {!r}'.format(
                file_name, package_name)
            raise FileNotFoundError(message)
        else:
            return _wrap_file(BytesIO(data), encoding, errors)


def read(package: Package,
Expand Down Expand Up @@ -119,8 +131,8 @@ def path(package: Package, file_name: FileName) -> Iterator[Path]:
"""
file_name = _normalize_path(file_name)
package = _get_package(package)
if hasattr(package.__spec__.loader, 'resource_path'):
reader = cast(resources_abc.ResourceReader, package.__spec__.loader)
reader = _get_resource_reader(package)
if reader is not None:
try:
yield Path(reader.resource_path(file_name))
return
Expand Down Expand Up @@ -148,3 +160,105 @@ def path(package: Package, file_name: FileName) -> Iterator[Path]:
os.remove(raw_path)
except FileNotFoundError:
pass


def is_resource(package: Package, file_name: str) -> bool:
    """True if file_name is a resource inside package.

    Directories are *not* resources.

    If the package's loader is a resource reader, the question is delegated
    to its ``is_resource()``.  Otherwise the file system next to the
    package's ``__spec__.origin`` is consulted, and if the entry is neither a
    file nor a directory there, the package is assumed to live in a zip
    archive whose table of contents is inspected instead.

    Raises AssertionError if the name is reported by contents() but cannot
    be found in the archive's table of contents.
    """
    package = _get_package(package)
    # The return value is deliberately unused; presumably this call only
    # validates file_name -- confirm against _normalize_path().
    _normalize_path(file_name)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.is_resource(file_name)
    try:
        package_contents = set(contents(package))
    except (NotADirectoryError, FileNotFoundError):
        return False
    if file_name not in package_contents:
        return False
    # Just because the given file_name lives as an entry in the package's
    # contents doesn't necessarily mean it's a resource.  Directories are not
    # resources, so let's try to find out if it's a directory or not.
    package_directory = Path(package.__spec__.origin).parent
    path = package_directory / file_name
    if path.is_file():
        return True
    if path.is_dir():
        return False
    # If it's not a file and it's not a directory, what is it?  Well, this
    # means the file doesn't exist on the file system, so it probably lives
    # inside a zip file.  We have to crack open the zip, look at its table of
    # contents, and make sure that this entry doesn't have sub-entries.
    archive_path = package.__spec__.loader.archive   # type: ignore
    with ZipFile(archive_path) as zf:
        toc = zf.namelist()
    relpath = package_directory.relative_to(archive_path)
    candidate_path = relpath / file_name
    for entry in toc:                               # pragma: nobranch
        try:
            relative_to_candidate = Path(entry).relative_to(candidate_path)
        except ValueError:
            # The two paths aren't relative to each other so we can ignore it.
            continue
        # Directories are usually implicit in a zip file, so we infer
        # 'directory-ness' from the number of path components relative to
        # the candidate: more than zero means the candidate has sub-entries
        # and is therefore a directory.  Zero additional parts means the
        # entry *is* the candidate -- but archives may also carry explicit
        # directory entries, whose names end in '/', and those must not be
        # mistaken for files.
        return (len(relative_to_candidate.parts) == 0
                and not entry.endswith('/'))
    # I think it's impossible to get here.  It would mean that we are looking
    # for a resource in a zip file, there's an entry matching it in the return
    # value of contents(), but we never actually found it in the zip's table of
    # contents.
    raise AssertionError('Impossible situation')


def contents(package: Package) -> Iterator[str]:
    """Return the list of entries in package.

    Note that not all entries are resources.  Specifically, directories are
    not considered resources.  Use `is_resource()` on each entry returned here
    to check if it is a resource or not.

    This is a generator: entries are produced lazily, and errors from
    listing the package directory surface on iteration.  If the package's
    loader is a resource reader, its ``contents()`` is used.  Otherwise the
    directory of the package's ``__spec__.origin`` is listed; when that
    directory does not exist on the file system and the loader has an
    ``archive`` attribute, the entries are derived from that zip archive's
    table of contents.  Without an ``archive`` attribute the original
    NotADirectoryError/FileNotFoundError is re-raised.
    """
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        yield from reader.contents()
        return
    package_directory = Path(package.__spec__.origin).parent
    try:
        yield from os.listdir(str(package_directory))
    except (NotADirectoryError, FileNotFoundError):
        # The package is probably in a zip file.
        archive_path = getattr(package.__spec__.loader, 'archive', None)
        if archive_path is None:
            raise
        relpath = package_directory.relative_to(archive_path)
        with ZipFile(archive_path) as zf:
            toc = zf.namelist()
        names_seen = set()                          # type: Set
        for filename in toc:
            # Only entries that actually live under the package directory
            # (relative to the archive root) belong to this package.  Merely
            # slicing off the leading components is not enough: an entry of
            # a sibling package at the same depth would be misreported.
            try:
                subparts = Path(filename).relative_to(relpath).parts
            except ValueError:
                continue
            if not subparts:
                # The entry is the package directory itself.
                continue
            # A single remaining component names something directly inside
            # the package; more than one means the entry lives in a
            # subdirectory, which we report by its first component.  Each
            # name is yielded only once, even when a subdirectory shows up
            # both as an explicit directory entry and as a parent of files.
            name = subparts[0]
            if name not in names_seen:
                names_seen.add(name)
                yield name
17 changes: 16 additions & 1 deletion importlib_resources/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# We use mypy's comment syntax here since this file must be compatible with
# both Python 2 and 3.
try:
from typing import BinaryIO, Text # noqa: F401
from typing import BinaryIO, Iterator, Text # noqa: F401
except ImportError:
# Python 2
pass
Expand Down Expand Up @@ -41,3 +41,18 @@ def resource_path(self, path):
# NotImplementedError so that if this method is accidentally called,
# it'll still do the right thing.
raise FileNotFoundError

@abstractmethod
def is_resource(self, path):
    # type: (Text) -> bool
    """Return True if the named path is a resource.

    Files are resources, directories are not.

    This base implementation does not answer the question; it always
    raises FileNotFoundError.
    """
    # Reached only if this base implementation is invoked directly
    # (presumably via super() from an overriding method -- confirm).
    raise FileNotFoundError

@abstractmethod
def contents(self):
    # type: () -> Iterator[str]
    """Return an iterator over the string contents of the resource.

    NOTE(review): "the resource" here presumably means the package this
    reader serves, i.e. the names of its entries -- confirm.

    This base implementation yields nothing; it always raises
    FileNotFoundError.
    """
    # Reached only if this base implementation is invoked directly
    # (presumably via super() from an overriding method -- confirm).
    raise FileNotFoundError
Loading

0 comments on commit e82b567

Please sign in to comment.