Skip to content

Commit

Permalink
uploader compression change
Browse files Browse the repository at this point in the history
  • Loading branch information
sshane committed Apr 25, 2019
1 parent e6f4027 commit 9dc369a
Showing 1 changed file with 31 additions and 22 deletions.
53 changes: 31 additions & 22 deletions selfdrive/loggerd/uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

fake_upload = os.getenv("FAKEUPLOAD") is not None


def raise_on_thread(t, exctype):
for ctid, tobj in threading._active.items():
if tobj is t:
Expand All @@ -43,6 +44,7 @@ def raise_on_thread(t, exctype):
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0)
raise SystemError("PyThreadState_SetAsyncExc failed")


def listdir_with_creation_date(d):
lst = os.listdir(d)
for fn in lst:
Expand All @@ -54,10 +56,12 @@ def listdir_with_creation_date(d):
cloudlog.exception("listdir_with_creation_date: stat failed?")
yield (None, fn)


def listdir_by_creation_date(d):
times_and_paths = list(listdir_with_creation_date(d))
return [path for _, path in sorted(times_and_paths)]


def clear_locks(root):
for logname in os.listdir(root):
path = os.path.join(root, logname)
Expand All @@ -68,6 +72,7 @@ def clear_locks(root):
except OSError:
cloudlog.exception("clear_locks failed")


def is_on_wifi():
# ConnectivityManager.getActiveNetworkInfo()
try:
Expand All @@ -79,6 +84,7 @@ def is_on_wifi():

return "\x00".join("WIFI") in data


def is_on_hotspot():
try:
result = subprocess.check_output(["ifconfig", "wlan0"])
Expand All @@ -90,6 +96,7 @@ def is_on_hotspot():
except:
return False


class Uploader(object):
def __init__(self, dongle_id, access_token, root):
self.dongle_id = dongle_id
Expand Down Expand Up @@ -146,7 +153,7 @@ def next_file_to_compress(self):
def next_file_to_upload(self, with_video):
# try to upload log files first
for name, key, fn in self.gen_upload_files():
if name == "rlog.bz2":
if name == "rlog.bz2":
return (key, fn, 0)

if with_video:
Expand All @@ -164,20 +171,21 @@ def next_file_to_upload(self, with_video):

return None


def do_upload(self, key, fn):
try:
url_resp = api_get("v1.2/"+self.dongle_id+"/upload_url/", timeout=2, path=key, access_token=self.access_token)
url_resp = api_get("v1.2/" + self.dongle_id + "/upload_url/", timeout=2, path=key, access_token=self.access_token)
url_resp_json = json.loads(url_resp.text)
url = url_resp_json['url']
headers = url_resp_json['headers']
cloudlog.info("upload_url v1.2 %s %s", url, str(headers))

if fake_upload:
cloudlog.info("*** WARNING, THIS IS A FAKE UPLOAD TO %s ***" % url)

class FakeResponse(object):
def __init__(self):
self.status_code = 200

self.last_resp = FakeResponse()
else:
with open(fn, "rb") as f:
Expand All @@ -198,15 +206,15 @@ def normal_upload(self, key, fn):
return self.last_resp

def killable_upload(self, key, fn):
self.last_resp = None
self.last_exc = None
self.last_resp = None
self.last_exc = None

self.upload_thread = threading.Thread(target=lambda: self.do_upload(key, fn))
self.upload_thread.start()
self.upload_thread.join()
self.upload_thread = None
self.upload_thread = threading.Thread(target=lambda: self.do_upload(key, fn))
self.upload_thread.start()
self.upload_thread.join()
self.upload_thread = None

return self.last_resp
return self.last_resp

def abort_upload(self):
thread = self.upload_thread
Expand All @@ -221,7 +229,7 @@ def compress(self, key, fn):
# write out the bz2 compress
if fn.endswith("log"):
ext = ".bz2"
cloudlog.info("compressing %r to %r", fn, fn+ext)
cloudlog.info("compressing %r to %r", fn, fn + ext)
if os.system("nice -n 19 bzip2 -c %s > %s.tmp && mv %s.tmp %s%s && rm %s" % (fn, fn, fn, fn, ext, fn)) != 0:
cloudlog.exception("upload: bzip2 compression failed")
return False
Expand All @@ -245,7 +253,7 @@ def upload(self, key, fn):

if sz == 0:
# can't upload files of 0 size
os.unlink(fn) # delete the file
os.unlink(fn) # delete the file
success = True
else:
cloudlog.info("uploading %r", fn)
Expand All @@ -254,10 +262,10 @@ def upload(self, key, fn):
if stat is not None and stat.status_code in (200, 201):
cloudlog.event("upload_success", key=key, fn=fn, sz=sz)
try:
os.unlink(fn) # delete the file
os.unlink(fn) # delete the file
except OSError:
pass

success = True
else:
cloudlog.event("upload_failed", stat=stat, exc=self.last_exc, key=key, fn=fn, sz=sz)
Expand All @@ -268,7 +276,6 @@ def upload(self, key, fn):
return success



def uploader_fn(exit_event):
cloudlog.info("uploader_fn")

Expand All @@ -290,16 +297,16 @@ def uploader_fn(exit_event):

if exit_event.is_set():
return

d = uploader.next_file_to_compress()
if d is not None:
key, fn, _ = d
uploader.compress(key, fn)
continue

if not should_upload:
time.sleep(5)
continue
else: # wait to compress files until the user is at home, on wifi
d = uploader.next_file_to_compress()
if d is not None:
key, fn, _ = d
uploader.compress(key, fn)
continue

d = uploader.next_file_to_upload(with_video=True)
if d is None:
Expand All @@ -316,11 +323,13 @@ def uploader_fn(exit_event):
else:
cloudlog.info("backoff %r", backoff)
time.sleep(backoff + random.uniform(0, backoff))
backoff = min(backoff*2, 120)
backoff = min(backoff * 2, 120)
cloudlog.info("upload done, success=%r", success)


def main(gctx=None):
uploader_fn(threading.Event())


if __name__ == "__main__":
main()

0 comments on commit 9dc369a

Please sign in to comment.