-
Notifications
You must be signed in to change notification settings - Fork 6
/
importer.py
146 lines (135 loc) · 5.32 KB
/
importer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
"""Implements an importer that looks only in specific path (ignoring
sys.path), and uses a per-path cache in addition to sys.modules. This is
necessary because test modules in different directories frequently have the
same names, which means that the first loaded would mask the rest when using
the builtin importer."""
import logging
import os
import sys
from nose.config import Config
from imp import find_module, load_module, acquire_lock, release_lock
log = logging.getLogger(__name__)
try:
_samefile = os.path.samefile
except AttributeError:
def _samefile(src, dst):
return (os.path.normcase(os.path.realpath(src)) ==
os.path.normcase(os.path.realpath(dst)))
class Importer(object):
"""An importer class that does only path-specific imports. That
is, the given module is not searched for on sys.path, but only at
the path or in the directory specified."""
def __init__(self, config=None):
if config is None:
config = Config()
self.config = config
def importFromPath(self, path, fqname):
"""Import a dotted-name package whose tail is at path. In other words,
given foo.bar and path/to/foo/bar.py, import foo from path/to/foo then
bar from path/to/foo/bar, returning bar."""
# find the base dir of the package
path_parts = os.path.normpath(os.path.abspath(path)).split(os.sep)
name_parts = fqname.split('.')
if path_parts[-1] == '__init__.py':
path_parts.pop()
path_parts = path_parts[:-(len(name_parts))]
dir_path = os.sep.join(path_parts)
return self.importFromDir(dir_path, fqname)
def importFromDir(self, dir, fqname):
"""Import a module *only* from path, ignoring sys.path and
reloading if the version in sys.modules is not the one we want."""
dir = os.path.normpath(os.path.abspath(dir))
log.debug("Import %s from %s", fqname, dir)
if fqname == '__main__':
return sys.modules[fqname]
if self.config.addPaths:
add_path(dir, self.config)
path = [dir]
parts = fqname.split('.')
part_fqname = ''
mod = parent = fh = None
for part in parts:
if part_fqname == '':
part_fqname = part
else:
part_fqname = "%s.%s" % (part_fqname, part)
try:
acquire_lock()
log.debug("find module part %s (%s) in %s",
part, part_fqname, path)
fh, filename, desc = find_module(part, path)
old = sys.modules.get(part_fqname)
if old is not None:
log.debug("sys.modules has %s as %s", part_fqname, old)
if (self.sameModule(old, filename)
or (self.config.firstPackageWins and
getattr(old, '__path__', None))):
mod = old
else:
del sys.modules[part_fqname]
mod = load_module(part_fqname, fh, filename, desc)
else:
mod = load_module(part_fqname, fh, filename, desc)
finally:
if fh:
fh.close()
release_lock()
if parent:
setattr(parent, part, mod)
if hasattr(mod, '__path__'):
path = mod.__path__
parent = mod
return mod
def _dirname_if_file(self, filename):
# We only take the dirname if we have a path to a non-dir,
# because taking the dirname of a symlink to a directory does not
# give the actual directory parent.
if os.path.isdir(filename):
return filename
else:
return os.path.dirname(filename)
def sameModule(self, mod, filename):
mod_paths = []
if hasattr(mod, '__path__'):
for path in mod.__path__:
mod_paths.append(self._dirname_if_file(path))
elif hasattr(mod, '__file__'):
mod_paths.append(self._dirname_if_file(mod.__file__))
else:
return False
new_path = self._dirname_if_file(filename)
for mod_path in mod_paths:
log.debug(
"module already loaded? mod: %s new: %s",
mod_path, new_path)
if _samefile(mod_path, new_path):
return True
return False
def add_path(path, config=None):
"""Ensure that the path, or the root of the current package (if
path is in a package), is in sys.path."""
log.debug('Add path %s' % path)
if not path:
return []
added = []
parent = os.path.dirname(path)
if (
parent
and os.path.exists(os.path.join(path, '__init__.py'))
):
added.extend(add_path(parent, config))
elif path not in sys.path:
log.debug("insert %s into sys.path", path)
sys.path.insert(0, path)
added.append(path)
if config and config.srcDirs:
for dirname in config.srcDirs:
dirpath = os.path.join(path, dirname)
if os.path.isdir(dirpath):
sys.path.insert(0, dirpath)
added.append(dirpath)
return added
def remove_path(path):
log.debug('Remove path %s' % path)
if path in sys.path:
sys.path.remove(path)