-
Notifications
You must be signed in to change notification settings - Fork 27
/
workload-executor.py
87 lines (72 loc) · 2.91 KB
/
workload-executor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import inspect
import json
import os
import signal
import sys
import time
from collections import defaultdict
import pymongo
# Make the "test" package that ships in the pymongo git checkout importable.
# inspect.getfile(pymongo) points inside the installed pymongo package; going
# up two directory levels yields the directory we put first on sys.path so
# that test.unified_format can be imported directly from the git repo that
# was used to install pymongo (this works because pymongo is installed with
# the pip flag -e).
test_path = os.path.dirname(os.path.dirname(inspect.getfile(pymongo)))
sys.path.insert(0, test_path)
# This import must stay below the sys.path patch above, hence the E402 waiver.
from test.unified_format import UnifiedSpecTestMixinV1, interrupt_loop # noqa: E402
# True on native Windows and Cygwin; selects which signal gets hooked below.
WIN32 = sys.platform in ("win32", "cygwin")
def interrupt_handler(signum, frame):
    """Signal handler that forwards the interrupt to ``interrupt_loop()``.

    ``signum`` and ``frame`` are the standard arguments the ``signal`` module
    passes to a handler; neither is used here.

    NOTE(review): ``interrupt_loop`` comes from ``test.unified_format``;
    presumably it asks the running workload loop to stop -- confirm there.
    """
    interrupt_loop()
# Install interrupt_handler for the platform's interrupt signal. On Windows
# CTRL_BREAK_EVENT is mapped to SIGBREAK; on every other platform SIGINT is
# hooked. The conditional expression only evaluates the selected attribute,
# so signal.SIGBREAK is never looked up when WIN32 is false.
signal.signal(signal.SIGBREAK if WIN32 else signal.SIGINT, interrupt_handler)
def workload_runner(mongodb_uri, test_workload):
    """Run one unified-format workload and dump its statistics to disk.

    The workload is driven through ``UnifiedSpecTestMixinV1``. An exception
    raised while running the scenario is caught and stored as the single
    entry of the ``"errors"`` entity instead of being propagated. Summary
    counts are written to ``results.json`` and the collected events, errors
    and failures to ``events.json``, both relative to the current directory.

    :param mongodb_uri: connection string used for the client and passed to
        ``run_scenario`` as ``uri``.
    :param test_workload: parsed workload spec; its ``"tests"`` entry is
        asserted to contain exactly one test.
    """
    spec_runner = UnifiedSpecTestMixinV1()
    # The spec is assigned on the instance and on the class.
    spec_runner.TEST_SPEC = test_workload
    UnifiedSpecTestMixinV1.TEST_SPEC = test_workload
    spec_runner.setUp()

    # this is necessary because there isn't a mongo instance on
    # localhost:27017 on evergreen, so we have to patch it to use the client
    # specified in the workload runner
    spec_runner.client = pymongo.MongoClient(mongodb_uri)

    try:
        tests = test_workload["tests"]
        assert len(tests) == 1
        spec_runner.run_scenario(tests[0], uri=mongodb_uri)
    except Exception as exc:
        failure_record = {
            "error": str(exc),
            "time": time.time(),
            "type": type(exc).__name__,
        }
        spec_runner.entity_map["errors"] = [failure_record]

    # Copy the entities into a defaultdict so that list-valued entities the
    # workload never created read back as empty lists.
    collected = defaultdict(list, spec_runner.entity_map._entities)
    # Counters that were never stored are reported as -1.
    for counter_name in ("successes", "iterations"):
        collected.setdefault(counter_name, -1)

    error_list = collected["errors"]
    failure_list = collected["failures"]

    results = {
        "numErrors": len(error_list),
        "numFailures": len(failure_list),
        "numSuccesses": collected["successes"],
        "numIterations": collected["iterations"],
    }
    print(f"Workload statistics: {results!r}")

    events = {
        "events": collected["events"],
        "errors": error_list,
        "failures": failure_list,
    }

    cur_dir = os.path.abspath(os.curdir)
    print(f"Writing statistics to directory {cur_dir!r}")

    # results.json is written and closed before events.json is opened.
    for file_name, payload in (("results.json", results), ("events.json", events)):
        with open(file_name, "w") as sink:
            json.dump(payload, sink)
if __name__ == "__main__":
    # Invocation: workload-executor.py <connection string> <workload>
    # where <workload> is either a JSON document or a path to a YAML file.
    if len(sys.argv) < 3:
        # Exit with a readable message instead of a bare IndexError.
        sys.exit(f"usage: {sys.argv[0]} <connection-string> <workload-json-or-yaml-path>")
    connection_string, driver_workload = sys.argv[1], sys.argv[2]
    try:
        workload_spec = json.loads(driver_workload)
    except json.decoder.JSONDecodeError:
        # We also support passing in a raw workload YAML file to this
        # script to make it easy to run the script in debug mode.
        # PyYAML is imported locally to avoid ImportErrors on EVG.
        import yaml

        with open(driver_workload) as fp:
            # safe_load() accepts only the stream and always uses SafeLoader;
            # passing Loader= raised TypeError and broke this fallback.
            workload_spec = yaml.safe_load(fp)
    workload_runner(connection_string, workload_spec)