Skip to content

Commit

Permalink
revert submodules
Browse files Browse the repository at this point in the history
  • Loading branch information
DevinLeamy committed Oct 27, 2021
1 parent cca07c0 commit efc7e56
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 25 deletions.
47 changes: 32 additions & 15 deletions selfdrive/athena/athenad.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
MAX_RETRY_COUNT = 30 # Try for at most 5 minutes if upload fails immediately
WS_FRAME_SIZE = 4096

PARAMS = Params()

dispatcher["echo"] = lambda s: s
recv_queue: Any = queue.Queue()
send_queue: Any = queue.Queue()
Expand Down Expand Up @@ -111,6 +113,7 @@ def upload_handler(end_event):

try:
cur_upload_items[tid] = upload_queue.get(timeout=1)._replace(current=True)

if cur_upload_items[tid].id in cancelled_uploads:
cancelled_uploads.remove(cur_upload_items[tid].id)
continue
Expand All @@ -120,6 +123,7 @@ def cb(sz, cur):
cur_upload_items[tid] = cur_upload_items[tid]._replace(progress=cur / sz if sz else 1)

_do_upload(cur_upload_items[tid], cb)
cache_upload_queue(upload_queue)
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.SSLError) as e:
cloudlog.warning(f"athena.upload_handler.retry {e} {cur_upload_items[tid]}")

Expand All @@ -131,6 +135,8 @@ def cb(sz, cur):
current=False
)
upload_queue.put_nowait(item)
cache_upload_queue(upload_queue)

cur_upload_items[tid] = None

for _ in range(RETRY_DELAY):
Expand Down Expand Up @@ -188,7 +194,7 @@ def setNavDestination(latitude=0, longitude=0):
"latitude": latitude,
"longitude": longitude,
}
Params().put("NavDestination", json.dumps(destination))
PARAMS.put("NavDestination", json.dumps(destination))

return {"success": 1}

Expand Down Expand Up @@ -246,6 +252,7 @@ def uploadFileToUrl(fn, url, headers):
item = item._replace(id=upload_id)

upload_queue.put_nowait(item)
cache_upload_queue(upload_queue)

return {"enqueued": 1, "item": item._asdict()}

Expand Down Expand Up @@ -278,8 +285,7 @@ def startLocalProxy(global_end_event, remote_ws_uri, local_port):

cloudlog.debug("athena.startLocalProxy.starting")

params = Params()
dongle_id = params.get("DongleId").decode('utf8')
dongle_id = PARAMS.get("DongleId").decode('utf8')
identity_token = Api(dongle_id).get_token()
ws = create_connection(remote_ws_uri,
cookie="jwt=" + identity_token,
Expand Down Expand Up @@ -316,7 +322,7 @@ def getPublicKey():

@dispatcher.add_method
def getSshAuthorizedKeys():
return Params().get("GithubSshKeys", encoding='utf8') or ''
return PARAMS.get("GithubSshKeys", encoding='utf8') or ''


@dispatcher.add_method
Expand Down Expand Up @@ -486,7 +492,7 @@ def ws_recv(ws, end_event):
recv_queue.put_nowait(data)
elif opcode == ABNF.OPCODE_PING:
last_ping = int(sec_since_boot() * 1e9)
Params().put("LastAthenaPingTime", str(last_ping))
PARAMS.put("LastAthenaPingTime", str(last_ping))
except WebSocketTimeoutException:
ns_since_last_ping = int(sec_since_boot() * 1e9) - last_ping
if ns_since_last_ping > RECONNECT_TIMEOUT_S * 1e9:
Expand Down Expand Up @@ -520,9 +526,20 @@ def backoff(retries):
return random.randrange(0, min(128, int(2 ** retries)))


def init_upload_queue(upload_queue):
upload_queue_json = PARAMS.get("AthenadUploadQueue")
for item in json.loads(upload_queue_json):
upload_queue.put(UploadItem(**item))


def cache_upload_queue(upload_queue):
items = [i._asdict() for i in upload_queue.queue if i.id not in cancelled_uploads]
PARAMS.put("AthenadUploadQueue", json.dumps(items))


def main():
params = Params()
dongle_id = params.get("DongleId", encoding='utf-8')
dongle_id = PARAMS.get("DongleId", encoding='utf-8')
init_upload_queue(upload_queue)

ws_uri = ATHENA_HOST + "/ws/v2/" + dongle_id
api = Api(dongle_id)
Expand All @@ -536,7 +553,7 @@ def main():
enable_multithread=True,
timeout=30.0)
cloudlog.event("athenad.main.connected_ws", ws_uri=ws_uri)
params.delete("PrimeRedirected")
PARAMS.delete("PrimeRedirected")

conn_retries = 0
cur_upload_items.clear()
Expand All @@ -546,26 +563,26 @@ def main():
break
except (ConnectionError, TimeoutError, WebSocketException):
conn_retries += 1
params.delete("PrimeRedirected")
params.delete("LastAthenaPingTime")
PARAMS.delete("PrimeRedirected")
PARAMS.delete("LastAthenaPingTime")
except socket.timeout:
try:
r = requests.get("http://api.commadotai.com/v1/me", allow_redirects=False,
headers={"User-Agent": f"openpilot-{version}"}, timeout=15.0)
if r.status_code == 302 and r.headers['Location'].startswith("http://u.web2go.com"):
params.put_bool("PrimeRedirected", True)
PARAMS.put_bool("PrimeRedirected", True)
except Exception:
cloudlog.exception("athenad.socket_timeout.exception")
params.delete("LastAthenaPingTime")
PARAMS.delete("LastAthenaPingTime")
except Exception:
cloudlog.exception("athenad.main.exception")

conn_retries += 1
params.delete("PrimeRedirected")
params.delete("LastAthenaPingTime")
PARAMS.delete("PrimeRedirected")
PARAMS.delete("LastAthenaPingTime")

time.sleep(backoff(conn_retries))


if __name__ == "__main__":
main()
main()
24 changes: 17 additions & 7 deletions selfdrive/athena/tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,27 @@ def get_token(self):


class MockParams():
def __init__(self):
self.params = {
"DongleId": b"0000000000000000",
"GithubSshKeys": b"ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC307aE+nuHzTAgaJhzSf5v7ZZQW9gaperjhCmyPyl4PzY7T1mDGenTlVTN7yoVFZ9UfO9oMQqo0n1OwDIiqbIFxqnhrHU0cYfj88rI85m5BEKlNu5RdaVTj1tcbaPpQc5kZEolaI1nDDjzV0lwS7jo5VYDHseiJHlik3HH1SgtdtsuamGR2T80q1SyW+5rHoMOJG73IH2553NnWuikKiuikGHUYBd00K1ilVAK2xSiMWJp55tQfZ0ecr9QjEsJ+J/efL4HqGNXhffxvypCXvbUYAFSddOwXUPo5BTKevpxMtH+2YrkpSjocWA04VnTYFiPG6U4ItKmbLOTFZtPzoez private" # noqa: E501
}
default_params = {
"DongleId": b"0000000000000000",
"GithubSshKeys": b"ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC307aE+nuHzTAgaJhzSf5v7ZZQW9gaperjhCmyPyl4PzY7T1mDGenTlVTN7yoVFZ9UfO9oMQqo0n1OwDIiqbIFxqnhrHU0cYfj88rI85m5BEKlNu5RdaVTj1tcbaPpQc5kZEolaI1nDDjzV0lwS7jo5VYDHseiJHlik3HH1SgtdtsuamGR2T80q1SyW+5rHoMOJG73IH2553NnWuikKiuikGHUYBd00K1ilVAK2xSiMWJp55tQfZ0ecr9QjEsJ+J/efL4HqGNXhffxvypCXvbUYAFSddOwXUPo5BTKevpxMtH+2YrkpSjocWA04VnTYFiPG6U4ItKmbLOTFZtPzoez private", # noqa: E501
"AthenadUploadQueue": '[]'
}
params = default_params.copy()

@staticmethod
def restore_defaults():
MockParams.params = MockParams.default_params.copy()

def get(self, k, encoding=None):
ret = self.params.get(k)
ret = MockParams.params.get(k)
if ret is not None and encoding is not None:
ret = ret.decode(encoding)
return ret

def put(self, k, v):
if k not in MockParams.params:
raise KeyError(f"key: {k} not in MockParams")
MockParams.params[k] = v


class MockWebsocket():
Expand Down Expand Up @@ -112,4 +122,4 @@ def inner(*args, **kwargs):
finally:
p.terminate()

return inner
return inner
20 changes: 18 additions & 2 deletions selfdrive/athena/tests/test_athenad.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,17 @@ class TestAthenadMethods(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.SOCKET_PORT = 45454
athenad.PARAMS = MockParams()
athenad.ROOT = tempfile.mkdtemp()
athenad.SWAGLOG_DIR = swaglog.SWAGLOG_DIR = tempfile.mkdtemp()
athenad.Params = MockParams
athenad.Api = MockApi
athenad.LOCAL_PORT_WHITELIST = set([cls.SOCKET_PORT])

def setUp(self):
MockParams.restore_defaults()
athenad.upload_queue = queue.Queue()
athenad.cur_upload_items.clear()
athenad.cancelled_uploads.clear()

for i in os.listdir(athenad.ROOT):
p = os.path.join(athenad.ROOT, i)
Expand Down Expand Up @@ -248,6 +250,20 @@ def test_listUploadQueue(self):
athenad.cancelled_uploads.add(item.id)
items = dispatcher["listUploadQueue"]()
self.assertEqual(len(items), 0)

def test_upload_queue_persistence(self):
item = athenad.UploadItem(path="_", url="_", headers={}, created_at=int(time.time()), id='id')

# serialize item
athenad.upload_queue.put_nowait(item)
athenad.cache_upload_queue(athenad.upload_queue)

# deserialize item
athenad.upload_queue.queue.clear()
athenad.init_upload_queue(athenad.upload_queue)

self.assertEqual(athenad.upload_queue.qsize(), 1)
self.assertDictEqual(athenad.upload_queue.queue[-1]._asdict(), item._asdict())

@mock.patch('selfdrive.athena.athenad.create_connection')
def test_startLocalProxy(self, mock_create_connection):
Expand Down Expand Up @@ -319,4 +335,4 @@ def test_get_logs_to_send_sorted(self):
self.assertListEqual(sl, fl[:-1])

if __name__ == '__main__':
unittest.main()
unittest.main()
3 changes: 2 additions & 1 deletion selfdrive/common/params.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ class FileLock {
std::unordered_map<std::string, uint32_t> keys = {
{"AccessToken", CLEAR_ON_MANAGER_START | DONT_LOG},
{"AthenadPid", PERSISTENT},
{"AthenadUploadQueue", PERSISTENT},
{"BootedOnroad", CLEAR_ON_MANAGER_START | CLEAR_ON_IGNITION_OFF},
{"CalibrationParams", PERSISTENT},
{"CarBatteryCapacity", PERSISTENT},
Expand Down Expand Up @@ -330,4 +331,4 @@ void Params::clearAll(ParamKeyType key_type) {
// fsync parent directory
path = params_path + "/d";
fsync_dir(path.c_str());
}
}

0 comments on commit efc7e56

Please sign in to comment.