diff --git a/CMakeLists.txt b/CMakeLists.txt index 3980de00477b..a6734e62ce14 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -111,7 +111,10 @@ if ("${CMAKE_RAY_LANG_PYTHON}" STREQUAL "YES") list(APPEND ray_file_list "src/credis/redis/src/redis-server") endif() - if (DEFINED ENV{INCLUDE_UI} AND "$ENV{INCLUDE_UI}" STREQUAL "1") + # The goal of the if statement below is to require the catapult files to be + # present if INCLUDE_UI=1 is set and to include the UI files if they are present. + # This should match the logic in build_ui.sh. + if (EXISTS "${CMAKE_BINARY_DIR}/src/catapult_files/index.html" OR "$ENV{INCLUDE_UI}" STREQUAL "1") list(APPEND ray_file_list "src/catapult_files/index.html") list(APPEND ray_file_list "src/catapult_files/trace_viewer_full.html") endif() diff --git a/doc/source/rllib-algorithms.rst b/doc/source/rllib-algorithms.rst index 8b64b04c23c1..66bf08a6c399 100644 --- a/doc/source/rllib-algorithms.rst +++ b/doc/source/rllib-algorithms.rst @@ -119,12 +119,12 @@ SpaceInvaders 692 ~600 :start-after: __sphinx_doc_begin__ :end-before: __sphinx_doc_end__ -Deep Deterministic Policy Gradients (DDPG) ------------------------------------------- +Deep Deterministic Policy Gradients (DDPG, TD3) +----------------------------------------------- `[paper] `__ `[implementation] `__ -DDPG is implemented similarly to DQN (below). The algorithm can be scaled by increasing the number of workers, switching to AsyncGradientsOptimizer, or using Ape-X. +DDPG is implemented similarly to DQN (below). The algorithm can be scaled by increasing the number of workers, switching to AsyncGradientsOptimizer, or using Ape-X. The improvements from `TD3 `__ are available though not enabled by default. -Tuned examples: `Pendulum-v0 `__, `MountainCarContinuous-v0 `__, `HalfCheetah-v2 `__ +Tuned examples: `Pendulum-v0 `__, `TD3 configuration `__, `MountainCarContinuous-v0 `__, `HalfCheetah-v2 `__ **DDPG-specific configs** (see also `common configs `__): diff --git a/doc/source/rllib-env.rst b/doc/source/rllib-env.rst index 81cf42d9d179..c1381f561cd4 100644 --- a/doc/source/rllib-env.rst +++ b/doc/source/rllib-env.rst @@ -15,7 +15,7 @@ PPO **Yes** **Yes** **Yes** **Yes** PG **Yes** **Yes** **Yes** **Yes** IMPALA **Yes** No **Yes** **Yes** DQN, Rainbow **Yes** No **Yes** No -DDPG No **Yes** **Yes** No +DDPG, TD3 No **Yes** **Yes** No APEX-DQN **Yes** No **Yes** No APEX-DDPG No **Yes** **Yes** No ES **Yes** **Yes** No No diff --git a/doc/source/rllib.rst b/doc/source/rllib.rst index e811913260be..23c69506e008 100644 --- a/doc/source/rllib.rst +++ b/doc/source/rllib.rst @@ -54,7 +54,7 @@ Algorithms - `Advantage Actor-Critic (A2C, A3C) `__ - - `Deep Deterministic Policy Gradients (DDPG) `__ + - `Deep Deterministic Policy Gradients (DDPG, TD3) `__ - `Deep Q Networks (DQN, Rainbow) `__ diff --git a/python/ray/WebUI.ipynb b/python/ray/WebUI.ipynb index 390263827e03..229366eba10b 100644 --- a/python/ray/WebUI.ipynb +++ b/python/ray/WebUI.ipynb @@ -1,150 +1,97 @@ { - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Ray UI\n", - "\n", - "Start the UI with **Kernel -> Restart and Run All**." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import os\n", - "import ray\n", - "import ray.experimental.ui as ui\n", - "\n", - "ray.init(redis_address=os.environ[\"REDIS_ADDRESS\"])" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### Object search."
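A quick illustration of the TD3 note added to rllib-algorithms.rst above: the TD3 tricks ship as opt-in DDPG config flags. The flag names below (twin_q, policy_delay, smooth_target_policy) are assumptions chosen to match the TD3 tuned example linked there; that YAML is the authoritative reference.

import ray
from ray.rllib.agents.ddpg import DDPGAgent

ray.init()
agent = DDPGAgent(
    env="Pendulum-v0",
    config={
        # Assumed flag names; see the TD3 tuned-example YAML for the
        # actual keys shipped with this change.
        "twin_q": True,                # clipped double-Q learning
        "policy_delay": 2,             # delayed actor updates
        "smooth_target_policy": True,  # target policy smoothing
    })
agent.train()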
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "ui.object_search_bar()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### Task search." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "ui.task_search_bar()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### Task trace timeline." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "To view arrows, go to View Options and select Flow Events." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "ui.task_timeline()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### Task durations." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "ui.task_completion_time_distribution()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### CPU usage." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "ui.cpu_usage()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### Cluster usage." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "ui.cluster_usage()" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.6.1" - } - }, - "nbformat": 4, - "nbformat_minor": 2 + "cells": [{ + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Ray UI\n", "\n", + "Start the UI with **Kernel -> Restart and Run All**." + ] + }, { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", "import ray\n", + "import ray.experimental.ui as ui\n", "\n", + "ray.init(redis_address=os.environ[\"REDIS_ADDRESS\"])" + ] + }, { + "cell_type": "markdown", + "metadata": {}, + "source": ["#### Task trace timeline."] + }, { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "To view arrows, go to View Options and select Flow Events." 
+ ] + }, { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": ["ui.task_timeline()"] + }, { + "cell_type": "markdown", + "metadata": {}, + "source": ["#### Object transfer timeline."] + }, { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": ["ui.object_transfer_timeline()"] + }, { + "cell_type": "markdown", + "metadata": {}, + "source": ["#### Task durations."] + }, { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": ["ui.task_completion_time_distribution()"] + }, { + "cell_type": "markdown", + "metadata": {}, + "source": ["#### CPU usage."] + }, { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": ["ui.cpu_usage()"] + }, { + "cell_type": "markdown", + "metadata": {}, + "source": ["#### Cluster usage."] + }, { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": ["ui.cluster_usage()"] + }], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.1" + } + }, + "nbformat": 4, + "nbformat_minor": 2 } diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index 80245881cf78..9c4a452ee268 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -491,8 +491,10 @@ def files_up_to_date(self, node_id): def recover_if_needed(self, node_id): if not self.can_update(node_id): return - last_heartbeat_time = self.load_metrics.last_heartbeat_time_by_ip.get( - self.provider.internal_ip(node_id), 0) + key = self.provider.internal_ip(node_id) + if key not in self.load_metrics.last_heartbeat_time_by_ip: + self.load_metrics.last_heartbeat_time_by_ip[key] = time.time() + last_heartbeat_time = self.load_metrics.last_heartbeat_time_by_ip[key] delta = time.time() - last_heartbeat_time if delta < AUTOSCALER_HEARTBEAT_TIMEOUT_S: return diff --git a/python/ray/autoscaler/aws/config.py b/python/ray/autoscaler/aws/config.py index b6953c1492c4..62e0b25ee2e2 100644 --- a/python/ray/autoscaler/aws/config.py +++ b/python/ray/autoscaler/aws/config.py @@ -10,6 +10,7 @@ import boto3 from botocore.config import Config +import botocore from ray.ray_constants import BOTO_MAX_RETRIES @@ -272,7 +273,7 @@ def _get_role(role_name, config): try: role.load() return role - except Exception: + except botocore.errorfactory.NoSuchEntityException: return None @@ -282,7 +283,7 @@ def _get_instance_profile(profile_name, config): try: profile.load() return profile - except Exception: + except botocore.errorfactory.NoSuchEntityException: return None diff --git a/python/ray/experimental/state.py b/python/ray/experimental/state.py index c9d4ae8cee9f..d97cc274f76d 100644 --- a/python/ray/experimental/state.py +++ b/python/ray/experimental/state.py @@ -2,9 +2,7 @@ from __future__ import division from __future__ import print_function -import copy from collections import defaultdict -import heapq import json import redis import sys @@ -398,129 +396,6 @@ def log_files(self): return ip_filename_file - def task_profiles(self, num_tasks, start=None, end=None, fwd=True): - """Fetch and return a list of task profiles. 
- - Args: - num_tasks: A limit on the number of tasks that task_profiles will - return. - start: The start point of the time window that is queried for - tasks. - end: The end point in time of the time window that is queried for - tasks. - fwd: If True, means that zrange will be used. If False, zrevrange. - This argument is only meaningful in conjunction with the - num_tasks argument. This controls whether the tasks returned - are the most recent or the least recent. - - Returns: - A tuple of two elements. The first element is a dictionary mapping - the task ID of a task to a list of the profiling information - for all of the executions of that task. The second element is a - list of profiling information for tasks where the events have - no task ID. - """ - task_info = {} - event_log_sets = self.redis_client.keys("event_log*") - - # The heap is used to maintain the set of x tasks that occurred the - # most recently across all of the workers, where x is defined as the - # function parameter num. The key is the start time of the "get_task" - # component of each task. Calling heappop will result in the task with - # the earliest "get_task_start" to be removed from the heap. - heap = [] - heapq.heapify(heap) - heap_size = 0 - - # Set up a param dict to pass the redis command - params = {"withscores": True} - if start is not None: - params["min"] = start - elif end is not None: - params["min"] = 0 - - if end is not None: - params["max"] = end - elif start is not None: - params["max"] = time.time() - - if start is None and end is None: - params["end"] = num_tasks - 1 - else: - params["num"] = num_tasks - params["start"] = 0 - - # Parse through event logs to determine task start and end points. - for event_log_set in event_log_sets: - if start is None and end is None: - if fwd: - event_list = self.redis_client.zrange( - event_log_set, **params) - else: - event_list = self.redis_client.zrevrange( - event_log_set, **params) - else: - if fwd: - event_list = self.redis_client.zrangebyscore( - event_log_set, **params) - else: - event_list = self.redis_client.zrevrangebyscore( - event_log_set, **params) - - for (event, score) in event_list: - event_dict = json.loads(decode(event)) - task_id = "" - for event in event_dict: - if "task_id" in event[3]: - task_id = event[3]["task_id"] - task_info[task_id] = {} - task_info[task_id]["score"] = score - # Add task to (min/max) heap by its start point. 
- # if fwd, we want to delete the largest elements, so -score - heapq.heappush(heap, (-score if fwd else score, task_id)) - heap_size += 1 - - for event in event_dict: - if event[1] == "get_task" and event[2] == 1: - task_info[task_id]["get_task_start"] = event[0] - if event[1] == "get_task" and event[2] == 2: - task_info[task_id]["get_task_end"] = event[0] - if (event[1] == "register_remote_function" - and event[2] == 1): - task_info[task_id]["import_remote_start"] = event[0] - if (event[1] == "register_remote_function" - and event[2] == 2): - task_info[task_id]["import_remote_end"] = event[0] - if (event[1] == "task:deserialize_arguments" - and event[2] == 1): - task_info[task_id]["get_arguments_start"] = event[0] - if (event[1] == "task:deserialize_arguments" - and event[2] == 2): - task_info[task_id]["get_arguments_end"] = event[0] - if event[1] == "task:execute" and event[2] == 1: - task_info[task_id]["execute_start"] = event[0] - if event[1] == "task:execute" and event[2] == 2: - task_info[task_id]["execute_end"] = event[0] - if event[1] == "task:store_outputs" and event[2] == 1: - task_info[task_id]["store_outputs_start"] = event[0] - if event[1] == "task:store_outputs" and event[2] == 2: - task_info[task_id]["store_outputs_end"] = event[0] - if "worker_id" in event[3]: - task_info[task_id]["worker_id"] = event[3]["worker_id"] - if "function_name" in event[3]: - task_info[task_id]["function_name"] = ( - event[3]["function_name"]) - - if heap_size > num_tasks: - min_task, task_id_hex = heapq.heappop(heap) - del task_info[task_id_hex] - heap_size -= 1 - - for key, info in task_info.items(): - self._add_missing_timestamps(info) - - return task_info - def _profile_table(self, component_id): """Get the profile events for a given component. @@ -806,341 +681,6 @@ def chrome_tracing_object_transfer_dump(self, filename=None): else: return all_events - def dump_catapult_trace(self, - path, - task_info, - breakdowns=True, - task_dep=True, - obj_dep=True): - """Dump task profiling information to a file. - - This information can be viewed as a timeline of profiling information - by going to chrome://tracing in the chrome web browser and loading the - appropriate file. - - Args: - path: The filepath to dump the profiling information to. - task_info: The task info to use to generate the trace. Should be - the output of ray.global_state.task_profiles(). - breakdowns: Boolean indicating whether to break down the tasks into - more fine-grained segments. - task_dep: Boolean indicating whether or not task submission edges - should be included in the trace. - obj_dep: Boolean indicating whether or not object dependency edges - should be included in the trace. - """ - workers = self.workers() - - task_table = {} - # TODO(ekl) reduce the number of RPCs here with MGET - for task_id, _ in task_info.items(): - try: - # TODO (hme): do something to correct slider here, - # slider should be correct to begin with, though. 
- task_table[task_id] = self.task_table(task_id) - task_table[task_id]["TaskSpec"]["Args"] = [ - repr(arg) - for arg in task_table[task_id]["TaskSpec"]["Args"] - ] - except Exception: - print("Could not find task {}".format(task_id)) - - # filter out tasks not in task_table - task_info = {k: v for k, v in task_info.items() if k in task_table} - - start_time = None - for info in task_info.values(): - task_start = min(self._get_times(info)) - if not start_time or task_start < start_time: - start_time = task_start - - def micros(ts): - return int(1e6 * ts) - - def micros_rel(ts): - return micros(ts - start_time) - - seen_obj = {} - - full_trace = [] - for task_id, info in task_info.items(): - worker = workers[info["worker_id"]] - task_t_info = task_table[task_id] - - # The total_info dictionary is what is displayed when selecting a - # task in the timeline. We copy the task spec so that we don't - # modify it in place since we will use the original values later. - total_info = copy.copy(task_table[task_id]["TaskSpec"]) - total_info["Args"] = [ - oid.hex() if isinstance(oid, ray.ObjectID) else oid - for oid in task_t_info["TaskSpec"]["Args"] - ] - total_info["ReturnObjectIDs"] = [ - oid.hex() for oid in task_t_info["TaskSpec"]["ReturnObjectIDs"] - ] - total_info["LocalSchedulerID"] = task_t_info["LocalSchedulerID"] - total_info["get_arguments"] = ( - info["get_arguments_end"] - info["get_arguments_start"]) - total_info["execute"] = ( - info["execute_end"] - info["execute_start"]) - total_info["store_outputs"] = ( - info["store_outputs_end"] - info["store_outputs_start"]) - total_info["function_name"] = info["function_name"] - total_info["worker_id"] = info["worker_id"] - - parent_info = task_info.get( - task_table[task_id]["TaskSpec"]["ParentTaskID"]) - worker = workers[info["worker_id"]] - # The catapult trace format documentation can be found here: - # https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU/preview # noqa: E501 - if breakdowns: - if "get_arguments_end" in info: - get_args_trace = { - "cat": "get_arguments", - "pid": "Node " + worker["node_ip_address"], - "tid": info["worker_id"], - "id": task_id, - "ts": micros_rel(info["get_arguments_start"]), - "ph": "X", - "name": info["function_name"] + ":get_arguments", - "args": total_info, - "dur": micros(info["get_arguments_end"] - - info["get_arguments_start"]), - "cname": "rail_idle" - } - full_trace.append(get_args_trace) - - if "store_outputs_end" in info: - outputs_trace = { - "cat": "store_outputs", - "pid": "Node " + worker["node_ip_address"], - "tid": info["worker_id"], - "id": task_id, - "ts": micros_rel(info["store_outputs_start"]), - "ph": "X", - "name": info["function_name"] + ":store_outputs", - "args": total_info, - "dur": micros(info["store_outputs_end"] - - info["store_outputs_start"]), - "cname": "thread_state_runnable" - } - full_trace.append(outputs_trace) - - if "execute_end" in info: - execute_trace = { - "cat": "execute", - "pid": "Node " + worker["node_ip_address"], - "tid": info["worker_id"], - "id": task_id, - "ts": micros_rel(info["execute_start"]), - "ph": "X", - "name": info["function_name"] + ":execute", - "args": total_info, - "dur": micros(info["execute_end"] - - info["execute_start"]), - "cname": "rail_animation" - } - full_trace.append(execute_trace) - - else: - if parent_info: - parent_worker = workers[parent_info["worker_id"]] - parent_times = self._get_times(parent_info) - parent_profile = task_info.get( - task_table[task_id]["TaskSpec"]["ParentTaskID"]) - - _parent_id = 
parent_info["worker_id"] + str( - micros(min(parent_times))) - - parent = { - "cat": "submit_task", - "pid": "Node " + parent_worker["node_ip_address"], - "tid": parent_info["worker_id"], - "ts": micros_rel( - parent_profile - and parent_profile["get_arguments_start"] - or start_time), - "ph": "s", - "name": "SubmitTask", - "args": {}, - "id": _parent_id, - } - full_trace.append(parent) - - _id = info["worker_id"] + str(micros(min(parent_times))) - - task_trace = { - "cat": "submit_task", - "pid": "Node " + worker["node_ip_address"], - "tid": info["worker_id"], - "ts": micros_rel(info["get_arguments_start"]), - "ph": "f", - "name": "SubmitTask", - "args": {}, - "id": _id, - "bp": "e", - "cname": "olive" - } - full_trace.append(task_trace) - - task = { - "cat": "task", - "pid": "Node " + worker["node_ip_address"], - "tid": info["worker_id"], - "id": task_id, - "ts": micros_rel(info["get_arguments_start"]), - "ph": "X", - "name": info["function_name"], - "args": total_info, - "dur": micros(info["store_outputs_end"] - - info["get_arguments_start"]), - "cname": "thread_state_runnable" - } - full_trace.append(task) - - if task_dep: - if parent_info: - parent_worker = workers[parent_info["worker_id"]] - parent_times = self._get_times(parent_info) - parent_profile = task_info.get( - task_table[task_id]["TaskSpec"]["ParentTaskID"]) - - _parent_id = parent_info["worker_id"] + str( - micros(min(parent_times))) - - parent = { - "cat": "submit_task", - "pid": "Node " + parent_worker["node_ip_address"], - "tid": parent_info["worker_id"], - "ts": micros_rel( - parent_profile - and parent_profile["get_arguments_start"] - or start_time), - "ph": "s", - "name": "SubmitTask", - "args": {}, - "id": _parent_id, - } - full_trace.append(parent) - - _id = info["worker_id"] + str(micros(min(parent_times))) - - task_trace = { - "cat": "submit_task", - "pid": "Node " + worker["node_ip_address"], - "tid": info["worker_id"], - "ts": micros_rel(info["get_arguments_start"]), - "ph": "f", - "name": "SubmitTask", - "args": {}, - "id": _id, - "bp": "e" - } - full_trace.append(task_trace) - - if obj_dep: - args = task_table[task_id]["TaskSpec"]["Args"] - for arg in args: - # Don't visualize arguments that are not object IDs. - if isinstance(arg, ray.ObjectID): - object_info = self._object_table(arg) - # Don't visualize objects that were created by calls to - # put. - if not object_info["IsPut"]: - if arg not in seen_obj: - seen_obj[arg] = 0 - seen_obj[arg] += 1 - owner_task = self._object_table(arg)["TaskID"] - if owner_task in task_info: - owner_worker = (workers[task_info[owner_task][ - "worker_id"]]) - # Adding/subtracting 2 to the time associated - # with the beginning/ending of the flow event - # is necessary to make the flow events show up - # reliably. When these times are exact, this is - # presumably an edge case, and catapult doesn't - # recognize that there is a duration event at - # that exact point in time that the flow event - # should be bound to. This issue is solved by - # adding the 2 ms to the start/end time of the - # flow event, which guarantees overlap with the - # duration event that it's associated with, and - # the flow event therefore always gets drawn. 
- owner = { - "cat": "obj_dependency", - "pid": ("Node " + - owner_worker["node_ip_address"]), - "tid": task_info[owner_task]["worker_id"], - "ts": micros_rel(task_info[owner_task] - ["store_outputs_end"]) - - 2, - "ph": "s", - "name": "ObjectDependency", - "args": {}, - "bp": "e", - "cname": "cq_build_attempt_failed", - "id": "obj" + str(arg) + str(seen_obj[arg]) - } - full_trace.append(owner) - - dependent = { - "cat": "obj_dependency", - "pid": "Node " + worker["node_ip_address"], - "tid": info["worker_id"], - "ts": micros_rel(info["get_arguments_start"]) + - 2, - "ph": "f", - "name": "ObjectDependency", - "args": {}, - "cname": "cq_build_attempt_failed", - "bp": "e", - "id": "obj" + str(arg) + str(seen_obj[arg]) - } - full_trace.append(dependent) - - print("Creating JSON {}/{}".format(len(full_trace), len(task_info))) - with open(path, "w") as outfile: - json.dump(full_trace, outfile) - - def _get_times(self, data): - """Extract the numerical times from a task profile. - - This is a helper method for dump_catapult_trace. - - Args: - data: This must be a value in the dictionary returned by the - task_profiles function. - """ - all_times = [] - all_times.append(data["acquire_lock_start"]) - all_times.append(data["acquire_lock_end"]) - all_times.append(data["get_arguments_start"]) - all_times.append(data["get_arguments_end"]) - all_times.append(data["execute_start"]) - all_times.append(data["execute_end"]) - all_times.append(data["store_outputs_start"]) - all_times.append(data["store_outputs_end"]) - return all_times - - def _add_missing_timestamps(self, info): - """Fills in any missing timestamp values in a task info. - - Task timestamps may be missing if the task fails or is partially - executed. - """ - - keys = [ - "acquire_lock_start", "acquire_lock_end", "get_arguments_start", - "get_arguments_end", "execute_start", "execute_end", - "store_outputs_start", "store_outputs_end" - ] - - latest_timestamp = 0 - for key in keys: - cur = info.get(key, latest_timestamp) - info[key] = cur - latest_timestamp = cur - def workers(self): """Get a dictionary mapping worker ID to worker information.""" worker_keys = self.redis_client.keys("Worker*") @@ -1151,8 +691,6 @@ def workers(self): worker_id = binary_to_hex(worker_key[len("Workers:"):]) workers_data[worker_id] = { - "local_scheduler_socket": (decode( - worker_info[b"local_scheduler_socket"])), "node_ip_address": decode(worker_info[b"node_ip_address"]), "plasma_store_socket": decode( worker_info[b"plasma_store_socket"]) diff --git a/python/ray/experimental/ui.py b/python/ray/experimental/ui.py index da4ee9e57c83..15a6fd05f839 100644 --- a/python/ray/experimental/ui.py +++ b/python/ray/experimental/ui.py @@ -1,20 +1,23 @@ -import ipywidgets as widgets +import logging import numpy as np import os import pprint -import ray import shutil import tempfile import time +import ipywidgets as widgets from IPython.display import display, IFrame, clear_output +import ray + +logger = logging.getLogger(__name__) + + # Instances of this class maintains keep track of whether or not a # callback is currently executing. Since the execution of the callback # may trigger more calls to the callback, this is used to prevent infinite # recursions. 
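The deletions above retire the task_profiles()/dump_catapult_trace() path entirely; timeline JSON now comes straight from the chrome-tracing dump methods that survive in this file, which the rewritten ui.py below splices into catapult's index.html. The new flow, roughly (a sketch assuming a running cluster with REDIS_ADDRESS set):

import os
import ray

ray.init(redis_address=os.environ["REDIS_ADDRESS"])

# Each call writes a chrome://tracing compatible JSON file directly; no
# intermediate task_profiles() pass or manual event stitching is needed.
ray.global_state.chrome_tracing_dump(filename="/tmp/task_trace.json")
ray.global_state.chrome_tracing_object_transfer_dump(
    filename="/tmp/object_trace.json")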
- - class _EventRecursionContextManager(object): def __init__(self): self.should_recurse = True @@ -185,36 +188,6 @@ def update_wrapper(event): range_slider.value = (100 + int( 100 * float(num_tasks_box.value) / num_tasks), 100) - if not update: - return - - diff = largest - smallest - - # Low and high are used to scale the times that are - # queried to be relative to the absolute time. - low, high = map(lambda x: x / 100., range_slider.value) - - # Queries to task_profiles based on the slider and text - # box values. - # (Querying based on the % total amount of time.) - if breakdown_opt.value == total_time_value: - tasks = _truncated_task_profiles( - start=(smallest + diff * low), - end=(smallest + diff * high)) - - # (Querying based on % of total number of tasks that were - # run.) - elif breakdown_opt.value == total_tasks_value: - if range_slider.value[0] == 0: - tasks = _truncated_task_profiles( - num_tasks=(int(num_tasks * high)), fwd=True) - else: - tasks = _truncated_task_profiles( - num_tasks=(int(num_tasks * (high - low))), - fwd=False) - - update(smallest, largest, num_tasks, tasks) - # Get updated values from a slider or text box, and update the rest of # them accordingly. range_slider.observe(update_wrapper, names="value") @@ -268,20 +241,6 @@ def handle_submit(sender): MAX_TASKS_TO_VISUALIZE = 10000 -# Wrapper that enforces a limit on the number of tasks to visualize -def _truncated_task_profiles(start=None, end=None, num_tasks=None, fwd=True): - if num_tasks is None: - num_tasks = MAX_TASKS_TO_VISUALIZE - print("Warning: at most {} tasks will be fetched within this " - "time range.".format(MAX_TASKS_TO_VISUALIZE)) - elif num_tasks > MAX_TASKS_TO_VISUALIZE: - print("Warning: too many tasks to visualize, " - "fetching only the first {} of {}.".format( - MAX_TASKS_TO_VISUALIZE, num_tasks)) - num_tasks = MAX_TASKS_TO_VISUALIZE - return ray.global_state.task_profiles(num_tasks, start, end, fwd) - - # Helper function that guarantees unique and writeable temp files. # Prevents clashes in task trace files when multiple notebooks are running. def _get_temp_file_path(**kwargs): @@ -293,32 +252,43 @@ def _get_temp_file_path(**kwargs): def task_timeline(): - path_input = widgets.Button(description="View task timeline") + # Check that the trace viewer renderer file is present, and copy it to the + # current working directory if it is not present. + if not os.path.exists("trace_viewer_full.html"): + shutil.copy( + os.path.join( + os.path.dirname(os.path.abspath(__file__)), + "../core/src/catapult_files/trace_viewer_full.html"), + "trace_viewer_full.html") - breakdown_basic = "Basic" - breakdown_task = "Task Breakdowns" + trace_viewer_path = os.path.join( + os.path.dirname(os.path.abspath(__file__)), + "../core/src/catapult_files/index.html") - breakdown_opt = widgets.Dropdown( - options=["Basic", "Task Breakdowns"], - value="Task Breakdowns", - disabled=False, - ) - obj_dep = widgets.Checkbox( - value=True, disabled=False, layout=widgets.Layout(width='20px')) - task_dep = widgets.Checkbox( - value=True, disabled=False, layout=widgets.Layout(width='20px')) - # Labels to bypass width limitation for descriptions. 
- label_tasks = widgets.Label( - value='Task submissions', layout=widgets.Layout(width='110px')) - label_objects = widgets.Label( - value='Object dependencies', layout=widgets.Layout(width='130px')) - label_options = widgets.Label( - value='View options:', layout=widgets.Layout(width='100px')) - start_box, end_box, range_slider, time_opt = get_sliders(False) - display(widgets.HBox([task_dep, label_tasks, obj_dep, label_objects])) - display(widgets.HBox([label_options, breakdown_opt])) - display(path_input) + html_file_path = _get_temp_file_path(suffix=".html") + json_file_path = _get_temp_file_path(suffix=".json") + + ray.global_state.chrome_tracing_dump(filename=json_file_path) + + with open(trace_viewer_path) as f: + data = f.read() + + # Replace the demo data path with our own + # https://github.com/catapult-project/catapult/blob/ + # 33a9271eb3cf5caf925293ec6a4b47c94f1ac968/tracing/bin/index.html#L107 + data = data.replace("../test_data/big_trace.json", json_file_path) + + with open(html_file_path, "w+") as f: + f.write(data) + # Display the task trace within the Jupyter notebook + clear_output(wait=True) + logger.info("To view fullscreen, open chrome://tracing in Google Chrome " + "and load `{}`".format(os.path.abspath(json_file_path))) + display(IFrame(html_file_path, 900, 800)) + + +def object_transfer_timeline(): # Check that the trace viewer renderer file is present, and copy it to the # current working directory if it is not present. if not os.path.exists("trace_viewer_full.html"): @@ -328,76 +298,32 @@ def task_timeline(): "../core/src/catapult_files/trace_viewer_full.html"), "trace_viewer_full.html") - def handle_submit(sender): - json_tmp = tempfile.mktemp() + ".json" - - # Determine whether task components should be displayed or not. - if breakdown_opt.value == breakdown_basic: - breakdown = False - elif breakdown_opt.value == breakdown_task: - breakdown = True - else: - raise ValueError("Unexpected breakdown value '{}'".format( - breakdown_opt.value)) - - low, high = map(lambda x: x / 100., range_slider.value) - - smallest, largest, num_tasks = ray.global_state._job_length() - diff = largest - smallest - - if time_opt.value == total_time_value: - tasks = _truncated_task_profiles( - start=smallest + diff * low, end=smallest + diff * high) - elif time_opt.value == total_tasks_value: - if range_slider.value[0] == 0: - tasks = _truncated_task_profiles( - num_tasks=int(num_tasks * high), fwd=True) - else: - tasks = _truncated_task_profiles( - num_tasks=int(num_tasks * (high - low)), fwd=False) - else: - raise ValueError("Unexpected time value '{}'".format( - time_opt.value)) - # Write trace to a JSON file - print("Collected profiles for {} tasks.".format(len(tasks))) - print("Dumping task profile data to {}, " - "this might take a while...".format(json_tmp)) - ray.global_state.dump_catapult_trace( - json_tmp, - tasks, - breakdowns=breakdown, - obj_dep=obj_dep.value, - task_dep=task_dep.value) - print("Opening html file in browser...") - - trace_viewer_path = os.path.join( - os.path.dirname(os.path.abspath(__file__)), - "../core/src/catapult_files/index.html") - - html_file_path = _get_temp_file_path(suffix=".html") - json_file_path = _get_temp_file_path(suffix=".json") - - print("Pointing to {} named {}".format(json_tmp, json_file_path)) - shutil.copy(json_tmp, json_file_path) - - with open(trace_viewer_path) as f: - data = f.read() - - # Replace the demo data path with our own - # https://github.com/catapult-project/catapult/blob/ - # 
33a9271eb3cf5caf925293ec6a4b47c94f1ac968/tracing/bin/index.html#L107 - data = data.replace("../test_data/big_trace.json", json_file_path) - - with open(html_file_path, "w+") as f: - f.write(data) - - # Display the task trace within the Jupyter notebook - clear_output(wait=True) - print("To view fullscreen, open chrome://tracing in Google Chrome " - "and load `{}`".format(json_tmp)) - display(IFrame(html_file_path, 900, 800)) - - path_input.on_click(handle_submit) + trace_viewer_path = os.path.join( + os.path.dirname(os.path.abspath(__file__)), + "../core/src/catapult_files/index.html") + + html_file_path = _get_temp_file_path(suffix=".html") + json_file_path = _get_temp_file_path(suffix=".json") + + ray.global_state.chrome_tracing_object_transfer_dump( + filename=json_file_path) + + with open(trace_viewer_path) as f: + data = f.read() + + # Replace the demo data path with our own + # https://github.com/catapult-project/catapult/blob/ + # 33a9271eb3cf5caf925293ec6a4b47c94f1ac968/tracing/bin/index.html#L107 + data = data.replace("../test_data/big_trace.json", json_file_path) + + with open(html_file_path, "w+") as f: + f.write(data) + + # Display the task trace within the Jupyter notebook + clear_output(wait=True) + logger.info("To view fullscreen, open chrome://tracing in Google Chrome " + "and load `{}`".format(os.path.abspath(json_file_path))) + display(IFrame(html_file_path, 900, 800)) def task_completion_time_distribution(): @@ -562,12 +488,7 @@ def cpu_usage(): output_notebook(resources=CDN) # Parse the client table to determine how many CPUs are available - num_cpus = 0 - client_table = ray.global_state.client_table() - for node_ip, client_list in client_table.items(): - for client in client_list: - if "CPU" in client: - num_cpus += client["CPU"] + num_cpus = ray.global_state.cluster_resources()["CPU"] # Update the plot based on the sliders def plot_utilization(): diff --git a/python/ray/import_thread.py b/python/ray/import_thread.py index 85fe0b89dfe1..70dba322370b 100644 --- a/python/ray/import_thread.py +++ b/python/ray/import_thread.py @@ -110,7 +110,7 @@ def fetch_and_execute_function_to_run(self, key): run_on_other_drivers) = self.redis_client.hmget( key, ["driver_id", "function", "run_on_other_drivers"]) - if (run_on_other_drivers == "False" + if (utils.decode(run_on_other_drivers) == "False" and self.worker.mode == ray.SCRIPT_MODE and driver_id != self.worker.task_driver_id.id()): return diff --git a/python/ray/rllib/agents/agent.py b/python/ray/rllib/agents/agent.py index 777ff3955a5c..f0d9510756b9 100644 --- a/python/ray/rllib/agents/agent.py +++ b/python/ray/rllib/agents/agent.py @@ -331,7 +331,7 @@ def _setup(self, config): self.env_creator = lambda env_config: None # Merge the supplied config with the class default - merged_config = self._default_config.copy() + merged_config = copy.deepcopy(self._default_config) merged_config = deep_update(merged_config, config, self._allow_unknown_configs, self._allow_unknown_subkeys) diff --git a/python/ray/rllib/agents/ddpg/ddpg_policy_graph.py b/python/ray/rllib/agents/ddpg/ddpg_policy_graph.py index 60e6e30847c5..eb5f14c2d1c9 100644 --- a/python/ray/rllib/agents/ddpg/ddpg_policy_graph.py +++ b/python/ray/rllib/agents/ddpg/ddpg_policy_graph.py @@ -89,8 +89,10 @@ def __init__(self, exploration_value = tf.assign_add( exploration_sample, theta * (.0 - exploration_sample) + sigma * normal_sample) - stochastic_actions = deterministic_actions + eps * ( - high_action - low_action) * exploration_value + stochastic_actions = tf.clip_by_value( 
+ deterministic_actions + + eps * (high_action - low_action) * exploration_value, + low_action, high_action) self.actions = tf.cond(stochastic, lambda: stochastic_actions, lambda: deterministic_actions) diff --git a/python/ray/rllib/agents/pg/pg_policy_graph.py b/python/ray/rllib/agents/pg/pg_policy_graph.py index 3f3c7c7aea9a..2a342c117fb3 100644 --- a/python/ray/rllib/agents/pg/pg_policy_graph.py +++ b/python/ray/rllib/agents/pg/pg_policy_graph.py @@ -79,7 +79,7 @@ def postprocess_trajectory(self, sample_batch, other_agent_batches=None, episode=None): - # This ads the "advantages" column to the sample batch + # This adds the "advantages" column to the sample batch return compute_advantages( sample_batch, 0.0, self.config["gamma"], use_gae=False) diff --git a/python/ray/rllib/evaluation/sampler.py b/python/ray/rllib/evaluation/sampler.py index b7c07f8f251d..2c6411f33510 100644 --- a/python/ray/rllib/evaluation/sampler.py +++ b/python/ray/rllib/evaluation/sampler.py @@ -262,182 +262,240 @@ def new_episode(): unfiltered_obs, rewards, dones, infos, off_policy_actions = \ async_vector_env.poll() - # Map of policy_id to list of PolicyEvalData - to_eval = defaultdict(list) + # Process observations and prepare for policy evaluation + active_envs, to_eval, outputs = _process_observations( + async_vector_env, policies, batch_builder_pool, active_episodes, + unfiltered_obs, rewards, dones, infos, off_policy_actions, horizon, + obs_filters, unroll_length, pack, callbacks) + for o in outputs: + yield o + + # Do batched policy eval + eval_results = _do_policy_eval(tf_sess, to_eval, policies, + active_episodes) + + # Process results and update episode state + actions_to_send = _process_policy_eval_results( + to_eval, eval_results, active_episodes, active_envs, + off_policy_actions) - # Map of env_id -> agent_id -> action replies - actions_to_send = defaultdict(dict) + # Return computed actions to ready envs. We also send to envs that have + # taken off-policy actions; those envs are free to ignore the action. + async_vector_env.send_actions(actions_to_send) - # For each environment - for env_id, agent_obs in unfiltered_obs.items(): - new_episode = env_id not in active_episodes - episode = active_episodes[env_id] - if not new_episode: - episode.length += 1 - episode.batch_builder.count += 1 - episode._add_agent_rewards(rewards[env_id]) - - # Check episode termination conditions - if dones[env_id]["__all__"] or episode.length >= horizon: - all_done = True - atari_metrics = _fetch_atari_metrics(async_vector_env) - if atari_metrics is not None: - for m in atari_metrics: - yield m._replace(custom_metrics=episode.custom_metrics) - else: - yield RolloutMetrics(episode.length, episode.total_reward, - dict(episode.agent_rewards), - episode.custom_metrics) + +def _process_observations(async_vector_env, policies, batch_builder_pool, + active_episodes, unfiltered_obs, rewards, dones, + infos, off_policy_actions, horizon, obs_filters, + unroll_length, pack, callbacks): + """Record new data from the environment and prepare for policy evaluation. 
+ + Returns: + active_envs: set of non-terminated env ids + to_eval: map of policy_id to list of agent PolicyEvalData + outputs: list of metrics and samples to return from the sampler + """ + + active_envs = set() + to_eval = defaultdict(list) + outputs = [] + + # For each environment + for env_id, agent_obs in unfiltered_obs.items(): + new_episode = env_id not in active_episodes + episode = active_episodes[env_id] + if not new_episode: + episode.length += 1 + episode.batch_builder.count += 1 + episode._add_agent_rewards(rewards[env_id]) + + # Check episode termination conditions + if dones[env_id]["__all__"] or episode.length >= horizon: + all_done = True + atari_metrics = _fetch_atari_metrics(async_vector_env) + if atari_metrics is not None: + for m in atari_metrics: + outputs.append( + m._replace(custom_metrics=episode.custom_metrics)) else: - all_done = False - # At least send an empty dict if not done - actions_to_send[env_id] = {} - - # For each agent in the environment - for agent_id, raw_obs in agent_obs.items(): - policy_id = episode.policy_for(agent_id) - filtered_obs = _get_or_raise(obs_filters, policy_id)(raw_obs) - agent_done = bool(all_done or dones[env_id].get(agent_id)) - if not agent_done: - to_eval[policy_id].append( - PolicyEvalData(env_id, agent_id, filtered_obs, - episode.rnn_state_for(agent_id), - episode.last_action_for(agent_id), - rewards[env_id][agent_id] or 0.0)) - - last_observation = episode.last_observation_for(agent_id) - episode._set_last_observation(agent_id, filtered_obs) - - # Record transition info if applicable - if last_observation is not None and \ - infos[env_id][agent_id].get("training_enabled", True): - episode.batch_builder.add_values( - agent_id, - policy_id, - t=episode.length - 1, - eps_id=episode.episode_id, - agent_index=episode._agent_index(agent_id), - obs=last_observation, - actions=episode.last_action_for(agent_id), - rewards=rewards[env_id][agent_id], - prev_actions=episode.prev_action_for(agent_id), - prev_rewards=episode.prev_reward_for(agent_id), - dones=agent_done, - infos=infos[env_id][agent_id], - new_obs=filtered_obs, - **episode.last_pi_info_for(agent_id)) - - # Invoke the step callback after the step is logged to the episode - if callbacks.get("on_episode_step"): - callbacks["on_episode_step"]({ + outputs.append( + RolloutMetrics(episode.length, episode.total_reward, + dict(episode.agent_rewards), + episode.custom_metrics)) + else: + all_done = False + active_envs.add(env_id) + + # For each agent in the environment + for agent_id, raw_obs in agent_obs.items(): + policy_id = episode.policy_for(agent_id) + filtered_obs = _get_or_raise(obs_filters, policy_id)(raw_obs) + agent_done = bool(all_done or dones[env_id].get(agent_id)) + if not agent_done: + to_eval[policy_id].append( + PolicyEvalData(env_id, agent_id, filtered_obs, + episode.rnn_state_for(agent_id), + episode.last_action_for(agent_id), + rewards[env_id][agent_id] or 0.0)) + + last_observation = episode.last_observation_for(agent_id) + episode._set_last_observation(agent_id, filtered_obs) + + # Record transition info if applicable + if last_observation is not None and \ + infos[env_id][agent_id].get("training_enabled", True): + episode.batch_builder.add_values( + agent_id, + policy_id, + t=episode.length - 1, + eps_id=episode.episode_id, + agent_index=episode._agent_index(agent_id), + obs=last_observation, + actions=episode.last_action_for(agent_id), + rewards=rewards[env_id][agent_id], + prev_actions=episode.prev_action_for(agent_id), + 
prev_rewards=episode.prev_reward_for(agent_id), + dones=agent_done, + infos=infos[env_id][agent_id], + new_obs=filtered_obs, + **episode.last_pi_info_for(agent_id)) + + # Invoke the step callback after the step is logged to the episode + if callbacks.get("on_episode_step"): + callbacks["on_episode_step"]({ + "env": async_vector_env, + "episode": episode + }) + + # Cut the batch if we're not packing multiple episodes into one, + # or if we've exceeded the requested batch size. + if episode.batch_builder.has_pending_data(): + if (all_done and not pack) or \ + episode.batch_builder.count >= unroll_length: + outputs.append(episode.batch_builder.build_and_reset(episode)) + elif all_done: + # Make sure postprocessor stays within one episode + episode.batch_builder.postprocess_batch_so_far(episode) + + if all_done: + # Handle episode termination + batch_builder_pool.append(episode.batch_builder) + if callbacks.get("on_episode_end"): + callbacks["on_episode_end"]({ "env": async_vector_env, "episode": episode }) + del active_episodes[env_id] + resetted_obs = async_vector_env.try_reset(env_id) + if resetted_obs is None: + # Reset not supported, drop this env from the ready list + if horizon != float("inf"): + raise ValueError( + "Setting episode horizon requires reset() support " + "from the environment.") + else: + # Creates a new episode + episode = active_episodes[env_id] + for agent_id, raw_obs in resetted_obs.items(): + policy_id = episode.policy_for(agent_id) + policy = _get_or_raise(policies, policy_id) + filtered_obs = _get_or_raise(obs_filters, + policy_id)(raw_obs) + episode._set_last_observation(agent_id, filtered_obs) + to_eval[policy_id].append( + PolicyEvalData( + env_id, agent_id, filtered_obs, + episode.rnn_state_for(agent_id), + np.zeros_like( + _flatten_action(policy.action_space.sample())), + 0.0)) - # Cut the batch if we're not packing multiple episodes into one, - # or if we've exceeded the requested batch size. - if episode.batch_builder.has_pending_data(): - if (all_done and not pack) or \ - episode.batch_builder.count >= unroll_length: - yield episode.batch_builder.build_and_reset(episode) - elif all_done: - # Make sure postprocessor stays within one episode - episode.batch_builder.postprocess_batch_so_far(episode) - - if all_done: - # Handle episode termination - batch_builder_pool.append(episode.batch_builder) - if callbacks.get("on_episode_end"): - callbacks["on_episode_end"]({ - "env": async_vector_env, - "episode": episode - }) - del active_episodes[env_id] - resetted_obs = async_vector_env.try_reset(env_id) - if resetted_obs is None: - # Reset not supported, drop this env from the ready list - assert horizon == float("inf"), \ - "Setting episode horizon requires reset() support." 
- else: - # Creates a new episode - episode = active_episodes[env_id] - for agent_id, raw_obs in resetted_obs.items(): - policy_id = episode.policy_for(agent_id) - policy = _get_or_raise(policies, policy_id) - filtered_obs = _get_or_raise(obs_filters, - policy_id)(raw_obs) - episode._set_last_observation(agent_id, filtered_obs) - to_eval[policy_id].append( - PolicyEvalData( - env_id, agent_id, filtered_obs, - episode.rnn_state_for(agent_id), - np.zeros_like( - _flatten_action( - policy.action_space.sample())), 0.0)) - - # Batch eval policy actions if possible - if tf_sess: - builder = TFRunBuilder(tf_sess, "policy_eval") - pending_fetches = {} + return active_envs, to_eval, outputs + + +def _do_policy_eval(tf_sess, to_eval, policies, active_episodes): + """Call compute actions on observation batches to get next actions. + + Returns: + eval_results: dict of policy to compute_action() outputs. + """ + + eval_results = {} + + if tf_sess: + builder = TFRunBuilder(tf_sess, "policy_eval") + pending_fetches = {} + else: + builder = None + for policy_id, eval_data in to_eval.items(): + rnn_in_cols = _to_column_format([t.rnn_state for t in eval_data]) + policy = _get_or_raise(policies, policy_id) + if builder and (policy.compute_actions.__code__ is + TFPolicyGraph.compute_actions.__code__): + pending_fetches[policy_id] = policy.build_compute_actions( + builder, [t.obs for t in eval_data], + rnn_in_cols, + prev_action_batch=[t.prev_action for t in eval_data], + prev_reward_batch=[t.prev_reward for t in eval_data]) else: - builder = None - eval_results = {} - rnn_in_cols = {} - for policy_id, eval_data in to_eval.items(): - rnn_in = _to_column_format([t.rnn_state for t in eval_data]) - rnn_in_cols[policy_id] = rnn_in - policy = _get_or_raise(policies, policy_id) - if builder and (policy.compute_actions.__code__ is - TFPolicyGraph.compute_actions.__code__): - pending_fetches[policy_id] = policy.build_compute_actions( - builder, [t.obs for t in eval_data], - rnn_in, - prev_action_batch=[t.prev_action for t in eval_data], - prev_reward_batch=[t.prev_reward for t in eval_data]) + eval_results[policy_id] = policy.compute_actions( + [t.obs for t in eval_data], + rnn_in_cols, + prev_action_batch=[t.prev_action for t in eval_data], + prev_reward_batch=[t.prev_reward for t in eval_data], + episodes=[active_episodes[t.env_id] for t in eval_data]) + if builder: + for k, v in pending_fetches.items(): + eval_results[k] = builder.get(v) + + return eval_results + + +def _process_policy_eval_results(to_eval, eval_results, active_episodes, + active_envs, off_policy_actions): + """Process the output of policy neural network evaluation. + + Records policy evaluation results into the given episode objects and + returns replies to send back to agents in the env. + + Returns: + actions_to_send: nested dict of env id -> agent id -> agent replies. 
+ """ + + actions_to_send = defaultdict(dict) + for env_id in active_envs: + actions_to_send[env_id] = {} # at minimum send empty dict + + for policy_id, eval_data in to_eval.items(): + rnn_in_cols = _to_column_format([t.rnn_state for t in eval_data]) + actions, rnn_out_cols, pi_info_cols = eval_results[policy_id] + if len(rnn_in_cols) != len(rnn_out_cols): + raise ValueError("Length of RNN in did not match RNN out, got: " + "{} vs {}".format(rnn_in_cols, rnn_out_cols)) + # Add RNN state info + for f_i, column in enumerate(rnn_in_cols): + pi_info_cols["state_in_{}".format(f_i)] = column + for f_i, column in enumerate(rnn_out_cols): + pi_info_cols["state_out_{}".format(f_i)] = column + # Save output rows + actions = _unbatch_tuple_actions(actions) + for i, action in enumerate(actions): + env_id = eval_data[i].env_id + agent_id = eval_data[i].agent_id + actions_to_send[env_id][agent_id] = action + episode = active_episodes[env_id] + episode._set_rnn_state(agent_id, [c[i] for c in rnn_out_cols]) + episode._set_last_pi_info( + agent_id, {k: v[i] + for k, v in pi_info_cols.items()}) + if env_id in off_policy_actions and \ + agent_id in off_policy_actions[env_id]: + episode._set_last_action(agent_id, + off_policy_actions[env_id][agent_id]) else: - eval_results[policy_id] = policy.compute_actions( - [t.obs for t in eval_data], - rnn_in, - prev_action_batch=[t.prev_action for t in eval_data], - prev_reward_batch=[t.prev_reward for t in eval_data], - episodes=[active_episodes[t.env_id] for t in eval_data]) - if builder: - for k, v in pending_fetches.items(): - eval_results[k] = builder.get(v) - - # Record the policy eval results - for policy_id, eval_data in to_eval.items(): - actions, rnn_out_cols, pi_info_cols = eval_results[policy_id] - if len(rnn_in_cols[policy_id]) != len(rnn_out_cols): - raise ValueError( - "Length of RNN in did not match RNN out, got: " - "{} vs {}".format(rnn_in_cols[policy_id], rnn_out_cols)) - # Add RNN state info - for f_i, column in enumerate(rnn_in_cols[policy_id]): - pi_info_cols["state_in_{}".format(f_i)] = column - for f_i, column in enumerate(rnn_out_cols): - pi_info_cols["state_out_{}".format(f_i)] = column - # Save output rows - actions = _unbatch_tuple_actions(actions) - for i, action in enumerate(actions): - env_id = eval_data[i].env_id - agent_id = eval_data[i].agent_id - actions_to_send[env_id][agent_id] = action - episode = active_episodes[env_id] - episode._set_rnn_state(agent_id, [c[i] for c in rnn_out_cols]) - episode._set_last_pi_info( - agent_id, {k: v[i] - for k, v in pi_info_cols.items()}) - if env_id in off_policy_actions and \ - agent_id in off_policy_actions[env_id]: - episode._set_last_action( - agent_id, off_policy_actions[env_id][agent_id]) - else: - episode._set_last_action(agent_id, action) + episode._set_last_action(agent_id, action) - # Return computed actions to ready envs. We also send to envs that have - # taken off-policy actions; those envs are free to ignore the action. - async_vector_env.send_actions(dict(actions_to_send)) + return actions_to_send def _fetch_atari_metrics(async_vector_env): diff --git a/python/ray/rllib/models/action_dist.py b/python/ray/rllib/models/action_dist.py index 91b8d2fce21d..75a43deeb789 100644 --- a/python/ray/rllib/models/action_dist.py +++ b/python/ray/rllib/models/action_dist.py @@ -102,9 +102,9 @@ def __init__(self, inputs, low=None, high=None): self.low = low self.high = high - # Squash to range if specified. 
- # TODO(ekl) might make sense to use a beta distribution instead: - # http://proceedings.mlr.press/v70/chou17a/chou17a.pdf + # Squash to range if specified. We use a sigmoid here to avoid the + # mean drifting too far past the bounds and causing nan outputs. + # https://github.com/ray-project/ray/issues/1862 if low is not None: self.mean = low + tf.sigmoid(self.mean) * (high - low) diff --git a/python/ray/rllib/models/catalog.py b/python/ray/rllib/models/catalog.py index 35ca2e7c7db3..8f0b8ac82540 100644 --- a/python/ray/rllib/models/catalog.py +++ b/python/ray/rllib/models/catalog.py @@ -200,7 +200,9 @@ def get_model(input_dict, if options.get("use_lstm"): copy = dict(input_dict) copy["obs"] = model.last_layer - model = LSTM(copy, obs_space, num_outputs, options, state_in, + feature_space = gym.spaces.Box( + -1, 1, shape=(model.last_layer.shape[1], )) + model = LSTM(copy, feature_space, num_outputs, options, state_in, seq_lens) logger.debug("Created model {}: ({} of {}, {}, {}) -> {}, {}".format( diff --git a/python/ray/rllib/optimizers/multi_gpu_optimizer.py b/python/ray/rllib/optimizers/multi_gpu_optimizer.py index 7e01ee9041dc..771acb5ac72c 100644 --- a/python/ray/rllib/optimizers/multi_gpu_optimizer.py +++ b/python/ray/rllib/optimizers/multi_gpu_optimizer.py @@ -3,6 +3,7 @@ from __future__ import print_function import logging +import math import numpy as np from collections import defaultdict import tensorflow as tf @@ -44,7 +45,9 @@ def _init(self, if not num_gpus: self.devices = ["/cpu:0"] else: - self.devices = ["/gpu:{}".format(i) for i in range(num_gpus)] + self.devices = [ + "/gpu:{}".format(i) for i in range(int(math.ceil(num_gpus))) + ] self.batch_size = int(sgd_batch_size / len(self.devices)) * len( self.devices) assert self.batch_size % len(self.devices) == 0 diff --git a/python/ray/rllib/test/test_external_env.py b/python/ray/rllib/test/test_external_env.py index c574ba633bac..f7e8308a5ff1 100644 --- a/python/ray/rllib/test/test_external_env.py +++ b/python/ray/rllib/test/test_external_env.py @@ -192,8 +192,7 @@ def testExternalEnvHorizonNotSupported(self): episode_horizon=20, batch_steps=10, batch_mode="complete_episodes") - ev.sample() - self.assertRaises(Exception, lambda: ev.sample()) + self.assertRaises(ValueError, lambda: ev.sample()) if __name__ == '__main__': diff --git a/python/ray/rllib/test/test_nested_spaces.py b/python/ray/rllib/test/test_nested_spaces.py index 3d9a569f604f..490e6af1520a 100644 --- a/python/ray/rllib/test/test_nested_spaces.py +++ b/python/ray/rllib/test/test_nested_spaces.py @@ -174,7 +174,7 @@ def testInvalidModel2(self): }, })) - def doTestNestedDict(self, make_env): + def doTestNestedDict(self, make_env, test_lstm=False): ModelCatalog.register_custom_model("composite", DictSpyModel) register_env("nested", make_env) pg = PGAgent( @@ -184,6 +184,7 @@ def doTestNestedDict(self, make_env): "sample_batch_size": 5, "model": { "custom_model": "composite", + "use_lstm": test_lstm, }, }) pg.train() @@ -230,6 +231,9 @@ def doTestNestedTuple(self, make_env): def testNestedDictGym(self): self.doTestNestedDict(lambda _: NestedDictEnv()) + def testNestedDictGymLSTM(self): + self.doTestNestedDict(lambda _: NestedDictEnv(), test_lstm=True) + def testNestedDictVector(self): self.doTestNestedDict( lambda _: VectorEnv.wrap(lambda i: NestedDictEnv())) diff --git a/python/ray/rllib/test/test_policy_evaluator.py b/python/ray/rllib/test/test_policy_evaluator.py index 5070d2b5a1c4..cf319a7e922b 100644 --- a/python/ray/rllib/test/test_policy_evaluator.py +++
b/python/ray/rllib/test/test_policy_evaluator.py @@ -150,6 +150,20 @@ def to_prev(vec): to_prev(batch["actions"])) self.assertGreater(batch["advantages"][0], 1) + # 11/23/18: Samples per second 8501.125113727468 + def testBaselinePerformance(self): + ev = PolicyEvaluator( + env_creator=lambda _: gym.make("CartPole-v0"), + policy_graph=MockPolicyGraph, + batch_steps=100) + start = time.time() + count = 0 + while time.time() - start < 1: + count += ev.sample().count + print() + print("Samples per second {}".format(count / (time.time() - start))) + print() + def testGlobalVarsUpdate(self): agent = A2CAgent( env="CartPole-v0", diff --git a/python/ray/rllib/test/test_supported_spaces.py b/python/ray/rllib/test/test_supported_spaces.py index 9f9575200438..b98a006bca3b 100644 --- a/python/ray/rllib/test/test_supported_spaces.py +++ b/python/ray/rllib/test/test_supported_spaces.py @@ -112,7 +112,13 @@ def tearDown(self): def testAll(self): stats = {} check_support("IMPALA", {"num_gpus": 0}, stats) - check_support("DDPG", {"timesteps_per_iteration": 1}, stats) + check_support( + "DDPG", { + "noise_scale": 100.0, + "timesteps_per_iteration": 1 + }, + stats, + check_bounds=True) check_support("DQN", {"timesteps_per_iteration": 1}, stats) check_support("A3C", { "num_workers": 1, diff --git a/python/ray/rllib/tuned_examples/halfcheetah-ddpg.yaml b/python/ray/rllib/tuned_examples/halfcheetah-ddpg.yaml index 34c60e5219b4..f02399ab33ff 100644 --- a/python/ray/rllib/tuned_examples/halfcheetah-ddpg.yaml +++ b/python/ray/rllib/tuned_examples/halfcheetah-ddpg.yaml @@ -34,8 +34,9 @@ halfcheetah-ddpg: clip_rewards: False # === Optimization === - actor_lr: 0.0001 - critic_lr: 0.001 + lr: 0.001 + actor_loss_coeff: 0.1 + critic_loss_coeff: 1.0 use_huber: False huber_threshold: 1.0 l2_reg: 0.000001 diff --git a/python/ray/services.py b/python/ray/services.py index 981d06160674..841fababd1b5 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -1162,7 +1162,6 @@ def start_worker(node_ip_address, sys.executable, "-u", worker_path, "--node-ip-address=" + node_ip_address, "--object-store-name=" + object_store_name, - "--local-scheduler-name=" + local_scheduler_name, "--redis-address=" + str(redis_address), "--temp-dir=" + get_temp_root() ] diff --git a/python/ray/worker.py b/python/ray/worker.py index bceaa6aab87c..f68bb42886f0 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -713,7 +713,7 @@ def run_function_on_all_workers(self, function, "driver_id": self.task_driver_id.id(), "function_id": function_to_run_id, "function": pickled_function, - "run_on_other_drivers": run_on_other_drivers + "run_on_other_drivers": str(run_on_other_drivers) }) self.redis_client.rpush("Exports", key) # TODO(rkn): If the worker fails after it calls setnx and before it @@ -1446,12 +1446,8 @@ def _init(address_info=None, # Use 1 local scheduler if num_local_schedulers is not provided. If # existing local schedulers are provided, use that count as # num_local_schedulers. - local_schedulers = address_info.get("local_scheduler_socket_names", []) if num_local_schedulers is None: - if len(local_schedulers) > 0: - num_local_schedulers = len(local_schedulers) - else: - num_local_schedulers = 1 + num_local_schedulers = 1 # Use 1 additional redis shard if num_redis_shards is not provided. 
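The import_thread.py and worker.py edits in this diff are two halves of one fix: the run_on_other_drivers flag is now written to Redis as a string and decoded when read back. Why the decode matters under Python 3, in isolation:

raw = b"False"  # redis-py returns hmget() values as bytes
# bytes never compares equal to str in Python 3, so the old
# `run_on_other_drivers == "False"` guard silently never fired.
assert raw != "False"
assert raw.decode() == "False"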
num_redis_shards = 1 if num_redis_shards is None else num_redis_shards @@ -2013,13 +2009,13 @@ def connect(info, "driver_id": worker.worker_id, "start_time": time.time(), "plasma_store_socket": info["store_socket_name"], - "local_scheduler_socket": info.get("local_scheduler_socket_name"), "raylet_socket": info.get("raylet_socket_name") } driver_info["name"] = (main.__file__ if hasattr(main, "__file__") else "INTERACTIVE MODE") worker.redis_client.hmset(b"Drivers:" + worker.worker_id, driver_info) - if not worker.redis_client.exists("webui"): + if (not worker.redis_client.exists("webui") + and info["webui_url"] is not None): worker.redis_client.hmset("webui", {"url": info["webui_url"]}) is_worker = False elif mode == WORKER_MODE: @@ -2027,7 +2023,6 @@ def connect(info, worker_dict = { "node_ip_address": worker.node_ip_address, "plasma_store_socket": info["store_socket_name"], - "local_scheduler_socket": info["local_scheduler_socket_name"] } if redirect_worker_output: worker_dict["stdout_file"] = os.path.abspath(log_stdout_file.name) @@ -2041,7 +2036,7 @@ def connect(info, worker.plasma_client = thread_safe_client( plasma.connect(info["store_socket_name"], "", 64)) - local_scheduler_socket = info["raylet_socket_name"] + raylet_socket = info["raylet_socket_name"] # If this is a driver, set the current task ID, the task driver ID, and set # the task index to 0. @@ -2100,8 +2095,7 @@ def connect(info, worker.multithreading_warned = False worker.local_scheduler_client = ray.raylet.LocalSchedulerClient( - local_scheduler_socket, worker.worker_id, is_worker, - worker.current_task_id) + raylet_socket, worker.worker_id, is_worker, worker.current_task_id) # Start the import thread import_thread.ImportThread(worker, mode).start() diff --git a/python/ray/workers/default_worker.py b/python/ray/workers/default_worker.py index 4ec9e4d14e56..b9c9500e7087 100644 --- a/python/ray/workers/default_worker.py +++ b/python/ray/workers/default_worker.py @@ -40,11 +40,6 @@ required=False, type=str, help="the object store manager's name") -parser.add_argument( - "--local-scheduler-name", - required=False, - type=str, - help="the local scheduler's name") parser.add_argument( "--raylet-name", required=False, type=str, help="the raylet's name") parser.add_argument( @@ -76,7 +71,6 @@ "redis_password": args.redis_password, "store_socket_name": args.object_store_name, "manager_socket_name": args.object_store_manager_name, - "local_scheduler_socket_name": args.local_scheduler_name, "raylet_socket_name": args.raylet_name } diff --git a/python/setup.py b/python/setup.py index 0ad412e1878c..7198de2329bf 100644 --- a/python/setup.py +++ b/python/setup.py @@ -131,6 +131,23 @@ def find_version(*filepath): raise RuntimeError("Unable to find version string.") +requires = [ + "numpy", + "funcsigs", + "click", + "colorama", + "pytest", + "pyyaml", + "redis", + "setproctitle", + # The six module is required by pyarrow. + "six >= 1.0.0", + "flatbuffers", +] + +if sys.version_info < (3, 0): + requires.append("faulthandler") + setup( name="ray", version=find_version("ray", "__init__.py"), @@ -144,20 +161,7 @@ def find_version(*filepath): cmdclass={"build_ext": build_ext}, # The BinaryDistribution argument triggers build_ext. distclass=BinaryDistribution, - install_requires=[ - "numpy", - "funcsigs", - "click", - "colorama", - "pytest", - "pyyaml", - "redis~=2.10.6", - "faulthandler;python_version<'3'", - "setproctitle", - # The six module is required by pyarrow. 
- "six >= 1.0.0", - "flatbuffers" - ], + install_requires=requires, setup_requires=["cython >= 0.27, < 0.28"], extras_require=extras, entry_points={ diff --git a/test/jenkins_tests/multi_node_tests/test_rllib_eval.sh b/test/jenkins_tests/multi_node_tests/test_rllib_eval.sh deleted file mode 100644 index c4505832c16a..000000000000 --- a/test/jenkins_tests/multi_node_tests/test_rllib_eval.sh +++ /dev/null @@ -1,23 +0,0 @@ -#!/bin/sh - -# TODO: Test AC3 -ALGS='DQN PPO' -GYM_ENV='CartPole-v0' - -for ALG in $ALGS -do - EXPERIMENT_NAME=$GYM_ENV'_'$ALG - python /ray/python/ray/rllib/train.py --run $ALG --env $GYM_ENV \ - --stop '{"training_iteration": 2}' --experiment-name $EXPERIMENT_NAME \ - --checkpoint-freq 1 - - EXPERIMENT_PATH='/tmp/ray/'$EXPERIMENT_NAME - CHECKPOINT_FOLDER=$(ls $EXPERIMENT_PATH) - CHECKPOINT=$EXPERIMENT_PATH'/'$CHECKPOINT_FOLDER'/checkpoint-1' - - python /ray/python/ray/rllib/eval.py $CHECKPOINT --run $ALG \ - --env $GYM_ENV --no-render - - # Clean up - rm -rf $EXPERIMENT_PATH -done diff --git a/test/jenkins_tests/run_multi_node_tests.sh b/test/jenkins_tests/run_multi_node_tests.sh index 2ea97d4926ab..0d19a06f7b6c 100755 --- a/test/jenkins_tests/run_multi_node_tests.sh +++ b/test/jenkins_tests/run_multi_node_tests.sh @@ -53,6 +53,14 @@ docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ --stop '{"training_iteration": 2}' \ --config '{"simple_optimizer": true, "num_sgd_iter": 2, "model": {"use_lstm": true}}' +docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ + python /ray/python/ray/rllib/train.py \ + --env CartPole-v1 \ + --run PPO \ + --stop '{"training_iteration": 2}' \ + --config '{"num_gpus": 0.1}' \ + --ray-num-gpus 1 + docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/train.py \ --env CartPole-v1 \ @@ -234,9 +242,6 @@ docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ --stop '{"training_iteration": 2}' \ --config '{"num_workers": 2, "optimizer": {"num_replay_buffer_shards": 1}, "learning_starts": 100, "min_iter_time_s": 1}' -docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ - sh /ray/test/jenkins_tests/multi_node_tests/test_rllib_eval.sh - docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \ python /ray/python/ray/rllib/test/test_local.py diff --git a/test/runtest.py b/test/runtest.py index cea97090576b..767960a16658 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -2326,7 +2326,6 @@ def f(): assert len(worker_info) >= num_workers for worker_id, info in worker_info.items(): assert "node_ip_address" in info - assert "local_scheduler_socket" in info assert "plasma_store_socket" in info assert "stderr_file" in info assert "stdout_file" in info