Skip to content

Commit

Permalink
uploader: compress with zstd (commaai#32736)
Browse files (browse the repository at this point in the history)
* zstd uploader

* fix that

* fix name of function

* comment

* log failed

* fix comma_api_source for routes with both bz2 and zst rlogs

* TODO

* 10-14 achieves almost no benefit on qlogs in a few cases, but takes 2x the time

* these aren't written out

* regen: specify any list of sources

ooh this is pretty nice

* regen and process replay

* damn, actually we don't need all this (cool tho)

Revert "regen: specify any list of sources"

This reverts commit ceb0b4a.

* just let it auto resolve

* fix athenad/uploader tests

* zst here too

* TODOs

* yes

* Revert "TODOs"

This reverts commit 8c7da1d.

* Revert "zst here too"

This reverts commit 23b0023.

* Revert "just let it auto resolve"

This reverts commit f296d62.

* Revert "regen and process replay"

This reverts commit 0768330.

* revert readme

* not in save_log either

* lfg

* Revert "lfg"

This reverts commit 3718559.
  • Loading branch information
sshane authored Jul 27, 2024
1 parent 6f1ea5a commit 7dec7c3
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 46 deletions.
17 changes: 9 additions & 8 deletions system/athena/athenad.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from __future__ import annotations

import base64
import bz2
import hashlib
import io
import json
Expand All @@ -15,6 +14,7 @@
import tempfile
import threading
import time
import zstd
from dataclasses import asdict, dataclass, replace
from datetime import datetime
from functools import partial
Expand All @@ -35,6 +35,7 @@
from openpilot.common.params import Params
from openpilot.common.realtime import set_core_affinity
from openpilot.system.hardware import HARDWARE, PC
from openpilot.system.loggerd.uploader import LOG_COMPRESSION_LEVEL
from openpilot.system.loggerd.xattr_cache import getxattr, setxattr
from openpilot.common.swaglog import cloudlog
from openpilot.system.version import get_build_metadata
Expand Down Expand Up @@ -103,8 +104,8 @@ def from_dict(cls, d: dict) -> UploadItem:
cur_upload_items: dict[int, UploadItem | None] = {}


def strip_bz2_extension(fn: str) -> str:
if fn.endswith('.bz2'):
def strip_zst_extension(fn: str) -> str:
if fn.endswith('.zst'):
return fn[:-4]
return fn

Expand Down Expand Up @@ -283,16 +284,16 @@ def _do_upload(upload_item: UploadItem, callback: Callable = None) -> requests.R
path = upload_item.path
compress = False

# If file does not exist, but does exist without the .bz2 extension we will compress on the fly
if not os.path.exists(path) and os.path.exists(strip_bz2_extension(path)):
path = strip_bz2_extension(path)
# If file does not exist, but does exist without the .zst extension we will compress on the fly
if not os.path.exists(path) and os.path.exists(strip_zst_extension(path)):
path = strip_zst_extension(path)
compress = True

with open(path, "rb") as f:
content = f.read()
if compress:
cloudlog.event("athena.upload_handler.compress", fn=path, fn_orig=upload_item.path)
content = bz2.compress(content)
content = zstd.compress(content, LOG_COMPRESSION_LEVEL)

with io.BytesIO(content) as data:
return requests.put(upload_item.url,
Expand Down Expand Up @@ -375,7 +376,7 @@ def uploadFilesToUrls(files_data: list[UploadFileDict]) -> UploadFilesToUrlRespo
continue

path = os.path.join(Paths.log_root(), file.fn)
if not os.path.exists(path) and not os.path.exists(strip_bz2_extension(path)):
if not os.path.exists(path) and not os.path.exists(strip_zst_extension(path)):
failed.append(file.fn)
continue

Expand Down
59 changes: 32 additions & 27 deletions system/athena/tests/test_athenad.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def seed_athena_server(host, port):
with Timeout(2, 'HTTP Server seeding failed'):
while True:
try:
requests.put(f'http://{host}:{port}/qlog.bz2', data='', timeout=10)
requests.put(f'http://{host}:{port}/qlog.zst', data='', timeout=10)
break
except requests.exceptions.ConnectionError:
time.sleep(0.1)
Expand Down Expand Up @@ -174,54 +174,59 @@ def test_list_data_directory(self):
assert resp, 'list empty!'
assert len(resp) == len(expected)

def test_strip_bz2_extension(self):
def test_strip_extension(self):
# any requested log file with an invalid extension won't return as existing
fn = self._create_file('qlog.bz2')
if fn.endswith('.bz2'):
assert athenad.strip_bz2_extension(fn) == fn[:-4]
assert athenad.strip_zst_extension(fn) == fn

fn = self._create_file('qlog.zst')
if fn.endswith('.zst'):
assert athenad.strip_zst_extension(fn) == fn[:-4]

@pytest.mark.parametrize("compress", [True, False])
def test_do_upload(self, host, compress):
# random bytes to ensure rather large object post-compression
fn = self._create_file('qlog', data=os.urandom(10000 * 1024))

upload_fn = fn + ('.bz2' if compress else '')
upload_fn = fn + ('.zst' if compress else '')
item = athenad.UploadItem(path=upload_fn, url="http://localhost:1238", headers={}, created_at=int(time.time()*1000), id='')
with pytest.raises(requests.exceptions.ConnectionError):
athenad._do_upload(item)

item = athenad.UploadItem(path=upload_fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='')
item = athenad.UploadItem(path=upload_fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='')
resp = athenad._do_upload(item)
assert resp.status_code == 201

def test_upload_file_to_url(self, host):
fn = self._create_file('qlog.bz2')
fn = self._create_file('qlog.zst')

resp = dispatcher["uploadFileToUrl"]("qlog.bz2", f"{host}/qlog.bz2", {})
resp = dispatcher["uploadFileToUrl"]("qlog.zst", f"{host}/qlog.zst", {})
assert resp['enqueued'] == 1
assert 'failed' not in resp
assert {"path": fn, "url": f"{host}/qlog.bz2", "headers": {}}.items() <= resp['items'][0].items()
assert {"path": fn, "url": f"{host}/qlog.zst", "headers": {}}.items() <= resp['items'][0].items()
assert resp['items'][0].get('id') is not None
assert athenad.upload_queue.qsize() == 1

def test_upload_file_to_url_duplicate(self, host):
self._create_file('qlog.bz2')
self._create_file('qlog.zst')

url1 = f"{host}/qlog.bz2?sig=sig1"
dispatcher["uploadFileToUrl"]("qlog.bz2", url1, {})
url1 = f"{host}/qlog.zst?sig=sig1"
dispatcher["uploadFileToUrl"]("qlog.zst", url1, {})

# Upload same file again, but with different signature
url2 = f"{host}/qlog.bz2?sig=sig2"
resp = dispatcher["uploadFileToUrl"]("qlog.bz2", url2, {})
url2 = f"{host}/qlog.zst?sig=sig2"
resp = dispatcher["uploadFileToUrl"]("qlog.zst", url2, {})
assert resp == {'enqueued': 0, 'items': []}

def test_upload_file_to_url_does_not_exist(self, host):
not_exists_resp = dispatcher["uploadFileToUrl"]("does_not_exist.bz2", "http://localhost:1238", {})
assert not_exists_resp == {'enqueued': 0, 'items': [], 'failed': ['does_not_exist.bz2']}
not_exists_resp = dispatcher["uploadFileToUrl"]("does_not_exist.zst", "http://localhost:1238", {})
assert not_exists_resp == {'enqueued': 0, 'items': [], 'failed': ['does_not_exist.zst']}

@with_upload_handler
def test_upload_handler(self, host):
fn = self._create_file('qlog.bz2')
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
fn = self._create_file('qlog.zst')
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)

athenad.upload_queue.put_nowait(item)
self._wait_for_upload()
Expand All @@ -236,8 +241,8 @@ def test_upload_handler(self, host):
def test_upload_handler_retry(self, mocker, host, status, retry):
mock_put = mocker.patch('requests.put')
mock_put.return_value.status_code = status
fn = self._create_file('qlog.bz2')
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
fn = self._create_file('qlog.zst')
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)

athenad.upload_queue.put_nowait(item)
self._wait_for_upload()
Expand All @@ -251,8 +256,8 @@ def test_upload_handler_retry(self, mocker, host, status, retry):
@with_upload_handler
def test_upload_handler_timeout(self):
"""When an upload times out or fails to connect it should be placed back in the queue"""
fn = self._create_file('qlog.bz2')
item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
fn = self._create_file('qlog.zst')
item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
item_no_retry = replace(item, retry_count=MAX_RETRY_COUNT)

athenad.upload_queue.put_nowait(item_no_retry)
Expand All @@ -272,7 +277,7 @@ def test_upload_handler_timeout(self):

@with_upload_handler
def test_cancel_upload(self):
item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={},
item = athenad.UploadItem(path="qlog.zst", url="http://localhost:44444/qlog.zst", headers={},
created_at=int(time.time()*1000), id='id', allow_cellular=True)
athenad.upload_queue.put_nowait(item)
dispatcher["cancelUpload"](item.id)
Expand All @@ -291,8 +296,8 @@ def test_cancel_expiry(self):
ts = int(t_future.strftime("%s")) * 1000

# Item that would time out if actually uploaded
fn = self._create_file('qlog.bz2')
item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=ts, id='', allow_cellular=True)
fn = self._create_file('qlog.zst')
item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.zst", headers={}, created_at=ts, id='', allow_cellular=True)

athenad.upload_queue.put_nowait(item)
self._wait_for_upload()
Expand All @@ -306,8 +311,8 @@ def test_list_upload_queue_empty(self):

@with_upload_handler
def test_list_upload_queue_current(self, host: str):
fn = self._create_file('qlog.bz2')
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
fn = self._create_file('qlog.zst')
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)

athenad.upload_queue.put_nowait(item)
self._wait_for_upload()
Expand All @@ -317,7 +322,7 @@ def test_list_upload_queue_current(self, host: str):
assert items[0]['current']

def test_list_upload_queue(self):
item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={},
item = athenad.UploadItem(path="qlog.zst", url="http://localhost:44444/qlog.zst", headers={},
created_at=int(time.time()*1000), id='id', allow_cellular=True)
athenad.upload_queue.put_nowait(item)

Expand Down
10 changes: 5 additions & 5 deletions system/loggerd/tests/test_uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ def gen_files(self, lock=False, xattr: bytes = None, boot=True) -> list[Path]:
def gen_order(self, seg1: list[int], seg2: list[int], boot=True) -> list[str]:
keys = []
if boot:
keys += [f"boot/{self.seg_format.format(i)}.bz2" for i in seg1]
keys += [f"boot/{self.seg_format2.format(i)}.bz2" for i in seg2]
keys += [f"{self.seg_format.format(i)}/qlog.bz2" for i in seg1]
keys += [f"{self.seg_format2.format(i)}/qlog.bz2" for i in seg2]
keys += [f"boot/{self.seg_format.format(i)}.zst" for i in seg1]
keys += [f"boot/{self.seg_format2.format(i)}.zst" for i in seg2]
keys += [f"{self.seg_format.format(i)}/qlog.zst" for i in seg1]
keys += [f"{self.seg_format2.format(i)}/qlog.zst" for i in seg2]
return keys

def test_upload(self):
Expand Down Expand Up @@ -159,7 +159,7 @@ def test_no_upload_with_lock_file(self):
self.join_thread()

for f_path in f_paths:
fn = f_path.with_suffix(f_path.suffix.replace(".bz2", ""))
fn = f_path.with_suffix(f_path.suffix.replace(".zst", ""))
uploaded = UPLOAD_ATTR_NAME in os.listxattr(fn) and os.getxattr(fn, UPLOAD_ATTR_NAME) == UPLOAD_ATTR_VALUE
assert not uploaded, "File upload when locked"

Expand Down
13 changes: 7 additions & 6 deletions system/loggerd/uploader.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#!/usr/bin/env python3
import bz2
import io
import json
import os
Expand All @@ -9,6 +8,7 @@
import time
import traceback
import datetime
import zstd
from typing import BinaryIO
from collections.abc import Iterator

Expand All @@ -26,6 +26,7 @@
UPLOAD_ATTR_VALUE = b'1'

UPLOAD_QLOG_QCAM_MAX_SIZE = 5 * 1e6 # MB
LOG_COMPRESSION_LEVEL = 10 # little benefit up to level 15. level ~17 is a small step change

allow_sleep = bool(os.getenv("UPLOADER_SLEEP", "1"))
force_wifi = os.getenv("FORCEWIFI") is not None
Expand Down Expand Up @@ -83,7 +84,7 @@ def __init__(self, dongle_id: str, root: str):
self.last_filename = ""

self.immediate_folders = ["crash/", "boot/"]
self.immediate_priority = {"qlog": 0, "qlog.bz2": 0, "qcamera.ts": 1}
self.immediate_priority = {"qlog": 0, "qlog.zst": 0, "qcamera.ts": 1}

def list_upload_files(self, metered: bool) -> Iterator[tuple[str, str, str]]:
r = self.params.get("AthenadRecentlyViewedRoutes", encoding="utf8")
Expand Down Expand Up @@ -152,8 +153,8 @@ def do_upload(self, key: str, fn: str):

with open(fn, "rb") as f:
data: BinaryIO
if key.endswith('.bz2') and not fn.endswith('.bz2'):
compressed = bz2.compress(f.read())
if key.endswith('.zst') and not fn.endswith('.zst'):
compressed = zstd.compress(f.read(), LOG_COMPRESSION_LEVEL)
data = io.BytesIO(compressed)
else:
data = f
Expand Down Expand Up @@ -218,8 +219,8 @@ def step(self, network_type: int, metered: bool) -> bool | None:
name, key, fn = d

# qlogs and bootlogs need to be compressed before uploading
if key.endswith(('qlog', 'rlog')) or (key.startswith('boot/') and not key.endswith('.bz2')):
key += ".bz2"
if key.endswith(('qlog', 'rlog')) or (key.startswith('boot/') and not key.endswith('.zst')):
key += ".zst"

return self.upload(name, key, fn, network_type, metered)

Expand Down

0 comments on commit 7dec7c3

Please sign in to comment.