Skip to content

Commit

Permalink
addressed comments
Browse files Browse the repository at this point in the history
  • Loading branch information
DevinLeamy authored and pd0wm committed Nov 1, 2021
1 parent 1f53d0e commit c8a3c9f
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 28 deletions.
50 changes: 28 additions & 22 deletions selfdrive/athena/athenad.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@
MAX_RETRY_COUNT = 30 # Try for at most 5 minutes if upload fails immediately
WS_FRAME_SIZE = 4096

params = Params()

dispatcher["echo"] = lambda s: s
recv_queue: Any = queue.Queue()
send_queue: Any = queue.Queue()
Expand All @@ -58,6 +56,24 @@
cur_upload_items = {}


class UploadQueueCache():
params = Params()

@staticmethod
def initialize(upload_queue):
upload_queue_json = UploadQueueCache.params.get("AthenadUploadQueue")
try:
for item in json.loads(upload_queue_json):
upload_queue.put(UploadItem(**item))
except (json.JSONDecodeError):
cloudlog.exception("athena.UploadQueueCache.initialize.exception")

@staticmethod
def cache(upload_queue):
items = [i._asdict() for i in upload_queue.queue if i.id not in cancelled_uploads]
UploadQueueCache.params.put("AthenadUploadQueue", json.dumps(items))


def handle_long_poll(ws):
end_event = threading.Event()

Expand Down Expand Up @@ -123,7 +139,7 @@ def cb(sz, cur):
cur_upload_items[tid] = cur_upload_items[tid]._replace(progress=cur / sz if sz else 1)

_do_upload(cur_upload_items[tid], cb)
cache_upload_queue(upload_queue)
UploadQueueCache.cache(upload_queue)
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.SSLError) as e:
cloudlog.warning(f"athena.upload_handler.retry {e} {cur_upload_items[tid]}")

Expand All @@ -135,7 +151,7 @@ def cb(sz, cur):
current=False
)
upload_queue.put_nowait(item)
cache_upload_queue(upload_queue)
UploadQueueCache.cache(upload_queue)

cur_upload_items[tid] = None

Expand Down Expand Up @@ -194,7 +210,7 @@ def setNavDestination(latitude=0, longitude=0):
"latitude": latitude,
"longitude": longitude,
}
params.put("NavDestination", json.dumps(destination))
Params().put("NavDestination", json.dumps(destination))

return {"success": 1}

Expand Down Expand Up @@ -252,7 +268,7 @@ def uploadFileToUrl(fn, url, headers):
item = item._replace(id=upload_id)

upload_queue.put_nowait(item)
cache_upload_queue(upload_queue)
UploadQueueCache.cache(upload_queue)

return {"enqueued": 1, "item": item._asdict()}

Expand Down Expand Up @@ -285,7 +301,7 @@ def startLocalProxy(global_end_event, remote_ws_uri, local_port):

cloudlog.debug("athena.startLocalProxy.starting")

dongle_id = params.get("DongleId").decode('utf8')
dongle_id = Params().get("DongleId").decode('utf8')
identity_token = Api(dongle_id).get_token()
ws = create_connection(remote_ws_uri,
cookie="jwt=" + identity_token,
Expand Down Expand Up @@ -322,7 +338,7 @@ def getPublicKey():

@dispatcher.add_method
def getSshAuthorizedKeys():
return params.get("GithubSshKeys", encoding='utf8') or ''
return Params().get("GithubSshKeys", encoding='utf8') or ''


@dispatcher.add_method
Expand Down Expand Up @@ -492,7 +508,7 @@ def ws_recv(ws, end_event):
recv_queue.put_nowait(data)
elif opcode == ABNF.OPCODE_PING:
last_ping = int(sec_since_boot() * 1e9)
params.put("LastAthenaPingTime", str(last_ping))
Params().put("LastAthenaPingTime", str(last_ping))
except WebSocketTimeoutException:
ns_since_last_ping = int(sec_since_boot() * 1e9) - last_ping
if ns_since_last_ping > RECONNECT_TIMEOUT_S * 1e9:
Expand Down Expand Up @@ -526,20 +542,10 @@ def backoff(retries):
return random.randrange(0, min(128, int(2 ** retries)))


def init_upload_queue(upload_queue):
upload_queue_json = params.get("AthenadUploadQueue")
for item in json.loads(upload_queue_json):
upload_queue.put(UploadItem(**item))


def cache_upload_queue(upload_queue):
items = [i._asdict() for i in upload_queue.queue if i.id not in cancelled_uploads]
params.put("AthenadUploadQueue", json.dumps(items))


def main():
params = Params()
dongle_id = params.get("DongleId", encoding='utf-8')
init_upload_queue(upload_queue)
UploadQueueCache.initialize(upload_queue)

ws_uri = ATHENA_HOST + "/ws/v2/" + dongle_id
api = Api(dongle_id)
Expand Down Expand Up @@ -585,4 +591,4 @@ def main():


if __name__ == "__main__":
main()
main()
2 changes: 1 addition & 1 deletion selfdrive/athena/tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,4 @@ def inner(*args, **kwargs):
finally:
p.terminate()

return inner
return inner
8 changes: 4 additions & 4 deletions selfdrive/athena/tests/test_athenad.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class TestAthenadMethods(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.SOCKET_PORT = 45454
athenad.params = MockParams()
athenad.Params = MockParams
athenad.ROOT = tempfile.mkdtemp()
athenad.SWAGLOG_DIR = swaglog.SWAGLOG_DIR = tempfile.mkdtemp()
athenad.Api = MockApi
Expand Down Expand Up @@ -256,11 +256,11 @@ def test_upload_queue_persistence(self):

# serialize item
athenad.upload_queue.put_nowait(item)
athenad.cache_upload_queue(athenad.upload_queue)
athenad.UploadQueueCache.cache(athenad.upload_queue)

# deserialize item
athenad.upload_queue.queue.clear()
athenad.init_upload_queue(athenad.upload_queue)
athenad.UploadQueueCache.initialize(athenad.upload_queue)

self.assertEqual(athenad.upload_queue.qsize(), 1)
self.assertDictEqual(athenad.upload_queue.queue[-1]._asdict(), item._asdict())
Expand Down Expand Up @@ -335,4 +335,4 @@ def test_get_logs_to_send_sorted(self):
self.assertListEqual(sl, fl[:-1])

if __name__ == '__main__':
unittest.main()
unittest.main()
2 changes: 1 addition & 1 deletion selfdrive/common/params.cc
Original file line number Diff line number Diff line change
Expand Up @@ -311,4 +311,4 @@ void Params::clearAll(ParamKeyType key_type) {
// fsync parent directory
path = params_path + "/d";
fsync_dir(path.c_str());
}
}

0 comments on commit c8a3c9f

Please sign in to comment.