-
Notifications
You must be signed in to change notification settings - Fork 4
/
observed_test.py
338 lines (267 loc) · 10.5 KB
/
observed_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
import itertools
import observed
from observed import observable_function, observable_method
import pytest
def get_caller_name(caller):
"""Find the name of a calling (i.e. observed) object.
Args:
caller: The observed object which is calling an observer.
Returns:
The name of the caller. If the caller is a function we return that
function's .__name__. If the caller is a bound method we return the
name of bound object.
"""
if hasattr(caller, "__self__"):
# caller is a Foo instance
name = caller.__self__.name
else:
# caller is a function.
name = caller.__name__
return name
def clear_list(l):
"""Remove all entries from a list in place.
Args:
l: The list to be cleared.
"""
while True:
try:
l.pop(0)
except IndexError:
break
class Foo(object):
"""A class with some observable methods and some normal methods.
Attributes:
name - string: A string naming the instance.
buf - list: A buffer for collecting a record of called methods. My
methods are set up to append a string to buf when they are called.
The strings are formatted in one of two ways. If the method being
called accepts the observed object as a first argument, the string
is:
"%s%s%s"%(self.name, method_name, caller_name)
where method_name is the name of the method being called on me and
caller_name is the name of the calling Foo instance or the name of
the calling function. Note that the name of the calling instance is
NOT the same thing as the name of the calling method, which doesn't
exist. If the method being called does not accept the caller as a
first argument, the string written to buf is:
"%s%s"%(self.name, method_name).
"""
def __init__(self, name, buf):
"""Initialize a Foo object.
Args:
name: A name for this instance.
buf: A buffer (list) into which I write data when my methods are
called. See the class docstring for details.
"""
self.name = name
self.buf = buf
@observable_method()
def bar(self):
self.buf.append("%sbar"%(self.name,))
def baz(self):
self.buf.append("%sbaz"%(self.name,))
@observable_method()
def milton(self, caller):
caller_name = get_caller_name(caller)
self.buf.append("%smilton%s"%(self.name, caller_name))
def waldo(self, caller):
caller_name = get_caller_name(caller)
self.buf.append("%swaldo%s"%(self.name, caller_name))
def observable_methods(self):
"""Get all of this object's observable methods.
We don't include milton because our testing procedure isn't smart
enough to know to call it with an argument.
Returns:
A list of the names of my observable methods.
"""
return [self.bar]
def method_info(self):
"""Get a list of method names and whether they want caller id.
Returns:
A list of tuples. Each tuple is
(str: method name, bool: identify_observed).
"""
return [(self.bar, False),
(self.baz, False),
(self.milton, True),
(self.waldo, True)]
class Goo(Foo):
"""Same as Foo but using the descriptor strategy for observer persistence.
I am entirely similar to Foo except that my observable methods use the
descriptor persistence strategy. See the docstring for observable_method
for a detailed explanation.
"""
def bar(self):
self.buf.append("%sbar"%(self.name,))
bar = observed.get_observable_method(bar, strategy='descriptor')
def milton(self, caller):
caller_name = get_caller_name(caller)
self.buf.append("%smilton%s"%(self.name, caller_name))
milton = observed.get_observable_method(milton, strategy='descriptor')
def get_observables(*objs):
"""Get a list observables from some objects.
Args:
Any number of objects. Each object must be either a Foo (or subclass)
instance or a function.
Returns:
A list of observable things, either functions or methods bound to the
object. Each function passed in as an argument is placed directly into
the returned list. For each Foo instance passed in, we get each of that
instance's observable methods and place each one in the output list.
"""
observables = []
for obj in objs:
if isinstance(obj, Foo):
observables.extend(obj.observable_methods())
elif isinstance(obj, observed.ObservableFunction):
observables.append(obj)
else:
raise TypeError("Object of type %s not observable"%(type(obj),))
return observables
def get_observer_sets(*objs):
"""Get observers from a set of objects.
Returns:
A list of tuples. Each tuple is an observer and a boolean corresponding
to the value of identify_observed which should be used when registering
that observer.
"""
observer_sets = []
single_observers = []
for obj in objs:
if isinstance(obj, Foo):
single_observers.extend(obj.method_info())
else:
single_observers.append((obj[0], obj[1]))
# for num_observers in range(len(single_observers)):
for comb in itertools.combinations(single_observers, 3):
observer_sets.append(comb)
return observer_sets
def get_items(observables, observer_sets):
"""Get all combinations of observer/observed and expected test data.
Returns:
A list of tuples. Each tuple contains:
an observable object
a list of (observer, identify_observed) tuples
expected buffer data for this combination
expected buffer data for calling the obsevable after all observers
have been un-registered.
"""
def get_buff_data(observable, observer, identify_observed):
"""Get the buffer data an object will write."""
if hasattr(observer, '__self__'):
expected = observer.__self__.name+observer.__name__
else:
expected = observer.__name__
if identify_observed:
expected = expected + get_caller_name(observable)
return expected
items = []
for observable, observer_set in itertools.product(observables, observer_sets):
# Don't include this combination if it would cause infinite recursion.
recursion = False
for observer, _ in observer_set:
if type(observer) == type(observable) and observer == observable:
recursion = True
if recursion:
continue
expected_buf = []
if isinstance(observable, observed.ObservableBoundMethod):
final = observable.__self__.name + observable.__name__
elif isinstance(observable, observed.ObservableFunction):
final = observable.__name__
for observer, caller_id in observer_set:
expected_buf.append(get_buff_data(observable, observer, caller_id))
expected_buf.insert(0, final)
expected_buf.sort()
items.append((observable, observer_set, expected_buf, [final]))
return items
class TestBasics:
"""Test that observers are called when the observed object is called."""
@classmethod
def setup_method(self):
self.buf = []
@classmethod
def teardown_class(self):
pass
def test_callbacks(self):
"""
Test all combinations of types acting as observed and observer.
For each combination of observed and observer we check that all
observers are called. We also check that after discarding the
observers subsequent invocations of the observed object do not call
any observers.
"""
a = Foo('a', self.buf)
b = Foo('b', self.buf)
c = Goo('c', self.buf)
d = Goo('d', self.buf)
@observable_function
def f():
self.buf.append('f')
@observable_function
def g(caller):
self.buf.append('g%s'%(get_caller_name(caller),))
# We don't include g in our set of observables because the testing
# code isn't smart enough to call it with an argument.
observables = get_observables(a, b, c, d, f)
observer_sets = get_observer_sets(a, b, c, d, (f, False), (g, True))
items = get_items(observables, observer_sets)
for observed, observer_set, expected_buf, final_buf in items:
for observer, identify_observed in observer_set:
observed.add_observer(observer,
identify_observed=identify_observed)
observed()
self.buf.sort()
assert self.buf == expected_buf
clear_list(self.buf)
for observer, _ in observer_set:
observed.discard_observer(observer)
observed()
assert self.buf == final_buf
clear_list(self.buf)
def test_discard(self):
"""Test that discard_observer prevents future ivocation."""
a = Foo('a', self.buf)
def f():
self.buf.append('f')
a.bar.add_observer(f)
result = a.bar.discard_observer(f)
assert result == True
result = a.bar.discard_observer(f)
assert result == False
a.bar()
assert self.buf == ['abar']
def test_unbound_method(self):
"""Test that calling an unbound method invokes observers."""
buf = []
f = Foo('f', buf)
def func():
buf.append('func')
f.bar.add_observer(func)
Foo.bar(f)
assert buf == ['fbar', 'func']
def test_equality(self):
"""Test equality of observable bound methods."""
f = Foo('f', self.buf)
g = Foo('g', self.buf)
@observable_function
def func():
self.buf.append('func')
assert Foo.bar == Foo.bar
assert f.bar == f.bar
assert f.bar != g.bar
assert func == func
def test_caller_identification(self):
"""The observed object can pass itself as first argument."""
buf = []
a = Foo('a', buf)
@observable_function
def f():
buf.append('f')
def g(caller):
buf.append('g%s'%(caller.__name__,))
f.add_observer(g, identify_observed=True)
f.add_observer(a.milton, identify_observed=True)
f()
buf.sort()
assert buf == ['amiltonf','f', 'gf']