-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
137 lines (104 loc) · 3.68 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import time
from flask import Flask, request
import jwt
import requests
from yaml import load, SafeLoader
# Flask application instance; the @app.route decorators below register on it.
app = Flask(__name__)
@app.route("/health")
def health():
    """Health-check endpoint; returns the mapping ``{"message": "ok"}``."""
    return {"message": "ok"}
@app.route("/webhook", methods=["POST"])
def webhook():
    """Handle a webhook delivery and report a protected-files check run.

    Creates an in-progress check run on the pull request's head commit,
    lists the files the pull request changes, and completes the check run:

    * ``success`` - protected files changed and the author is in ``owners``;
    * ``failure`` - protected files changed and the author is not an owner;
    * ``skipped`` - no protected files changed.

    Deliveries whose JSON body carries no ``pull_request`` object (for
    example the ``ping`` sent when a webhook is registered) are acknowledged
    without doing any work.

    Returns:
        The string ``"200"`` as the response body.
    """
    # NOTE(review): the delivery is not authenticated in this handler (no
    # X-Hub-Signature-256 check) - confirm that is enforced upstream.
    payload = request.json
    pull_request = (
        payload.get("pull_request") if isinstance(payload, dict) else None
    )
    if not pull_request:
        # Not a pull request event: indexing into the payload would raise
        # KeyError and surface as a 500, so acknowledge and stop here.
        return "200"

    data = {
        "head_sha": pull_request["head"]["sha"],
        "number": pull_request["number"],
        # The pull request number and its check runs belong to the base
        # repository. head.repo is the fork for cross-repository pull
        # requests, which would address the wrong repository.
        "full_name": pull_request["base"]["repo"]["full_name"],
        "user": pull_request["user"]["login"],
        "token": GitHubAuthentication(config).auth_token(),
    }

    create = check_run_create(data)

    files = config["files"]
    owners = config["owners"]

    # Match files modified in the pull request against the protected files.
    protected_files = [
        changed["filename"]
        for changed in pr_files(data)
        if changed["filename"] in files
    ]

    # No protected files touched: nothing to authorize, mark the run skipped.
    if not protected_files:
        check_run_update(create["id"], "skipped", data)
        return "200"

    # Protected files touched: pass only when the author is a listed owner.
    conclusion = "success" if data["user"] in owners else "failure"
    check_run_update(create["id"], conclusion, data, protected_files)

    return "200"
def load_yaml(file):
    """Parse the YAML document at path ``file`` and return the result.

    The document is parsed with ``SafeLoader``.
    """
    with open(file, "r") as handle:
        parsed = load(handle, Loader=SafeLoader)
    return parsed
def header(token):
    """Build request headers carrying ``token`` as a bearer credential."""
    headers = {}
    headers["Authorization"] = f"Bearer {token}"
    return headers
def pr_files(meta):
    """Return every file entry changed by the pull request in ``meta``.

    The files endpoint is paginated, so a single request only sees the first
    page. A protected file listed on a later page would go unnoticed, so all
    pages are fetched and concatenated.

    Args:
        meta: Mapping providing ``full_name``, ``number`` and ``token``.

    Returns:
        A list with the decoded JSON entries from all pages.

    Raises:
        requests.HTTPError: If any page request returns an error status.
    """
    url = f'{config["url"]}/repos/{meta["full_name"]}/pulls/{meta["number"]}/files'
    per_page = 100  # largest page size the endpoint accepts
    changed = []
    page = 1
    while True:
        response = requests.get(
            url,
            headers=header(meta["token"]),
            params={"per_page": per_page, "page": page},
        )
        # An error body is a JSON object rather than a list of files; fail
        # here instead of letting callers iterate over its keys.
        response.raise_for_status()
        batch = response.json()
        changed.extend(batch)
        # A short (or empty) page means there is nothing further to fetch.
        if len(batch) < per_page:
            return changed
        page += 1
def check_run_create(meta):
    """Create an ``in_progress`` check run on the commit in ``meta``.

    Args:
        meta: Mapping providing ``full_name``, ``head_sha`` and ``token``.

    Returns:
        The decoded JSON body of the API response.
    """
    endpoint = f'{config["url"]}/repos/{meta["full_name"]}/check-runs'
    body = {
        "name": config["name"],
        "head_sha": f'{meta["head_sha"]}',
        "status": "in_progress",
    }
    auth_headers = header(meta["token"])
    response = requests.post(endpoint, json=body, headers=auth_headers)
    return response.json()
def check_run_update(check_run_id, conclusion, meta, files=None):
    """Complete a check run with a conclusion and a readable summary.

    Args:
        check_run_id: Identifier of the check run to update.
        conclusion: One of ``"success"``, ``"failure"`` or ``"skipped"``.
        meta: Mapping providing ``user``, ``full_name`` and ``token``.
        files: Protected file names listed in the summary for ``success``
            and ``failure``; ``None`` is rendered as an empty tuple.

    Returns:
        The decoded JSON body of the API response.

    Raises:
        ValueError: If ``conclusion`` has no summary defined for it.
    """
    # Renders exactly like the tuple-unpacking form ``{*files,}`` but does
    # not raise TypeError when no file list was supplied.
    listed = tuple(files or ())

    if conclusion == "success":
        summary = f'''
{meta["user"]} is authorized to modify protected files in this pull request:
`{listed}`
'''
    elif conclusion == "failure":
        summary = f'''
{meta["user"]} is not authorized to modify protected files in this pull request:
`{listed}`
'''
    elif conclusion == "skipped":
        summary = '''
No protected files found in this pull request.
'''
    else:
        # Without this branch ``summary`` would be unbound below and the
        # caller would see an opaque UnboundLocalError.
        raise ValueError(f"unsupported check run conclusion: {conclusion!r}")

    data = {
        "status": "completed",
        # NOTE(review): check_run_create names the run from config["name"];
        # confirm this literal is meant to differ from (or match) it.
        "name": "Protected Files",
        "output": {
            "title": "Protected Files",
            "summary": summary,
        },
        "conclusion": conclusion,
    }

    return requests.patch(
        f'{config["url"]}/repos/{meta["full_name"]}/check-runs/{check_run_id}',
        json=data,
        headers=header(meta["token"]),
    ).json()
class GitHubAuthentication:
    """Obtain installation access tokens for a GitHub App.

    Every key of the supplied configuration mapping becomes an instance
    attribute. ``auth_token`` reads ``pem``, ``app_id``, ``url`` and
    ``installation_id`` from those attributes.
    """

    def __init__(self, config):
        """Copy each entry of ``config`` onto the instance as an attribute."""
        for attr, value in config.items():
            setattr(self, attr, value)

    def auth_token(self, expiration=60):
        """Exchange a short-lived signed JWT for an installation token.

        Args:
            expiration: Lifetime of the signed JWT, in seconds.

        Returns:
            The ``token`` field of the access-token response.

        Raises:
            KeyError: If the response body has no ``token`` field.
        """
        # Read the key inside a context manager so the file handle is closed
        # deterministically instead of being left to the garbage collector.
        with open(self.pem, "r") as key_file:
            pem = key_file.read()

        now = int(time.time())
        payload = {"iat": now, "exp": now + expiration, "iss": self.app_id}
        jwt_token = jwt.encode(payload, key=pem, algorithm="RS256")

        req = requests.post(
            f"{self.url}/app/installations/{self.installation_id}/access_tokens",
            headers=header(jwt_token),
        )
        resp = req.json()
        return resp["token"]
# Loaded at module level, outside the __main__ guard, so the handlers can read
# the settings whether this module is run directly or imported.
config = load_yaml("config.yml")

if __name__ == "__main__":
    # Listen on all interfaces on port 8443 with debug mode off.
    app.run(port=8443, host="0.0.0.0", debug=False)