From 016a23c4c6d06ed14cff348ad18a2e87b10ab3ed Mon Sep 17 00:00:00 2001 From: Arne Schwarck Date: Mon, 24 Feb 2020 11:13:36 +0100 Subject: [PATCH 01/11] ignore the thread logs --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index a73270fd4d8e14..44116837ff27f0 100644 --- a/.gitignore +++ b/.gitignore @@ -46,6 +46,8 @@ selfdrive/camerad/camerad selfdrive/modeld/_modeld selfdrive/modeld/_dmonitoringmodeld selfdrive/data_collection/gps-data +selfdrive/mapd/QueryThread-Thread.log +selfdrive/mapd/MapsdThread-Thread.log /src/ one From 81a296378af943983e0bf10a052c9949fe75ebcd Mon Sep 17 00:00:00 2001 From: Arne Schwarck Date: Mon, 24 Feb 2020 11:14:20 +0100 Subject: [PATCH 02/11] refactor mapd thanks to Patreon supports --- selfdrive/mapd/mapd.py | 747 ++++++++++++++++++++++------------------- 1 file changed, 407 insertions(+), 340 deletions(-) diff --git a/selfdrive/mapd/mapd.py b/selfdrive/mapd/mapd.py index 7f2ae1b85af7dc..86a5273fbc87f0 100755 --- a/selfdrive/mapd/mapd.py +++ b/selfdrive/mapd/mapd.py @@ -1,7 +1,12 @@ #!/usr/bin/env python3 +# setup logging +import logging +import logging.handlers + # Add phonelibs openblas to LD_LIBRARY_PATH if import fails from scipy import spatial + import selfdrive.crash as crash #DEFAULT_SPEEDS_BY_REGION_JSON_FILE = BASEDIR + "/selfdrive/mapd/default_speeds_by_region.json" @@ -23,352 +28,414 @@ from selfdrive.mapd.mapd_helpers import MAPS_LOOKAHEAD_DISTANCE, Way, circle_through_points, rate_curvature_points -OVERPASS_API_URL = "https://z.overpass-api.de/api/interpreter" -OVERPASS_API_URL2 = "https://lz4.overpass-api.de/api/interpreter" -OVERPASS_HEADERS = { - 'User-Agent': 'NEOS (comma.ai)', - 'Accept-Encoding': 'gzip' -} - -last_gps = None -query_lock = threading.Lock() -last_query_result = None -last_query_pos = None -cache_valid = False - -def connected_to_internet(url='https://z.overpass-api.de/api/interpreter', timeout=5): - try: - requests.get(url, timeout=timeout) - return True - except: - print("No internet connection available.") - return False - -def connected_to_internet2(url='https://lz4.overpass-api.de/api/interpreter', timeout=5): - try: - requests.get(url, timeout=timeout) - return True - except: - print("No internet connection available.") - return False - -def build_way_query(lat, lon, radius=50): - """Builds a query to find all highways within a given radius around a point""" - pos = " (around:%f,%f,%f)" % (radius, lat, lon) - lat_lon = "(%f,%f)" % (lat, lon) - q = """( - way - """ + pos + """ - [highway][highway!~"^(footway|path|bridleway|steps|cycleway|construction|bus_guideway|escape)$"]; - >;);out;""" + """is_in""" + lat_lon + """;area._[admin_level~"[24]"]; - convert area ::id = id(), admin_level = t['admin_level'], - name = t['name'], "ISO3166-1:alpha2" = t['ISO3166-1:alpha2'];out; - """ - return q - - -def query_thread(): - global last_query_result, last_query_pos, cache_valid - #api = overpy.Overpass(url=OVERPASS_API_URL, headers=OVERPASS_HEADERS, timeout=20.) - api = overpy.Overpass(url=OVERPASS_API_URL) - - while True: - time.sleep(1) - if last_gps is not None: - fix_ok = last_gps.flags & 1 - if not fix_ok: - continue - - if last_query_pos is not None: - cur_ecef = geodetic2ecef((last_gps.latitude, last_gps.longitude, last_gps.altitude)) - prev_ecef = geodetic2ecef((last_query_pos.latitude, last_query_pos.longitude, last_query_pos.altitude)) - dist = np.linalg.norm(cur_ecef - prev_ecef) - if dist < 3000: #updated when we are 1km from the edge of the downloaded circle - continue - - if dist > 4000: - cache_valid = False - - q = build_way_query(last_gps.latitude, last_gps.longitude, radius=4000) - if connected_to_internet() or connected_to_internet2(): - try: - try: - new_result = api.query(q) - except: - api2 = overpy.Overpass(url=OVERPASS_API_URL2) - print("Using backup Server") - new_result = api2.query(q) - - # Build kd-tree - nodes = [] - real_nodes = [] - node_to_way = defaultdict(list) - location_info = {} - - for n in new_result.nodes: - nodes.append((float(n.lat), float(n.lon), 0)) - real_nodes.append(n) - - for way in new_result.ways: - for n in way.nodes: - node_to_way[n.id].append(way) - - for area in new_result.areas: - if area.tags.get('admin_level', '') == "2": - location_info['country'] = area.tags.get('ISO3166-1:alpha2', '') - if area.tags.get('admin_level', '') == "4": - location_info['region'] = area.tags.get('name', '') - - nodes = np.asarray(nodes) - nodes = geodetic2ecef(nodes) - tree = spatial.cKDTree(nodes) - - query_lock.acquire() - last_query_result = new_result, tree, real_nodes, node_to_way, location_info - last_query_pos = last_gps - cache_valid = True - query_lock.release() - - except Exception as e: - print(e) - crash.capture_warning(e) - query_lock.acquire() - last_query_result = None - query_lock.release() - else: - query_lock.acquire() - last_query_result = None - query_lock.release() - -def save_gps_data(gps): - try: - location = [gps.speed, gps.bearing, gps.latitude, gps.longitude, gps.altitude, gps.accuracy, time.time()] - with open("/data/openpilot/selfdrive/data_collection/gps-data", "a") as f: - f.write("{}\n".format(location)) - except: - pass - -def mapsd_thread(): - global last_gps - sm = messaging.SubMaster(['gpsLocationExternal', 'liveMapData']) - arne_sm = messaging_arne.SubMaster(['liveTrafficData']) - pm = messaging.PubMaster(['liveMapData']) - - cur_way = None - curvature_valid = False - curvature = None - upcoming_curvature = 0. - dist_to_turn = 0. - road_points = None - speedLimittraffic = 0 - speedLimittraffic_prev = 0 - max_speed = None - max_speed_ahead = None - max_speed_ahead_dist = None - - max_speed_prev = 0 - speedLimittrafficvalid = False - - while True: - sm.update(0) - arne_sm.update(0) - gps_ext = sm['gpsLocationExternal'] - traffic = arne_sm['liveTrafficData'] - if True: # todo: should this be `if sm.updated['liveTrafficData']:`? - if traffic.speedLimitValid: - speedLimittraffic = traffic.speedLimit - if abs(speedLimittraffic_prev - speedLimittraffic) > 0.1: - speedLimittrafficvalid = True - speedLimittraffic_prev = speedLimittraffic - else: - speedLimittrafficvalid = False - if traffic.speedAdvisoryValid: - speedLimittrafficAdvisory = traffic.speedAdvisory - speedLimittrafficAdvisoryvalid = True - else: - speedLimittrafficAdvisoryvalid = False - else: - speedLimittrafficAdvisoryvalid = False - - if sm.updated['gpsLocationExternal']: - gps = gps_ext - else: - continue - - save_gps_data(gps) - - last_gps = gps +# define LoggerThread class to implement logging functionality +class LoggerThread(threading.Thread): + def __init__(self, threadID, name): + threading.Thread.__init__(self) + self.threadID = threadID + self.name = name + self.logger = logging.getLogger(name) + h = logging.handlers.RotatingFileHandler(str(name)+'-Thread.log', 'a', 10*1024*1024, 5) + f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s') + h.setFormatter(f) + self.logger.addHandler(h) + self.logger.setLevel(logging.DEBUG) - fix_ok = gps.flags & 1 - - if gps.accuracy > 2.0 and not speedLimittrafficvalid: - fix_ok = False - if not fix_ok or last_query_result is None or not cache_valid: - cur_way = None - curvature = None - max_speed_ahead = None - max_speed_ahead_dist = None - curvature_valid = False - upcoming_curvature = 0. - dist_to_turn = 0. - road_points = None - map_valid = False - else: - map_valid = True - lat = gps.latitude - lon = gps.longitude - heading = gps.bearing - speed = gps.speed - - query_lock.acquire() - cur_way = Way.closest(last_query_result, lat, lon, heading, cur_way) - if cur_way is not None: - pnts, curvature_valid = cur_way.get_lookahead(lat, lon, heading, MAPS_LOOKAHEAD_DISTANCE) - if pnts is not None: - xs = pnts[:, 0] - ys = pnts[:, 1] - road_points = [float(x) for x in xs], [float(y) for y in ys] - - if speed < 5: - curvature_valid = False - if curvature_valid and pnts.shape[0] <= 3: - curvature_valid = False - else: - curvature_valid = False - upcoming_curvature = 0. - curvature = None - dist_to_turn = 0. - # The curvature is valid when at least MAPS_LOOKAHEAD_DISTANCE of road is found - if curvature_valid: - # Compute the curvature for each point - with np.errstate(divide='ignore'): - circles = [circle_through_points(*p, direction=True) for p in zip(pnts, pnts[1:], pnts[2:])] - circles = np.asarray(circles) - radii = np.nan_to_num(circles[:, 2]) - radii[abs(radii) < 15.] = 10000 - - if cur_way.way.tags['highway'] == 'trunk': - radii = radii*1.6 # https://media.springernature.com/lw785/springer-static/image/chp%3A10.1007%2F978-3-658-01689-0_21/MediaObjects/298553_35_De_21_Fig65_HTML.gif - if cur_way.way.tags['highway'] == 'motorway' or cur_way.way.tags['highway'] == 'motorway_link': - radii = radii*2.8 - - curvature = 1. / radii - rate = [rate_curvature_points(*p) for p in zip(pnts[1:], pnts[2:],curvature[0:],curvature[1:])] - rate = ([0] + rate) - - curvature = np.abs(curvature) - curvature = np.multiply(np.minimum(np.multiply(rate,4000)+0.7,1.1),curvature) - # Index of closest point - closest = np.argmin(np.linalg.norm(pnts, axis=1)) - dist_to_closest = pnts[closest, 0] # We can use x distance here since it should be close - - # Compute distance along path - dists = list() - dists.append(0) - for p, p_prev in zip(pnts, pnts[1:, :]): - dists.append(dists[-1] + np.linalg.norm(p - p_prev)) - dists = np.asarray(dists) - dists = dists - dists[closest] + dist_to_closest - dists = dists[1:-1] - - close_idx = np.logical_and(dists > 0, dists < 500) - dists = dists[close_idx] - curvature = curvature[close_idx] - - if len(curvature): - curvature = np.nan_to_num(curvature) - - - upcoming_curvature = np.amax(curvature) - dist_to_turn =np.amin(dists[np.logical_and(curvature >= np.amax(curvature), curvature <= np.amax(curvature))]) - - - else: - upcoming_curvature = 0. - dist_to_turn = 999 - - query_lock.release() - - dat = messaging.new_message() - dat.init('liveMapData') - - if last_gps is not None: # TODO: this should never be None with SubMaster now - dat.liveMapData.lastGps = last_gps - - if cur_way is not None: - dat.liveMapData.wayId = cur_way.id - - # Speed limit - max_speed = cur_way.max_speed(heading) - max_speed_ahead = None - max_speed_ahead_dist = None - if max_speed is not None: - max_speed_ahead, max_speed_ahead_dist = cur_way.max_speed_ahead(max_speed, lat, lon, heading, MAPS_LOOKAHEAD_DISTANCE) - else: - max_speed_ahead, max_speed_ahead_dist = cur_way.max_speed_ahead(speed*1.1, lat, lon, heading, MAPS_LOOKAHEAD_DISTANCE) - # TODO: anticipate T junctions and right and left hand turns based on indicator - - if max_speed_ahead is not None and max_speed_ahead_dist is not None: - dat.liveMapData.speedLimitAheadValid = True - dat.liveMapData.speedLimitAhead = float(max_speed_ahead) - dat.liveMapData.speedLimitAheadDistance = float(max_speed_ahead_dist) - if max_speed is not None: - if abs(max_speed - max_speed_prev) > 0.1: - speedLimittrafficvalid = False - max_speed_prev = max_speed - advisory_max_speed = cur_way.advisory_max_speed() - if speedLimittrafficAdvisoryvalid: - dat.liveMapData.speedAdvisoryValid = True - dat.liveMapData.speedAdvisory = speedLimittrafficAdvisory / 3.6 - else: - if advisory_max_speed is not None: - dat.liveMapData.speedAdvisoryValid = True - dat.liveMapData.speedAdvisory = advisory_max_speed - - # Curvature - dat.liveMapData.curvatureValid = curvature_valid - dat.liveMapData.curvature = float(upcoming_curvature) - dat.liveMapData.distToTurn = float(dist_to_turn) - if road_points is not None: - dat.liveMapData.roadX, dat.liveMapData.roadY = road_points - if curvature is not None: - dat.liveMapData.roadCurvatureX = [float(x) for x in dists] - dat.liveMapData.roadCurvature = [float(x) for x in curvature] - - if speedLimittrafficvalid: - if speedLimittraffic > 0.1: - dat.liveMapData.speedLimitValid = True - dat.liveMapData.speedLimit = speedLimittraffic / 3.6 - map_valid = False - else: + def run(self): + pass # will be overridden in the child class + +class QueryThread(LoggerThread): + def __init__(self, threadID, name, sharedParams={}): # sharedParams is dict of params shared between two threads + # invoke parent constructor https://stackoverflow.com/questions/2399307/how-to-invoke-the-super-constructor-in-python + LoggerThread.__init__(self, threadID, name) + self.sharedParams = sharedParams + # memorize some parameters + self.OVERPASS_API_URL = "https://z.overpass-api.de/api/interpreter" + self.OVERPASS_API_URL2 = "https://lz4.overpass-api.de/api/interpreter" + self.OVERPASS_HEADERS = { + 'User-Agent': 'NEOS (comma.ai)', + 'Accept-Encoding': 'gzip' + } + + def is_connected_to_internet(self, timeout=5): + try: + requests.get(self.OVERPASS_API_URL, timeout=timeout) + self.logger.debug("connection 1 active") + return True + except: + self.logger.error("No internet connection available.") + return False + + def is_connected_to_internet2(self, timeout=5): + try: + requests.get(self.OVERPASS_API_URL2, timeout=timeout) + self.logger.debug("connection 2 active") + return True + except: + self.logger.error("No internet connection available.") + return False + + def build_way_query(self, lat, lon, radius=50): + """Builds a query to find all highways within a given radius around a point""" + pos = " (around:%f,%f,%f)" % (radius, lat, lon) + lat_lon = "(%f,%f)" % (lat, lon) + q = """( + way + """ + pos + """ + [highway][highway!~"^(footway|path|bridleway|steps|cycleway|construction|bus_guideway|escape)$"]; + >;);out;""" + """is_in""" + lat_lon + """;area._[admin_level~"[24]"]; + convert area ::id = id(), admin_level = t['admin_level'], + name = t['name'], "ISO3166-1:alpha2" = t['ISO3166-1:alpha2'];out; + """ + self.logger.debug("build_way_query : %s" % str(q)) + return q + + + def run(self): + self.logger.debug("run method started for thread %s" % self.name) + api = overpy.Overpass(url=self.OVERPASS_API_URL) + # for now we follow old logic, will be optimized later + while True: + time.sleep(1) + self.logger.debug("Starting after sleeping for 1 second ...") + self.logger.debug("sharedParams = %s" % str(self.sharedParams)) + last_gps = self.sharedParams.get('last_gps', None) + self.logger.debug("last_gps = %s" % str(last_gps)) + if last_gps is not None: + fix_ok = last_gps.flags & 1 + if not fix_ok: + continue + + last_query_pos = self.sharedParams.get('last_query_pos', None) + if last_query_pos is not None: + cur_ecef = geodetic2ecef((last_gps.latitude, last_gps.longitude, last_gps.altitude)) + prev_ecef = geodetic2ecef((last_query_pos.latitude, last_query_pos.longitude, last_query_pos.altitude)) + dist = np.linalg.norm(cur_ecef - prev_ecef) + if dist < 3000: #updated when we are 1km from the edge of the downloaded circle + continue + self.logger.debug("parameters, cur_ecef = %s, prev_ecef = %s, dist=%s" % (str(cur_ecef), str(prev_ecef), str(dist))) + + if dist > 4000: + cache_valid = False + + # this line was added because logic broke + if last_gps is not None: + q = self.build_way_query(last_gps.latitude, last_gps.longitude, radius=4000) + if self.is_connected_to_internet() or self.is_connected_to_internet2(): + try: + try: + new_result = api.query(q) + self.logger.debug("new_result = %s" % str(new_result)) + except: + api2 = overpy.Overpass(url=OVERPASS_API_URL2) + self.logger.error("Using backup Server") + new_result = api2.query(q) + + # Build kd-tree + nodes = [] + real_nodes = [] + node_to_way = defaultdict(list) + location_info = {} + + for n in new_result.nodes: + nodes.append((float(n.lat), float(n.lon), 0)) + real_nodes.append(n) + + for way in new_result.ways: + for n in way.nodes: + node_to_way[n.id].append(way) + + for area in new_result.areas: + if area.tags.get('admin_level', '') == "2": + location_info['country'] = area.tags.get('ISO3166-1:alpha2', '') + if area.tags.get('admin_level', '') == "4": + location_info['region'] = area.tags.get('name', '') + + nodes = np.asarray(nodes) + nodes = geodetic2ecef(nodes) + tree = spatial.cKDTree(nodes) + self.logger.debug("query thread, ...", nodes, tree) + + # write result + query_lock = self.sharedParams.get('query_lock', None) + query_lock.acquire() + self.sharedParams['last_query_result'] = new_result, tree, real_nodes, node_to_way, location_info + self.sharedParams['last_query_pos'] = last_gps + self.sharedParams['cache_valid'] = True + + query_lock.release() + + except Exception as e: + self.logger.error("ERROR :" + str(e)) + crash.capture_warning(e) + query_lock = self.sharedParams.get('query_lock', None) + query_lock.acquire() + last_query_result = None + query_lock.release() + else: + query_lock = self.sharedParams.get('query_lock', None) + query_lock.acquire() + last_query_result = None + query_lock.release() + + self.logger.debug("end of one cycle in endless loop ...") + +class MapsdThread(LoggerThread): + def __init__(self, threadID, name, sharedParams={}): + # invoke parent constructor + LoggerThread.__init__(self, threadID, name) + self.sharedParams = sharedParams + self.sm = messaging.SubMaster(['gpsLocationExternal', 'liveMapData']) + self.arne_sm = messaging_arne.SubMaster(['liveTrafficData']) + self.pm = messaging.PubMaster(['liveMapData']) + self.logger.debug("entered mapsd_thread, ... %s, %s, %s" % (str(self.sm), str(self.arne_sm), str(self.pm))) + + def run(self): + self.logger.debug("Entered run method for thread :" + str(self.name)) + cur_way = None + curvature_valid = False + curvature = None + upcoming_curvature = 0. + dist_to_turn = 0. + road_points = None + speedLimittraffic = 0 + speedLimittraffic_prev = 0 + max_speed = None + max_speed_ahead = None + max_speed_ahead_dist = None + + max_speed_prev = 0 speedLimittrafficvalid = False - else: - if max_speed is not None and map_valid: - dat.liveMapData.speedLimitValid = True - dat.liveMapData.speedLimit = max_speed - - dat.liveMapData.mapValid = map_valid - pm.send('liveMapData', dat) + while True: + time.sleep(1) + self.logger.debug("starting new cycle in endless loop") + self.sm.update(0) + self.arne_sm.update(0) + gps_ext = self.sm['gpsLocationExternal'] + traffic = self.arne_sm['liveTrafficData'] + # if True: # todo: should this be `if sm.updated['liveTrafficData']:`? + # commented out the previous line because it does not make sense + + self.logger.debug("got gps_ext = %s" % str(gps_ext)) + if traffic.speedLimitValid: + speedLimittraffic = traffic.speedLimit + if abs(speedLimittraffic_prev - speedLimittraffic) > 0.1: + speedLimittrafficvalid = True + speedLimittraffic_prev = speedLimittraffic + else: + speedLimittrafficvalid = False + if traffic.speedAdvisoryValid: + speedLimittrafficAdvisory = traffic.speedAdvisory + speedLimittrafficAdvisoryvalid = True + else: + speedLimittrafficAdvisoryvalid = False + # commented out because it is never going to happen + # else: + # speedLimittrafficAdvisoryvalid = False + + if self.sm.updated['gpsLocationExternal']: + gps = gps_ext + else: + continue + + # save_gps_data(gps) + query_lock = self.sharedParams.get('query_lock', None) + # last_gps = self.sharedParams.get('last_gps', None) + query_lock.acquire() + self.sharedParams['last_gps'] = gps + self.logger.debug("setting last_gps to %s" % str(gps)) + query_lock.release() + + fix_ok = gps.flags & 1 + self.logger.debug("fix_ok = %s" % str(fix_ok)) + + if gps.accuracy > 2.0 and not speedLimittrafficvalid: + fix_ok = False + if not fix_ok or last_query_result is None or not cache_valid: + self.logger.debug("fix_ok %s" % fix_ok) + self.logger.error("Error in fix_ok logic") + cur_way = None + curvature = None + max_speed_ahead = None + max_speed_ahead_dist = None + curvature_valid = False + upcoming_curvature = 0. + dist_to_turn = 0. + road_points = None + map_valid = False + else: + map_valid = True + lat = gps.latitude + lon = gps.longitude + heading = gps.bearing + speed = gps.speed + + query_lock.acquire() + cur_way = Way.closest(last_query_result, lat, lon, heading, cur_way) + if cur_way is not None: + self.logger.debug("cur_way is not None ...") + pnts, curvature_valid = cur_way.get_lookahead(lat, lon, heading, MAPS_LOOKAHEAD_DISTANCE) + if pnts is not None: + xs = pnts[:, 0] + ys = pnts[:, 1] + road_points = [float(x) for x in xs], [float(y) for y in ys] + + if speed < 5: + curvature_valid = False + if curvature_valid and pnts.shape[0] <= 3: + curvature_valid = False + else: + curvature_valid = False + upcoming_curvature = 0. + curvature = None + dist_to_turn = 0. + # The curvature is valid when at least MAPS_LOOKAHEAD_DISTANCE of road is found + if curvature_valid: + # Compute the curvature for each point + with np.errstate(divide='ignore'): + circles = [circle_through_points(*p, direction=True) for p in zip(pnts, pnts[1:], pnts[2:])] + circles = np.asarray(circles) + radii = np.nan_to_num(circles[:, 2]) + radii[abs(radii) < 15.] = 10000 + + if cur_way.way.tags['highway'] == 'trunk': + radii = radii*1.6 # https://media.springernature.com/lw785/springer-static/image/chp%3A10.1007%2F978-3-658-01689-0_21/MediaObjects/298553_35_De_21_Fig65_HTML.gif + if cur_way.way.tags['highway'] == 'motorway' or cur_way.way.tags['highway'] == 'motorway_link': + radii = radii*2.8 + + curvature = 1. / radii + rate = [rate_curvature_points(*p) for p in zip(pnts[1:], pnts[2:],curvature[0:],curvature[1:])] + rate = ([0] + rate) + + curvature = np.abs(curvature) + curvature = np.multiply(np.minimum(np.multiply(rate,4000)+0.7,1.1),curvature) + # Index of closest point + closest = np.argmin(np.linalg.norm(pnts, axis=1)) + dist_to_closest = pnts[closest, 0] # We can use x distance here since it should be close + + # Compute distance along path + dists = list() + dists.append(0) + for p, p_prev in zip(pnts, pnts[1:, :]): + dists.append(dists[-1] + np.linalg.norm(p - p_prev)) + dists = np.asarray(dists) + dists = dists - dists[closest] + dist_to_closest + dists = dists[1:-1] + + close_idx = np.logical_and(dists > 0, dists < 500) + dists = dists[close_idx] + curvature = curvature[close_idx] + + if len(curvature): + curvature = np.nan_to_num(curvature) + + + upcoming_curvature = np.amax(curvature) + dist_to_turn =np.amin(dists[np.logical_and(curvature >= np.amax(curvature), curvature <= np.amax(curvature))]) + + + else: + upcoming_curvature = 0. + dist_to_turn = 999 + + query_lock.release() + + dat = messaging.new_message() + dat.init('liveMapData') + + if last_gps is not None: # TODO: this should never be None with SubMaster now + dat.liveMapData.lastGps = last_gps + + if cur_way is not None: + dat.liveMapData.wayId = cur_way.id + + # Speed limit + max_speed = cur_way.max_speed(heading) + max_speed_ahead = None + max_speed_ahead_dist = None + if max_speed is not None: + max_speed_ahead, max_speed_ahead_dist = cur_way.max_speed_ahead(max_speed, lat, lon, heading, MAPS_LOOKAHEAD_DISTANCE) + else: + max_speed_ahead, max_speed_ahead_dist = cur_way.max_speed_ahead(speed*1.1, lat, lon, heading, MAPS_LOOKAHEAD_DISTANCE) + # TODO: anticipate T junctions and right and left hand turns based on indicator + + if max_speed_ahead is not None and max_speed_ahead_dist is not None: + dat.liveMapData.speedLimitAheadValid = True + dat.liveMapData.speedLimitAhead = float(max_speed_ahead) + dat.liveMapData.speedLimitAheadDistance = float(max_speed_ahead_dist) + if max_speed is not None: + if abs(max_speed - max_speed_prev) > 0.1: + speedLimittrafficvalid = False + max_speed_prev = max_speed + advisory_max_speed = cur_way.advisory_max_speed() + if speedLimittrafficAdvisoryvalid: + dat.liveMapData.speedAdvisoryValid = True + dat.liveMapData.speedAdvisory = speedLimittrafficAdvisory / 3.6 + else: + if advisory_max_speed is not None: + dat.liveMapData.speedAdvisoryValid = True + dat.liveMapData.speedAdvisory = advisory_max_speed + + # Curvature + dat.liveMapData.curvatureValid = curvature_valid + dat.liveMapData.curvature = float(upcoming_curvature) + dat.liveMapData.distToTurn = float(dist_to_turn) + if road_points is not None: + dat.liveMapData.roadX, dat.liveMapData.roadY = road_points + if curvature is not None: + dat.liveMapData.roadCurvatureX = [float(x) for x in dists] + dat.liveMapData.roadCurvature = [float(x) for x in curvature] + + if speedLimittrafficvalid: + if speedLimittraffic > 0.1: + dat.liveMapData.speedLimitValid = True + dat.liveMapData.speedLimit = speedLimittraffic / 3.6 + map_valid = False + else: + speedLimittrafficvalid = False + else: + if max_speed is not None and map_valid: + dat.liveMapData.speedLimitValid = True + dat.liveMapData.speedLimit = max_speed + + dat.liveMapData.mapValid = map_valid + # + self.logger.debug("Sending ... liveMapData ... %s", str(dat)) + self.pm.send('liveMapData', dat) -def main(gctx=None): - params = Params() - dongle_id = params.get("DongleId") - crash.bind_user(id=dongle_id) - crash.bind_extra(version=version, dirty=dirty, is_eon=True) - crash.install() - main_thread = threading.Thread(target=mapsd_thread) - main_thread.daemon = True - main_thread.start() - q_thread = threading.Thread(target=query_thread) - q_thread.daemon = True - q_thread.start() - while True: - time.sleep(0.1) +if __name__ == "__main__": + # initialize gps parameters + # initialize last_gps + import time + current_milli_time = lambda: int(round(time.time() * 1000)) + #from collections import namedtuple + #MyGpsData = namedtuple('MyGpsData','latitude, longitude, altitude, speed, bearing, accuracy, flags, timestamp') -if __name__ == "__main__": - main() + #last_gps = MyGpsData(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1, current_milli_time) + # + + + + # setup shared parameters + last_gps = None + query_lock = threading.Lock() + last_query_result = None + last_query_pos = None + cache_valid = False + + sharedParams = {'last_gps' : last_gps, 'query_lock' : query_lock, 'last_query_result' : last_query_result, \ + 'last_query_pos' : last_query_pos, 'cache_valid' : cache_valid} + + qt = QueryThread(1, "QueryThread", sharedParams=sharedParams) + mt = MapsdThread(2, "MapsdThread", sharedParams=sharedParams) + + qt.start() + mt.start() + # qt.run() + # qt.join() + print("threads started") From 4b2546aff7df5fe2f721ccfb99d45183195142f8 Mon Sep 17 00:00:00 2001 From: Arne Schwarck Date: Mon, 24 Feb 2020 11:50:11 +0100 Subject: [PATCH 03/11] Fixes for travis --- selfdrive/mapd/mapd.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/selfdrive/mapd/mapd.py b/selfdrive/mapd/mapd.py index 86a5273fbc87f0..bab54d54ee638d 100755 --- a/selfdrive/mapd/mapd.py +++ b/selfdrive/mapd/mapd.py @@ -116,7 +116,7 @@ def run(self): self.logger.debug("parameters, cur_ecef = %s, prev_ecef = %s, dist=%s" % (str(cur_ecef), str(prev_ecef), str(dist))) if dist > 4000: - cache_valid = False + self.sharedParams['cache_valid'] = False # this line was added because logic broke if last_gps is not None: @@ -127,7 +127,7 @@ def run(self): new_result = api.query(q) self.logger.debug("new_result = %s" % str(new_result)) except: - api2 = overpy.Overpass(url=OVERPASS_API_URL2) + api2 = overpy.Overpass(url=self.OVERPASS_API_URL2) self.logger.error("Using backup Server") new_result = api2.query(q) @@ -170,12 +170,12 @@ def run(self): crash.capture_warning(e) query_lock = self.sharedParams.get('query_lock', None) query_lock.acquire() - last_query_result = None + self.sharedParams['last_query_result'] = None query_lock.release() else: query_lock = self.sharedParams.get('query_lock', None) query_lock.acquire() - last_query_result = None + self.sharedParams['last_query_result'] = None query_lock.release() self.logger.debug("end of one cycle in endless loop ...") @@ -252,7 +252,7 @@ def run(self): if gps.accuracy > 2.0 and not speedLimittrafficvalid: fix_ok = False - if not fix_ok or last_query_result is None or not cache_valid: + if not fix_ok or self.sharedParams['last_query_result'] is None or not cache_valid: self.logger.debug("fix_ok %s" % fix_ok) self.logger.error("Error in fix_ok logic") cur_way = None @@ -272,7 +272,7 @@ def run(self): speed = gps.speed query_lock.acquire() - cur_way = Way.closest(last_query_result, lat, lon, heading, cur_way) + cur_way = Way.closest(self.sharedParams['last_query_result'], lat, lon, heading, cur_way) if cur_way is not None: self.logger.debug("cur_way is not None ...") pnts, curvature_valid = cur_way.get_lookahead(lat, lon, heading, MAPS_LOOKAHEAD_DISTANCE) @@ -408,9 +408,13 @@ def run(self): if __name__ == "__main__": + params = Params() + dongle_id = params.get("DongleId") + crash.bind_user(id=dongle_id) + crash.bind_extra(version=version, dirty=dirty, is_eon=True) + crash.install() # initialize gps parameters # initialize last_gps - import time current_milli_time = lambda: int(round(time.time() * 1000)) #from collections import namedtuple From 666e0a98330ff19f6b1c248f3dd604350b53c8a6 Mon Sep 17 00:00:00 2001 From: Arne Schwarck Date: Mon, 24 Feb 2020 12:24:18 +0100 Subject: [PATCH 04/11] m --- selfdrive/mapd/mapd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/selfdrive/mapd/mapd.py b/selfdrive/mapd/mapd.py index bab54d54ee638d..8f1c5c2e9eec8b 100755 --- a/selfdrive/mapd/mapd.py +++ b/selfdrive/mapd/mapd.py @@ -154,7 +154,7 @@ def run(self): nodes = np.asarray(nodes) nodes = geodetic2ecef(nodes) tree = spatial.cKDTree(nodes) - self.logger.debug("query thread, ...", nodes, tree) + #self.logger.debug("query thread, ...", nodes, tree) # write result query_lock = self.sharedParams.get('query_lock', None) From ed58145767cc7564e32ebbe87ca565cb1de8341f Mon Sep 17 00:00:00 2001 From: Arne Schwarck Date: Mon, 24 Feb 2020 13:12:07 +0100 Subject: [PATCH 05/11] fix nodes and tree logger --- selfdrive/mapd/mapd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/selfdrive/mapd/mapd.py b/selfdrive/mapd/mapd.py index 8f1c5c2e9eec8b..4ef0797a79fc0d 100755 --- a/selfdrive/mapd/mapd.py +++ b/selfdrive/mapd/mapd.py @@ -154,7 +154,7 @@ def run(self): nodes = np.asarray(nodes) nodes = geodetic2ecef(nodes) tree = spatial.cKDTree(nodes) - #self.logger.debug("query thread, ...", nodes, tree) + self.logger.debug("query thread, ... %s" % (str(nodes), str(tree))) # write result query_lock = self.sharedParams.get('query_lock', None) From 5e9a7e32f8ebf0a63199c0cb08872665dbf7d44b Mon Sep 17 00:00:00 2001 From: Arne Schwarck Date: Mon, 24 Feb 2020 13:31:50 +0100 Subject: [PATCH 06/11] Fix cache_valid --- selfdrive/mapd/mapd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/selfdrive/mapd/mapd.py b/selfdrive/mapd/mapd.py index 4ef0797a79fc0d..cd7020806df4d3 100755 --- a/selfdrive/mapd/mapd.py +++ b/selfdrive/mapd/mapd.py @@ -252,7 +252,7 @@ def run(self): if gps.accuracy > 2.0 and not speedLimittrafficvalid: fix_ok = False - if not fix_ok or self.sharedParams['last_query_result'] is None or not cache_valid: + if not fix_ok or self.sharedParams['last_query_result'] is None or not self.sharedParams['cache_valid']: self.logger.debug("fix_ok %s" % fix_ok) self.logger.error("Error in fix_ok logic") cur_way = None From ce130bfac7b229d299f7310573b2eb66e45c0de5 Mon Sep 17 00:00:00 2001 From: Arne Schwarck Date: Mon, 24 Feb 2020 13:34:01 +0100 Subject: [PATCH 07/11] redo nodes and trees output --- selfdrive/mapd/mapd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/selfdrive/mapd/mapd.py b/selfdrive/mapd/mapd.py index cd7020806df4d3..be0af7764e64c6 100755 --- a/selfdrive/mapd/mapd.py +++ b/selfdrive/mapd/mapd.py @@ -154,7 +154,7 @@ def run(self): nodes = np.asarray(nodes) nodes = geodetic2ecef(nodes) tree = spatial.cKDTree(nodes) - self.logger.debug("query thread, ... %s" % (str(nodes), str(tree))) + self.logger.debug("query thread, ... %s %s" % (str(nodes), str(tree))) # write result query_lock = self.sharedParams.get('query_lock', None) From fe4e808ae14022175b8f7b96d8097397607524e5 Mon Sep 17 00:00:00 2001 From: Arne Schwarck Date: Mon, 24 Feb 2020 13:59:39 +0100 Subject: [PATCH 08/11] update ignore for old log files --- .gitignore | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/.gitignore b/.gitignore index 44116837ff27f0..7c6d0598755331 100644 --- a/.gitignore +++ b/.gitignore @@ -47,7 +47,17 @@ selfdrive/modeld/_modeld selfdrive/modeld/_dmonitoringmodeld selfdrive/data_collection/gps-data selfdrive/mapd/QueryThread-Thread.log +selfdrive/mapd/QueryThread-Thread.log.1 +selfdrive/mapd/QueryThread-Thread.log.2 +selfdrive/mapd/QueryThread-Thread.log.3 +selfdrive/mapd/QueryThread-Thread.log.4 +selfdrive/mapd/QueryThread-Thread.log.5 selfdrive/mapd/MapsdThread-Thread.log +selfdrive/mapd/MapsdThread-Thread.log.1 +selfdrive/mapd/MapsdThread-Thread.log.2 +selfdrive/mapd/MapsdThread-Thread.log.3 +selfdrive/mapd/MapsdThread-Thread.log.4 +selfdrive/mapd/MapsdThread-Thread.log.5 /src/ one From 990b54feff668c4d11304a7eb8f3b76bb2c21189 Mon Sep 17 00:00:00 2001 From: Arne Schwarck Date: Mon, 24 Feb 2020 14:20:58 +0100 Subject: [PATCH 09/11] q has to be made before query --- selfdrive/mapd/mapd.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/selfdrive/mapd/mapd.py b/selfdrive/mapd/mapd.py index be0af7764e64c6..c502ec9d2fcb55 100755 --- a/selfdrive/mapd/mapd.py +++ b/selfdrive/mapd/mapd.py @@ -118,10 +118,8 @@ def run(self): if dist > 4000: self.sharedParams['cache_valid'] = False - # this line was added because logic broke - if last_gps is not None: + if last_gps is not None and (self.is_connected_to_internet() or self.is_connected_to_internet2()): q = self.build_way_query(last_gps.latitude, last_gps.longitude, radius=4000) - if self.is_connected_to_internet() or self.is_connected_to_internet2(): try: try: new_result = api.query(q) From b515b04d4cc9e0dbef4a510935694e2954d9d844 Mon Sep 17 00:00:00 2001 From: Arne Schwarck Date: Mon, 24 Feb 2020 20:35:48 +0100 Subject: [PATCH 10/11] Disable logging for now --- selfdrive/mapd/mapd.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/selfdrive/mapd/mapd.py b/selfdrive/mapd/mapd.py index c502ec9d2fcb55..52799e30701fe7 100755 --- a/selfdrive/mapd/mapd.py +++ b/selfdrive/mapd/mapd.py @@ -4,6 +4,7 @@ import logging import logging.handlers + # Add phonelibs openblas to LD_LIBRARY_PATH if import fails from scipy import spatial @@ -39,7 +40,7 @@ def __init__(self, threadID, name): f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s') h.setFormatter(f) self.logger.addHandler(h) - self.logger.setLevel(logging.DEBUG) + self.logger.setLevel(logging.CRITICAL) # set to logging.DEBUG to enable logging def run(self): pass # will be overridden in the child class From 6e216bd7183c6a1388a933948eaf9a19c84c892c Mon Sep 17 00:00:00 2001 From: Arne Schwarck Date: Mon, 24 Feb 2020 20:39:36 +0100 Subject: [PATCH 11/11] Run MapsdThread as fast as possible --- selfdrive/mapd/mapd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/selfdrive/mapd/mapd.py b/selfdrive/mapd/mapd.py index 52799e30701fe7..9635a06a22658e 100755 --- a/selfdrive/mapd/mapd.py +++ b/selfdrive/mapd/mapd.py @@ -207,7 +207,7 @@ def run(self): speedLimittrafficvalid = False while True: - time.sleep(1) + #time.sleep(1) self.logger.debug("starting new cycle in endless loop") self.sm.update(0) self.arne_sm.update(0)