Skip to content

Commit

Permalink
Split out from_npe1 setuptools package inspection into new module (#…
Browse files Browse the repository at this point in the history
…206)

* tests working

* test coverage

* fix lint
  • Loading branch information
tlambert03 authored Jul 1, 2022
1 parent 77e4383 commit 33ce63f
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 95 deletions.
111 changes: 17 additions & 94 deletions npe2/_from_npe1.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import sys
import warnings
from configparser import ConfigParser
from dataclasses import dataclass
from functools import lru_cache
from importlib import import_module, metadata
from logging import getLogger
Expand Down Expand Up @@ -34,6 +33,8 @@
from npe2.manifest.utils import SHIM_NAME_PREFIX, import_python_name, merge_manifests
from npe2.types import WidgetCreator

from ._setuputils import PackageInfo, get_package_dir_info

logger = getLogger(__name__)
NPE1_EP = "napari.plugin"
NPE2_EP = "napari.manifest"
Expand Down Expand Up @@ -75,33 +76,22 @@ def iter_hookimpls(
yield HookImplementation(method, module, plugin_name, **hookimpl_opts)


@dataclass
class PluginPackage:
package_name: str
ep_name: str
ep_value: str
top_module: str
setup_cfg: Optional[Path] = None


@lru_cache
def plugin_packages() -> List[PluginPackage]:
def plugin_packages() -> List[PackageInfo]:
"""List of all packages with napari entry points.
This is useful to help resolve naming issues (due to the terrible confusion
around *what* a npe1 plugin name actually was).
"""

packages = []
packages: List[PackageInfo] = []
for dist in metadata.distributions():
for ep in dist.entry_points:
if ep.group != NPE1_EP:
continue # pragma: no cover
top = dist.read_text("top_level.txt")
top = top.splitlines()[0] if top else ep.value.split(".")[0]
packages.append(
PluginPackage(dist.metadata["Name"], ep.name, ep.value, top)
)
packages.extend(
PackageInfo(package_name=dist.metadata["Name"], entry_points=[ep])
for ep in dist.entry_points
if ep.group == NPE1_EP
)

return packages


Expand Down Expand Up @@ -543,6 +533,12 @@ def convert_repository(

# get the info we need and create a manifest
info = get_package_dir_info(path)
if not (info.package_name and info._ep1):
msg = f'Could not detect first gen napari plugin package at "{path}".'
if info._ep2 is not None:
msg += f" Found a {NPE2_EP} entry_point. Is this package already converted?"
raise ValueError(msg)

manifest = manifest_from_npe1(info.package_name)
top_module = get_top_module_path(info.package_name, info.top_module)
if not top_module.is_dir():
Expand Down Expand Up @@ -579,7 +575,7 @@ def convert_repository(
return manifest, mf_path


def _write_new_setup_cfg_ep(info: PluginPackage, mf_name: str):
def _write_new_setup_cfg_ep(info: PackageInfo, mf_name: str):
assert info.setup_cfg
p = ConfigParser(comment_prefixes="/", allow_no_value=True) # preserve comments
p.read(info.setup_cfg)
Expand All @@ -599,79 +595,6 @@ def _write_new_setup_cfg_ep(info: PluginPackage, mf_name: str):
p.write(fh)


def get_package_dir_info(path: Union[Path, str]) -> PluginPackage:
"""Attempts to *statically* get plugin info from a package directory."""
path = Path(path).absolute()
if not path.is_dir(): # pragma: no cover
raise ValueError(f"Provided path is not a directory: {path}")

_name = None
_entry_points: List[List[str]] = []
_setup_cfg = None
p = None

# check for setup.cfg
setup_cfg = path / "setup.cfg"
if setup_cfg.exists():
_setup_cfg = setup_cfg
p = ConfigParser()
p.read(setup_cfg)
_name = p.get("metadata", "name", fallback=None)
eps = p.get("options.entry_points", NPE1_EP, fallback="").strip()
_entry_points = [[i.strip() for i in ep.split("=")] for ep in eps.splitlines()]

if not _name or not _entry_points:
# check for setup.py
setup_py = path / "setup.py"
if setup_py.exists():
node = ast.parse(setup_py.read_text())
visitor = _SetupVisitor()
visitor.visit(node)
_name = _name or visitor._name
if visitor._entry_points and not _entry_points:
_entry_points = visitor._entry_points
_setup_cfg = None # the ep metadata wasn't in setupcfg

if _name and _entry_points:
ep_name, ep_value = next(iter(_entry_points), ["", ""])
top_mod = ep_value.split(".", 1)[0]
return PluginPackage(_name, ep_name, ep_value, top_mod, _setup_cfg)

msg = f'Could not detect first gen napari plugin package at "{path}".'
if p is not None and p.get("options.entry_points", NPE2_EP, fallback=False):
msg += f" Found a {NPE2_EP} entry_point. Is this package already converted?"
raise ValueError(msg)


class _SetupVisitor(ast.NodeVisitor):
"""Visitor to statically determine metadata from setup.py"""

def __init__(self) -> None:
super().__init__()
self._name: str = ""
self._entry_points: List[List[str]] = [] # [[name, value], ...]

def visit_Call(self, node: ast.Call) -> Any:
if getattr(node.func, "id", "") != "setup":
return # pragma: no cover
for kw in node.keywords:
if kw.arg == "name":
self._name = getattr(kw.value, "value", "") or getattr(
kw.value, "id", ""
)

if kw.arg == "entry_points":
eps: dict = ast.literal_eval(kw.value)
for k, v in eps.items():
if k == NPE1_EP:
if type(v) is str:
v = [v]
for item in v:
self._entry_points.append(
[i.strip() for i in item.split("=")]
)


def _guess_fname_patterns(func):
"""Try to guess filename extension patterns from source code. Fallback to "*"."""

Expand Down
127 changes: 127 additions & 0 deletions npe2/_setuputils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import ast
from configparser import ConfigParser
from dataclasses import dataclass, field
from functools import cached_property
from importlib.metadata import EntryPoint
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

NPE1_EP = "napari.plugin"
NPE2_EP = "napari.manifest"


@dataclass
class PackageInfo:
src_root: Optional[Path] = None
package_name: str = ""
entry_points: List[EntryPoint] = field(default_factory=list)
setup_cfg: Optional[Path] = None
setup_py: Optional[Path] = None
pyproject_toml: Optional[Path] = None

# @property
# def packages(self) -> Optional[List[Path]]:
# return Path(self.top_module)

@cached_property
def _ep1(self) -> Optional[EntryPoint]:
return next((ep for ep in self.entry_points if ep.group == NPE1_EP), None)

@cached_property
def _ep2(self) -> Optional[EntryPoint]:
return next((ep for ep in self.entry_points if ep.group == NPE2_EP), None)

@property
def ep_name(self):
if ep := self._ep1:
return ep.name

@property
def ep_value(self):
if ep := self._ep1:
return ep.value

@property
def top_module(self) -> str:
if ep := (self._ep1 or self._ep2):
return ep.value.split(".", 1)[0].split(":", 1)[0]
return "" # pragma: no cover


def get_package_dir_info(path: Union[Path, str]) -> PackageInfo:
"""Attempt to *statically* get plugin info from a package directory."""
path = Path(path).resolve()
if not path.is_dir(): # pragma: no cover
raise ValueError(f"Provided path is not a directory: {path}")

info = PackageInfo(src_root=path)
p = None

# check for setup.cfg
setup_cfg = path / "setup.cfg"
if setup_cfg.exists():
info.setup_cfg = setup_cfg
p = ConfigParser()
p.read(setup_cfg)
info.package_name = p.get("metadata", "name", fallback="")
if p.has_section("options.entry_points"):
for group, val in p.items("options.entry_points"):
name, _, value = val.partition("=")
info.entry_points.append(EntryPoint(name.strip(), value.strip(), group))

# check for setup.py
setup_py = path / "setup.py"
if setup_py.exists():
info.setup_py = setup_py
node = ast.parse(setup_py.read_text())
visitor = _SetupVisitor()
visitor.visit(node)
info.package_name = visitor.get("name")
for group, vals in visitor.get("entry_points", {}).items():
for val in vals if isinstance(vals, list) else [vals]:
name, _, value = val.partition("=")
info.entry_points.append(EntryPoint(name.strip(), value.strip(), group))

return info


class _SetupVisitor(ast.NodeVisitor):
"""Visitor to statically determine metadata from setup.py"""

def __init__(self) -> None:
super().__init__()
self._names: Dict[str, Any] = {}
self._setup_kwargs: Dict[str, Any] = {}

def visit_Assign(self, node: ast.Assign) -> Any:
if len(node.targets) == 1:
target = node.targets[0]
if isinstance(target, ast.Name) and isinstance(target.ctx, ast.Store):
self._names[target.id] = self._get_val(node.value)

def visit_Call(self, node: ast.Call) -> Any:
if getattr(node.func, "id", "") == "setup":
for k in node.keywords:
key = k.arg
value = self._get_val(k.value)
self._setup_kwargs[str(key)] = value

def _get_val(self, node: Optional[ast.expr]) -> Any:
if isinstance(node, ast.Constant):
return node.value
if isinstance(node, ast.Name):
return (
self._names.get(node.id) if isinstance(node.ctx, ast.Load) else node.id
)
if isinstance(node, ast.Dict):
keys = [self._get_val(k) for k in node.keys]
values = [self._get_val(k) for k in node.values]
return dict(zip(keys, values))
if isinstance(node, ast.List):
return [self._get_val(k) for k in node.elts]
if isinstance(node, ast.Tuple): # pragma: no cover
return tuple(self._get_val(k) for k in node.elts)
return str(node) # pragma: no cover

def get(self, key: str, default: Any = None) -> Any:
return self._setup_kwargs.get(key, default)
3 changes: 2 additions & 1 deletion tests/test_conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ def test_conversion_from_package_setup_py(npe1_repo, mock_npe1_pm_with_plugin):
(npe1_repo / "setup.py").write_text(
"""from setuptools import setup
NAME = 'npe1-plugin'
setup(
name='npe1-plugin',
name=NAME,
entry_points={"napari.plugin": ["npe1-plugin = npe1_module"]}
)
"""
Expand Down

0 comments on commit 33ce63f

Please sign in to comment.