-
Notifications
You must be signed in to change notification settings - Fork 20
/
manager.py
250 lines (215 loc) · 8.16 KB
/
manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
import importlib
import importlib.util
import logging
import os
import re
import sys
import time
import traceback
VALID_NAME = re.compile(r'[a-z][a-z0-9_]*', re.ASCII + re.IGNORECASE)
def report_exception(src=''):
t, ex, tb = sys.exc_info()
lines = traceback.format_exception(t, ex, tb.tb_next)
lines = ''.join(lines[1:])
logging.error('exception in {}:\n{}'.format(src, lines))
class Manager(object):
def __init__(self, path, refs):
self.path = path
self.refs = refs
self.config = refs.config
if not refs.config.has_section('manager'):
refs.config.add_section('manager')
self.period = self.config['manager'].getint('period', 3)
self.priority = self.config['manager'].get('priority', '').split()
self._active = {}
self._mtimes = {}
self._lastload = 0
def run(self):
if time.time() < self._lastload + self.period:
return
self._lastload = time.time()
self.reload()
def _reload(self, fname):
fpath = os.path.join(self.path, fname)
mname, mext = os.path.splitext(fname)
if mext != '.py': # not a module
return False
if VALID_NAME.fullmatch(mname) is None: # bad file name
return False
# avoid any name/keyword collisions
mname = 'plugin_' + mname
# disable current version, if any
if mname in self._active:
del self._active[mname]
# every plugin has a config section
if not self.config.has_section(mname):
self.config.add_section(mname)
# check enabled status so that plugins don't need to
if not self.config.getboolean(mname, 'enabled', fallback=True):
return False
# attempt to load the module
try:
spec = importlib.util.spec_from_file_location(mname, fpath)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
self._active[mname] = module.Plugin(self.refs, self.config[mname])
except Exception:
report_exception(mname)
return False
# run custom init
if self._runCallback(mname, 'onInit') is False:
return False
logging.info('loaded {}'.format(mname))
return True
def reload(self):
# check config file
if os.path.exists(self.refs.CONFIGFILE):
cmt = os.stat(self.refs.CONFIGFILE).st_mtime
if 'config' not in self._mtimes or self._mtimes['config'] != cmt:
self.config.read(self.refs.CONFIGFILE)
self._mtimes['config'] = cmt
# check if enabled state was changed
for name in self.config.sections():
if not name.startswith('plugin_'):
continue
en = self.config.getboolean(name, 'enabled', fallback=True)
if en is False and name in self._active:
# was disabled
del self._active[name]
elif en is True and name not in self._active:
# was enabled, force reload on next check.
# does not matter on first load, but will keep trying
# failing plugins whenever config file is edited
# (possible log spam).
self._mtimes[name[7:] + '.py'] = 0
# reset config caches to update values on next access
for plug in self._active.values():
plug.config._cache = {}
logging.info('loaded config')
# check plugins
for entry in os.scandir(self.path):
try:
if entry.is_file() is False:
continue
mtime = entry.stat().st_mtime
except Exception:
continue # ignore files with permission problems
fname = entry.name
if fname in self._mtimes and self._mtimes[fname] == mtime:
continue # not modified since last check
self._mtimes[fname] = mtime
self._reload(fname)
def _runCallback(self, mname, cbname):
try:
getattr(self._active[mname], cbname)()
return True
except Exception:
report_exception('{}.{}'.format(mname, cbname))
del self._active[mname]
return False
def runCallbacks(self, cbname):
active = set(self._active.keys())
for mname in self.priority:
if mname not in active:
continue
active.remove(mname)
self._runCallback(mname, cbname)
for mname in active:
self._runCallback(mname, cbname)
class PluginBase(object):
def __init__(self, refs, config):
self.refs = refs
self.config = PluginConfig(config)
def onInit(self):
'''plugin-specific init code'''
pass
def beforeUpdate(self):
'''just before the game processes events'''
pass
def afterUpdate(self):
'''immediately after events'''
pass
def onPresent(self):
'''after the game renders a frame'''
pass
DEFAULTS = {
'str': '',
'int': 0,
'float': 0.0,
'bool': False,
'color': 0
}
class PluginConfig():
def __init__(self, section):
self._section = section
self._schema = {}
self._cache = {}
def option(self, name, default=None, type='str'):
if type not in DEFAULTS:
raise TypeError('unknown option type')
if default is None:
default = DEFAULTS[type]
self._schema[name] = (default, type)
if name not in self._section:
self._putvalue(name, default)
def options(self, type, defdict):
for (name, default) in defdict.items():
self.option(name, default, type)
def __getattr__(self, name):
try:
return self._cache[name]
except KeyError:
if name not in self._schema:
raise AttributeError('unknown option name')
val = self._getvalue(name)
self._cache[name] = val
return val
def __setattr__(self, name, value):
if name[0] == '_' or name not in self._schema:
object.__setattr__(self, name, value)
return
self._putvalue(name, value)
# round-trip to ensure correct type
self._cache[name] = self._getvalue(name)
def __getitem__(self, key):
return self.__getattr__(key)
def _getvalue(self, name):
default, t = self._schema[name]
try:
if t == 'str':
return self._section[name]
elif t == 'bool':
return self._section.getboolean(name)
elif t == 'int':
return self._section.getint(name)
elif t == 'float':
return self._section.getfloat(name)
elif t == 'color':
val = self._section[name]
if len(val) == 3: # css-style short format
val = ''.join([x + x for x in val])
if len(val) == 6:
val = 'ff' + val
return int('0x' + val, 16)
except Exception:
report_exception('config.{}, using "{}"'.format(
name, default))
self._putvalue(name, default) # defaults must always work
return default
def _putvalue(self, name, value):
default, t = self._schema[name]
try:
if t == 'str' or t == 'int' or t == 'float':
self._section[name] = str(value)
elif t == 'bool':
self._section[name] = 'yes' if value else 'no'
elif t == 'color':
if value == 0:
self._section[name] = '0'
if value >> 24 >= 0xff:
value = value & 0xffffff
self._section[name] = '{:06x}'.format(value)
except Exception:
report_exception('config.{} = {}, using "{}"'.format(
name, value, default))
self._putvalue(name, default) # defaults must always work