diff --git a/selfdrive/loggerd/uploader.py b/selfdrive/loggerd/uploader.py index a727fba7c120e7..c03e8a240feaef 100644 --- a/selfdrive/loggerd/uploader.py +++ b/selfdrive/loggerd/uploader.py @@ -21,6 +21,7 @@ fake_upload = os.getenv("FAKEUPLOAD") is not None + def raise_on_thread(t, exctype): for ctid, tobj in threading._active.items(): if tobj is t: @@ -43,6 +44,7 @@ def raise_on_thread(t, exctype): ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0) raise SystemError("PyThreadState_SetAsyncExc failed") + def listdir_with_creation_date(d): lst = os.listdir(d) for fn in lst: @@ -54,10 +56,12 @@ def listdir_with_creation_date(d): cloudlog.exception("listdir_with_creation_date: stat failed?") yield (None, fn) + def listdir_by_creation_date(d): times_and_paths = list(listdir_with_creation_date(d)) return [path for _, path in sorted(times_and_paths)] + def clear_locks(root): for logname in os.listdir(root): path = os.path.join(root, logname) @@ -68,6 +72,7 @@ def clear_locks(root): except OSError: cloudlog.exception("clear_locks failed") + def is_on_wifi(): # ConnectivityManager.getActiveNetworkInfo() try: @@ -79,6 +84,7 @@ def is_on_wifi(): return "\x00".join("WIFI") in data + def is_on_hotspot(): try: result = subprocess.check_output(["ifconfig", "wlan0"]) @@ -90,6 +96,7 @@ def is_on_hotspot(): except: return False + class Uploader(object): def __init__(self, dongle_id, access_token, root): self.dongle_id = dongle_id @@ -146,7 +153,7 @@ def next_file_to_compress(self): def next_file_to_upload(self, with_video): # try to upload log files first for name, key, fn in self.gen_upload_files(): - if name == "rlog.bz2": + if name == "rlog.bz2": return (key, fn, 0) if with_video: @@ -164,10 +171,9 @@ def next_file_to_upload(self, with_video): return None - def do_upload(self, key, fn): try: - url_resp = api_get("v1.2/"+self.dongle_id+"/upload_url/", timeout=2, path=key, access_token=self.access_token) + url_resp = api_get("v1.2/" + self.dongle_id + "/upload_url/", timeout=2, path=key, access_token=self.access_token) url_resp_json = json.loads(url_resp.text) url = url_resp_json['url'] headers = url_resp_json['headers'] @@ -175,9 +181,11 @@ def do_upload(self, key, fn): if fake_upload: cloudlog.info("*** WARNING, THIS IS A FAKE UPLOAD TO %s ***" % url) + class FakeResponse(object): def __init__(self): self.status_code = 200 + self.last_resp = FakeResponse() else: with open(fn, "rb") as f: @@ -198,15 +206,15 @@ def normal_upload(self, key, fn): return self.last_resp def killable_upload(self, key, fn): - self.last_resp = None - self.last_exc = None + self.last_resp = None + self.last_exc = None - self.upload_thread = threading.Thread(target=lambda: self.do_upload(key, fn)) - self.upload_thread.start() - self.upload_thread.join() - self.upload_thread = None + self.upload_thread = threading.Thread(target=lambda: self.do_upload(key, fn)) + self.upload_thread.start() + self.upload_thread.join() + self.upload_thread = None - return self.last_resp + return self.last_resp def abort_upload(self): thread = self.upload_thread @@ -221,7 +229,7 @@ def compress(self, key, fn): # write out the bz2 compress if fn.endswith("log"): ext = ".bz2" - cloudlog.info("compressing %r to %r", fn, fn+ext) + cloudlog.info("compressing %r to %r", fn, fn + ext) if os.system("nice -n 19 bzip2 -c %s > %s.tmp && mv %s.tmp %s%s && rm %s" % (fn, fn, fn, fn, ext, fn)) != 0: cloudlog.exception("upload: bzip2 compression failed") return False @@ -245,7 +253,7 @@ def upload(self, key, fn): if sz == 0: # can't upload files of 0 size - os.unlink(fn) # delete the file + os.unlink(fn) # delete the file success = True else: cloudlog.info("uploading %r", fn) @@ -254,10 +262,10 @@ def upload(self, key, fn): if stat is not None and stat.status_code in (200, 201): cloudlog.event("upload_success", key=key, fn=fn, sz=sz) try: - os.unlink(fn) # delete the file + os.unlink(fn) # delete the file except OSError: pass - + success = True else: cloudlog.event("upload_failed", stat=stat, exc=self.last_exc, key=key, fn=fn, sz=sz) @@ -268,7 +276,6 @@ def upload(self, key, fn): return success - def uploader_fn(exit_event): cloudlog.info("uploader_fn") @@ -290,16 +297,16 @@ def uploader_fn(exit_event): if exit_event.is_set(): return - - d = uploader.next_file_to_compress() - if d is not None: - key, fn, _ = d - uploader.compress(key, fn) - continue if not should_upload: time.sleep(5) continue + else: # wait to compress files until the user is at home, on wifi + d = uploader.next_file_to_compress() + if d is not None: + key, fn, _ = d + uploader.compress(key, fn) + continue d = uploader.next_file_to_upload(with_video=True) if d is None: @@ -316,11 +323,13 @@ def uploader_fn(exit_event): else: cloudlog.info("backoff %r", backoff) time.sleep(backoff + random.uniform(0, backoff)) - backoff = min(backoff*2, 120) + backoff = min(backoff * 2, 120) cloudlog.info("upload done, success=%r", success) + def main(gctx=None): uploader_fn(threading.Event()) + if __name__ == "__main__": main()