forked from broadinstitute/cromwell-monitor-deprecated
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmonitor.py
executable file
·141 lines (123 loc) · 4.75 KB
/
monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#!/usr/bin/env python3
"""Cromwell VM monitor entry point.

Continuously measures runtime metrics and reports them to the Stackdriver
Monitoring API (see main() below). Designed to run inside a container and
exit cleanly on SIGTERM.
"""
import logging
from signal import SIGTERM, signal
from time import time
import pynvml
from typing import List
import requests
# project-local module that does the actual measuring/reporting
import gcp_monitor
# root logger at INFO so the monitor's progress messages are visible
logging.getLogger().setLevel(logging.INFO)
# flag to keep track of container termination; flipped by signal_handler()
container_running = True
def signal_handler(signum, frame):
    """SIGTERM handler: flag the monitoring loop to flush and exit.

    Sets the module-level ``container_running`` flag to False so that
    main() reports a final set of metrics and terminates normally.
    """
    global container_running
    container_running = False
def get_access_token() -> str:
    """Fetch an OAuth2 access token from the GCE metadata server.

    https://cloud.google.com/docs/authentication/rest#metadata-server

    :raises requests.HTTPError: if the metadata server returns an error status
    :raises ValueError: if the response body has no "access_token" field
    :return: the bearer token string
    """
    res = requests.get(
        "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token",
        headers={"Metadata-Flavor": "Google"},
        # requests has no default timeout; without one a hung metadata
        # server would block monitoring forever
        timeout=30,
    )
    res.raise_for_status()
    # Parse the body once instead of calling res.json() three times
    token_response = res.json()
    if "access_token" not in token_response:
        raise ValueError(
            f"Error getting authentication access token for GCP REST API: {token_response}, access token not in response"
        )
    return token_response["access_token"]
def get_pricelist_dict() -> List[dict]:
    """Query the cloudbilling api for current compute engine SKUs and prices,
    then collate the paginated json responses into a single list of dicts.

    :raises requests.exceptions.RequestException: If any request to the
        cloudbilling api fails (the original code only checked the status of
        the FIRST page; this checks every page)
    :return: A List of dicts containing the SKUs and prices of all compute
        engine services. See link below for how each sku dict is organized
        https://cloud.google.com/billing/docs/reference/rest/v1/services.skus/list#Sku
    :rtype: List[dict]
    """
    # 6F81-5844-456A is the service id for Compute Engine
    skus_url = "https://cloudbilling.googleapis.com/v1/services/6F81-5844-456A/skus"
    query_params = {
        "pageSize": 5000,
    }
    # Access token expires in 1hr but this is ok because we only call
    # the api at the start of monitoring
    headers = {
        "Authorization": f"Bearer {get_access_token()}",
    }
    skus: List[dict] = []
    # None = first page not fetched yet; "" = API reported no further pages
    next_page_token = None
    while next_page_token != "":
        if next_page_token:
            query_params["pageToken"] = next_page_token
        res = requests.get(
            skus_url,
            params=query_params,
            headers=headers,
            timeout=60,
        )
        try:
            # check EVERY page, not just the first, so a mid-pagination
            # failure is not silently swallowed
            res.raise_for_status()
        except requests.HTTPError as e:
            logging.error(f"Error getting pricelist: {e}")
            raise
        services_json = res.json()
        next_page_token = services_json.get("nextPageToken", "")
        skus += services_json.get("skus", [])
    return skus
def main():
    """
    Continuously measure runtime metrics every MEASUREMENT_TIME_SEC and
    report them to Stackdriver Monitoring API every REPORT_TIME_SEC.
    If a container termination signal (SIGTERM) is received, the final
    metrics are reported right after the current measurement and the
    loop exits normally.
    """
    nvml_ok = True
    try:
        pynvml.nvmlInit()
    except pynvml.NVMLError as e:
        # expected if the machine does not have an NVIDIA GPU
        logging.info(f"NVML initialization failed (probably no GPUs): {e}")
        nvml_ok = False
    pricing_available = False
    try:
        pricing_available = True
        # fixed annotation: get_pricelist_dict returns List[dict], not dict
        services_pricelist: List[dict] = get_pricelist_dict()
        gcp_instance, metrics_client = gcp_monitor.initialize_gcp_variables(
            nvml_ok, services_pricelist, pricing_available
        )
    except (requests.exceptions.RequestException, ValueError) as e:
        # Pricing is best-effort: fall back to monitoring without cost metrics
        pricing_available = False
        logging.error(f"Error using pricing data for metrics: {e}")
        logging.warning("Will attempt to continue monitoring without pricing data...")
        gcp_instance, metrics_client = gcp_monitor.initialize_gcp_variables(nvml_ok)
    try:
        signal(SIGTERM, signal_handler)
        gcp_instance = gcp_monitor.reset(gcp_instance)
        while container_running:
            gcp_instance = gcp_monitor.measure(gcp_instance)
            # Report when the reporting window has elapsed, or immediately
            # if the container is terminating (final flush before exit).
            if (
                not container_running
                or (time() - gcp_instance["last_time"])
                >= gcp_instance["REPORT_TIME_SEC"]
            ):
                gcp_instance = gcp_monitor.report(
                    gcp_instance, metrics_client, nvml_ok, pricing_available
                )
                gcp_instance = gcp_monitor.reset(gcp_instance)
    finally:
        if nvml_ok:
            try:
                pynvml.nvmlShutdown()
                # use logging, consistent with the rest of the file, not print
                logging.info("NVML shutdown successfully")
            except pynvml.NVMLError:
                logging.warning("Failed to shutdown NVML")
# Run the monitor only when executed as a script, not when imported.
if __name__ == "__main__":
    main()