forked from common-workflow-language/cwlviewer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
load.py
executable file
·118 lines (99 loc) · 3.19 KB
/
load.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#!/usr/bin/env python3
import sys
import re
import time
import gzip
from urllib.parse import urljoin
import requests
import json
# Matches a full 40-hex-digit Git commit id; used to spot a "branch" value
# that is really a commit rather than a branch name.
isCommit = re.compile(r"^[0-9a-fA-F]{40}$")

# HTTP headers sent with every request to the cwlviewer server.
HEADERS = {
    'User-Agent': 'cwlviewer-load/0.0.1',
    'Accept': 'application/json',
}
def parse_gitinfos(sourceFp):
    """Yield the ``retrievedFrom`` entry of every workflow in a JSON listing.

    :param sourceFp: readable file object holding a JSON document with a
        top-level ``"content"`` list; each element must have a
        ``"retrievedFrom"`` key.

    The whole document is parsed when the first item is requested.
    """
    listing = json.load(sourceFp)
    yield from (entry["retrievedFrom"] for entry in listing["content"])
def is_not_commit(gitinfo):
    """Return True unless ``gitinfo["branch"]`` matches the ``isCommit`` pattern."""
    branch = gitinfo["branch"]
    return isCommit.match(branch) is None
def make_requests(gitinfos):
    """Turn ``retrievedFrom`` records into form bodies for ``POST /workflows``.

    :param gitinfos: iterable of mappings that each provide ``repoUrl``,
        ``branch`` and ``path``; ``packedId`` is optional.
    :returns: generator yielding one new dict per record, in input order,
        with keys ``url``, ``branch``, ``path`` and, only when the record has
        a truthy ``packedId``, ``packedId``.
    """
    for git in gitinfos:
        # Keys seen on these records: repoUrl, branch, path, packedId, url,
        # rawUrl, type
        req = {
            "url": git["repoUrl"],
            "branch": git["branch"],
            "path": git["path"],
        }
        # .get(): a record without packedId should simply not forward it,
        # rather than abort the whole batch with a KeyError.
        packed_id = git.get("packedId")
        if packed_id:
            req["packedId"] = packed_id
        yield req
def send(base, req):
    """POST one workflow request to the server's ``/workflows`` endpoint.

    :param base: base URL of the server; ``/workflows`` is resolved against it.
    :param req: form fields to post.
    :returns: the absolute URL of the queue entry when the server answers
        202 (the caller should poll it later); ``None`` for 303 (already
        present) and for any other status, which is only printed.
    """
    endpoint = urljoin(base, "/workflows")
    response = requests.post(endpoint, data=req, allow_redirects=False, headers=HEADERS)
    print("Posted: %s" % req)
    status = response.status_code
    if status == 303:
        # Already there, all OK
        print(" done: %s" % response.headers["Location"])
        return None
    if status == 202:
        # Location may be relative, so resolve it against the endpoint.
        queue_url = urljoin(endpoint, response.headers["Location"])
        print(" queued: %s" % queue_url)
        return queue_url
    print("Unhandled HTTP status code: %s %s" % (status, response.text))
    return None
def send_requests(base, requests):
    """Lazily call ``send`` for each request and yield each result in order.

    Nothing is posted until the generator is advanced, so the caller can
    pause between items.
    """
    # NOTE: the parameter name shadows the ``requests`` module inside this
    # function only; it is kept as-is for keyword-argument compatibility.
    yield from (send(base, item) for item in requests)
def is_running(location):
    """Poll a queue entry and report whether the caller must keep waiting on it.

    :param location: URL of a queue entry, or a falsy value when there is
        nothing to poll.
    :returns: ``True`` while the server reports ``cwltoolStatus`` RUNNING;
        ``False`` when there is nothing to poll, when the server answers 303,
        or when it reports ERROR (the failure is printed).
    :raises Exception: for any other response, including a non-JSON body or
        a body without a recognised ``cwltoolStatus``.
    """
    if not location:
        # Nothing was queued, so nothing to wait for. Returning True here
        # would make a queue-trimming loop keep this entry forever.
        return False
    queued = requests.get(location, allow_redirects=False, headers=HEADERS)
    if queued.status_code == 303:
        # Done!
        return False
    try:
        j = queued.json()
    except ValueError:
        # requests' JSON decode errors subclass ValueError; fall through to
        # the "Unhandled queue status" error below with the raw body.
        j = None
    status = j.get("cwltoolStatus") if isinstance(j, dict) else None
    if status == "RUNNING":
        return True
    if status == "ERROR":
        print("Failed %s: %s" % (location, j.get("message")))
        return False
    raise Exception("Unhandled queue status: %s %s" % (queued.status_code, queued.text))
# Once this many submitted workflows are waiting in the server queue, stop
# sending new requests until some finish.
MAX_CONCURRENT = 6
# Seconds to pause between queue polls while the queue is full.
SLEEP = 0.5
def trim_queue(queue):
    """Return a new list of the entries in ``queue`` that ``is_running`` keeps.

    Entries are checked in order and the input list is left unmodified.
    A one-line summary of the size change is printed.
    """
    still_running = [location for location in queue if is_running(location)]
    print("Trimmed queue from %s to %s" % (len(queue), len(still_running)))
    return still_running
def _open_source(jsonfile):
    """Open the JSON listing: stdin for "-", gzip for a ".gz" path, else a plain file."""
    if jsonfile == "-":
        return sys.stdin
    if jsonfile.endswith(".gz"):
        return gzip.open(jsonfile, "rb")
    return open(jsonfile, "rb")


def main(jsonfile="-", base="http://view.commonwl.org:8082/", *args):
    """Submit every workflow in a JSON listing to a workflow server.

    :param jsonfile: path of the JSON listing, or ``"-"`` to read stdin; a
        ``.gz`` suffix selects gzip decompression.
    :param base: base URL of the server to POST to.
    :param args: extra flags; ``"--no-commits"`` filters entries through
        ``is_not_commit``.

    At most ``MAX_CONCURRENT`` queued submissions are waited on at a time.
    The remaining queue is then polled every ``SLEEP`` seconds until it
    drains.
    """
    source = _open_source(jsonfile)
    try:
        gitinfos = parse_gitinfos(source)
        if "--no-commits" in args:
            gitinfos = filter(is_not_commit, gitinfos)
        # Named reqs so the local does not shadow the ``requests`` module.
        reqs = make_requests(gitinfos)
        queued = []
        for location in send_requests(base, reqs):
            if location:
                queued.append(location)
            while len(queued) >= MAX_CONCURRENT:
                time.sleep(SLEEP)
                queued = trim_queue(queued)
        # Finish the rest of the queue. Sleep between polls so the drain
        # does not hammer the server in a tight loop.
        while queued:
            time.sleep(SLEEP)
            queued = trim_queue(queued)
    finally:
        # The generators above read from source lazily, so close it only
        # once everything is done; never close the process's stdin.
        if source is not sys.stdin:
            source.close()
if __name__ == "__main__":
    if "-h" in sys.argv or "--help" in sys.argv:
        print("load.py [jsonfile] [baseurl] [--no-commits]")
        # Help was explicitly requested, so report success (as argparse does).
        sys.exit(0)
    main(*sys.argv[1:])