-
Notifications
You must be signed in to change notification settings - Fork 25
/
gae_bingo.py
582 lines (447 loc) · 21.6 KB
/
gae_bingo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
import datetime
import hashlib
import logging
import re
import time
import urllib
from google.appengine.api import memcache
from google.appengine.ext import ndb
import cache
from .cache import BingoCache, BingoIdentityCache, bingo_and_identity_cache
from .models import create_experiment_and_alternatives, ConversionTypes
from .identity import can_control_experiments, identity
from .cookies import get_cookie_value
from .persist import PersistLock
# gae/bingo supports at most four alternatives per experiment. The cap comes
# from synchronized_counter, which allows only 4 synchronized counters per
# combination. See synchronized_counter.py for more.
MAX_ALTERNATIVES_PER_EXPERIMENT = 4
def create_unique_experiments(canonical_name,
                              alternative_params,
                              conversion_names,
                              conversion_types,
                              family_name,
                              unique_experiment_names,
                              bingo_cache,
                              experiments):
    """Once we have a lock, create all of the unique experiments.

    canonical_name to family_name are all as in ab_test, except that
    conversion_names, conversion_types must be lists.

    unique_experiment_names are names unique to each experiment,
    generated in ab_test.

    bingo_cache and experiments are created in ab_test and passed to here,
    giving the current bingo_cache and current cached list of experiments.

    Raises:
        Exception: if conversion_names, conversion_types and
            unique_experiment_names do not all have the same length.
    """
    if not (len(conversion_names) ==
            len(conversion_types) ==
            len(unique_experiment_names)):
        # The arguments should be correct length, since ab_test ensures that.
        # If they're not the same length, we don't know that ab_test ran
        # successfully, so we should abort (we might not even have a lock!)
        raise Exception("create_unique_experiments called with "
                        "arguments of mismatched length!")

    # The three lists are parallel (checked above), so walk them in lockstep.
    for unique_name, conversion_name, conversion_type in zip(
            unique_experiment_names, conversion_names, conversion_types):
        # We don't want to create a unique_experiment more than once
        # (note: it's fine to add experiments to one canonical name,
        # which is how we can have one experiment with multiple conversions)
        if unique_name in experiments:
            continue

        exp, alts = create_experiment_and_alternatives(
            unique_name,
            canonical_name,
            alternative_params,
            conversion_name,
            conversion_type,
            family_name)

        bingo_cache.add_experiment(exp, alts)

    bingo_cache.store_if_dirty()
@ndb.tasklet
def participate_in_experiments_async(experiments,
                                     alternative_lists,
                                     bingo_identity_cache):
    """Enroll the current user in each experiment and return their content.

    Takes a list of experiments (with unique names), a parallel list holding
    the alternatives of each experiment, and an identity cache. For every
    live experiment the user is not yet participating in, the participant
    counter of the user's alternative is incremented and, if that increment
    reports success, the participation is recorded in the identity cache.

    The tasklet's result is the content of the bucket the user is sorted
    into, or the short-circuit content of an experiment that is not live.
    """
    # Single-slot list so the nested tasklet can write the result
    # (this code base has no `nonlocal`).
    content_slot = [None]

    @ndb.tasklet
    def enroll_async(experiment, alternatives):
        if not experiment.live:
            # Experiment has ended. Short-circuit and use selected winner
            # before user has had a chance to remove relevant ab_test code.
            content_slot[0] = experiment.short_circuit_content
            return

        bucket = _find_alternative_for_user(experiment, alternatives)

        already_in = (
            experiment.name in bingo_identity_cache.participating_tests)
        if not already_in:
            counted = yield bucket.increment_participants_async()
            if counted:
                bingo_identity_cache.participate_in(experiment.name)

        # It shouldn't matter which experiment's alternative content
        # we send back -- alternative N should be the same across
        # all experiments w/ same canonical name.
        content_slot[0] = bucket.content

    pending = [enroll_async(experiment, alternatives)
               for experiment, alternatives
               in zip(experiments, alternative_lists)]
    yield pending

    raise ndb.Return(content_slot[0])
def participate_in_experiments(*args):
    """Blocking counterpart of participate_in_experiments_async.

    Forwards every positional argument unchanged, waits on the object the
    async version returns, and hands back its result.
    """
    pending = participate_in_experiments_async(*args)
    return pending.get_result()
def ab_test(canonical_name,
            alternative_params=None,
            conversion_name=None,
            conversion_type=ConversionTypes.Binary,
            family_name=None):
    """Enroll the current user in an A/B test and return their alternative.

    If any experiment needed for this test is not yet in the bingo cache, it
    is created first, under a memcache-based lock so that concurrent requests
    do not create it more than once. One experiment is created for each
    conversion name.

    Args:
        canonical_name: name shared by all experiments of this test.
        alternative_params: alternatives, passed through to
            create_experiment_and_alternatives. When not None it may hold at
            most MAX_ALTERNATIVES_PER_EXPERIMENT entries.
        conversion_name: a single conversion name, a list of names, or None.
        conversion_type: a single ConversionTypes value (applied to every
            conversion) or a list parallel to conversion_name. A list of
            the wrong length is replaced by all-Binary, with a warning.
        family_name: passed through to create_experiment_and_alternatives.

    Returns:
        Whatever participate_in_experiments returns for the matching
        experiments: the content selected for the current user.

    Raises:
        Exception: if too many alternatives are given, or if no experiment
            or alternatives can be found for canonical_name.
    """
    if (alternative_params is not None and
            len(alternative_params) > MAX_ALTERNATIVES_PER_EXPERIMENT):
        raise Exception("Cannot ab test with more than %d alternatives"
                        % MAX_ALTERNATIVES_PER_EXPERIMENT)

    bingo_cache, bingo_identity_cache = bingo_and_identity_cache()

    # Make sure our conversion names and types are lists so that
    # we can more simply create one experiment for each one later.
    if isinstance(conversion_name, list):
        conversion_names = conversion_name
    else:
        conversion_names = [conversion_name]

    if isinstance(conversion_type, list):
        conversion_types = conversion_type
    else:
        conversion_types = [conversion_type] * len(conversion_names)

    # Unique name will have both canonical name and conversion.
    # This way, order of arguments in input list doesn't matter and
    # we still have unique experiment names.
    unique_experiment_names = [
        "%s (%s)" % (canonical_name, conv) if conv is not None
        else canonical_name
        for conv in conversion_names]

    # Only create the experiment if it's necessary
    if any(name not in bingo_cache.experiments
           for name in unique_experiment_names):
        # Creation logic w/ high concurrency protection
        client = memcache.Client()
        lock_key = "_gae_bingo_test_creation_lock"
        got_lock = False

        try:
            # Make sure only one experiment gets created
            while not got_lock:
                locked = client.gets(lock_key)

                while locked is None:
                    # Initialize the lock if necessary
                    client.set(lock_key, False)
                    locked = client.gets(lock_key)

                if not locked:
                    # Lock looks available, try to take it with compare
                    # and set (expiration of 10 seconds)
                    got_lock = client.cas(lock_key, True, time=10)

                if not got_lock:
                    # If we didn't get it, wait a bit and try again
                    time.sleep(0.1)

            # We have the lock, go ahead and create the experiment
            experiments = BingoCache.get().experiments

            if len(conversion_names) != len(conversion_types):
                # We were called improperly with mismatched list lengths.
                # Default everything to Binary
                logging.warning(
                    "ab_test(%s) called with lists of mismatched "
                    "length. Defaulting all conversions to binary!",
                    canonical_name)
                conversion_types = ([ConversionTypes.Binary] *
                                    len(conversion_names))

            # Handle multiple conversions for a single experiment by just
            # quietly creating multiple experiments (one for each conversion).
            create_unique_experiments(canonical_name,
                                      alternative_params,
                                      conversion_names,
                                      conversion_types,
                                      family_name,
                                      unique_experiment_names,
                                      bingo_cache,
                                      experiments)
        finally:
            if got_lock:
                # Release the lock
                client.set(lock_key, False)

    # We might have multiple experiments connected to this single canonical
    # experiment name if it was started w/ multiple conversion possibilities.
    experiments, alternative_lists = (
        bingo_cache.experiments_and_alternatives_from_canonical_name(
            canonical_name))

    if not experiments or not alternative_lists:
        raise Exception(
            "Could not find experiment or alternatives with experiment_name %s"
            % canonical_name)

    return participate_in_experiments(experiments,
                                      alternative_lists,
                                      bingo_identity_cache)
def bingo(param, identity_val=None):
    """Blocking counterpart of bingo_async.

    Starts bingo_async(param, identity_val) and waits for it to finish.
    Nothing is returned.
    """
    pending = bingo_async(param, identity_val)
    pending.get_result()
@ndb.tasklet
def bingo_async(param, identity_val=None):
    """Score a conversion, or every conversion in a list.

    param is either a list of conversion names, each handled by a recursive
    call, or a single value that is converted with str() and used as the
    conversion name. identity_val is forwarded unchanged to
    score_conversion_async.
    """
    if not isinstance(param, list):
        conversion = str(param)
        names = BingoCache.get().get_experiment_names_by_conversion_name(
            conversion)

        # Bingo for all experiments associated with this conversion
        yield [score_conversion_async(name, identity_val) for name in names]
    else:
        # Bingo for all conversions in list
        yield [bingo_async(single, identity_val) for single in param]
@ndb.tasklet
def score_conversion_async(experiment_name, identity_val=None):
    """Record a conversion in one experiment for the given identity.

    Nothing is recorded when:
      - the identity is not participating in the experiment,
      - the experiment cannot be found or is no longer live,
      - the experiment is not of ConversionTypes.Counting and the identity
        has already converted in it, or
      - the identity's alternative number is at or beyond
        MAX_ALTERNATIVES_PER_EXPERIMENT (temporary migration guard).

    Otherwise the alternative's conversion counter is incremented and, if
    that increment reports success, the conversion is noted in the identity
    cache.
    """
    bingo_cache, bingo_identity_cache = bingo_and_identity_cache(identity_val)

    if experiment_name not in bingo_identity_cache.participating_tests:
        return

    experiment = bingo_cache.get_experiment(experiment_name)

    if not experiment or not experiment.live:
        # Don't count conversions for short-circuited
        # experiments that are no longer live
        return

    if (experiment.conversion_type != ConversionTypes.Counting and
            experiment_name in bingo_identity_cache.converted_tests):
        # Only allow multiple conversions for
        # ConversionTypes.Counting experiments
        return

    alternative = _find_alternative_for_user(
        experiment,
        bingo_cache.get_alternatives(experiment_name),
        identity_val)

    # TODO(kamens): remove this! Temporary protection from an experiment that
    # has more than 4 alternatives while we migrate to the new gae/bingo
    # alternative restriction. Alternative numbers below the limit are
    # accepted, so the limit itself is the first rejected number.
    if alternative.number >= MAX_ALTERNATIVES_PER_EXPERIMENT:
        return

    if (yield alternative.increment_conversions_async()):
        bingo_identity_cache.convert_in(experiment_name)
class ExperimentModificationException(Exception):
    """Raised when an experiment cannot be controlled or modified safely.

    A call to control or modify an experiment raises this when it is unable
    to proceed safely because of contention with background tasks, i.e. when
    there is too much contention between mutating an experiment and the
    constantly running persist tasks.

    See ExperimentController for more details.
    """
class ExperimentController(object):
    """A context that can be used to build monitors to modify experiments.

    Since modifications of the bingo data need to happen atomically across
    multiple items, the constantly running persist tasks could interfere with
    clients attempting to do control operations that modify experiments.

    Use this in conjunction with a with statement before calling any
    experiment modifying methods. This context will also flush the bingo
    cache on exit.
    """

    # Class-wide flag: True while a controller context is active.
    # assert_safe() checks it.
    _lock_set = False

    def __enter__(self):
        """Take the persist lock and mark the monitor as entered.

        Returns:
            This controller, so `with ExperimentController() as c:` works.

        Raises:
            ExperimentModificationException: if the lock cannot be taken.
        """
        self.lock = PersistLock()
        if not self.lock.spin_and_take():
            raise ExperimentModificationException(
                "Unable to acquire lock to modify experiments")
        ExperimentController._lock_set = True
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Flush the bingo cache, then leave the monitor and release the lock.

        Exceptions raised inside the with block are not suppressed.
        """
        try:
            # Forcefully flush the cache, since this must be done inside of
            # the monitor. The mutation methods (e.g. choose_alternative) are
            # implemented in such a way that they rely on the gae/bingo
            # middleware to flush the data. But by that point the lock will
            # have been released
            cache.store_if_dirty()
        finally:
            # Always leave the monitor, even if the flush failed; otherwise
            # the lock would stay held and _lock_set would stay True.
            ExperimentController._lock_set = False
            logging.info(
                "Exiting monitor from ExperimentController. About to "
                "release the lock (current value: [%s])",
                self.lock.is_active())
            self.lock.release()

    @staticmethod
    def assert_safe():
        """Assert that caller is in a monitor that can modify experiments."""
        if not ExperimentController._lock_set:
            raise ExperimentModificationException(
                "Attempting to modify experiment outside of monitor. "
                "Use with ExperimentController(): ... around "
                "your snippet.")
def choose_alternative(canonical_name, alternative_number):
    """End the experiments under canonical_name, selecting one alternative.

    Must be called inside an ExperimentController context; otherwise
    ExperimentController.assert_safe() raises. For each experiment that has
    exactly one alternative numbered alternative_number, the experiment is
    marked not live, that alternative's content becomes its short-circuit
    content, and the bingo cache is updated. Other experiments are skipped
    with a warning. Does nothing if no experiments or alternatives are found.
    """
    ExperimentController.assert_safe()

    bingo_cache = BingoCache.get()

    # Need to end all experiments that may have been kicked off
    # by an experiment with multiple conversions
    experiments, alternative_lists = (
        bingo_cache.experiments_and_alternatives_from_canonical_name(
            canonical_name))

    if not experiments or not alternative_lists:
        return

    for experiment, alternatives in zip(experiments, alternative_lists):
        # Built as a real list so len() and indexing below do not depend on
        # what filter() returns.
        alternative_chosen = [alt for alt in alternatives
                              if alt.number == alternative_number]

        if len(alternative_chosen) == 1:
            experiment.live = False
            experiment.set_short_circuit_content(
                alternative_chosen[0].content)
            bingo_cache.update_experiment(experiment)
        else:
            logging.warning(
                "Skipping choose alternative for %s (chosen: %s)" %
                (experiment.name, alternative_chosen))
def delete_experiment(canonical_name, retrieve_archives=False):
    """Delete every experiment registered under canonical_name.

    Must be called inside an ExperimentController context; otherwise
    ExperimentController.assert_safe() raises. When retrieve_archives is
    true, the cache is loaded from the datastore with archives=True instead
    of using the current BingoCache. Does nothing if no experiments or
    alternatives are found.
    """
    ExperimentController.assert_safe()

    bingo_cache = (BingoCache.load_from_datastore(archives=True)
                   if retrieve_archives
                   else BingoCache.get())

    # Need to delete all experiments that may have been kicked off
    # by an experiment with multiple conversions
    lookup = bingo_cache.experiments_and_alternatives_from_canonical_name(
        canonical_name)
    experiments, alternative_lists = lookup

    if experiments and alternative_lists:
        for doomed in experiments:
            bingo_cache.delete_experiment_and_alternatives(doomed)
def archive_experiment(canonical_name):
    """Archive every experiment registered under canonical_name.

    Must be called inside an ExperimentController context; otherwise
    ExperimentController.assert_safe() raises. An error is logged when no
    experiments or alternatives are found, or when an entry in the
    experiment list is empty.
    """
    ExperimentController.assert_safe()

    bingo_cache = BingoCache.get()

    # Need to archive all experiments that may have been kicked off
    # by an experiment with multiple conversions
    lookup = bingo_cache.experiments_and_alternatives_from_canonical_name(
        canonical_name)
    experiments, alternative_lists = lookup

    if not (experiments and alternative_lists):
        logging.error("Can't find experiments named %s" % canonical_name)
        return

    for candidate in experiments:
        if candidate:
            logging.info("Archiving %s" % candidate.name)
            bingo_cache.archive_experiment_and_alternatives(candidate)
        else:
            logging.error("Found empty experiment under %s" % canonical_name)
def resume_experiment(canonical_name):
    """Mark every experiment registered under canonical_name as live again.

    Must be called inside an ExperimentController context; otherwise
    ExperimentController.assert_safe() raises. Does nothing if no experiments
    or alternatives are found.
    """
    ExperimentController.assert_safe()

    bingo_cache = BingoCache.get()

    # Need to resume all experiments that may have been kicked off
    # by an experiment with multiple conversions
    lookup = bingo_cache.experiments_and_alternatives_from_canonical_name(
        canonical_name)
    experiments, alternative_lists = lookup

    if experiments and alternative_lists:
        for paused in experiments:
            paused.live = True
            bingo_cache.update_experiment(paused)
def get_experiment_participation(identity_val=None):
    """Get the experiments and alternatives the user participated in.

    Returns a dict of canonical name: alternative for every experiment that
    this user participated in, even if the experiment has ended.
    """
    bingo_cache, bingo_identity_cache = bingo_and_identity_cache(identity_val)

    # HACK: the participating tests are actually a list of conversions, so
    # try to reduce them to canonical names. Just use the full name if
    # there's no paren.
    canonical_names = set()
    for test_name in bingo_identity_cache.participating_tests:
        paren_at = test_name.rfind(" (")
        if paren_at == -1:
            canonical_names.add(test_name)
        else:
            canonical_names.add(test_name[:paren_at])

    # now get the alternative this user is participating in, as long as it is
    # actually a canonical name (just skip the ones that are not)
    participation = {}
    for name in canonical_names:
        if name in bingo_cache.experiment_names_by_canonical_name:
            participation[name] = find_alternative_for_user(name,
                                                            identity_val)
    return participation
def find_alternative_for_user(canonical_name, identity_val):
    """Return the alternative that the given bingo identity belongs to.

    Returns None if the experiment does not exist, and the chosen
    alternative's content if the experiment has ended.

    The user may not have been opted into the experiment yet - this is just
    a way to probe which alternative will be, or has been, selected for the
    user without causing side effects.

    If an experiment has multiple instances (because it was created with
    different alternative sets), the last experiment is used.

    canonical_name -- the canonical name of the experiment
    identity_val -- a string or instance of GAEBingoIdentity
    """
    bingo_cache = BingoCache.get()

    names = bingo_cache.get_experiment_names_by_canonical_name(
        canonical_name)
    if not names:
        return None

    latest_name = names[-1]
    experiment = bingo_cache.get_experiment(latest_name)
    if not experiment:
        return None

    if experiment.live:
        alternatives = bingo_cache.get_alternatives(latest_name)
        selected = _find_alternative_for_user(experiment,
                                              alternatives,
                                              identity_val)
        return selected.content

    # Experiment has ended - return result that was selected.
    return experiment.short_circuit_content
def find_cookie_val_for_user(experiment_name):
    """For gae_bingo admins, return the cookie value for an experiment.

    Returns:
        The cookie's value as an int, or None when the current user cannot
        control experiments, the cookie is missing or empty, or its value
        is not an integer.
    """
    if not can_control_experiments():
        return None

    # This escaping must be consistent with what's done in
    # static/js/dashboard.js
    cookie_val = get_cookie_value(
        "GAEBingo_%s" % re.sub(r'\W', '+', experiment_name))

    if not cookie_val:
        return None

    try:
        return int(cookie_val)
    except (TypeError, ValueError):
        # The cookie comes from the client, so a malformed value must not
        # take the request down; treat it as "no override".
        logging.warning("Ignoring non-integer gae_bingo cookie for %s",
                        experiment_name)
        return None
def find_cookie_alt_param_for_user(experiment_name, alternative_params):
    """If gae_bingo administrator, allow possible override of alternative.

    Return the entry of alternative_params selected by the cookie that is
    set when gae_bingo administrators click the "preview" button for an
    experiment alternative in the gae_bingo dashboard.

    Returns None when there is no cookie value or when it is not a valid
    index into alternative_params.
    """
    index = find_cookie_val_for_user(experiment_name)

    # Reject negative values too: they would otherwise index from the end
    # of the list instead of meaning "no such alternative".
    if index is None or not 0 <= index < len(alternative_params):
        return None

    return alternative_params[index]
def _find_cookie_alternative_for_user(experiment, alternatives):
    """Return the alternative selected by the admin preview cookie, if any.

    Looks up the cookie value for experiment.hashable_name and returns the
    first alternative whose number equals it. Returns None when there is no
    cookie value or no alternative matches.
    """
    wanted = find_cookie_val_for_user(experiment.hashable_name)

    if wanted is not None:
        for candidate in alternatives:
            if candidate.number == wanted:
                return candidate

    return None
def _find_alternative_for_user(experiment,
                               alternatives,
                               identity_val=None):
    """Pick the alternative for a user.

    An alternative selected through the preview cookie wins. Otherwise the
    choice falls back to modulo_choose, using the identity resolved from
    identity_val.
    """
    preview = _find_cookie_alternative_for_user(experiment, alternatives)
    if preview:
        return preview

    return modulo_choose(experiment, alternatives, identity(identity_val))
def modulo_choose(experiment, alternatives, identity):
    """Deterministically pick an alternative for an identity, by weight.

    The MD5 digest of experiment.hashable_name + str(identity), taken modulo
    the total weight, gives a position. The alternatives are walked in
    descending order and the first one whose cumulative range contains that
    position is returned. The same experiment name and identity therefore
    always map to the same alternative.

    Args:
        experiment: object with hashable_name and dt_started attributes.
        alternatives: iterable of objects with weight and number attributes.
        identity: value whose str() identifies the user.

    Returns:
        The selected alternative.

    Raises:
        ZeroDivisionError: if the total weight is 0 (e.g. no alternatives).
    """
    alternatives_weight = sum(alt.weight for alt in alternatives)

    # md5 needs bytes. Text is encoded as UTF-8, which yields the same digest
    # as before for ASCII names and also handles non-ASCII ones.
    hash_input = experiment.hashable_name + str(identity)
    if not isinstance(hash_input, bytes):
        hash_input = hash_input.encode("utf-8")

    sig_num = int(hashlib.md5(hash_input).hexdigest(), 16)
    index_weight = sig_num % alternatives_weight
    current_weight = alternatives_weight

    # TODO(eliana) remove once current expts end
    # Experiments started after the cutoff break weight ties by alternative
    # number; older ones keep the weight-only ordering.
    if experiment.dt_started > datetime.datetime(2013, 3, 26, 18, 0, 0, 0):
        sorter = lambda alt: (alt.weight, alt.number)
    else:
        sorter = lambda alt: alt.weight

    for alternative in sorted(alternatives, key=sorter, reverse=True):
        current_weight -= alternative.weight
        if index_weight >= current_weight:
            return alternative
def create_redirect_url(destination, conversion_names):
    """Create a URL that redirects to destination after scoring conversions.

    Args:
        destination: URL to continue to; it is percent-quoted.
        conversion_names: a single conversion name, or a list or tuple of
            names. Each one becomes a conversion_name query parameter.

    Returns:
        A "/gae_bingo/redirect?continue=..." URL string.
    """
    try:
        quote = urllib.quote  # Python 2
    except AttributeError:
        from urllib.parse import quote  # Python 3

    if not isinstance(conversion_names, (list, tuple)):
        conversion_names = [conversion_names]

    parts = ["/gae_bingo/redirect?continue=%s" % quote(destination)]
    parts.extend("&conversion_name=%s" % quote(conversion_name)
                 for conversion_name in conversion_names)

    return "".join(parts)
def _iri_to_uri(iri):
"""Convert an Internationalized Resource Identifier (IRI) for use in a URL.
This function follows the algorithm from section 3.1 of RFC 3987 and is
idempotent, iri_to_uri(iri_to_uri(s)) == iri_to_uri(s)
Args:
iri: A unicode string.
Returns:
An ASCII string with the encoded result. If iri is not unicode it
is returned unmodified.
"""
# Implementation heavily inspired by django.utils.encoding.iri_to_uri()
# for its simplicity. We make the further assumption that the incoming
# argument is a unicode string or is ignored.
#
# See also werkzeug.urls.iri_to_uri() for a more complete handling of
# internationalized domain names.
if isinstance(iri, unicode):
byte_string = iri.encode("utf-8")
return urllib.quote(byte_string, safe="/#%[]=:;$&()+,!?*@'~")
return iri