diff --git a/system/athena/athenad.py b/system/athena/athenad.py index b4f9b8b6a719fd..96c34b0984f4b4 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -2,7 +2,6 @@ from __future__ import annotations import base64 -import bz2 import hashlib import io import json @@ -15,6 +14,7 @@ import tempfile import threading import time +import zstd from dataclasses import asdict, dataclass, replace from datetime import datetime from functools import partial @@ -35,6 +35,7 @@ from openpilot.common.params import Params from openpilot.common.realtime import set_core_affinity from openpilot.system.hardware import HARDWARE, PC +from openpilot.system.loggerd.uploader import LOG_COMPRESSION_LEVEL from openpilot.system.loggerd.xattr_cache import getxattr, setxattr from openpilot.common.swaglog import cloudlog from openpilot.system.version import get_build_metadata @@ -103,8 +104,8 @@ def from_dict(cls, d: dict) -> UploadItem: cur_upload_items: dict[int, UploadItem | None] = {} -def strip_bz2_extension(fn: str) -> str: - if fn.endswith('.bz2'): +def strip_zst_extension(fn: str) -> str: + if fn.endswith('.zst'): return fn[:-4] return fn @@ -283,16 +284,16 @@ def _do_upload(upload_item: UploadItem, callback: Callable = None) -> requests.R path = upload_item.path compress = False - # If file does not exist, but does exist without the .bz2 extension we will compress on the fly - if not os.path.exists(path) and os.path.exists(strip_bz2_extension(path)): - path = strip_bz2_extension(path) + # If file does not exist, but does exist without the .zst extension we will compress on the fly + if not os.path.exists(path) and os.path.exists(strip_zst_extension(path)): + path = strip_zst_extension(path) compress = True with open(path, "rb") as f: content = f.read() if compress: cloudlog.event("athena.upload_handler.compress", fn=path, fn_orig=upload_item.path) - content = bz2.compress(content) + content = zstd.compress(content, LOG_COMPRESSION_LEVEL) with io.BytesIO(content) as data: return requests.put(upload_item.url, @@ -375,7 +376,7 @@ def uploadFilesToUrls(files_data: list[UploadFileDict]) -> UploadFilesToUrlRespo continue path = os.path.join(Paths.log_root(), file.fn) - if not os.path.exists(path) and not os.path.exists(strip_bz2_extension(path)): + if not os.path.exists(path) and not os.path.exists(strip_zst_extension(path)): failed.append(file.fn) continue diff --git a/system/athena/tests/test_athenad.py b/system/athena/tests/test_athenad.py index 48519a0ffd2a0e..413255b587618a 100644 --- a/system/athena/tests/test_athenad.py +++ b/system/athena/tests/test_athenad.py @@ -29,7 +29,7 @@ def seed_athena_server(host, port): with Timeout(2, 'HTTP Server seeding failed'): while True: try: - requests.put(f'http://{host}:{port}/qlog.bz2', data='', timeout=10) + requests.put(f'http://{host}:{port}/qlog.zst', data='', timeout=10) break except requests.exceptions.ConnectionError: time.sleep(0.1) @@ -174,54 +174,59 @@ def test_list_data_directory(self): assert resp, 'list empty!' assert len(resp) == len(expected) - def test_strip_bz2_extension(self): + def test_strip_extension(self): + # any requested log file with an invalid extension won't return as existing fn = self._create_file('qlog.bz2') if fn.endswith('.bz2'): - assert athenad.strip_bz2_extension(fn) == fn[:-4] + assert athenad.strip_zst_extension(fn) == fn + + fn = self._create_file('qlog.zst') + if fn.endswith('.zst'): + assert athenad.strip_zst_extension(fn) == fn[:-4] @pytest.mark.parametrize("compress", [True, False]) def test_do_upload(self, host, compress): # random bytes to ensure rather large object post-compression fn = self._create_file('qlog', data=os.urandom(10000 * 1024)) - upload_fn = fn + ('.bz2' if compress else '') + upload_fn = fn + ('.zst' if compress else '') item = athenad.UploadItem(path=upload_fn, url="http://localhost:1238", headers={}, created_at=int(time.time()*1000), id='') with pytest.raises(requests.exceptions.ConnectionError): athenad._do_upload(item) - item = athenad.UploadItem(path=upload_fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='') + item = athenad.UploadItem(path=upload_fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='') resp = athenad._do_upload(item) assert resp.status_code == 201 def test_upload_file_to_url(self, host): - fn = self._create_file('qlog.bz2') + fn = self._create_file('qlog.zst') - resp = dispatcher["uploadFileToUrl"]("qlog.bz2", f"{host}/qlog.bz2", {}) + resp = dispatcher["uploadFileToUrl"]("qlog.zst", f"{host}/qlog.zst", {}) assert resp['enqueued'] == 1 assert 'failed' not in resp - assert {"path": fn, "url": f"{host}/qlog.bz2", "headers": {}}.items() <= resp['items'][0].items() + assert {"path": fn, "url": f"{host}/qlog.zst", "headers": {}}.items() <= resp['items'][0].items() assert resp['items'][0].get('id') is not None assert athenad.upload_queue.qsize() == 1 def test_upload_file_to_url_duplicate(self, host): - self._create_file('qlog.bz2') + self._create_file('qlog.zst') - url1 = f"{host}/qlog.bz2?sig=sig1" - dispatcher["uploadFileToUrl"]("qlog.bz2", url1, {}) + url1 = f"{host}/qlog.zst?sig=sig1" + dispatcher["uploadFileToUrl"]("qlog.zst", url1, {}) # Upload same file again, but with different signature - url2 = f"{host}/qlog.bz2?sig=sig2" - resp = dispatcher["uploadFileToUrl"]("qlog.bz2", url2, {}) + url2 = f"{host}/qlog.zst?sig=sig2" + resp = dispatcher["uploadFileToUrl"]("qlog.zst", url2, {}) assert resp == {'enqueued': 0, 'items': []} def test_upload_file_to_url_does_not_exist(self, host): - not_exists_resp = dispatcher["uploadFileToUrl"]("does_not_exist.bz2", "http://localhost:1238", {}) - assert not_exists_resp == {'enqueued': 0, 'items': [], 'failed': ['does_not_exist.bz2']} + not_exists_resp = dispatcher["uploadFileToUrl"]("does_not_exist.zst", "http://localhost:1238", {}) + assert not_exists_resp == {'enqueued': 0, 'items': [], 'failed': ['does_not_exist.zst']} @with_upload_handler def test_upload_handler(self, host): - fn = self._create_file('qlog.bz2') - item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) + fn = self._create_file('qlog.zst') + item = athenad.UploadItem(path=fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) athenad.upload_queue.put_nowait(item) self._wait_for_upload() @@ -236,8 +241,8 @@ def test_upload_handler(self, host): def test_upload_handler_retry(self, mocker, host, status, retry): mock_put = mocker.patch('requests.put') mock_put.return_value.status_code = status - fn = self._create_file('qlog.bz2') - item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) + fn = self._create_file('qlog.zst') + item = athenad.UploadItem(path=fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) athenad.upload_queue.put_nowait(item) self._wait_for_upload() @@ -251,8 +256,8 @@ def test_upload_handler_retry(self, mocker, host, status, retry): @with_upload_handler def test_upload_handler_timeout(self): """When an upload times out or fails to connect it should be placed back in the queue""" - fn = self._create_file('qlog.bz2') - item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) + fn = self._create_file('qlog.zst') + item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) item_no_retry = replace(item, retry_count=MAX_RETRY_COUNT) athenad.upload_queue.put_nowait(item_no_retry) @@ -272,7 +277,7 @@ def test_upload_handler_timeout(self): @with_upload_handler def test_cancel_upload(self): - item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={}, + item = athenad.UploadItem(path="qlog.zst", url="http://localhost:44444/qlog.zst", headers={}, created_at=int(time.time()*1000), id='id', allow_cellular=True) athenad.upload_queue.put_nowait(item) dispatcher["cancelUpload"](item.id) @@ -291,8 +296,8 @@ def test_cancel_expiry(self): ts = int(t_future.strftime("%s")) * 1000 # Item that would time out if actually uploaded - fn = self._create_file('qlog.bz2') - item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=ts, id='', allow_cellular=True) + fn = self._create_file('qlog.zst') + item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.zst", headers={}, created_at=ts, id='', allow_cellular=True) athenad.upload_queue.put_nowait(item) self._wait_for_upload() @@ -306,8 +311,8 @@ def test_list_upload_queue_empty(self): @with_upload_handler def test_list_upload_queue_current(self, host: str): - fn = self._create_file('qlog.bz2') - item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) + fn = self._create_file('qlog.zst') + item = athenad.UploadItem(path=fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True) athenad.upload_queue.put_nowait(item) self._wait_for_upload() @@ -317,7 +322,7 @@ def test_list_upload_queue_current(self, host: str): assert items[0]['current'] def test_list_upload_queue(self): - item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={}, + item = athenad.UploadItem(path="qlog.zst", url="http://localhost:44444/qlog.zst", headers={}, created_at=int(time.time()*1000), id='id', allow_cellular=True) athenad.upload_queue.put_nowait(item) diff --git a/system/loggerd/tests/test_uploader.py b/system/loggerd/tests/test_uploader.py index 6591198281b992..fa716002635a08 100644 --- a/system/loggerd/tests/test_uploader.py +++ b/system/loggerd/tests/test_uploader.py @@ -62,10 +62,10 @@ def gen_files(self, lock=False, xattr: bytes = None, boot=True) -> list[Path]: def gen_order(self, seg1: list[int], seg2: list[int], boot=True) -> list[str]: keys = [] if boot: - keys += [f"boot/{self.seg_format.format(i)}.bz2" for i in seg1] - keys += [f"boot/{self.seg_format2.format(i)}.bz2" for i in seg2] - keys += [f"{self.seg_format.format(i)}/qlog.bz2" for i in seg1] - keys += [f"{self.seg_format2.format(i)}/qlog.bz2" for i in seg2] + keys += [f"boot/{self.seg_format.format(i)}.zst" for i in seg1] + keys += [f"boot/{self.seg_format2.format(i)}.zst" for i in seg2] + keys += [f"{self.seg_format.format(i)}/qlog.zst" for i in seg1] + keys += [f"{self.seg_format2.format(i)}/qlog.zst" for i in seg2] return keys def test_upload(self): @@ -159,7 +159,7 @@ def test_no_upload_with_lock_file(self): self.join_thread() for f_path in f_paths: - fn = f_path.with_suffix(f_path.suffix.replace(".bz2", "")) + fn = f_path.with_suffix(f_path.suffix.replace(".zst", "")) uploaded = UPLOAD_ATTR_NAME in os.listxattr(fn) and os.getxattr(fn, UPLOAD_ATTR_NAME) == UPLOAD_ATTR_VALUE assert not uploaded, "File upload when locked" diff --git a/system/loggerd/uploader.py b/system/loggerd/uploader.py index 832a227798e49d..ee16193cd259d1 100755 --- a/system/loggerd/uploader.py +++ b/system/loggerd/uploader.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 -import bz2 import io import json import os @@ -9,6 +8,7 @@ import time import traceback import datetime +import zstd from typing import BinaryIO from collections.abc import Iterator @@ -26,6 +26,7 @@ UPLOAD_ATTR_VALUE = b'1' UPLOAD_QLOG_QCAM_MAX_SIZE = 5 * 1e6 # MB +LOG_COMPRESSION_LEVEL = 10 # little benefit up to level 15. level ~17 is a small step change allow_sleep = bool(os.getenv("UPLOADER_SLEEP", "1")) force_wifi = os.getenv("FORCEWIFI") is not None @@ -83,7 +84,7 @@ def __init__(self, dongle_id: str, root: str): self.last_filename = "" self.immediate_folders = ["crash/", "boot/"] - self.immediate_priority = {"qlog": 0, "qlog.bz2": 0, "qcamera.ts": 1} + self.immediate_priority = {"qlog": 0, "qlog.zst": 0, "qcamera.ts": 1} def list_upload_files(self, metered: bool) -> Iterator[tuple[str, str, str]]: r = self.params.get("AthenadRecentlyViewedRoutes", encoding="utf8") @@ -152,8 +153,8 @@ def do_upload(self, key: str, fn: str): with open(fn, "rb") as f: data: BinaryIO - if key.endswith('.bz2') and not fn.endswith('.bz2'): - compressed = bz2.compress(f.read()) + if key.endswith('.zst') and not fn.endswith('.zst'): + compressed = zstd.compress(f.read(), LOG_COMPRESSION_LEVEL) data = io.BytesIO(compressed) else: data = f @@ -218,8 +219,8 @@ def step(self, network_type: int, metered: bool) -> bool | None: name, key, fn = d # qlogs and bootlogs need to be compressed before uploading - if key.endswith(('qlog', 'rlog')) or (key.startswith('boot/') and not key.endswith('.bz2')): - key += ".bz2" + if key.endswith(('qlog', 'rlog')) or (key.startswith('boot/') and not key.endswith('.zst')): + key += ".zst" return self.upload(name, key, fn, network_type, metered)