Statsd (commaai#23413)
* device side of statsd

* need to start it

* enable in manager

* add sleep

* cleanup

* remove aggregates for now and standardize on industry terms

* manager needs main

* need to have a try/except

* atomic_write_on_fs_tmp does not work

* cleaner

* use dump

Co-authored-by: Willem Melching <willem.melching@gmail.com>

* one file at a time

* limit amount of files

* move to influx line protocol and cleanup

* needs to be a list

* fix timezone bug

* actually rate limit

* add to release

* normalized origin

* also log deviceType

* more stats

Co-authored-by: Willem Melching <willem.melching@gmail.com>
robbederks and pd0wm authored Jan 10, 2022
1 parent 8eec818 commit 1b49ce6
Showing 8 changed files with 201 additions and 3 deletions.
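
Taken together, the changes wire device daemons to a new statsd process: callers push statsd-style gauge strings over ZMQ, selfdrive/statsd.py batches them and periodically flushes Influx line protocol files to disk, and athenad forwards those files to the server over a new low-priority send queue. A minimal sketch of the producer side (the metric name and value are illustrative):

  from selfdrive.statsd import statlog

  statlog.gauge("car_voltage", 12.5)
  # StatLog serializes this as the statsd-style string "car_voltage:12.5|g"
  # and pushes it over a ZMQ PUSH socket to STATS_SOCKET (ipc:///tmp/stats),
  # where statsd.main() keeps it in its gauges dict until the next flush.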
1 change: 1 addition & 0 deletions release/files_common
@@ -77,6 +77,7 @@ selfdrive/tombstoned.py
selfdrive/pandad.py
selfdrive/updated.py
selfdrive/rtshield.py
selfdrive/statsd.py

selfdrive/athena/__init__.py
selfdrive/athena/athenad.py
35 changes: 32 additions & 3 deletions selfdrive/athena/athenad.py
@@ -11,6 +11,7 @@
import socket
import threading
import time
import tempfile
from collections import namedtuple
from functools import partial
from typing import Any
@@ -31,6 +32,7 @@
from selfdrive.loggerd.xattr_cache import getxattr, setxattr
from selfdrive.swaglog import cloudlog, SWAGLOG_DIR
from selfdrive.version import get_version, get_origin, get_short_branch, get_commit
from selfdrive.statsd import STATS_DIR

ATHENA_HOST = os.getenv('ATHENA_HOST', 'wss://athena.comma.ai')
HANDLER_THREADS = int(os.getenv('HANDLER_THREADS', "4"))
@@ -48,7 +50,7 @@
recv_queue: Any = queue.Queue()
send_queue: Any = queue.Queue()
upload_queue: Any = queue.Queue()
log_send_queue: Any = queue.Queue()
low_priority_send_queue: Any = queue.Queue()
log_recv_queue: Any = queue.Queue()
cancelled_uploads: Any = set()
UploadItem = namedtuple('UploadItem', ['path', 'url', 'headers', 'created_at', 'id', 'retry_count', 'current', 'progress'], defaults=(0, False, 0))
@@ -86,6 +88,7 @@ def handle_long_poll(ws):
    threading.Thread(target=ws_send, args=(ws, end_event), name='ws_send'),
    threading.Thread(target=upload_handler, args=(end_event,), name='upload_handler'),
    threading.Thread(target=log_handler, args=(end_event,), name='log_handler'),
    threading.Thread(target=stat_handler, args=(end_event,), name='stat_handler'),
  ] + [
    threading.Thread(target=jsonrpc_handler, args=(end_event,), name=f'worker_{x}')
    for x in range(HANDLER_THREADS)
@@ -447,7 +450,7 @@ def log_handler(end_event):
            "jsonrpc": "2.0",
            "id": log_entry
          }
          low_priority_send_queue.put_nowait(json.dumps(jsonrpc))
          curr_log = log_entry
        except OSError:
          pass # file could be deleted by log rotation
@@ -478,6 +481,32 @@ def log_handler(end_event):
      cloudlog.exception("athena.log_handler.exception")


def stat_handler(end_event):
  while not end_event.is_set():
    last_scan = 0
    curr_scan = sec_since_boot()
    try:
      if curr_scan - last_scan > 10:
        stat_filenames = list(filter(lambda name: not name.startswith(tempfile.gettempprefix()), os.listdir(STATS_DIR)))
        if len(stat_filenames) > 0:
          stat_path = os.path.join(STATS_DIR, stat_filenames[0])
          with open(stat_path) as f:
            jsonrpc = {
              "method": "storeStats",
              "params": {
                "stats": f.read()
              },
              "jsonrpc": "2.0",
              "id": stat_filenames[0]
            }
            low_priority_send_queue.put_nowait(json.dumps(jsonrpc))
          os.remove(stat_path)
        last_scan = curr_scan
    except Exception:
      cloudlog.exception("athena.stat_handler.exception")
    time.sleep(0.1)


def ws_proxy_recv(ws, local_sock, ssock, end_event, global_end_event):
  while not (end_event.is_set() or global_end_event.is_set()):
    try:
@@ -550,7 +579,7 @@ def ws_send(ws, end_event):
      try:
        data = send_queue.get_nowait()
      except queue.Empty:
        data = low_priority_send_queue.get(timeout=1)
      for i in range(0, len(data), WS_FRAME_SIZE):
        frame = data[i:i+WS_FRAME_SIZE]
        last = i + WS_FRAME_SIZE >= len(data)
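The tempfile.gettempprefix() filter in stat_handler above matters because statsd writes each stats file with atomic_write_in_dir, which stages the data in a temporary file in the same directory before renaming it into place; skipping those names keeps athenad from shipping half-written files. A small illustration, assuming CPython's default prefix:

  import tempfile

  tempfile.gettempprefix()  # -> 'tmp' on CPython by default
  # so an in-progress file such as 'tmpk3v9q1' in STATS_DIR is ignored,
  # while finished files (named by their flush timestamp) are sent and then deleted.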
7 changes: 7 additions & 0 deletions selfdrive/loggerd/config.py
@@ -13,6 +13,13 @@
CAMERA_FPS = 20
SEGMENT_LENGTH = 60

STATS_DIR_FILE_LIMIT = 10000
STATS_SOCKET = "ipc:///tmp/stats"
if PC:
  STATS_DIR = os.path.join(str(Path.home()), ".comma", "stats")
else:
  STATS_DIR = "/data/stats/"
STATS_FLUSH_TIME_S = 60

def get_available_percent(default=None):
  try:
1 change: 1 addition & 0 deletions selfdrive/manager/process_config.py
@@ -36,6 +36,7 @@
  PythonProcess("tombstoned", "selfdrive.tombstoned", enabled=not PC, persistent=True),
  PythonProcess("updated", "selfdrive.updated", enabled=not PC, persistent=True),
  PythonProcess("uploader", "selfdrive.loggerd.uploader", persistent=True),
  PythonProcess("statsd", "selfdrive.statsd", persistent=True),

  # EON only
  PythonProcess("rtshield", "selfdrive.rtshield", enabled=EON),
122 changes: 122 additions & 0 deletions selfdrive/statsd.py
@@ -0,0 +1,122 @@
#!/usr/bin/env python3
import os
import zmq
import time
from pathlib import Path
from datetime import datetime, timezone
from common.params import Params
from cereal.messaging import SubMaster
from selfdrive.swaglog import cloudlog
from selfdrive.hardware import HARDWARE
from common.file_helpers import atomic_write_in_dir
from selfdrive.version import get_normalized_origin, get_short_branch, get_short_version, is_dirty
from selfdrive.loggerd.config import STATS_DIR, STATS_DIR_FILE_LIMIT, STATS_SOCKET, STATS_FLUSH_TIME_S


class METRIC_TYPE:
  GAUGE = 'g'

class StatLog:
  def __init__(self):
    self.pid = None

  def connect(self):
    self.zctx = zmq.Context()
    self.sock = self.zctx.socket(zmq.PUSH)
    self.sock.setsockopt(zmq.LINGER, 10)
    self.sock.connect(STATS_SOCKET)
    self.pid = os.getpid()

  def _send(self, metric: str):
    if os.getpid() != self.pid:
      self.connect()

    try:
      self.sock.send_string(metric, zmq.NOBLOCK)
    except zmq.error.Again:
      # drop :/
      pass

  def gauge(self, name: str, value: float):
    self._send(f"{name}:{value}|{METRIC_TYPE.GAUGE}")


def main():
  def get_influxdb_line(measurement: str, value: float, timestamp: datetime, tags: dict):
    res = f"{measurement}"
    for tag_key in tags.keys():
      res += f",{tag_key}={str(tags[tag_key])}"
    res += f" value={value} {int(timestamp.timestamp() * 1e9)}\n"
    return res

  # open statistics socket
  ctx = zmq.Context().instance()
  sock = ctx.socket(zmq.PULL)
  sock.bind(STATS_SOCKET)

  # initialize stats directory
  Path(STATS_DIR).mkdir(parents=True, exist_ok=True)

  # initialize tags
  tags = {
    'dongleId': Params().get("DongleId", encoding='utf-8'),
    'started': False,
    'version': get_short_version(),
    'branch': get_short_branch(),
    'dirty': is_dirty(),
    'origin': get_normalized_origin(),
    'deviceType': HARDWARE.get_device_type(),
  }

  # subscribe to deviceState for started state
  sm = SubMaster(['deviceState'])

  last_flush_time = time.monotonic()
  gauges = {}
  while True:
    try:
      metric = sock.recv_string(zmq.NOBLOCK)
      try:
        metric_type = metric.split('|')[1]
        metric_name = metric.split(':')[0]
        metric_value = metric.split('|')[0].split(':')[1]

        if metric_type == METRIC_TYPE.GAUGE:
          gauges[metric_name] = metric_value
        else:
          cloudlog.event("unknown metric type", metric_type=metric_type)
      except Exception:
        cloudlog.event("malformed metric", metric=metric)
    except zmq.error.Again:
      time.sleep(1e-3)

    started_prev = sm['deviceState'].started
    sm.update(0)

    # flush when started state changes or after FLUSH_TIME_S
    if (time.monotonic() > last_flush_time + STATS_FLUSH_TIME_S) or (sm['deviceState'].started != started_prev):
      result = ""
      current_time = datetime.utcnow().replace(tzinfo=timezone.utc)
      tags['started'] = sm['deviceState'].started

      for gauge_key in gauges.keys():
        result += get_influxdb_line(f"gauge.{gauge_key}", gauges[gauge_key], current_time, tags)

      # clear intermediate data
      gauges = {}
      last_flush_time = time.monotonic()

      # check that we aren't filling up the drive
      if len(os.listdir(STATS_DIR)) < STATS_DIR_FILE_LIMIT:
        if len(result) > 0:
          stats_path = os.path.join(STATS_DIR, str(int(current_time.timestamp())))
          with atomic_write_in_dir(stats_path) as f:
            f.write(result)
      else:
        cloudlog.error("stats dir full")


if __name__ == "__main__":
  main()
else:
  statlog = StatLog()
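
Since get_influxdb_line is local to main(), the on-disk format is easiest to show by example. With illustrative tag values and timestamp, a single gauge becomes one line in the flushed file:

  gauge.cpu0_temperature,dongleId=abc123,started=True,version=0.8.13,branch=master,dirty=False,origin=github.com/commaai/openpilot,deviceType=tici value=45.0 1641772800000000000

That is: the measurement name, the comma-separated tags built above, the gauge value, and a nanosecond unix timestamp; each flush writes one such line per gauge to a file in STATS_DIR named after the flush time.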
2 changes: 2 additions & 0 deletions selfdrive/thermald/power_monitoring.py
@@ -9,6 +9,7 @@
from common.realtime import sec_since_boot
from selfdrive.hardware import HARDWARE
from selfdrive.swaglog import cloudlog
from selfdrive.statsd import statlog

CAR_VOLTAGE_LOW_PASS_K = 0.091 # LPF gain for 5s tau (dt/tau / (dt/tau + 1))

@@ -56,6 +57,7 @@ def calculate(self, peripheralState, ignition):
    # Low-pass battery voltage
    self.car_voltage_instant_mV = peripheralState.voltage
    self.car_voltage_mV = ((peripheralState.voltage * CAR_VOLTAGE_LOW_PASS_K) + (self.car_voltage_mV * (1 - CAR_VOLTAGE_LOW_PASS_K)))
    statlog.gauge("car_voltage", self.car_voltage_mV / 1e3)

    # Cap the car battery power and save it in a param every 10-ish seconds
    self.car_battery_capacity_uWh = max(self.car_battery_capacity_uWh, 0)
22 changes: 22 additions & 0 deletions selfdrive/thermald/thermald.py
@@ -23,6 +23,7 @@
from selfdrive.swaglog import cloudlog
from selfdrive.thermald.power_monitoring import PowerMonitoring
from selfdrive.version import terms_version, training_version
from selfdrive.statsd import statlog

ThermalStatus = log.DeviceState.ThermalStatus
NetworkType = log.DeviceState.NetworkType
@@ -291,8 +292,12 @@ def thermald_thread() -> NoReturn:
      msg.deviceState.networkInfo = network_info
    if nvme_temps is not None:
      msg.deviceState.nvmeTempC = nvme_temps
      for i, temp in enumerate(nvme_temps):
        statlog.gauge(f"nvme_temperature{i}", temp)
    if modem_temps is not None:
      msg.deviceState.modemTempC = modem_temps
      for i, temp in enumerate(modem_temps):
        statlog.gauge(f"modem_temperature{i}", temp)

    msg.deviceState.screenBrightnessPercent = HARDWARE.get_screen_brightness()
    msg.deviceState.batteryPercent = HARDWARE.get_battery_capacity()
@@ -409,6 +414,23 @@ def thermald_thread() -> NoReturn:
    should_start_prev = should_start
    startup_conditions_prev = startup_conditions.copy()

    # log more stats
    statlog.gauge("free_space_percent", msg.deviceState.freeSpacePercent)
    statlog.gauge("gpu_usage_percent", msg.deviceState.gpuUsagePercent)
    statlog.gauge("memory_usage_percent", msg.deviceState.memoryUsagePercent)
    for i, usage in enumerate(msg.deviceState.cpuUsagePercent):
      statlog.gauge(f"cpu{i}_usage_percent", usage)
    for i, temp in enumerate(msg.deviceState.cpuTempC):
      statlog.gauge(f"cpu{i}_temperature", temp)
    for i, temp in enumerate(msg.deviceState.gpuTempC):
      statlog.gauge(f"gpu{i}_temperature", temp)
    statlog.gauge("memory_temperature", msg.deviceState.memoryTempC)
    statlog.gauge("ambient_temperature", msg.deviceState.ambientTempC)
    for i, temp in enumerate(msg.deviceState.pmicTempC):
      statlog.gauge(f"pmic{i}_temperature", temp)
    statlog.gauge("fan_speed_percent_desired", msg.deviceState.fanSpeedPercentDesired)
    statlog.gauge("screen_brightness_percent", msg.deviceState.screenBrightnessPercent)

    # report to server once every 10 minutes
    if (count % int(600. / DT_TRML)) == 0:
      if EON and started_ts is None and msg.deviceState.memoryUsagePercent > 40:
14 changes: 14 additions & 0 deletions selfdrive/version.py
@@ -53,12 +53,24 @@ def get_origin(default: Optional[str] = None) -> Optional[str]:
  return run_cmd_default(["git", "config", "--get", "remote.origin.url"], default=default)


@cache
def get_normalized_origin(default: Optional[str] = None) -> Optional[str]:
  return get_origin()\
    .replace("git@", "", 1)\
    .replace(".git", "", 1)\
    .replace("https://", "", 1)\
    .replace(":", "/", 1)


@cache
def get_version() -> str:
  with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), "common", "version.h")) as _versionf:
    version = _versionf.read().split('"')[1]
  return version

@cache
def get_short_version() -> str:
  return get_version().split('-')[0]

@cache
def is_prebuilt() -> bool:
@@ -117,7 +129,9 @@ def is_dirty() -> bool:

print(f"Dirty: {is_dirty()}")
print(f"Version: {get_version()}")
print(f"Short version: {get_short_version()}")
print(f"Origin: {get_origin()}")
print(f"Normalized origin: {get_normalized_origin()}")
print(f"Branch: {get_branch()}")
print(f"Short branch: {get_short_branch()}")
print(f"Prebuilt: {is_prebuilt()}")
