diff --git a/pokemongo_bot/test/polyline_generator_test.py b/pokemongo_bot/test/polyline_generator_test.py index 34ef181bf3..d0ee89c451 100644 --- a/pokemongo_bot/test/polyline_generator_test.py +++ b/pokemongo_bot/test/polyline_generator_test.py @@ -5,11 +5,6 @@ import datetime import time -mock_start, mock_seven_sec, mock_end = Mock(), Mock(), Mock() -mock_start.return_value = time.mktime(datetime.datetime(2016, 8, 20).timetuple()) -mock_seven_sec.return_value = time.mktime(datetime.datetime(2016, 8, 20, 0 , 0, 7).timetuple()) -mock_end.return_value = time.mktime(datetime.datetime(2016, 8, 21).timetuple()) - ex_orig = (47.1706378, 8.5167405) ex_dest = (47.1700271, 8.518072999999998) ex_speed = 2.5 @@ -22,7 +17,6 @@ class PolylineTestCase(unittest.TestCase): - @patch('time.time', mock_start) def setUp(self): directions_path = os.path.join(os.path.dirname(__file__), 'resources', ex_resp_directions) with open(directions_path, 'rb') as directions: @@ -39,100 +33,39 @@ def setUp(self): ), json=ex_elevations, status_code=200) self.polyline = Polyline(ex_orig, ex_dest, ex_speed) - @patch('time.time', mock_start) - def test_reset_timestamps(self): - timestamp = self.polyline._timestamp - is_paused = self.polyline.is_paused - last_pause = self.polyline._last_paused_timestamp - total_pause = self.polyline._paused_total - self.polyline.pause() - @patch('time.time', mock_seven_sec) - def unpause(): - self.polyline.unpause() - unpause() - self.polyline.reset_timestamps() - self.assertEquals(timestamp, self.polyline._timestamp) - self.assertEquals(is_paused, self.polyline.is_paused) - self.assertEquals(last_pause, self.polyline._last_paused_timestamp) - self.assertEquals(total_pause, self.polyline._paused_total) - - def test_walk_step_no_points(self): - self.assertEquals(self.polyline.walk_steps(None), []) - self.assertEquals(self.polyline.walk_steps([]), []) - def test_first_point(self): - self.assertEqual(self.polyline.points[0], ex_orig) + self.assertEqual(self.polyline._points[0], ex_orig) def test_last_point(self): - self.assertEqual(self.polyline.points[-1], ex_dest) + self.assertEqual(self.polyline._points[-1], ex_dest) - @patch('time.time', mock_start) def test_pos_and_alt_at_time_mock_start(self): - self.assertEquals(self.polyline.get_pos(), ex_orig) - self.assertEquals(self.polyline.get_alt(), self.polyline.polyline_elevations[0]) + self.polyline.set_speed(0) + lat, lng = self.polyline.get_pos() + self.assertAlmostEqual(lat, ex_orig[0], places=5) + self.assertAlmostEqual(lng, ex_orig[1], places=5) + self.assertEquals(self.polyline.get_alt(), 429.5892333984375) - @patch('time.time', mock_seven_sec) def test_pos_and_alt_at_time_mock_seven_sec(self): - self.assertEquals(self.polyline.get_pos(), (47.17048868132221, 8.516689560440737)) - self.assertAlmostEqual(self.polyline.get_alt(), self.polyline.polyline_elevations[6], places=1) + self.polyline.set_speed(self.polyline.speed*7) + lat, lng = self.polyline.get_pos() + self.assertAlmostEqual(lat, 47.17048865309, places=5) + self.assertAlmostEqual(lng, 8.516689707618, places=5) + self.assertAlmostEqual(self.polyline.get_alt(), 428.65, places=1) - @patch('time.time', mock_end) def test_pos_and_alt_at_time_mock_end(self): - self.assertEquals(self.polyline.get_pos(), ex_dest) - self.assertEquals(self.polyline.get_alt(), self.polyline.polyline_elevations[-1]) + self.polyline.set_speed(300) + lat, lng = self.polyline.get_pos() + self.assertAlmostEqual(lat, ex_dest[0], places=5) + self.assertAlmostEqual(lng, ex_dest[1], places=5) + self.assertEquals(self.polyline.get_alt(), 437.9155883789062) def test_nr_of_elevations_returned(self): - total_seconds = self.polyline.get_total_distance(self.polyline.points)/self.polyline.speed + total_seconds = self.polyline.get_total_distance() / self.polyline.speed self.assertAlmostEqual(total_seconds, ex_nr_samples, places=0) - def test_conversion_factor(self): - self.polyline.speed = 0.0001 - self.polyline.polyline_elevations = [100]*512 - self.assertEquals(self.polyline.get_alt(), 100) - - def test_no_points(self): - self.polyline.points = [ex_orig] - self.assertEquals(self.polyline.get_pos(), ex_orig) - self.polyline.points = [ex_dest] - self.assertEquals(self.polyline.get_pos(), ex_dest) - - def test_pause(self): - self.assertEquals(self.polyline._last_paused_timestamp, None) - @patch('time.time', mock_start) - def pause(): - self.polyline.pause() - self.assertEquals(self.polyline.is_paused, True) - self.assertEquals(self.polyline._last_paused_timestamp, time.time()) - pause() - - @patch('time.time', mock_seven_sec) - def position_check(): - self.assertEquals(self.polyline.get_pos(), ex_orig) - self.assertEquals(self.polyline.get_alt(), self.polyline.polyline_elevations[0]) - position_check() - - def test_unpause(self): - @patch('time.time', mock_start) - def pause(): - self.polyline.pause() - pause() - self.assertEquals(self.polyline.is_paused, True) - @patch('time.time', mock_seven_sec) - def unpause(): - self.polyline.unpause() - unpause() - self.assertEquals(self.polyline.is_paused, False) - self.assertEquals(self.polyline._last_paused_timestamp, None) - self.assertEquals(self.polyline._paused_total, 7) - - @patch('time.time', mock_seven_sec) - def position_check(): - self.assertEquals(self.polyline.get_pos(), ex_orig) - self.assertEquals(self.polyline.get_alt(), self.polyline.polyline_elevations[0]) - position_check() - def test_total_distance(self): - self.assertEquals(self.polyline.get_total_distance(self.polyline.points), ex_total_distance) + self.assertEquals(self.polyline.get_total_distance(), ex_total_distance) def test_get_last_pos(self): self.assertEquals(self.polyline.get_last_pos(), self.polyline._last_pos) diff --git a/pokemongo_bot/walkers/polyline_generator.py b/pokemongo_bot/walkers/polyline_generator.py index 80ab744d13..dbfb8254f7 100644 --- a/pokemongo_bot/walkers/polyline_generator.py +++ b/pokemongo_bot/walkers/polyline_generator.py @@ -1,12 +1,14 @@ -import time +# -*- coding: utf-8 -*- +from geopy.distance import VincentyDistance +from geopy import Point from itertools import chain -from math import ceil -from random import uniform import haversine +import math import polyline import requests + class PolylineObjectHandler: ''' Does this need to be a class? @@ -55,19 +57,19 @@ def cached_polyline(origin, destination, speed, google_map_api_key=None): class Polyline(object): def __init__(self, origin, destination, speed, google_map_api_key=None): - self.DIRECTIONS_API_URL='https://maps.googleapis.com/maps/api/directions/json?mode=walking' + self.speed = float(speed) self.origin = origin self.destination = tuple(destination) + self.DIRECTIONS_API_URL='https://maps.googleapis.com/maps/api/directions/json?mode=walking' self.DIRECTIONS_URL = '{}&origin={}&destination={}'.format(self.DIRECTIONS_API_URL, '{},{}'.format(*self.origin), '{},{}'.format(*self.destination)) if google_map_api_key: self.DIRECTIONS_URL = '{}&key={}'.format(self.DIRECTIONS_URL, google_map_api_key) - - self.directions_response = requests.get(self.DIRECTIONS_URL).json() + self._directions_response = requests.get(self.DIRECTIONS_URL).json() try: - self.polyline_points = [x['polyline']['points'] for x in - self.directions_response['routes'][0]['legs'][0]['steps']] + self._directions_encoded_points = [x['polyline']['points'] for x in + self._directions_response['routes'][0]['legs'][0]['steps']] except IndexError: # This handles both cases: # a) In case of API limit reached we get back we get a status 200 code with an empty routes [] @@ -81,128 +83,142 @@ def __init__(self, origin, destination, speed, google_map_api_key=None): # "routes" : [], # "status" : "ZERO_RESULTS" # } - self.polyline_points = self.directions_response['routes'] - self.points = [self.origin] + self.get_points(self.polyline_points) + [self.destination] - self.speed = float(speed) - self.lat, self.long = self.points[0][0], self.points[0][1] - self.polyline = self.combine_polylines(self.points) - self.elevation_samples = int(min(self.get_total_distance(self.points)/self.speed +1, 512)) + self._directions_encoded_points = self._directions_response['routes'] + self._points = [self.origin] + self._get_directions_points() + [self.destination] + self._polyline = self._get_encoded_points() + self._last_pos = self._points[0] + self._step_dict = self._get_steps_dict() + self._step_keys = sorted(self._step_dict.keys()) + self._last_step = 0 + + self._nr_samples = int(min(self.get_total_distance() / self.speed + 1, 512)) self.ELEVATION_API_URL='https://maps.googleapis.com/maps/api/elevation/json?path=enc:' self.ELEVATION_URL = '{}{}&samples={}'.format(self.ELEVATION_API_URL, - self.polyline, self.elevation_samples) - + self._polyline, self._nr_samples) if google_map_api_key: self.ELEVATION_URL = '{}&key={}'.format(self.ELEVATION_URL, google_map_api_key) - - self.elevation_response = requests.get(self.ELEVATION_URL).json() - self.polyline_elevations = [x['elevation'] for x in self.elevation_response['results']] or [None] - self._timestamp = time.time() - self.is_paused = False - self._last_paused_timestamp = None - self._paused_total = 0.0 - self._last_pos = (None, None) - - def reset_timestamps(self): - self._timestamp = time.time() - self.is_paused = False - self._last_paused_timestamp = None - self._paused_total = 0.0 - - def get_points(self, polyline_points): - crd_points = [] - for points in polyline_points: - crd_points += polyline.decode(points) - crd_points = [x for n,x in enumerate(crd_points) if x not in crd_points[:n]] - return crd_points - - def combine_polylines(self, points): - return polyline.encode(points) - - def pause(self): - if not self.is_paused: - self.is_paused = True - self._last_paused_timestamp = time.time() - - def unpause(self): - if self.is_paused: - self.is_paused = False - self._paused_total += time.time() - self._last_paused_timestamp - self._last_paused_timestamp = None - - def walk_steps(self, points): - if points: - steps = zip(chain([points[0]], points), - chain(points, [points[-1]])) + self._elevation_response = requests.get(self.ELEVATION_URL).json() + self._elevation_at_point = dict((tuple(x['location'].values()), + x['elevation']) for x in + self._elevation_response['results']) + + def _get_directions_points(self): + points = [] + for point in self._directions_encoded_points: + points += polyline.decode(point) + return [x for n,x in enumerate(points) if x not in points[:n]] + + def _get_encoded_points(self): + return polyline.encode(self._points) + + def _get_walk_steps(self): + if self._points: + steps = zip(chain([self._points[0]], self._points), + chain(self._points, [self._points[-1]])) steps = filter(None, [(o, d) if o != d else None for o, d in steps]) - # consume the filter as list https://github.com/th3w4y/PokemonGo-Bot/issues/27 + # consume the filter as list return list(steps) else: return [] + def _get_steps_dict(self): + walked_distance = 0.0 + steps_dict = {} + for step in self._get_walk_steps(): + walked_distance += haversine.haversine(*step) * 1000 + steps_dict[walked_distance] = step + return steps_dict + def get_alt(self): - max_nr_samples = 512.0 - total_seconds = self.get_total_distance(self.points)/self.speed - if total_seconds >= max_nr_samples: - conversion_factor = max_nr_samples/total_seconds - else: - conversion_factor = 1 - if not self.is_paused: - time_passed = time.time() - else: - time_passed = self._last_paused_timestamp - seconds_passed = abs(time_passed - self._timestamp - self._paused_total) - elevation_index = int(seconds_passed*conversion_factor) - try: - return self.polyline_elevations[elevation_index] - except IndexError: - try: - return self.polyline_elevations[-1] - except IndexError: - return uniform(max_nr_samples/2, max_nr_samples) + closest_sample = None + best_distance = float("inf") + for point in self._elevation_at_point.keys(): + local_distance = haversine.haversine(self._last_pos, point)*1000 + if local_distance < best_distance: + closest_sample = point + best_distance = local_distance + return self._elevation_at_point[closest_sample] def get_pos(self): - walked_distance = 0.0 - if not self.is_paused: - time_passed = time.time() - else: - time_passed = self._last_paused_timestamp - time_passed_distance = self.speed * abs(time_passed - self._timestamp - self._paused_total) - # check if there are any steps to take https://github.com/th3w4y/PokemonGo-Bot/issues/27 - if self.walk_steps(self.points): - steps_dict = {} - for step in self.walk_steps(self.points): - walked_distance += haversine.haversine(*step)*1000 - steps_dict[walked_distance] = step - for walked_end_step in sorted(steps_dict.keys()): - if walked_end_step >= time_passed_distance: - break - step_distance = haversine.haversine(*steps_dict[walked_end_step])*1000 - if walked_end_step >= time_passed_distance: - percentage_walked = (time_passed_distance - (walked_end_step - step_distance)) / step_distance + if self.speed > self.get_total_distance(): + self._last_pos = self.destination + self._last_step = len(self._step_keys)-1 + if self.get_last_pos() == self.destination: + return self.get_last_pos() + distance = self.speed + origin = Point(*self._last_pos) + ((so_lat, so_lng), (sd_lat, sd_lng)) = self._step_dict[self._step_keys[self._last_step]] + bearing = self._calc_bearing(so_lat, so_lng, sd_lat, sd_lng) + while haversine.haversine(self._last_pos, (sd_lat, sd_lng))*1000 < distance: + distance -= haversine.haversine(self._last_pos, (sd_lat, sd_lng))*1000 + self._last_pos = (sd_lat, sd_lng) + if self._last_step < len(self._step_keys)-1: + self._last_step += 1 + ((so_lat, so_lng), (sd_lat, sd_lng)) = self._step_dict[self._step_keys[self._last_step]] + bearing = self._calc_bearing(so_lat, so_lng, sd_lat, sd_lng) + origin = Point(so_lat, so_lng) + lat, lng = self._calc_next_pos(origin, distance, bearing) + if haversine.haversine(self._last_pos, (lat, lng))*1000 < distance: + distance -= haversine.haversine(self._last_pos, (lat, lng))*1000 + self._last_pos = (lat, lng) else: - percentage_walked = 1.0 - result = self.calculate_coord(percentage_walked, *steps_dict[walked_end_step]) - self._last_pos = tuple(result[0]) - return self._last_pos + return self.get_last_pos() else: - # otherwise return the destination https://github.com/th3w4y/PokemonGo-Bot/issues/27 - self._last_pos = tuple(self.points[-1]) - return self._last_pos + lat, lng = self._calc_next_pos(origin, distance, bearing) + self._last_pos = (lat, lng) + return self.get_last_pos() + + + def get_total_distance(self): + return math.ceil(sum([haversine.haversine(*x) * 1000 for x in self._get_walk_steps()])) + def get_last_pos(self): return self._last_pos - def calculate_coord(self, percentage, o, d): - # If this is the destination then returning as such when percentage complete = 1.0 - # Here there was a bug causing this to teleport when API quota was reached!!! - if self.points[-1] == d and percentage == 1.0 : - return [d] - else: - # intermediary points returned with 5 decimals precision only - # this ensures ~3-50cm ofset from the geometrical point calculated - lat = o[0]+ (d[0] -o[0]) * percentage - lon = o[1]+ (d[1] -o[1]) * percentage - return [(lat, lon)] - - def get_total_distance(self, points): - return ceil(sum([haversine.haversine(*x)*1000 for x in self.walk_steps(points)])) + def set_speed(self, speed): + self.speed = speed + + def _calc_next_pos(self, origin, distance, bearing): + lat, lng, _ = VincentyDistance(kilometers=distance * 1e-3).destination(origin, bearing) + return (lat, lng) + + def _calc_bearing(self, start_lat, start_lng, dest_lat, dest_lng): + """ + Calculates the bearing between two points. + + The formulae used is the following: + θ = atan2(sin(Δlong).cos(lat2), + cos(lat1).sin(lat2) − sin(lat1).cos(lat2).cos(Δlong)) + + :Parameters: + - `start_lat in decimal degrees + - `start_lng in decimal degrees + - `dest_lat in decimal degrees + - `dest_lng in decimal degrees + + :Returns: + The bearing in degrees + + :Returns Type: + float + """ + + lat1 = math.radians(start_lat) + lat2 = math.radians(dest_lat) + + diffLong = math.radians(dest_lng - start_lng) + + x = math.sin(diffLong) * math.cos(lat2) + y = math.cos(lat1) * math.sin(lat2) - (math.sin(lat1) + * math.cos(lat2) * math.cos(diffLong)) + + initial_bearing = math.atan2(x, y) + + # Now we have the initial bearing but math.atan2 return values + # from -180° to + 180° which is not what we want for a compass bearing + # The solution is to normalize the initial bearing as shown below + initial_bearing = math.degrees(initial_bearing) + compass_bearing = (initial_bearing + 360) % 360 + + return compass_bearing